1 module mysql.connection;
2 
3 
4 import std.algorithm;
5 import std.array;
6 import std.string;
7 import std.traits;
8 
9 public import mysql.exception;
10 import mysql.packet;
11 import mysql.protocol;
12 public import mysql.type;
13 
14 
/// Capability flags used when the caller does not supply its own set
/// (default for `ConnectionSettings.caps` and for `Connection.connect`).
immutable CapabilityFlags DefaultClientCaps =
	CapabilityFlags.CLIENT_LONG_PASSWORD
	| CapabilityFlags.CLIENT_LONG_FLAG
	| CapabilityFlags.CLIENT_CONNECT_WITH_DB
	| CapabilityFlags.CLIENT_PROTOCOL_41
	| CapabilityFlags.CLIENT_SECURE_CONNECTION;
17 
18 
/// Snapshot of the values carried by the most recent status packet.
/// All members start at zero (the integral default).
struct ConnectionStatus {
	ulong affected;   /// affected-row count from the last OK packet
	ulong insertID;   /// insert id from the last OK packet
	ushort flags;     /// status flags from the last OK/EOF packet
	ushort error;     /// error code from the last ERR packet, 0 otherwise
	ushort warnings;  /// warning count from the last OK/EOF packet
}
26 
27 
/// Parameters used to open (or re-open) a connection.
private struct ConnectionSettings {
	/// Capability flags the client asks for; intersected with the server's flags during the handshake.
	CapabilityFlags caps = DefaultClientCaps;

