rpc_dart 1.5.0 copy "rpc_dart: ^1.5.0" to clipboard
rpc_dart: ^1.5.0 copied to clipboard

gRPC-inspired library built on pure Dart

example/example.dart

// SPDX-FileCopyrightText: 2025 Karim "nogipx" Mamatkazin <nogipx@gmail.com>
//
// SPDX-License-Identifier: LGPL-3.0-or-later

import 'dart:async';

import 'package:rpc_dart/rpc_dart.dart';

void main() async {
  RpcLogger.setDefaultMinLogLevel(RpcLoggerLevel.internal);

  // 🔧 Настройка уровня логирования библиотеки
  // По умолчанию INFO - показывает только важные события
  // Для отладки можно установить INTERNAL - покажет все внутренние операции
  print('🔧 Уровень логирования: INFO (скрыты внутренние детали библиотеки)');

  print(
      '\n🚀 === RPC DART - Комплексная демонстрация автоматической трассировки ===\n');

  // Создаем InMemory транспорт
  final (clientTransport, serverTransport) = RpcInMemoryTransport.pair();

  // Настраиваем сервер
  final serverEndpoint = RpcResponderEndpoint(
    transport: serverTransport,
    debugLabel: 'Calculator Server',
  );
  serverEndpoint.registerServiceContract(CalculatorResponder());
  serverEndpoint.start();

  // Настраиваем клиент
  final clientEndpoint = RpcCallerEndpoint(
    transport: clientTransport,
    debugLabel: 'Calculator Client',
  );
  final calculator = CalculatorCaller(clientEndpoint);

  try {
    await Future.delayed(Duration(milliseconds: 100));

    print('📋 === 1. UNARY CALLS (один запрос → один ответ) ===\n');

    // 1. UNARY без контекста - автоматическая генерация trace ID
    print('🔹 Без контекста (автогенерация trace ID):');
    final result1 = await calculator.add(10, 20);
    print('   10 + 20 = $result1\n');

    // 2. UNARY с пользовательским контекстом
    print('🔹 С пользовательским trace ID:');
    final customContext =
        RpcContextUtils.withTracing(traceId: 'user_trace_123');
    final result2 = await calculator.multiply(5, 7, context: customContext);
    print('   5 × 7 = $result2\n');

    print('📊 === 2. SERVER STREAMING (один запрос → поток ответов) ===\n');

    print('🔹 Сложное вычисление с прогрессом:');
    await for (final progress in calculator.calculateWithProgress(
        CalculationRequest(a: 100, b: 25, operation: 'divide'))) {
      print('   📈 $progress');
    }
    print('');

    print('📥 === 3. CLIENT STREAMING (поток запросов → один ответ) ===\n');

    print('🔹 Батчевая обработка нескольких вычислений:');
    final batchRequests = Stream.fromIterable([
      CalculationRequest(a: 10, b: 5, operation: 'add'),
      CalculationRequest(a: 20, b: 4, operation: 'multiply'),
      CalculationRequest(a: 100, b: 10, operation: 'divide'),
      CalculationRequest(a: 50, b: 15, operation: 'subtract'),
      CalculationRequest(a: 7, b: 3, operation: 'add'),
    ]);

    final batchResult = await calculator.processBatch(batchRequests);
    print('   📊 $batchResult\n');

    print('🔄 === 4. BIDIRECTIONAL STREAMING (поток ↔ поток) ===\n');

    print('🔹 Интерактивные вычисления в реальном времени:');

    // Создаем поток запросов для bidirectional streaming
    final controller = StreamController<CalculationRequest>();
    final liveResults = calculator.liveCalculate(controller.stream);

    // Подписываемся на результаты
    final subscription = liveResults.listen((response) {
      if (response.success) {
        print('   ✅ Результат: ${response.result}');
      } else {
        print('   ❌ Ошибка: ${response.errorMessage}');
      }
    });

    // Отправляем запросы в реальном времени
    await Future.delayed(Duration(milliseconds: 100));
    controller.add(CalculationRequest(a: 15, b: 3, operation: 'multiply'));
    await Future.delayed(Duration(milliseconds: 200));

    controller.add(CalculationRequest(a: 100, b: 7, operation: 'divide'));
    await Future.delayed(Duration(milliseconds: 200));

    controller.add(CalculationRequest(a: 25, b: 15, operation: 'add'));
    await Future.delayed(Duration(milliseconds: 200));

    // Попробуем ошибочную операцию
    controller.add(CalculationRequest(a: 10, b: 0, operation: 'divide'));
    await Future.delayed(Duration(milliseconds: 200));

    // Закрываем поток запросов
    await controller.close();
    await subscription.asFuture();

    print('\n🎯 === ДЕМОНСТРАЦИЯ АВТОМАТИЧЕСКОЙ ТРАССИРОВКИ ===\n');

    print('✨ Trace ID автоматически:');
    print('   • Генерируется в CallerEndpoint если не указан');
    print('   • Генерируется в ResponderEndpoint если клиент не передал');
    print('   • Передается во все дочерние компоненты (Responders, Callers)');
    print('   • Появляется в логах всех операций автоматически');
    print('   • Работает через RpcLoggerFactory с контекстом');

    print('\n🔥 === ВСЕ ТИПЫ RPC ПРОДЕМОНСТРИРОВАНЫ ===');
    print('   ✅ Unary Calls - простые запрос/ответ операции');
    print('   ✅ Server Streaming - прогресс и live обновления');
    print('   ✅ Client Streaming - батчевая обработка данных');
    print('   ✅ Bidirectional Streaming - интерактивное взаимодействие');
    print('   ✅ Автоматическая трассировка во всех сценариях\n');
  } catch (e, stackTrace) {
    print('❌ Ошибка: $e');
    print('Stack trace: $stackTrace');
  } finally {
    // Закрываем ресурсы
    await serverEndpoint.close();
    await clientEndpoint.close();
    print('🔚 Демонстрация завершена!');
  }
}

