readDelimited<T extends GeneratedMessage> function

Future<T> readDelimited<T extends GeneratedMessage>(
  P2PStream stream,
  T builder(List<int> bytes)
)

Reads a varint length-prefixed message from the P2PStream.

Parameters: `stream` is the P2PStream to read from; `builder` is a function that constructs the specific GeneratedMessage type from the raw message bytes. Returns a Future that completes with the parsed message.

Implementation

/// Reads one varint length-prefixed message from [stream].
///
/// The prefix is a varint giving the payload size in bytes. Once the prefix is
/// decoded, the payload is read and passed to [builder], whose result is
/// returned. A zero-length payload is passed to [builder] as an empty list.
///
/// Throws a [StateError] if the stream yields no data while the prefix is
/// being read. Throws a [FormatException] if the prefix is longer than 10
/// bytes, cannot be decoded, or decodes to a negative length.
Future<T> readDelimited<T extends GeneratedMessage>(
    P2PStream stream, T Function(List<int> bytes) builder) async {
  // A 64-bit varint never occupies more than 10 bytes.
  const maxVarintBytes = 10;

  // Bytes received from the stream but not yet consumed. It is also handed to
  // the payload reader so nothing already received is skipped.
  final carryOverBuffer = <int>[];

  final varintBytesBuilder = BytesBuilder(copy: false);
  var messageLength = -1;
  var varintByteCount = 0;

  // 1. Read and decode the varint length prefix.
  while (true) {
    if (carryOverBuffer.isEmpty) {
      // Ask for a single byte at a time. The buffer is local to this call, so
      // any byte read past the end of this frame would be discarded on return
      // and the next frame on the stream would be corrupted.
      // NOTE(review): assumes stream.read(n) returns at most n bytes - confirm
      // against the P2PStream implementations in use.
      final chunk = await stream.read(1);
      if (chunk.isEmpty) {
        if (stream.isClosed) {
          throw StateError(
              'P2PStream closed prematurely while reading varint length.');
        }
        throw StateError(
            'P2PStream read returned empty chunk while reading varint length.');
      }
      carryOverBuffer.addAll(chunk);
    }
    final byte = carryOverBuffer.removeAt(0);

    varintBytesBuilder.addByte(byte);
    varintByteCount++;

    if ((byte & 0x80) == 0) {
      // Continuation bit clear: this byte completes the prefix.
      try {
        messageLength = decodeVarint(varintBytesBuilder.toBytes());
      } catch (e) {
        throw FormatException('Invalid varint decoding: $e. '
            'Bytes: ${varintBytesBuilder.toBytes()}');
      }
      break;
    }

    // The byte just read asks for another one. If it was already the 10th,
    // the prefix cannot be valid, so reject it before reading an 11th byte.
    if (varintByteCount >= maxVarintBytes) {
      throw FormatException('Varint prefix is too long (max 10 bytes). '
          'Read: ${varintBytesBuilder.toBytes()}');
    }
  }

  if (messageLength < 0) {
    throw FormatException('Invalid message length: $messageLength');
  }
  if (messageLength == 0) {
    // Zero-length frame (e.g. an empty protobuf message): nothing more to read.
    return builder(Uint8List(0));
  }

  // 2. Read the message bytes.
  final messageData =
      await _readNBytesFromP2PStream(stream, messageLength, carryOverBuffer);

  // 3. Parse the message.
  return builder(messageData);
}