一、CPU 密集型工作开发领导
CPU 密集型工作是指须要占用系统资源解决大量计算能力的工作,须要长时间运行,这段时间会阻塞线程其它事件的解决,不合适放在主线程进行。例如图像处理、视频编码、数据分析等。
基于多线程并发机制解决 CPU 密集型工作能够进步 CPU 利用率,晋升应用程序响应速度。
当进行一系列同步工作时,举荐应用 Worker;而进行大量或调度点较为扩散的独立工作时,不方便使用 8 个 Worker 去做负载治理,举荐采纳 TaskPool。接下来将以图像直方图解决以及后盾长时间的模型预测工作别离进行举例。
应用 TaskPool 进行图像直方图解决
1. 实现图像处理的业务逻辑。
2. 数据分段,将各段数据通过不同工作的执行实现图像处理。创立Task,通过execute()执行工作,在当前任务完结后,会将直方图处理结果同时返回。
3. 后果数组汇总解决。
import taskpool from '@ohos.taskpool';@Concurrentfunction imageProcessing(dataSlice: ArrayBuffer) { // 步骤1: 具体的图像处理操作及其他耗时操作 return dataSlice;}function histogramStatistic(pixelBuffer: ArrayBuffer) { // 步骤2: 分成三段并发调度 let number = pixelBuffer.byteLength / 3; let buffer1 = pixelBuffer.slice(0, number); let buffer2 = pixelBuffer.slice(number, number * 2); let buffer3 = pixelBuffer.slice(number * 2); let task1 = new taskpool.Task(imageProcessing, buffer1); let task2 = new taskpool.Task(imageProcessing, buffer2); let task3 = new taskpool.Task(imageProcessing, buffer3); taskpool.execute(task1).then((ret: ArrayBuffer[]) => { // 步骤3: 后果解决 }); taskpool.execute(task2).then((ret: ArrayBuffer[]) => { // 步骤3: 后果解决 }); taskpool.execute(task3).then((ret: ArrayBuffer[]) => { // 步骤3: 后果解决 });}@Entry@Componentstruct Index { @State message: string = 'Hello World' build() { Row() { Column() { Text(this.message) .fontSize(50) .fontWeight(FontWeight.Bold) .onClick(() => { let data: ArrayBuffer; histogramStatistic(data); }) } .width('100%') } .height('100%') }}
应用 Worker 进行长时间数据分析
本文通过某地区提供的房价数据训练一个繁难的房价预测模型,该模型反对通过输出屋宇面积和房间数量去预测该区域的房价,模型须要长时间运行,房价预测须要应用后面的模型运行后果,因而须要应用 Worker。
1. DevEco Studio 提供了 Worker 创立的模板,新建一个 Worker 线程,例如命名为“MyWorker”。
2. 在主线程中通过调用 ThreadWorker 的constructor()办法创立 Worker 对象,以后线程为宿主线程。
import worker from '@ohos.worker';const workerInstance = new worker.ThreadWorker('entry/ets/workers/MyWorker.ts');
3. 在宿主线程中通过调用onmessage()办法接管 Worker 线程发送过去的音讯,并通过调用postMessage()办法向 Worker 线程发送音讯。
例如向 Worker 线程发送训练和预测的音讯,同时接管 Worker 线程发送回来的音讯。
// 接管Worker子线程的后果workerInstance.onmessage = function(e) { // data:主线程发送的信息 let data = e.data; console.info('MyWorker.ts onmessage'); // 在Worker线程中进行耗时操作}workerInstance.onerror = function (d) { // 接管Worker子线程的错误信息}// 向Worker子线程发送训练音讯workerInstance.postMessage({ 'type': 0 });// 向Worker子线程发送预测音讯workerInstance.postMessage({ 'type': 1, 'value': [90, 5] });
4. 在 MyWorker.ts 文件中绑定 Worker 对象,以后线程为 Worker 线程。
import worker, { ThreadWorkerGlobalScope, MessageEvents, ErrorEvent } from '@ohos.worker';let workerPort: ThreadWorkerGlobalScope = worker.workerPort;
5. 在 Worker 线程中通过调用onmessage()办法接管宿主线程发送的音讯内容,并通过调用postMessage()办法向宿主线程发送音讯。
如在 Worker 线程中定义预测模型及其训练过程,同时与主线程进行信息交互。
import worker, { ThreadWorkerGlobalScope, MessageEvents, ErrorEvent } from '@ohos.worker';let workerPort: ThreadWorkerGlobalScope = worker.workerPort;// 定义训练模型及后果 let result;// 定义预测函数function predict(x) { return result[x];}// 定义优化器训练过程function optimize() { result = {};}// Worker线程的onmessage逻辑workerPort.onmessage = function (e: MessageEvents) { let data = e.data // 依据传输的数据的type抉择进行操作 switch (data.type) { case 0: // 进行训练 optimize(); // 训练之后发送主线程训练胜利的音讯 workerPort.postMessage({ type: 'message', value: 'train success.' }); break; case 1: // 执行预测 const output = predict(data.value); // 发送主线程预测的后果 workerPort.postMessage({ type: 'predict', value: output }); break; default: workerPort.postMessage({ type: 'message', value: 'send message is invalid' }); break; }}
6. 在 Worker 线程中实现工作之后,执行 Worker 线程销毁操作。销毁线程的形式次要有两种:依据须要能够在宿主线程中对 Worker 线程进行销毁;也能够在 Worker 线程中被动销毁 Worker 线程。在宿主线程中通过调用onexit()办法定义 Worker 线程销毁后的解决逻辑。
// Worker线程销毁后,执行onexit回调办法workerInstance.onexit = function() { console.info("main thread terminate");}
形式一:在宿主线程中通过调用terminate()办法销毁 Worker 线程,并终止 Worker 接管音讯。
// 销毁Worker线程workerInstance.terminate();
形式二:在 Worker 线程中通过调用close()办法被动销毁 Worker 线程,并终止 Worker 接管音讯。
// 销毁线程workerPort.close();
二、 I/O 密集型工作开发领导
应用异步并发能够解决单次 I/O 工作阻塞的问题,然而如果遇到 I/O 密集型工作,同样会阻塞线程中其它工作的执行,这时须要应用多线程并发能力来进行解决。
I/O 密集型工作的性能重点通常不在于 CPU 的解决能力,而在于 I/O 操作的速度和效率。这种工作通常须要频繁地进行磁盘读写、网络通信等操作。此处以频繁读写系统文件来模仿 I/O 密集型并发工作的解决。
1. 定义并发函数,外部密集调用 I/O 能力。
import fs from '@ohos.file.fs';// 定义并发函数,外部密集调用I/O能力@Concurrentasync function concurrentTest(fileList: string[]) { // 写入文件的实现 async function write(data, filePath) { let file = await fs.open(filePath, fs.OpenMode.READ_WRITE); await fs.write(file.fd, data); fs.close(file); } // 循环写文件操作 for (let i = 0; i < fileList.length; i++) { write('Hello World!', fileList[i]).then(() => { console.info(`Succeeded in writing the file. FileList: ${fileList[i]}`); }).catch((err) => { console.error(`Failed to write the file. Code is ${err.code}, message is ${err.message}`) return false; }) } return true;}
2. 应用 TaskPool 执行蕴含密集 I/O 的并发函数:通过调用execute()办法执行工作,并在回调中进行调度后果解决。示例中的 filePath1 和 filePath2 的获取形式请参见获取利用文件门路。
import taskpool from '@ohos.taskpool';let filePath1 = ...; // 利用文件门路let filePath2 = ...;// 应用TaskPool执行蕴含密集I/O的并发函数// 数组较大时,I/O密集型工作工作散发也会抢占主线程,须要应用多线程能力taskpool.execute(concurrentTest, [filePath1, filePath2]).then((ret) => { // 调度后果解决 console.info(`The result: ${ret}`);})