1 module mysql.connection;
2 
3 
4 import std.algorithm;
5 import std.array;
6 import std.string;
7 import std.traits;
8 
9 public import mysql.exception;
10 import mysql.packet;
11 import mysql.protocol;
12 public import mysql.type;
13 
14 
15 immutable CapabilityFlags DefaultClientCaps = CapabilityFlags.CLIENT_LONG_PASSWORD | CapabilityFlags.CLIENT_LONG_FLAG |
16 	CapabilityFlags.CLIENT_CONNECT_WITH_DB | CapabilityFlags.CLIENT_PROTOCOL_41 | CapabilityFlags.CLIENT_SECURE_CONNECTION;
17 
18 
19 struct ConnectionStatus {
20 	ulong affected = 0;
21 	ulong insertID = 0;
22 	ushort flags = 0;
23 	ushort error = 0;
24 	ushort warnings = 0;
25 }
26 
27 
28 private struct ConnectionSettings {
29 	CapabilityFlags caps = DefaultClientCaps;
30 
31 	const(char)[] host;
32 	const(char)[] user;
33 	const(char)[] pwd;
34 	const(char)[] db;
35 	ushort port = 3306;
36 }
37 
38 
39 private struct ServerInfo {
40 	const(char)[] versionString;
41 	ubyte protocol;
42 	ubyte charSet;
43 	ushort status;
44 	uint connection;
45 	uint caps;
46 }
47 
48 
49 struct PreparedStatement {
50 package:
51 	uint id;    // todo: investigate if it's really necessary to close statements explicitly
52 	uint params;
53 }
54 
55 
56 struct Connection(SocketType) {
57 	void connect(string connectionString) {
58 		connectionSettings(connectionString);
59 		connect();
60 	}
61 
62 	void connect(const(char)[] host, ushort port, const(char)[] user, const(char)[] pwd, const(char)[] db, CapabilityFlags caps = DefaultClientCaps) {
63 		settings_.host = host;
64 		settings_.user = user;
65 		settings_.pwd = pwd;
66 		settings_.db = db;
67 		settings_.port = port;
68 		settings_.caps = caps | CapabilityFlags.CLIENT_LONG_PASSWORD | CapabilityFlags.CLIENT_PROTOCOL_41;
69 
70 		connect();
71 	}
72 
73 	void use(const(char)[] db) {
74 		send(Commands.COM_INIT_DB, db);
75 		eatStatus(retrieve());
76 	}
77 
78 	void ping() {
79 		send(Commands.COM_PING);
80 		eatStatus(retrieve());
81 	}
82 
83 	void refresh() {
84 		send(Commands.COM_REFRESH);
85 		eatStatus(retrieve());
86 	}
87 
88 	void reset() {
89 		send(Commands.COM_RESET_CONNECTION);
90 		eatStatus(retrieve());
91 	}
92 
93 	const(char)[] statistics() {
94 		send(Commands.COM_STATISTICS);
95 
96 		auto answer = retrieve();
97 		return answer.eat!(const(char)[])(answer.remaining);
98 	}
99 
100 	auto prepare(const(char)[] sql) {
101 		send(Commands.COM_STMT_PREPARE, sql);
102 
103 		auto answer = retrieve();
104 
105 		if (answer.peek!ubyte != StatusPackets.OK_Packet)
106 			check(answer);
107 
108 		answer.expect!ubyte(0);
109 
110 		auto id = answer.eat!uint;
111 		auto columns = answer.eat!ushort;
112 		auto params = answer.eat!ushort;
113 		answer.expect!ubyte(0);
114 
115 		auto warnings = answer.eat!ushort;
116 
117 		if (params) {
118 			foreach (i; 0..params)
119 				skipColumnDef(retrieve(), Commands.COM_STMT_PREPARE);
120 
121 			eatEOF(retrieve());
122 		}
123 
124 		if (columns) {
125 			MySQLColumn def;
126 			foreach (i; 0..columns)
127 				skipColumnDef(retrieve(), Commands.COM_STMT_PREPARE);
128 
129 			eatEOF(retrieve());
130 		}
131 
132 		return PreparedStatement(id, params);
133 	}
134 
135 	void execute(Args...)(const(char)[] stmt, Args args) {
136 		scope(failure) disconnect();
137 
138 		auto id = prepare(stmt);
139 		execute(id, args);
140 		close(id);
141 	}
142 
143 	void begin() {
144 		if (status_.flags & StatusFlags.SERVER_STATUS_IN_TRANS)
145 			throw new MySQLErrorException("MySQL does not support nested transactions - commit or rollback before starting a new transaction");
146 
147 		query("start transaction");
148 
149 		assert(status_.flags & StatusFlags.SERVER_STATUS_IN_TRANS);
150 	}
151 
152 	void commit() {
153 		if ((status_.flags & StatusFlags.SERVER_STATUS_IN_TRANS) == 0)
154 			throw new MySQLErrorException("No active transaction");
155 
156 		query("commit");
157 
158 		assert((status_.flags & StatusFlags.SERVER_STATUS_IN_TRANS) == 0);
159 	}
160 
161 	void rollback() {
162 		if (connected) {
163 			if ((status_.flags & StatusFlags.SERVER_STATUS_IN_TRANS) == 0)
164 				throw new MySQLErrorException("No active transaction");
165 
166 			query("rollback");
167 
168 			assert((status_.flags & StatusFlags.SERVER_STATUS_IN_TRANS) == 0);
169 		}
170 	}
171 
172 	@property bool inTransaction() const {
173 		return connected && (status_.flags & StatusFlags.SERVER_STATUS_IN_TRANS);
174 	}
175 
176 	void execute(Args...)(PreparedStatement stmt, Args args) {
177 		scope(failure) disconnect();
178 
179 		ensureConnected();
180 
181 		seq_ = 0;
182 		auto packet = OutputPacket(&out_);
183 		packet.put!ubyte(Commands.COM_STMT_EXECUTE);
184 		packet.put!uint(stmt.id);
185 		packet.put!ubyte(Cursors.CURSOR_TYPE_READ_ONLY);
186 		packet.put!uint(1);
187 
188 		static if (args.length == 0) {
189 			enum shouldDiscard = true;
190 		} else {
191 			enum shouldDiscard = !isCallable!(args[args.length - 1]);
192 		}
193 
194 		enum argCount = shouldDiscard ? args.length : (args.length - 1);
195 
196 		if (!argCount && stmt.params)
197 			throw new MySQLErrorException("Wrong number of parameters for query");
198 
199 		static if (argCount) {
200 			enum NullsCapacity = 128; // must be power of 2
201 			ubyte[NullsCapacity >> 3] nulls;
202 			size_t bitsOut = 0;
203 			size_t indexArg = 0;
204 			foreach(i, arg; args[0..argCount]) {
205 				const auto index = (indexArg >> 3) & (NullsCapacity - 1);
206 				const auto bit = indexArg & 7;
207 
208 				static if (is(typeof(arg) == typeof(null))) {
209 					nulls[index] = nulls[index] | (1 << bit);
210 					++indexArg;
211 				} else static if (is(Unqual!(typeof(arg)) == MySQLValue)) {
212 					if (arg.isNull)
213 						nulls[index] = nulls[index] | (1 << bit);
214 					++indexArg;
215 				} else static if (isArray!(typeof(arg)) && !isSomeString!(typeof(arg))) {
216 					indexArg += arg.length;
217 				} else {
218 					++indexArg;
219 				}
220 
221 				auto finishing = (i == argCount - 1);
222 				auto remaining = indexArg - bitsOut;
223 
224 				if (finishing || (remaining >= NullsCapacity)) {
225 					while (remaining) {
226 						auto bits = min(remaining, NullsCapacity);
227 
228 						packet.put(nulls[0..(bits + 7) >> 3]);
229 						bitsOut += bits;
230 						nulls[] = 0;
231 
232 						remaining = (indexArg - bitsOut);
233 						if (!remaining || (!finishing && (remaining < NullsCapacity)))
234 							break;
235 					}
236 				}
237 			}
238 			packet.put!ubyte(1);
239 
240 			if (indexArg != stmt.params)
241 				throw new MySQLErrorException("Wrong number of parameters for query");
242 
243 			foreach (arg; args[0..argCount])
244 				putValueType(packet, arg);
245 
246 			foreach (arg; args[0..argCount]) {
247 				static if (!is(typeof(arg) == typeof(null))) {
248 					putValue(packet, arg);
249 				}
250 			}
251 		}
252 
253 		packet.finalize(seq_);
254 		++seq_;
255 
256 		socket_.write(packet.get());
257 
258 		auto answer = retrieve();
259 		if (isStatus(answer)) {
260 			eatStatus(answer);
261 		} else {
262 			static if (!shouldDiscard) {
263 				resultSet(answer, stmt.id, Commands.COM_STMT_EXECUTE, args[args.length - 1]);
264 			} else {
265 				discardAll(answer, Commands.COM_STMT_EXECUTE);
266 			}
267 		}
268 	}
269 
270 	void close(PreparedStatement stmt) {
271 		uint[1] data = [ stmt.id ];
272 		send(Commands.COM_STMT_CLOSE, data);
273 	}
274 
275 	alias OnStatusCallback = void delegate(ConnectionStatus status, const(char)[] message);
276 	@property void onStatus(OnStatusCallback callback) {
277 		onStatus_ = callback;
278 	}
279 
280 	@property OnStatusCallback onStatus() const {
281 		return onStatus_;
282 	}
283 
284 	@property ulong insertID() {
285 		return status_.insertID;
286 	}
287 
288 	@property ulong affected() {
289 		return cast(size_t)status_.affected;
290 	}
291 
292 	@property size_t warnings() {
293 		return status_.warnings;
294 	}
295 
296 	@property size_t error() {
297 		return status_.error;
298 	}
299 
300 	@property const(char)[] status() const {
301 		return info_;
302 	}
303 
304 	@property bool connected() const {
305 		return socket_.connected;
306 	}
307 
308 	void disconnect() {
309 		socket_.close();
310 	}
311 
312 	~this() {
313 		disconnect();
314 	}
315 
316 private:
317 	void connect() {
318 		socket_.connect(settings_.host, settings_.port);
319 
320 		seq_ = 0;
321 		eatHandshake(retrieve());
322 	}
323 
324 	void send(T)(Commands cmd, T[] data) {
325 		send(cmd, cast(ubyte*)data.ptr, data.length * T.sizeof);
326 	}
327 
328 	void send(Commands cmd, ubyte* data = null, size_t length = 0) {
329 		if(!socket_.connected)
330 			connect();
331 
332 		seq_ = 0;
333 		auto header = OutputPacket(&out_);
334 		header.put!ubyte(cmd);
335 		header.finalize(seq_, length);
336 		++seq_;
337 
338 		socket_.write(header.get());
339 		if (length)
340 			socket_.write(data[0..length]);
341 	}
342 
343 	void query(const(char)[] sql) {
344 		send(Commands.COM_QUERY, sql);
345 
346 		auto answer = retrieve();
347 		if (isStatus(answer))
348 			eatStatus(answer);
349 	}
350 
351 	void ensureConnected() {
352 		if(!socket_.connected)
353 			connect();
354 	}
355 
356 	bool isStatus(InputPacket packet) {
357 		auto id = packet.peek!ubyte;
358 		switch (id) {
359 			case StatusPackets.ERR_Packet:
360 			case StatusPackets.OK_Packet:
361 				return 1;
362 			default:
363 				return false;
364 		}
365 	}
366 
367 	void check(InputPacket packet, bool smallError = false) {
368 		auto id = packet.peek!ubyte;
369 		switch (id) {
370 			case StatusPackets.ERR_Packet:
371 			case StatusPackets.OK_Packet:
372 				eatStatus(packet, smallError);
373 				break;
374 			default:
375 				break;
376 		}
377 	}
378 
379 	InputPacket retrieve() {
380 		scope(failure) disconnect();
381 
382 		ubyte[4] header;
383 		socket_.read(header);
384 
385 		auto len = header[0] | (header[1] << 8) | (header[2] << 16);
386 		auto seq = header[3];
387 
388 		if (seq != seq_)
389 			throw new MySQLConnectionException("Out of order packet received");
390 
391 		++seq_;
392 
393 		in_.length = len;
394 		socket_.read(in_);
395 
396 		if (in_.length != len)
397 			throw new MySQLConnectionException("Wrong number of bytes read");
398 
399 		return InputPacket(&in_);
400 	}
401 
402 	void eatHandshake(InputPacket packet) {
403 		scope(failure) disconnect();
404 
405 		check(packet, true);
406 
407 		server_.protocol = packet.eat!ubyte;
408 		server_.versionString = packet.eat!(const(char)[])(packet.countUntil(0, true));
409 		packet.skip(1);
410 
411 		server_.connection = packet.eat!uint;
412 
413 		const auto authLengthStart = 8;
414 		size_t authLength = authLengthStart;
415 
416 		ubyte[256] auth;
417 		auth[0..authLength] = packet.eat!(ubyte[])(authLength);
418 
419 		packet.expect!ubyte(0);
420 
421 		server_.caps = packet.eat!ushort;
422 
423 		if (!packet.empty) {
424 			server_.charSet = packet.eat!ubyte;
425 			server_.status = packet.eat!ushort;
426 			server_.caps |= packet.eat!ushort << 16;
427 			server_.caps |= CapabilityFlags.CLIENT_LONG_PASSWORD;
428 
429 			if ((server_.caps & CapabilityFlags.CLIENT_PROTOCOL_41) == 0)
430 				throw new MySQLProtocolException("Server doesn't support protocol v4.1");
431 
432 			if (server_.caps & CapabilityFlags.CLIENT_SECURE_CONNECTION) {
433 				packet.skip(1);
434 			} else {
435 				packet.expect!ubyte(0);
436 			}
437 
438 			packet.skip(10);
439 
440 			authLength += packet.countUntil(0, true);
441 			if (authLength > auth.length)
442 				throw new MySQLConnectionException("Bad packet format");
443 
444 			auth[authLengthStart..authLength] = packet.eat!(ubyte[])(authLength - authLengthStart);
445 
446 			packet.expect!ubyte(0);
447 		}
448 
449 		ubyte[20] token;
450 		{
451 			import std.digest.sha;
452 
453 			auto pass = sha1Of(cast(const(ubyte)[])settings_.pwd);
454 			token = sha1Of(pass);
455 
456 			SHA1 sha1;
457 			sha1.start();
458 			sha1.put(auth[0..authLength]);
459 			sha1.put(token);
460 			token = sha1.finish();
461 
462 			foreach (i; 0..20)
463 				token[i] = token[i] ^ pass[i];
464 		}
465 
466 		caps_ = cast(CapabilityFlags)(settings_.caps & server_.caps);
467 
468 		auto reply = OutputPacket(&out_);
469 		reply.reserve(64 + settings_.user.length + settings_.pwd.length + settings_.db.length);
470 
471 		reply.put!uint(caps_);
472 		reply.put!uint(1);
473 		reply.put!ubyte(33);
474 		reply.fill(0, 23);
475 
476 		reply.put(settings_.user);
477 		reply.put!ubyte(0);
478 
479 		if (settings_.pwd.length) {
480 			if (caps_ & CapabilityFlags.CLIENT_SECURE_CONNECTION) {
481 				reply.put!ubyte(token.length);
482 				reply.put(token);
483 			} else {
484 				reply.put(token);
485 				reply.put!ubyte(0);
486 			}
487 		} else {
488 			reply.put!ubyte(0);
489 		}
490 
491 		if (settings_.db.length && (caps_ & CapabilityFlags.CLIENT_CONNECT_WITH_DB))
492 			reply.put(settings_.db);
493 
494 		reply.put!ubyte(0);
495 
496 		reply.finalize(seq_);
497 		++seq_;
498 
499 		socket_.write(reply.get());
500 
501 		eatStatus(retrieve());
502 	}
503 
504 	void eatStatus(InputPacket packet, bool smallError = false) {
505 		auto id = packet.eat!ubyte;
506 
507 		switch (id) {
508 		case StatusPackets.OK_Packet:
509 			status_.error = 0;
510 			status_.affected = packet.eatLenEnc();
511 			status_.insertID = packet.eatLenEnc();
512 			status_.flags = packet.eat!ushort;
513 			status_.warnings = packet.eat!ushort;
514 
515 			if (caps_ & CapabilityFlags.CLIENT_SESSION_TRACK) {
516 				info(packet.eat!(const(char)[])(cast(size_t)packet.eatLenEnc()));
517 				packet.skip(1);
518 
519 				if (status_.flags & StatusFlags.SERVER_SESSION_STATE_CHANGED) {
520 					packet.skip(cast(size_t)packet.eatLenEnc());
521 					packet.skip(1);
522 				}
523 			} else if (!packet.empty) {
524 				auto len = cast(size_t)packet.eatLenEnc();
525 				info(packet.eat!(const(char)[])(min(len, packet.remaining)));
526 			}
527 
528 			if (onStatus_)
529 				onStatus_(status_, info_);
530 
531 			break;
532 		case StatusPackets.EOF_Packet:
533 			status_.error = 0;
534 			status_.warnings = packet.eat!ushort;
535 			status_.flags = packet.eat!ushort;
536 			info([]);
537 
538 			if (onStatus_)
539 				onStatus_(status_, info_);
540 
541 			break;
542 		case StatusPackets.ERR_Packet:
543 			status_.flags = 0;
544 			status_.warnings = 0;
545 			status_.error = packet.eat!ushort;
546 			if (!smallError)
547 				packet.skip(6);
548 			info(packet.eat!(const(char)[])(packet.remaining));
549 
550 			if (onStatus_)
551 				onStatus_(status_, info_);
552 
553 			switch(status_.error) {
554 			case ErrorCodes.ER_DUP_ENTRY_WITH_KEY_NAME:
555 			case ErrorCodes.ER_DUP_ENTRY:
556 				throw new MySQLDuplicateEntryException(cast(string)info_);
557 			default:
558 				throw new MySQLErrorException(cast(string)info_);
559 			}
560 		default:
561 			throw new MySQLProtocolException("Unexpected packet format");
562 		}
563 	}
564 
565 	void info(const(char)[] value) {
566 		info_.length = value.length;
567 		info_[0..$] = value;
568 	}
569 
570 	void skipColumnDef(InputPacket packet, Commands cmd) {
571 		packet.skip(cast(size_t)packet.eatLenEnc());	// catalog
572 		packet.skip(cast(size_t)packet.eatLenEnc());	// schema
573 		packet.skip(cast(size_t)packet.eatLenEnc());	// table
574 		packet.skip(cast(size_t)packet.eatLenEnc());	// original_table
575 		packet.skip(cast(size_t)packet.eatLenEnc());	// name
576 		packet.skip(cast(size_t)packet.eatLenEnc());	// original_name
577 		packet.skipLenEnc();							// next_length
578 		packet.skip(10); // 2 + 4 + 1 + 2 + 1			// charset, length, type, flags, decimals
579 		packet.expect!ushort(0);
580 
581 		if (cmd == Commands.COM_FIELD_LIST)
582 			packet.skip(cast(size_t)packet.eatLenEnc());// default values
583 	}
584 
585 	void columnDef(InputPacket packet, Commands cmd, ref MySQLColumn def) {
586 		packet.skip(cast(size_t)packet.eatLenEnc());	// catalog
587 		packet.skip(cast(size_t)packet.eatLenEnc());	// schema
588 		packet.skip(cast(size_t)packet.eatLenEnc());	// table
589 		packet.skip(cast(size_t)packet.eatLenEnc());	// original_table
590 		auto len = cast(size_t)packet.eatLenEnc();
591 		columns_ ~= packet.eat!(const(char)[])(len);
592 		def.name = columns_[$-len..$];
593 		packet.skip(cast(size_t)packet.eatLenEnc());	// original_name
594 		packet.skipLenEnc();							// next_length
595 		packet.skip(2);									// charset
596 		def.length = packet.eat!uint;
597 		def.type = cast(ColumnTypes)packet.eat!ubyte;
598 		def.flags = packet.eat!ushort;
599 		def.decimals = packet.eat!ubyte;
600 
601 		packet.expect!ushort(0);
602 
603 		if (cmd == Commands.COM_FIELD_LIST)
604 			packet.skip(cast(size_t)packet.eatLenEnc());// default values
605 	}
606 
607 	auto columnDefs(size_t count, Commands cmd) {
608 		header_.length = count;
609 		foreach (i; 0..count)
610 			columnDef(retrieve(), cmd, header_[i]);
611 		return header_;
612 	}
613 
614 	void resultSetRow(InputPacket packet, Commands cmd, MySQLHeader header, MySQLRow row) {
615 		assert(row.length == header.length);
616 
617 		packet.expect!ubyte(0);
618 		auto nulls = packet.eat!(ubyte[])((header.length + 2 + 7) >> 3);
619 		foreach (i, column; header) {
620 			const auto index = (i + 2) >> 3; // bit offset of 2
621 			const auto bit = (i + 2) & 7;
622 
623 			if ((nulls[index] & (1 << bit)) == 0) {
624 				row.set(i, eatValue(packet, column));
625 			} else {
626 				row.nullify(i);
627 			}
628 		}
629 		assert(packet.empty);
630 	}
631 
632 	bool callHandler(RowHandler)(RowHandler handler, size_t i, MySQLHeader header, MySQLRow row) if ((ParameterTypeTuple!(RowHandler).length == 1) && is(ParameterTypeTuple!(RowHandler)[0] == MySQLRow)) {
633 		static if (is(ReturnType!(RowHandler) == void)) {
634 			handler(row);
635 			return true;
636 		} else {
637 			return handler(row); // return type must be bool
638 		}
639 	}
640 
641 	bool callHandler(RowHandler)(RowHandler handler, size_t i, MySQLHeader header, MySQLRow row) if ((ParameterTypeTuple!(RowHandler).length == 2) && isNumeric!(ParameterTypeTuple!(RowHandler)[0]) && is(ParameterTypeTuple!(RowHandler)[1] == MySQLRow)) {
642 		static if (is(ReturnType!(RowHandler) == void)) {
643 			handler(cast(ParameterTypeTuple!(RowHandler)[0])i, row);
644 			return true;
645 		} else {
646 			return handler(cast(ParameterTypeTuple!(RowHandler)[0])i, row); // return type must be bool
647 		}
648 	}
649 
650 	bool callHandler(RowHandler)(RowHandler handler, size_t i, MySQLHeader header, MySQLRow row) if ((ParameterTypeTuple!(RowHandler).length == 2) && is(ParameterTypeTuple!(RowHandler)[0] == MySQLHeader) && is(ParameterTypeTuple!(RowHandler)[1] == MySQLRow)) {
651 		static if (is(ReturnType!(RowHandler) == void)) {
652 			handler(header, row);
653 			return true;
654 		} else {
655 			return handler(header, row); // return type must be bool
656 		}
657 	}
658 
659 	bool callHandler(RowHandler)(RowHandler handler, size_t i, MySQLHeader header, MySQLRow row) if ((ParameterTypeTuple!(RowHandler).length == 3) && isNumeric!(ParameterTypeTuple!(RowHandler)[0]) && is(ParameterTypeTuple!(RowHandler)[1] == MySQLHeader) && is(ParameterTypeTuple!(RowHandler)[2] == MySQLRow)) {
660 		static if (is(ReturnType!(RowHandler) == void)) {
661 			handler(i, header, row);
662 			return true;
663 		} else {
664 			return handler(i, header, row); // return type must be bool
665 		}
666 	}
667 
668 	void resultSet(RowHandler)(InputPacket packet, uint stmt, Commands cmd, RowHandler handler) {
669 		columns_.length = 0;
670 
671 		auto columns = cast(size_t)packet.eatLenEnc();
672 		auto header = columnDefs(columns, cmd);
673 		row_.length = columns;
674 		row_.header(header);
675 
676 		size_t index = 0;
677 		auto statusFlags = eatEOF(retrieve());
678 		if (statusFlags & StatusFlags.SERVER_STATUS_CURSOR_EXISTS) {
679 			uint[2] data = [ stmt, 4096 ]; // todo: make setting - rows per fetch
680 			while (statusFlags & (StatusFlags.SERVER_STATUS_CURSOR_EXISTS | StatusFlags.SERVER_MORE_RESULTS_EXISTS)) {
681 				send(Commands.COM_STMT_FETCH, data);
682 
683 				auto answer = retrieve();
684 				if (answer.peek!ubyte == StatusPackets.ERR_Packet)
685 					check(answer);
686 
687 				auto row = answer.empty ? retrieve() : answer;
688 				while (true) {
689 					if (row.peek!ubyte == StatusPackets.EOF_Packet) {
690 						statusFlags = eatEOF(row);
691 						break;
692 					}
693 
694 					resultSetRow(row, Commands.COM_STMT_FETCH, header, row_);
695 					if (!callHandler(handler, index++, header, row_)) {
696 						discardUntilEOF(retrieve());
697 						statusFlags = 0;
698 						break;
699 					}
700 					row = retrieve();
701 				}
702 			}
703 		} else {
704 			auto row = retrieve();
705 			while (true) {
706 				if (row.peek!ubyte == StatusPackets.EOF_Packet) {
707 					eatEOF(row);
708 					break;
709 				}
710 
711 				resultSetRow(row, cmd, header, row_);
712 				if (!callHandler(handler, index++, header, row_)) {
713 					discardUntilEOF(retrieve());
714 					break;
715 				}
716 
717 				row = retrieve();
718 			}
719 		}
720 	}
721 
722 	void discardAll(InputPacket packet, Commands cmd) {
723 		auto columns = cast(size_t)packet.eatLenEnc();
724 		auto defs = columnDefs(columns, cmd);
725 
726 		auto statusFlags = eatEOF(retrieve());
727 		if ((statusFlags & StatusFlags.SERVER_STATUS_CURSOR_EXISTS) == 0) {
728 			while (true) {
729 				auto row = retrieve();
730 				if (row.peek!ubyte == StatusPackets.EOF_Packet) {
731 					eatEOF(row);
732 					break;
733 				}
734 			}
735 		}
736 	}
737 
738 	void discardUntilEOF(InputPacket packet) {
739 		while (true) {
740 			if (packet.peek!ubyte == StatusPackets.EOF_Packet) {
741 				eatEOF(packet);
742 				break;
743 			}
744 			packet = retrieve();
745 		}
746 	}
747 
748 	auto eatEOF(InputPacket packet) {
749 		auto id = packet.eat!ubyte;
750 		if (id != StatusPackets.EOF_Packet)
751 			throw new MySQLProtocolException("Unexpected packet format");
752 
753 		status_.error = 0;
754 		status_.warnings = packet.eat!ushort();
755 		status_.flags = packet.eat!ushort();
756 		info([]);
757 
758 		if (onStatus_)
759 			onStatus_(status_, info_);
760 
761 		return status_.flags;
762 	}
763 
764 	void connectionSettings(const(char)[] connectionString) {
765 		import std.conv;
766 
767 		auto remaining = connectionString;
768 
769 		auto indexValue = remaining.indexOf("=");
770 		while (!remaining.empty) {
771 			auto indexValueEnd = remaining.indexOf(";", indexValue);
772 			if (indexValueEnd <= 0)
773 				indexValueEnd = remaining.length;
774 
775 			auto name = strip(remaining[0..indexValue]);
776 			auto value = strip(remaining[indexValue+1..indexValueEnd]);
777 
778 			switch (name) {
779 			case "host":
780 				settings_.host = value;
781 				break;
782 			case "user":
783 				settings_.user = value;
784 				break;
785 			case "pwd":
786 				settings_.pwd = value;
787 				break;
788 			case "db":
789 				settings_.db = value;
790 				break;
791 			case "port":
792 				settings_.port = to!ushort(value);
793 				break;
794 			default:
795 				throw new MySQLException("Bad connection string: " ~ cast(string)connectionString);
796 			}
797 
798 			if (indexValueEnd == remaining.length)
799 				return;
800 
801 			remaining = remaining[indexValueEnd+1..$];
802 			indexValue = remaining.indexOf("=");
803 		}
804 
805 		throw new MySQLException("Bad connection string: " ~ cast(string)connectionString);
806 	}
807 
808 	SocketType socket_;
809 	MySQLHeader header_;
810 	MySQLRow row_;
811 	char[] columns_;
812 	char[] info_;
813 	ubyte[] in_;
814 	ubyte[] out_;
815 	ubyte seq_ = 0;
816 
817 	OnStatusCallback onStatus_;
818 	CapabilityFlags caps_;
819 	ConnectionStatus status_;
820 	ConnectionSettings settings_;
821 	ServerInfo server_;
822 }