diff --git a/packages/tempstore/tempStore.js b/packages/tempstore/tempStore.js index f7d4d5f0..b372dd05 100644 --- a/packages/tempstore/tempStore.js +++ b/packages/tempstore/tempStore.js @@ -293,8 +293,16 @@ FS.TempStore.createWriteStream = function(fileObj, options) { setObj['keys.' + chunkNum] = result.fileKey; tracker.update(selector, {$set: setObj}); + + var temp = tracker.findOne(selector); + + if(!temp){ + FS.debug && console.log('NOT FOUND FROM TEMPSTORE => EXIT (REMOVED)'); + return; + } + // Get updated chunkCount - var chunkCount = FS.Utility.size(tracker.findOne(selector).keys); + var chunkCount = FS.Utility.size(temp.keys); // Progress self.emit('progress', fileObj, chunkNum, chunkCount, chunkSum, result); @@ -304,6 +312,9 @@ FS.TempStore.createWriteStream = function(fileObj, options) { // We no longer need the chunk info var modifier = { $set: {}, $unset: {chunkCount: 1, chunkSum: 1, chunkSize: 1} }; + if(!fileObj.instance_id) + modifier.$set.instance_id = process.env.COLLECTIONFS_ENV_NAME_UNIQUE_ID ? process.env[process.env.COLLECTIONFS_ENV_NAME_UNIQUE_ID] : process.env.METEOR_PARENT_PID; + // Check if the file has been uploaded before if (typeof fileObj.uploadedAt === 'undefined') { // We set the uploadedAt date @@ -324,8 +335,14 @@ FS.TempStore.createWriteStream = function(fileObj, options) { // XXX is emitting "ready" necessary? self.emit('ready', fileObj, chunkCount, result); } else { - // Update the chunkCount on the fileObject - fileObj.update({ $set: {chunkCount: chunkCount} }); + + var modifier = { $set: {}}; + if(!fileObj.instance_id) + modifier.$set.instance_id = process.env.COLLECTIONFS_ENV_NAME_UNIQUE_ID ? process.env[process.env.COLLECTIONFS_ENV_NAME_UNIQUE_ID] : process.env.METEOR_PARENT_PID; + + modifier.$set.chunkCount = chunkCount; + + fileObj.update(modifier); } }); diff --git a/packages/worker/fileWorker.js b/packages/worker/fileWorker.js index b21ff870..d401b84e 100644 --- a/packages/worker/fileWorker.js +++ b/packages/worker/fileWorker.js @@ -43,12 +43,14 @@ FS.FileWorker.observe = function(fsCollection) { // Initiate observe for finding files that have been stored so we can delete // any temp files + /* fsCollection.files.find(getDoneQuery(fsCollection.options.stores)).observe({ added: function(fsFile) { FS.debug && console.log("FileWorker ADDED - calling deleteChunks for", fsFile._id); FS.TempStore.removeFile(fsFile); } }); + */ // Initiate observe for catching files that have been removed and // removing the data from all stores as well @@ -84,6 +86,7 @@ function getReadyQuery(storeName) { var selector = {uploadedAt: {$exists: true}}; selector['copies.' + storeName] = null; selector['failures.copies.' + storeName + '.doneTrying'] = {$ne: true}; + selector['instance_id'] = process.env.COLLECTIONFS_ENV_NAME_UNIQUE_ID ? process.env[process.env.COLLECTIONFS_ENV_NAME_UNIQUE_ID] : process.env.METEOR_PARENT_PID; return selector; } @@ -141,7 +144,7 @@ function getDoneQuery(stores) { tempCond['failures.copies.' + storeName + '.doneTrying'] = true; copyCond.$or.push(tempCond); selector.$and.push(copyCond); - }) + }); return selector; } @@ -172,6 +175,11 @@ function saveCopy(fsFile, storeName, options) { var writeStream = storage.adapter.createWriteStream(fsFile); var readStream = FS.TempStore.createReadStream(fsFile); + writeStream.on('finish',Meteor.bindEnvironment(function(){ + FS.debug && console.log('finish', fsFile._id); + FS.TempStore.removeFile(fsFile); + })); + // Pipe the temp data into the storage adapter readStream.pipe(writeStream); }