1 module mysql.inserter;
2 
3 
4 import std.array;
5 import std.traits;
6 
7 
8 import mysql.appender;
9 
10 
/**
 * Selects which statement form `Inserter.start` emits.
 *
 * The numeric values are spelled out explicitly; they match the implicit
 * numbering of the declaration order.
 */
enum OnDuplicate : size_t {
	/// `start` emits `insert ignore into`.
	Ignore = 0,
	/// `start` emits a plain `insert into` with no duplicate handling added.
	Error = 1,
	/// `start` emits `replace into`.
	Replace = 2,
	/// `start` emits a plain `insert into`; an `on duplicate key update`
	/// clause is appended at flush time only if one was supplied through
	/// `Inserter.duplicateUpdate`.
	Update = 3,
}
17 
18 
/**
 * Creates an `Inserter` bound to `connection` without starting a statement.
 *
 * Params:
 *   connection = connection the inserter stores and later executes SQL on
 *
 * Returns: a new `Inserter!ConnectionType`; `start` must be called on it
 *          before rows can be added.
 */
auto inserter(ConnectionType)(ConnectionType connection) {
	alias Target = Inserter!ConnectionType;
	return Target(connection);
}
22 
23 
/**
 * Creates an `Inserter` bound to `connection` and immediately starts a
 * statement for `tableName` with the given duplicate-handling `action`.
 *
 * Params:
 *   connection = connection the inserter stores and later executes SQL on
 *   action     = statement form to emit (see `OnDuplicate`)
 *   tableName  = target table, forwarded to `Inserter.start`
 *   columns    = column names, forwarded to `Inserter.start`
 *
 * Returns: the started `Inserter!ConnectionType`.
 */
auto inserter(ConnectionType, Args...)(ConnectionType connection, OnDuplicate action, string tableName, Args columns) {
	Inserter!ConnectionType result = Inserter!ConnectionType(connection);
	result.start(action, tableName, columns);
	return result;
}
29 
30 
/**
 * Creates an `Inserter` bound to `connection` and immediately starts a
 * statement for `tableName` using `OnDuplicate.Error`.
 *
 * Params:
 *   connection = connection the inserter stores and later executes SQL on
 *   tableName  = target table, forwarded to `Inserter.start`
 *   columns    = column names, forwarded to `Inserter.start`
 *
 * Returns: the started `Inserter!ConnectionType`.
 */
auto inserter(ConnectionType, Args...)(ConnectionType connection, string tableName, Args columns) {
	// Same construct-then-start sequence as the overload taking an explicit action.
	return inserter(connection, OnDuplicate.Error, tableName, columns);
}
36 
37 
/**
 * Batches rows into multi-row insert statements and sends them through a
 * connection.
 *
 * Rows added with `row` are appended to one SQL text buffer. The buffer is
 * passed to `conn_.execute` when `flush` is called explicitly, when the
 * buffer grows past `flushThreshold` bytes, when `start` begins a new
 * statement, and when the inserter is destroyed.
 *
 * `ConnectionType` must provide an `execute` method callable with a `char[]`.
 *
 * NOTE(review): the struct is copyable and its destructor flushes, so every
 * copy that still has pending rows will attempt a flush of its own when it
 * is destroyed. Prefer passing it by `ref`.
 */
struct Inserter(ConnectionType) {
	@disable this();

	/// Binds the inserter to `connection`; no statement is started yet.
	this(ConnectionType connection) {
		conn_ = connection;
		pending_ = 0;
		flushes_ = 0;
	}

	/// Sends any rows that are still buffered.
	~this() {
		flush();
	}

	/// Starts a statement for `tableName` using `OnDuplicate.Error`.
	void start(Args...)(string tableName, Args fieldNames) {
		start(OnDuplicate.Error, tableName, fieldNames);
	}

