Additional docs and type checking for raster source

This commit is contained in:
Tim Schaub
2021-08-26 11:04:07 -06:00
parent 3b6bf14cdc
commit de9ff20f65
3 changed files with 158 additions and 86 deletions

View File

@@ -46,12 +46,21 @@ export function newImageData(data, width, height) {
return imageData;
}
/**
* @typedef {Object} MinionData
* @property {Array<ArrayBuffer>} buffers Array of buffers.
* @property {Object} meta Operation metadata.
* @property {boolean} imageOps The operation is an image operation.
* @property {number} width The width of the image.
* @property {number} height The height of the image.
*/
/* istanbul ignore next */
/**
* Create a function for running operations. This function is serialized for
* use in a worker.
* @param {function(Array, Object):*} operation The operation.
* @return {function(Object):ArrayBuffer} A function that takes an object with
* @return {function(MinionData):ArrayBuffer} A function that takes an object with
* buffers, meta, imageOps, width, and height properties and returns an array
* buffer.
*/
@@ -81,40 +90,40 @@ function createMinion(operation) {
const numBuffers = buffers.length;
const numBytes = buffers[0].byteLength;
let output, b;
if (imageOps) {
const images = new Array(numBuffers);
for (b = 0; b < numBuffers; ++b) {
for (let b = 0; b < numBuffers; ++b) {
images[b] = newWorkerImageData(
new Uint8ClampedArray(buffers[b]),
width,
height
);
}
output = operation(images, meta).data;
} else {
output = new Uint8ClampedArray(numBytes);
const arrays = new Array(numBuffers);
const pixels = new Array(numBuffers);
for (b = 0; b < numBuffers; ++b) {
arrays[b] = new Uint8ClampedArray(buffers[b]);
pixels[b] = [0, 0, 0, 0];
}
for (let i = 0; i < numBytes; i += 4) {
for (let j = 0; j < numBuffers; ++j) {
const array = arrays[j];
pixels[j][0] = array[i];
pixels[j][1] = array[i + 1];
pixels[j][2] = array[i + 2];
pixels[j][3] = array[i + 3];
}
const pixel = operation(pixels, meta);
output[i] = pixel[0];
output[i + 1] = pixel[1];
output[i + 2] = pixel[2];
output[i + 3] = pixel[3];
const output = operation(images, meta).data;
return output.buffer;
}
const output = new Uint8ClampedArray(numBytes);
const arrays = new Array(numBuffers);
const pixels = new Array(numBuffers);
for (let b = 0; b < numBuffers; ++b) {
arrays[b] = new Uint8ClampedArray(buffers[b]);
pixels[b] = [0, 0, 0, 0];
}
for (let i = 0; i < numBytes; i += 4) {
for (let j = 0; j < numBuffers; ++j) {
const array = arrays[j];
pixels[j][0] = array[i];
pixels[j][1] = array[i + 1];
pixels[j][2] = array[i + 2];
pixels[j][3] = array[i + 3];
}
const pixel = operation(pixels, meta);
output[i] = pixel[0];
output[i + 1] = pixel[1];
output[i + 2] = pixel[2];
output[i + 3] = pixel[3];
}
return output.buffer;
};
@@ -122,7 +131,7 @@ function createMinion(operation) {
/**
* Create a worker for running operations.
* @param {Object} config Configuration.
* @param {ProcessorOptions} config Processor options.
* @param {function(MessageEvent): void} onMessage Called with a message event.
* @return {Worker} The worker.
*/
@@ -177,11 +186,22 @@ function createFauxWorker(config, onMessage) {
};
}
/**
* @typedef {function(Error, ImageData, (Object|Array<Object>)): void} JobCallback
*/
/**
* @typedef {Object} Job
* @property {Object} meta Job metadata.
* @property {Array<ImageData>} inputs Array of input data.
* @property {JobCallback} callback Called when the job is complete.
*/
/**
* @typedef {Object} ProcessorOptions
* @property {number} threads Number of workers to spawn.
* @property {function(Array, Object):*} operation The operation.
* @property {Object} [lib] Functions that will be made available to operations run in a worker.
* @property {Operation} operation The operation.
* @property {Object<string, Function>} [lib] Functions that will be made available to operations run in a worker.
* @property {number} queue The number of queued jobs to allow.
* @property {boolean} [imageOps=false] Pass all the image data to the operation instead of a single pixel.
*/
@@ -206,7 +226,11 @@ export class Processor extends Disposable {
} else {
threads = config.threads || 1;
}
const workers = [];
/**
* @type {Array<Worker>}
*/
const workers = new Array(threads);
if (threads) {
for (let i = 0; i < threads; ++i) {
workers[i] = createWorker(config, this._onWorkerMessage.bind(this, i));
@@ -218,17 +242,32 @@ export class Processor extends Disposable {
);
}
this._workers = workers;
/**
* @type {Array<Job>}
* @private
*/
this._queue = [];
this._maxQueueLength = config.queue || Infinity;
this._running = 0;
/**
* @type {Object<number, any>}
* @private
*/
this._dataLookup = {};
/**
* @type {Job}
* @private
*/
this._job = null;
}
/**
* Run operation on input data.
* @param {Array<Array|ImageData>} inputs Array of pixels or image data
* (depending on the operation type).
* @param {Array<ImageData>} inputs Array of image data.
* @param {Object} meta A user data object. This is passed to all operations
* and must be serializable.
* @param {function(Error, ImageData, Object): void} callback Called when work
@@ -246,7 +285,7 @@ export class Processor extends Disposable {
/**
* Add a job to the queue.
* @param {Object} job The job.
* @param {Job} job The job.
*/
_enqueue(job) {
this._queue.push(job);
@@ -259,48 +298,51 @@ export class Processor extends Disposable {
* Dispatch a job.
*/
_dispatch() {
if (this._running === 0 && this._queue.length > 0) {
const job = this._queue.shift();
this._job = job;
const width = job.inputs[0].width;
const height = job.inputs[0].height;
const buffers = job.inputs.map(function (input) {
return input.data.buffer;
});
const threads = this._workers.length;
this._running = threads;
if (threads === 1) {
this._workers[0].postMessage(
{
buffers: buffers,
meta: job.meta,
imageOps: this._imageOps,
width: width,
height: height,
},
buffers
);
} else {
const length = job.inputs[0].data.length;
const segmentLength = 4 * Math.ceil(length / 4 / threads);
for (let i = 0; i < threads; ++i) {
const offset = i * segmentLength;
const slices = [];
for (let j = 0, jj = buffers.length; j < jj; ++j) {
slices.push(buffers[j].slice(offset, offset + segmentLength));
}
this._workers[i].postMessage(
{
buffers: slices,
meta: job.meta,
imageOps: this._imageOps,
width: width,
height: height,
},
slices
);
}
if (this._running || this._queue.length === 0) {
return;
}
const job = this._queue.shift();
this._job = job;
const width = job.inputs[0].width;
const height = job.inputs[0].height;
const buffers = job.inputs.map(function (input) {
return input.data.buffer;
});
const threads = this._workers.length;
this._running = threads;
if (threads === 1) {
this._workers[0].postMessage(
{
buffers: buffers,
meta: job.meta,
imageOps: this._imageOps,
width: width,
height: height,
},
buffers
);
return;
}
const length = job.inputs[0].data.length;
const segmentLength = 4 * Math.ceil(length / 4 / threads);
for (let i = 0; i < threads; ++i) {
const offset = i * segmentLength;
const slices = [];
for (let j = 0, jj = buffers.length; j < jj; ++j) {
slices.push(buffers[j].slice(offset, offset + segmentLength));
}
this._workers[i].postMessage(
{
buffers: slices,
meta: job.meta,
imageOps: this._imageOps,
width: width,
height: height,
},
slices
);
}
}
@@ -334,7 +376,7 @@ export class Processor extends Disposable {
} else {
const length = job.inputs[0].data.length;
data = new Uint8ClampedArray(length);
meta = new Array(length);
meta = new Array(threads);
const segmentLength = 4 * Math.ceil(length / 4 / threads);
for (let i = 0; i < threads; ++i) {
const buffer = this._dataLookup[i]['buffer'];
@@ -388,14 +430,17 @@ export class Processor extends Disposable {
*/
const RasterEventType = {
/**
* Triggered before operations are run.
* Triggered before operations are run. Listeners will receive an event object with
* a `data` property that can be used to make data available to operations.
* @event module:ol/source/Raster.RasterSourceEvent#beforeoperations
* @api
*/
BEFOREOPERATIONS: 'beforeoperations',
/**
* Triggered after operations are run.
* Triggered after operations are run. Listeners will receive an event object with
* a `data` property. If more than one thread is used, `data` will be an array of
* objects. If a single thread is used, `data` will be a single object.
* @event module:ol/source/Raster.RasterSourceEvent#afteroperations
* @api
*/
@@ -424,7 +469,8 @@ export class RasterSourceEvent extends Event {
/**
* @param {string} type Type.
* @param {import("../PluggableMap.js").FrameState} frameState The frame state.
* @param {Object} data An object made available to operations.
* @param {Object|Array<Object>} data An object made available to operations. For "afteroperations" evenets
* this will be an array of objects if more than one thread is used.
*/
constructor(type, frameState, data) {
super(type);
@@ -776,7 +822,7 @@ class RasterSource extends ImageSource {
* @param {import("../PluggableMap.js").FrameState} frameState The frame state.
* @param {Error} err Any error during processing.
* @param {ImageData} output The output image data.
* @param {Object} data The user data.
* @param {Object|Array<Object>} data The user data (or an array if more than one thread).
* @private
*/
onWorkerComplete_(frameState, err, output, data) {