execute method
Executes given query
execute can be used to make any query type (SELECT, INSERT, UPDATE)
You can pass named parameters using params
Pass iterable
true if you want to receive rows one by one in Stream fashion
Implementation
Future<IResultSet> execute(
String query, [
Map<String, dynamic>? params,
bool iterable = false,
]) async {
if (!_connected) {
throw MySQLClientException("Can not execute query: connection closed");
}
// wait for ready state
if (_state != _MySQLConnectionState.connectionEstablished) {
await _waitForState(_MySQLConnectionState.connectionEstablished)
.timeout(Duration(milliseconds: _timeoutMs));
}
_state = _MySQLConnectionState.waitingCommandResponse;
if (params != null && params.isNotEmpty) {
try {
query = _substitureParams(query, params);
} catch (e) {
_state = _MySQLConnectionState.connectionEstablished;
rethrow;
}
}
final payload = MySQLPacketCommQuery(query: query);
final packet = MySQLPacket(
sequenceID: 0,
payload: payload,
payloadLength: 0,
);
final completer = Completer<IResultSet>();
/**
* 0 - initial
* 1 - columnCount decoded
* 2 - columnDefs parsed
* 3 - eofParsed
* 4 - rowsParsed
*/
int state = 0;
int colsCount = 0;
List<MySQLColumnDefinitionPacket> colDefs = [];
List<MySQLResultSetRowPacket> resultSetRows = [];
// support for iterable result set
IterableResultSet? iterableResultSet;
StreamSink<ResultSetRow>? sink;
// used as a pointer to handle multiple result sets
IResultSet? currentResultSet;
IResultSet? firstResultSet;
_responseCallback = (data) async {
try {
MySQLPacket? packet;
switch (state) {
case 0:
// if packet is OK packet, there is no data
if (MySQLPacket.detectPacketType(data) ==
MySQLGenericPacketType.ok) {
final okPacket = MySQLPacket.decodeGenericPacket(data);
_state = _MySQLConnectionState.connectionEstablished;
completer.complete(
EmptyResultSet(okPacket: okPacket.payload as MySQLPacketOK),
);
return;
}
packet = MySQLPacket.decodeColumnCountPacket(data);
break;
case 1:
packet = MySQLPacket.decodeColumnDefPacket(data);
break;
case 2:
packet = MySQLPacket.decodeGenericPacket(data);
if (packet.isEOFPacket()) {
state = 3;
}
break;
case 3:
if (iterable) {
if (iterableResultSet == null) {
iterableResultSet = IterableResultSet._(
columns: colDefs,
);
sink = iterableResultSet!._sink;
completer.complete(iterableResultSet);
}
// check eof
if (MySQLPacket.detectPacketType(data) ==
MySQLGenericPacketType.eof) {
state = 4;
_state = _MySQLConnectionState.connectionEstablished;
await sink!.close();
return;
}
packet = MySQLPacket.decodeResultSetRowPacket(data, colsCount);
final values = (packet.payload as MySQLResultSetRowPacket).values;
sink!.add(ResultSetRow._(colDefs: colDefs, values: values));
packet = null;
break;
} else {
// check eof
if (MySQLPacket.detectPacketType(data) ==
MySQLGenericPacketType.eof) {
final resultSetPacket = MySQLPacketResultSet(
columnCount: BigInt.from(colsCount),
columns: colDefs,
rows: resultSetRows,
);
final resultSet = ResultSet._(resultSetPacket: resultSetPacket);
if (currentResultSet != null) {
currentResultSet!.next = resultSet;
} else {
firstResultSet = resultSet;
}
currentResultSet = resultSet;
final eofPacket = MySQLPacket.decodeGenericPacket(data);
final eofPayload = eofPacket.payload as MySQLPacketEOF;
if (eofPayload.statusFlags & mysqlServerFlagMoreResultsExists !=
0) {
state = 0;
colsCount = 0;
colDefs = [];
resultSetRows = [];
return;
} else {
// there is no more results, just return
state = 4;
_state = _MySQLConnectionState.connectionEstablished;
completer.complete(firstResultSet);
return;
}
}
packet = MySQLPacket.decodeResultSetRowPacket(data, colsCount);
break;
}
}
if (packet != null) {
final payload = packet.payload;
if (payload is MySQLPacketError) {
completer.completeError(
MySQLServerException(payload.errorMessage, payload.errorCode),
);
_state = _MySQLConnectionState.connectionEstablished;
return;
} else if (payload is MySQLPacketOK || payload is MySQLPacketEOF) {
// do nothing
} else if (payload is MySQLPacketColumnCount) {
state = 1;
colsCount = payload.columnCount.toInt();
return;
} else if (payload is MySQLColumnDefinitionPacket) {
colDefs.add(payload);
if (colDefs.length == colsCount) {
state = 2;
}
} else if (payload is MySQLResultSetRowPacket) {
assert(iterable == false);
resultSetRows.add(payload);
} else {
completer.completeError(
MySQLClientException(
"Unexpected payload received in response to COMM_QUERY request",
),
StackTrace.current,
);
_forceClose();
return;
}
}
} catch (e) {
completer.completeError(e, StackTrace.current);
_forceClose();
}
};
_socket.add(packet.encode());
return completer.future;
}