	/**
	 * Starts a new statement: records the column count and builds the
	 * statement prefix (`insert into tbl(`a`,`b`)values`).
	 *
	 * Params:
	 *   action     = selects `insert ignore into`, `replace into` or `insert into`
	 *   tableName  = written verbatim (it is not quoted here)
	 *   fieldNames = column names; each is wrapped in backticks
	 */
	void start(Args...)(OnDuplicate action, string tableName, Args fieldNames) {
		// Rows buffered so far belong to the previous statement. Send them
		// now, otherwise later rows (possibly with a different column count)
		// would be appended to the old statement text.
		flush();

		fields_ = fieldNames.length;

		Appender!(char[]) prefix;

		final switch (action) {
			case OnDuplicate.Ignore:
				prefix.put("insert ignore into ");
				break;
			case OnDuplicate.Replace:
				prefix.put("replace into ");
				break;
			case OnDuplicate.Update:
			case OnDuplicate.Error:
				// Update differs from Error only through the clause that
				// flush appends when duplicateUpdate has been set.
				prefix.put("insert into ");
				break;
		}

		prefix.put(tableName);
		prefix.put('(');
		foreach (size_t i, name; fieldNames) {
			if (i != 0)
				prefix.put(',');
			prefix.put('`');
			prefix.put(name);
			prefix.put('`');
		}
		prefix.put(")values");
		start_ = prefix.data;
	}

	/**
	 * Sets the text appended after ` on duplicate key update ` on every
	 * flush. An empty string disables the clause.
	 *
	 * Returns: this inserter, so calls can be chained.
	 */
	auto ref duplicateUpdate(string update) {
		dupUpdate_ = update;
		return this;
	}

	/**
	 * Appends one row of values to the pending statement.
	 *
	 * The number of values must equal the number of columns given to
	 * `start`, and `start` must have been called; both conditions are
	 * checked with `assert` only. Flushes automatically once the buffered
	 * SQL text exceeds `flushThreshold` bytes.
	 */
	void row(Values...)(Values values) {
		assert(values.length == fields_, "Column count and value count must match");
		assert(!start_.empty, "Must call start before inserting a row");

		if (pending_ == 0) {
			// First row of a batch: emit the statement prefix first.
			values_.put(start_);
			values_.put("(");
		} else {
			values_.put(",(");
		}
		++pending_;

		foreach (size_t i, value; values) {
			if (i != 0)
				values_.put(',');
			appendValue(values_, value);
		}
		values_.put(')');

		if (values_.data.length > flushThreshold_)
			flush();
	}

	/// Number of rows buffered since the last flush (0 when nothing is pending).
	@property size_t pending() const {
		return pending_;
	}

	/// Number of batches successfully passed to `execute` so far.
	@property size_t flushes() const {
		return flushes_;
	}

	/// Size in bytes of buffered SQL text above which `row` flushes
	/// automatically. Defaults to 128 KiB.
	@property size_t flushThreshold() const {
		return flushThreshold_;
	}

	/// ditto
	@property void flushThreshold(size_t bytes) {
		flushThreshold_ = bytes;
	}

	/**
	 * Executes the buffered statement, if any rows are pending.
	 *
	 * The buffer and the pending counter are reset before `execute` is
	 * called, so if `execute` throws the failed batch is dropped rather than
	 * re-sent by a later flush (including the one in the destructor).
	 */
	void flush() {
		if (pending_ == 0)
			return;

		if (dupUpdate_.length) {
			values_.put(" on duplicate key update ");
			values_.put(dupUpdate_);
		}

		// `sql` aliases the appender's buffer; it is consumed by execute
		// before anything is appended again.
		auto sql = values_.data;
		values_.clear();
		pending_ = 0;

		conn_.execute(sql);
		++flushes_;
	}

private:
	char[] start_;
	const(char)[] dupUpdate_;
	Appender!(char[]) values_;

	ConnectionType conn_;
	size_t pending_;
	size_t flushes_;
	size_t fields_;
	size_t flushThreshold_ = 128 << 10;
}