createOutgoing static method

Future<UDXStream> createOutgoing(
  UDX udx,
  UDPSocket socket,
  int localId,
  int remoteId,
  String host,
  int port, {
  bool framed = false,
  int initialSeq = 0,
  int? initialCwnd,
  bool Function(UDPSocket socket, int port, String host)? firewall,
})

Implementation

/// Creates the initiating side of a stream on [socket] and sends its SYN.
///
/// The new stream uses [localId] locally and targets [remoteId] at
/// [host]:[port]. [framed], [initialSeq], [initialCwnd] and [firewall] are
/// passed through unchanged to the [UDXStream] constructor.
///
/// The stream is registered with [socket], subscribed to the socket's
/// `remoteConnectionWindowUpdate` events and marked connected before the SYN
/// is sent. After the SYN, a max-data frame (using
/// [UDPSocket.defaultInitialConnectionWindow]) and a max-streams frame are
/// sent on the socket. A `connect` event is emitted just before returning.
///
/// Throws a [StateError] if [socket] is closing, and a
/// [StreamLimitExceededError] if [socket] reports that no new stream can be
/// created. If anything fails while sending the handshake, the stream is
/// closed and the original error is rethrown.
static Future<UDXStream> createOutgoing(
  UDX udx,
  UDPSocket socket,
  int localId,
  int remoteId,
  String host,
  int port, {
  bool framed = false,
  int initialSeq = 0,
  int? initialCwnd,
  bool Function(UDPSocket socket, int port, String host)? firewall,
}) async {
  // Refuse to start on a socket that is already shutting down.
  if (socket.closing) {
    throw StateError('UDXStream.createOutgoing: Socket is closing');
  }

  // Count the new outgoing stream only after the socket allows one.
  if (!socket.canCreateNewStream()) {
    throw StreamLimitExceededError(
      'Cannot create new stream: remote peer stream limit reached.',
    );
  }
  socket.incrementOutgoingStreams();

  // Build the stream and bind it to the socket and remote endpoint.
  final stream = UDXStream(
    udx,
    localId,
    isInitiator: true,
    initialCwnd: initialCwnd,
    framed: framed,
    initialSeq: initialSeq,
    firewall: firewall,
  )
    .._socket = socket
    ..remoteId = remoteId
    ..remoteHost = host
    ..remotePort = port
    ..remoteFamily = UDX.getAddressFamily(host);
  socket.registerStream(stream);

  // Drop any previous window-update subscription before listening again.
  stream._remoteConnectionWindowUpdateSubscription?.cancel();
  stream._remoteConnectionWindowUpdateSubscription = stream._socket!
      .on('remoteConnectionWindowUpdate')
      .listen(stream._handleRemoteConnectionWindowUpdate);
  stream._connected = true;

  // Both packet-manager callbacks write the packet to the socket only while
  // the stream is connected, has a socket and a remote endpoint, and that
  // socket is not closing; otherwise the packet is silently skipped.
  stream.packetManager.onRetransmit = (packet) {
    final routable = stream.connected &&
        stream._socket != null &&
        stream.remotePort != null &&
        stream.remoteHost != null;
    if (routable && !stream._socket!.closing) {
      stream._socket!.send(packet.toBytes());
    }
  };
  stream.packetManager.onSendProbe = (packet) {
    final routable = stream.connected &&
        stream._socket != null &&
        stream.remotePort != null &&
        stream.remoteHost != null;
    if (routable && !stream._socket!.closing) {
      stream._socket!.send(packet.toBytes());
    }
  };

  // The opening packet carries a single empty stream frame flagged as SYN.
  final synPacket = UDXPacket(
    destinationCid: socket.cids.remoteCid,
    sourceCid: socket.cids.localCid,
    destinationStreamId: remoteId,
    sourceStreamId: localId,
    sequence: stream.packetManager.nextSequence,
    frames: [StreamFrame(data: Uint8List(0), isSyn: true)],
  );

  try {
    // Record the send time; the SYN contributes zero bytes of payload.
    stream._sentPackets[synPacket.sequence] = (DateTime.now(), 0);

    if (stream._socket != null &&
        stream.remotePort != null &&
        stream.remoteHost != null &&
        !stream._socket!.closing) {
      stream._socket!.send(synPacket.toBytes());
    }
    // NOTE(review): the SYN is written to the socket above and also handed
    // to the packet manager here; presumably the latter only tracks it for
    // retransmission -- confirm against PacketManager.sendPacket.
    stream.packetManager.sendPacket(synPacket);
    stream._congestionController.onPacketSent(0);

    // Follow the SYN with the max-data and max-streams frames.
    final canAdvertise = stream._socket != null &&
        stream.remoteHost != null &&
        stream.remotePort != null &&
        !stream._socket!.closing;
    if (canAdvertise) {
      await stream._socket!
          .sendMaxDataFrame(UDPSocket.defaultInitialConnectionWindow);
      await stream._socket!.sendMaxStreamsFrame();
    }
  } catch (_) {
    // Close the half-open stream, then surface the original failure.
    await stream.close();
    rethrow;
  }

  return stream..emit('connect');
}