generateCompletionStream method

Future<CactusStreamedCompletionResult> generateCompletionStream({
  required List<ChatMessage> messages,
  CactusCompletionParams? params,
})

Implementation

/// Generates a chat completion for [messages] and returns it as a stream.
///
/// When [params] is null, `defaultCompletionParams` is used. The model is
/// taken from the effective params, falling back to the last initialized
/// model and then to `defaultInitParams.model`.
///
/// Local inference via `CactusContext.completionStream` is tried first. If
/// tools are supplied and `enableToolFiltering` is on, the tool list is
/// narrowed with `_filterTools` before the request is made.
///
/// When no local handle is available, or the local call throws
/// synchronously, the request falls back to cloud streaming through
/// `OpenRouterService`. This only happens when the completion mode is
/// [CompletionMode.hybrid] and a `cactusToken` is set.
///
/// Completion telemetry is logged once the returned result future settles.
///
/// Rethrows the local error in [CompletionMode.local], or in
/// [CompletionMode.hybrid] without a token. Throws an [Exception] when the
/// cloud request fails, or when there is no local handle and no cloud
/// fallback applies.
Future<CactusStreamedCompletionResult> generateCompletionStream({
  required List<ChatMessage> messages,
  CactusCompletionParams? params,
}) async {
  CactusCompletionParams completionParams = params ?? defaultCompletionParams;
  // Read the model from the effective params, so a model configured on the
  // default params is honored just like every other default setting.
  final model = completionParams.model ??
      _lastInitializedModel ??
      defaultInitParams.model;
  final currentHandle = await _getValidatedHandle(
    model: model,
    reInit: completionParams.tools?.isNotEmpty == true,
  );

  if (currentHandle != null) {
    // Quantization only matters for local inference, so look it up here
    // rather than on every call (including cloud-only ones).
    final quantization = (await Supabase.getModel(model))?.quantization ?? 8;

    final tools = completionParams.tools;
    if (enableToolFiltering && tools != null && tools.isNotEmpty) {
      // Rebuild the params with the filtered tools, carrying over every other
      // field. model and cactusToken must survive: the hybrid cloud fallback
      // below reads cactusToken from these params.
      completionParams = CactusCompletionParams(
        model: completionParams.model,
        temperature: completionParams.temperature,
        topK: completionParams.topK,
        topP: completionParams.topP,
        maxTokens: completionParams.maxTokens,
        stopSequences: completionParams.stopSequences,
        tools: await _filterTools(messages, tools),
        completionMode: completionParams.completionMode,
        cactusToken: completionParams.cactusToken,
      );
    }

    try {
      final streamedResult = CactusContext.completionStream(
          currentHandle, messages, completionParams, quantization);
      streamedResult.result.then((result) {
        _logCompletionTelemetry(result, model,
            success: result.success,
            message: result.success ? null : result.response);
      }).catchError((error) {
        _logCompletionTelemetry(null, model,
            success: false, message: error.toString());
      });

      return streamedResult;
    } catch (e) {
      // Only synchronous failures reach here; async stream errors are
      // reported through the telemetry chain above.
      debugPrint('Local streaming completion failed: $e');
      if (completionParams.completionMode == CompletionMode.local ||
          (completionParams.completionMode == CompletionMode.hybrid &&
              completionParams.cactusToken == null)) {
        _logCompletionTelemetry(null, model,
            success: false, message: e.toString());
        rethrow;
      }
      debugPrint('Falling back to cloud streaming completion');
    }
  }

  // Copy to a local so null-checking promotes it (no `!` needed).
  final cactusToken = completionParams.cactusToken;
  if (completionParams.completionMode == CompletionMode.hybrid &&
      cactusToken != null) {
    OpenRouterService? openRouterService;
    try {
      final service = OpenRouterService(apiKey: cactusToken);
      openRouterService = service;
      final streamedResult = await service.generateCompletionStream(
        messages: messages,
        params: completionParams,
      );
      // One chain: catchError consumes any failure before whenComplete runs.
      // A separate whenComplete would re-emit the error on an unobserved
      // future, producing an unhandled async error.
      streamedResult.result.then((result) {
        _logCompletionTelemetry(result, model,
            success: result.success,
            message: result.success ? null : result.response);
      }).catchError((error) {
        _logCompletionTelemetry(null, model,
            success: false,
            message: 'Cloud streaming completion failed: $error');
      }).whenComplete(() => service.dispose());
      return streamedResult;
    } catch (e) {
      // The request failed before a stream was handed back, so nothing else
      // will dispose the client.
      openRouterService?.dispose();
      _logCompletionTelemetry(null, model,
          success: false, message: 'Cloud streaming completion failed: $e');
      throw Exception('Cloud streaming completion failed: $e');
    }
  }

  throw Exception(
      'Model $model is not downloaded. Please download it before generating completions.');
}