process<T, R> method
Processes a list of items in parallel
items
is the list of items to process
processFunction
is the function to apply to each item
onProgress
is an optional callback for progress updates
Implementation
Future<List<R>> process<T, R>({
required List<T> items,
required Future<R> Function(T) processFunction,
void Function(int completed, int total)? onProgress,
}) async {
final results = List<R?>.filled(items.length, null);
final completers = List<Completer<R>>.generate(
items.length,
(_) => Completer<R>(),
);
int completedCount = 0;
// Process items in parallel using the isolate pool
for (int i = 0; i < items.length; i++) {
final item = items[i];
final completer = completers[i];
// Get the next isolate in round-robin fashion
final isolateIndex = _currentIndex;
_currentIndex = (_currentIndex + 1) % _poolSize;
// Send the task to the isolate
_sendPorts[isolateIndex].send(
_IsolateRequest(
id: i,
item: item,
functionId: processFunction.hashCode,
responsePort: _receivePorts[isolateIndex].sendPort,
),
);
// Set up the completion handler
completer.future.then((result) {
results[i] = result;
completedCount++;
if (onProgress != null) {
onProgress(completedCount, items.length);
}
});
}
// Wait for all tasks to complete
await Future.wait(completers.map((c) => c.future));
return results.cast<R>();
}