newStream method

  1. @override
Future<P2PStream> newStream(
  1. PeerId p,
  2. List<ProtocolID> pids,
  3. Context context
)
override

NewStream opens a new stream to given peer p, and writes a p2p/protocol header with given ProtocolID. If there is no connection to p, attempts to create one. If ProtocolID is "", writes no header. (Thread-safe)

If context is not provided, a new Context will be created.

Implementation

@override
Future<P2PStream> newStream(PeerId p, List<ProtocolID> pids, Context context) async {
  final startTime = DateTime.now();


  // Set up a timeout context if needed
  final hasTimeout = _negtimeout > Duration.zero;
  final deadline = hasTimeout ? DateTime.now().add(_negtimeout) : null;


  // Phase 1: Connection

  final connectStartTime = DateTime.now();

  await connect(AddrInfo(p, []), context: context);

  final connectTime = DateTime.now().difference(connectStartTime);


  // Phase 2: Stream Creation

  final streamCreateStartTime = DateTime.now();

  final stream = await _network.newStream(context, p);

  final streamCreateTime = DateTime.now().difference(streamCreateStartTime);


  // DEBUG: Add protocol assignment tracking


  // Phase 3: Identify Wait

  final identifyStartTime = DateTime.now();

  await _idService.identifyWait(stream.conn);

  final identifyTime = DateTime.now().difference(identifyStartTime);


  // Phase 4: Protocol Negotiation

  final negotiationStartTime = DateTime.now();

  try {
    if (hasTimeout && deadline != null) {

      stream.setDeadline(deadline);
    }


    // DEBUG: Add detailed protocol negotiation tracking

    final selectStartTime = DateTime.now();

    final selectedProtocol = await _mux.selectOneOf(stream, pids);

    final selectTime = DateTime.now().difference(selectStartTime);


    // DEBUG: Add protocol selection result tracking


    if (hasTimeout) {

      stream.setDeadline(null); // Clear deadline after successful negotiation
    }

    if (selectedProtocol == null) {
      _log.severe('🤝 [NEWSTREAM-PHASE-4] No protocol selected from: $pids');
      stream.reset();
      throw Exception('Failed to negotiate any of the requested protocols: $pids with peer $p');
    }

    // Phase 5: Protocol Setup

    final setupStartTime = DateTime.now();

    // DEBUG: Add protocol assignment tracking

    await stream.setProtocol(selectedProtocol);

    // Ensure the stream's scope is also updated with the protocol.
    // This is crucial for services like Identify that attach to the scope.

    await stream.scope().setProtocol(selectedProtocol);

    // Add the successfully negotiated protocol to the peerstore for the remote peer.
    // Note: The go-libp2p implementation adds this *after* the stream handler returns,
    // but it seems more robust to add it as soon as negotiation succeeds.
    // This ensures that even if the handler has issues, we've recorded the protocol.
    peerStore.protoBook.addProtocols(p, [selectedProtocol]);

    final setupTime = DateTime.now().difference(setupStartTime);
    final negotiationTime = DateTime.now().difference(negotiationStartTime);
    final totalTime = DateTime.now().difference(startTime);





    // DEBUG: Add final protocol assignment confirmation


    return stream;

  } catch (e, stackTrace) {
    final negotiationTime = DateTime.now().difference(negotiationStartTime);
    final totalTime = DateTime.now().difference(startTime);
    _log.severe('❌ [NEWSTREAM-ERROR] Stream creation failed after ${totalTime.inMilliseconds}ms (negotiation: ${negotiationTime.inMilliseconds}ms): $e\n$stackTrace');

    try {
      stream.reset();

    } catch (resetError) {
      _log.warning('⚠️ [NEWSTREAM-ERROR] Error during stream reset: $resetError');
    }

    // No need to check for UnimplementedError specifically anymore
    throw Exception('Failed to negotiate protocol with $p for $pids: $e');
  }
}