stream method

@override
Stream stream(
  int command, {
  List args = const [],
  CancelationToken? token,
  bool inspectRequest = false,
  bool inspectResponse = false,
})
inherited

Sends a streaming workload to the worker.

Implementation

/// Sends a streaming workload to the worker.
///
/// The request is sent lazily: nothing is dispatched until the returned
/// stream is listened to. At that point the current channel is used, or
/// [start] is awaited when no channel is available yet.
///
/// * [command] and [args] are forwarded to the channel's streaming request.
/// * [token], when provided, is wrapped; once it is canceled, the upstream
///   subscription is canceled, an error is pushed to the returned stream,
///   the stream is closed and the channel (if any) is notified.
/// * [inspectRequest] and [inspectResponse] are forwarded unchanged to the
///   channel's streaming request.
///
/// Errors, whether raised while setting up the request or emitted by the
/// underlying stream, are converted with `SquadronException.from` before
/// being added to the returned stream.
@override
Stream<dynamic> stream(
  int command, {
  List args = const [],
  CancelationToken? token,
  bool inspectRequest = false,
  bool inspectResponse = false,
}) {
  final squadronToken = token?.wrap();

  // Declared before it is assigned so that both the cancelation handler and
  // the `onListen` callback below can refer to it.
  late final ForwardStreamController controller;

  squadronToken?.onCanceled.then((ex) {
    if (!controller.isClosed) {
      controller.subscription?.cancel();
      controller.addError(SquadronException.from(ex, null, command));
      controller.close();
    }
    _channel?.cancelToken(squadronToken);
  });

  controller = ForwardStreamController(onListen: () async {
    try {
      if (controller.isClosed) return;
      squadronToken?.throwIfCanceled();
      final channel = _channel ?? await start();
      // The controller may have been closed while `start()` was pending.
      if (controller.isClosed) return;

      _stats.beginWork();
      controller.done.whenComplete(_stats.endWork).ignore();

      controller.attachSubscription(channel
          .sendStreamingRequest(
            command,
            args,
            token: squadronToken,
            inspectRequest: inspectRequest,
            inspectResponse: inspectResponse,
          )
          .listen(
            controller.add,
            onError: (ex, st) =>
                controller.addError(SquadronException.from(ex, st, command)),
            onDone: controller.close,
            cancelOnError: false,
          ));
    } catch (ex, st) {
      _stats.failed();
      // Mirror the guard used by the cancelation handler: if the token was
      // canceled while `start()` was pending, the controller has already
      // been closed with an error, and `start()` failing afterwards must
      // not push a second error into it.
      // NOTE(review): presumably ForwardStreamController rejects events
      // after close the way StreamController does; confirm.
      if (!controller.isClosed) {
        controller.subscription?.cancel();
        controller.addError(SquadronException.from(ex, st, command));
        controller.close();
      }
    }
  });

  return controller.stream;
}