From 3c7f5c8462be22a080b34385c756183b74930924 Mon Sep 17 00:00:00 2001 From: Andrew Pendleton Date: Sat, 5 Mar 2016 17:12:17 -0500 Subject: [PATCH 1/9] And infrastructure for iterating over geocoder data --- lib/mbtiles.js | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/lib/mbtiles.js b/lib/mbtiles.js index 2ca3dc2..68d3bbd 100644 --- a/lib/mbtiles.js +++ b/lib/mbtiles.js @@ -448,7 +448,6 @@ MBTiles.prototype._commit = function(callback) { break; } }); - mbtiles._db.run('COMMIT', function(err) { mbtiles._committing = false; mbtiles.emit('commit'); @@ -616,6 +615,16 @@ MBTiles.prototype.putGeocoderData = function(type, shard, data, callback) { }); }; +// Implements carmen#getGeocoderData method. +MBTiles.prototype.geocoderDataForEach = function(type, callback, completeCallback) { + return this._db.each('SELECT shard, data FROM geocoder_data WHERE type = ? ORDER BY shard', type, function(err, row) { + if (err && err.code === 'SQLITE_ERROR' && err.errno === 1) return callback(); + if (err) return callback(err); + if (!row) return callback(); + callback(row.shard, zlib.inflateSync(row.data)); + }, completeCallback); +}; + // Implements carmen#getIndexableDocs method. MBTiles.prototype.getIndexableDocs = function(pointer, callback) { pointer = pointer || {}; From aac15a8f81742204f3bd8cd5500a9739425d0bef Mon Sep 17 00:00:00 2001 From: Andrew Pendleton Date: Thu, 17 Mar 2016 17:55:34 -0400 Subject: [PATCH 2/9] Ditch geocoderDataForEach (it's a memory hog) and instead go with an async iterator method with an internal buffer, implemented around sqlite limits, to give caller more control over reads --- lib/mbtiles.js | 67 +++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 58 insertions(+), 9 deletions(-) diff --git a/lib/mbtiles.js b/lib/mbtiles.js index 68d3bbd..6647b71 100644 --- a/lib/mbtiles.js +++ b/lib/mbtiles.js @@ -615,15 +615,64 @@ MBTiles.prototype.putGeocoderData = function(type, shard, data, callback) { }); }; -// Implements carmen#getGeocoderData method. -MBTiles.prototype.geocoderDataForEach = function(type, callback, completeCallback) { - return this._db.each('SELECT shard, data FROM geocoder_data WHERE type = ? ORDER BY shard', type, function(err, row) { - if (err && err.code === 'SQLITE_ERROR' && err.errno === 1) return callback(); - if (err) return callback(err); - if (!row) return callback(); - callback(row.shard, zlib.inflateSync(row.data)); - }, completeCallback); -}; +// Implements carmen#geocoderDataIterator method. +MBTiles.prototype.geocoderDataIterator = function(type) { + var chunkSize = 100; + var position = 0; + var getNextIfBelow = 0.2 * chunkSize; + var nextQueue = []; + var dataQueue = []; + var doneSentinel = {}; + var _this = this; + + var sendIfAvailable = function() { + while (nextQueue.length && dataQueue.length) { + var nextCb = nextQueue.shift(), data; + if (dataQueue[0] == doneSentinel) { + nextCb({value: undefined, done: true}); + } else { + data = dataQueue.shift(); + maybeRefillBuffer(); + + nextCb({value: {shard: data.shard, data: zlib.inflateSync(data.data)}, done: false}); + } + } + } + + var refilling = false; + var refillBuffer = function() { + refilling = true; + var segmentCount = 0; + _this._db.each('SELECT shard, data FROM geocoder_data WHERE type = ? ORDER BY shard limit ?,?', type, position, chunkSize, function(err, row) { + dataQueue.push(row); + segmentCount += 1; + sendIfAvailable(); + }, function() { + refilling = false; + if (segmentCount) { + maybeRefillBuffer(); + } else { + // we didn't get anything this time, so we're done + dataQueue.push(doneSentinel); + sendIfAvailable(); + } + }); + position += chunkSize; + } + + var maybeRefillBuffer = function() { + if (dataQueue.length <= getNextIfBelow && !refilling && dataQueue[dataQueue.length - 1] != doneSentinel) { + refillBuffer(); + } + } + + refillBuffer(); + + return {asyncNext: function(callback) { + nextQueue.push(callback); + sendIfAvailable(); + }} +} // Implements carmen#getIndexableDocs method. MBTiles.prototype.getIndexableDocs = function(pointer, callback) { From 3967adcb4a2eb0d26f070e2574e4fb781b071399 Mon Sep 17 00:00:00 2001 From: Andrew Pendleton Date: Wed, 30 Mar 2016 01:10:20 -0400 Subject: [PATCH 3/9] Avoid callstack loop by making sure only one geocoder data iterator buffer-flush operation is happening at once --- lib/mbtiles.js | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/lib/mbtiles.js b/lib/mbtiles.js index 6647b71..b178a07 100644 --- a/lib/mbtiles.js +++ b/lib/mbtiles.js @@ -625,7 +625,11 @@ MBTiles.prototype.geocoderDataIterator = function(type) { var doneSentinel = {}; var _this = this; + var sending = false; var sendIfAvailable = function() { + if (sending) return; + sending = true; + while (nextQueue.length && dataQueue.length) { var nextCb = nextQueue.shift(), data; if (dataQueue[0] == doneSentinel) { @@ -637,6 +641,8 @@ MBTiles.prototype.geocoderDataIterator = function(type) { nextCb({value: {shard: data.shard, data: zlib.inflateSync(data.data)}, done: false}); } } + + sending = false; } var refilling = false; From 11b6381c4f8cf7e2aab788f97afbd0e334399859 Mon Sep 17 00:00:00 2001 From: Andrew Pendleton Date: Fri, 1 Apr 2016 01:16:01 -0400 Subject: [PATCH 4/9] Another attempt at avoiding overflowing the callstack on geocoderDataIterator: every tenth time we call a callback, call it via setImmediate to reset the stack --- lib/mbtiles.js | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/lib/mbtiles.js b/lib/mbtiles.js index b178a07..2e39329 100644 --- a/lib/mbtiles.js +++ b/lib/mbtiles.js @@ -615,6 +615,9 @@ MBTiles.prototype.putGeocoderData = function(type, shard, data, callback) { }); }; +var stackCounter = 0; +var stackMax = 10; + // Implements carmen#geocoderDataIterator method. MBTiles.prototype.geocoderDataIterator = function(type) { var chunkSize = 100; @@ -625,6 +628,20 @@ MBTiles.prototype.geocoderDataIterator = function(type) { var doneSentinel = {}; var _this = this; + // every tenth callback call (globally) do it via setImmediate + // to avoid callback loops that blow the callstack + var checkStack = function(cb, arg) { + if (stackCounter >= stackMax) { + stackCounter = 0; + setImmediate(function() { + cb(arg); + }) + } else { + stackCounter += 1; + cb(arg); + } + } + var sending = false; var sendIfAvailable = function() { if (sending) return; @@ -633,12 +650,12 @@ MBTiles.prototype.geocoderDataIterator = function(type) { while (nextQueue.length && dataQueue.length) { var nextCb = nextQueue.shift(), data; if (dataQueue[0] == doneSentinel) { - nextCb({value: undefined, done: true}); + checkStack(nextCb, {value: undefined, done: true}); } else { data = dataQueue.shift(); maybeRefillBuffer(); - nextCb({value: {shard: data.shard, data: zlib.inflateSync(data.data)}, done: false}); + checkStack(nextCb, {value: {shard: data.shard, data: zlib.inflateSync(data.data)}, done: false}); } } From b37655577f2a672ebe0522178df4b4eab414132c Mon Sep 17 00:00:00 2001 From: Andrew Pendleton Date: Mon, 4 Apr 2016 10:49:34 -0400 Subject: [PATCH 5/9] Increase the frequency of calling setImmediate in the geocoderDataIterator function --- lib/mbtiles.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/mbtiles.js b/lib/mbtiles.js index 2e39329..d60f1d6 100644 --- a/lib/mbtiles.js +++ b/lib/mbtiles.js @@ -616,7 +616,7 @@ MBTiles.prototype.putGeocoderData = function(type, shard, data, callback) { }; var stackCounter = 0; -var stackMax = 10; +var stackMax = 3; // Implements carmen#geocoderDataIterator method. MBTiles.prototype.geocoderDataIterator = function(type) { @@ -628,7 +628,7 @@ MBTiles.prototype.geocoderDataIterator = function(type) { var doneSentinel = {}; var _this = this; - // every tenth callback call (globally) do it via setImmediate + // every nth callback call (globally) do it via setImmediate // to avoid callback loops that blow the callstack var checkStack = function(cb, arg) { if (stackCounter >= stackMax) { From b6ef79d7dcee0e6bee46778ead8c9f7775a0161d Mon Sep 17 00:00:00 2001 From: Andrew Pendleton Date: Mon, 4 Apr 2016 13:16:52 -0400 Subject: [PATCH 6/9] I give up: setImmediate for every callback --- lib/mbtiles.js | 30 ++++++++++-------------------- 1 file changed, 10 insertions(+), 20 deletions(-) diff --git a/lib/mbtiles.js b/lib/mbtiles.js index d60f1d6..54d4b9e 100644 --- a/lib/mbtiles.js +++ b/lib/mbtiles.js @@ -615,9 +615,6 @@ MBTiles.prototype.putGeocoderData = function(type, shard, data, callback) { }); }; -var stackCounter = 0; -var stackMax = 3; - // Implements carmen#geocoderDataIterator method. MBTiles.prototype.geocoderDataIterator = function(type) { var chunkSize = 100; @@ -628,35 +625,28 @@ MBTiles.prototype.geocoderDataIterator = function(type) { var doneSentinel = {}; var _this = this; - // every nth callback call (globally) do it via setImmediate - // to avoid callback loops that blow the callstack - var checkStack = function(cb, arg) { - if (stackCounter >= stackMax) { - stackCounter = 0; - setImmediate(function() { - cb(arg); - }) - } else { - stackCounter += 1; - cb(arg); - } - } - var sending = false; var sendIfAvailable = function() { if (sending) return; sending = true; while (nextQueue.length && dataQueue.length) { - var nextCb = nextQueue.shift(), data; + var nextCb = nextQueue.shift(), data, cbValue; if (dataQueue[0] == doneSentinel) { - checkStack(nextCb, {value: undefined, done: true}); + cbValue = {value: undefined, done: true}; } else { data = dataQueue.shift(); maybeRefillBuffer(); - checkStack(nextCb, {value: {shard: data.shard, data: zlib.inflateSync(data.data)}, done: false}); + cbValue = {value: {shard: data.shard, data: zlib.inflateSync(data.data)}, done: false}; } + // bind the callback and data now so that they don't change before the setImmediate + // callback is executed + (function(nextCb, cbValue) { + setImmediate(function() { + nextCb(cbValue); + }); + })(nextCb, cbValue); } sending = false; From d3845ea9f92aee90d38db6d82da649c195dfc012 Mon Sep 17 00:00:00 2001 From: Andrew Pendleton Date: Thu, 14 Apr 2016 13:24:27 -0400 Subject: [PATCH 7/9] Add test of geocoderDataIterator --- test/geocoder.test.js | 33 +++++++++++++++++++++++++++++---- 1 file changed, 29 insertions(+), 4 deletions(-) diff --git a/test/geocoder.test.js b/test/geocoder.test.js index 1b2dc89..d5336a4 100644 --- a/test/geocoder.test.js +++ b/test/geocoder.test.js @@ -62,18 +62,43 @@ tape('putGeocoderData', function(assert) { assert.ifError(err); to.putGeocoderData('term', 0, new Buffer('asdf'), function(err) { assert.ifError(err); - to.stopWriting(function(err) { + to.putGeocoderData('term', 1, new Buffer('ZZZZZ'), function(err) { assert.ifError(err); - to.getGeocoderData('term', 0, function(err, buffer) { + to.stopWriting(function(err) { assert.ifError(err); - assert.deepEqual('asdf', buffer.toString()); - assert.end(); + to.getGeocoderData('term', 0, function(err, buffer) { + assert.ifError(err); + assert.deepEqual('asdf', buffer.toString()); + assert.end(); + }); }); }); }); }); }); +tape('geocoderDataIterator', function(assert) { + var it = to.geocoderDataIterator("term"); + var data = []; + var n = function(item) { + if (item.done) { + assert.equal(data.length, 2, "iterator produces two shards"); + + assert.equal(data[0].shard, 0); + assert.equal(data[0].data.toString(), "asdf"); + + assert.equal(data[1].shard, 1); + assert.equal(data[1].data.toString(), "ZZZZZ"); + + assert.end(); + } else { + data.push(item.value); + it.asyncNext(n); + } + } + it.asyncNext(n); +}) + tape('getIndexableDocs', function(assert) { from.getIndexableDocs({ limit: 10 }, function(err, docs, pointer) { assert.ifError(err); From 44746344526e2d8c6af3c8159c2e970683b6d692 Mon Sep 17 00:00:00 2001 From: Andrew Pendleton Date: Thu, 14 Apr 2016 13:32:38 -0400 Subject: [PATCH 8/9] Improve error handling for geocoderDataIterator --- lib/mbtiles.js | 14 ++++++++++---- test/geocoder.test.js | 3 ++- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/lib/mbtiles.js b/lib/mbtiles.js index 54d4b9e..a7388b8 100644 --- a/lib/mbtiles.js +++ b/lib/mbtiles.js @@ -633,18 +633,24 @@ MBTiles.prototype.geocoderDataIterator = function(type) { while (nextQueue.length && dataQueue.length) { var nextCb = nextQueue.shift(), data, cbValue; if (dataQueue[0] == doneSentinel) { - cbValue = {value: undefined, done: true}; + cbValue = { + err: null, + row: {value: undefined, done: true} + } } else { data = dataQueue.shift(); maybeRefillBuffer(); - cbValue = {value: {shard: data.shard, data: zlib.inflateSync(data.data)}, done: false}; + cbValue = { + err: data.err, + row: {value: {shard: data.row.shard, data: zlib.inflateSync(data.row.data)}, done: false} + }; } // bind the callback and data now so that they don't change before the setImmediate // callback is executed (function(nextCb, cbValue) { setImmediate(function() { - nextCb(cbValue); + nextCb(cbValue.err, cbValue.row); }); })(nextCb, cbValue); } @@ -657,7 +663,7 @@ MBTiles.prototype.geocoderDataIterator = function(type) { refilling = true; var segmentCount = 0; _this._db.each('SELECT shard, data FROM geocoder_data WHERE type = ? ORDER BY shard limit ?,?', type, position, chunkSize, function(err, row) { - dataQueue.push(row); + dataQueue.push({row: row, err: err}); segmentCount += 1; sendIfAvailable(); }, function() { diff --git a/test/geocoder.test.js b/test/geocoder.test.js index d5336a4..49cecca 100644 --- a/test/geocoder.test.js +++ b/test/geocoder.test.js @@ -80,7 +80,8 @@ tape('putGeocoderData', function(assert) { tape('geocoderDataIterator', function(assert) { var it = to.geocoderDataIterator("term"); var data = []; - var n = function(item) { + var n = function(err, item) { + assert.ifError(err); if (item.done) { assert.equal(data.length, 2, "iterator produces two shards"); From 405d9877169b25d886f216783639ed39f83fe946 Mon Sep 17 00:00:00 2001 From: Andrew Pendleton Date: Thu, 14 Apr 2016 18:49:34 -0400 Subject: [PATCH 9/9] Ditch inflateSync in favor of plain inflate; test iteration order; add d3-queue dep --- lib/mbtiles.js | 41 +++++++++++++++---------- package.json | 1 + test/geocoder.test.js | 69 ++++++++++++++++++++++++++++--------------- 3 files changed, 72 insertions(+), 39 deletions(-) diff --git a/lib/mbtiles.js b/lib/mbtiles.js index a7388b8..8491ede 100644 --- a/lib/mbtiles.js +++ b/lib/mbtiles.js @@ -9,6 +9,7 @@ var sm = new (require('sphericalmercator')); var sqlite3 = require('sqlite3'); var tiletype = require('tiletype'); var ZXYStream = require('./zxystream'); +var queue = require('d3-queue').queue; function noop(err) { if (err) throw err; @@ -625,34 +626,42 @@ MBTiles.prototype.geocoderDataIterator = function(type) { var doneSentinel = {}; var _this = this; + var zlibQueue = queue(1); + var inflate = function(data, callback) { + zlibQueue.defer(function(cb) { + zlib.inflate(data, function(err, buf) { + callback(err, buf); + cb(); + }); + }); + } + var sending = false; var sendIfAvailable = function() { if (sending) return; sending = true; while (nextQueue.length && dataQueue.length) { - var nextCb = nextQueue.shift(), data, cbValue; + var nextCb = nextQueue.shift(), data; if (dataQueue[0] == doneSentinel) { - cbValue = { - err: null, - row: {value: undefined, done: true} - } + (function(nextCb) { + setImmediate(function() { + nextCb(null, {value: undefined, done: true}); + }); + })(nextCb); } else { data = dataQueue.shift(); maybeRefillBuffer(); - cbValue = { - err: data.err, - row: {value: {shard: data.row.shard, data: zlib.inflateSync(data.row.data)}, done: false} - }; + (function(nextCb, cbValue) { + inflate(data.row.data, function(err, buf) { + nextCb( + data.err, + {value: {shard: data.row.shard, data: buf}, done: false} + ); + }) + })(nextCb, data); } - // bind the callback and data now so that they don't change before the setImmediate - // callback is executed - (function(nextCb, cbValue) { - setImmediate(function() { - nextCb(cbValue.err, cbValue.row); - }); - })(nextCb, cbValue); } sending = false; diff --git a/package.json b/package.json index 56af353..6d66b74 100644 --- a/package.json +++ b/package.json @@ -26,6 +26,7 @@ "Konstantin Käfer " ], "dependencies": { + "d3-queue": "~2.0.3", "tiletype": "0.1.x", "sqlite3": "3.x", "sphericalmercator": "~1.0.1" diff --git a/test/geocoder.test.js b/test/geocoder.test.js index 49cecca..8ee064f 100644 --- a/test/geocoder.test.js +++ b/test/geocoder.test.js @@ -2,6 +2,8 @@ var fs = require('fs'); var util = require('util'); var MBTiles = require('..'); var tape = require('tape'); +var queue = require('d3-queue').queue; +var crypto = require('crypto'); var expected = { bounds: '-141.005548666451,41.6690855919108,-52.615930948992,83.1161164353916', @@ -62,15 +64,12 @@ tape('putGeocoderData', function(assert) { assert.ifError(err); to.putGeocoderData('term', 0, new Buffer('asdf'), function(err) { assert.ifError(err); - to.putGeocoderData('term', 1, new Buffer('ZZZZZ'), function(err) { + to.stopWriting(function(err) { assert.ifError(err); - to.stopWriting(function(err) { + to.getGeocoderData('term', 0, function(err, buffer) { assert.ifError(err); - to.getGeocoderData('term', 0, function(err, buffer) { - assert.ifError(err); - assert.deepEqual('asdf', buffer.toString()); - assert.end(); - }); + assert.deepEqual('asdf', buffer.toString()); + assert.end(); }); }); }); @@ -78,26 +77,50 @@ tape('putGeocoderData', function(assert) { }); tape('geocoderDataIterator', function(assert) { - var it = to.geocoderDataIterator("term"); - var data = []; - var n = function(err, item) { + to.startWriting(function(err) { assert.ifError(err); - if (item.done) { - assert.equal(data.length, 2, "iterator produces two shards"); + // get a bunch of shards of different sizes and put them in in an arbitrary order + var shardIds = {}; + var q = queue() + while (true) { + var id = Math.floor(Math.random() * Math.pow(2, 16)); + if (shardIds[id]) continue; - assert.equal(data[0].shard, 0); - assert.equal(data[0].data.toString(), "asdf"); + shardIds[id] = 1; + q.defer(function(id, cb) { + to.putGeocoderData("term", id, crypto.randomBytes(Math.floor(Math.random()* 1024 * 1024)), cb); + }, id); - assert.equal(data[1].shard, 1); - assert.equal(data[1].data.toString(), "ZZZZZ"); - - assert.end(); - } else { - data.push(item.value); - it.asyncNext(n); + if (Object.keys(shardIds).length >= 50) break; } - } - it.asyncNext(n); + q.awaitAll(function() { + to.stopWriting(function(err) { + var it = to.geocoderDataIterator("term"); + var data = []; + var n = function(err, item) { + assert.ifError(err); + if (item.done) { + assert.equal(data.length, 51, "iterator produces 51 shards"); + + assert.equal(data[0].shard, 0); + assert.equal(data[0].data.toString(), "asdf", "first shard data is preserved"); + + var sorted = true; + for (var i = 1; i < data.length; i++) { + if (data[i - 1].shard >= data[i].shard) sorted = false; + } + assert.equal(sorted, true, "shards come back in order"); + + assert.end(); + } else { + data.push(item.value); + it.asyncNext(n); + } + } + it.asyncNext(n); + }); + }) + }); }) tape('getIndexableDocs', function(assert) {