/// Простая модель сообщения для демонстрации
class CalculationRequest implements IRpcSerializable {
  final double a, b;
  final String operation; // 'add', 'subtract', 'multiply', 'divide'

  CalculationRequest(
      {required this.a, required this.b, required this.operation});

  @override
  Map<String, dynamic> toJson() => {'a': a, 'b': b, 'operation': operation};

  static CalculationRequest fromJson(Map<String, dynamic> json) =>
      CalculationRequest(
        a: json['a'] is int ? (json['a'] as int).toDouble() : json['a'],
        b: json['b'] is int ? (json['b'] as int).toDouble() : json['b'],
        operation: json['operation'],
      );

  static RpcCodec<CalculationRequest> get codec =>
      RpcCodec(CalculationRequest.fromJson);

  @override
  String toString() =>
      'CalculationRequest(a: $a, b: $b, operation: $operation)';
}

class CalculationResponse implements IRpcSerializable {
  final double? result;
  final bool success;
  final String? errorMessage;

  CalculationResponse({this.result, this.success = true, this.errorMessage});

  @override
  Map<String, dynamic> toJson() => {
        'result': result,
        'success': success,
        'errorMessage': errorMessage,
      };

  static CalculationResponse fromJson(Map<String, dynamic> json) =>
      CalculationResponse(
        result: json['result'],
        success: json['success'] ?? true,
        errorMessage: json['errorMessage'],
      );

  static RpcCodec<CalculationResponse> get codec =>
      RpcCodec(CalculationResponse.fromJson);

  @override
  String toString() => success
      ? 'CalculationResponse(result: $result)'
      : 'CalculationResponse(error: $errorMessage)';
}

/// Модель для прогресса вычислений (server streaming)
class CalculationProgress implements IRpcSerializable {
  final String step;
  final double progress; // 0.0 to 1.0
  final String? currentOperation;

  CalculationProgress(
      {required this.step, required this.progress, this.currentOperation});

  @override
  Map<String, dynamic> toJson() => {
        'step': step,
        'progress': progress,
        'currentOperation': currentOperation,
      };

  static CalculationProgress fromJson(Map<String, dynamic> json) =>
      CalculationProgress(
        step: json['step'],
        progress: json['progress'],
        currentOperation: json['currentOperation'],
      );

  static RpcCodec<CalculationProgress> get codec =>
      RpcCodec(CalculationProgress.fromJson);

  @override
  String toString() =>
      'CalculationProgress(step: $step, progress: ${(progress * 100).toInt()}%)';
}

