worker_pool 0.0.5
Worker Pool #
A Dart/Flutter package for managing a pool of isolates to execute tasks concurrently, with support for resource management, constant sharing, and proper cleanup.
Features #
- Isolate Pool Management: Efficiently manages a pool of Dart isolates for concurrent task execution
- Resource Management: Initialize and clean up resources in isolates with custom finalizers
- Constant Sharing: Share constants between the main isolate and worker isolates
- Task Queue: Automatically queues tasks when all isolates are busy
- Timeout Handling: Configurable timeouts for task execution
- Health Monitoring: Automatic worker restart based on task count or inactivity
- Function Registration: Register functions that can be executed in isolates
- Statistics: Get pool statistics for monitoring performance
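To make the task queue feature concrete, here is a minimal, self-contained sketch of the general idea: run at most a fixed number of tasks at once and hold the rest in FIFO order. `BoundedRunner` is a hypothetical class written for illustration only. It is not part of worker_pool, and it runs tasks on the current isolate rather than in worker isolates.

```dart
import 'dart:async';
import 'dart:collection';

/// Hypothetical illustration (not the worker_pool implementation):
/// runs at most [maxConcurrent] tasks at once and queues the rest.
class BoundedRunner {
  BoundedRunner(this.maxConcurrent);

  final int maxConcurrent;
  final Queue<Future<void> Function()> _waiting = Queue();
  int _running = 0;

  /// Number of tasks waiting for a free slot.
  int get queuedTasks => _waiting.length;

  /// Starts [task] immediately if a slot is free, otherwise queues it.
  Future<T> submit<T>(Future<T> Function() task) {
    final completer = Completer<T>();

    Future<void> start() async {
      _running++;
      try {
        completer.complete(await task());
      } catch (error, stackTrace) {
        // Propagate the task's error to the caller of submit().
        completer.completeError(error, stackTrace);
      } finally {
        _running--;
        // A slot is free again: start the oldest queued task, if any.
        if (_waiting.isNotEmpty) _waiting.removeFirst()();
      }
    }

    if (_running < maxConcurrent) {
      start();
    } else {
      _waiting.add(start);
    }
    return completer.future;
  }
}
```

With `BoundedRunner(1)`, submitting three tasks back to back starts the first one and leaves two queued until earlier tasks complete.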
Getting Started #
Prerequisites #
- Dart SDK ^3.8.1
- Flutter >=1.17.0 (for Flutter projects)
Installation #
Add worker_pool to your pubspec.yaml:
dependencies:
  worker_pool:
    path: /path/to/worker_pool
Or, if the package is published to pub.flutter-io.cn:
dependencies:
  worker_pool: ^0.0.5
Usage #
Basic Setup #
import 'package:worker_pool/worker_pool.dart';
// Initialize the worker pool
// By default, `poolSize` is the number of processors on the machine.
final config = WorkerPoolConfig(
  isolateTimeout: Duration(seconds: 30), // Task timeout
);
await WorkerPool.initialize(config);
// Get the instance
final pool = WorkerPool.instance;
Function Registration #
To execute functions in isolates, they need to be registered during initialization:
// Define a function that can be executed in an isolate
Future<int> computeSquare(int number) async {
  // Simulate some computation
  await Future.delayed(Duration(milliseconds: 100));
  return number * number;
}
// Register the function during initialization
final config = WorkerPoolConfig(
  poolSize: 4,
  predefinedFunctions: {
    'computeSquare': computeSquare,
  },
);
await WorkerPool.initialize(config);
Task Execution #
// Submit a task to the pool
try {
  final result = await pool.submit<int, int>('computeSquare', 5);
  print('Result: $result'); // Output: Result: 25
} catch (e) {
  print('Task failed: $e');
}
Using Constants #
Share constants between the main isolate and worker isolates:
// Define a constant provider
Future<Map<String, dynamic>> provideConstants() async {
  return {
    'apiUrl': 'https://api.example.com',
    'timeout': 5000,
  };
}
// Register the constant provider
final config = WorkerPoolConfig(
  poolSize: 4,
  constantProviders: [provideConstants],
  predefinedFunctions: {
    'computeWithConstants': computeWithConstants,
  },
);
await WorkerPool.initialize(config);
In isolate functions:
Future<String> computeWithConstants(String input) async {
  final apiUrl = IsolateConstantManager.getConstant<String>('apiUrl');
  final timeout = IsolateConstantManager.getConstant<int>('timeout');
  // Use constants in computation
  return '$input processed with API: $apiUrl and timeout: $timeout';
}
Resource Management #
Initialize and clean up resources in isolates:
// Define a resource initializer
Future<Map<String, dynamic>?> initializeDatabase() async {
  // Initialize database connection
  final db = await Database.connect('database_url');
  return {
    'database': db,
  };
}
// Define a resource finalizer
Future<void> finalizeDatabase(dynamic resource) async {
  if (resource is Database) {
    await resource.close();
  }
}
// Register resource management
final config = WorkerPoolConfig(
  poolSize: 4,
  resourceInitializers: [initializeDatabase],
  resourceFinalizers: {
    'database': finalizeDatabase,
  },
);
await WorkerPool.initialize(config);
Using registered resources in isolate tasks:
// Define a function to execute in an isolate using registered resources
Future<String> processWithDatabase(String data) async {
  // Use IsolateResourceManager to get registered resources
  final database = IsolateResourceManager.getResource<Database>('database');
  if (database == null) {
    throw Exception('Database resource not found');
  }
  // Use the database resource for operations
  final result = await database.query('SELECT * FROM table WHERE data = ?', [data]);
  return result.toString();
}
// Register this function and initialize the worker pool
final config = WorkerPoolConfig(
  poolSize: 4,
  resourceInitializers: [initializeDatabase],
  resourceFinalizers: {
    'database': finalizeDatabase,
  },
  predefinedFunctions: {
    'processWithDatabase': processWithDatabase,
  },
);
await WorkerPool.initialize(config);
// Submit a task that uses resources
final pool = WorkerPool.instance;
final result = await pool.submit<String, String>('processWithDatabase', 'sample_data');
print('Processing result: $result');
Getting Statistics #
Monitor pool performance:
final stats = pool.getStatistics();
print('Total workers: ${stats['totalWorkers']}');
print('Available workers: ${stats['availableWorkers']}');
print('Busy workers: ${stats['busyWorkers']}');
print('Queued tasks: ${stats['queuedTasks']}');
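The statistics can also drive simple monitoring logic. The sketch below derives a utilization ratio and a saturation flag from the four keys shown above. It assumes `getStatistics()` returns a `Map<String, dynamic>` whose values for these keys are integers; the helper names `poolUtilization` and `isSaturated` are hypothetical and not part of the package.

```dart
/// Fraction of workers that are busy, from 0.0 to 1.0.
/// Assumes 'totalWorkers' and 'busyWorkers' hold ints (an assumption,
/// not confirmed by the package documentation).
double poolUtilization(Map<String, dynamic> stats) {
  final total = (stats['totalWorkers'] as int?) ?? 0;
  final busy = (stats['busyWorkers'] as int?) ?? 0;
  return total == 0 ? 0.0 : busy / total;
}

/// True when no worker is free and tasks are waiting in the queue.
bool isSaturated(Map<String, dynamic> stats) {
  final available = (stats['availableWorkers'] as int?) ?? 0;
  final queued = (stats['queuedTasks'] as int?) ?? 0;
  return available == 0 && queued > 0;
}
```

A sustained `isSaturated` result may suggest that `poolSize` is too small for the workload, or that individual tasks run longer than expected.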
Cleanup #
Dispose the pool when finished:
await pool.dispose();
Configuration Options #
The WorkerPoolConfig class provides several configuration options:
| Option | Description | Default |
|---|---|---|
| poolSize | Number of isolates to maintain in the pool | Platform.numberOfProcessors |
| resourceInitializers | List of functions to initialize resources in isolates | [] |
| resourceFinalizers | Map of finalizer functions for resource cleanup | {} |
| constantProviders | List of functions to provide constants to isolates | [] |
| healthCheckInterval | Interval for health checks | 5 minutes |
| isolateTimeout | Timeout for task execution | 30 seconds |
| maxTasksPerIsolate | Maximum tasks per isolate before restart | 1000 |
| predefinedFunctions | Map of functions that can be executed in isolates | {} |
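As an illustration of how several of these options might be combined, the sketch below overrides the defaults for a small pool. It assumes that `healthCheckInterval` takes a `Duration` and `maxTasksPerIsolate` takes an `int`, which the defaults in the table suggest but do not confirm. `computeSquare` and `runWithTunedPool` are hypothetical names used only for this example.

```dart
import 'package:worker_pool/worker_pool.dart';

// Hypothetical task, present only so predefinedFunctions is not empty.
Future<int> computeSquare(int number) async => number * number;

Future<void> runWithTunedPool() async {
  final config = WorkerPoolConfig(
    poolSize: 2, // Fewer isolates than the processor count
    healthCheckInterval: Duration(minutes: 1), // Assumed to be a Duration
    isolateTimeout: Duration(seconds: 10),
    maxTasksPerIsolate: 200, // Assumed to be an int
    predefinedFunctions: {'computeSquare': computeSquare},
  );
  await WorkerPool.initialize(config);

  final pool = WorkerPool.instance;
  try {
    final result = await pool.submit<int, int>('computeSquare', 7);
    print('Result: $result');
  } finally {
    // Dispose the pool even if the task throws.
    await pool.dispose();
  }
}
```

Options left out of the constructor call keep the defaults listed in the table.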
Additional Information #
Error Handling #
The worker pool handles errors as follows:
- Task execution errors are propagated to the caller
- Isolate crashes are automatically recovered by restarting the worker
- Timeouts are handled with TimeoutException
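Because timeouts surface as `TimeoutException`, callers can treat them separately from other failures. The sketch below shows the pattern using only `dart:async`: `Future.timeout` stands in for a pool task that exceeds `isolateTimeout`, and `runWithFallback` is a hypothetical helper, not part of the package.

```dart
import 'dart:async';

/// Runs [task] and returns [fallback] if it does not finish within [limit].
/// Errors other than TimeoutException are propagated to the caller.
Future<T> runWithFallback<T>(
  Future<T> Function() task,
  Duration limit,
  T fallback,
) async {
  try {
    // Future.timeout throws TimeoutException when the limit elapses.
    return await task().timeout(limit);
  } on TimeoutException {
    return fallback;
  }
}
```

The same `on TimeoutException` clause could wrap a `pool.submit` call, assuming the pool reports timeouts with the `TimeoutException` type from `dart:async`.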
Performance Considerations #
- Tasks should be CPU-intensive to benefit from isolate-based concurrency
- I/O operations should use async/await and may not benefit from isolates
- Consider the overhead of data serialization when passing arguments to isolates
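To illustrate the first and last points, here is a minimal sketch that offloads a CPU-bound loop with `Isolate.run` from `dart:isolate`, independent of this package. Only a single `int` crosses the isolate boundary in each direction, so the copying overhead is negligible compared with the computation. The function names are hypothetical.

```dart
import 'dart:isolate';

/// CPU-bound work: sums the squares of 1..n on the calling isolate.
int sumOfSquares(int n) {
  var total = 0;
  for (var i = 1; i <= n; i++) {
    total += i * i;
  }
  return total;
}

/// Runs the same computation in a short-lived isolate so the calling
/// isolate stays responsive. Only the int argument and result are copied.
Future<int> sumOfSquaresInIsolate(int n) {
  return Isolate.run(() => sumOfSquares(n));
}
```

`Isolate.run` spawns a new isolate per call. A pool avoids that startup cost by reusing isolates across tasks, which matters more as tasks get shorter.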
Testing #
The project includes unit tests. Run them with:
flutter test
Contributing #
Contributions are welcome! Please feel free to submit issues and pull requests.
License #
This project is licensed under the MIT License - see the LICENSE file for details.