transform method

Stream<List<Conversation>> transform(
  1. Stream<List<Conversation>> data,
  2. List<StepDescriptor> steps,
  3. PackedDataCache? cache
)

Dispatch the stream to local workers and transform using the provided steps

Implementation

Stream<List<Conversation>> transform(Stream<List<Conversation>> data,
    List<StepDescriptor> steps, PackedDataCache? cache) async* {
  yield* data
      .map((data) => run(TransformTask(data, steps, cache: cache))
          .then((value) => switch (value) {
                TransformResponse result => result.batch,
              }))
      .transform(SynchronizingTransformer(workers.length));
}