module mysql.inserter;


import std.array;
import std.meta;
import std.range;
import std.traits;


import mysql.appender;


/// Selects which statement prefix (and optional duplicate-key clause) `Inserter.start` generates.
enum OnDuplicate : size_t {
    Ignore,     /// statement begins with `insert ignore into`
    Error,      /// statement begins with a plain `insert into`
    Replace,    /// statement begins with `replace into`
    Update,     /// plain `insert into`; the update clause is supplied via `Inserter.duplicateUpdate`
    UpdateAll,  /// plain `insert into` plus a generated "`col`=values(`col`)" clause for every column
}


/// Creates an inserter bound to `connection`; `start` must be called before adding rows.
auto inserter(ConnectionType)(ConnectionType connection) {
    return Inserter!ConnectionType(connection);
}


/// Creates an inserter and immediately starts it for `tableName`/`columns` using `action`.
auto inserter(ConnectionType, Args...)(ConnectionType connection, OnDuplicate action, string tableName, Args columns) {
    auto insert = Inserter!ConnectionType(connection);
    insert.start(action, tableName, columns);
    return insert;
}


/// Creates an inserter and immediately starts it for `tableName`/`columns` with `OnDuplicate.Error`.
auto inserter(ConnectionType, Args...)(ConnectionType connection, string tableName, Args columns) {
    auto insert = Inserter!ConnectionType(connection);
    insert.start(OnDuplicate.Error, tableName, columns);
    return insert;
}


/**
 * Accumulates rows into a single multi-row insert statement and sends it
 * through `ConnectionType.execute` when `flush` is called, when the buffered
 * statement grows beyond `flushThreshold` bytes, or when the inserter is destroyed.
 */
struct Inserter(ConnectionType) {
    @disable this();

    /// Binds the inserter to `connection`; no statement is prepared yet.
    this(ConnectionType connection) {
        conn_ = connection;
        pending_ = 0;
        flushes_ = 0;
    }

    /// Sends any rows that are still buffered.
    ~this() {
        flush();
    }

    /// Same as the overload below, using `OnDuplicate.Error`.
    void start(Args...)(string tableName, Args fieldNames) if ((Args.length > 0) && (allSatisfy!(isSomeString, Args) || ((Args.length == 1) && isSomeString!(ElementType!(Args[0]))))) {
        start(OnDuplicate.Error, tableName, fieldNames);
    }

    /**
     * Prepares the statement prefix for subsequent `row` calls.
     *
     * Params:
     *   action     = selects the statement prefix and, for `UpdateAll`, a generated update clause
     *   tableName  = written into the statement verbatim (not quoted or escaped)
     *   fieldNames = column names, either as separate string arguments or as a single
     *                array of strings; each name is wrapped in backticks
     */
    void start(Args...)(OnDuplicate action, string tableName, Args fieldNames) if ((Args.length > 0) && (allSatisfy!(isSomeString, Args) || ((Args.length == 1) && isSomeString!(ElementType!(Args[0]))))) {
        // Rows buffered under a previous start() were built for the old prefix and
        // the old duplicate-key clause; send them before either is replaced.
        flush();

        // A clause generated by an earlier UpdateAll names the previous column set.
        // A clause set explicitly through duplicateUpdate() is left untouched.
        if (dupGenerated_) {
            dupUpdate_ = null;
            dupGenerated_ = false;
        }

        static if (isSomeString!(Args[0])) {
            alias columns = fieldNames;
        } else {
            auto columns = fieldNames[0];
        }

        // Count the actual columns: for the single-array form the argument tuple
        // has length 1 regardless of how many names the array holds.
        fields_ = columns.length;

        Appender!(char[]) app;

        final switch(action) with (OnDuplicate) {
            case Ignore:
                app.put("insert ignore into ");
                break;
            case Replace:
                app.put("replace into ");
                break;
            case UpdateAll:
                Appender!(char[]) dupapp;

                foreach(size_t i, name; columns) {
                    dupapp.put('`');
                    dupapp.put(name);
                    dupapp.put("`=values(`");
                    dupapp.put(name);
                    dupapp.put("`)");
                    if (i + 1 != columns.length)
                        dupapp.put(',');
                }
                dupUpdate_ = dupapp.data;
                dupGenerated_ = true;
                goto case Update;
            case Update:
            case Error:
                app.put("insert into ");
                break;
        }

        app.put(tableName);
        app.put('(');

        foreach(size_t i, name; columns) {
            app.put('`');
            app.put(name);
            app.put('`');
            if (i + 1 != columns.length)
                app.put(',');
        }

        app.put(")values");
        start_ = app.data;
    }

    /// Sets the text appended after " on duplicate key update " when flushing; returns this inserter.
    auto ref duplicateUpdate(string update) {
        // Copy instead of casting away immutability of the caller's string.
        dupUpdate_ = update.dup;
        dupGenerated_ = false;
        return this;
    }

    /**
     * Appends one row of values to the buffered statement.
     *
     * The number of values must equal the number of columns given to `start`
     * (checked with `assert`). Each value is serialized by `appendValue`.
     * When the buffered statement exceeds `flushThreshold` bytes it is flushed.
     */
    void row(Values...)(Values values) {
        assert(values.length == fields_, "Column count and value count must match");
        assert(!start_.empty, "Must call start before inserting a row");

        // First row of a batch: begin with the prepared statement prefix.
        if (!pending_)
            values_.put(start_);

        values_.put(pending_ ? ",(" : "(");
        ++pending_;
        foreach (size_t i, value; values) {
            appendValue(values_, value);
            if (i + 1 != values.length)
                values_.put(',');
        }
        values_.put(')');

        if (values_.data.length > flushThreshold) // todo: make parameter
            flush();
    }

    /// Number of rows buffered since the last flush.
    @property size_t pending() const {
        return pending_;
    }

    /// Number of statements sent through the connection so far.
    @property size_t flushes() const {
        return flushes_;
    }

    /// Sends the buffered statement, if any rows are pending, and empties the buffer.
    void flush() {
        if (!pending_)
            return;

        if (dupUpdate_.length) {
            values_.put(" on duplicate key update ");
            values_.put(dupUpdate_);
        }

        // The buffer and counter are reset before execute() is called, so the
        // inserter is already empty if execute() throws.
        auto sql = values_.data;
        values_.clear;
        pending_ = 0;

        conn_.execute(sql);
        ++flushes_;
    }

private:
    /// Buffered statement size, in bytes, above which `row` flushes automatically.
    enum size_t flushThreshold = 128 << 10;

    char[] start_;
    char[] dupUpdate_;
    bool dupGenerated_; // true while dupUpdate_ holds the clause generated for UpdateAll
    Appender!(char[]) values_;

    ConnectionType conn_;
    size_t pending_;
    size_t flushes_;
    size_t fields_;
}