streamHttpRequest function
Future<Stream<HttpStreamOutput?> >
streamHttpRequest(
- String requestId,
- APIType apiType,
- HttpRequestModel httpRequestModel, {
- SupportedUriSchemes defaultUriScheme = kDefaultUriScheme,
- bool noSSL = false,
Implementation
Future<Stream<HttpStreamOutput>> streamHttpRequest(
String requestId,
APIType apiType,
HttpRequestModel httpRequestModel, {
SupportedUriSchemes defaultUriScheme = kDefaultUriScheme,
bool noSSL = false,
}) async {
final authData = httpRequestModel.authModel;
final controller = StreamController<HttpStreamOutput>();
StreamSubscription<List<int>?>? subscription;
final stopwatch = Stopwatch()..start();
Future<void> _cleanup() async {
stopwatch.stop();
await subscription?.cancel();
httpClientManager.closeClient(requestId);
await Future.microtask(() {});
controller.close();
}
Future<void> _addCancelledMessage() async {
if (!controller.isClosed) {
controller.add((null, null, null, kMsgRequestCancelled));
}
httpClientManager.removeCancelledRequest(requestId);
await _cleanup();
}
Future<void> _addErrorMessage(dynamic error) async {
await Future.microtask(() {});
if (httpClientManager.wasRequestCancelled(requestId)) {
await _addCancelledMessage();
} else {
controller.add((null, null, null, error.toString()));
await _cleanup();
}
}
controller.onCancel = () async {
await subscription?.cancel();
httpClientManager.cancelRequest(requestId);
};
if (httpClientManager.wasRequestCancelled(requestId)) {
await _addCancelledMessage();
return controller.stream;
}
final client = httpClientManager.createClient(requestId, noSSL: noSSL);
HttpRequestModel authenticatedHttpRequestModel = httpRequestModel.copyWith();
try {
if (authData != null && authData.type != APIAuthType.none) {
authenticatedHttpRequestModel = await handleAuth(
httpRequestModel,
authData,
);
}
} catch (e) {
await _addErrorMessage(e.toString());
return controller.stream;
}
final (uri, uriError) = getValidRequestUri(
authenticatedHttpRequestModel.url,
authenticatedHttpRequestModel.enabledParams,
defaultUriScheme: defaultUriScheme,
);
if (uri == null) {
await _addErrorMessage(uriError ?? 'Invalid URL');
return controller.stream;
}
try {
final streamedResponse = await makeStreamedRequest(
client: client,
uri: uri,
requestModel: authenticatedHttpRequestModel,
apiType: apiType,
);
HttpResponse _createResponseFromBytes(List<int> bytes) {
return HttpResponse.bytes(
bytes,
streamedResponse.statusCode,
request: streamedResponse.request,
headers: streamedResponse.headers,
isRedirect: streamedResponse.isRedirect,
persistentConnection: streamedResponse.persistentConnection,
reasonPhrase: streamedResponse.reasonPhrase,
);
}
final contentType =
getMediaTypeFromHeaders(streamedResponse.headers)?.mimeType ?? '';
final chunkList = <List<int>>[];
subscription = streamedResponse.stream.listen(
(bytes) async {
if (controller.isClosed) return;
final isStreaming = kStreamingResponseTypes.contains(contentType);
if (isStreaming) {
final response = _createResponseFromBytes(bytes);
controller.add((true, response, stopwatch.elapsed, null));
} else {
chunkList.add(bytes);
}
},
onDone: () async {
if (chunkList.isNotEmpty && !controller.isClosed) {
final allBytes = chunkList.expand((x) => x).toList();
final response = _createResponseFromBytes(allBytes);
final isStreaming = kStreamingResponseTypes.contains(contentType);
controller.add((isStreaming, response, stopwatch.elapsed, null));
chunkList.clear();
} else {
final response = _createResponseFromBytes([]);
controller.add((false, response, stopwatch.elapsed, null));
}
await _cleanup();
},
onError: _addErrorMessage,
);
return controller.stream;
} catch (e) {
await _addErrorMessage(e);
return controller.stream;
}
}