stream<I> method

FlowStreamResponse<O?, S> stream<I>({
  required I input,
  Map<String, String>? headers,
})

Invokes the remote flow and streams its response.

Implementation

/// Invokes the remote flow and streams its response.
///
/// Returns a record with two members:
///
/// * `stream` emits every chunk handed to the `onChunk` callback of
///   [streamFlow], in order, and is closed once the call settles.
/// * `response` completes with the value produced by [streamFlow].
///
/// Entries in [headers] are spread after the client's default headers, so a
/// per-call header replaces a default header with the same key.
///
/// If no stream-chunk converter is configured, [streamFlow] is never called
/// and both `response` and `stream` fail with a [GenkitException].
///
/// If the call fails, the error is delivered on `stream` only; `response`
/// then completes with `null` instead of an error, so callers that have
/// already handled the stream error do not also get an unhandled future
/// error.
///
/// Cancelling the subscription to `stream` invokes the cancel callback
/// registered by [streamFlow] (if one has been registered) and cancels the
/// subscription reported through `onSubscription`; the cancel future seen by
/// the listener completes when that underlying cancellation completes.
FlowStreamResponse<O?, S> stream<I>({
  required I input,
  Map<String, String>? headers,
}) {
  // Copy the field into a local so the null check below promotes it.
  final fromStreamChunk = _fromStreamChunk;
  if (fromStreamChunk == null) {
    final error = GenkitException(
      'fromStreamChunk must be provided for streaming operations.',
    );
    return (
      response: Future.error(error, StackTrace.current),
      stream: Stream.error(error, StackTrace.current),
    );
  }

  // Filled in by `onSubscription`; stays null until streamFlow reports it.
  StreamSubscription? subscription;
  final streamController = StreamController<S>();

  final responseFuture = streamFlow(
    url: _url,
    fromResponse: _fromResponse,
    // Pass the promoted local rather than re-reading the nullable field.
    fromStreamChunk: fromStreamChunk,
    headers: {
      if (_defaultHeaders != null) ..._defaultHeaders,
      if (headers != null) ...headers,
    },
    onChunk: (chunk) {
      // Adding to a closed controller throws, so drop late chunks.
      if (streamController.isClosed) return;
      streamController.add(chunk);
    },
    onSubscription: (sub) => subscription = sub,
    setCancelCallback: (cancelCallback) {
      streamController.onCancel = () {
        cancelCallback();
        // Return the cancel future so the listener's `cancel()` completes
        // only after the underlying subscription has been cancelled, and so
        // a cancellation error reaches the caller instead of the zone.
        return subscription?.cancel();
      };
    },
    input: input,
    httpClient: _httpClient,
  );

  return (
    stream: streamController.stream,
    response: responseFuture.then(
      (d) {
        if (!streamController.isClosed) {
          streamController.close();
        }
        return d;
      },
      // Intentionally does not rethrow: the failure is surfaced on the
      // stream, and the response future completes with null.
      onError: (error, st) {
        if (!streamController.isClosed) {
          streamController.addError(error, st);
          streamController.close();
        }
      },
    ),
  );
}