serverToSocket static method
Future<SocketConnector>
serverToSocket({
- InternetAddress? addressA,
- int portA = 0,
- required InternetAddress addressB,
- required int portB,
- DataTransformer? transformAtoB,
- DataTransformer? transformBtoA,
- bool verbose = false,
- bool logTraffic = false,
- Duration timeout = SocketConnector.defaultTimeout,
- IOSink? logger,
- bool multi = false,
- @Deprecated("use beforeJoining instead") dynamic onConnect()?,
- dynamic beforeJoining()?,
- int backlog = 0,
- Creates socket to
portBonaddressB - Binds to
portAonaddressA - Listens for a socket connection on
portAport and joins it to the 'B' side - If
portAis not provided then a port is chosen by the OS. addressAdefaults to InternetAddress.anyIPv4multiflag controls whether or not to allow multiple connections to the bound server portportAonConnectis called whenportAhas got a new connection and a corresponding outbound socket has been created toaddressB:portBand the two have been joined togetherbeforeJoiningis called whenportAhas got a new connection and a corresponding outbound socket has been created toaddressB:portBbut before they are joined together. This allows the code which called serverToSocket to take additional steps (such as setting new transformers rather than the ones which were provided initially)
Implementation
static Future<SocketConnector> serverToSocket(
{
/// Defaults to [InternetAddress.anyIPv4]
InternetAddress? addressA,
int portA = 0,
required InternetAddress addressB,
required int portB,
DataTransformer? transformAtoB,
DataTransformer? transformBtoA,
bool verbose = false,
bool logTraffic = false,
Duration timeout = SocketConnector.defaultTimeout,
IOSink? logger,
bool multi = false,
@Deprecated("use beforeJoining instead")
Function(Socket socketA, Socket socketB)? onConnect,
Function(Side sideA, Side sideB)? beforeJoining,
int backlog = 0}) async {
IOSink logSink = logger ?? stderr;
addressA ??= InternetAddress.anyIPv4;
SocketConnector connector = SocketConnector(
verbose: verbose,
logTraffic: logTraffic,
timeout: timeout,
logger: logSink,
);
int connections = 0;
// bind to a local port for side 'A'
connector._serverSocketA = await ServerSocket.bind(
addressA,
portA,
backlog: backlog,
);
StreamController<Socket> ssc = StreamController();
Mutex m = Mutex();
ssc.stream.listen((sideASocket) async {
try {
// It's important we handle these in sequence with no chance for race
// So we're going to use a mutex
await m.acquire();
Side sideA = Side(sideASocket, true, transformer: transformAtoB);
unawaited(connector.handleSingleConnection(sideA).catchError((err) {
logSink.writeln(
'ERROR $err from handleSingleConnection on sideA $sideA');
}));
if (verbose) {
logSink.writeln('Creating socket #${++connections} to the "B" side');
}
// connect to the side 'B' address and port
Socket sideBSocket = await Socket.connect(addressB, portB);
if (verbose) {
logSink.writeln('"B" side socket #$connections created');
}
Side sideB = Side(sideBSocket, false, transformer: transformBtoA);
if (verbose) {
logSink.writeln('Calling the beforeJoining callback');
}
await beforeJoining?.call(sideA, sideB);
unawaited(connector.handleSingleConnection(sideB).catchError((err) {
logSink.writeln(
'ERROR $err from handleSingleConnection on sideB $sideB');
}));
onConnect?.call(sideASocket, sideBSocket);
} finally {
m.release();
}
});
// listen on the local port and connect the inbound socket
connector._serverSocketA?.listen((sideASocket) {
if (!multi) {
try {
connector._serverSocketA?.close();
} catch (e) {
logSink.writeln('Error while closing serverSocketA: $e');
}
}
ssc.add(sideASocket);
});
return (connector);
}