Merge pull request #63 from mapbox/geocoder-better-iterator
Geocoder data iterator
This commit is contained in:
+87
-1
@@ -9,6 +9,7 @@ var sm = new (require('sphericalmercator'));
|
|||||||
var sqlite3 = require('sqlite3');
|
var sqlite3 = require('sqlite3');
|
||||||
var tiletype = require('tiletype');
|
var tiletype = require('tiletype');
|
||||||
var ZXYStream = require('./zxystream');
|
var ZXYStream = require('./zxystream');
|
||||||
|
var queue = require('d3-queue').queue;
|
||||||
|
|
||||||
function noop(err) {
|
function noop(err) {
|
||||||
if (err) throw err;
|
if (err) throw err;
|
||||||
@@ -448,7 +449,6 @@ MBTiles.prototype._commit = function(callback) {
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
mbtiles._db.run('COMMIT', function(err) {
|
mbtiles._db.run('COMMIT', function(err) {
|
||||||
mbtiles._committing = false;
|
mbtiles._committing = false;
|
||||||
mbtiles.emit('commit');
|
mbtiles.emit('commit');
|
||||||
@@ -616,6 +616,92 @@ MBTiles.prototype.putGeocoderData = function(type, shard, data, callback) {
|
|||||||
});
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Implements carmen#geocoderDataIterator method.
|
||||||
|
MBTiles.prototype.geocoderDataIterator = function(type) {
|
||||||
|
var chunkSize = 100;
|
||||||
|
var position = 0;
|
||||||
|
var getNextIfBelow = 0.2 * chunkSize;
|
||||||
|
var nextQueue = [];
|
||||||
|
var dataQueue = [];
|
||||||
|
var doneSentinel = {};
|
||||||
|
var _this = this;
|
||||||
|
|
||||||
|
var zlibQueue = queue(1);
|
||||||
|
var inflate = function(data, callback) {
|
||||||
|
zlibQueue.defer(function(cb) {
|
||||||
|
zlib.inflate(data, function(err, buf) {
|
||||||
|
callback(err, buf);
|
||||||
|
cb();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
var sending = false;
|
||||||
|
var sendIfAvailable = function() {
|
||||||
|
if (sending) return;
|
||||||
|
sending = true;
|
||||||
|
|
||||||
|
while (nextQueue.length && dataQueue.length) {
|
||||||
|
var nextCb = nextQueue.shift(), data;
|
||||||
|
if (dataQueue[0] == doneSentinel) {
|
||||||
|
(function(nextCb) {
|
||||||
|
setImmediate(function() {
|
||||||
|
nextCb(null, {value: undefined, done: true});
|
||||||
|
});
|
||||||
|
})(nextCb);
|
||||||
|
} else {
|
||||||
|
data = dataQueue.shift();
|
||||||
|
maybeRefillBuffer();
|
||||||
|
|
||||||
|
(function(nextCb, cbValue) {
|
||||||
|
inflate(data.row.data, function(err, buf) {
|
||||||
|
nextCb(
|
||||||
|
data.err,
|
||||||
|
{value: {shard: data.row.shard, data: buf}, done: false}
|
||||||
|
);
|
||||||
|
})
|
||||||
|
})(nextCb, data);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sending = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
var refilling = false;
|
||||||
|
var refillBuffer = function() {
|
||||||
|
refilling = true;
|
||||||
|
var segmentCount = 0;
|
||||||
|
_this._db.each('SELECT shard, data FROM geocoder_data WHERE type = ? ORDER BY shard limit ?,?', type, position, chunkSize, function(err, row) {
|
||||||
|
dataQueue.push({row: row, err: err});
|
||||||
|
segmentCount += 1;
|
||||||
|
sendIfAvailable();
|
||||||
|
}, function() {
|
||||||
|
refilling = false;
|
||||||
|
if (segmentCount) {
|
||||||
|
maybeRefillBuffer();
|
||||||
|
} else {
|
||||||
|
// we didn't get anything this time, so we're done
|
||||||
|
dataQueue.push(doneSentinel);
|
||||||
|
sendIfAvailable();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
position += chunkSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
var maybeRefillBuffer = function() {
|
||||||
|
if (dataQueue.length <= getNextIfBelow && !refilling && dataQueue[dataQueue.length - 1] != doneSentinel) {
|
||||||
|
refillBuffer();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
refillBuffer();
|
||||||
|
|
||||||
|
return {asyncNext: function(callback) {
|
||||||
|
nextQueue.push(callback);
|
||||||
|
sendIfAvailable();
|
||||||
|
}}
|
||||||
|
}
|
||||||
|
|
||||||
// Implements carmen#getIndexableDocs method.
|
// Implements carmen#getIndexableDocs method.
|
||||||
MBTiles.prototype.getIndexableDocs = function(pointer, callback) {
|
MBTiles.prototype.getIndexableDocs = function(pointer, callback) {
|
||||||
pointer = pointer || {};
|
pointer = pointer || {};
|
||||||
|
|||||||
@@ -26,6 +26,7 @@
|
|||||||
"Konstantin Käfer <kkaefer>"
|
"Konstantin Käfer <kkaefer>"
|
||||||
],
|
],
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
|
"d3-queue": "~2.0.3",
|
||||||
"tiletype": "0.1.x",
|
"tiletype": "0.1.x",
|
||||||
"sqlite3": "3.x",
|
"sqlite3": "3.x",
|
||||||
"sphericalmercator": "~1.0.1"
|
"sphericalmercator": "~1.0.1"
|
||||||
|
|||||||
@@ -2,6 +2,8 @@ var fs = require('fs');
|
|||||||
var util = require('util');
|
var util = require('util');
|
||||||
var MBTiles = require('..');
|
var MBTiles = require('..');
|
||||||
var tape = require('tape');
|
var tape = require('tape');
|
||||||
|
var queue = require('d3-queue').queue;
|
||||||
|
var crypto = require('crypto');
|
||||||
|
|
||||||
var expected = {
|
var expected = {
|
||||||
bounds: '-141.005548666451,41.6690855919108,-52.615930948992,83.1161164353916',
|
bounds: '-141.005548666451,41.6690855919108,-52.615930948992,83.1161164353916',
|
||||||
@@ -74,6 +76,53 @@ tape('putGeocoderData', function(assert) {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
tape('geocoderDataIterator', function(assert) {
|
||||||
|
to.startWriting(function(err) {
|
||||||
|
assert.ifError(err);
|
||||||
|
// get a bunch of shards of different sizes and put them in in an arbitrary order
|
||||||
|
var shardIds = {};
|
||||||
|
var q = queue()
|
||||||
|
while (true) {
|
||||||
|
var id = Math.floor(Math.random() * Math.pow(2, 16));
|
||||||
|
if (shardIds[id]) continue;
|
||||||
|
|
||||||
|
shardIds[id] = 1;
|
||||||
|
q.defer(function(id, cb) {
|
||||||
|
to.putGeocoderData("term", id, crypto.randomBytes(Math.floor(Math.random()* 1024 * 1024)), cb);
|
||||||
|
}, id);
|
||||||
|
|
||||||
|
if (Object.keys(shardIds).length >= 50) break;
|
||||||
|
}
|
||||||
|
q.awaitAll(function() {
|
||||||
|
to.stopWriting(function(err) {
|
||||||
|
var it = to.geocoderDataIterator("term");
|
||||||
|
var data = [];
|
||||||
|
var n = function(err, item) {
|
||||||
|
assert.ifError(err);
|
||||||
|
if (item.done) {
|
||||||
|
assert.equal(data.length, 51, "iterator produces 51 shards");
|
||||||
|
|
||||||
|
assert.equal(data[0].shard, 0);
|
||||||
|
assert.equal(data[0].data.toString(), "asdf", "first shard data is preserved");
|
||||||
|
|
||||||
|
var sorted = true;
|
||||||
|
for (var i = 1; i < data.length; i++) {
|
||||||
|
if (data[i - 1].shard >= data[i].shard) sorted = false;
|
||||||
|
}
|
||||||
|
assert.equal(sorted, true, "shards come back in order");
|
||||||
|
|
||||||
|
assert.end();
|
||||||
|
} else {
|
||||||
|
data.push(item.value);
|
||||||
|
it.asyncNext(n);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
it.asyncNext(n);
|
||||||
|
});
|
||||||
|
})
|
||||||
|
});
|
||||||
|
})
|
||||||
|
|
||||||
tape('getIndexableDocs', function(assert) {
|
tape('getIndexableDocs', function(assert) {
|
||||||
from.getIndexableDocs({ limit: 10 }, function(err, docs, pointer) {
|
from.getIndexableDocs({ limit: 10 }, function(err, docs, pointer) {
|
||||||
assert.ifError(err);
|
assert.ifError(err);
|
||||||
|
|||||||
Reference in New Issue
Block a user