	const(char)[] host;	/// host passed to the socket's connect call
	const(char)[] user;	/// user name sent in the handshake reply
	const(char)[] pwd;	/// password; hashed into the auth token, never sent in clear
	const(char)[] db;	/// initial database, sent only when CLIENT_CONNECT_WITH_DB is negotiated
	ushort port = 3306;	/// port passed to the socket's connect call
}
37 
38 
/// Values read from the server's initial handshake packet.
private struct ServerInfo {
	const(char)[] versionString;	/// server version text
	ubyte protocol;					/// first byte of the handshake packet
	ubyte charSet;					/// only read when the handshake has the extended section
	ushort status;					/// only read when the handshake has the extended section
	uint connection;				/// 32-bit value following the version string (presumably the connection id)
	uint caps;						/// capability flags: low 16 bits, plus high 16 bits when present
}
47 
48 
/// Builds a comma-separated list of `x` question marks for use in SQL text.
///
/// Params:
///   x      = number of `?` placeholders to emit
///   parens = wrap the list in parentheses when true (the default)
/// Returns: e.g. `"(?,?,?)"` for `x == 3`, `"()"` for `x == 0`.
@property string placeholders(size_t x, bool parens = true) {
	import std.range : repeat;

	string marks = "?".repeat(x).join(",");
	return parens ? "(" ~ marks ~ ")" : marks;
}
56 
57 
/// Builds a comma-separated list with one question mark per element of `x`.
///
/// Params:
///   x      = array whose length decides how many `?` placeholders are emitted
///   parens = wrap the list in parentheses when true (the default)
/// Returns: e.g. `"(?,?)"` for a two-element array.
@property string placeholders(T)(T[] x, bool parens = true) {
	import std.range : repeat;

	const size_t count = x.length;
	string marks = "?".repeat(count).join(",");
	return parens ? "(" ~ marks ~ ")" : marks;
}
65 
66 
/// Handle for a server-side prepared statement, as returned by `Connection.prepare`.
struct PreparedStatement {
package:
	uint id;		/// statement id read from the prepare response
	uint params;	/// number of parameters the statement expects
}
72 
73 
74 struct Connection(SocketType) {
75 	void connect(string connectionString) {
76 		connectionSettings(connectionString);
77 		connect();
78 	}
79 
80 	void connect(const(char)[] host, ushort port, const(char)[] user, const(char)[] pwd, const(char)[] db, CapabilityFlags caps = DefaultClientCaps) {
81 		settings_.host = host;
82 		settings_.user = user;
83 		settings_.pwd = pwd;
84 		settings_.db = db;
85 		settings_.port = port;
86 		settings_.caps = caps | CapabilityFlags.CLIENT_LONG_PASSWORD | CapabilityFlags.CLIENT_PROTOCOL_41;
87 
88 		connect();
89 	}
90 
91 	void use(const(char)[] db) {
92 		send(Commands.COM_INIT_DB, db);
93 		eatStatus(retrieve());
94 	}
95 
96 	void ping() {
97 		send(Commands.COM_PING);
98 		eatStatus(retrieve());
99 	}
100 
101 	void refresh() {
102 		send(Commands.COM_REFRESH);
103 		eatStatus(retrieve());
104 	}
105 
106 	void reset() {
107 		send(Commands.COM_RESET_CONNECTION);
108 		eatStatus(retrieve());
109 	}
110 
111 	const(char)[] statistics() {
112 		send(Commands.COM_STATISTICS);
113 
114 		auto answer = retrieve();
115 		return answer.eat!(const(char)[])(answer.remaining);
116 	}
117 
118 	auto prepare(const(char)[] sql) {
119 		send(Commands.COM_STMT_PREPARE, sql);
120 
121 		auto answer = retrieve();
122 
123 		if (answer.peek!ubyte != StatusPackets.OK_Packet)
124 			eatStatus(answer);
125 
126 		answer.expect!ubyte(0);
127 
128 		auto id = answer.eat!uint;
129 		auto columns = answer.eat!ushort;
130 		auto params = answer.eat!ushort;
131 		answer.expect!ubyte(0);
132 
133 		auto warnings = answer.eat!ushort;
134 
135 		if (params) {
136 			foreach (i; 0..params)
137 				skipColumnDef(retrieve(), Commands.COM_STMT_PREPARE);
138 
139 			eatEOF(retrieve());
140 		}
141 
142 		if (columns) {
143 			MySQLColumn def;
144 			foreach (i; 0..columns)
145 				skipColumnDef(retrieve(), Commands.COM_STMT_PREPARE);
146 
147 			eatEOF(retrieve());
148 		}
149 
150 		return PreparedStatement(id, params);
151 	}
152 
153 	void execute(Args...)(const(char)[] stmt, Args args) {
154 		scope(failure) disconnect();
155 
156 		auto id = prepare(stmt);
157 		execute(id, args);
158 		close(id);
159 	}
160 
161 	void begin() {
162 		if (inTransaction)
163 			throw new MySQLErrorException("MySQL does not support nested transactions - commit or rollback before starting a new transaction");
164 
165 		query("start transaction");
166 
167 		assert(inTransaction);
168 	}
169 
170 	void commit() {
171 		if (!inTransaction)
172 			throw new MySQLErrorException("No active transaction");
173 
174 		query("commit");
175 
176 		assert(!inTransaction);
177 	}
178 
179 	void rollback() {
180 		if (connected) {
181 			if ((status_.flags & StatusFlags.SERVER_STATUS_IN_TRANS) == 0)
182 				throw new MySQLErrorException("No active transaction");
183 
184 			query("rollback");
185 
186 			assert(!inTransaction);
187 		}
188 	}
189 
190 	@property bool inTransaction() const {
191 		return connected && (status_.flags & StatusFlags.SERVER_STATUS_IN_TRANS);
192 	}
193 
194 	void execute(Args...)(PreparedStatement stmt, Args args) {
195 		scope(failure) disconnect();
196 
197 		ensureConnected();
198 
199 		seq_ = 0;
200 		auto packet = OutputPacket(&out_);
201 		packet.put!ubyte(Commands.COM_STMT_EXECUTE);
202 		packet.put!uint(stmt.id);
203 		packet.put!ubyte(Cursors.CURSOR_TYPE_READ_ONLY);
204 		packet.put!uint(1);
205 
206 		static if (args.length == 0) {
207 			enum shouldDiscard = true;
208 		} else {
209 			enum shouldDiscard = !isCallable!(args[args.length - 1]);
210 		}
211 
212 		enum argCount = shouldDiscard ? args.length : (args.length - 1);
213 
214 		if (!argCount && stmt.params)
215 			throw new MySQLErrorException(format("Wrong number of parameters for query. Got 0 but expected %d.", stmt.params));
216 
217 		static if (argCount) {
218 		enum NullsCapacity = 128; // must be power of 2
219 		ubyte[NullsCapacity >> 3] nulls;
220 		size_t bitsOut = 0;
221 		size_t indexArg = 0;
222 			foreach(i, arg; args[0..argCount]) {
223 			const auto index = (indexArg >> 3) & (NullsCapacity - 1);
224 			const auto bit = indexArg & 7;
225 
226 			static if (is(typeof(arg) == typeof(null))) {
227 				nulls[index] = nulls[index] | (1 << bit);
228 				++indexArg;
229 			} else static if (is(Unqual!(typeof(arg)) == MySQLValue)) {
230 				if (arg.isNull)
231 					nulls[index] = nulls[index] | (1 << bit);
232 				++indexArg;
233 			} else static if (isArray!(typeof(arg)) && !isSomeString!(typeof(arg))) {
234 				indexArg += arg.length;
235 			} else {
236 				++indexArg;
237 			}
238 
239 				auto finishing = (i == argCount - 1);
240 			auto remaining = indexArg - bitsOut;
241 
242 			if (finishing || (remaining >= NullsCapacity)) {
243 				while (remaining) {
244 					auto bits = min(remaining, NullsCapacity);
245 
246 					packet.put(nulls[0..(bits + 7) >> 3]);
247 					bitsOut += bits;
248 					nulls[] = 0;
249 
250 					remaining = (indexArg - bitsOut);
251 					if (!remaining || (!finishing && (remaining < NullsCapacity)))
252 						break;
253 				}
254 			}
255 		}
256 		packet.put!ubyte(1);
257 
258 			if (indexArg != stmt.params)
259 				throw new MySQLErrorException(format("Wrong number of parameters for query. Got %d but expected %d.", indexArg, stmt.params));
260 
261 			foreach (arg; args[0..argCount])
262 			putValueType(packet, arg);
263 
264 			foreach (arg; args[0..argCount]) {
265 			static if (!is(typeof(arg) == typeof(null))) {
266 				putValue(packet, arg);
267 			}
268 		}
269 	}
270 
271 		packet.finalize(seq_);
272 		++seq_;
273 
274 		socket_.write(packet.get());
275 
276 		auto answer = retrieve();
277 		if (isStatus(answer)) {
278 			eatStatus(answer);
279 		} else {
280 			static if (!shouldDiscard) {
281 				resultSet(answer, stmt.id, Commands.COM_STMT_EXECUTE, args[args.length - 1]);
282 			} else {
283 				discardAll(answer, Commands.COM_STMT_EXECUTE);
284 			}
285 		}
286 	}
287 
288 	void close(PreparedStatement stmt) {
289 		uint[1] data = [ stmt.id ];
290 		send(Commands.COM_STMT_CLOSE, data);
291 	}
292 
293 	alias OnStatusCallback = void delegate(ConnectionStatus status, const(char)[] message);
294 	@property void onStatus(OnStatusCallback callback) {
295 		onStatus_ = callback;
296 	}
297 
298 	@property OnStatusCallback onStatus() const {
299 		return onStatus_;
300 	}
301 
302 	@property ulong insertID() {
303 		return status_.insertID;
304 	}
305 
306 	@property ulong affected() {
307 		return cast(size_t)status_.affected;
308 	}
309 
310 	@property size_t warnings() {
311 		return status_.warnings;
312 	}
313 
314 	@property size_t error() {
315 		return status_.error;
316 	}
317 
318 	@property const(char)[] status() const {
319 		return info_;
320 	}
321 
322 	@property bool connected() const {
323 		return socket_.connected;
324 	}
325 
326 	void disconnect() {
327 		socket_.close();
328 	}
329 
330 	~this() {
331 		disconnect();
332 	}
333 
334 private:
335 	void connect() {
336 		socket_.connect(settings_.host, settings_.port);
337 
338 		seq_ = 0;
339 		eatHandshake(retrieve());
340 	}
341 
342 	void send(T)(Commands cmd, T[] data) {
343 		send(cmd, cast(ubyte*)data.ptr, data.length * T.sizeof);
344 	}
345 
346 	void send(Commands cmd, ubyte* data = null, size_t length = 0) {
347 		if(!socket_.connected)
348 			connect();
349 
350 		seq_ = 0;
351 		auto header = OutputPacket(&out_);
352 		header.put!ubyte(cmd);
353 		header.finalize(seq_, length);
354 		++seq_;
355 
356 		socket_.write(header.get());
357 		if (length)
358 			socket_.write(data[0..length]);
359 	}
360 
361 	void query(const(char)[] sql) {
362 		send(Commands.COM_QUERY, sql);
363 
364 		auto answer = retrieve();
365 		if (isStatus(answer))
366 			eatStatus(answer);
367 	}
368 
369 	void ensureConnected() {
370 		if(!socket_.connected)
371 			connect();
372 	}
373 
374 	bool isStatus(InputPacket packet) {
375 		auto id = packet.peek!ubyte;
376 		switch (id) {
377 			case StatusPackets.ERR_Packet:
378 			case StatusPackets.OK_Packet:
379 				return 1;
380 			default:
381 				return false;
382 		}
383 	}
384 
385 	void check(InputPacket packet, bool smallError = false) {
386 		auto id = packet.peek!ubyte;
387 		switch (id) {
388 			case StatusPackets.ERR_Packet:
389 			case StatusPackets.OK_Packet:
390 				eatStatus(packet, smallError);
391 				break;
392 			default:
393 				break;
394 		}
395 	}
396 
397 	InputPacket retrieve() {
398 		scope(failure) disconnect();
399 
400 		ubyte[4] header;
401 		socket_.read(header);
402 
403 		auto len = header[0] | (header[1] << 8) | (header[2] << 16);
404 		auto seq = header[3];
405 
406 		if (seq != seq_)
407 			throw new MySQLConnectionException("Out of order packet received");
408 
409 		++seq_;
410 
411 		in_.length = len;
412 		socket_.read(in_);
413 
414 		if (in_.length != len)
415 			throw new MySQLConnectionException("Wrong number of bytes read");
416 
417 		return InputPacket(&in_);
418 	}
419 
	/// Parses the server's initial handshake packet, sends the handshake reply,
	/// and processes the server's status answer.
	///
	/// Fills `server_` from the packet, derives `caps_` as the intersection of the
	/// requested and the server capabilities, and builds the password token from
	/// the server-provided auth data. The connection is dropped on any failure.
	void eatHandshake(InputPacket packet) {
		scope(failure) disconnect();

		// The server may answer the connect with an ERR packet instead of a greeting.
		check(packet, true);

		server_.protocol = packet.eat!ubyte;
		server_.versionString = packet.eat!(const(char)[])(packet.countUntil(0, true));
		packet.skip(1);

		server_.connection = packet.eat!uint;

		// First part of the auth data has a fixed length of 8 bytes.
		const auto authLengthStart = 8;
		size_t authLength = authLengthStart;

		ubyte[256] auth;
		auth[0..authLength] = packet.eat!(ubyte[])(authLength);

		packet.expect!ubyte(0);

		// Low 16 bits of the server capability flags.
		server_.caps = packet.eat!ushort;

		// Extended section: only parsed when the packet has more data.
		if (!packet.empty) {
			server_.charSet = packet.eat!ubyte;
			server_.status = packet.eat!ushort;
			// High 16 bits of the server capability flags.
			server_.caps |= packet.eat!ushort << 16;
			server_.caps |= CapabilityFlags.CLIENT_LONG_PASSWORD;

			if ((server_.caps & CapabilityFlags.CLIENT_PROTOCOL_41) == 0)
				throw new MySQLProtocolException("Server doesn't support protocol v4.1");

			// One byte is skipped unread with CLIENT_SECURE_CONNECTION; otherwise it must be 0.
			if (server_.caps & CapabilityFlags.CLIENT_SECURE_CONNECTION) {
				packet.skip(1);
			} else {
				packet.expect!ubyte(0);
			}

			packet.skip(10);

			// Second part of the auth data runs up to the next 0 byte and is
			// appended after the first 8 bytes.
			authLength += packet.countUntil(0, true);
			if (authLength > auth.length)
				throw new MySQLConnectionException("Bad packet format");

			auth[authLengthStart..authLength] = packet.eat!(ubyte[])(authLength - authLengthStart);

			packet.expect!ubyte(0);
		}

		// token = SHA1(pwd) XOR SHA1(auth ~ SHA1(SHA1(pwd)))
		ubyte[20] token;
		{
			import std.digest.sha;

			auto pass = sha1Of(cast(const(ubyte)[])settings_.pwd);
			token = sha1Of(pass);

			SHA1 sha1;
			sha1.start();
			sha1.put(auth[0..authLength]);
			sha1.put(token);
			token = sha1.finish();

			foreach (i; 0..20)
				token[i] = token[i] ^ pass[i];
		}

		// Only capabilities both sides support are used from here on.
		caps_ = cast(CapabilityFlags)(settings_.caps & server_.caps);

		auto reply = OutputPacket(&out_);
		reply.reserve(64 + settings_.user.length + settings_.pwd.length + settings_.db.length);

		reply.put!uint(caps_);
		reply.put!uint(1);		// NOTE(review): presumably the max-packet-size field — confirm against the protocol docs
		reply.put!ubyte(33);	// NOTE(review): presumably the character-set id — confirm against the protocol docs
		reply.fill(0, 23);

		// User name, zero-terminated.
		reply.put(settings_.user);
		reply.put!ubyte(0);

		// Auth token: length-prefixed with CLIENT_SECURE_CONNECTION, zero-terminated
		// otherwise; a single 0 byte when no password is configured.
		if (settings_.pwd.length) {
			if (caps_ & CapabilityFlags.CLIENT_SECURE_CONNECTION) {
				reply.put!ubyte(token.length);
				reply.put(token);
			} else {
				reply.put(token);
				reply.put!ubyte(0);
			}
		} else {
			reply.put!ubyte(0);
		}

		// Database name is included only when configured and negotiated.
		if (settings_.db.length && (caps_ & CapabilityFlags.CLIENT_CONNECT_WITH_DB))
			reply.put(settings_.db);

		reply.put!ubyte(0);

		reply.finalize(seq_);
		++seq_;

		socket_.write(reply.get());

		// Throws when the server rejects the credentials.
		eatStatus(retrieve());
	}
521 
522 	void eatStatus(InputPacket packet, bool smallError = false) {
523 		auto id = packet.eat!ubyte;
524 
525 		switch (id) {
526 		case StatusPackets.OK_Packet:
527 			status_.error = 0;
528 			status_.affected = packet.eatLenEnc();
529 			status_.insertID = packet.eatLenEnc();
530 			status_.flags = packet.eat!ushort;
531 			status_.warnings = packet.eat!ushort;
532 
533 			if (caps_ & CapabilityFlags.CLIENT_SESSION_TRACK) {
534 				info(packet.eat!(const(char)[])(cast(size_t)packet.eatLenEnc()));
535 				packet.skip(1);
536 
537 				if (status_.flags & StatusFlags.SERVER_SESSION_STATE_CHANGED) {
538 					packet.skip(cast(size_t)packet.eatLenEnc());
539 					packet.skip(1);
540 				}
541 			} else if (!packet.empty) {
542 				auto len = cast(size_t)packet.eatLenEnc();
543 				info(packet.eat!(const(char)[])(min(len, packet.remaining)));
544 			}
545 
546 			if (onStatus_)
547 				onStatus_(status_, info_);
548 
549 			break;
550 		case StatusPackets.EOF_Packet:
551 			status_.error = 0;
552 			status_.warnings = packet.eat!ushort;
553 			status_.flags = packet.eat!ushort;
554 			info([]);
555 
556 			if (onStatus_)
557 				onStatus_(status_, info_);
558 
559 			break;
560 		case StatusPackets.ERR_Packet:
561 			status_.flags = 0;
562 			status_.warnings = 0;
563 			status_.error = packet.eat!ushort;
564 			if (!smallError)
565 				packet.skip(6);
566 			info(packet.eat!(const(char)[])(packet.remaining));
567 
568 			if (onStatus_)
569 				onStatus_(status_, info_);
570 
571 			switch(status_.error) {
572 			case ErrorCodes.ER_DUP_ENTRY_WITH_KEY_NAME:
573 			case ErrorCodes.ER_DUP_ENTRY:
574 				throw new MySQLDuplicateEntryException(cast(string)info_);
575 			default:
576 				throw new MySQLErrorException(cast(string)info_);
577 			}
578 		default:
579 			throw new MySQLProtocolException("Unexpected packet format");
580 		}
581 	}
582 
583 	void info(const(char)[] value) {
584 		info_.length = value.length;
585 		info_[0..$] = value;
586 	}
587 
588 	void skipColumnDef(InputPacket packet, Commands cmd) {
589 		packet.skip(cast(size_t)packet.eatLenEnc());	// catalog
590 		packet.skip(cast(size_t)packet.eatLenEnc());	// schema
591 		packet.skip(cast(size_t)packet.eatLenEnc());	// table
592 		packet.skip(cast(size_t)packet.eatLenEnc());	// original_table
593 		packet.skip(cast(size_t)packet.eatLenEnc());	// name
594 		packet.skip(cast(size_t)packet.eatLenEnc());	// original_name
595 		packet.skipLenEnc();							// next_length
596 		packet.skip(10); // 2 + 4 + 1 + 2 + 1			// charset, length, type, flags, decimals
597 		packet.expect!ushort(0);
598 
599 		if (cmd == Commands.COM_FIELD_LIST)
600 			packet.skip(cast(size_t)packet.eatLenEnc());// default values
601 	}
602 
603 	void columnDef(InputPacket packet, Commands cmd, ref MySQLColumn def) {
604 		packet.skip(cast(size_t)packet.eatLenEnc());	// catalog
605 		packet.skip(cast(size_t)packet.eatLenEnc());	// schema
606 		packet.skip(cast(size_t)packet.eatLenEnc());	// table
607 		packet.skip(cast(size_t)packet.eatLenEnc());	// original_table
608 		auto len = cast(size_t)packet.eatLenEnc();
609 		columns_ ~= packet.eat!(const(char)[])(len);
610 		def.name = columns_[$-len..$];
611 		packet.skip(cast(size_t)packet.eatLenEnc());	// original_name
612 		packet.skipLenEnc();							// next_length
613 		packet.skip(2);									// charset
614 		def.length = packet.eat!uint;
615 		def.type = cast(ColumnTypes)packet.eat!ubyte;
616 		def.flags = packet.eat!ushort;
617 		def.decimals = packet.eat!ubyte;
618 
619 		packet.expect!ushort(0);
620 
621 		if (cmd == Commands.COM_FIELD_LIST)
622 			packet.skip(cast(size_t)packet.eatLenEnc());// default values
623 	}
624 
625 	void columnDefs(size_t count, Commands cmd, ref MySQLColumn[] defs) {
626 		defs.length = count;
627 		foreach (i; 0..count)
628 			columnDef(retrieve(), cmd, defs[i]);
629 	}
630 
631 	void resultSetRow(InputPacket packet, Commands cmd, MySQLHeader header, MySQLRow row) {
632 		assert(row.length == header.length);
633 
634 		packet.expect!ubyte(0);
635 		auto nulls = packet.eat!(ubyte[])((header.length + 2 + 7) >> 3);
636 		foreach (i, column; header) {
637 			const auto index = (i + 2) >> 3; // bit offset of 2
638 			const auto bit = (i + 2) & 7;
639 
640 			if ((nulls[index] & (1 << bit)) == 0) {
641 				row.set(i, eatValue(packet, column));
642 			} else {
643 				row.nullify(i);
644 			}
645 		}
646 		assert(packet.empty);
647 	}
648 
649 	bool callHandler(RowHandler)(RowHandler handler, size_t i, MySQLHeader header, MySQLRow row) if ((ParameterTypeTuple!(RowHandler).length == 1) && is(ParameterTypeTuple!(RowHandler)[0] == MySQLRow)) {
650 		static if (is(ReturnType!(RowHandler) == void)) {
651 			handler(row);
652 			return true;
653 		} else {
654 			return handler(row); // return type must be bool
655 		}
656 	}
657 
658 	bool callHandler(RowHandler)(RowHandler handler, size_t i, MySQLHeader header, MySQLRow row) if ((ParameterTypeTuple!(RowHandler).length == 2) && isNumeric!(ParameterTypeTuple!(RowHandler)[0]) && is(ParameterTypeTuple!(RowHandler)[1] == MySQLRow)) {
659 		static if (is(ReturnType!(RowHandler) == void)) {
660 			handler(cast(ParameterTypeTuple!(RowHandler)[0])i, row);
661 			return true;
662 		} else {
663 			return handler(cast(ParameterTypeTuple!(RowHandler)[0])i, row); // return type must be bool
664 		}
665 	}
666 
667 	bool callHandler(RowHandler)(RowHandler handler, size_t i, MySQLHeader header, MySQLRow row) if ((ParameterTypeTuple!(RowHandler).length == 2) && is(ParameterTypeTuple!(RowHandler)[0] == MySQLHeader) && is(ParameterTypeTuple!(RowHandler)[1] == MySQLRow)) {
668 		static if (is(ReturnType!(RowHandler) == void)) {
669 			handler(header, row);
670 			return true;
671 		} else {
672 			return handler(header, row); // return type must be bool
673 		}
674 	}
675 
676 	bool callHandler(RowHandler)(RowHandler handler, size_t i, MySQLHeader header, MySQLRow row) if ((ParameterTypeTuple!(RowHandler).length == 3) && isNumeric!(ParameterTypeTuple!(RowHandler)[0]) && is(ParameterTypeTuple!(RowHandler)[1] == MySQLHeader) && is(ParameterTypeTuple!(RowHandler)[2] == MySQLRow)) {
677 		static if (is(ReturnType!(RowHandler) == void)) {
678 			handler(i, header, row);
679 			return true;
680 		} else {
681 			return handler(i, header, row); // return type must be bool
682 		}
683 	}
684 
	/// Reads a result set and feeds every row to `handler`.
	///
	/// Params:
	///   packet  = first packet of the result set (holds the column count)
	///   stmt    = statement id, used for COM_STMT_FETCH when the server opened a cursor
	///   cmd     = command that produced the result set
	///   handler = row callback; returning false stops the iteration early
	void resultSet(RowHandler)(InputPacket packet, uint stmt, Commands cmd, RowHandler handler) {
		// Column names of the previous result set are dropped.
		columns_.length = 0;

		auto columns = cast(size_t)packet.eatLenEnc();
		columnDefs(columns, cmd, header_);
		row_.length = columns;
		row_.header(header_);

		// The packet after the column definitions is either ERR (throws) or EOF.
		auto status = retrieve();
		if (status.peek!ubyte == StatusPackets.ERR_Packet)
			eatStatus(status);

		size_t index = 0;
		auto statusFlags = eatEOF(status);
		if (statusFlags & StatusFlags.SERVER_STATUS_CURSOR_EXISTS) {
			// Cursor mode: rows are requested in batches through COM_STMT_FETCH.
			uint[2] data = [ stmt, 4096 ]; // todo: make setting - rows per fetch
			while (statusFlags & (StatusFlags.SERVER_STATUS_CURSOR_EXISTS | StatusFlags.SERVER_MORE_RESULTS_EXISTS)) {
				send(Commands.COM_STMT_FETCH, data);

				auto answer = retrieve();
				if (answer.peek!ubyte == StatusPackets.ERR_Packet)
					eatStatus(answer);

				auto row = answer.empty ? retrieve() : answer;
				while (true) {
					// EOF ends the batch; its flags decide whether another fetch follows.
					if (row.peek!ubyte == StatusPackets.EOF_Packet) {
						statusFlags = eatEOF(row);
						break;
					}

					resultSetRow(row, Commands.COM_STMT_FETCH, header_, row_);
					if (!callHandler(handler, index++, header_, row_)) {
						// Handler asked to stop: drain the batch and leave the fetch loop.
						discardUntilEOF(retrieve());
						statusFlags = 0;
						break;
					}
					row = retrieve();
				}
			}
		} else {
			// No cursor: rows follow directly until an EOF packet.
			auto row = retrieve();
			while (true) {
				if (row.peek!ubyte == StatusPackets.EOF_Packet) {
					eatEOF(row);
					break;
				}

				resultSetRow(row, cmd, header_, row_);
				if (!callHandler(handler, index++, header_, row_)) {
					// Handler asked to stop: drain the remaining rows.
					discardUntilEOF(retrieve());
					break;
				}

				row = retrieve();
			}
		}
	}
742 
743 	void discardAll(InputPacket packet, Commands cmd) {
744 		auto columns = cast(size_t)packet.eatLenEnc();
745 		columnDefs(columns, cmd, header_);
746 
747 		auto statusFlags = eatEOF(retrieve());
748 		if ((statusFlags & StatusFlags.SERVER_STATUS_CURSOR_EXISTS) == 0) {
749 			while (true) {
750 				auto row = retrieve();
751 				if (row.peek!ubyte == StatusPackets.EOF_Packet) {
752 					eatEOF(row);
753 					break;
754 				}
755 			}
756 		}
757 	}
758 
759 	void discardUntilEOF(InputPacket packet) {
760 		while (true) {
761 			if (packet.peek!ubyte == StatusPackets.EOF_Packet) {
762 				eatEOF(packet);
763 				break;
764 			}
765 			packet = retrieve();
766 		}
767 	}
768 
769 	auto eatEOF(InputPacket packet) {
770 		auto id = packet.eat!ubyte;
771 		if (id != StatusPackets.EOF_Packet)
772 			throw new MySQLProtocolException("Unexpected packet format");
773 
774 		status_.error = 0;
775 		status_.warnings = packet.eat!ushort();
776 		status_.flags = packet.eat!ushort();
777 		info([]);
778 
779 		if (onStatus_)
780 			onStatus_(status_, info_);
781 
782 		return status_.flags;
783 	}
784 
785 	void connectionSettings(const(char)[] connectionString) {
786 		import std.conv;
787 
788 		auto remaining = connectionString;
789 
790 		auto indexValue = remaining.indexOf("=");
791 		while (!remaining.empty) {
792 			auto indexValueEnd = remaining.indexOf(";", indexValue);
793 			if (indexValueEnd <= 0)
794 				indexValueEnd = remaining.length;
795 
796 			auto name = strip(remaining[0..indexValue]);
797 			auto value = strip(remaining[indexValue+1..indexValueEnd]);
798 
799 			switch (name) {
800 			case "host":
801 				settings_.host = value;
802 				break;
803 			case "user":
804 				settings_.user = value;
805 				break;
806 			case "pwd":
807 				settings_.pwd = value;
808 				break;
809 			case "db":
810 				settings_.db = value;
811 				break;
812 			case "port":
813 				settings_.port = to!ushort(value);
814 				break;
815 			default:
816 				throw new MySQLException("Bad connection string: " ~ cast(string)connectionString);
817 			}
818 
819 			if (indexValueEnd == remaining.length)
820 				return;
821 
822 			remaining = remaining[indexValueEnd+1..$];
823 			indexValue = remaining.indexOf("=");
824 		}
825 
826 		throw new MySQLException("Bad connection string: " ~ cast(string)connectionString);
827 	}
828 
	SocketType socket_;		// transport; must provide connect/read/write/close/connected
	MySQLHeader header_;	// column definitions of the current result set
	MySQLRow row_;			// row object reused for every row handed to a handler
	char[] columns_;		// backing storage for column names referenced by header_
	char[] info_;			// message text of the last status packet (reused buffer)
	ubyte[] in_;			// receive buffer backing the packets returned by retrieve()
	ubyte[] out_;			// send buffer backing every OutputPacket
	ubyte seq_ = 0;			// next expected/sent packet sequence number

	OnStatusCallback onStatus_;		// optional hook called after each status packet
	CapabilityFlags caps_;			// capabilities negotiated during the handshake
	ConnectionStatus status_;		// values from the last status packet
	ConnectionSettings settings_;	// parameters used for (re)connecting
	ServerInfo server_;				// data read from the server handshake
843 }