execute method
Future execute(dynamic _params)
override
Implementation
/// Uploads the file described by [_params] in buffered parts.
///
/// [_params] must be an [UploadFileParams]; it is cast and stored in
/// [params]. The method starts an upload through `uploads.startUpload`, reads
/// the file stream into buffers of at least 5 MiB, hands each full buffer to
/// [uploadPart], uploads whatever is left over as a final part, and then calls
/// `uploads.completeUpload` with the collected part responses.
///
/// Returns early, without completing the upload, when the start response
/// already carries a `mediaId` (reported as a duplicate) or when
/// [isShuttingDown] is observed while the file is being read.
///
/// Any error raised while starting, uploading, or completing the upload is
/// logged with `log.severe` and rethrown.
@override
Future execute(dynamic _params) async {
  print("Upload large file execute");
  params = _params as UploadFileParams;
  total = params.file.size.toDouble();
  workPhase = WorkPhase.processing;
  sendStatus();
  sendUpdate(
      message:
          "file: ${params.file.name} size ${params.file.size.formatBytes()}");
  final pfile = params.file;
  sendUpdate(message: "found file: ${pfile}");

  uploads = UploadsApi(
    ApiClient(
      transport: DioLibTransport(basePath: params.apiBasePath!),
      authentication: BearerAuthentication(params.apiToken!),
      serializer: AggregateApiReader(
        PrimitiveApiReader(),
        UploadsApiReader(),
        ApiReader.mmodel(),
      ),
    ),
  );

  sendUpdate(
      message:
          "Starting upload... Total of ${params.file.size.formatBytes()}");
  final keyName = params.keyName!;

  StartUpload upload;
  try {
    // Media metadata is only sent when both the type and the URL are known.
    upload = await uploads.startUpload(
        body: UploadRequest.of(path: keyName),
        mediaParams: params.mediaType == null || params.mediaUrl == null
            ? null
            : {
                "mediaType": params.mediaType,
                "fileName": params.file.name,
                "fileSize": params.file.size,
                "mediaUrl": params.mediaUrl,
              });
  } catch (e, stack) {
    log.severe(e, stack);
    message = "Failed to start upload: $e";
    rethrow;
  }

  try {
    // A media id in the start response means there is nothing to upload.
    if (upload.mediaId != null) {
      sendUpdate(message: "Found a duplicate!", progress: 100);
      return;
    }
    uploadId = upload.uploadId!;
    log.info("Started upload $uploadId");
    sendUpdate(message: "Starting to buffer: $uploadId", progress: 10);
    log.info("Starting upload of ${pfile.name}");

    // Minimum number of buffered bytes before a part is sent (5 MiB).
    const minPartSize = 1024 * 1024 * 5;

    // NOTE(review): the counter starts at 1 and is advanced *before* each
    // upload, so the first part handed to uploadPart is numbered 2. Confirm
    // that this is the numbering uploadPart expects.
    _currentPart = 1;
    var buffer = BytesBuffer();
    final pendingParts = <Future<ETagResponse>>[];
    final locks = LocalSemaphore(3);

    await for (final chunk in pfile.openStream()) {
      if (isShuttingDown) {
        return;
      }
      buffer.add(chunk);
      _seen += chunk.length;
      if (buffer.length < minPartSize) {
        continue;
      }

      // Enough data buffered: detach it as one part and start a new buffer.
      _currentPart = _currentPart + 1;
      final partNumber = _currentPart;
      final bytes = buffer.toBytes();
      buffer = BytesBuffer();

      final completer = Completer<ETagResponse>();
      pendingParts.add(completer.future);

      // NOTE(review): the upload below is awaited inside the read loop, so
      // parts go out one at a time and the semaphore never has more than one
      // holder. If concurrent part uploads were intended, the upload has to
      // be started here without being awaited.
      await locks.acquire();
      try {
        // Log the part number that is actually passed to uploadPart.
        log.info("Starting upload $partNumber");
        sendUpdate(
          message: "Buffered enough: Uploading ${_currentPart - 1}",
          state: {
            "uploadId": uploadId,
            "currentPart": _currentPart,
          },
        );
        final etag = await uploadPart(partNumber, bytes);
        completer.complete(etag);
      } finally {
        locks.release();
      }
    }

    // Wait for any pending items before continuing.
    final parts = [...(await Future.wait(pendingParts))];

    // Whatever is left in the buffer (less than one full part) goes last.
    if (buffer.length > 0) {
      _currentPart = _currentPart + 1;
      final partNumber = _currentPart;
      parts.add(await uploadPart(partNumber, buffer.toBytes()));
    }

    sendUpdate(
        message: "Parts are done! Now we need to finalize!", progress: 95);
    // Count the list that is sent to completeUpload so the trailing part is
    // included in the reported total.
    log.info("Completed uploading ${parts.length} parts for $uploadId");
    final resp = await uploads.completeUpload(uploadId,
        body: FinishUpload.of(
          keyName: keyName,
          parts: parts,
        ));
    log.info("Completed $uploadId status of ${resp}");
    sendUpdate(message: "Done!");
  } catch (e, s) {
    log.severe(e, s);
    rethrow;
  }
}