stream<I> method
Invokes the remote flow and streams its response.
Implementation
/// Invokes the remote flow and streams its response.
///
/// Sends [input] to the flow endpoint through `streamFlow`. The request
/// headers are the client's default headers overlaid with the per-call
/// [headers]; when both define the same key, the per-call value wins.
///
/// Returns a record with two members:
///
/// * `stream` emits every chunk reported while the request is running. If
///   the request fails, the error is emitted on this stream, which is then
///   closed.
/// * `response` completes with the value produced by `streamFlow` once the
///   request finishes. If the request fails it completes with `null` instead
///   of an error, because the failure is reported on `stream`.
///
/// Cancelling the subscription to `stream` invokes the cancel callback
/// supplied by `streamFlow` and cancels the underlying subscription, if one
/// has been reported.
///
/// If no stream-chunk converter was configured, both `stream` and `response`
/// fail with a [GenkitException].
FlowStreamResponse<O?, S> stream<I>({
  required I input,
  Map<String, String>? headers,
}) {
  final fromStreamChunk = _fromStreamChunk;
  if (fromStreamChunk == null) {
    final error = GenkitException(
      'fromStreamChunk must be provided for streaming operations.',
    );
    final stackTrace = StackTrace.current;
    return (
      // A caller that only listens to `stream` never attaches a handler to
      // `response`. `ignore()` keeps that from being reported as an uncaught
      // asynchronous error; callers that do await `response` still get it.
      response: Future.error(error, stackTrace)..ignore(),
      stream: Stream.error(error, stackTrace),
    );
  }

  // Set by `streamFlow` through `onSubscription`; stays null until then.
  StreamSubscription? subscription;
  final streamController = StreamController<S>();

  final responseFuture = streamFlow(
    url: _url,
    fromResponse: _fromResponse,
    // Pass the null-checked local instead of re-reading the nullable field.
    fromStreamChunk: fromStreamChunk,
    // Later spreads override earlier ones, so per-call headers win.
    headers: {...?_defaultHeaders, ...?headers},
    onChunk: (chunk) {
      if (streamController.isClosed) return;
      streamController.add(chunk);
    },
    onSubscription: (sub) => subscription = sub,
    setCancelCallback: (cancelCallback) {
      // Propagate a consumer-side cancel down to the underlying request.
      streamController.onCancel = () {
        cancelCallback();
        subscription?.cancel();
      };
    },
    input: input,
    httpClient: _httpClient,
  );

  return (
    stream: streamController.stream,
    response: responseFuture.then(
      (result) {
        if (!streamController.isClosed) {
          streamController.close();
        }
        return result;
      },
      onError: (Object error, StackTrace stackTrace) {
        if (!streamController.isClosed) {
          streamController.addError(error, stackTrace);
          streamController.close();
        }
        // The failure is delivered on the stream; `response` resolves to
        // null rather than failing a second time.
        return null;
      },
    ),
  );
}