execute method

Future<IResultSet> execute(
  1. String query, [
  2. Map<String, dynamic>? params,
  3. bool iterable = false
])

Executes given query

execute can be used to make any query type (SELECT, INSERT, UPDATE) You can pass named parameters using params Pass iterable true if you want to receive rows one by one in Stream fashion

Implementation

Future<IResultSet> execute(
  String query, [
  Map<String, dynamic>? params,
  bool iterable = false,
]) async {
  if (!_connected) {
    throw MySQLClientException("Can not execute query: connection closed");
  }

  // wait for ready state
  if (_state != _MySQLConnectionState.connectionEstablished) {
    await _waitForState(_MySQLConnectionState.connectionEstablished)
        .timeout(Duration(milliseconds: _timeoutMs));
  }

  _state = _MySQLConnectionState.waitingCommandResponse;

  if (params != null && params.isNotEmpty) {
    try {
      query = _substitureParams(query, params);
    } catch (e) {
      _state = _MySQLConnectionState.connectionEstablished;
      rethrow;
    }
  }

  final payload = MySQLPacketCommQuery(query: query);

  final packet = MySQLPacket(
    sequenceID: 0,
    payload: payload,
    payloadLength: 0,
  );

  final completer = Completer<IResultSet>();

  /**
   * 0 - initial
   * 1 - columnCount decoded
   * 2 - columnDefs parsed
   * 3 - eofParsed
   * 4 - rowsParsed
   */
  int state = 0;
  int colsCount = 0;
  List<MySQLColumnDefinitionPacket> colDefs = [];
  List<MySQLResultSetRowPacket> resultSetRows = [];

  // support for iterable result set
  IterableResultSet? iterableResultSet;
  StreamSink<ResultSetRow>? sink;

  // used as a pointer to handle multiple result sets
  IResultSet? currentResultSet;
  IResultSet? firstResultSet;

  _responseCallback = (data) async {
    try {
      MySQLPacket? packet;

      switch (state) {
        case 0:
          // if packet is OK packet, there is no data
          if (MySQLPacket.detectPacketType(data) ==
              MySQLGenericPacketType.ok) {
            final okPacket = MySQLPacket.decodeGenericPacket(data);
            _state = _MySQLConnectionState.connectionEstablished;
            completer.complete(
              EmptyResultSet(okPacket: okPacket.payload as MySQLPacketOK),
            );

            return;
          }

          packet = MySQLPacket.decodeColumnCountPacket(data);
          break;
        case 1:
          packet = MySQLPacket.decodeColumnDefPacket(data);
          break;
        case 2:
          packet = MySQLPacket.decodeGenericPacket(data);
          if (packet.isEOFPacket()) {
            state = 3;
          }
          break;
        case 3:
          if (iterable) {
            if (iterableResultSet == null) {
              iterableResultSet = IterableResultSet._(
                columns: colDefs,
              );

              sink = iterableResultSet!._sink;
              completer.complete(iterableResultSet);
            }

            // check eof
            if (MySQLPacket.detectPacketType(data) ==
                MySQLGenericPacketType.eof) {
              state = 4;

              _state = _MySQLConnectionState.connectionEstablished;
              await sink!.close();
              return;
            }

            packet = MySQLPacket.decodeResultSetRowPacket(data, colsCount);
            final values = (packet.payload as MySQLResultSetRowPacket).values;
            sink!.add(ResultSetRow._(colDefs: colDefs, values: values));
            packet = null;
            break;
          } else {
            // check eof
            if (MySQLPacket.detectPacketType(data) ==
                MySQLGenericPacketType.eof) {
              final resultSetPacket = MySQLPacketResultSet(
                columnCount: BigInt.from(colsCount),
                columns: colDefs,
                rows: resultSetRows,
              );

              final resultSet = ResultSet._(resultSetPacket: resultSetPacket);

              if (currentResultSet != null) {
                currentResultSet!.next = resultSet;
              } else {
                firstResultSet = resultSet;
              }
              currentResultSet = resultSet;

              final eofPacket = MySQLPacket.decodeGenericPacket(data);
              final eofPayload = eofPacket.payload as MySQLPacketEOF;

              if (eofPayload.statusFlags & mysqlServerFlagMoreResultsExists !=
                  0) {
                state = 0;
                colsCount = 0;
                colDefs = [];
                resultSetRows = [];
                return;
              } else {
                // there is no more results, just return
                state = 4;
                _state = _MySQLConnectionState.connectionEstablished;
                completer.complete(firstResultSet);
                return;
              }
            }

            packet = MySQLPacket.decodeResultSetRowPacket(data, colsCount);
            break;
          }
      }

      if (packet != null) {
        final payload = packet.payload;

        if (payload is MySQLPacketError) {
          completer.completeError(
            MySQLServerException(payload.errorMessage, payload.errorCode),
          );
          _state = _MySQLConnectionState.connectionEstablished;
          return;
        } else if (payload is MySQLPacketOK || payload is MySQLPacketEOF) {
          // do nothing
        } else if (payload is MySQLPacketColumnCount) {
          state = 1;
          colsCount = payload.columnCount.toInt();
          return;
        } else if (payload is MySQLColumnDefinitionPacket) {
          colDefs.add(payload);
          if (colDefs.length == colsCount) {
            state = 2;
          }
        } else if (payload is MySQLResultSetRowPacket) {
          assert(iterable == false);
          resultSetRows.add(payload);
        } else {
          completer.completeError(
            MySQLClientException(
              "Unexpected payload received in response to COMM_QUERY request",
            ),
            StackTrace.current,
          );
          _forceClose();
          return;
        }
      }
    } catch (e) {
      completer.completeError(e, StackTrace.current);
      _forceClose();
    }
  };

  _socket.add(packet.encode());

  return completer.future;
}