rpc_dart 1.3.2
rpc_dart: ^1.3.2 copied to clipboard
gRPC-inspired library built on pure Dart
RPC Dart #
RPC библиотека на чистом Dart для типобезопасного взаимодействия между компонентами
Основные концепции #
RPC Dart построен на следующих ключевых концепциях:
- Контракты (Contracts) — определяют API сервиса через интерфейсы и методы
- Responder — серверная часть, обрабатывает входящие запросы
- Caller — клиентская часть, отправляет запросы
- Endpoint — точка подключения, управляет транспортом
- Transport — уровень передачи данных (InMemory, Isolate, HTTP)
- Codec — сериализация/десериализация сообщений
Ключевые возможности #
- Полная поддержка RPC паттернов — unary calls, server streams, client streams, bidirectional streams
- Встроенный InMemory транспорт — для разработки и тестирования
- Типобезопасность — все запросы/ответы строго типизированы
- Без внешних зависимостей — только чистый Dart
- Встроенные примитивы — готовые обертки для String, Int, Double, Bool, List
- Простое тестирование — с InMemory транспортом и моками
CORD #
RPC Dart предлагает CORD (Contract-Oriented Remote Domains) — архитектурный подход для структурирования бизнес-логики через изолированные домены с типобезопасными RPC контрактами.
📚 Подробнее
Quick Start #
Готовые примеры использования #
1. Определите контракт и модели #
// Request/Response объекты реализуют IRpcSerializable
class CalculationRequest implements IRpcSerializable {
final double a, b;
final String operation; // 'add', 'subtract', 'multiply', 'divide'
CalculationRequest({required this.a, required this.b, required this.operation});
@override
Map<String, dynamic> toJson() => {'a': a, 'b': b, 'operation': operation};
static CalculationRequest fromJson(Map<String, dynamic> json) => CalculationRequest(
a: json['a'] is int ? (json['a'] as int).toDouble() : json['a'],
b: json['b'] is int ? (json['b'] as int).toDouble() : json['b'],
operation: json['operation'],
);
static RpcCodec<CalculationRequest> get codec => RpcCodec(CalculationRequest.fromJson);
}
class CalculationResponse implements IRpcSerializable {
final double? result;
final bool success;
final String? errorMessage;
CalculationResponse({this.result, this.success = true, this.errorMessage});
@override
Map<String, dynamic> toJson() => {
'result': result, 'success': success, 'errorMessage': errorMessage,
};
static CalculationResponse fromJson(Map<String, dynamic> json) => CalculationResponse(
result: json['result'], success: json['success'] ?? true, errorMessage: json['errorMessage'],
);
static RpcCodec<CalculationResponse> get codec => RpcCodec(CalculationResponse.fromJson);
}
2. Создайте сервер (Responder) #
class CalculatorResponder extends RpcResponderContract {
CalculatorResponder() : super('CalculatorService');
@override
void setup() {
addUnaryMethod<CalculationRequest, CalculationResponse>(
methodName: 'calculate',
handler: calculate,
requestCodec: CalculationRequest.codec,
responseCodec: CalculationResponse.codec,
);
addBidirectionalMethod<CalculationRequest, CalculationResponse>(
methodName: 'streamCalculate',
handler: streamCalculate,
requestCodec: CalculationRequest.codec,
responseCodec: CalculationResponse.codec,
);
}
Future<CalculationResponse> calculate(CalculationRequest request) async {
try {
double result;
switch (request.operation) {
case 'add': result = request.a + request.b; break;
case 'subtract': result = request.a - request.b; break;
case 'multiply': result = request.a * request.b; break;
case 'divide':
if (request.b == 0) throw Exception('Division by zero');
result = request.a / request.b; break;
default: throw Exception('Unknown operation: ${request.operation}');
}
return CalculationResponse(result: result);
} catch (e) {
return CalculationResponse(success: false, errorMessage: e.toString());
}
}
Stream<CalculationResponse> streamCalculate(Stream<CalculationRequest> requests) async* {
await for (final request in requests) {
yield await calculate(request);
}
}
}
3. Создайте клиент (Caller) #
class CalculatorCaller extends RpcCallerContract {
CalculatorCaller(RpcCallerEndpoint endpoint) : super('CalculatorService', endpoint);
Future<CalculationResponse> calculate(CalculationRequest request) {
return endpoint.unaryRequest<CalculationRequest, CalculationResponse>(
serviceName: serviceName,
methodName: 'calculate',
requestCodec: CalculationRequest.codec,
responseCodec: CalculationResponse.codec,
request: request,
);
}
Stream<CalculationResponse> streamCalculate(Stream<CalculationRequest> requests) {
return endpoint.bidirectionalStream<CalculationRequest, CalculationResponse>(
serviceName: serviceName,
methodName: 'streamCalculate',
requestCodec: CalculationRequest.codec,
responseCodec: CalculationResponse.codec,
requests: requests,
);
}
// Удобный метод для сложения
Future<double> add(double a, double b) async {
final response = await calculate(CalculationRequest(a: a, b: b, operation: 'add'));
if (!response.success) throw Exception(response.errorMessage);
return response.result!;
}
}
4. Запустите сервер и клиент #
void main() async {
// Создаем InMemory транспорт
final (clientTransport, serverTransport) = RpcInMemoryTransport.pair();
// Настраиваем сервер
final serverEndpoint = RpcResponderEndpoint(transport: serverTransport);
serverEndpoint.registerServiceContract(CalculatorResponder());
serverEndpoint.start(); // Важно: явно запускаем эндпоинт!
// Настраиваем клиент
final clientEndpoint = RpcCallerEndpoint(transport: clientTransport);
final calculator = CalculatorCaller(clientEndpoint);
// Делаем RPC вызовы
final result = await calculator.add(10, 20);
print('10 + 20 = $result'); // 10 + 20 = 30.0
// Работаем со стримом вычислений
final requests = Stream.fromIterable([
CalculationRequest(a: 5, b: 3, operation: 'add'),
CalculationRequest(a: 10, b: 2, operation: 'multiply'),
CalculationRequest(a: 15, b: 3, operation: 'divide'),
]);
await for (final response in calculator.streamCalculate(requests)) {
if (response.success) {
print('Result: ${response.result}');
} else {
print('Error: ${response.errorMessage}');
}
}
// Закрываем ресурсы
await serverEndpoint.close();
await clientEndpoint.close();
}
Транспорты #
InMemory Transport (включен в основную библиотеку) #
Идеально для разработки, тестирования и монолитных приложений:
final (clientTransport, serverTransport) = RpcInMemoryTransport.pair();
// Использование: разработка, unit-тесты, простые приложения
Дополнительные транспорты #
RPC Dart поддерживает создание кастомных транспортов через интерфейс RpcTransport
:
// Пример кастомного транспорта
class CustomHttpTransport implements RpcTransport {
@override
Future<void> send(RpcMessage message) async {
// Реализация отправки через HTTP
}
@override
Stream<RpcMessage> get messageStream => _messageController.stream;
}
final endpoint = RpcCallerEndpoint(transport: CustomHttpTransport());
Возможные варианты расширения:
- Isolate Transport — для CPU-интенсивных задач и изоляции сбоев
- HTTP Transport — для микросервисов и распределенных систем
- WebSocket Transport — для real-time приложений
Ключевое преимущество: код домена остается неизменным при смене транспорта!
Типы RPC взаимодействий #
Тип | Описание | Пример использования |
---|---|---|
Unary Call | Запрос → Ответ | CRUD операции, валидация |
Server Stream | Запрос → Поток ответов | Live обновления, прогресс |
Client Stream | Поток запросов → Ответ | Batch upload, агрегация |
Bidirectional Stream | Поток ↔ Поток | Чаты, real-time коллаборация |
Встроенные примитивы #
RPC Dart предоставляет готовые обертки для примитивных типов:
// Встроенные примитивы с кодеками
final name = RpcString('John'); // Строки
final age = RpcInt(25); // Целые числа
final height = RpcDouble(175.5); // Дробные числа
final isActive = RpcBool(true); // Булевы значения
final tags = RpcList<RpcString>([...]); // Списки
// Удобные расширения
final message = 'Hello'.rpc; // RpcString
final count = 42.rpc; // RpcInt
final price = 19.99.rpc; // RpcDouble
final enabled = true.rpc; // RpcBool
// Числовые примитивы поддерживают арифметические операторы
final sum = RpcInt(10) + RpcInt(20); // RpcInt(30)
final product = RpcDouble(3.14) * RpcDouble(2.0); // RpcDouble(6.28)
// Доступ к значению через свойство .value
final greeting = RpcString('Hello ') + RpcString('World');
print(greeting.value); // "Hello World"
StreamDistributor #
StreamDistributor — это мощный менеджер для управления серверными стримами, который превращает обычный StreamController
в брокер сообщений с расширенными возможностями:
Основные возможности #
- Широковещательная публикация — отправка сообщений всем подключенным клиентам
- Фильтрованная публикация — отправка по условию только определенным клиентам
- Управление жизненным циклом — автоматическое создание/удаление стримов
- Автоматическая очистка — удаление неактивных стримов по таймеру
- Метрики и мониторинг — отслеживание активности и производительности
Пример использования #
// Создаем дистрибьютор для уведомлений
final distributor = StreamDistributor<NotificationEvent>(
config: StreamDistributorConfig(
enableAutoCleanup: true,
inactivityThreshold: Duration(minutes: 5),
),
);
// Создаем клиентские стримы для разных пользователей
final userStream1 = distributor.createClientStreamWithId('user_123');
final userStream2 = distributor.createClientStreamWithId('user_456');
// Слушаем уведомления
userStream1.listen((notification) {
print('User 123 получил: ${notification.message}');
});
// Отправляем всем клиентам
distributor.publish(NotificationEvent(
message: 'Системное уведомление для всех',
priority: Priority.normal,
));
// Отправляем только клиентам с определенными ID
distributor.publishFiltered(
NotificationEvent(message: 'VIP уведомление'),
(client) => ['user_123', 'premium_user_789'].contains(client.clientId),
);
// Получаем метрики
final metrics = distributor.metrics;
print('Активных клиентов: ${metrics.currentStreams}');
print('Отправлено сообщений: ${metrics.totalMessages}');
Применение: Идеально для реализации real-time уведомлений, чатов, live обновлений и других pub/sub сценариев в серверных стримах.
Тестирование #
// Unit тест с моком (используйте любую мок-библиотеку)
class MockUserService extends Mock implements UserCaller {}
test('should handle user not found', () async {
final mockUserService = MockUserService();
when(() => mockUserService.getUser(any()))
.thenThrow(RpcException(code: RpcStatus.NOT_FOUND));
final bloc = UserBloc(mockUserService);
expect(
() => bloc.add(LoadUserEvent(id: '123')),
emitsError(isA<UserNotFoundException>()),
);
});
// Integration тест с InMemory транспортом
test('full integration test', () async {
final (clientTransport, serverTransport) = RpcInMemoryTransport.pair();
final server = RpcResponderEndpoint(transport: serverTransport);
server.registerServiceContract(UserResponder(MockUserRepository()));
server.start();
final client = UserCaller(RpcCallerEndpoint(transport: clientTransport));
final response = await client.getUser(GetUserRequest(id: '123'));
expect(response.user.id, equals('123'));
});
FAQ #
Подходит ли для production?
Рекомендуем тщательно протестировать библиотеку в вашей конкретной среде перед production деплоем.
Как тестировать RPC код?
// Unit тесты с моками
class MockUserService extends Mock implements UserCaller {}
test('должен обработать ошибку "пользователь не найден"', () async {
final mockService = MockUserService();
when(() => mockService.getUser(any()))
.thenThrow(RpcException(code: RpcStatus.NOT_FOUND));
final bloc = UserBloc(mockService);
expect(() => bloc.loadUser('123'), throwsA(isA<UserNotFoundException>()));
});
// Integration тесты с InMemory транспортом
test('полный интеграционный тест', () async {
final (client, server) = RpcInMemoryTransport.pair();
final endpoint = RpcResponderEndpoint(transport: server);
endpoint.registerServiceContract(TestService());
endpoint.start();
final caller = TestCaller(RpcCallerEndpoint(transport: client));
final result = await caller.getData();
expect(result.value, equals('expected'));
});
Какую производительность ожидать?
Производительность зависит от многих факторов: среды выполнения, размера данных, типа транспорта. Для получения точных цифр запустите бенчмарки в вашей среде:
dart run benchmark/main.dart
Общие наблюдения:
- InMemory транспорт имеет минимальные накладные расходы
- CBOR сериализация обычно быстрее JSON
- HTTP транспорт добавляет сетевую задержку
- Для больших данных рассмотрите streaming или chunking
Как обрабатывать ошибки?
RPC Dart использует gRPC-статусы для унифицированной обработки ошибок:
try {
final result = await userService.getUser(request);
} on RpcException catch (e) {
switch (e.code) {
case RpcStatus.NOT_FOUND:
showError('Пользователь не найден');
case RpcStatus.PERMISSION_DENIED:
redirectToLogin();
case RpcStatus.DEADLINE_EXCEEDED:
showError('Таймаут запроса');
default:
showError('Неизвестная ошибка: ${e.message}');
}
} catch (e) {
// Обработка неожиданных ошибок
logError('Unexpected error', error: e);
}
Как масштабировать RPC архитектуру?
CORD принципы масштабирования:
- Разделяйте домены — каждый домен должен иметь четкую ответственность
- Используйте контракты — для типобезопасного взаимодействия
- Минимизируйте связи — домены общаются только через RPC
- Централизуйте логику — бизнес-логика в Responder'ах
- Кэшируйте результаты — в Caller'ах для UI оптимизации
// ❌ Плохо - прямые зависимости
class OrderBloc {
final UserRepository userRepo;
final PaymentRepository paymentRepo;
final NotificationRepository notificationRepo;
}
// ✅ Хорошо - через RPC контракты
class OrderBloc {
final UserCaller userService;
final PaymentCaller paymentService;
final NotificationCaller notificationService;
}
Полезные ссылки:
- CORD Architecture
- RPC Dart на pub.flutter-io.cn
- Исходный код на GitHub
- Примеры кода
- Issues и поддержка
Создавайте масштабируемые Flutter приложения с RPC Dart!