Include pixelworks and terminate workers before creating new ones

This commit is contained in:
Tim Schaub
2020-05-16 21:33:39 -06:00
parent bfc035415e
commit b16c2e0062
6 changed files with 660 additions and 19 deletions

5
package-lock.json generated
View File

@@ -9424,11 +9424,6 @@
}
}
},
"pixelworks": {
"version": "1.1.0",
"resolved": "https://registry.npmjs.org/pixelworks/-/pixelworks-1.1.0.tgz",
"integrity": "sha1-Hwla1I3Ki/ihyCWOAJIDGkTyLKU="
},
"pkg-dir": {
"version": "3.0.0",
"resolved": "https://registry.npmjs.org/pkg-dir/-/pkg-dir-3.0.0.tgz",

View File

@@ -44,7 +44,6 @@
"elm-pep": "^1.0.4",
"ol-mapbox-style": "^6.1.1",
"pbf": "3.2.1",
"pixelworks": "1.1.0",
"rbush": "^3.0.1"
},
"devDependencies": {

View File

@@ -11,17 +11,17 @@ class Disposable {
/**
* The object has already been disposed.
* @type {boolean}
* @private
* @protected
*/
this.disposed_ = false;
this.disposed = false;
}
/**
* Clean up.
*/
dispose() {
if (!this.disposed_) {
this.disposed_ = true;
if (!this.disposed) {
this.disposed = true;
this.disposeInternal();
}
}

View File

@@ -1,6 +1,7 @@
/**
* @module ol/source/Raster
*/
import Disposable from '../Disposable.js';
import Event from '../events/Event.js';
import EventType from '../events/EventType.js';
import ImageCanvas from '../ImageCanvas.js';
@@ -11,12 +12,354 @@ import SourceState from './State.js';
import TileLayer from '../layer/Tile.js';
import TileQueue from '../TileQueue.js';
import TileSource from './Tile.js';
import {Processor} from 'pixelworks/lib/index.js';
import {assign} from '../obj.js';
import {createCanvasContext2D} from '../dom.js';
import {create as createTransform} from '../transform.js';
import {equals, getCenter, getHeight, getWidth} from '../extent.js';
let hasImageData = true;
try {
new ImageData(10, 10);
} catch (_) {
hasImageData = false;
}
const context = document.createElement('canvas').getContext('2d');
/**
* @param {Uint8ClampedArray} data Image data.
* @param {number} width Number of columns.
* @param {number} height Number of rows.
* @return {ImageData} Image data.
*/
export function newImageData(data, width, height) {
if (hasImageData) {
return new ImageData(data, width, height);
} else {
const imageData = context.createImageData(width, height);
imageData.data.set(data);
return imageData;
}
}
/* istanbul ignore next */
/**
* Create a function for running operations. This function is serialized for
* use in a worker.
* @param {function(Array, Object):*} operation The operation.
* @return {function(Object):ArrayBuffer} A function that takes an object with
* buffers, meta, imageOps, width, and height properties and returns an array
* buffer.
*/
function createMinion(operation) {
let workerHasImageData = true;
try {
new ImageData(10, 10);
} catch (_) {
workerHasImageData = false;
}
function newWorkerImageData(data, width, height) {
if (workerHasImageData) {
return new ImageData(data, width, height);
} else {
return {data: data, width: width, height: height};
}
}
return function (data) {
// bracket notation for minification support
const buffers = data['buffers'];
const meta = data['meta'];
const imageOps = data['imageOps'];
const width = data['width'];
const height = data['height'];
const numBuffers = buffers.length;
const numBytes = buffers[0].byteLength;
let output, b;
if (imageOps) {
const images = new Array(numBuffers);
for (b = 0; b < numBuffers; ++b) {
images[b] = newWorkerImageData(
new Uint8ClampedArray(buffers[b]),
width,
height
);
}
output = operation(images, meta).data;
} else {
output = new Uint8ClampedArray(numBytes);
const arrays = new Array(numBuffers);
const pixels = new Array(numBuffers);
for (b = 0; b < numBuffers; ++b) {
arrays[b] = new Uint8ClampedArray(buffers[b]);
pixels[b] = [0, 0, 0, 0];
}
for (let i = 0; i < numBytes; i += 4) {
for (let j = 0; j < numBuffers; ++j) {
const array = arrays[j];
pixels[j][0] = array[i];
pixels[j][1] = array[i + 1];
pixels[j][2] = array[i + 2];
pixels[j][3] = array[i + 3];
}
const pixel = operation(pixels, meta);
output[i] = pixel[0];
output[i + 1] = pixel[1];
output[i + 2] = pixel[2];
output[i + 3] = pixel[3];
}
}
return output.buffer;
};
}
/**
* Create a worker for running operations.
* @param {Object} config Configuration.
* @param {function(MessageEvent): void} onMessage Called with a message event.
* @return {Worker} The worker.
*/
function createWorker(config, onMessage) {
const lib = Object.keys(config.lib || {}).map(function (name) {
return 'var ' + name + ' = ' + config.lib[name].toString() + ';';
});
const lines = lib.concat([
'var __minion__ = (' + createMinion.toString() + ')(',
config.operation.toString(),
');',
'self.addEventListener("message", function(event) {',
' var buffer = __minion__(event.data);',
' self.postMessage({buffer: buffer, meta: event.data.meta}, [buffer]);',
'});',
]);
const blob = new Blob(lines, {type: 'text/javascript'});
const source = URL.createObjectURL(blob);
const worker = new Worker(source);
worker.addEventListener('message', onMessage);
return worker;
}
/**
* @typedef {Object} FauxMessageEvent
* @property {Object} data Message data.
*/
/**
* Create a faux worker for running operations.
* @param {ProcessorOptions} config Configuration.
* @param {function(FauxMessageEvent): void} onMessage Called with a message event.
* @return {Object} The faux worker.
*/
function createFauxWorker(config, onMessage) {
const minion = createMinion(config.operation);
let terminated = false;
return {
postMessage: function (data) {
setTimeout(function () {
if (terminated) {
return;
}
onMessage({data: {buffer: minion(data), meta: data['meta']}});
}, 0);
},
terminate: function () {
terminated = true;
},
};
}
/**
* @typedef {Object} ProcessorOptions
* @property {number} threads Number of workers to spawn.
* @property {function(Array, Object):*} operation The operation.
* @property {Object} [lib] Functions that will be made available to operations run in a worker.
* @property {number} queue The number of queued jobs to allow.
* @property {boolean} [imageOps=false] Pass all the image data to the operation instead of a single pixel.
*/
/**
* @classdesc
* A processor runs pixel or image operations in workers.
*/
export class Processor extends Disposable {
/**
* @param {ProcessorOptions} config Configuration.
*/
constructor(config) {
super();
this._imageOps = !!config.imageOps;
let threads;
if (config.threads === 0) {
threads = 0;
} else if (this._imageOps) {
threads = 1;
} else {
threads = config.threads || 1;
}
const workers = [];
if (threads) {
for (let i = 0; i < threads; ++i) {
workers[i] = createWorker(config, this._onWorkerMessage.bind(this, i));
}
} else {
workers[0] = createFauxWorker(
config,
this._onWorkerMessage.bind(this, 0)
);
}
this._workers = workers;
this._queue = [];
this._maxQueueLength = config.queue || Infinity;
this._running = 0;
this._dataLookup = {};
this._job = null;
}
/**
* Run operation on input data.
* @param {Array.<Array|ImageData>} inputs Array of pixels or image data
* (depending on the operation type).
* @param {Object} meta A user data object. This is passed to all operations
* and must be serializable.
* @param {function(Error, ImageData, Object): void} callback Called when work
* completes. The first argument is any error. The second is the ImageData
* generated by operations. The third is the user data object.
*/
process(inputs, meta, callback) {
this._enqueue({
inputs: inputs,
meta: meta,
callback: callback,
});
this._dispatch();
}
/**
* Add a job to the queue.
* @param {Object} job The job.
*/
_enqueue(job) {
this._queue.push(job);
while (this._queue.length > this._maxQueueLength) {
this._queue.shift().callback(null, null);
}
}
/**
* Dispatch a job.
*/
_dispatch() {
if (this._running === 0 && this._queue.length > 0) {
const job = this._queue.shift();
this._job = job;
const width = job.inputs[0].width;
const height = job.inputs[0].height;
const buffers = job.inputs.map(function (input) {
return input.data.buffer;
});
const threads = this._workers.length;
this._running = threads;
if (threads === 1) {
this._workers[0].postMessage(
{
buffers: buffers,
meta: job.meta,
imageOps: this._imageOps,
width: width,
height: height,
},
buffers
);
} else {
const length = job.inputs[0].data.length;
const segmentLength = 4 * Math.ceil(length / 4 / threads);
for (let i = 0; i < threads; ++i) {
const offset = i * segmentLength;
const slices = [];
for (let j = 0, jj = buffers.length; j < jj; ++j) {
slices.push(buffers[i].slice(offset, offset + segmentLength));
}
this._workers[i].postMessage(
{
buffers: slices,
meta: job.meta,
imageOps: this._imageOps,
width: width,
height: height,
},
slices
);
}
}
}
}
/**
* Handle messages from the worker.
* @param {number} index The worker index.
* @param {MessageEvent} event The message event.
*/
_onWorkerMessage(index, event) {
if (this.disposed) {
return;
}
this._dataLookup[index] = event.data;
--this._running;
if (this._running === 0) {
this._resolveJob();
}
}
/**
* Resolve a job. If there are no more worker threads, the processor callback
* will be called.
*/
_resolveJob() {
const job = this._job;
const threads = this._workers.length;
let data, meta;
if (threads === 1) {
data = new Uint8ClampedArray(this._dataLookup[0]['buffer']);
meta = this._dataLookup[0]['meta'];
} else {
const length = job.inputs[0].data.length;
data = new Uint8ClampedArray(length);
meta = new Array(length);
const segmentLength = 4 * Math.ceil(length / 4 / threads);
for (let i = 0; i < threads; ++i) {
const buffer = this._dataLookup[i]['buffer'];
const offset = i * segmentLength;
data.set(new Uint8ClampedArray(buffer), offset);
meta[i] = this._dataLookup[i]['meta'];
}
}
this._job = null;
this._dataLookup = {};
job.callback(
null,
newImageData(data, job.inputs[0].width, job.inputs[0].height),
meta
);
this._dispatch();
}
/**
* Terminate all workers associated with the processor.
*/
disposeInternal() {
for (let i = 0; i < this._workers.length; ++i) {
this._workers[i].terminate();
}
this._workers.length = 0;
}
}
/**
* A function that takes an array of input data, performs some operation, and
* returns an array of output data.
@@ -141,9 +484,9 @@ class RasterSource extends ImageSource {
/**
* @private
* @type {*}
* @type {Processor}
*/
this.worker_ = null;
this.processor_ = null;
/**
* @private
@@ -259,7 +602,11 @@ class RasterSource extends ImageSource {
* @api
*/
setOperation(operation, opt_lib) {
this.worker_ = new Processor({
if (this.processor_) {
this.processor_.dispose();
}
this.processor_ = new Processor({
operation: operation,
imageOps: this.operationType_ === RasterOperationType.IMAGE,
queue: 1,
@@ -385,7 +732,7 @@ class RasterSource extends ImageSource {
this.dispatchEvent(
new RasterSourceEvent(RasterEventType.BEFOREOPERATIONS, frameState, data)
);
this.worker_.process(
this.processor_.process(
imageDatas,
data,
this.onWorkerComplete_.bind(this, frameState)
@@ -445,6 +792,12 @@ class RasterSource extends ImageSource {
getImageInternal() {
return null; // not implemented
}
disposeInternal() {
if (this.processor_) {
this.processor_.dispose();
}
}
}
/**

View File

@@ -8,16 +8,16 @@ describe('ol.Disposable', function () {
});
});
describe('#disposed_', function () {
describe('#disposed', function () {
it('is initially false', function () {
const disposable = new Disposable();
expect(disposable.disposed_).to.be(false);
expect(disposable.disposed).to.be(false);
});
it('is true after a call to dispose', function () {
const disposable = new Disposable();
disposable.dispose();
expect(disposable.disposed_).to.be(true);
expect(disposable.disposed).to.be(true);
});
});

View File

@@ -3,7 +3,10 @@ import ImageLayer from '../../../../src/ol/layer/Image.js';
import Map from '../../../../src/ol/Map.js';
import Point from '../../../../src/ol/geom/Point.js';
import Projection from '../../../../src/ol/proj/Projection.js';
import RasterSource from '../../../../src/ol/source/Raster.js';
import RasterSource, {
Processor,
newImageData,
} from '../../../../src/ol/source/Raster.js';
import Source from '../../../../src/ol/source/Source.js';
import Static from '../../../../src/ol/source/ImageStatic.js';
import TileSource from '../../../../src/ol/source/Tile.js';
@@ -135,6 +138,20 @@ where('Uint8ClampedArray').describe('ol.source.Raster', function () {
view.setZoom(0);
});
it('disposes the processor when disposed', function () {
const source = new RasterSource({
threads: 0,
sources: [redSource, greenSource, blueSource],
operation: function (inputs) {
return inputs[0];
},
});
source.dispose();
expect(source.processor_.disposed).to.be(true);
});
it('allows operation type to be set to "image"', function (done) {
const log = [];
@@ -256,6 +273,17 @@ where('Uint8ClampedArray').describe('ol.source.Raster', function () {
}
});
});
it('disposes the previous processor', function () {
const previousProcessor = raster.processor_;
raster.setOperation(function (pixels) {
return pixels[0];
});
expect(previousProcessor.disposed).to.be(true);
expect(raster.processor_.disposed).to.be(false);
});
});
describe('beforeoperations', function () {
@@ -388,3 +416,269 @@ where('Uint8ClampedArray').describe('ol.source.Raster', function () {
});
});
});
where('Uint8ClampedArray').describe('Processor', function () {
const identity = function (inputs) {
return inputs[0];
};
describe('constructor', function () {
it('creates a new processor', function () {
const processor = new Processor({
operation: identity,
});
expect(processor).to.be.a(Processor);
});
});
describe('#process()', function () {
it('calls operation with input pixels', function (done) {
const processor = new Processor({
operation: function (inputs, meta) {
++meta.count;
const pixel = inputs[0];
for (let i = 0, ii = pixel.length; i < ii; ++i) {
meta.sum += pixel[i];
}
return pixel;
},
});
const array = new Uint8ClampedArray([1, 2, 3, 4, 5, 6, 7, 8]);
const input = newImageData(array, 1, 2);
processor.process([input], {count: 0, sum: 0}, function (err, output, m) {
if (err) {
done(err);
return;
}
expect(m.count).to.equal(2);
expect(m.sum).to.equal(36);
done();
});
});
it('calls callback with processed image data', function (done) {
const processor = new Processor({
operation: function (inputs) {
const pixel = inputs[0];
pixel[0] *= 2;
pixel[1] *= 2;
pixel[2] *= 2;
pixel[3] *= 2;
return pixel;
},
});
const array = new Uint8ClampedArray([1, 2, 3, 4, 5, 6, 7, 8]);
const input = newImageData(array, 1, 2);
processor.process([input], {}, function (err, output, m) {
if (err) {
done(err);
return;
}
expect(output).to.be.a(ImageData);
expect(output.data).to.eql(
new Uint8ClampedArray([2, 4, 6, 8, 10, 12, 14, 16])
);
done();
});
});
it('allows library functions to be called', function (done) {
const lib = {
sum: function (a, b) {
return a + b;
},
diff: function (a, b) {
return a - b;
},
};
const normalizedDiff = function (pixels) {
const pixel = pixels[0];
const r = pixel[0];
const g = pixel[1];
/* eslint-disable */
var nd = diff(r, g) / sum(r, g);
/* eslint-enable */
const index = Math.round((255 * (nd + 1)) / 2);
return [index, index, index, pixel[3]];
};
const processor = new Processor({
operation: normalizedDiff,
lib: lib,
});
const array = new Uint8ClampedArray([10, 2, 0, 0, 5, 8, 0, 1]);
const input = newImageData(array, 1, 2);
processor.process([input], {}, function (err, output, m) {
if (err) {
done(err);
return;
}
expect(output).to.be.a(ImageData);
const v0 = Math.round((255 * (1 + 8 / 12)) / 2);
const v1 = Math.round((255 * (1 + -3 / 13)) / 2);
expect(output.data).to.eql(
new Uint8ClampedArray([v0, v0, v0, 0, v1, v1, v1, 1])
);
done();
});
});
it('calls callbacks for each call', function (done) {
const processor = new Processor({
operation: identity,
});
let calls = 0;
function createCallback(index) {
return function (err, output, meta) {
if (err) {
done(err);
return;
}
expect(output).to.be.a(ImageData);
++calls;
};
}
for (let i = 0; i < 5; ++i) {
const input = newImageData(new Uint8ClampedArray([1, 2, 3, 4]), 1, 1);
processor.process([input], {}, createCallback(i));
}
setTimeout(function () {
expect(calls).to.be(5);
done();
}, 1000);
});
it('respects max queue length', function (done) {
const processor = new Processor({
queue: 1,
operation: identity,
});
const log = [];
function createCallback(index) {
return function (err, output, meta) {
if (err) {
done(err);
return;
}
log.push(output);
};
}
for (let i = 0; i < 5; ++i) {
const input = newImageData(new Uint8ClampedArray([1, 2, 3, 4]), 1, 1);
processor.process([input], {}, createCallback(i));
}
setTimeout(function () {
expect(log).to.have.length(5);
expect(log[0]).to.be(null);
expect(log[1]).to.be(null);
expect(log[2]).to.be(null);
expect(log[3]).to.be.a(ImageData);
expect(log[4]).to.be.a(ImageData);
done();
}, 1000);
});
});
describe('#process() - faux worker', function () {
let identitySpy;
beforeEach(function () {
identitySpy = sinon.spy(identity);
});
it('calls operation with input pixels', function (done) {
const processor = new Processor({
threads: 0,
operation: identitySpy,
});
const array = new Uint8ClampedArray([1, 2, 3, 4, 5, 6, 7, 8]);
const input = newImageData(array, 1, 2);
processor.process([input], {}, function (err, output, m) {
if (err) {
done(err);
return;
}
expect(identitySpy.callCount).to.be(2);
const first = identitySpy.getCall(0);
expect(first.args).to.have.length(2);
done();
});
});
it('passes meta object to operations', function (done) {
const processor = new Processor({
threads: 0,
operation: identitySpy,
});
const array = new Uint8ClampedArray([1, 2, 3, 4]);
const input = newImageData(array, 1, 1);
const meta = {foo: 'bar'};
processor.process([input], meta, function (err, output, m) {
if (err) {
done(err);
return;
}
expect(m).to.eql(meta);
expect(identitySpy.callCount).to.be(1);
done();
});
});
});
describe('#dispose()', function () {
it('stops callbacks from being called', function (done) {
const processor = new Processor({
operation: identity,
});
const array = new Uint8ClampedArray([1, 2, 3, 4, 5, 6, 7, 8]);
const input = newImageData(array, 1, 2);
processor.process([input], {}, function () {
done(new Error('Expected abort to stop callback from being called'));
});
processor.dispose();
setTimeout(done, 500);
});
});
describe('#dispose() - faux worker', function () {
it('stops callbacks from being called', function (done) {
const processor = new Processor({
threads: 0,
operation: identity,
});
const array = new Uint8ClampedArray([1, 2, 3, 4, 5, 6, 7, 8]);
const input = newImageData(array, 1, 2);
processor.process([input], {}, function () {
done(new Error('Expected abort to stop callback from being called'));
});
processor.dispose();
setTimeout(done, 20);
});
});
});