fetchStream method
Stream<String>
fetchStream({
- List<
Message> ? previousMessages, - int retries = 2,
- FutureOr<
void> onError(- CompletionException error,
- int retriesRemaning
- void onJsonComplete()?,
override
Implementation
@override
Stream<String> fetchStream({
List<Message>? previousMessages,
int retries = 2,
FutureOr<void> Function(
CompletionException error,
int retriesRemaning,
)? onError,
void Function(Map<String, dynamic> chunk)? onJsonComplete,
}) async* {
// Creating run
Response response = await dio.post(
url,
data: {
'assistant_id': assistantId,
'stream': true,
if (previousMessages != null)
'additional_messages': previousMessages.map((x) {
return x.toGptMap;
}).toList(),
},
options: Options(
responseType: ResponseType.stream,
),
);
var stream = getJsonStreamFromResponse(response, ignorePrefixes: {
'event',
});
// Reading stream
await for (var part in stream) {
if (onJsonComplete != null) onJsonComplete(part);
String object = part['object'];
if (object == 'chat.completion.chunk') {
var content = readMessageChunk(part);
if (content != null) {
yield content;
}
} else if (object == 'thread.message.delta') {
// Handling new messages
Map<String, dynamic> delta = part['delta'];
List content = delta['content'];
Map<String, dynamic> item = content.first;
String type = item['type'];
if (type == 'text') {
Map<String, dynamic> text = item['text'];
String value = text['value'];
yield value;
} else {
VitGptDartConfiguration.logger.w('Unable to process type: $type');
}
} else if (object == 'thread.run.step') {
// Handling errors
Map<String, dynamic>? lastError = part['last_error'];
if (lastError == null) {
continue;
}
String? errorCode = part['code'];
String? errorMessage = part['message'];
var exception = CompletionException(errorCode, errorMessage);
if (onError != null) await onError(exception, retries);
if (retries <= 0) throw exception;
await Future.delayed(Duration(seconds: 1));
var retryStream = fetchStream(
retries: retries - 1,
);
yield* retryStream;
}
}
}