send/app/streams.js

104 lines
2.5 KiB
JavaScript
Raw Permalink Normal View History

2019-08-02 19:03:53 +00:00
/* global TransformStream */
2018-07-17 18:40:01 +00:00
export function transformStream(readable, transformer, oncancel) {
2018-07-26 05:26:11 +00:00
try {
2018-07-19 21:46:12 +00:00
return readable.pipeThrough(new TransformStream(transformer));
2018-07-26 05:26:11 +00:00
} catch (e) {
const reader = readable.getReader();
return new ReadableStream({
start(controller) {
if (transformer.start) {
return transformer.start(controller);
2018-07-18 23:39:14 +00:00
}
2018-07-26 05:26:11 +00:00
},
async pull(controller) {
let enqueued = false;
const wrappedController = {
enqueue(d) {
enqueued = true;
controller.enqueue(d);
2018-07-18 23:39:14 +00:00
}
2018-07-26 05:26:11 +00:00
};
while (!enqueued) {
const data = await reader.read();
if (data.done) {
if (transformer.flush) {
await transformer.flush(controller);
}
return controller.close();
}
await transformer.transform(data.value, wrappedController);
}
},
cancel(reason) {
readable.cancel(reason);
if (oncancel) {
oncancel(reason);
2018-07-18 23:39:14 +00:00
}
}
2018-07-26 05:26:11 +00:00
});
}
}
class BlobStreamController {
constructor(blob, size) {
this.blob = blob;
this.index = 0;
this.chunkSize = size || 1024 * 64;
}
pull(controller) {
return new Promise((resolve, reject) => {
const bytesLeft = this.blob.size - this.index;
if (bytesLeft <= 0) {
controller.close();
return resolve();
}
2018-07-26 05:26:11 +00:00
const size = Math.min(this.chunkSize, bytesLeft);
const slice = this.blob.slice(this.index, this.index + size);
const reader = new FileReader();
reader.onload = () => {
controller.enqueue(new Uint8Array(reader.result));
resolve();
};
reader.onerror = reject;
reader.readAsArrayBuffer(slice);
this.index += size;
});
}
}
export function blobStream(blob, size) {
return new ReadableStream(new BlobStreamController(blob, size));
}
class ConcatStreamController {
constructor(streams) {
this.streams = streams;
this.index = 0;
this.reader = null;
this.nextReader();
}
nextReader() {
const next = this.streams[this.index++];
this.reader = next && next.getReader();
}
async pull(controller) {
if (!this.reader) {
return controller.close();
}
const data = await this.reader.read();
if (data.done) {
this.nextReader();
return this.pull(controller);
2018-07-18 23:39:14 +00:00
}
2018-07-26 05:26:11 +00:00
controller.enqueue(data.value);
}
}
export function concatStream(streams) {
return new ReadableStream(new ConcatStreamController(streams));
2018-07-17 18:40:01 +00:00
}