process<T, R> method

Future<List<R>> process<T, R>({
  1. required List<T> items,
  2. required Future<R> processFunction(
    1. T
    ),
  3. void onProgress(
    1. int completed,
    2. int total
    )?,
})

Processes a list of items in parallel

items is the list of items to process processFunction is the function to apply to each item onProgress is an optional callback for progress updates

Implementation

Future<List<R>> process<T, R>({
  required List<T> items,
  required Future<R> Function(T) processFunction,
  void Function(int completed, int total)? onProgress,
}) async {
  final results = List<R?>.filled(items.length, null);
  final completers = List<Completer<R>>.generate(
    items.length,
    (_) => Completer<R>(),
  );

  int completedCount = 0;

  // Process items in parallel using the isolate pool
  for (int i = 0; i < items.length; i++) {
    final item = items[i];
    final completer = completers[i];

    // Get the next isolate in round-robin fashion
    final isolateIndex = _currentIndex;
    _currentIndex = (_currentIndex + 1) % _poolSize;

    // Send the task to the isolate
    _sendPorts[isolateIndex].send(
      _IsolateRequest(
        id: i,
        item: item,
        functionId: processFunction.hashCode,
        responsePort: _receivePorts[isolateIndex].sendPort,
      ),
    );

    // Set up the completion handler
    completer.future.then((result) {
      results[i] = result;
      completedCount++;

      if (onProgress != null) {
        onProgress(completedCount, items.length);
      }
    });
  }

  // Wait for all tasks to complete
  await Future.wait(completers.map((c) => c.future));

  return results.cast<R>();
}