diff --git a/package-lock.json b/package-lock.json index c1cfa3e4d9..cef67a6568 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9424,11 +9424,6 @@ } } }, - "pixelworks": { - "version": "1.1.0", - "resolved": "https://registry.npmjs.org/pixelworks/-/pixelworks-1.1.0.tgz", - "integrity": "sha1-Hwla1I3Ki/ihyCWOAJIDGkTyLKU=" - }, "pkg-dir": { "version": "3.0.0", "resolved": "https://registry.npmjs.org/pkg-dir/-/pkg-dir-3.0.0.tgz", diff --git a/package.json b/package.json index 21feee06c2..d4eea6383e 100644 --- a/package.json +++ b/package.json @@ -44,7 +44,6 @@ "elm-pep": "^1.0.4", "ol-mapbox-style": "^6.1.1", "pbf": "3.2.1", - "pixelworks": "1.1.0", "rbush": "^3.0.1" }, "devDependencies": { diff --git a/src/ol/Disposable.js b/src/ol/Disposable.js index 4af7978487..d8b16ecae5 100644 --- a/src/ol/Disposable.js +++ b/src/ol/Disposable.js @@ -11,17 +11,17 @@ class Disposable { /** * The object has already been disposed. * @type {boolean} - * @private + * @protected */ - this.disposed_ = false; + this.disposed = false; } /** * Clean up. */ dispose() { - if (!this.disposed_) { - this.disposed_ = true; + if (!this.disposed) { + this.disposed = true; this.disposeInternal(); } } diff --git a/src/ol/source/Raster.js b/src/ol/source/Raster.js index 9992395d54..e02dd0e853 100644 --- a/src/ol/source/Raster.js +++ b/src/ol/source/Raster.js @@ -1,6 +1,7 @@ /** * @module ol/source/Raster */ +import Disposable from '../Disposable.js'; import Event from '../events/Event.js'; import EventType from '../events/EventType.js'; import ImageCanvas from '../ImageCanvas.js'; @@ -11,12 +12,354 @@ import SourceState from './State.js'; import TileLayer from '../layer/Tile.js'; import TileQueue from '../TileQueue.js'; import TileSource from './Tile.js'; -import {Processor} from 'pixelworks/lib/index.js'; import {assign} from '../obj.js'; import {createCanvasContext2D} from '../dom.js'; import {create as createTransform} from '../transform.js'; import {equals, getCenter, getHeight, getWidth} from '../extent.js'; +let hasImageData = true; +try { + new ImageData(10, 10); +} catch (_) { + hasImageData = false; +} + +const context = document.createElement('canvas').getContext('2d'); + +/** + * @param {Uint8ClampedArray} data Image data. + * @param {number} width Number of columns. + * @param {number} height Number of rows. + * @return {ImageData} Image data. + */ +export function newImageData(data, width, height) { + if (hasImageData) { + return new ImageData(data, width, height); + } else { + const imageData = context.createImageData(width, height); + imageData.data.set(data); + return imageData; + } +} + +/* istanbul ignore next */ +/** + * Create a function for running operations. This function is serialized for + * use in a worker. + * @param {function(Array, Object):*} operation The operation. + * @return {function(Object):ArrayBuffer} A function that takes an object with + * buffers, meta, imageOps, width, and height properties and returns an array + * buffer. + */ +function createMinion(operation) { + let workerHasImageData = true; + try { + new ImageData(10, 10); + } catch (_) { + workerHasImageData = false; + } + + function newWorkerImageData(data, width, height) { + if (workerHasImageData) { + return new ImageData(data, width, height); + } else { + return {data: data, width: width, height: height}; + } + } + + return function (data) { + // bracket notation for minification support + const buffers = data['buffers']; + const meta = data['meta']; + const imageOps = data['imageOps']; + const width = data['width']; + const height = data['height']; + + const numBuffers = buffers.length; + const numBytes = buffers[0].byteLength; + let output, b; + + if (imageOps) { + const images = new Array(numBuffers); + for (b = 0; b < numBuffers; ++b) { + images[b] = newWorkerImageData( + new Uint8ClampedArray(buffers[b]), + width, + height + ); + } + output = operation(images, meta).data; + } else { + output = new Uint8ClampedArray(numBytes); + const arrays = new Array(numBuffers); + const pixels = new Array(numBuffers); + for (b = 0; b < numBuffers; ++b) { + arrays[b] = new Uint8ClampedArray(buffers[b]); + pixels[b] = [0, 0, 0, 0]; + } + for (let i = 0; i < numBytes; i += 4) { + for (let j = 0; j < numBuffers; ++j) { + const array = arrays[j]; + pixels[j][0] = array[i]; + pixels[j][1] = array[i + 1]; + pixels[j][2] = array[i + 2]; + pixels[j][3] = array[i + 3]; + } + const pixel = operation(pixels, meta); + output[i] = pixel[0]; + output[i + 1] = pixel[1]; + output[i + 2] = pixel[2]; + output[i + 3] = pixel[3]; + } + } + return output.buffer; + }; +} + +/** + * Create a worker for running operations. + * @param {Object} config Configuration. + * @param {function(MessageEvent): void} onMessage Called with a message event. + * @return {Worker} The worker. + */ +function createWorker(config, onMessage) { + const lib = Object.keys(config.lib || {}).map(function (name) { + return 'var ' + name + ' = ' + config.lib[name].toString() + ';'; + }); + + const lines = lib.concat([ + 'var __minion__ = (' + createMinion.toString() + ')(', + config.operation.toString(), + ');', + 'self.addEventListener("message", function(event) {', + ' var buffer = __minion__(event.data);', + ' self.postMessage({buffer: buffer, meta: event.data.meta}, [buffer]);', + '});', + ]); + + const blob = new Blob(lines, {type: 'text/javascript'}); + const source = URL.createObjectURL(blob); + const worker = new Worker(source); + worker.addEventListener('message', onMessage); + return worker; +} + +/** + * @typedef {Object} FauxMessageEvent + * @property {Object} data Message data. + */ + +/** + * Create a faux worker for running operations. + * @param {ProcessorOptions} config Configuration. + * @param {function(FauxMessageEvent): void} onMessage Called with a message event. + * @return {Object} The faux worker. + */ +function createFauxWorker(config, onMessage) { + const minion = createMinion(config.operation); + let terminated = false; + return { + postMessage: function (data) { + setTimeout(function () { + if (terminated) { + return; + } + onMessage({data: {buffer: minion(data), meta: data['meta']}}); + }, 0); + }, + terminate: function () { + terminated = true; + }, + }; +} + +/** + * @typedef {Object} ProcessorOptions + * @property {number} threads Number of workers to spawn. + * @property {function(Array, Object):*} operation The operation. + * @property {Object} [lib] Functions that will be made available to operations run in a worker. + * @property {number} queue The number of queued jobs to allow. + * @property {boolean} [imageOps=false] Pass all the image data to the operation instead of a single pixel. + */ + +/** + * @classdesc + * A processor runs pixel or image operations in workers. + */ +export class Processor extends Disposable { + /** + * @param {ProcessorOptions} config Configuration. + */ + constructor(config) { + super(); + + this._imageOps = !!config.imageOps; + let threads; + if (config.threads === 0) { + threads = 0; + } else if (this._imageOps) { + threads = 1; + } else { + threads = config.threads || 1; + } + const workers = []; + if (threads) { + for (let i = 0; i < threads; ++i) { + workers[i] = createWorker(config, this._onWorkerMessage.bind(this, i)); + } + } else { + workers[0] = createFauxWorker( + config, + this._onWorkerMessage.bind(this, 0) + ); + } + this._workers = workers; + this._queue = []; + this._maxQueueLength = config.queue || Infinity; + this._running = 0; + this._dataLookup = {}; + this._job = null; + } + + /** + * Run operation on input data. + * @param {Array.} inputs Array of pixels or image data + * (depending on the operation type). + * @param {Object} meta A user data object. This is passed to all operations + * and must be serializable. + * @param {function(Error, ImageData, Object): void} callback Called when work + * completes. The first argument is any error. The second is the ImageData + * generated by operations. The third is the user data object. + */ + process(inputs, meta, callback) { + this._enqueue({ + inputs: inputs, + meta: meta, + callback: callback, + }); + this._dispatch(); + } + + /** + * Add a job to the queue. + * @param {Object} job The job. + */ + _enqueue(job) { + this._queue.push(job); + while (this._queue.length > this._maxQueueLength) { + this._queue.shift().callback(null, null); + } + } + + /** + * Dispatch a job. + */ + _dispatch() { + if (this._running === 0 && this._queue.length > 0) { + const job = this._queue.shift(); + this._job = job; + const width = job.inputs[0].width; + const height = job.inputs[0].height; + const buffers = job.inputs.map(function (input) { + return input.data.buffer; + }); + const threads = this._workers.length; + this._running = threads; + if (threads === 1) { + this._workers[0].postMessage( + { + buffers: buffers, + meta: job.meta, + imageOps: this._imageOps, + width: width, + height: height, + }, + buffers + ); + } else { + const length = job.inputs[0].data.length; + const segmentLength = 4 * Math.ceil(length / 4 / threads); + for (let i = 0; i < threads; ++i) { + const offset = i * segmentLength; + const slices = []; + for (let j = 0, jj = buffers.length; j < jj; ++j) { + slices.push(buffers[i].slice(offset, offset + segmentLength)); + } + this._workers[i].postMessage( + { + buffers: slices, + meta: job.meta, + imageOps: this._imageOps, + width: width, + height: height, + }, + slices + ); + } + } + } + } + + /** + * Handle messages from the worker. + * @param {number} index The worker index. + * @param {MessageEvent} event The message event. + */ + _onWorkerMessage(index, event) { + if (this.disposed) { + return; + } + this._dataLookup[index] = event.data; + --this._running; + if (this._running === 0) { + this._resolveJob(); + } + } + + /** + * Resolve a job. If there are no more worker threads, the processor callback + * will be called. + */ + _resolveJob() { + const job = this._job; + const threads = this._workers.length; + let data, meta; + if (threads === 1) { + data = new Uint8ClampedArray(this._dataLookup[0]['buffer']); + meta = this._dataLookup[0]['meta']; + } else { + const length = job.inputs[0].data.length; + data = new Uint8ClampedArray(length); + meta = new Array(length); + const segmentLength = 4 * Math.ceil(length / 4 / threads); + for (let i = 0; i < threads; ++i) { + const buffer = this._dataLookup[i]['buffer']; + const offset = i * segmentLength; + data.set(new Uint8ClampedArray(buffer), offset); + meta[i] = this._dataLookup[i]['meta']; + } + } + this._job = null; + this._dataLookup = {}; + job.callback( + null, + newImageData(data, job.inputs[0].width, job.inputs[0].height), + meta + ); + this._dispatch(); + } + + /** + * Terminate all workers associated with the processor. + */ + disposeInternal() { + for (let i = 0; i < this._workers.length; ++i) { + this._workers[i].terminate(); + } + this._workers.length = 0; + } +} + /** * A function that takes an array of input data, performs some operation, and * returns an array of output data. @@ -141,9 +484,9 @@ class RasterSource extends ImageSource { /** * @private - * @type {*} + * @type {Processor} */ - this.worker_ = null; + this.processor_ = null; /** * @private @@ -259,7 +602,11 @@ class RasterSource extends ImageSource { * @api */ setOperation(operation, opt_lib) { - this.worker_ = new Processor({ + if (this.processor_) { + this.processor_.dispose(); + } + + this.processor_ = new Processor({ operation: operation, imageOps: this.operationType_ === RasterOperationType.IMAGE, queue: 1, @@ -385,7 +732,7 @@ class RasterSource extends ImageSource { this.dispatchEvent( new RasterSourceEvent(RasterEventType.BEFOREOPERATIONS, frameState, data) ); - this.worker_.process( + this.processor_.process( imageDatas, data, this.onWorkerComplete_.bind(this, frameState) @@ -445,6 +792,12 @@ class RasterSource extends ImageSource { getImageInternal() { return null; // not implemented } + + disposeInternal() { + if (this.processor_) { + this.processor_.dispose(); + } + } } /** diff --git a/test/spec/ol/disposable.test.js b/test/spec/ol/disposable.test.js index a118a0a0b7..14d3669b07 100644 --- a/test/spec/ol/disposable.test.js +++ b/test/spec/ol/disposable.test.js @@ -8,16 +8,16 @@ describe('ol.Disposable', function () { }); }); - describe('#disposed_', function () { + describe('#disposed', function () { it('is initially false', function () { const disposable = new Disposable(); - expect(disposable.disposed_).to.be(false); + expect(disposable.disposed).to.be(false); }); it('is true after a call to dispose', function () { const disposable = new Disposable(); disposable.dispose(); - expect(disposable.disposed_).to.be(true); + expect(disposable.disposed).to.be(true); }); }); diff --git a/test/spec/ol/source/raster.test.js b/test/spec/ol/source/raster.test.js index 5a8facbb58..e3ca585d20 100644 --- a/test/spec/ol/source/raster.test.js +++ b/test/spec/ol/source/raster.test.js @@ -3,7 +3,10 @@ import ImageLayer from '../../../../src/ol/layer/Image.js'; import Map from '../../../../src/ol/Map.js'; import Point from '../../../../src/ol/geom/Point.js'; import Projection from '../../../../src/ol/proj/Projection.js'; -import RasterSource from '../../../../src/ol/source/Raster.js'; +import RasterSource, { + Processor, + newImageData, +} from '../../../../src/ol/source/Raster.js'; import Source from '../../../../src/ol/source/Source.js'; import Static from '../../../../src/ol/source/ImageStatic.js'; import TileSource from '../../../../src/ol/source/Tile.js'; @@ -135,6 +138,20 @@ where('Uint8ClampedArray').describe('ol.source.Raster', function () { view.setZoom(0); }); + it('disposes the processor when disposed', function () { + const source = new RasterSource({ + threads: 0, + sources: [redSource, greenSource, blueSource], + operation: function (inputs) { + return inputs[0]; + }, + }); + + source.dispose(); + + expect(source.processor_.disposed).to.be(true); + }); + it('allows operation type to be set to "image"', function (done) { const log = []; @@ -256,6 +273,17 @@ where('Uint8ClampedArray').describe('ol.source.Raster', function () { } }); }); + + it('disposes the previous processor', function () { + const previousProcessor = raster.processor_; + + raster.setOperation(function (pixels) { + return pixels[0]; + }); + + expect(previousProcessor.disposed).to.be(true); + expect(raster.processor_.disposed).to.be(false); + }); }); describe('beforeoperations', function () { @@ -388,3 +416,269 @@ where('Uint8ClampedArray').describe('ol.source.Raster', function () { }); }); }); + +where('Uint8ClampedArray').describe('Processor', function () { + const identity = function (inputs) { + return inputs[0]; + }; + + describe('constructor', function () { + it('creates a new processor', function () { + const processor = new Processor({ + operation: identity, + }); + + expect(processor).to.be.a(Processor); + }); + }); + + describe('#process()', function () { + it('calls operation with input pixels', function (done) { + const processor = new Processor({ + operation: function (inputs, meta) { + ++meta.count; + const pixel = inputs[0]; + for (let i = 0, ii = pixel.length; i < ii; ++i) { + meta.sum += pixel[i]; + } + return pixel; + }, + }); + + const array = new Uint8ClampedArray([1, 2, 3, 4, 5, 6, 7, 8]); + const input = newImageData(array, 1, 2); + + processor.process([input], {count: 0, sum: 0}, function (err, output, m) { + if (err) { + done(err); + return; + } + expect(m.count).to.equal(2); + expect(m.sum).to.equal(36); + done(); + }); + }); + + it('calls callback with processed image data', function (done) { + const processor = new Processor({ + operation: function (inputs) { + const pixel = inputs[0]; + pixel[0] *= 2; + pixel[1] *= 2; + pixel[2] *= 2; + pixel[3] *= 2; + return pixel; + }, + }); + + const array = new Uint8ClampedArray([1, 2, 3, 4, 5, 6, 7, 8]); + const input = newImageData(array, 1, 2); + + processor.process([input], {}, function (err, output, m) { + if (err) { + done(err); + return; + } + expect(output).to.be.a(ImageData); + expect(output.data).to.eql( + new Uint8ClampedArray([2, 4, 6, 8, 10, 12, 14, 16]) + ); + done(); + }); + }); + + it('allows library functions to be called', function (done) { + const lib = { + sum: function (a, b) { + return a + b; + }, + diff: function (a, b) { + return a - b; + }, + }; + + const normalizedDiff = function (pixels) { + const pixel = pixels[0]; + const r = pixel[0]; + const g = pixel[1]; + /* eslint-disable */ + var nd = diff(r, g) / sum(r, g); + /* eslint-enable */ + const index = Math.round((255 * (nd + 1)) / 2); + return [index, index, index, pixel[3]]; + }; + + const processor = new Processor({ + operation: normalizedDiff, + lib: lib, + }); + + const array = new Uint8ClampedArray([10, 2, 0, 0, 5, 8, 0, 1]); + const input = newImageData(array, 1, 2); + + processor.process([input], {}, function (err, output, m) { + if (err) { + done(err); + return; + } + expect(output).to.be.a(ImageData); + const v0 = Math.round((255 * (1 + 8 / 12)) / 2); + const v1 = Math.round((255 * (1 + -3 / 13)) / 2); + expect(output.data).to.eql( + new Uint8ClampedArray([v0, v0, v0, 0, v1, v1, v1, 1]) + ); + + done(); + }); + }); + + it('calls callbacks for each call', function (done) { + const processor = new Processor({ + operation: identity, + }); + + let calls = 0; + + function createCallback(index) { + return function (err, output, meta) { + if (err) { + done(err); + return; + } + expect(output).to.be.a(ImageData); + ++calls; + }; + } + + for (let i = 0; i < 5; ++i) { + const input = newImageData(new Uint8ClampedArray([1, 2, 3, 4]), 1, 1); + processor.process([input], {}, createCallback(i)); + } + + setTimeout(function () { + expect(calls).to.be(5); + done(); + }, 1000); + }); + + it('respects max queue length', function (done) { + const processor = new Processor({ + queue: 1, + operation: identity, + }); + + const log = []; + + function createCallback(index) { + return function (err, output, meta) { + if (err) { + done(err); + return; + } + log.push(output); + }; + } + + for (let i = 0; i < 5; ++i) { + const input = newImageData(new Uint8ClampedArray([1, 2, 3, 4]), 1, 1); + processor.process([input], {}, createCallback(i)); + } + + setTimeout(function () { + expect(log).to.have.length(5); + expect(log[0]).to.be(null); + expect(log[1]).to.be(null); + expect(log[2]).to.be(null); + expect(log[3]).to.be.a(ImageData); + expect(log[4]).to.be.a(ImageData); + done(); + }, 1000); + }); + }); + + describe('#process() - faux worker', function () { + let identitySpy; + beforeEach(function () { + identitySpy = sinon.spy(identity); + }); + + it('calls operation with input pixels', function (done) { + const processor = new Processor({ + threads: 0, + operation: identitySpy, + }); + + const array = new Uint8ClampedArray([1, 2, 3, 4, 5, 6, 7, 8]); + const input = newImageData(array, 1, 2); + + processor.process([input], {}, function (err, output, m) { + if (err) { + done(err); + return; + } + expect(identitySpy.callCount).to.be(2); + const first = identitySpy.getCall(0); + expect(first.args).to.have.length(2); + done(); + }); + }); + + it('passes meta object to operations', function (done) { + const processor = new Processor({ + threads: 0, + operation: identitySpy, + }); + + const array = new Uint8ClampedArray([1, 2, 3, 4]); + const input = newImageData(array, 1, 1); + const meta = {foo: 'bar'}; + + processor.process([input], meta, function (err, output, m) { + if (err) { + done(err); + return; + } + expect(m).to.eql(meta); + expect(identitySpy.callCount).to.be(1); + done(); + }); + }); + }); + + describe('#dispose()', function () { + it('stops callbacks from being called', function (done) { + const processor = new Processor({ + operation: identity, + }); + + const array = new Uint8ClampedArray([1, 2, 3, 4, 5, 6, 7, 8]); + const input = newImageData(array, 1, 2); + + processor.process([input], {}, function () { + done(new Error('Expected abort to stop callback from being called')); + }); + + processor.dispose(); + setTimeout(done, 500); + }); + }); + + describe('#dispose() - faux worker', function () { + it('stops callbacks from being called', function (done) { + const processor = new Processor({ + threads: 0, + operation: identity, + }); + + const array = new Uint8ClampedArray([1, 2, 3, 4, 5, 6, 7, 8]); + const input = newImageData(array, 1, 2); + + processor.process([input], {}, function () { + done(new Error('Expected abort to stop callback from being called')); + }); + + processor.dispose(); + setTimeout(done, 20); + }); + }); +});