Server constructor

Server(
  1. RemoteClient client, {
  2. Duration? keepAliveInterval,
})

Implementation

Server(this.client, {this.keepAliveInterval}) {
  _sub = client.stream.listen(
    (msg) async {
      if ((msg.type == OperationMessage.gqlConnectionInit) && !_init) {
        try {
          Map? connectionParams;
          if (msg.payload is Map) {
            connectionParams = msg.payload as Map?;
          } else if (msg.payload != null) {
            throw FormatException(
              '${msg.type} payload must be a map (object).',
            );
          }

          var connect = await onConnect(client, connectionParams);
          if (!connect) throw false;
          _init = true;
          client.sink.add(
            OperationMessage(OperationMessage.gqlConnectionAck),
          );

          if (keepAliveInterval != null) {
            client.sink.add(
              OperationMessage(OperationMessage.gqlConnectionKeepAlive),
            );
            _timer ??= Timer.periodic(keepAliveInterval!, (timer) {
              client.sink.add(
                OperationMessage(OperationMessage.gqlConnectionKeepAlive),
              );
            });
          }
        } catch (e) {
          if (e == false) {
            _reportError('The connection was rejected.');
          } else {
            _reportError(e.toString());
          }
        }
      } else if (_init) {
        if (msg.type == OperationMessage.gqlStart) {
          if (msg.id == null) {
            throw FormatException('${msg.type} id is required.');
          }
          if (msg.payload == null) {
            throw FormatException('${msg.type} payload is required.');
          } else if (msg.payload is! Map) {
            throw FormatException(
              '${msg.type} payload must be a map (object).',
            );
          }
          var payload = msg.payload as Map;
          var query = payload['query'];
          var variables = payload['variables'];
          var operationName = payload['operationName'];
          if (query == null || query is! String) {
            throw FormatException(
              '${msg.type} payload must contain a string named "query".',
            );
          }
          if (variables != null && variables is! Map) {
            throw FormatException(
              '${msg.type} payload\'s "variables" field must be a map (object).',
            );
          }
          if (operationName != null && operationName is! String) {
            throw FormatException(
              '${msg.type} payload\'s "operationName" field must be a string.',
            );
          }
          var result = await onOperation(
            msg.id,
            query,
            (variables as Map?)?.cast<String, dynamic>(),
            operationName as String?,
          );
          var data = result.data;

          if (result.errors.isNotEmpty) {
            client.sink.add(
              OperationMessage(
                OperationMessage.gqlData,
                id: msg.id,
                payload: {'errors': result.errors.toList()},
              ),
            );
          } else {
            if (data is Map &&
                data.keys.length == 1 &&
                data.containsKey('data')) {
              data = data['data'];
            }

            if (data is Stream) {
              await for (var event in data) {
                if (event is Map &&
                    event.keys.length == 1 &&
                    event.containsKey('data')) {
                  event = event['data'];
                }
                client.sink.add(
                  OperationMessage(
                    OperationMessage.gqlData,
                    id: msg.id,
                    payload: {'data': event},
                  ),
                );
              }
            } else {
              client.sink.add(
                OperationMessage(
                  OperationMessage.gqlData,
                  id: msg.id,
                  payload: {'data': data},
                ),
              );
            }
          }

          // c.complete();
          client.sink.add(
            OperationMessage(OperationMessage.gqlComplete, id: msg.id),
          );
        } else if (msg.type == OperationMessage.gqlConnectionTerminate) {
          await _sub?.cancel();
        }
      }
    },
    onError: _done.completeError,
    onDone: () {
      _done.complete();
      _timer?.cancel();
    },
  );
}