Ditch inflateSync in favor of plain inflate; test iteration order; add d3-queue dep
This commit is contained in:
@@ -9,6 +9,7 @@ var sm = new (require('sphericalmercator'));
|
||||
var sqlite3 = require('sqlite3');
|
||||
var tiletype = require('tiletype');
|
||||
var ZXYStream = require('./zxystream');
|
||||
var queue = require('d3-queue').queue;
|
||||
|
||||
function noop(err) {
|
||||
if (err) throw err;
|
||||
@@ -625,34 +626,42 @@ MBTiles.prototype.geocoderDataIterator = function(type) {
|
||||
var doneSentinel = {};
|
||||
var _this = this;
|
||||
|
||||
var zlibQueue = queue(1);
|
||||
var inflate = function(data, callback) {
|
||||
zlibQueue.defer(function(cb) {
|
||||
zlib.inflate(data, function(err, buf) {
|
||||
callback(err, buf);
|
||||
cb();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
var sending = false;
|
||||
var sendIfAvailable = function() {
|
||||
if (sending) return;
|
||||
sending = true;
|
||||
|
||||
while (nextQueue.length && dataQueue.length) {
|
||||
var nextCb = nextQueue.shift(), data, cbValue;
|
||||
var nextCb = nextQueue.shift(), data;
|
||||
if (dataQueue[0] == doneSentinel) {
|
||||
cbValue = {
|
||||
err: null,
|
||||
row: {value: undefined, done: true}
|
||||
}
|
||||
(function(nextCb) {
|
||||
setImmediate(function() {
|
||||
nextCb(null, {value: undefined, done: true});
|
||||
});
|
||||
})(nextCb);
|
||||
} else {
|
||||
data = dataQueue.shift();
|
||||
maybeRefillBuffer();
|
||||
|
||||
cbValue = {
|
||||
err: data.err,
|
||||
row: {value: {shard: data.row.shard, data: zlib.inflateSync(data.row.data)}, done: false}
|
||||
};
|
||||
(function(nextCb, cbValue) {
|
||||
inflate(data.row.data, function(err, buf) {
|
||||
nextCb(
|
||||
data.err,
|
||||
{value: {shard: data.row.shard, data: buf}, done: false}
|
||||
);
|
||||
})
|
||||
})(nextCb, data);
|
||||
}
|
||||
// bind the callback and data now so that they don't change before the setImmediate
|
||||
// callback is executed
|
||||
(function(nextCb, cbValue) {
|
||||
setImmediate(function() {
|
||||
nextCb(cbValue.err, cbValue.row);
|
||||
});
|
||||
})(nextCb, cbValue);
|
||||
}
|
||||
|
||||
sending = false;
|
||||
|
||||
@@ -26,6 +26,7 @@
|
||||
"Konstantin Käfer <kkaefer>"
|
||||
],
|
||||
"dependencies": {
|
||||
"d3-queue": "~2.0.3",
|
||||
"tiletype": "0.1.x",
|
||||
"sqlite3": "3.x",
|
||||
"sphericalmercator": "~1.0.1"
|
||||
|
||||
@@ -2,6 +2,8 @@ var fs = require('fs');
|
||||
var util = require('util');
|
||||
var MBTiles = require('..');
|
||||
var tape = require('tape');
|
||||
var queue = require('d3-queue').queue;
|
||||
var crypto = require('crypto');
|
||||
|
||||
var expected = {
|
||||
bounds: '-141.005548666451,41.6690855919108,-52.615930948992,83.1161164353916',
|
||||
@@ -62,15 +64,12 @@ tape('putGeocoderData', function(assert) {
|
||||
assert.ifError(err);
|
||||
to.putGeocoderData('term', 0, new Buffer('asdf'), function(err) {
|
||||
assert.ifError(err);
|
||||
to.putGeocoderData('term', 1, new Buffer('ZZZZZ'), function(err) {
|
||||
to.stopWriting(function(err) {
|
||||
assert.ifError(err);
|
||||
to.stopWriting(function(err) {
|
||||
to.getGeocoderData('term', 0, function(err, buffer) {
|
||||
assert.ifError(err);
|
||||
to.getGeocoderData('term', 0, function(err, buffer) {
|
||||
assert.ifError(err);
|
||||
assert.deepEqual('asdf', buffer.toString());
|
||||
assert.end();
|
||||
});
|
||||
assert.deepEqual('asdf', buffer.toString());
|
||||
assert.end();
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -78,26 +77,50 @@ tape('putGeocoderData', function(assert) {
|
||||
});
|
||||
|
||||
tape('geocoderDataIterator', function(assert) {
|
||||
var it = to.geocoderDataIterator("term");
|
||||
var data = [];
|
||||
var n = function(err, item) {
|
||||
to.startWriting(function(err) {
|
||||
assert.ifError(err);
|
||||
if (item.done) {
|
||||
assert.equal(data.length, 2, "iterator produces two shards");
|
||||
// get a bunch of shards of different sizes and put them in in an arbitrary order
|
||||
var shardIds = {};
|
||||
var q = queue()
|
||||
while (true) {
|
||||
var id = Math.floor(Math.random() * Math.pow(2, 16));
|
||||
if (shardIds[id]) continue;
|
||||
|
||||
assert.equal(data[0].shard, 0);
|
||||
assert.equal(data[0].data.toString(), "asdf");
|
||||
shardIds[id] = 1;
|
||||
q.defer(function(id, cb) {
|
||||
to.putGeocoderData("term", id, crypto.randomBytes(Math.floor(Math.random()* 1024 * 1024)), cb);
|
||||
}, id);
|
||||
|
||||
assert.equal(data[1].shard, 1);
|
||||
assert.equal(data[1].data.toString(), "ZZZZZ");
|
||||
|
||||
assert.end();
|
||||
} else {
|
||||
data.push(item.value);
|
||||
it.asyncNext(n);
|
||||
if (Object.keys(shardIds).length >= 50) break;
|
||||
}
|
||||
}
|
||||
it.asyncNext(n);
|
||||
q.awaitAll(function() {
|
||||
to.stopWriting(function(err) {
|
||||
var it = to.geocoderDataIterator("term");
|
||||
var data = [];
|
||||
var n = function(err, item) {
|
||||
assert.ifError(err);
|
||||
if (item.done) {
|
||||
assert.equal(data.length, 51, "iterator produces 51 shards");
|
||||
|
||||
assert.equal(data[0].shard, 0);
|
||||
assert.equal(data[0].data.toString(), "asdf", "first shard data is preserved");
|
||||
|
||||
var sorted = true;
|
||||
for (var i = 1; i < data.length; i++) {
|
||||
if (data[i - 1].shard >= data[i].shard) sorted = false;
|
||||
}
|
||||
assert.equal(sorted, true, "shards come back in order");
|
||||
|
||||
assert.end();
|
||||
} else {
|
||||
data.push(item.value);
|
||||
it.asyncNext(n);
|
||||
}
|
||||
}
|
||||
it.asyncNext(n);
|
||||
});
|
||||
})
|
||||
});
|
||||
})
|
||||
|
||||
tape('getIndexableDocs', function(assert) {
|
||||
|
||||
Reference in New Issue
Block a user