@override Stream<S> processor(Stream<T> input) async* { await for (final task in input) { yield* onTask(task); } }