dial method
Implementation
/// Dials the peer named by the circuit address [addr] through a relay.
///
/// [addr] must contain a `/p2p/<relayId>` component followed by
/// `/p2p-circuit`, optionally followed by `/p2p/<destId>`. When the address
/// ends at `/p2p-circuit`, the relay itself is used as the destination.
///
/// The method opens a HOP stream to the relay via `host.newStream`, sends a
/// `HopMessage` of type `CONNECT` for the destination, and waits for a
/// `STATUS` reply. On `Status.OK` the HOP stream is wrapped in a
/// [RelayedConn] and returned.
///
/// When [timeout] is non-null it bounds the whole dial (opening the HOP
/// stream plus the CONNECT/STATUS handshake); when it is null the dial waits
/// indefinitely, as before.
///
/// Throws an [ArgumentError] if [addr] is not a well-formed circuit address,
/// a [TimeoutException] if [timeout] elapses, and an [Exception] if the
/// relay answers with a non-`STATUS` message or a status other than `OK`.
/// On any failure after the HOP stream is open, the stream is reset before
/// the error is rethrown.
@override
Future<TransportConn> dial(MultiAddr addr, {Duration? timeout}) async {
  _log.fine('Dialing $addr');

  // 1. Parse the /p2p-circuit address: the first /p2p component is the relay.
  final addrComponents = addr.components;
  final p2pIdx =
      addrComponents.indexWhere((c) => c.$1.code == Protocols.p2p.code);
  if (p2pIdx == -1) {
    throw ArgumentError(
        'Dial address must contain a /p2p/relayId component: $addr');
  }
  final relayId = p2p_peer.PeerId.fromString(addrComponents[p2pIdx].$2);

  // The /p2p-circuit marker must appear after the relay id.
  final circuitIdx = addrComponents.indexWhere(
    (c) => c.$1.code == Protocols.circuit.code,
    p2pIdx + 1,
  );
  if (circuitIdx == -1) {
    throw ArgumentError(
        'Dial address is not a circuit address (missing /p2p-circuit): $addr');
  }

  final PeerId destId;
  if (circuitIdx == addrComponents.length - 1) {
    // Ends with /p2p-circuit, so the destination is the relay itself.
    // NOTE(review): the CONNECT message below carries the relay's own id in
    // this case; confirm this matches what the relay expects.
    destId = relayId;
    _log.fine('Dialing relay $relayId as destination via circuit');
  } else if (addrComponents[circuitIdx + 1].$1.code == Protocols.p2p.code) {
    // Has /p2p/destId right after /p2p-circuit.
    destId = p2p_peer.PeerId.fromString(addrComponents[circuitIdx + 1].$2);
    _log.fine('Dialing $destId via relay $relayId');
  } else {
    throw ArgumentError(
        'Invalid circuit address format after /p2p-circuit: $addr');
  }

  // A single time budget is shared by every awaited step so that the caller's
  // [timeout] bounds the dial as a whole rather than each step separately.
  final budget = Stopwatch()..start();
  Future<T> withinBudget<T>(
    Future<T> op,
    String what, {
    void Function()? onExpired,
  }) {
    if (timeout == null) return op;
    final left = timeout - budget.elapsed;
    return op.timeout(
      left.isNegative ? Duration.zero : left,
      onTimeout: () {
        onExpired?.call();
        throw TimeoutException('Timed out $what while dialing $addr', timeout);
      },
    );
  }

  // 2./3. Open a HOP stream to the relay. Connecting to the relay is left to
  // the host; the caller is expected to have made the relay reachable.
  // TODO: Consider if a more specific context is needed.
  final ctx = Context();
  _log.fine('Opening HOP stream to relay $relayId');
  final pendingStream =
      host.newStream(relayId, [CircuitV2Protocol.protoIDv2Hop], ctx);
  final hopStream = await withinBudget(
    pendingStream,
    'opening HOP stream',
    // If the stream still opens after we gave up, reset it so it is not
    // leaked; failures of that late cleanup are deliberately ignored.
    onExpired: () => pendingStream.then((s) => s.reset()).ignore(),
  );
  _log.fine('HOP stream to relay $relayId opened');

  try {
    // 4./5. CONNECT/STATUS handshake, run as one unit so it can be bounded by
    // the remaining time budget.
    Future<void> handshake() async {
      final hopMsg = circuit_pb.HopMessage()
        ..type = circuit_pb.HopMessage_Type.CONNECT
        ..peer = (circuit_pb.Peer()..id = destId.toBytes());

      _log.fine('Sending HopMessage.CONNECT to relay for dest $destId');
      // writeDelimitedMessage needs a Sink<List<int>>, which the HOP stream is
      // not. Collect the encoded frame through a controller, then write it to
      // the stream in one awaited call so write errors surface here and the
      // status read only starts once the request has been handed off.
      final frame = <int>[];
      final frameController = StreamController<List<int>>();
      frameController.stream.listen(frame.addAll);
      circuit_io.writeDelimitedMessage(frameController.sink, hopMsg);
      await frameController.close();
      await hopStream.write(Uint8List.fromList(frame));

      final hopReader = circuit_io.DelimitedReader(
        _adaptP2PStreamToDartStream(hopStream),
        maxCircuitMessageSize,
      );
      final statusMsg = await hopReader.readMsg(circuit_pb.HopMessage());
      _log.fine(
          'Received HopMessage from relay: type=${statusMsg.type}, status=${statusMsg.status}');
      if (statusMsg.type != circuit_pb.HopMessage_Type.STATUS) {
        throw Exception(
            'Expected STATUS message from relay, got ${statusMsg.type}');
      }
      if (statusMsg.status != circuit_pb.Status.OK) {
        throw Exception('Relay returned error status: ${statusMsg.status}');
      }
    }

    await withinBudget(handshake(), 'negotiating HOP stream');

    // 6. Status is OK: wrap the HOP stream in a RelayedConn and return it.
    final relayedConn = RelayedConn(
      stream: hopStream as P2PStream<Uint8List>, // Cast needed
      transport: this,
      localPeer: host.id,
      remotePeer: destId,
      localMultiaddr: addr, // The address we dialed
      remoteMultiaddr: addr.decapsulate(Protocols.circuit.name)!,
    );
    _log.fine('Successfully dialed $destId via relay $relayId');
    return relayedConn;
  } catch (e, s) {
    _log.severe('Error during HOP stream negotiation: $e\n$s');
    // Best-effort cleanup: a failing reset must not mask the original error.
    try {
      await hopStream.reset();
    } catch (resetError) {
      _log.warning('Failed to reset HOP stream after error: $resetError');
    }
    rethrow;
  }
}