streamGenAIRequest function

Future<Stream<String?>> streamGenAIRequest(
  1. AIRequestModel? aiRequestModel
)

Implementation

Future<Stream<String?>> streamGenAIRequest(
  AIRequestModel? aiRequestModel,
) async {
  final httpRequestModel = aiRequestModel?.httpRequestModel;
  final streamController = StreamController<String?>();
  if (httpRequestModel == null) {
    debugPrint("streamGenAIRequest -> httpRequestModel is null");
  } else {
    final httpStream = await streamHttpRequest(
      nanoid(),
      APIType.rest,
      httpRequestModel,
    );

    final subscription = httpStream.listen(
      (dat) {
        if (dat == null) {
          streamController.addError('STREAMING ERROR: NULL DATA');
          return;
        }

        final chunk = dat.$2;
        final error = dat.$4;

        if (chunk == null) {
          streamController.addError(error ?? 'NULL ERROR');
          return;
        }

        final ans = chunk.body;

        final lines = ans.split('\n');
        for (final line in lines) {
          if (!line.startsWith('data: ') || line.contains('[DONE]')) continue;
          final jsonStr = line.substring(6).trim();
          try {
            final jsonData = jsonDecode(jsonStr);
            final formattedOutput = aiRequestModel?.getFormattedStreamOutput(
              jsonData,
            );
            streamController.sink.add(formattedOutput);
          } catch (e) {
            debugPrint(
              '⚠️ JSON decode error in SSE: $e\nSending as Regular Text',
            );
            streamController.sink.add(jsonStr);
          }
        }
      },
      onError: (error) {
        streamController.addError('STREAM ERROR: $error');
        streamController.close();
      },
      onDone: () {
        streamController.close();
      },
      cancelOnError: true,
    );
    streamController.onCancel = () async {
      await subscription.cancel();
    };
  }
  return streamController.stream;
}