isolated_stream 2.0.0-dev.1 copy "isolated_stream: ^2.0.0-dev.1" to clipboard
isolated_stream: ^2.0.0-dev.1 copied to clipboard

A Dart package for running CPU-intensive stream transformations in separate isolates to prevent blocking the main thread.

example/isolated_stream_example.dart

import 'dart:async';
import 'package:isolated_stream/isolated_stream.dart';

/// Example handler for CPU-intensive prime number checking
class PrimeCheckHandler extends IsolatedHandler<int, bool> {
  PrimeCheckHandler() : super(debugName: 'PrimeChecker');

  @override
  bool compute(int number) {
    if (number < 2) return false;
    if (number == 2) return true;
    if (number % 2 == 0) return false;

    for (int i = 3; i * i <= number; i += 2) {
      if (number % i == 0) return false;
    }
    return true;
  }
}

/// Example handler for asynchronous operations
class AsyncMultiplyHandler extends IsolatedHandler<int, int> {
  final int factor;

  AsyncMultiplyHandler(this.factor)
      : super(debugName: 'AsyncMultiply by $factor');

  @override
  Future<int> compute(int value) async {
    // Simulate some async work
    await Future.delayed(const Duration(milliseconds: 10));
    return value * factor;
  }
}

/// Example handler for complex data processing
class DataProcessorHandler
    extends IsolatedHandler<Map<String, dynamic>, Map<String, dynamic>> {
  DataProcessorHandler() : super(debugName: 'DataProcessor');

  @override
  Map<String, dynamic> compute(Map<String, dynamic> data) {
    // Process the data
    final processed = Map<String, dynamic>.from(data);
    processed['processed_at'] = DateTime.now().millisecondsSinceEpoch;
    processed['hash'] = data.toString().hashCode;

    return processed;
  }
}

void main() async {
  print('=== Isolated Stream Examples ===\n');

  // Example 1: Basic synchronous transformation
  print('1. Basic synchronous transformation:');
  await basicExample();

  // Example 2: CPU-intensive operations
  print('\n2. CPU-intensive prime checking:');
  await primeCheckingExample();

  // Example 3: Asynchronous operations
  print('\n3. Asynchronous operations:');
  await asyncExample();

  // Example 4: Concurrent processing
  print('\n4. Concurrent processing:');
  await concurrentExample();

  // Example 5: Isolate pooling
  print('\n5. Isolate pooling:');
  await isolatePoolExample();

  // Example 6: Complex data processing
  print('\n6. Complex data processing:');
  await complexDataExample();

  print('\n=== All examples completed ===');
}

Future<void> basicExample() async {
  final numbers = Stream.fromIterable([1, 2, 3, 4, 5]);
  final doubled = numbers.isolatedMap(AsyncMultiplyHandler(2));

  print('Input: [1, 2, 3, 4, 5]');
  print('Processing: multiply by 2 in isolate');

  final results = await doubled.toList();
  print('Output: $results');
}

Future<void> primeCheckingExample() async {
  final candidates = Stream.fromIterable([17, 18, 19, 20, 21, 22, 23]);
  final primeResults = candidates.isolatedMap(PrimeCheckHandler());

  print('Checking if these numbers are prime: [17, 18, 19, 20, 21, 22, 23]');

  final results = await primeResults.toList();
  for (int i = 0; i < results.length; i++) {
    final number = [17, 18, 19, 20, 21, 22, 23][i];
    final isPrime = results[i];
    print('  $number is ${isPrime ? 'prime' : 'not prime'}');
  }
}

Future<void> asyncExample() async {
  final numbers = Stream.fromIterable([1, 2, 3, 4]);
  final processed = numbers.isolatedMap(AsyncMultiplyHandler(3));

  print('Processing [1, 2, 3, 4] with async multiply by 3:');

  await for (final result in processed) {
    print('  Processed: $result');
  }
}

Future<void> concurrentExample() async {
  print('Comparing sequential vs concurrent processing:');

  // Sequential processing
  final stopwatch1 = Stopwatch()..start();
  final sequential = Stream.fromIterable([1, 2, 3, 4, 5, 6])
      .isolatedMap(AsyncMultiplyHandler(2));
  await sequential.toList();
  stopwatch1.stop();
  print('  Sequential processing took: ${stopwatch1.elapsedMilliseconds}ms');

  // Concurrent processing
  final stopwatch2 = Stopwatch()..start();
  final concurrent = Stream.fromIterable([1, 2, 3, 4, 5, 6]).isolatedMap(
    AsyncMultiplyHandler(2),
    strategy: IsolatedProcessingStrategy.concurrent(concurrency: 3),
  );
  await concurrent.toList();
  stopwatch2.stop();
  print('  Concurrent processing took: ${stopwatch2.elapsedMilliseconds}ms');

  final improvement =
      stopwatch1.elapsedMilliseconds / stopwatch2.elapsedMilliseconds;
  print('  Performance improvement: ${improvement.toStringAsFixed(1)}x faster');
}

Future<void> isolatePoolExample() async {
  print('Comparing single isolate vs isolate pool:');

  final workload = List.generate(12, (i) => i + 1);

  // Single isolate with high concurrency
  final stopwatch1 = Stopwatch()..start();
  final singleIsolate = Stream.fromIterable(workload).isolatedMap(
    AsyncMultiplyHandler(2),
    strategy: IsolatedProcessingStrategy.concurrent(
      concurrency: 6,
      isolates: 1, // Single isolate
    ),
  );
  await singleIsolate.toList();
  stopwatch1.stop();
  print('  Single isolate (6 concurrent): ${stopwatch1.elapsedMilliseconds}ms');

  // Multiple isolates with high concurrency
  final stopwatch2 = Stopwatch()..start();
  final multipleIsolates = Stream.fromIterable(workload).isolatedMap(
    AsyncMultiplyHandler(2),
    strategy: IsolatedProcessingStrategy.concurrent(
      concurrency: 6,
      isolates: 3, // 3 isolates in pool
    ),
  );
  await multipleIsolates.toList();
  stopwatch2.stop();
  print(
      '  Isolate pool (6 concurrent, 3 isolates): ${stopwatch2.elapsedMilliseconds}ms');

  print(
      '  Isolate pooling helps distribute work for better resource utilization');
}

Future<void> complexDataExample() async {
  final data = <Map<String, dynamic>>[
    {'name': 'Alice', 'age': 30, 'city': 'New York'},
    {'name': 'Bob', 'age': 25, 'city': 'London'},
    {'name': 'Charlie', 'age': 35, 'city': 'Tokyo'},
  ];

  final stream = Stream<Map<String, dynamic>>.fromIterable(data);
  final processed = stream.isolatedMap(DataProcessorHandler());

  print('Processing complex data structures:');

  final results = await processed.toList();
  for (final result in results) {
    print(
        '  ${result['name']}: processed at ${result['processed_at']}, hash: ${result['hash']}');
  }
}
1
likes
160
points
0
downloads

Publisher

verified publisherklyta.it

Weekly Downloads

A Dart package for running CPU-intensive stream transformations in separate isolates to prevent blocking the main thread.

Repository (GitHub)
View/report issues

Topics

#isolate #stream #concurrency #performance #async

Documentation

API reference

Funding

Consider supporting this project:

www.paypal.com
www.buymeacoffee.com

License

MIT (license)

Dependencies

meta

More

Packages that depend on isolated_stream