newStream method
NewStream opens a new stream to given peer p, and writes a p2p/protocol header with given ProtocolID. If there is no connection to p, attempts to create one. If ProtocolID is "", writes no header. (Thread-safe)
If context
is not provided, a new Context will be created.
Implementation
@override
Future<P2PStream> newStream(PeerId p, List<ProtocolID> pids, Context context) async {
final startTime = DateTime.now();
// Set up a timeout context if needed
final hasTimeout = _negtimeout > Duration.zero;
final deadline = hasTimeout ? DateTime.now().add(_negtimeout) : null;
// Phase 1: Connection
final connectStartTime = DateTime.now();
await connect(AddrInfo(p, []), context: context);
final connectTime = DateTime.now().difference(connectStartTime);
// Phase 2: Stream Creation
final streamCreateStartTime = DateTime.now();
final stream = await _network.newStream(context, p);
final streamCreateTime = DateTime.now().difference(streamCreateStartTime);
// DEBUG: Add protocol assignment tracking
// Phase 3: Identify Wait
final identifyStartTime = DateTime.now();
await _idService.identifyWait(stream.conn);
final identifyTime = DateTime.now().difference(identifyStartTime);
// Phase 4: Protocol Negotiation
final negotiationStartTime = DateTime.now();
try {
if (hasTimeout && deadline != null) {
stream.setDeadline(deadline);
}
// DEBUG: Add detailed protocol negotiation tracking
final selectStartTime = DateTime.now();
final selectedProtocol = await _mux.selectOneOf(stream, pids);
final selectTime = DateTime.now().difference(selectStartTime);
// DEBUG: Add protocol selection result tracking
if (hasTimeout) {
stream.setDeadline(null); // Clear deadline after successful negotiation
}
if (selectedProtocol == null) {
_log.severe('🤝 [NEWSTREAM-PHASE-4] No protocol selected from: $pids');
stream.reset();
throw Exception('Failed to negotiate any of the requested protocols: $pids with peer $p');
}
// Phase 5: Protocol Setup
final setupStartTime = DateTime.now();
// DEBUG: Add protocol assignment tracking
await stream.setProtocol(selectedProtocol);
// Ensure the stream's scope is also updated with the protocol.
// This is crucial for services like Identify that attach to the scope.
await stream.scope().setProtocol(selectedProtocol);
// Add the successfully negotiated protocol to the peerstore for the remote peer.
// Note: The go-libp2p implementation adds this *after* the stream handler returns,
// but it seems more robust to add it as soon as negotiation succeeds.
// This ensures that even if the handler has issues, we've recorded the protocol.
peerStore.protoBook.addProtocols(p, [selectedProtocol]);
final setupTime = DateTime.now().difference(setupStartTime);
final negotiationTime = DateTime.now().difference(negotiationStartTime);
final totalTime = DateTime.now().difference(startTime);
// DEBUG: Add final protocol assignment confirmation
return stream;
} catch (e, stackTrace) {
final negotiationTime = DateTime.now().difference(negotiationStartTime);
final totalTime = DateTime.now().difference(startTime);
_log.severe('❌ [NEWSTREAM-ERROR] Stream creation failed after ${totalTime.inMilliseconds}ms (negotiation: ${negotiationTime.inMilliseconds}ms): $e\n$stackTrace');
try {
stream.reset();
} catch (resetError) {
_log.warning('⚠️ [NEWSTREAM-ERROR] Error during stream reset: $resetError');
}
// No need to check for UnimplementedError specifically anymore
throw Exception('Failed to negotiate protocol with $p for $pids: $e');
}
}