Merge pull request #51 from mapbox/zxystream-statement
Prepared statement rather than batch/offset for zxystream
This commit is contained in:
@@ -6,22 +6,13 @@ util.inherits(ZXYStream, stream.Readable);
|
||||
|
||||
// Readable stream of line-delimited z/x/y coordinates
|
||||
// contained within the MBTiles `tiles` table/view.
|
||||
//
|
||||
// The `batch` option exists to allow tests to check that
|
||||
// multiple calls to `_read` are handled properly. IRL the
|
||||
// default offset of 1000 should be reasonably efficient
|
||||
// and not worth messing with.
|
||||
function ZXYStream(source, options) {
|
||||
if (!source) throw new TypeError('MBTiles source required');
|
||||
|
||||
options = options || {};
|
||||
|
||||
if (options.batch !== undefined && typeof options.batch !== 'number')
|
||||
throw new TypeError('options.batch must be a positive integer');
|
||||
|
||||
this.source = source;
|
||||
this.batch = options.batch || 1000;
|
||||
this.offset = 0;
|
||||
|
||||
stream.Readable.call(this);
|
||||
}
|
||||
@@ -38,15 +29,38 @@ ZXYStream.prototype._read = function() {
|
||||
});
|
||||
}
|
||||
|
||||
this.source._db.all('SELECT zoom_level AS z, tile_column AS x, tile_row AS y FROM ' + this.table + ' LIMIT ' + this.batch + ' OFFSET ' + this.offset, function(err, rows) {
|
||||
if (err && err.code === 'SQLITE_ERROR' && /no such table/.test(err.message)) return stream.push(null);
|
||||
if (err) return stream.emit('error', err);
|
||||
if (!rows.length) return stream.push(null);
|
||||
stream.offset += stream.batch;
|
||||
var chunk = '';
|
||||
for (var i = 0; i < rows.length; i++) chunk += toLine(rows[i]);
|
||||
stream.push(chunk);
|
||||
});
|
||||
// Prepare sql statement
|
||||
if (!stream.statement) {
|
||||
stream.statement = this.source._db.prepare('SELECT zoom_level AS z, tile_column AS x, tile_row AS y FROM ' + this.table, function(err) {
|
||||
if (err && err.code === 'SQLITE_ERROR' && /no such table/.test(err.message)) return stream.push(null);
|
||||
return stream._read();
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
var lines = '';
|
||||
var error;
|
||||
var remaining = stream.batch;
|
||||
for (var i = 0; i < stream.batch; i++) stream.statement.get(afterGet);
|
||||
|
||||
function afterGet(err, row) {
|
||||
if (err && err.code === 'SQLITE_ERROR' && /no such table/.test(err.message)) {
|
||||
// no-op
|
||||
} else if (err) {
|
||||
error = err;
|
||||
} else if (!row) {
|
||||
// no-op
|
||||
} else {
|
||||
lines += toLine(row);
|
||||
}
|
||||
if (!--remaining) {
|
||||
if (error) {
|
||||
stream.emit('error', error);
|
||||
} else {
|
||||
stream.push(lines || null);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
function toLine(row) {
|
||||
|
||||
@@ -17,7 +17,6 @@ tape('zxystream default batch', function(assert) {
|
||||
|
||||
assert.deepEqual(stream.source, source, 'sets stream.source');
|
||||
assert.deepEqual(stream.batch, 1000, 'sets stream.batch = 1000');
|
||||
assert.deepEqual(stream.offset, 0, 'sets stream.offset = 0');
|
||||
|
||||
stream.on('data', function(lines) {
|
||||
assert.equal(stream.table, 'map');
|
||||
@@ -50,7 +49,6 @@ tape('zxystream batch = 10', function(assert) {
|
||||
|
||||
assert.deepEqual(stream.source, source, 'sets stream.source');
|
||||
assert.deepEqual(stream.batch, 10, 'sets stream.batch = 10');
|
||||
assert.deepEqual(stream.offset, 0, 'sets stream.offset = 0');
|
||||
|
||||
stream.on('data', function(lines) {
|
||||
assert.equal(stream.table, 'map');
|
||||
@@ -90,7 +88,6 @@ tape('zxystream unindexed zxystream', function(assert) {
|
||||
|
||||
assert.deepEqual(stream.source, source, 'sets stream.source');
|
||||
assert.deepEqual(stream.batch, 1000, 'sets stream.batch = 1000');
|
||||
assert.deepEqual(stream.offset, 0, 'sets stream.offset = 0');
|
||||
|
||||
stream.on('data', function(lines) {
|
||||
assert.equal(stream.table, 'tiles');
|
||||
|
||||
Reference in New Issue
Block a user