1 module mysql.inserter;
2 
3 
4 import std.array;
5 import std.meta;
6 import std.range;
7 import std.traits;
8 
9 
10 import mysql.appender;
11 
12 
13 enum OnDuplicate : size_t {
14 	Ignore,
15 	Error,
16 	Replace,
17 	Update,
18 	UpdateAll,
19 }
20 
21 
22 auto inserter(ConnectionType)(ConnectionType connection) {
23 	return Inserter!ConnectionType(connection);
24 }
25 
26 
27 auto inserter(ConnectionType, Args...)(ConnectionType connection, OnDuplicate action, string tableName, Args columns) {
28 	auto insert = Inserter!ConnectionType(connection);
29 	insert.start(action, tableName, columns);
30 	return insert;
31 }
32 
33 
34 auto inserter(ConnectionType, Args...)(ConnectionType connection, string tableName, Args columns) {
35 	auto insert = Inserter!ConnectionType(connection);
36 	insert.start(OnDuplicate.Error, tableName, columns);
37 	return insert;
38 }
39 
40 
41 struct Inserter(ConnectionType) {
42 	@disable this();
43 
44 	this(ConnectionType connection) {
45 		conn_ = connection;
46 		pending_ = 0;
47 		flushes_ = 0;
48 	}
49 
50 	~this() {
51 		flush();
52 	}
53 
54 	void start(Args...)(string tableName, Args fieldNames) if ((Args.length > 0) && (allSatisfy!(isSomeString, Args) || ((Args.length == 1) && isSomeString!(ElementType!(Args[0]))))) {
55 		start(OnDuplicate.Error, tableName, fieldNames);
56 	}
57 
58 	void start(Args...)(OnDuplicate action, string tableName, Args fieldNames) if ((Args.length > 0) && (allSatisfy!(isSomeString, Args) || ((Args.length == 1) && isSomeString!(ElementType!(Args[0]))))) {
59 		fields_ = fieldNames.length;
60 
61 		Appender!(char[]) app;
62 
63 		final switch(action) with (OnDuplicate) {
64 		case Ignore:
65 			app.put("insert ignore into ");
66 			break;
67 		case Replace:
68 			app.put("replace into ");
69 			break;
70 		case UpdateAll:
71 			Appender!(char[]) dupapp;
72 
73 			static if (isSomeString!(Args[0])) {
74 				alias Columns = fieldNames;
75 			} else {
76 				auto Columns = fieldNames[0];
77 			}
78 
79 			foreach(size_t i, name; Columns) {
80 				dupapp.put('`');
81 				dupapp.put(name);
82 				dupapp.put("`=values(`");
83 				dupapp.put(name);
84 				dupapp.put("`)");
85 				if (i + 1 != Columns.length)
86 					dupapp.put(',');
87 			}
88 			dupUpdate_ = dupapp.data;
89 			goto case Update;
90 		case Update:
91 		case Error:
92 			app.put("insert into ");
93 			break;
94 		}
95 
96 		app.put(tableName);
97 		app.put('(');
98 
99 		static if (isSomeString!(Args[0])) {
100 			alias Columns = fieldNames;
101 		} else {
102 			auto Columns = fieldNames[0];
103 		}
104 
105 		foreach(size_t i, name; Columns) {
106 			app.put('`');
107 			app.put(name);
108 			app.put('`');
109 			if (i + 1 != Columns.length)
110 				app.put(',');
111 		}
112 
113 		app.put(")values");
114 		start_ = app.data;
115 	}
116 
117 	auto ref duplicateUpdate(string update) {
118 		dupUpdate_ = cast(char[])update;
119 		return this;
120 	}
121 
122 	void row(Values...)(Values values) {
123 		assert(values.length == fields_, "Column count and value count must match");
124 		assert(!start_.empty, "Must call start before inserting a row");
125 
126 		if (!pending_)
127 			values_.put(cast(char[])start_);
128 
129 		values_.put(pending_ ? ",(" : "(");
130 		++pending_;
131 		foreach (size_t i, value; values) {
132 			appendValue(values_, value);
133 			if (i != values.length-1)
134 				values_.put(',');
135 		}
136 		values_.put(')');
137 
138 		if (values_.data.length > (128 << 10)) // todo: make parameter
139 			flush();
140 	}
141 
142 	@property size_t pending() const {
143 		return pending_ != 0;
144 	}
145 
146 	@property size_t flushes() const {
147 		return flushes_;
148 	}
149 
150 	void flush() {
151 		if (pending_) {
152 			if (dupUpdate_.length) {
153 				values_.put(cast(ubyte[])" on duplicate key update ");
154 				values_.put(cast(ubyte[])dupUpdate_);
155 			}
156 
157 			auto sql = cast(char[])values_.data();
158 			values_.clear;
159 			pending_ = 0;
160 
161 			conn_.execute(sql);
162 			++flushes_;
163 		}
164 	}
165 
166 private:
167 	char[] start_;
168 	char[] dupUpdate_;
169 	Appender!(char[]) values_;
170 
171 	ConnectionType conn_;
172 	size_t pending_;
173 	size_t flushes_;
174 	size_t fields_;
175 }