/// Модель для статистики батча (client streaming)
class BatchStatistics implements IRpcSerializable {
  final int totalRequests;
  final int successfulRequests;
  final int failedRequests;
  final double averageResult;
  final List<String> operations;

  BatchStatistics({
    required this.totalRequests,
    required this.successfulRequests,
    required this.failedRequests,
    required this.averageResult,
    required this.operations,
  });

  @override
  Map<String, dynamic> toJson() => {
        'totalRequests': totalRequests,
        'successfulRequests': successfulRequests,
        'failedRequests': failedRequests,
        'averageResult': averageResult,
        'operations': operations,
      };

  static BatchStatistics fromJson(Map<String, dynamic> json) => BatchStatistics(
        totalRequests: json['totalRequests'],
        successfulRequests: json['successfulRequests'],
        failedRequests: json['failedRequests'],
        averageResult: json['averageResult'],
        operations: List<String>.from(json['operations']),
      );

  static RpcCodec<BatchStatistics> get codec =>
      RpcCodec(BatchStatistics.fromJson);

  @override
  String toString() =>
      'BatchStatistics(total: $totalRequests, success: $successfulRequests, failed: $failedRequests, avg: $averageResult)';
}

/// Респондер с демонстрацией всех типов RPC методов
final class CalculatorResponder extends RpcResponderContract {
  CalculatorResponder() : super('CalculatorService');

  @override
  void setup() {
    // 1. UNARY CALL - один запрос → один ответ
    addUnaryMethod<CalculationRequest, CalculationResponse>(
      methodName: 'calculate',
      handler: calculate,
      requestCodec: CalculationRequest.codec,
      responseCodec: CalculationResponse.codec,
    );

    // 2. SERVER STREAMING - один запрос → поток ответов
    addServerStreamMethod<CalculationRequest, CalculationProgress>(
      methodName: 'calculateWithProgress',
      handler: calculateWithProgress,
      requestCodec: CalculationRequest.codec,
      responseCodec: CalculationProgress.codec,
    );

    // 3. CLIENT STREAMING - поток запросов → один ответ
    addClientStreamMethod<CalculationRequest, BatchStatistics>(
      methodName: 'processBatch',
      handler: processBatch,
      requestCodec: CalculationRequest.codec,
      responseCodec: BatchStatistics.codec,
    );

    // 4. BIDIRECTIONAL STREAMING - поток запросов ↔ поток ответов
    addBidirectionalMethod<CalculationRequest, CalculationResponse>(
      methodName: 'liveCalculate',
      handler: liveCalculate,
      requestCodec: CalculationRequest.codec,
      responseCodec: CalculationResponse.codec,
    );
  }

  /// UNARY: Простое вычисление
  Future<CalculationResponse> calculate(CalculationRequest request,
      {RpcContext? context}) async {
    try {
      final result = _performOperation(request.a, request.b, request.operation);
      return CalculationResponse(result: result);
    } catch (e) {
      return CalculationResponse(success: false, errorMessage: e.toString());
    }
  }

  /// SERVER STREAMING: Вычисление с отправкой прогресса
  Stream<CalculationProgress> calculateWithProgress(CalculationRequest request,
      {RpcContext? context}) async* {
    yield CalculationProgress(
        step: 'Initializing',
        progress: 0.0,
        currentOperation: request.operation);

    await Future.delayed(Duration(milliseconds: 200));
    yield CalculationProgress(
        step: 'Validating input',
        progress: 0.25,
        currentOperation: request.operation);

    await Future.delayed(Duration(milliseconds: 200));
    yield CalculationProgress(
        step: 'Performing calculation',
        progress: 0.50,
        currentOperation: request.operation);

    await Future.delayed(Duration(milliseconds: 200));
    final result = _performOperation(request.a, request.b, request.operation);
    yield CalculationProgress(
        step: 'Calculation complete',
        progress: 0.75,
        currentOperation: request.operation);

    await Future.delayed(Duration(milliseconds: 200));
    yield CalculationProgress(
        step: 'Finalizing result: $result',
        progress: 1.0,
        currentOperation: request.operation);
  }

