worker_pool 0.0.5

A Dart/Flutter package for managing a pool of isolates to execute tasks concurrently, with support for resource management, constant sharing, and proper cleanup.

Worker Pool #

README in Chinese

Features #

  • Isolate Pool Management: Efficiently manages a pool of Dart isolates for concurrent task execution
  • Resource Management: Initialize and clean up resources in isolates with custom finalizers
  • Constant Sharing: Share constants between the main isolate and worker isolates
  • Task Queue: Automatically queues tasks when all isolates are busy
  • Timeout Handling: Configurable timeouts for task execution
  • Health Monitoring: Automatic worker restart based on task count or inactivity
  • Function Registration: Register functions that can be executed in isolates
  • Statistics: Get pool statistics for monitoring performance
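
The task-queue behaviour can be pictured with a small sketch. The `BoundedRunner` class below is a hypothetical illustration, not the package's implementation: it runs plain futures rather than isolates, but it shows the same idea of running up to a fixed number of tasks and holding the rest in a FIFO queue until a slot frees up.

```dart
import 'dart:async';
import 'dart:collection';

/// Hypothetical sketch: runs at most [maxConcurrent] tasks at once and
/// queues the rest in FIFO order. Not the worker_pool implementation.
class BoundedRunner {
  BoundedRunner(this.maxConcurrent);

  final int maxConcurrent;
  final Queue<Future<void> Function()> _waiting = Queue();
  int _running = 0;

  /// Tasks currently executing.
  int get busyWorkers => _running;

  /// Tasks waiting for a free slot.
  int get queuedTasks => _waiting.length;

  /// Starts [task] immediately if a slot is free, otherwise queues it.
  Future<T> submit<T>(Future<T> Function() task) {
    final completer = Completer<T>();

    Future<void> run() async {
      _running++;
      try {
        completer.complete(await task());
      } catch (error, stackTrace) {
        // Forward the failure to the caller of submit().
        completer.completeError(error, stackTrace);
      } finally {
        _running--;
        // Hand the freed slot to the oldest queued task, if any.
        if (_waiting.isNotEmpty) {
          _waiting.removeFirst()();
        }
      }
    }

    if (_running < maxConcurrent) {
      run();
    } else {
      _waiting.add(run);
    }
    return completer.future;
  }
}
```

With `BoundedRunner(2)`, a third call to `submit` returns a pending future straight away, and its task only starts once one of the first two finishes.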

Getting Started #

Prerequisites #

  • Dart SDK ^3.8.1
  • Flutter >=1.17.0 (for Flutter projects)

Installation #

Add worker_pool to your pubspec.yaml:

dependencies:
  worker_pool:
    path: /path/to/worker_pool

Or, to use the published release from pub.flutter-io.cn:

dependencies:
  worker_pool: ^0.0.5

Usage #

Basic Setup #

import 'package:worker_pool/worker_pool.dart';

// Initialize the worker pool
// By default, `poolSize` is the number of processors on the machine.
final config = WorkerPoolConfig(
  isolateTimeout: Duration(seconds: 30), // Task timeout
);
await WorkerPool.initialize(config);

// Get the instance
final pool = WorkerPool.instance;

Function Registration #

Functions must be registered during initialization before they can be executed in isolates:

// Define a function that can be executed in an isolate
Future<int> computeSquare(int number) async {
  // Simulate some computation
  await Future.delayed(Duration(milliseconds: 100));
  return number * number;
}

// Register the function during initialization
final config = WorkerPoolConfig(
  poolSize: 4,
  predefinedFunctions: {
    'computeSquare': computeSquare,
  },
);
await WorkerPool.initialize(config);

Task Execution #

// Submit a task to the pool
try {
  final result = await pool.submit<int, int>('computeSquare', 5);
  print('Result: $result'); // Output: Result: 25
} catch (e) {
  print('Task failed: $e');
}
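
Several tasks are often submitted together so that the pool can spread them across its isolates. The helper below is a minimal sketch of that pattern. `Submit` and `submitAll` are hypothetical names, not part of worker_pool; the helper accepts any function with the same shape as the `pool.submit` call shown above, which keeps the example independent of the package.

```dart
/// Shape of a submit call: a registered function name plus one argument.
typedef Submit<A, R> = Future<R> Function(String functionName, A argument);

/// Submits one task per argument and waits for all results, in input order.
Future<List<R>> submitAll<A, R>(
  Submit<A, R> submit,
  String functionName,
  Iterable<A> arguments,
) {
  return Future.wait([
    for (final argument in arguments) submit(functionName, argument),
  ]);
}
```

With the pool from the earlier snippets, a call might look like `submitAll<int, int>((name, n) => pool.submit<int, int>(name, n), 'computeSquare', [1, 2, 3])`. `Future.wait` completes with an error if any single task fails, so wrap the call in `try`/`catch` as in the single-task example.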

Using Constants #

Share constants between the main isolate and worker isolates:

// Define a constant provider
Future<Map<String, dynamic>> provideConstants() async {
  return {
    'apiUrl': 'https://api.example.com',
    'timeout': 5000,
  };
}

// Register the constant provider
final config = WorkerPoolConfig(
  poolSize: 4,
  constantProviders: [provideConstants],
  predefinedFunctions: {
    'computeWithConstants': computeWithConstants,
  },
);
await WorkerPool.initialize(config);

In isolate functions:

Future<String> computeWithConstants(String input) async {
  final apiUrl = IsolateConstantManager.getConstant<String>('apiUrl');
  final timeout = IsolateConstantManager.getConstant<int>('timeout');
  
  // Use constants in computation
  return '$input processed with API: $apiUrl and timeout: $timeout';
}

Resource Management #

Initialize and clean up resources in isolates:

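// Define a resource initializer.
// `Database` is a placeholder for your own database client class;
// it is not provided by worker_pool.
Future<Map<String, dynamic>?> initializeDatabase() async {
  // Initialize database connection
  final db = await Database.connect('database_url');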
  return {
    'database': db,
  };
}

// Define a resource finalizer
Future<void> finalizeDatabase(dynamic resource) async {
  if (resource is Database) {
    await resource.close();
  }
}

// Register resource management
final config = WorkerPoolConfig(
  poolSize: 4,
  resourceInitializers: [initializeDatabase],
  resourceFinalizers: {
    'database': finalizeDatabase,
  },
);
await WorkerPool.initialize(config);

Using registered resources in isolate tasks:

// Define a function to execute in an isolate using registered resources
Future<String> processWithDatabase(String data) async {
  // Use IsolateResourceManager to get registered resources
  final database = IsolateResourceManager.getResource<Database>('database');
  
  if (database == null) {
    throw Exception('Database resource not found');
  }
  
  // Use the database resource for operations
  final result = await database.query('SELECT * FROM table WHERE data = ?', [data]);
  return result.toString();
}

// Register this function and initialize the worker pool
final config = WorkerPoolConfig(
  poolSize: 4,
  resourceInitializers: [initializeDatabase],
  resourceFinalizers: {
    'database': finalizeDatabase,
  },
  predefinedFunctions: {
    'processWithDatabase': processWithDatabase,
  },
);
await WorkerPool.initialize(config);

// Submit a task that uses resources
final result = await pool.submit<String, String>('processWithDatabase', 'sample_data');
print('Processing result: $result');

Getting Statistics #

Monitor pool performance:

final stats = pool.getStatistics();
print('Total workers: ${stats['totalWorkers']}');
print('Available workers: ${stats['availableWorkers']}');
print('Busy workers: ${stats['busyWorkers']}');
print('Queued tasks: ${stats['queuedTasks']}');
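
The raw counters can be turned into simple health signals. The helpers below are a sketch, assuming the statistics map holds numeric values under the four keys printed above. `utilization` and `isBacklogged` are hypothetical names and are not part of the package.

```dart
/// Fraction of workers that are busy, from 0.0 (idle) to 1.0 (fully loaded).
/// Assumes the statistics map holds numeric values under the keys shown above.
double utilization(Map<String, dynamic> stats) {
  final total = (stats['totalWorkers'] as num?) ?? 0;
  final busy = (stats['busyWorkers'] as num?) ?? 0;
  return total == 0 ? 0.0 : busy / total;
}

/// True when no worker is free and tasks are waiting in the queue.
bool isBacklogged(Map<String, dynamic> stats) {
  final available = (stats['availableWorkers'] as num?) ?? 0;
  final queued = (stats['queuedTasks'] as num?) ?? 0;
  return available == 0 && queued > 0;
}
```

A pool that stays backlogged for long periods may need a larger `poolSize` or smaller tasks.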

Cleanup #

Dispose the pool when finished:

await pool.dispose();

Configuration Options #

The WorkerPoolConfig class provides several configuration options:

| Option | Description | Default |
| --- | --- | --- |
| poolSize | Number of isolates to maintain in the pool | Platform.numberOfProcessors |
| resourceInitializers | List of functions to initialize resources in isolates | [] |
| resourceFinalizers | Map of finalizer functions for resource cleanup | {} |
| constantProviders | List of functions to provide constants to isolates | [] |
| healthCheckInterval | Interval for health checks | 5 minutes |
| isolateTimeout | Timeout for task execution | 30 seconds |
| maxTasksPerIsolate | Maximum tasks per isolate before restart | 1000 |
| predefinedFunctions | Map of functions that can be executed in isolates | {} |
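
As a rough illustration, the fragment below overrides several of these defaults at once. The option names come from the table. The argument types for `healthCheckInterval` (a `Duration`) and `maxTasksPerIsolate` (an `int`) are assumptions based on the listed defaults, so check the API reference before relying on them.

```dart
import 'package:worker_pool/worker_pool.dart';

// Sketch only: option names come from the table above; the argument types
// for healthCheckInterval (Duration) and maxTasksPerIsolate (int) are assumed.
final config = WorkerPoolConfig(
  poolSize: 2, // fewer isolates than the processor-count default
  healthCheckInterval: Duration(minutes: 1), // check workers more often
  isolateTimeout: Duration(seconds: 10), // fail slow tasks sooner
  maxTasksPerIsolate: 200, // restart workers after fewer tasks
);
```

Pass the result to `WorkerPool.initialize(config)` as in the Basic Setup section.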

Additional Information #

Error Handling #

The worker pool handles three kinds of failure:

  • Task execution errors are propagated to the caller
  • Isolate crashes are recovered automatically by restarting the worker
  • Tasks that exceed the configured isolateTimeout are reported with a TimeoutException
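
Callers may want to treat a timeout differently from other failures. The helper below is a minimal sketch, assuming a timed-out task surfaces to the caller as a `TimeoutException` from `dart:async`. `withTimeoutFallback` is a hypothetical name; it wraps any future-returning task, so it does not depend on the package.

```dart
import 'dart:async';

/// Runs [task] and returns [fallback] if it fails with a TimeoutException.
/// Any other error is left to propagate to the caller.
Future<T> withTimeoutFallback<T>(Future<T> Function() task, T fallback) async {
  try {
    return await task();
  } on TimeoutException {
    return fallback;
  }
}
```

With the pool from the earlier snippets, a call might look like `withTimeoutFallback(() => pool.submit<int, int>('computeSquare', 5), -1)`.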

Performance Considerations #

  • Tasks should be CPU-intensive to benefit from isolate-based concurrency
  • I/O operations should use async/await and may not benefit from isolates
  • Consider the overhead of data serialization when passing arguments to isolates
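
For comparison, the sketch below runs CPU-bound work with `Isolate.run` from `dart:isolate`, which spawns a fresh isolate for each call. It is not part of worker_pool, and `sumOfSquares` is a made-up workload. It shows the kind of task that suits isolates: a tight loop with a small argument and a small result, so little data is copied between isolates.

```dart
import 'dart:isolate';

/// CPU-bound work: sums the squares of 1..n.
int sumOfSquares(int n) {
  var total = 0;
  for (var i = 1; i <= n; i++) {
    total += i * i;
  }
  return total;
}

/// Runs [sumOfSquares] in a short-lived isolate so the calling isolate
/// stays responsive while the loop executes.
Future<int> sumOfSquaresInIsolate(int n) {
  return Isolate.run(() => sumOfSquares(n));
}
```

Spawning an isolate per call has a start-up cost; a pool avoids that cost by reusing long-lived isolates across tasks.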

Testing #

The project includes unit tests. Run them from the package root with:

flutter test

Contributing #

Contributions are welcome! Please feel free to submit issues and pull requests.

License #

This project is licensed under the MIT License - see the LICENSE file for details.
