createOutgoing static method
Future<UDXStream> createOutgoing(
  UDX udx,
  UDPSocket socket,
  int localId,
  int remoteId,
  String host,
  int port, {
  bool framed = false,
  int initialSeq = 0,
  int? initialCwnd,
  bool Function(UDPSocket socket, int port, String host)? firewall,
})
Implementation
/// Creates an initiator-side [UDXStream] bound to [socket] and sends its
/// opening SYN packet towards [host]:[port].
///
/// The new stream is registered on [socket], subscribed to the socket's
/// `remoteConnectionWindowUpdate` events, and marked as connected before the
/// SYN is handed to the socket. After the SYN has been recorded with the
/// packet manager and the congestion controller, the socket is asked to send
/// a MaxData frame (using [UDPSocket.defaultInitialConnectionWindow]) and a
/// MaxStreams frame. Sends are skipped whenever the socket reports that it is
/// closing.
///
/// [localId] and [remoteId] become the source and destination stream ids of
/// the SYN packet. [framed], [initialSeq], [initialCwnd] and [firewall] are
/// forwarded unchanged to the [UDXStream] constructor.
///
/// Throws a [StateError] if [socket] is already closing, and a
/// [StreamLimitExceededError] if [socket] reports that no new stream may be
/// created. If anything inside the send sequence throws, the stream is closed
/// and the error is rethrown.
///
/// Completes with the stream after it has emitted `connect`.
static Future<UDXStream> createOutgoing(
  UDX udx,
  UDPSocket socket,
  int localId,
  int remoteId,
  String host,
  int port, {
  bool framed = false,
  int initialSeq = 0,
  int? initialCwnd,
  bool Function(UDPSocket socket, int port, String host)? firewall,
}) async {
  // Preconditions: checked in this order, before any state is touched.
  if (socket.closing) {
    throw StateError('UDXStream.createOutgoing: Socket is closing');
  }
  if (!socket.canCreateNewStream()) {
    throw StreamLimitExceededError('Cannot create new stream: remote peer stream limit reached.');
  }
  socket.incrementOutgoingStreams();

  // Build the stream and point it at the remote endpoint.
  final stream = UDXStream(
    udx,
    localId,
    isInitiator: true,
    initialCwnd: initialCwnd,
    framed: framed,
    initialSeq: initialSeq,
    firewall: firewall,
  )
    .._socket = socket
    ..remoteId = remoteId
    ..remoteHost = host
    ..remotePort = port
    ..remoteFamily = UDX.getAddressFamily(host);
  socket.registerStream(stream);

  // Replace any existing window-update subscription with a fresh one.
  stream._remoteConnectionWindowUpdateSubscription?.cancel();
  final windowUpdates = stream._socket!.on('remoteConnectionWindowUpdate');
  stream._remoteConnectionWindowUpdateSubscription =
      windowUpdates.listen(stream._handleRemoteConnectionWindowUpdate);

  stream._connected = true;

  // Retransmissions and probes are forwarded to the socket only while the
  // stream is connected, has a remote endpoint, and the socket is not closing.
  stream.packetManager
    ..onRetransmit = (outbound) {
      final target = stream._socket;
      if (target == null || !stream.connected || stream.remotePort == null || stream.remoteHost == null) {
        return;
      }
      if (target.closing) return;
      target.send(outbound.toBytes());
    }
    ..onSendProbe = (probe) {
      final target = stream._socket;
      if (target == null || !stream.connected || stream.remotePort == null || stream.remoteHost == null) {
        return;
      }
      if (target.closing) return;
      target.send(probe.toBytes());
    };

  // Empty-payload SYN frame that opens the stream.
  final cids = socket.cids;
  final initialPacket = UDXPacket(
    destinationCid: cids.remoteCid,
    sourceCid: cids.localCid,
    destinationStreamId: remoteId,
    sourceStreamId: localId,
    sequence: stream.packetManager.nextSequence,
    frames: [StreamFrame(data: Uint8List(0), isSyn: true)],
  );

  try {
    // The SYN carries no payload, hence a recorded size of 0.
    stream._sentPackets[initialPacket.sequence] = (DateTime.now(), 0);

    final synSocket = stream._socket;
    if (synSocket != null &&
        stream.remotePort != null &&
        stream.remoteHost != null &&
        !synSocket.closing) {
      synSocket.send(initialPacket.toBytes());
    }

    stream.packetManager.sendPacket(initialPacket);
    stream._congestionController.onPacketSent(0);

    final controlSocket = stream._socket;
    if (controlSocket != null &&
        stream.remoteHost != null &&
        stream.remotePort != null &&
        !controlSocket.closing) {
      await controlSocket.sendMaxDataFrame(UDPSocket.defaultInitialConnectionWindow);
      // The socket field is deliberately read again after the await rather
      // than reusing the local captured above.
      await stream._socket!.sendMaxStreamsFrame();
    }
  } catch (_) {
    // Tear the stream down before propagating the failure to the caller.
    await stream.close();
    rethrow;
  }

  stream.emit('connect');
  return stream;
}