stream method
Stream
stream(
- int command, {
- List args = const [],
- CancelationToken? token,
- bool inspectRequest = false,
- bool inspectResponse = false,
inherited
Sends a streaming workload to the worker.
Implementation
@override
Stream<dynamic> stream(
int command, {
List args = const [],
CancelationToken? token,
bool inspectRequest = false,
bool inspectResponse = false,
}) {
final squadronToken = token?.wrap();
late final ForwardStreamController controller;
squadronToken?.onCanceled.then((ex) {
if (!controller.isClosed) {
controller.subscription?.cancel();
controller.addError(SquadronException.from(ex, null, command));
controller.close();
}
_channel?.cancelToken(squadronToken);
});
controller = ForwardStreamController(onListen: () async {
try {
if (controller.isClosed) return;
squadronToken?.throwIfCanceled();
final channel = _channel ?? await start();
if (controller.isClosed) return;
_stats.beginWork();
controller.done.whenComplete(_stats.endWork).ignore();
controller.attachSubscription(channel
.sendStreamingRequest(
command,
args,
token: squadronToken,
inspectRequest: inspectRequest,
inspectResponse: inspectResponse,
)
.listen(
controller.add,
onError: (ex, st) =>
controller.addError(SquadronException.from(ex, st, command)),
onDone: controller.close,
cancelOnError: false,
));
} catch (ex, st) {
_stats.failed();
controller.subscription?.cancel();
controller.addError(SquadronException.from(ex, st, command));
controller.close();
}
});
return controller.stream;
}