Ditch geocoderDataForEach (it's a memory hog) and instead go with an async iterator method with an internal buffer, implemented around sqlite limits, to give caller more control over reads
This commit is contained in:
+58
-9
@@ -615,15 +615,64 @@ MBTiles.prototype.putGeocoderData = function(type, shard, data, callback) {
|
|||||||
});
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
// Implements carmen#getGeocoderData method.
|
// Implements carmen#geocoderDataIterator method.
|
||||||
MBTiles.prototype.geocoderDataForEach = function(type, callback, completeCallback) {
|
MBTiles.prototype.geocoderDataIterator = function(type) {
|
||||||
return this._db.each('SELECT shard, data FROM geocoder_data WHERE type = ? ORDER BY shard', type, function(err, row) {
|
var chunkSize = 100;
|
||||||
if (err && err.code === 'SQLITE_ERROR' && err.errno === 1) return callback();
|
var position = 0;
|
||||||
if (err) return callback(err);
|
var getNextIfBelow = 0.2 * chunkSize;
|
||||||
if (!row) return callback();
|
var nextQueue = [];
|
||||||
callback(row.shard, zlib.inflateSync(row.data));
|
var dataQueue = [];
|
||||||
}, completeCallback);
|
var doneSentinel = {};
|
||||||
};
|
var _this = this;
|
||||||
|
|
||||||
|
var sendIfAvailable = function() {
|
||||||
|
while (nextQueue.length && dataQueue.length) {
|
||||||
|
var nextCb = nextQueue.shift(), data;
|
||||||
|
if (dataQueue[0] == doneSentinel) {
|
||||||
|
nextCb({value: undefined, done: true});
|
||||||
|
} else {
|
||||||
|
data = dataQueue.shift();
|
||||||
|
maybeRefillBuffer();
|
||||||
|
|
||||||
|
nextCb({value: {shard: data.shard, data: zlib.inflateSync(data.data)}, done: false});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var refilling = false;
|
||||||
|
var refillBuffer = function() {
|
||||||
|
refilling = true;
|
||||||
|
var segmentCount = 0;
|
||||||
|
_this._db.each('SELECT shard, data FROM geocoder_data WHERE type = ? ORDER BY shard limit ?,?', type, position, chunkSize, function(err, row) {
|
||||||
|
dataQueue.push(row);
|
||||||
|
segmentCount += 1;
|
||||||
|
sendIfAvailable();
|
||||||
|
}, function() {
|
||||||
|
refilling = false;
|
||||||
|
if (segmentCount) {
|
||||||
|
maybeRefillBuffer();
|
||||||
|
} else {
|
||||||
|
// we didn't get anything this time, so we're done
|
||||||
|
dataQueue.push(doneSentinel);
|
||||||
|
sendIfAvailable();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
position += chunkSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
var maybeRefillBuffer = function() {
|
||||||
|
if (dataQueue.length <= getNextIfBelow && !refilling && dataQueue[dataQueue.length - 1] != doneSentinel) {
|
||||||
|
refillBuffer();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
refillBuffer();
|
||||||
|
|
||||||
|
return {asyncNext: function(callback) {
|
||||||
|
nextQueue.push(callback);
|
||||||
|
sendIfAvailable();
|
||||||
|
}}
|
||||||
|
}
|
||||||
|
|
||||||
// Implements carmen#getIndexableDocs method.
|
// Implements carmen#getIndexableDocs method.
|
||||||
MBTiles.prototype.getIndexableDocs = function(pointer, callback) {
|
MBTiles.prototype.getIndexableDocs = function(pointer, callback) {
|
||||||
|
|||||||
Reference in New Issue
Block a user