1 module mysql.connection; 2 3 4 import std.algorithm; 5 import std.array; 6 import std.string; 7 import std.traits; 8 9 public import mysql.exception; 10 import mysql.packet; 11 import mysql.protocol; 12 public import mysql.type; 13 14 15 immutable CapabilityFlags DefaultClientCaps = CapabilityFlags.CLIENT_LONG_PASSWORD | CapabilityFlags.CLIENT_LONG_FLAG | 16 CapabilityFlags.CLIENT_CONNECT_WITH_DB | CapabilityFlags.CLIENT_PROTOCOL_41 | CapabilityFlags.CLIENT_SECURE_CONNECTION; 17 18 19 struct ConnectionStatus { 20 ulong affected = 0; 21 ulong insertID = 0; 22 ushort flags = 0; 23 ushort error = 0; 24 ushort warnings = 0; 25 } 26 27 28 private struct ConnectionSettings { 29 CapabilityFlags caps = DefaultClientCaps; 30 31 const(char)[] host; 32 const(char)[] user; 33 const(char)[] pwd; 34 const(char)[] db; 35 ushort port = 3306; 36 } 37 38 39 private struct ServerInfo { 40 const(char)[] versionString; 41 ubyte protocol; 42 ubyte charSet; 43 ushort status; 44 uint connection; 45 uint caps; 46 } 47 48 49 @property string placeholders(size_t x, bool parens = true) { 50 import std.range : repeat, take; 51 52 if (parens) 53 return "(" ~ ("?".repeat().take(x).join(",")) ~ ")"; 54 return "?".repeat.take(x).join(","); 55 } 56 57 58 @property string placeholders(T)(T[] x, bool parens = true) { 59 import std.range : repeat, take; 60 61 if (parens) 62 return "(" ~ ("?".repeat().take(x.length).join(",")) ~ ")"; 63 return "?".repeat.take(x.length).join(","); 64 } 65 66 67 struct PreparedStatement { 68 package: 69 uint id; 70 uint params; 71 } 72 73 74 struct Connection(SocketType) { 75 void connect(string connectionString) { 76 connectionSettings(connectionString); 77 connect(); 78 } 79 80 void connect(const(char)[] host, ushort port, const(char)[] user, const(char)[] pwd, const(char)[] db, CapabilityFlags caps = DefaultClientCaps) { 81 settings_.host = host; 82 settings_.user = user; 83 settings_.pwd = pwd; 84 settings_.db = db; 85 settings_.port = port; 86 settings_.caps = caps | CapabilityFlags.CLIENT_LONG_PASSWORD | CapabilityFlags.CLIENT_PROTOCOL_41; 87 88 connect(); 89 } 90 91 void use(const(char)[] db) { 92 send(Commands.COM_INIT_DB, db); 93 eatStatus(retrieve()); 94 } 95 96 void ping() { 97 send(Commands.COM_PING); 98 eatStatus(retrieve()); 99 } 100 101 void refresh() { 102 send(Commands.COM_REFRESH); 103 eatStatus(retrieve()); 104 } 105 106 void reset() { 107 send(Commands.COM_RESET_CONNECTION); 108 eatStatus(retrieve()); 109 } 110 111 const(char)[] statistics() { 112 send(Commands.COM_STATISTICS); 113 114 auto answer = retrieve(); 115 return answer.eat!(const(char)[])(answer.remaining); 116 } 117 118 auto prepare(const(char)[] sql) { 119 send(Commands.COM_STMT_PREPARE, sql); 120 121 auto answer = retrieve(); 122 123 if (answer.peek!ubyte != StatusPackets.OK_Packet) 124 eatStatus(answer); 125 126 answer.expect!ubyte(0); 127 128 auto id = answer.eat!uint; 129 auto columns = answer.eat!ushort; 130 auto params = answer.eat!ushort; 131 answer.expect!ubyte(0); 132 133 auto warnings = answer.eat!ushort; 134 135 if (params) { 136 foreach (i; 0..params) 137 skipColumnDef(retrieve(), Commands.COM_STMT_PREPARE); 138 139 eatEOF(retrieve()); 140 } 141 142 if (columns) { 143 MySQLColumn def; 144 foreach (i; 0..columns) 145 skipColumnDef(retrieve(), Commands.COM_STMT_PREPARE); 146 147 eatEOF(retrieve()); 148 } 149 150 return PreparedStatement(id, params); 151 } 152 153 void execute(Args...)(const(char)[] stmt, Args args) { 154 scope(failure) disconnect(); 155 156 auto id = prepare(stmt); 157 execute(id, args); 158 close(id); 159 } 160 161 void begin() { 162 if (inTransaction) 163 throw new MySQLErrorException("MySQL does not support nested transactions - commit or rollback before starting a new transaction"); 164 165 query("start transaction"); 166 167 assert(inTransaction); 168 } 169 170 void commit() { 171 if (!inTransaction) 172 throw new MySQLErrorException("No active transaction"); 173 174 query("commit"); 175 176 assert(!inTransaction); 177 } 178 179 void rollback() { 180 if (connected) { 181 if ((status_.flags & StatusFlags.SERVER_STATUS_IN_TRANS) == 0) 182 throw new MySQLErrorException("No active transaction"); 183 184 query("rollback"); 185 186 assert(!inTransaction); 187 } 188 } 189 190 @property bool inTransaction() const { 191 return connected && (status_.flags & StatusFlags.SERVER_STATUS_IN_TRANS); 192 } 193 194 void execute(Args...)(PreparedStatement stmt, Args args) { 195 scope(failure) disconnect(); 196 197 ensureConnected(); 198 199 seq_ = 0; 200 auto packet = OutputPacket(&out_); 201 packet.put!ubyte(Commands.COM_STMT_EXECUTE); 202 packet.put!uint(stmt.id); 203 packet.put!ubyte(Cursors.CURSOR_TYPE_READ_ONLY); 204 packet.put!uint(1); 205 206 static if (args.length == 0) { 207 enum shouldDiscard = true; 208 } else { 209 enum shouldDiscard = !isCallable!(args[args.length - 1]); 210 } 211 212 enum argCount = shouldDiscard ? args.length : (args.length - 1); 213 214 if (!argCount && stmt.params) 215 throw new MySQLErrorException(format("Wrong number of parameters for query. Got 0 but expected %d.", stmt.params)); 216 217 static if (argCount) { 218 enum NullsCapacity = 128; // must be power of 2 219 ubyte[NullsCapacity >> 3] nulls; 220 size_t bitsOut = 0; 221 size_t indexArg = 0; 222 foreach(i, arg; args[0..argCount]) { 223 const auto index = (indexArg >> 3) & (NullsCapacity - 1); 224 const auto bit = indexArg & 7; 225 226 static if (is(typeof(arg) == typeof(null))) { 227 nulls[index] = nulls[index] | (1 << bit); 228 ++indexArg; 229 } else static if (is(Unqual!(typeof(arg)) == MySQLValue)) { 230 if (arg.isNull) 231 nulls[index] = nulls[index] | (1 << bit); 232 ++indexArg; 233 } else static if (isArray!(typeof(arg)) && !isSomeString!(typeof(arg))) { 234 indexArg += arg.length; 235 } else { 236 ++indexArg; 237 } 238 239 auto finishing = (i == argCount - 1); 240 auto remaining = indexArg - bitsOut; 241 242 if (finishing || (remaining >= NullsCapacity)) { 243 while (remaining) { 244 auto bits = min(remaining, NullsCapacity); 245 246 packet.put(nulls[0..(bits + 7) >> 3]); 247 bitsOut += bits; 248 nulls[] = 0; 249 250 remaining = (indexArg - bitsOut); 251 if (!remaining || (!finishing && (remaining < NullsCapacity))) 252 break; 253 } 254 } 255 } 256 packet.put!ubyte(1); 257 258 if (indexArg != stmt.params) 259 throw new MySQLErrorException(format("Wrong number of parameters for query. Got %d but expected %d.", indexArg, stmt.params)); 260 261 foreach (arg; args[0..argCount]) 262 putValueType(packet, arg); 263 264 foreach (arg; args[0..argCount]) { 265 static if (!is(typeof(arg) == typeof(null))) { 266 putValue(packet, arg); 267 } 268 } 269 } 270 271 packet.finalize(seq_); 272 ++seq_; 273 274 socket_.write(packet.get()); 275 276 auto answer = retrieve(); 277 if (isStatus(answer)) { 278 eatStatus(answer); 279 } else { 280 static if (!shouldDiscard) { 281 resultSet(answer, stmt.id, Commands.COM_STMT_EXECUTE, args[args.length - 1]); 282 } else { 283 discardAll(answer, Commands.COM_STMT_EXECUTE); 284 } 285 } 286 } 287 288 void close(PreparedStatement stmt) { 289 uint[1] data = [ stmt.id ]; 290 send(Commands.COM_STMT_CLOSE, data); 291 } 292 293 alias OnStatusCallback = void delegate(ConnectionStatus status, const(char)[] message); 294 @property void onStatus(OnStatusCallback callback) { 295 onStatus_ = callback; 296 } 297 298 @property OnStatusCallback onStatus() const { 299 return onStatus_; 300 } 301 302 @property ulong insertID() { 303 return status_.insertID; 304 } 305 306 @property ulong affected() { 307 return cast(size_t)status_.affected; 308 } 309 310 @property size_t warnings() { 311 return status_.warnings; 312 } 313 314 @property size_t error() { 315 return status_.error; 316 } 317 318 @property const(char)[] status() const { 319 return info_; 320 } 321 322 @property bool connected() const { 323 return socket_.connected; 324 } 325 326 void disconnect() { 327 socket_.close(); 328 } 329 330 ~this() { 331 disconnect(); 332 } 333 334 private: 335 void connect() { 336 socket_.connect(settings_.host, settings_.port); 337 338 seq_ = 0; 339 eatHandshake(retrieve()); 340 } 341 342 void send(T)(Commands cmd, T[] data) { 343 send(cmd, cast(ubyte*)data.ptr, data.length * T.sizeof); 344 } 345 346 void send(Commands cmd, ubyte* data = null, size_t length = 0) { 347 if(!socket_.connected) 348 connect(); 349 350 seq_ = 0; 351 auto header = OutputPacket(&out_); 352 header.put!ubyte(cmd); 353 header.finalize(seq_, length); 354 ++seq_; 355 356 socket_.write(header.get()); 357 if (length) 358 socket_.write(data[0..length]); 359 } 360 361 void query(const(char)[] sql) { 362 send(Commands.COM_QUERY, sql); 363 364 auto answer = retrieve(); 365 if (isStatus(answer)) 366 eatStatus(answer); 367 } 368 369 void ensureConnected() { 370 if(!socket_.connected) 371 connect(); 372 } 373 374 bool isStatus(InputPacket packet) { 375 auto id = packet.peek!ubyte; 376 switch (id) { 377 case StatusPackets.ERR_Packet: 378 case StatusPackets.OK_Packet: 379 return 1; 380 default: 381 return false; 382 } 383 } 384 385 void check(InputPacket packet, bool smallError = false) { 386 auto id = packet.peek!ubyte; 387 switch (id) { 388 case StatusPackets.ERR_Packet: 389 case StatusPackets.OK_Packet: 390 eatStatus(packet, smallError); 391 break; 392 default: 393 break; 394 } 395 } 396 397 InputPacket retrieve() { 398 scope(failure) disconnect(); 399 400 ubyte[4] header; 401 socket_.read(header); 402 403 auto len = header[0] | (header[1] << 8) | (header[2] << 16); 404 auto seq = header[3]; 405 406 if (seq != seq_) 407 throw new MySQLConnectionException("Out of order packet received"); 408 409 ++seq_; 410 411 in_.length = len; 412 socket_.read(in_); 413 414 if (in_.length != len) 415 throw new MySQLConnectionException("Wrong number of bytes read"); 416 417 return InputPacket(&in_); 418 } 419 420 void eatHandshake(InputPacket packet) { 421 scope(failure) disconnect(); 422 423 check(packet, true); 424 425 server_.protocol = packet.eat!ubyte; 426 server_.versionString = packet.eat!(const(char)[])(packet.countUntil(0, true)); 427 packet.skip(1); 428 429 server_.connection = packet.eat!uint; 430 431 const auto authLengthStart = 8; 432 size_t authLength = authLengthStart; 433 434 ubyte[256] auth; 435 auth[0..authLength] = packet.eat!(ubyte[])(authLength); 436 437 packet.expect!ubyte(0); 438 439 server_.caps = packet.eat!ushort; 440 441 if (!packet.empty) { 442 server_.charSet = packet.eat!ubyte; 443 server_.status = packet.eat!ushort; 444 server_.caps |= packet.eat!ushort << 16; 445 server_.caps |= CapabilityFlags.CLIENT_LONG_PASSWORD; 446 447 if ((server_.caps & CapabilityFlags.CLIENT_PROTOCOL_41) == 0) 448 throw new MySQLProtocolException("Server doesn't support protocol v4.1"); 449 450 if (server_.caps & CapabilityFlags.CLIENT_SECURE_CONNECTION) { 451 packet.skip(1); 452 } else { 453 packet.expect!ubyte(0); 454 } 455 456 packet.skip(10); 457 458 authLength += packet.countUntil(0, true); 459 if (authLength > auth.length) 460 throw new MySQLConnectionException("Bad packet format"); 461 462 auth[authLengthStart..authLength] = packet.eat!(ubyte[])(authLength - authLengthStart); 463 464 packet.expect!ubyte(0); 465 } 466 467 ubyte[20] token; 468 { 469 import std.digest.sha; 470 471 auto pass = sha1Of(cast(const(ubyte)[])settings_.pwd); 472 token = sha1Of(pass); 473 474 SHA1 sha1; 475 sha1.start(); 476 sha1.put(auth[0..authLength]); 477 sha1.put(token); 478 token = sha1.finish(); 479 480 foreach (i; 0..20) 481 token[i] = token[i] ^ pass[i]; 482 } 483 484 caps_ = cast(CapabilityFlags)(settings_.caps & server_.caps); 485 486 auto reply = OutputPacket(&out_); 487 reply.reserve(64 + settings_.user.length + settings_.pwd.length + settings_.db.length); 488 489 reply.put!uint(caps_); 490 reply.put!uint(1); 491 reply.put!ubyte(33); 492 reply.fill(0, 23); 493 494 reply.put(settings_.user); 495 reply.put!ubyte(0); 496 497 if (settings_.pwd.length) { 498 if (caps_ & CapabilityFlags.CLIENT_SECURE_CONNECTION) { 499 reply.put!ubyte(token.length); 500 reply.put(token); 501 } else { 502 reply.put(token); 503 reply.put!ubyte(0); 504 } 505 } else { 506 reply.put!ubyte(0); 507 } 508 509 if (settings_.db.length && (caps_ & CapabilityFlags.CLIENT_CONNECT_WITH_DB)) 510 reply.put(settings_.db); 511 512 reply.put!ubyte(0); 513 514 reply.finalize(seq_); 515 ++seq_; 516 517 socket_.write(reply.get()); 518 519 eatStatus(retrieve()); 520 } 521 522 void eatStatus(InputPacket packet, bool smallError = false) { 523 auto id = packet.eat!ubyte; 524 525 switch (id) { 526 case StatusPackets.OK_Packet: 527 status_.error = 0; 528 status_.affected = packet.eatLenEnc(); 529 status_.insertID = packet.eatLenEnc(); 530 status_.flags = packet.eat!ushort; 531 status_.warnings = packet.eat!ushort; 532 533 if (caps_ & CapabilityFlags.CLIENT_SESSION_TRACK) { 534 info(packet.eat!(const(char)[])(cast(size_t)packet.eatLenEnc())); 535 packet.skip(1); 536 537 if (status_.flags & StatusFlags.SERVER_SESSION_STATE_CHANGED) { 538 packet.skip(cast(size_t)packet.eatLenEnc()); 539 packet.skip(1); 540 } 541 } else if (!packet.empty) { 542 auto len = cast(size_t)packet.eatLenEnc(); 543 info(packet.eat!(const(char)[])(min(len, packet.remaining))); 544 } 545 546 if (onStatus_) 547 onStatus_(status_, info_); 548 549 break; 550 case StatusPackets.EOF_Packet: 551 status_.error = 0; 552 status_.warnings = packet.eat!ushort; 553 status_.flags = packet.eat!ushort; 554 info([]); 555 556 if (onStatus_) 557 onStatus_(status_, info_); 558 559 break; 560 case StatusPackets.ERR_Packet: 561 status_.flags = 0; 562 status_.warnings = 0; 563 status_.error = packet.eat!ushort; 564 if (!smallError) 565 packet.skip(6); 566 info(packet.eat!(const(char)[])(packet.remaining)); 567 568 if (onStatus_) 569 onStatus_(status_, info_); 570 571 switch(status_.error) { 572 case ErrorCodes.ER_DUP_ENTRY_WITH_KEY_NAME: 573 case ErrorCodes.ER_DUP_ENTRY: 574 throw new MySQLDuplicateEntryException(cast(string)info_); 575 default: 576 throw new MySQLErrorException(cast(string)info_); 577 } 578 default: 579 throw new MySQLProtocolException("Unexpected packet format"); 580 } 581 } 582 583 void info(const(char)[] value) { 584 info_.length = value.length; 585 info_[0..$] = value; 586 } 587 588 void skipColumnDef(InputPacket packet, Commands cmd) { 589 packet.skip(cast(size_t)packet.eatLenEnc()); // catalog 590 packet.skip(cast(size_t)packet.eatLenEnc()); // schema 591 packet.skip(cast(size_t)packet.eatLenEnc()); // table 592 packet.skip(cast(size_t)packet.eatLenEnc()); // original_table 593 packet.skip(cast(size_t)packet.eatLenEnc()); // name 594 packet.skip(cast(size_t)packet.eatLenEnc()); // original_name 595 packet.skipLenEnc(); // next_length 596 packet.skip(10); // 2 + 4 + 1 + 2 + 1 // charset, length, type, flags, decimals 597 packet.expect!ushort(0); 598 599 if (cmd == Commands.COM_FIELD_LIST) 600 packet.skip(cast(size_t)packet.eatLenEnc());// default values 601 } 602 603 void columnDef(InputPacket packet, Commands cmd, ref MySQLColumn def) { 604 packet.skip(cast(size_t)packet.eatLenEnc()); // catalog 605 packet.skip(cast(size_t)packet.eatLenEnc()); // schema 606 packet.skip(cast(size_t)packet.eatLenEnc()); // table 607 packet.skip(cast(size_t)packet.eatLenEnc()); // original_table 608 auto len = cast(size_t)packet.eatLenEnc(); 609 columns_ ~= packet.eat!(const(char)[])(len); 610 def.name = columns_[$-len..$]; 611 packet.skip(cast(size_t)packet.eatLenEnc()); // original_name 612 packet.skipLenEnc(); // next_length 613 packet.skip(2); // charset 614 def.length = packet.eat!uint; 615 def.type = cast(ColumnTypes)packet.eat!ubyte; 616 def.flags = packet.eat!ushort; 617 def.decimals = packet.eat!ubyte; 618 619 packet.expect!ushort(0); 620 621 if (cmd == Commands.COM_FIELD_LIST) 622 packet.skip(cast(size_t)packet.eatLenEnc());// default values 623 } 624 625 void columnDefs(size_t count, Commands cmd, ref MySQLColumn[] defs) { 626 defs.length = count; 627 foreach (i; 0..count) 628 columnDef(retrieve(), cmd, defs[i]); 629 } 630 631 void resultSetRow(InputPacket packet, Commands cmd, MySQLHeader header, MySQLRow row) { 632 assert(row.length == header.length); 633 634 packet.expect!ubyte(0); 635 auto nulls = packet.eat!(ubyte[])((header.length + 2 + 7) >> 3); 636 foreach (i, column; header) { 637 const auto index = (i + 2) >> 3; // bit offset of 2 638 const auto bit = (i + 2) & 7; 639 640 if ((nulls[index] & (1 << bit)) == 0) { 641 row.set(i, eatValue(packet, column)); 642 } else { 643 row.nullify(i); 644 } 645 } 646 assert(packet.empty); 647 } 648 649 bool callHandler(RowHandler)(RowHandler handler, size_t i, MySQLHeader header, MySQLRow row) if ((ParameterTypeTuple!(RowHandler).length == 1) && is(ParameterTypeTuple!(RowHandler)[0] == MySQLRow)) { 650 static if (is(ReturnType!(RowHandler) == void)) { 651 handler(row); 652 return true; 653 } else { 654 return handler(row); // return type must be bool 655 } 656 } 657 658 bool callHandler(RowHandler)(RowHandler handler, size_t i, MySQLHeader header, MySQLRow row) if ((ParameterTypeTuple!(RowHandler).length == 2) && isNumeric!(ParameterTypeTuple!(RowHandler)[0]) && is(ParameterTypeTuple!(RowHandler)[1] == MySQLRow)) { 659 static if (is(ReturnType!(RowHandler) == void)) { 660 handler(cast(ParameterTypeTuple!(RowHandler)[0])i, row); 661 return true; 662 } else { 663 return handler(cast(ParameterTypeTuple!(RowHandler)[0])i, row); // return type must be bool 664 } 665 } 666 667 bool callHandler(RowHandler)(RowHandler handler, size_t i, MySQLHeader header, MySQLRow row) if ((ParameterTypeTuple!(RowHandler).length == 2) && is(ParameterTypeTuple!(RowHandler)[0] == MySQLHeader) && is(ParameterTypeTuple!(RowHandler)[1] == MySQLRow)) { 668 static if (is(ReturnType!(RowHandler) == void)) { 669 handler(header, row); 670 return true; 671 } else { 672 return handler(header, row); // return type must be bool 673 } 674 } 675 676 bool callHandler(RowHandler)(RowHandler handler, size_t i, MySQLHeader header, MySQLRow row) if ((ParameterTypeTuple!(RowHandler).length == 3) && isNumeric!(ParameterTypeTuple!(RowHandler)[0]) && is(ParameterTypeTuple!(RowHandler)[1] == MySQLHeader) && is(ParameterTypeTuple!(RowHandler)[2] == MySQLRow)) { 677 static if (is(ReturnType!(RowHandler) == void)) { 678 handler(i, header, row); 679 return true; 680 } else { 681 return handler(i, header, row); // return type must be bool 682 } 683 } 684 685 void resultSet(RowHandler)(InputPacket packet, uint stmt, Commands cmd, RowHandler handler) { 686 columns_.length = 0; 687 688 auto columns = cast(size_t)packet.eatLenEnc(); 689 columnDefs(columns, cmd, header_); 690 row_.length = columns; 691 row_.header(header_); 692 693 auto status = retrieve(); 694 if (status.peek!ubyte == StatusPackets.ERR_Packet) 695 eatStatus(status); 696 697 size_t index = 0; 698 auto statusFlags = eatEOF(status); 699 if (statusFlags & StatusFlags.SERVER_STATUS_CURSOR_EXISTS) { 700 uint[2] data = [ stmt, 4096 ]; // todo: make setting - rows per fetch 701 while (statusFlags & (StatusFlags.SERVER_STATUS_CURSOR_EXISTS | StatusFlags.SERVER_MORE_RESULTS_EXISTS)) { 702 send(Commands.COM_STMT_FETCH, data); 703 704 auto answer = retrieve(); 705 if (answer.peek!ubyte == StatusPackets.ERR_Packet) 706 eatStatus(answer); 707 708 auto row = answer.empty ? retrieve() : answer; 709 while (true) { 710 if (row.peek!ubyte == StatusPackets.EOF_Packet) { 711 statusFlags = eatEOF(row); 712 break; 713 } 714 715 resultSetRow(row, Commands.COM_STMT_FETCH, header_, row_); 716 if (!callHandler(handler, index++, header_, row_)) { 717 discardUntilEOF(retrieve()); 718 statusFlags = 0; 719 break; 720 } 721 row = retrieve(); 722 } 723 } 724 } else { 725 auto row = retrieve(); 726 while (true) { 727 if (row.peek!ubyte == StatusPackets.EOF_Packet) { 728 eatEOF(row); 729 break; 730 } 731 732 resultSetRow(row, cmd, header_, row_); 733 if (!callHandler(handler, index++, header_, row_)) { 734 discardUntilEOF(retrieve()); 735 break; 736 } 737 738 row = retrieve(); 739 } 740 } 741 } 742 743 void discardAll(InputPacket packet, Commands cmd) { 744 auto columns = cast(size_t)packet.eatLenEnc(); 745 columnDefs(columns, cmd, header_); 746 747 auto statusFlags = eatEOF(retrieve()); 748 if ((statusFlags & StatusFlags.SERVER_STATUS_CURSOR_EXISTS) == 0) { 749 while (true) { 750 auto row = retrieve(); 751 if (row.peek!ubyte == StatusPackets.EOF_Packet) { 752 eatEOF(row); 753 break; 754 } 755 } 756 } 757 } 758 759 void discardUntilEOF(InputPacket packet) { 760 while (true) { 761 if (packet.peek!ubyte == StatusPackets.EOF_Packet) { 762 eatEOF(packet); 763 break; 764 } 765 packet = retrieve(); 766 } 767 } 768 769 auto eatEOF(InputPacket packet) { 770 auto id = packet.eat!ubyte; 771 if (id != StatusPackets.EOF_Packet) 772 throw new MySQLProtocolException("Unexpected packet format"); 773 774 status_.error = 0; 775 status_.warnings = packet.eat!ushort(); 776 status_.flags = packet.eat!ushort(); 777 info([]); 778 779 if (onStatus_) 780 onStatus_(status_, info_); 781 782 return status_.flags; 783 } 784 785 void connectionSettings(const(char)[] connectionString) { 786 import std.conv; 787 788 auto remaining = connectionString; 789 790 auto indexValue = remaining.indexOf("="); 791 while (!remaining.empty) { 792 auto indexValueEnd = remaining.indexOf(";", indexValue); 793 if (indexValueEnd <= 0) 794 indexValueEnd = remaining.length; 795 796 auto name = strip(remaining[0..indexValue]); 797 auto value = strip(remaining[indexValue+1..indexValueEnd]); 798 799 switch (name) { 800 case "host": 801 settings_.host = value; 802 break; 803 case "user": 804 settings_.user = value; 805 break; 806 case "pwd": 807 settings_.pwd = value; 808 break; 809 case "db": 810 settings_.db = value; 811 break; 812 case "port": 813 settings_.port = to!ushort(value); 814 break; 815 default: 816 throw new MySQLException("Bad connection string: " ~ cast(string)connectionString); 817 } 818 819 if (indexValueEnd == remaining.length) 820 return; 821 822 remaining = remaining[indexValueEnd+1..$]; 823 indexValue = remaining.indexOf("="); 824 } 825 826 throw new MySQLException("Bad connection string: " ~ cast(string)connectionString); 827 } 828 829 SocketType socket_; 830 MySQLHeader header_; 831 MySQLRow row_; 832 char[] columns_; 833 char[] info_; 834 ubyte[] in_; 835 ubyte[] out_; 836 ubyte seq_ = 0; 837 838 OnStatusCallback onStatus_; 839 CapabilityFlags caps_; 840 ConnectionStatus status_; 841 ConnectionSettings settings_; 842 ServerInfo server_; 843 }