diff --git a/lib/mbtiles.js b/lib/mbtiles.js index a7388b8..8491ede 100644 --- a/lib/mbtiles.js +++ b/lib/mbtiles.js @@ -9,6 +9,7 @@ var sm = new (require('sphericalmercator')); var sqlite3 = require('sqlite3'); var tiletype = require('tiletype'); var ZXYStream = require('./zxystream'); +var queue = require('d3-queue').queue; function noop(err) { if (err) throw err; @@ -625,34 +626,42 @@ MBTiles.prototype.geocoderDataIterator = function(type) { var doneSentinel = {}; var _this = this; + var zlibQueue = queue(1); + var inflate = function(data, callback) { + zlibQueue.defer(function(cb) { + zlib.inflate(data, function(err, buf) { + callback(err, buf); + cb(); + }); + }); + } + var sending = false; var sendIfAvailable = function() { if (sending) return; sending = true; while (nextQueue.length && dataQueue.length) { - var nextCb = nextQueue.shift(), data, cbValue; + var nextCb = nextQueue.shift(), data; if (dataQueue[0] == doneSentinel) { - cbValue = { - err: null, - row: {value: undefined, done: true} - } + (function(nextCb) { + setImmediate(function() { + nextCb(null, {value: undefined, done: true}); + }); + })(nextCb); } else { data = dataQueue.shift(); maybeRefillBuffer(); - cbValue = { - err: data.err, - row: {value: {shard: data.row.shard, data: zlib.inflateSync(data.row.data)}, done: false} - }; + (function(nextCb, cbValue) { + inflate(data.row.data, function(err, buf) { + nextCb( + data.err, + {value: {shard: data.row.shard, data: buf}, done: false} + ); + }) + })(nextCb, data); } - // bind the callback and data now so that they don't change before the setImmediate - // callback is executed - (function(nextCb, cbValue) { - setImmediate(function() { - nextCb(cbValue.err, cbValue.row); - }); - })(nextCb, cbValue); } sending = false; diff --git a/package.json b/package.json index 56af353..6d66b74 100644 --- a/package.json +++ b/package.json @@ -26,6 +26,7 @@ "Konstantin Käfer " ], "dependencies": { + "d3-queue": "~2.0.3", "tiletype": "0.1.x", "sqlite3": "3.x", "sphericalmercator": "~1.0.1" diff --git a/test/geocoder.test.js b/test/geocoder.test.js index 49cecca..8ee064f 100644 --- a/test/geocoder.test.js +++ b/test/geocoder.test.js @@ -2,6 +2,8 @@ var fs = require('fs'); var util = require('util'); var MBTiles = require('..'); var tape = require('tape'); +var queue = require('d3-queue').queue; +var crypto = require('crypto'); var expected = { bounds: '-141.005548666451,41.6690855919108,-52.615930948992,83.1161164353916', @@ -62,15 +64,12 @@ tape('putGeocoderData', function(assert) { assert.ifError(err); to.putGeocoderData('term', 0, new Buffer('asdf'), function(err) { assert.ifError(err); - to.putGeocoderData('term', 1, new Buffer('ZZZZZ'), function(err) { + to.stopWriting(function(err) { assert.ifError(err); - to.stopWriting(function(err) { + to.getGeocoderData('term', 0, function(err, buffer) { assert.ifError(err); - to.getGeocoderData('term', 0, function(err, buffer) { - assert.ifError(err); - assert.deepEqual('asdf', buffer.toString()); - assert.end(); - }); + assert.deepEqual('asdf', buffer.toString()); + assert.end(); }); }); }); @@ -78,26 +77,50 @@ tape('putGeocoderData', function(assert) { }); tape('geocoderDataIterator', function(assert) { - var it = to.geocoderDataIterator("term"); - var data = []; - var n = function(err, item) { + to.startWriting(function(err) { assert.ifError(err); - if (item.done) { - assert.equal(data.length, 2, "iterator produces two shards"); + // get a bunch of shards of different sizes and put them in in an arbitrary order + var shardIds = {}; + var q = queue() + while (true) { + var id = Math.floor(Math.random() * Math.pow(2, 16)); + if (shardIds[id]) continue; - assert.equal(data[0].shard, 0); - assert.equal(data[0].data.toString(), "asdf"); + shardIds[id] = 1; + q.defer(function(id, cb) { + to.putGeocoderData("term", id, crypto.randomBytes(Math.floor(Math.random()* 1024 * 1024)), cb); + }, id); - assert.equal(data[1].shard, 1); - assert.equal(data[1].data.toString(), "ZZZZZ"); - - assert.end(); - } else { - data.push(item.value); - it.asyncNext(n); + if (Object.keys(shardIds).length >= 50) break; } - } - it.asyncNext(n); + q.awaitAll(function() { + to.stopWriting(function(err) { + var it = to.geocoderDataIterator("term"); + var data = []; + var n = function(err, item) { + assert.ifError(err); + if (item.done) { + assert.equal(data.length, 51, "iterator produces 51 shards"); + + assert.equal(data[0].shard, 0); + assert.equal(data[0].data.toString(), "asdf", "first shard data is preserved"); + + var sorted = true; + for (var i = 1; i < data.length; i++) { + if (data[i - 1].shard >= data[i].shard) sorted = false; + } + assert.equal(sorted, true, "shards come back in order"); + + assert.end(); + } else { + data.push(item.value); + it.asyncNext(n); + } + } + it.asyncNext(n); + }); + }) + }); }) tape('getIndexableDocs', function(assert) {