stream<I> method
Invokes the remote flow and streams its response.
Implementation
/// Invokes the remote flow and streams its response.
///
/// Returns an [ActionStream] that emits every chunk reported by the flow and
/// closes once the flow finishes. The final response is recorded on the
/// returned stream through `_setResult`; a failure is recorded through
/// `_setError` and is also emitted as an error event before the stream
/// closes.
///
/// [input] is forwarded to the flow. [headers] are merged over the default
/// headers, so a per-call entry replaces a default entry with the same key.
///
/// When no `fromStreamChunk` converter is available, no request is made and
/// the returned stream fails immediately with a [GenkitException].
ActionStream<S, O> stream<I>({
  required I input,
  Map<String, String>? headers,
}) {
  final fromStreamChunk = _fromStreamChunk;
  if (fromStreamChunk == null) {
    final error = GenkitException(
      'fromStreamChunk must be provided for streaming operations.',
    );
    // Capture a single stack trace so the stream's error event and the
    // recorded error carry the same trace.
    final stackTrace = StackTrace.current;
    final actionStream = ActionStream<S, O>(
      Stream<S>.error(error, stackTrace),
    );
    actionStream._setError(error, stackTrace);
    return actionStream;
  }

  StreamSubscription? subscription;
  final streamController = StreamController<S>();
  final actionStream = ActionStream<S, O>(streamController.stream);

  // Records a failure on the returned stream and, if the chunk stream is
  // still open, emits the failure there and closes it.
  void fail(Object error, StackTrace stackTrace) {
    actionStream._setError(error, stackTrace);
    if (!streamController.isClosed) {
      streamController.addError(error, stackTrace);
      streamController.close();
    }
  }

  streamFlow<O, S>(
    url: _url,
    fromResponse: _fromResponse,
    // Pass the null-checked local rather than re-reading the nullable field.
    fromStreamChunk: fromStreamChunk,
    headers: {
      ...?_defaultHeaders,
      ...?headers,
    },
    onChunk: (chunk) {
      // Chunks that arrive after the stream was closed are dropped.
      if (streamController.isClosed) return;
      streamController.add(chunk);
    },
    onSubscription: (sub) => subscription = sub,
    setCancelCallback: (cancelCallback) {
      // Cancelling the returned stream tears down the underlying request.
      streamController.onCancel = () {
        cancelCallback();
        subscription?.cancel();
      };
    },
    input: input,
    httpClient: _httpClient,
  ).then(
    (response) {
      try {
        actionStream._setResult(response as O);
      } on TypeError catch (error, stackTrace) {
        // `onError` below only sees failures of the flow itself, not
        // exceptions thrown in this callback. Without this handler a
        // response that cannot be cast to `O` would leave the stream open
        // forever and surface only as an unhandled asynchronous error.
        fail(error, stackTrace);
      }
      // No-op when `fail` has already closed the controller.
      if (!streamController.isClosed) {
        streamController.close();
      }
    },
    onError: fail,
  );
  return actionStream;
}