streamGenAIRequest function
Implementation
/// Streams the formatted output of a generative-AI request.
///
/// Sends [aiRequestModel]'s `httpRequestModel` through `streamHttpRequest`
/// and parses each received chunk body as server-sent-event text. Every
/// `data:` line is JSON-decoded and passed to
/// `getFormattedStreamOutput`; the result is emitted on the returned stream.
/// A payload that cannot be decoded or formatted is emitted verbatim as
/// plain text.
///
/// The returned stream:
/// * completes immediately, without events, when [aiRequestModel] or its
///   `httpRequestModel` is null;
/// * emits an error event when a null item or a null response chunk is
///   received, and keeps listening;
/// * emits an error event and then completes when the underlying HTTP
///   stream itself reports an error;
/// * completes when the underlying HTTP stream is done.
///
/// Cancelling the returned stream cancels the underlying HTTP subscription.
Future<Stream<String?>> streamGenAIRequest(
  AIRequestModel? aiRequestModel,
) async {
  final httpRequestModel = aiRequestModel?.httpRequestModel;
  final streamController = StreamController<String?>();
  if (httpRequestModel == null) {
    debugPrint("streamGenAIRequest -> httpRequestModel is null");
    // Close the controller so consumers see a completed (empty) stream
    // instead of waiting forever on a stream that will never emit or end.
    // Not awaited: the close future only completes once a listener has
    // received the done event.
    streamController.close();
    return streamController.stream;
  }

  final httpStream = await streamHttpRequest(
    nanoid(),
    APIType.rest,
    httpRequestModel,
  );
  final subscription = httpStream.listen(
    (dat) {
      if (dat == null) {
        streamController.addError('STREAMING ERROR: NULL DATA');
        return;
      }
      // NOTE(review): `$2` is used as the response chunk and `$4` as the
      // error value of the record emitted by `streamHttpRequest`; confirm
      // against that function's declaration.
      final chunk = dat.$2;
      final error = dat.$4;
      if (chunk == null) {
        streamController.addError(error ?? 'NULL ERROR');
        return;
      }
      // LineSplitter handles "\n", "\r\n" and "\r" terminators, so no stray
      // carriage return is left at the end of a payload.
      for (final line in LineSplitter.split(chunk.body)) {
        // Only `data:` fields carry payload; the space after the colon is
        // optional in the SSE wire format.
        if (!line.startsWith('data:')) continue;
        final jsonStr = line.substring(5).trim();
        // Skip empty payloads and the end-of-stream sentinel. The sentinel
        // must match the whole payload so that real content which merely
        // mentions "[DONE]" is not dropped.
        if (jsonStr.isEmpty || jsonStr == '[DONE]') continue;
        try {
          final jsonData = jsonDecode(jsonStr);
          final formattedOutput = aiRequestModel?.getFormattedStreamOutput(
            jsonData,
          );
          streamController.sink.add(formattedOutput);
        } catch (e) {
          // Deliberate best-effort: anything that fails to decode or format
          // is forwarded as raw text rather than failing the stream.
          debugPrint(
            '⚠️ JSON decode error in SSE: $e\nSending as Regular Text',
          );
          streamController.sink.add(jsonStr);
        }
      }
    },
    onError: (error) {
      streamController.addError('STREAM ERROR: $error');
      streamController.close();
    },
    onDone: () {
      streamController.close();
    },
    cancelOnError: true,
  );
  // Propagate consumer cancellation to the HTTP subscription so the request
  // stream is not left running after the caller stops listening.
  streamController.onCancel = () async {
    await subscription.cancel();
  };
  return streamController.stream;
}