Retain batch concept and return lines in groups of 1000
This commit is contained in:
@@ -12,7 +12,7 @@ function ZXYStream(source, options) {
|
||||
options = options || {};
|
||||
|
||||
this.source = source;
|
||||
this._afterGet = this._afterGet.bind(this);
|
||||
this.batch = options.batch || 1000;
|
||||
|
||||
stream.Readable.call(this);
|
||||
}
|
||||
@@ -38,14 +38,29 @@ ZXYStream.prototype._read = function() {
|
||||
return;
|
||||
}
|
||||
|
||||
stream.statement.get(stream._afterGet);
|
||||
};
|
||||
var lines = '';
|
||||
var error;
|
||||
var remaining = stream.batch;
|
||||
for (var i = 0; i < stream.batch; i++) stream.statement.get(afterGet);
|
||||
|
||||
ZXYStream.prototype._afterGet = function(err, row) {
|
||||
if (err && err.code === 'SQLITE_ERROR' && /no such table/.test(err.message)) return this.push(null);
|
||||
if (err) return this.emit('error', err);
|
||||
if (!row) return this.push(null);
|
||||
this.push(toLine(row));
|
||||
function afterGet(err, row) {
|
||||
if (err && err.code === 'SQLITE_ERROR' && /no such table/.test(err.message)) {
|
||||
// no-op
|
||||
} else if (err) {
|
||||
error = err;
|
||||
} else if (!row) {
|
||||
// no-op
|
||||
} else {
|
||||
lines += toLine(row);
|
||||
}
|
||||
if (!--remaining) {
|
||||
if (error) {
|
||||
stream.emit('error', error);
|
||||
} else {
|
||||
stream.push(lines || null);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
function toLine(row) {
|
||||
|
||||
@@ -10,12 +10,13 @@ tape('zxystream setup', function(assert) {
|
||||
});
|
||||
});
|
||||
|
||||
tape('zxystream default', function(assert) {
|
||||
tape('zxystream default batch', function(assert) {
|
||||
var stream = source.createZXYStream();
|
||||
var output = '';
|
||||
var called = 0;
|
||||
|
||||
assert.deepEqual(stream.source, source, 'sets stream.source');
|
||||
assert.deepEqual(stream.batch, 1000, 'sets stream.batch = 1000');
|
||||
|
||||
stream.on('data', function(lines) {
|
||||
assert.equal(stream.table, 'map');
|
||||
@@ -25,7 +26,7 @@ tape('zxystream default', function(assert) {
|
||||
stream.on('end', function() {
|
||||
var queue = output.toString().split('\n');
|
||||
assert.equal(queue.length, 270);
|
||||
assert.equal(called, 269, 'emitted data' + called + ' times');
|
||||
assert.equal(called, 1, 'emitted data x1 times');
|
||||
checkTile(queue);
|
||||
function checkTile(queue) {
|
||||
if (!queue.length) return assert.end();
|
||||
@@ -41,6 +42,37 @@ tape('zxystream default', function(assert) {
|
||||
});
|
||||
|
||||
|
||||
tape('zxystream batch = 10', function(assert) {
|
||||
var stream = source.createZXYStream({batch:10});
|
||||
var output = '';
|
||||
var called = 0;
|
||||
|
||||
assert.deepEqual(stream.source, source, 'sets stream.source');
|
||||
assert.deepEqual(stream.batch, 10, 'sets stream.batch = 10');
|
||||
|
||||
stream.on('data', function(lines) {
|
||||
assert.equal(stream.table, 'map');
|
||||
output += lines;
|
||||
called++;
|
||||
});
|
||||
stream.on('end', function() {
|
||||
var queue = output.toString().split('\n');
|
||||
assert.equal(queue.length, 270);
|
||||
assert.equal(called, 27, 'emitted data x27 times');
|
||||
checkTile(queue);
|
||||
function checkTile(queue) {
|
||||
if (!queue.length) return assert.end();
|
||||
var zxy = queue.shift();
|
||||
if (!zxy) return checkTile(queue);
|
||||
zxy = zxy.split('/');
|
||||
source.getTile(zxy[0], zxy[1], zxy[2], function(err, buffer, headers) {
|
||||
assert.equal(!err && (buffer instanceof Buffer), true, zxy.join('/') + ' exists');
|
||||
checkTile(queue);
|
||||
});
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
tape('zxystream unindexed', function(assert) {
|
||||
new MBTiles(__dirname + '/fixtures/unindexed.mbtiles', function(err, s) {
|
||||
assert.ifError(err);
|
||||
@@ -55,6 +87,7 @@ tape('zxystream unindexed zxystream', function(assert) {
|
||||
var called = 0;
|
||||
|
||||
assert.deepEqual(stream.source, source, 'sets stream.source');
|
||||
assert.deepEqual(stream.batch, 1000, 'sets stream.batch = 1000');
|
||||
|
||||
stream.on('data', function(lines) {
|
||||
assert.equal(stream.table, 'tiles');
|
||||
@@ -64,7 +97,7 @@ tape('zxystream unindexed zxystream', function(assert) {
|
||||
stream.on('end', function() {
|
||||
var queue = output.toString().split('\n');
|
||||
assert.equal(queue.length, 286);
|
||||
assert.equal(called, 285, 'emitted data x285 times');
|
||||
assert.equal(called, 1, 'emitted data x27 times');
|
||||
checkTile(queue);
|
||||
function checkTile(queue) {
|
||||
if (!queue.length) return assert.end();
|
||||
|
||||
Reference in New Issue
Block a user