  /// CLIENT STREAMING: Обработка батча запросов
  Future<BatchStatistics> processBatch(Stream<CalculationRequest> requests,
      {RpcContext? context}) async {
    int total = 0;
    int successful = 0;
    int failed = 0;
    double sum = 0.0;
    final operations = <String>{};

    await for (final request in requests) {
      total++;
      operations.add(request.operation);

      try {
        final result =
            _performOperation(request.a, request.b, request.operation);
        sum += result;
        successful++;
      } catch (e) {
        failed++;
      }
    }

    final averageResult = successful > 0 ? sum / successful : 0.0;

    return BatchStatistics(
      totalRequests: total,
      successfulRequests: successful,
      failedRequests: failed,
      averageResult: averageResult,
      operations: operations.toList(),
    );
  }

  /// BIDIRECTIONAL STREAMING: Интерактивные вычисления в реальном времени
  Stream<CalculationResponse> liveCalculate(Stream<CalculationRequest> requests,
      {RpcContext? context}) async* {
    await for (final request in requests) {
      try {
        final result =
            _performOperation(request.a, request.b, request.operation);
        yield CalculationResponse(result: result);
      } catch (e) {
        yield CalculationResponse(success: false, errorMessage: e.toString());
      }
    }
  }

  /// Вспомогательный метод для выполнения операций
  double _performOperation(double a, double b, String operation) {
    return switch (operation) {
      'add' => a + b,
      'subtract' => a - b,
      'multiply' => a * b,
      'divide' => b != 0 ? a / b : throw Exception('Division by zero'),
      _ => throw Exception('Unknown operation: $operation'),
    };
  }
}

/// Клиент с демонстрацией всех типов RPC вызовов
final class CalculatorCaller extends RpcCallerContract {
  CalculatorCaller(RpcCallerEndpoint endpoint)
      : super('CalculatorService', endpoint);

  /// UNARY: Простое вычисление
  Future<CalculationResponse> calculate(CalculationRequest request,
      {RpcContext? context}) {
    return callUnary<CalculationRequest, CalculationResponse>(
      methodName: 'calculate',
      requestCodec: CalculationRequest.codec,
      responseCodec: CalculationResponse.codec,
      request: request,
      context: context,
    );
  }

  /// SERVER STREAMING: Вычисление с прогрессом
  Stream<CalculationProgress> calculateWithProgress(CalculationRequest request,
      {RpcContext? context}) {
    return callServerStream<CalculationRequest, CalculationProgress>(
      methodName: 'calculateWithProgress',
      requestCodec: CalculationRequest.codec,
      responseCodec: CalculationProgress.codec,
      request: request,
      context: context,
    );
  }

  /// CLIENT STREAMING: Батчевая обработка
  Future<BatchStatistics> processBatch(Stream<CalculationRequest> requests,
      {RpcContext? context}) {
    return callClientStream<CalculationRequest, BatchStatistics>(
      methodName: 'processBatch',
      requestCodec: CalculationRequest.codec,
      responseCodec: BatchStatistics.codec,
      requests: requests,
      context: context,
    );
  }

  /// BIDIRECTIONAL STREAMING: Интерактивные вычисления
  Stream<CalculationResponse> liveCalculate(Stream<CalculationRequest> requests,
      {RpcContext? context}) {
    return callBidirectionalStream<CalculationRequest, CalculationResponse>(
      methodName: 'liveCalculate',
      requestCodec: CalculationRequest.codec,
      responseCodec: CalculationResponse.codec,
      requests: requests,
      context: context,
    );
  }

  /// Удобные методы для конкретных операций
  Future<double> add(double a, double b, {RpcContext? context}) async {
    final response = await calculate(
        CalculationRequest(a: a, b: b, operation: 'add'),
        context: context);
    if (!response.success) throw Exception(response.errorMessage);
    return response.result!;
  }

  Future<double> multiply(double a, double b, {RpcContext? context}) async {
    final response = await calculate(
        CalculationRequest(a: a, b: b, operation: 'multiply'),
        context: context);
    if (!response.success) throw Exception(response.errorMessage);
    return response.result!;
  }
}
5
likes
0
points
410
downloads

Publisher

verified publisherdart.nogipx.dev

Weekly Downloads

gRPC-inspired library built on pure Dart

Homepage
Repository (GitHub)
View/report issues

Topics

#rpc #grpc #cord #contract-oriented-remote-domains #streams

Funding

Consider supporting this project:

liberapay.com

License

unknown (license)

More

Packages that depend on rpc_dart