diff --git a/lib/mbtiles.js b/lib/mbtiles.js
index 2ca3dc2..8491ede 100644
--- a/lib/mbtiles.js
+++ b/lib/mbtiles.js
@@ -9,6 +9,7 @@ var sm = new (require('sphericalmercator'));
 var sqlite3 = require('sqlite3');
 var tiletype = require('tiletype');
 var ZXYStream = require('./zxystream');
+var queue = require('d3-queue').queue;
 
 function noop(err) {
     if (err) throw err;
@@ -448,7 +449,6 @@ MBTiles.prototype._commit = function(callback) {
                 break;
             }
         });
-
         mbtiles._db.run('COMMIT', function(err) {
             mbtiles._committing = false;
             mbtiles.emit('commit');
@@ -616,6 +616,101 @@ MBTiles.prototype.putGeocoderData = function(type, shard, data, callback) {
     });
 };
 
+// Implements carmen#geocoderDataIterator method.
+//
+// Returns an object with a single asyncNext(callback) method. Each call
+// yields, in shard order, callback(err, {value: {shard, data}, done: false})
+// with the inflated shard buffer, then {value: undefined, done: true} once
+// every geocoder_data row of the given type has been delivered.
+MBTiles.prototype.geocoderDataIterator = function(type) {
+    var chunkSize = 100;
+    var position = 0;
+    var getNextIfBelow = 0.2 * chunkSize;
+    var nextQueue = [];
+    var dataQueue = [];
+    var doneSentinel = {};
+    var _this = this;
+
+    // Inflate one buffer at a time so results come back in request order.
+    var zlibQueue = queue(1);
+    var inflate = function(data, callback) {
+        zlibQueue.defer(function(cb) {
+            zlib.inflate(data, function(err, buf) {
+                callback(err, buf);
+                cb();
+            });
+        });
+    }
+
+    var sending = false;
+    var sendIfAvailable = function() {
+        if (sending) return;
+        sending = true;
+
+        while (nextQueue.length && dataQueue.length) {
+            var nextCb = nextQueue.shift(), data;
+            if (dataQueue[0] == doneSentinel) {
+                (function(nextCb) {
+                    setImmediate(function() {
+                        nextCb(null, {value: undefined, done: true});
+                    });
+                })(nextCb);
+            } else {
+                data = dataQueue.shift();
+                maybeRefillBuffer();
+
+                // Read only the captured cbValue below: `data` is shared by
+                // every pass of this loop and may already have been reassigned
+                // by the time the inflate callback runs.
+                (function(nextCb, cbValue) {
+                    inflate(cbValue.row.data, function(err, buf) {
+                        nextCb(
+                            cbValue.err || err,
+                            {value: {shard: cbValue.row.shard, data: buf}, done: false}
+                        );
+                    })
+                })(nextCb, data);
+            }
+        }
+
+        sending = false;
+    }
+
+    var refilling = false;
+    var refillBuffer = function() {
+        refilling = true;
+        var segmentCount = 0;
+        _this._db.each('SELECT shard, data FROM geocoder_data WHERE type = ? ORDER BY shard limit ?,?', type, position, chunkSize, function(err, row) {
+            dataQueue.push({row: row, err: err});
+            segmentCount += 1;
+            sendIfAvailable();
+        }, function() {
+            refilling = false;
+            if (segmentCount) {
+                maybeRefillBuffer();
+            } else {
+                // we didn't get anything this time, so we're done
+                dataQueue.push(doneSentinel);
+                sendIfAvailable();
+            }
+        });
+        position += chunkSize;
+    }
+
+    var maybeRefillBuffer = function() {
+        if (dataQueue.length <= getNextIfBelow && !refilling && dataQueue[dataQueue.length - 1] != doneSentinel) {
+            refillBuffer();
+        }
+    }
+
+    refillBuffer();
+
+    return {asyncNext: function(callback) {
+        nextQueue.push(callback);
+        sendIfAvailable();
+    }}
+}
+
 // Implements carmen#getIndexableDocs method.
 MBTiles.prototype.getIndexableDocs = function(pointer, callback) {
     pointer = pointer || {};
diff --git a/package.json b/package.json
index 56af353..6d66b74 100644
--- a/package.json
+++ b/package.json
@@ -26,6 +26,7 @@
         "Konstantin Käfer "
     ],
     "dependencies": {
+        "d3-queue": "~2.0.3",
         "tiletype": "0.1.x",
         "sqlite3": "3.x",
         "sphericalmercator": "~1.0.1"
diff --git a/test/geocoder.test.js b/test/geocoder.test.js
index 1b2dc89..8ee064f 100644
--- a/test/geocoder.test.js
+++ b/test/geocoder.test.js
@@ -2,6 +2,8 @@ var fs = require('fs');
 var util = require('util');
 var MBTiles = require('..');
 var tape = require('tape');
+var queue = require('d3-queue').queue;
+var crypto = require('crypto');
 
 var expected = {
     bounds: '-141.005548666451,41.6690855919108,-52.615930948992,83.1161164353916',
@@ -74,6 +76,56 @@ tape('putGeocoderData', function(assert) {
     });
 });
 
+tape('geocoderDataIterator', function(assert) {
+    to.startWriting(function(err) {
+        assert.ifError(err);
+        // get a bunch of shards of different sizes and put them in in an arbitrary order
+        var shardIds = {};
+        var q = queue()
+        while (true) {
+            // ids start at 1: the assertions below expect shard 0 to keep its existing "asdf" payload
+            var id = 1 + Math.floor(Math.random() * (Math.pow(2, 16) - 1));
+            if (shardIds[id]) continue;
+
+            shardIds[id] = 1;
+            q.defer(function(id, cb) {
+                to.putGeocoderData("term", id, crypto.randomBytes(Math.floor(Math.random()* 1024 * 1024)), cb);
+            }, id);
+
+            if (Object.keys(shardIds).length >= 50) break;
+        }
+        q.awaitAll(function(err) {
+            assert.ifError(err);
+            to.stopWriting(function(err) {
+                assert.ifError(err);
+                var it = to.geocoderDataIterator("term");
+                var data = [];
+                var n = function(err, item) {
+                    assert.ifError(err);
+                    if (item.done) {
+                        assert.equal(data.length, 51, "iterator produces 51 shards");
+
+                        assert.equal(data[0].shard, 0);
+                        assert.equal(data[0].data.toString(), "asdf", "first shard data is preserved");
+
+                        var sorted = true;
+                        for (var i = 1; i < data.length; i++) {
+                            if (data[i - 1].shard >= data[i].shard) sorted = false;
+                        }
+                        assert.equal(sorted, true, "shards come back in order");
+
+                        assert.end();
+                    } else {
+                        data.push(item.value);
+                        it.asyncNext(n);
+                    }
+                }
+                it.asyncNext(n);
+            });
+        })
+    });
+})
+
 tape('getIndexableDocs', function(assert) {
     from.getIndexableDocs({ limit: 10 }, function(err, docs, pointer) {
         assert.ifError(err);