recursiveDistillChunks method

Stream<IChunk> recursiveDistillChunks({
  required Stream<IChunk> chunks,
  required IDistiller distiller,
  int parallelism = 8,
  int factor = 3,
  bool isBase = true,
})

Implementation

/// Distills [chunks] level by level and emits every level on a single stream.
///
/// Each level is produced by running the previous level's chunks through
/// `distillChunks` with the given [distiller], [factor] and [parallelism].
/// The distilled chunks of a level are emitted as they arrive and are also
/// collected so they can serve as the input of the next level.
///
/// When [isBase] is true (the default, used for the outermost call) the
/// incoming [chunks] themselves are emitted as well: every base chunk that has
/// been read so far is emitted ahead of the next distilled chunk, and any base
/// chunks still pending when the level finishes are emitted at its end.
/// Recursive calls pass `isBase: false`, so only distilled chunks are emitted
/// for the higher levels.
///
/// Recursion stops once a level yields [factor] or fewer distilled chunks.
Stream<IChunk> recursiveDistillChunks({
  required Stream<IChunk> chunks,
  required IDistiller distiller,
  int parallelism = 8,
  int factor = 3,
  bool isBase = true,
}) async* {
  // Base chunks that were handed to the distiller but not yet re-emitted.
  final Queue<IChunk> pendingBase = Queue<IChunk>();

  // Distilled output of this level; becomes the input of the next level.
  final List<IChunk> level = <IChunk>[];

  final Stream<IChunk> distilled = distillChunks(
    chunks: isBase
        ? chunks.map((IChunk chunk) {
            pendingBase.add(chunk);
            return chunk;
          })
        : chunks,
    distiller: distiller,
    factor: factor,
    parallelism: parallelism,
  );

  // Iterating the stream directly (rather than through a broadcast stream and
  // a side subscription) means that cancelling this generator also cancels the
  // upstream distillation instead of leaving it running in the background.
  await for (final IChunk chunk in distilled) {
    level.add(chunk);
    // Only populated when isBase is true, so this is a no-op otherwise.
    while (pendingBase.isNotEmpty) {
      yield pendingBase.removeFirst();
    }
    yield chunk;
  }

  // Base chunks read after the last distilled chunk was emitted (or when the
  // distiller emitted nothing at all) must not be dropped.
  while (pendingBase.isNotEmpty) {
    yield pendingBase.removeFirst();
  }

  if (level.length <= factor) {
    return;
  }

  yield* recursiveDistillChunks(
    chunks: Stream<IChunk>.fromIterable(level),
    distiller: distiller,
    factor: factor,
    parallelism: parallelism,
    isBase: false,
  );
}