readDelimited<T extends GeneratedMessage> function
Reads a varint length-prefixed message from the P2PStream.
- stream: The P2PStream to read from.
- builder: A function that constructs the specific GeneratedMessage type from bytes.
Returns a Future of the parsed message.
Implementation
/// Reads one varint length-prefixed message from [stream].
///
/// The varint length prefix is read first; the payload is then fetched via
/// `_readNBytesFromP2PStream` and handed to [builder], which constructs the
/// concrete [GeneratedMessage]. For a zero-length payload [builder] receives
/// an empty [Uint8List] and no further read is issued.
///
/// Throws a [StateError] if the stream returns an empty chunk before the
/// length prefix is complete. Throws a [FormatException] if the prefix is
/// longer than 10 bytes, cannot be decoded, or decodes to a negative length.
Future<T> readDelimited<T extends GeneratedMessage>(
    P2PStream stream, T Function(List<int> bytes) builder) async {
  // A varint carries 7 payload bits per byte, so 64 bits need at most 10.
  const maxVarintBytes = 10;

  // Bytes already pulled from the stream but not yet consumed. Normally this
  // holds at most one byte; it is kept so that nothing is dropped should the
  // stream hand back more than was asked for.
  final carryOverBuffer = <int>[];
  final varintBytesBuilder = BytesBuilder(copy: false);
  var messageLength = -1;
  var varintByteCount = 0;

  // 1. Read and decode the varint length prefix.
  while (true) {
    if (carryOverBuffer.isEmpty) {
      // Request a single byte at a time. Asking for more could pull bytes
      // that belong to the payload or to the *next* message into this
      // function's local buffer, where they would be lost on return.
      // NOTE(review): assumes P2PStream.read treats its argument as an upper
      // bound on the returned chunk size — confirm against the stream impl.
      final chunk = await stream.read(1);
      if (chunk.isEmpty) {
        if (stream.isClosed) {
          throw StateError(
              'P2PStream closed prematurely while reading varint length.');
        }
        throw StateError(
            'P2PStream read returned empty chunk while reading varint length.');
      }
      carryOverBuffer.addAll(chunk);
    }

    final byte = carryOverBuffer.removeAt(0);
    varintBytesBuilder.addByte(byte);
    varintByteCount++;

    if ((byte & 0x80) == 0) {
      // Continuation bit clear: this was the last byte of the varint.
      try {
        messageLength = decodeVarint(varintBytesBuilder.toBytes());
      } catch (e) {
        throw FormatException(
            'Invalid varint decoding: $e. Bytes: ${varintBytesBuilder.toBytes()}');
      }
      break;
    }

    // The byte just read asks for another one. If the limit has already been
    // reached, the prefix is malformed; fail now instead of reading further.
    if (varintByteCount >= maxVarintBytes) {
      throw FormatException(
          'Varint prefix is too long (max 10 bytes). Read: ${varintBytesBuilder.toBytes()}');
    }
  }

  if (messageLength < 0) {
    throw FormatException('Invalid message length: $messageLength');
  }

  // Zero-length payload (e.g. an empty protobuf message): nothing to read.
  if (messageLength == 0) {
    return builder(Uint8List(0));
  }

  // 2. Read the message bytes, consuming any carried-over bytes first.
  // NOTE(review): messageLength comes from the peer and has no upper bound
  // here — confirm whether the helper or the caller enforces a size limit.
  final messageData =
      await _readNBytesFromP2PStream(stream, messageLength, carryOverBuffer);

  // 3. Parse the message.
  return builder(messageData);
}