subscribe<T> method

Future<WebsocketSubscription<T>> subscribe<T>(
  1. JsonRpcSubscribeMethod method,
  2. JsonRpcNotificationDecoder<T> decoder, {
  3. WebsocketOnDataHandler<T>? onData,
  4. WebsocketOnErrorHandler? onError,
  5. WebsocketOnDoneHandler? onDone,
  6. bool cancelOnError = false,
  7. bool autoUnsubscribes = false,
  8. Duration? timeLimit,
})

Subscribes to the JSON RPC websocket notification of method.

The notification's result is mapped to type T by the decoder and forwarded to all notification listeners.

if cancelOnError the subscription will be cancelled if the websocket stream emits any error or the subscription times out.

Implementation

Future<WebsocketSubscription<T>> subscribe<T>(
  final JsonRpcSubscribeMethod method,
  final JsonRpcNotificationDecoder<T> decoder, {
  final WebsocketOnDataHandler<T>? onData,
  final WebsocketOnErrorHandler? onError,
  final WebsocketOnDoneHandler? onDone,
  final bool cancelOnError = false,
  final bool autoUnsubscribes = false,
  final Duration? timeLimit,
}) async {
  /// Subscribe to the JSON RPC notifications.
  final JsonRpcSubscribeResponse response = await websocketClient.send(
    method.request(commitment),
    method.response,
    config: websocketClientConfig,
  );

  /// Get the subscription id.
  final SubscriptionId subscriptionId = response.result!;

  /// Get or create a notification dispatcher for the subscription.
  final WebsocketNotifier<T> notifier =
      (_notifiers[subscriptionId] ??= WebsocketNotifier<T>(
    method.method,
    subscriptionId: subscriptionId,
    cancelOnError: cancelOnError,
    autoUnsubscribes: autoUnsubscribes,
    decoder: decoder,
    timeLimit: timeLimit,
    onTimeout: () => _onTimeout(subscriptionId),
  )) as WebsocketNotifier<T>;

  /// Add a listener to the dispatcher's queue.
  return notifier.addListener(
    onData: onData,
    onError: onError,
    onDone: onDone,
  );
}