module mysql.inserter;


import std.array;
import std.traits;


import mysql.appender;


/// Selects which statement prefix `Inserter.start` generates.
enum OnDuplicate : size_t {
	Ignore,  /// generates `insert ignore into`
	Error,   /// generates a plain `insert into`
	Replace, /// generates `replace into`
	Update,  /// generates `insert into`; combine with `Inserter.duplicateUpdate` to add the update clause
}


/// Creates an `Inserter` bound to `connection`. `start` must be called before any `row`.
auto inserter(ConnectionType)(ConnectionType connection) {
	return Inserter!ConnectionType(connection);
}


/// Creates an `Inserter` bound to `connection` and immediately starts a statement
/// for `tableName` / `columns` using the given duplicate-key `action`.
auto inserter(ConnectionType, Args...)(ConnectionType connection, OnDuplicate action, string tableName, Args columns) {
	auto insert = Inserter!ConnectionType(connection);
	insert.start(action, tableName, columns);
	return insert;
}


/// Same as the overload taking an `OnDuplicate`, with the action fixed to `OnDuplicate.Error`.
auto inserter(ConnectionType, Args...)(ConnectionType connection, string tableName, Args columns) {
	auto insert = Inserter!ConnectionType(connection);
	insert.start(OnDuplicate.Error, tableName, columns);
	return insert;
}


/**
 * Buffers rows into a single multi-row insert/replace statement and sends it
 * through `ConnectionType.execute` when the buffer grows past `flushThreshold`,
 * when `flush` is called, or when the instance is destroyed.
 *
 * NOTE(review): the destructor flushes and the struct is copyable, so every
 * copy will attempt its own flush on destruction. Avoid copying an instance
 * that has buffered rows.
 */
struct Inserter(ConnectionType) {
	@disable this();

	/// Binds the inserter to `connection`; nothing is sent until rows are flushed.
	this(ConnectionType connection) {
		conn_ = connection;
		pending_ = 0;
		flushes_ = 0;
	}

	/// Sends any rows that are still buffered.
	~this() {
		flush();
	}

	/// Starts a plain `insert into` statement (equivalent to `OnDuplicate.Error`).
	void start(Args...)(string tableName, Args fieldNames) {
		start(OnDuplicate.Error, tableName, fieldNames);
	}

	/**
	 * Builds the statement prefix used for all subsequent rows.
	 *
	 * Params:
	 *   action     = selects `insert ignore into`, `replace into` or `insert into`
	 *   tableName  = written verbatim (not quoted) after the prefix
	 *   fieldNames = column names; each one is wrapped in backticks
	 *
	 * Rows buffered for a previously started statement are flushed first, so
	 * they are never mixed into the new statement.
	 */
	void start(Args...)(OnDuplicate action, string tableName, Args fieldNames) {
		// Buffered rows were built against the old prefix and column count;
		// send them before either is replaced.
		flush();

		fields_ = fieldNames.length;

		Appender!(char[]) appender;

		final switch(action) {
		case OnDuplicate.Ignore:
			appender.put("insert ignore into ");
			break;
		case OnDuplicate.Replace:
			appender.put("replace into ");
			break;
		case OnDuplicate.Update:
		case OnDuplicate.Error:
			// Update differs from Error only by the clause added in flush()
			appender.put("insert into ");
			break;
		}

		appender.put(tableName);
		appender.put('(');
		foreach(size_t i, name; fieldNames) {
			appender.put('`');
			appender.put(name);
			appender.put('`');
			if (i + 1 != fieldNames.length)
				appender.put(',');
		}
		appender.put(")values");
		start_ = appender.data;
	}

	/**
	 * Sets the expression list appended after ` on duplicate key update ` on
	 * every flushed statement. An empty string disables the clause.
	 *
	 * Returns: this inserter, to allow chaining.
	 */
	auto ref duplicateUpdate(string update) {
		// Copy instead of casting away immutability of the caller's string
		dupUpdate_ = update.dup;
		return this;
	}

	/**
	 * Appends one row to the buffered statement. The number of values must
	 * equal the number of field names passed to `start`. Each value is
	 * serialized with `appendValue`.
	 *
	 * Triggers a `flush` once the buffered statement exceeds `flushThreshold`.
	 */
	void row(Values...)(Values values) {
		// Check for a missing start() first: without it fields_ is still 0 and
		// the count check below would report a misleading message.
		assert(!start_.empty, "Must call start before inserting a row");
		assert(values.length == fields_, "Column count and value count must match");

		// First row of a batch: begin with the statement prefix
		if (!pending_)
			values_.put(start_);

		values_.put(pending_ ? ",(" : "(");
		++pending_;
		foreach (size_t i, value; values) {
			appendValue(values_, value);
			if (i + 1 != values.length)
				values_.put(',');
		}
		values_.put(')');

		if (values_.data.length > flushThreshold_)
			flush();
	}

	/// Returns 1 when at least one row is buffered and 0 otherwise.
	/// NOTE(review): this is a flag, not the row count, despite the `size_t` type.
	@property size_t pending() const {
		return pending_ != 0;
	}

	/// Number of statements successfully executed by `flush` so far.
	@property size_t flushes() const {
		return flushes_;
	}

	/// Buffered statement length (in chars) above which `row` flushes automatically.
	/// Defaults to 128 KiB.
	@property size_t flushThreshold() const {
		return flushThreshold_;
	}

	/// ditto
	@property void flushThreshold(size_t length) {
		flushThreshold_ = length;
	}

	/**
	 * Executes the buffered statement, if any rows are pending, appending the
	 * `on duplicate key update` clause when one was set.
	 */
	void flush() {
		if (!pending_)
			return;

		if (dupUpdate_.length) {
			values_.put(" on duplicate key update ");
			values_.put(dupUpdate_);
		}

		// The buffer and row counter are reset before executing, so a throwing
		// execute() does not leave the same batch queued for a later flush.
		// NOTE(review): `sql` is taken before clear(); this assumes the
		// appender keeps the slice contents intact until execute() returns.
		auto sql = values_.data;
		values_.clear();
		pending_ = 0;

		conn_.execute(sql);
		++flushes_;
	}

private:
	char[] start_;
	char[] dupUpdate_;
	Appender!(char[]) values_;

	ConnectionType conn_;
	size_t pending_;
	size_t flushes_;
	size_t fields_;
	size_t flushThreshold_ = 128 << 10;
}