diff --git a/lib/zxystream.js b/lib/zxystream.js index 4da0cda..5816503 100644 --- a/lib/zxystream.js +++ b/lib/zxystream.js @@ -12,7 +12,7 @@ function ZXYStream(source, options) { options = options || {}; this.source = source; - this._afterGet = this._afterGet.bind(this); + this.batch = options.batch || 1000; stream.Readable.call(this); } @@ -38,14 +38,29 @@ ZXYStream.prototype._read = function() { return; } - stream.statement.get(stream._afterGet); -}; + var lines = ''; + var error; + var remaining = stream.batch; + for (var i = 0; i < stream.batch; i++) stream.statement.get(afterGet); -ZXYStream.prototype._afterGet = function(err, row) { - if (err && err.code === 'SQLITE_ERROR' && /no such table/.test(err.message)) return this.push(null); - if (err) return this.emit('error', err); - if (!row) return this.push(null); - this.push(toLine(row)); + function afterGet(err, row) { + if (err && err.code === 'SQLITE_ERROR' && /no such table/.test(err.message)) { + // no-op + } else if (err) { + error = err; + } else if (!row) { + // no-op + } else { + lines += toLine(row); + } + if (!--remaining) { + if (error) { + stream.emit('error', error); + } else { + stream.push(lines || null); + } + } + } }; function toLine(row) { diff --git a/test/zxystream.js b/test/zxystream.js index eafdd7a..58e1e3d 100644 --- a/test/zxystream.js +++ b/test/zxystream.js @@ -10,12 +10,13 @@ tape('zxystream setup', function(assert) { }); }); -tape('zxystream default', function(assert) { +tape('zxystream default batch', function(assert) { var stream = source.createZXYStream(); var output = ''; var called = 0; assert.deepEqual(stream.source, source, 'sets stream.source'); + assert.deepEqual(stream.batch, 1000, 'sets stream.batch = 1000'); stream.on('data', function(lines) { assert.equal(stream.table, 'map'); @@ -25,7 +26,7 @@ tape('zxystream default', function(assert) { stream.on('end', function() { var queue = output.toString().split('\n'); assert.equal(queue.length, 270); - assert.equal(called, 269, 'emitted data' + called + ' times'); + assert.equal(called, 1, 'emitted data x1 times'); checkTile(queue); function checkTile(queue) { if (!queue.length) return assert.end(); @@ -41,6 +42,37 @@ tape('zxystream default', function(assert) { }); +tape('zxystream batch = 10', function(assert) { + var stream = source.createZXYStream({batch:10}); + var output = ''; + var called = 0; + + assert.deepEqual(stream.source, source, 'sets stream.source'); + assert.deepEqual(stream.batch, 10, 'sets stream.batch = 10'); + + stream.on('data', function(lines) { + assert.equal(stream.table, 'map'); + output += lines; + called++; + }); + stream.on('end', function() { + var queue = output.toString().split('\n'); + assert.equal(queue.length, 270); + assert.equal(called, 27, 'emitted data x27 times'); + checkTile(queue); + function checkTile(queue) { + if (!queue.length) return assert.end(); + var zxy = queue.shift(); + if (!zxy) return checkTile(queue); + zxy = zxy.split('/'); + source.getTile(zxy[0], zxy[1], zxy[2], function(err, buffer, headers) { + assert.equal(!err && (buffer instanceof Buffer), true, zxy.join('/') + ' exists'); + checkTile(queue); + }); + } + }); +}); + tape('zxystream unindexed', function(assert) { new MBTiles(__dirname + '/fixtures/unindexed.mbtiles', function(err, s) { assert.ifError(err); @@ -55,6 +87,7 @@ tape('zxystream unindexed zxystream', function(assert) { var called = 0; assert.deepEqual(stream.source, source, 'sets stream.source'); + assert.deepEqual(stream.batch, 1000, 'sets stream.batch = 1000'); stream.on('data', function(lines) { assert.equal(stream.table, 'tiles'); @@ -64,7 +97,7 @@ tape('zxystream unindexed zxystream', function(assert) { stream.on('end', function() { var queue = output.toString().split('\n'); assert.equal(queue.length, 286); - assert.equal(called, 285, 'emitted data x285 times'); + assert.equal(called, 1, 'emitted data x27 times'); checkTile(queue); function checkTile(queue) { if (!queue.length) return assert.end();