stream<I> method

ActionStream<S, O> stream<I>({
  required I input,
  Map<String, String>? headers,
})

Invokes the remote flow and streams its response.

Implementation

/// Invokes the remote flow and streams its response.
///
/// Each chunk reported by `streamFlow` is added to the returned
/// [ActionStream]. When the flow's future completes, its value is recorded
/// on the [ActionStream] and the stream is closed. If the future fails, the
/// error is recorded on the [ActionStream], added to the stream, and the
/// stream is closed.
///
/// [input] is passed through to `streamFlow` unchanged. [headers] are merged
/// after the default headers, so a per-call entry replaces a default entry
/// with the same key.
///
/// If no stream-chunk converter is configured, no request is made: the
/// returned [ActionStream] wraps a stream that emits a single
/// [GenkitException] and has that same error recorded on it.
ActionStream<S, O> stream<I>({
  required I input,
  Map<String, String>? headers,
}) {
  // Copy the field to a local so the null check below promotes it.
  final fromStreamChunk = _fromStreamChunk;
  if (fromStreamChunk == null) {
    final error = GenkitException(
      'fromStreamChunk must be provided for streaming operations.',
    );
    final stream = Stream<S>.error(error);
    final actionStream = ActionStream<S, O>(stream);
    actionStream._setError(error, StackTrace.current);
    return actionStream;
  }

  StreamSubscription? subscription;
  final streamController = StreamController<S>();

  final actionStream = ActionStream<S, O>(streamController.stream);

  // Single failure path: record the error on the action stream, then surface
  // it to stream listeners and close, unless the controller is already
  // closed.
  void fail(Object error, StackTrace stackTrace) {
    actionStream._setError(error, stackTrace);
    if (!streamController.isClosed) {
      streamController.addError(error, stackTrace);
      streamController.close();
    }
  }

  streamFlow<O, S>(
    url: _url,
    fromResponse: _fromResponse,
    fromStreamChunk: fromStreamChunk,
    headers: {
      // Later spreads win, so per-call headers override the defaults.
      ...?_defaultHeaders,
      ...?headers,
    },
    onChunk: (chunk) {
      // Chunks may still arrive after the consumer cancelled; drop them.
      if (streamController.isClosed) return;
      streamController.add(chunk);
    },
    onSubscription: (sub) => subscription = sub,
    setCancelCallback: (cancelCallback) {
      streamController.onCancel = () {
        cancelCallback();
        subscription?.cancel();
      };
    },
    input: input,
    httpClient: _httpClient,
  ).then(
    (d) {
      // An error thrown inside this callback is NOT delivered to the
      // `onError` handler of the same `then` call. Guard the cast so a
      // result of the wrong type ends the stream with an error instead of
      // leaving it open forever.
      final O result;
      try {
        result = d as O;
      } on TypeError catch (error, stackTrace) {
        fail(error, stackTrace);
        return;
      }
      actionStream._setResult(result);
      if (!streamController.isClosed) {
        streamController.close();
      }
    },
    onError: (Object error, StackTrace stackTrace) {
      fail(error, stackTrace);
    },
  );

  return actionStream;
}