recursiveDistillChunks method
Implementation
Stream<IChunk> recursiveDistillChunks({
required Stream<IChunk> chunks,
required IDistiller distiller,
int parallelism = 8,
int factor = 3,
bool isBase = true,
}) async* {
Queue<IChunk> buffer = Queue<IChunk>();
Stream<IChunk> distilled =
distillChunks(
chunks: chunks.map((i) {
if (isBase) buffer.add(i);
return i;
}),
distiller: distiller,
factor: factor,
parallelism: parallelism,
).asBroadcastStream();
StreamController<IChunk> nextLevelController = StreamController<IChunk>();
StreamSubscription sub = distilled.listen(nextLevelController.add);
int c = 0;
await for (IChunk i in distilled) {
c++;
if (isBase) {
while (buffer.isNotEmpty) {
yield buffer.removeFirst();
}
}
yield i;
}
sub.cancel();
nextLevelController.close();
if (c <= factor) {
return;
}
yield* recursiveDistillChunks(
chunks: nextLevelController.stream,
distiller: distiller,
factor: factor,
parallelism: parallelism,
isBase: false,
);
}