stream footer

This commit is contained in:
Emily Hou 2018-06-25 11:26:48 -07:00
parent a4cf46c0eb
commit 9d04514f8e
3 changed files with 36 additions and 8 deletions

View file

@ -115,6 +115,7 @@ function listenForResponse(ws, canceller) {
});
}
} catch (e) {
ws.close();
canceller.cancelled = true;
canceller.error = e;
reject(e);
@ -134,7 +135,6 @@ async function upload(
const host = window.location.hostname;
const port = window.location.port;
const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
const error = { cancelled: false };
const ws = await asyncInitWebSocket(`${protocol}//${host}:${port}/api/ws`);
try {
@ -144,7 +144,7 @@ async function upload(
authorization: `send-v1 ${verifierB64}`
};
const responsePromise = listenForResponse(ws, error);
const responsePromise = listenForResponse(ws, canceller);
ws.send(JSON.stringify(fileMeta));
@ -154,17 +154,17 @@ async function upload(
while (!state.done) {
const buf = state.value;
if (canceller.cancelled) {
throw new Error(0);
}
if (error.cancelled) {
throw new Error(error.error);
throw canceller.error;
}
ws.send(buf);
onprogress([Math.min(streamInfo.fileSize, size), streamInfo.fileSize]);
size += streamInfo.recordSize;
state = await reader.read();
}
const footer = new Uint8Array([0]);
ws.send(footer);
const response = await responsePromise; //promise only fufills if response is good
ws.close();
@ -180,6 +180,7 @@ export function uploadWs(encrypted, info, metadata, verifierB64, onprogress) {
return {
cancel: function() {
canceller.error = new Error(0);
canceller.cancelled = true;
},
result: upload(

View file

@ -3,6 +3,7 @@ const storage = require('../storage');
const config = require('../config');
const mozlog = require('../log');
const Limiter = require('../limiter');
const Parser = require('../streamparser');
const wsStream = require('websocket-stream/stream');
const log = mozlog('send.upload');
@ -45,7 +46,10 @@ module.exports = async function(ws, req) {
const url = `${protocol}://${req.get('host')}/download/${newId}/`;
const limiter = new Limiter(config.max_file_size);
fileStream = wsStream(ws, { binary: true }).pipe(limiter);
const parser = new Parser();
fileStream = wsStream(ws, { binary: true })
.pipe(limiter)
.pipe(parser);
storage.set(newId, fileStream, meta);
ws.send(
@ -60,7 +64,7 @@ module.exports = async function(ws, req) {
log.error('upload', e);
ws.send(
JSON.stringify({
error: 500
error: e === 'limit' ? 413 : 500
})
);
ws.close();

23
server/streamparser.js Normal file
View file

@ -0,0 +1,23 @@
const { Transform } = require('stream');
class StreamParser extends Transform {
constructor() {
super();
let res;
this.promise = new Promise(resolve => {
res = resolve;
});
this.res = res;
}
_transform(chunk, encoding, callback) {
if (chunk.byteLength === 1 && chunk[0] === 0) {
this.res();
} else {
this.push(chunk);
}
callback();
}
}
module.exports = StreamParser;