send/server/storage.js

286 lines
5.9 KiB
JavaScript
Raw Normal View History

const AWS = require('aws-sdk');
const s3 = new AWS.S3();
const mkdirp = require('mkdirp');
const config = require('./config');
const fs = require('fs');
const path = require('path');
const mozlog = require('./log');
2017-06-08 20:45:28 +00:00
2017-07-11 19:34:49 +00:00
const log = mozlog('send.storage');
2017-06-08 20:45:28 +00:00
const redis_lib =
config.env === 'development' && config.redis_host === 'localhost'
? 'redis-mock'
: 'redis';
const redis = require(redis_lib);
2017-06-07 23:46:48 +00:00
const redis_client = redis.createClient({
host: config.redis_host,
2017-06-23 18:14:33 +00:00
connect_timeout: 10000
2017-06-07 23:46:48 +00:00
});
redis_client.on('error', err => {
2017-08-04 04:47:03 +00:00
log.error('Redis:', err);
});
const fileDir = config.file_dir;
if (config.s3_bucket) {
module.exports = {
exists: exists,
2017-07-20 22:16:00 +00:00
ttl: ttl,
length: awsLength,
get: awsGet,
set: awsSet,
2017-06-20 21:33:28 +00:00
setField: setField,
delete: awsDelete,
2017-06-23 18:14:33 +00:00
forceDelete: awsForceDelete,
2017-06-29 17:27:36 +00:00
ping: awsPing,
2017-07-11 19:47:40 +00:00
flushall: flushall,
quit: quit,
2017-06-29 17:27:36 +00:00
metadata
};
} else {
mkdirp.sync(config.file_dir);
log.info('fileDir', fileDir);
module.exports = {
exists: exists,
2017-07-20 22:16:00 +00:00
ttl: ttl,
length: localLength,
get: localGet,
set: localSet,
2017-06-20 21:33:28 +00:00
setField: setField,
delete: localDelete,
2017-06-23 18:14:33 +00:00
forceDelete: localForceDelete,
2017-06-29 17:27:36 +00:00
ping: localPing,
2017-07-11 19:47:40 +00:00
flushall: flushall,
quit: quit,
2017-06-29 17:27:36 +00:00
metadata
};
}
if (config.redis_event_expire) {
const forceDelete = config.s3_bucket ? awsForceDelete : localForceDelete;
const redis_sub = redis_client.duplicate();
const subKey = '__keyevent@0__:expired';
redis_sub.psubscribe(subKey, function() {
log.info('Redis:', 'subscribed to expired key events');
});
redis_sub.on('pmessage', function(channel, message, id) {
log.info('RedisExpired:', id);
forceDelete(id);
});
}
2017-07-11 19:47:40 +00:00
function flushall() {
redis_client.flushdb();
}
function quit() {
redis_client.quit();
}
2017-06-29 17:27:36 +00:00
function metadata(id) {
return new Promise((resolve, reject) => {
redis_client.hgetall(id, (err, reply) => {
if (err || !reply) {
return reject(err);
2017-06-29 17:27:36 +00:00
}
resolve(reply);
2017-06-29 17:30:08 +00:00
});
});
2017-06-29 17:27:36 +00:00
}
2017-07-20 22:16:00 +00:00
function ttl(id) {
return new Promise((resolve, reject) => {
redis_client.ttl(id, (err, reply) => {
if (err || !reply) {
return reject(err);
2017-07-20 22:16:00 +00:00
}
resolve(reply * 1000);
2017-07-22 00:01:26 +00:00
});
});
2017-07-20 22:16:00 +00:00
}
function exists(id) {
2017-06-08 20:45:28 +00:00
return new Promise((resolve, reject) => {
redis_client.exists(id, (rediserr, reply) => {
2017-06-23 18:42:51 +00:00
if (reply === 1 && !rediserr) {
resolve();
} else {
reject(rediserr);
}
2017-06-08 20:45:28 +00:00
});
});
}
2017-06-20 21:33:28 +00:00
function setField(id, key, value) {
redis_client.hset(id, key, value);
}
function localLength(id) {
return new Promise((resolve, reject) => {
try {
resolve(fs.statSync(path.join(fileDir, id)).size);
} catch (err) {
reject();
}
});
}
function localGet(id) {
return fs.createReadStream(path.join(fileDir, id));
}
function localSet(newId, file, meta) {
return new Promise((resolve, reject) => {
const filepath = path.join(fileDir, newId);
2017-07-20 19:50:20 +00:00
const fstream = fs.createWriteStream(filepath);
file.pipe(fstream);
2017-07-20 19:50:20 +00:00
file.on('limit', () => {
file.unpipe(fstream);
fstream.destroy(new Error('limit'));
});
fstream.on('finish', () => {
redis_client.hmset(newId, meta);
redis_client.expire(newId, config.expire_seconds);
log.info('localSet:', 'Upload Finished of ' + newId);
2017-11-30 21:41:09 +00:00
resolve(meta.owner);
});
2017-07-20 19:50:20 +00:00
fstream.on('error', err => {
log.error('localSet:', 'Failed upload of ' + newId);
2017-07-20 19:50:20 +00:00
fs.unlinkSync(filepath);
reject(err);
});
});
}
2017-11-30 21:41:09 +00:00
function localDelete(id, ownerToken) {
return new Promise((resolve, reject) => {
redis_client.hget(id, 'delete', (err, reply) => {
2017-11-30 21:41:09 +00:00
if (!reply || ownerToken !== reply) {
reject();
} else {
redis_client.del(id);
2017-08-04 04:47:03 +00:00
log.info('Deleted:', id);
resolve(fs.unlinkSync(path.join(fileDir, id)));
}
});
});
}
function localForceDelete(id) {
return new Promise((resolve, reject) => {
redis_client.del(id);
resolve(fs.unlinkSync(path.join(fileDir, id)));
});
}
2017-06-23 18:14:33 +00:00
function localPing() {
return new Promise((resolve, reject) => {
redis_client.ping(err => {
return err ? reject() : resolve();
});
});
}
function awsLength(id) {
2017-06-09 17:44:12 +00:00
const params = {
Bucket: config.s3_bucket,
Key: id
};
return new Promise((resolve, reject) => {
s3.headObject(params, function(err, data) {
if (!err) {
resolve(data.ContentLength);
} else {
reject();
}
});
});
}
function awsGet(id) {
2017-06-09 17:44:12 +00:00
const params = {
Bucket: config.s3_bucket,
Key: id
};
2017-06-20 17:21:11 +00:00
try {
return s3.getObject(params).createReadStream();
2017-06-20 19:52:01 +00:00
} catch (err) {
2017-06-20 17:21:11 +00:00
return null;
}
}
function awsSet(newId, file, meta) {
2017-06-09 17:44:12 +00:00
const params = {
Bucket: config.s3_bucket,
Key: newId,
Body: file
};
2017-07-20 19:50:20 +00:00
let hitLimit = false;
const upload = s3.upload(params);
file.on('limit', () => {
hitLimit = true;
upload.abort();
});
2017-07-22 00:01:26 +00:00
return upload.promise().then(
() => {
2017-07-20 19:50:20 +00:00
redis_client.hmset(newId, meta);
redis_client.expire(newId, config.expire_seconds);
2017-07-20 19:50:20 +00:00
},
err => {
if (hitLimit) {
throw new Error('limit');
} else {
2017-07-20 19:50:20 +00:00
throw err;
}
2017-07-22 00:01:26 +00:00
}
);
}
2017-11-30 21:41:09 +00:00
function awsDelete(id, ownerToken) {
return new Promise((resolve, reject) => {
redis_client.hget(id, 'delete', (err, reply) => {
2017-11-30 21:41:09 +00:00
if (!reply || ownerToken !== reply) {
reject();
} else {
2017-06-09 17:44:12 +00:00
const params = {
Bucket: config.s3_bucket,
Key: id
};
2017-06-09 17:44:12 +00:00
s3.deleteObject(params, function(err, _data) {
2017-07-18 17:52:32 +00:00
redis_client.del(id);
2017-06-19 20:37:56 +00:00
err ? reject(err) : resolve(err);
});
}
});
});
}
function awsForceDelete(id) {
return new Promise((resolve, reject) => {
2017-06-09 17:44:12 +00:00
const params = {
Bucket: config.s3_bucket,
Key: id
};
2017-06-09 17:44:12 +00:00
s3.deleteObject(params, function(err, _data) {
2017-07-18 17:52:32 +00:00
redis_client.del(id);
2017-08-04 04:47:03 +00:00
err ? reject(err) : resolve();
});
});
}
2017-06-23 18:14:33 +00:00
function awsPing() {
return localPing().then(() =>
s3.headBucket({ Bucket: config.s3_bucket }).promise()
2017-06-23 18:14:33 +00:00
);
}