多线程并发
多线程并发是指在单个程序中同时运行多个线程,通过并行或交替执行任务来提升性能和资源利用率的编程模型。
并发模型用于实现不同应用场景中的并发任务。常见的并发模型有基于内存共享的模型和基于消息通信的模型。
Actor并发模型是基于消息通信的典型并发模型。开发者无需处理锁带来的复杂问题,且具备高并发度,因此应用广泛。
当前ArkTS提供了TaskPool和Worker两种并发能力,两者均基于Actor并发模型实现。
| 内存共享模型 | Actor模型 |
|---|---|
![]() | ![]() |
| 为了避免不同生产者或消费者同时访问同一块共享内存容器时产生脏读、脏写现象,同一时间只能有一个生产者或消费者访问该容器。即不同生产者和消费者需争夺使用容器的锁。当一个角色获取锁后 ,其他角色需等待该角色释放锁,才能重新尝试获取锁以访问该容器。 | Actor模型中,不同角色之间并不共享内存,生产者线程和UI线程都有自己的虚拟机实例,两个虚拟机实例之间拥有独占的内存,相互隔离。生产者生产出结果后,通过序列化通信将结果发送给UI线程。UI线程消费结果后,再发送新的生产任务给生产者线程。 |
TaskPool使用
定义并发函数:使用TaskPool时,执行的并发函数必须用@Concurrent装饰器修饰,否则无法通过校验。
// 核心1:定义并发任务函数(API20要求顶层函数)
@Concurrent
function calculateSum(start: number, end: number): number {
//模拟耗时计算( start ~ end 的累加,模拟密集型任务)
let sum = 0;
for (let i = start; i < end; i++) {
sum += i;
}
console.log(`dxin => ${start} ~ ${end} 的和为:${sum}`)
return sum;
}
创建任务并提交
// 核心2:执行单个并发任务
private async runSingleTask() {
try {
//1.创建Task对象(绑定并发函数和参数)
const task = new taskpool.Task(calculateSum, 1, 100000);
//2.提交到TaskPool执行(异步等待结果)
const sum = await taskpool.execute(task);
console.log(`dxin => sum : ${sum}`)
//3.处理结果
this.result = `${sum}`;
} catch (err) {
this.result = `任务失败:${(err as Error).message}`
console.log(`dxin => 单个并发任务执行失败`, err)
}
}
批量处理任务:
// 核心3:执行多个并发任务(并行)
private async runBatchTasks() {
try {
this.result = '批量任务执行中...'
// 1。创建TaskGroup(API 20 批量任务的标准管理方式)
const taskGroup = new taskpool.TaskGroup()
//2.向TaskGroup中添加多个Task
taskGroup.addTask(new taskpool.Task(calculateSum, 1, 100000))
taskGroup.addTask(new taskpool.Task(calculateSum, 300000, 600000))
taskGroup.addTask(new taskpool.Task(calculateSum, 600000, 1000000))
// 3.提交TaskGroup 执行(API 20 支持TaskGroup 作为execute 参数)
const results = await taskpool.execute(taskGroup) as number[];
// 4。求和 汇总结果(results 是按 addTask 顺序返回的结果数组)
const totalSum = results.reduce((a, b) => a + b, 0);
this.result = `批量任务完成:\n3个任务并行执行\n各任务结果:${results}\n总和=${totalSum}`;
} catch (err) {
this.result = `批量任务失败:${(err as Error).message}`;
}
}
TaskPool案例:
// TaskPool.ets
import { taskpool } from '@kit.ArkTS';
// 核心1:定义并发任务函数(API20要求顶层函数)
@Concurrent
function calculateSum(start: number, end: number): number {
//模拟耗时计算( start ~ end 的累加,模拟密集型任务)
let sum = 0;
for (let i = start; i < end; i++) {
sum += i;
}
console.log(`dxin => ${start} ~ ${end} 的和为:${sum}`)
return sum;
}
@Entry
@ComponentV2
struct TaskPool {
@Local result: string = ''
// 核心2:执行单个并发任务
private async runSingleTask() {
try {
//1.创建Task对象(绑定并发函数和参数)
const task = new taskpool.Task(calculateSum, 1, 100000);
//2.提交到TaskPool执行(异步等待结果)
const sum = await taskpool.execute(task);
console.log(`dxin => sum : ${sum}`)
//3.处理结果
this.result = `${sum}`;
} catch (err) {
this.result = `任务失败:${(err as Error).message}`
console.log(`dxin => 单个并发任务执行失败`, err)
}
}
// 核心3:执行多个并发任务(并行)
private async runBatchTasks() {
try {
this.result = '批量任务执行中...'
// 1。创建TaskGroup(API 20 批量任务的标准管理方式)
const taskGroup = new taskpool.TaskGroup()
//2.向TaskGroup中添加多个Task
taskGroup.addTask(new taskpool.Task(calculateSum, 1, 100000))
taskGroup.addTask(new taskpool.Task(calculateSum, 300000, 600000))
taskGroup.addTask(new taskpool.Task(calculateSum, 600000, 1000000))
// 3.提交TaskGroup 执行(API 20 支持TaskGroup 作为execute 参数)
const results = await taskpool.execute(taskGroup) as number[];
// 4。求和 汇总结果(results 是按 addTask 顺序返回的结果数组)
const totalSum = results.reduce((a, b) => a + b, 0);
this.result = `批量任务完成:\n3个任务并行执行\n各任务结果:${results}\n总和=${totalSum}`;
} catch (err) {
this.result = `批量任务失败:${(err as Error).message}`;
}
}
build() {
Column({ space: 30 }) {
Text('TaskPool核心用法演示')
.fontSize(22)
.fontWeight(FontWeight.Bold)
//执行单个并发任务
Button('执行单个并发任务')
.onClick(() => this.runSingleTask())
.backgroundColor('#O07DFF')
.fontColor('#FFFFFE')
.padding(12)
.borderRadius(8)
//执行多个并发任务(体现并行)
Button('执行多个并发任务')
.onClick(() => this.runBatchTasks())
.backgroundColor('#00B42A')
.fontColor('#FFFFFE')
.padding(12)
.borderRadius(8)
Text(this.result)
}
.width('100%')
.height('100%')
.backgroundColor($r('app.color.theme_color'))
.justifyContent(FlexAlign.Center)
}
}
Worker使用
TaskPool线程任务执行时长上限为3分钟。如果遇到更长的耗时操作。就无法满足了。
Worker线程文件需要放在"{moduleName}/src/main/ets/"目录层级之下,否则不会被打包到应用中。有手动和自动两种创建Worker线程目录及文件的方式,推荐使用自动创建方式。创建完成后在build-profile.json5文件sourceOption字段中自动配置Worker线程文件的路径。
Worker特性:
- 独立线程不受时间限制
- 可保持任务中间状态
- 与线程完全隔离
Worker通信机制:
postMessage:发送消息onmessage:接收消息- 数据需要序列化传递(不能传函数)
Worker使用步骤
步骤1:创建Worker
//创建Worker对象
let workerInstance: worker.ThreadWorker = new worker.ThreadWorker('entry/ets/workers/MyWorker.ets');
步骤2:发送消息
// 发送消息给Worker线程
try {
workerInstance.postMessage('dxin MainWorker');
} catch (error) {
// TODO: Implement error handling.
}
步骤3:worker接受消息,并回传宿主线程
// 注册onmessage回调,当Worker线程收到来自其宿主线程通过postMessage接口发送的消息时被调用,在Worker线程执行
workerPort.onmessage = (event: MessageEvents) => {
let data:string = event.data;
console.log(`dxin => workerPort.onmessage is: ${data}`)
// 向宿主线程发送消息
try {
workerPort.postMessage('dxin worker');
}catch (e){
console.log(`dxin => worker线程向主线程发消息失败!`)
}
};
步骤4:宿主进程接受结果
workerInstance.onmessage = (e: MessageEvents) => {
let data: string = e.data;
// 展示获取到子线程的消息
this.message = data
console.log(`dxin => workerInstance.onmessage is: ${data}`)
}
Worker案例
创建的线程文件:
// 子线程 src/main/ets/workers/MyWorker.ets
import { ErrorEvent, MessageEvents, ThreadWorkerGlobalScope, worker } from '@kit.ArkTS';
const workerPort: ThreadWorkerGlobalScope = worker.workerPort;
// 注册onmessage回调,当Worker线程收到来自其宿主线程通过postMessage接口发送的消息时被调用,在Worker线程执行
workerPort.onmessage = (event: MessageEvents) => {
let data:string = event.data;
console.log(`dxin => workerPort.onmessage is: ${data}`)
// 向宿主线程发送消息
try {
workerPort.postMessage('dxin worker');
}catch (e){
console.log(`dxin => worker线程向主线程发消息失败!`)
}
};
// 注册onmessageerror回调,当Worker对象接收到一条无法被序列化的消息时被调用,在Worker线程执行
workerPort.onmessageerror = (event: MessageEvents) => {
console.log(`dxin => Worker对象接收到一条无法被序列化的消息`)
};
// 注册onerror回调,捕获注册onerror回调,捕获Worker在执行过程中发生的异常,在Worker线程执行,在Worker线程执行
workerPort.onerror = (event: ErrorEvent) => {
console.log(`dxin =>注册onerror回调,捕获Worker在执行过程中发生的异常`)
};
UI页面主线程:
// src/main/ets/pages/MainWorker.ets
import worker, { ErrorEvent, MessageEvents } from '@ohos.worker';
// Worker主线程
@Entry
@Component
struct MainWorker {
@State message: string = '';
build() {
Column() {
Button('向子线程发消息')
.onClick(() => {
//创建Worker对象
let workerInstance: worker.ThreadWorker = new worker.ThreadWorker('entry/ets/workers/MyWorker.ets');
// 注册onmessage回调,捕获宿主线程接收到来自其创建的Worker通过workerPort.postMessage接口发送的消息。该回调在宿主线程执行
workerInstance.onmessage = (e: MessageEvents) => {
let data: string = e.data;
// 展示获取到子线程的消息
this.message = data
console.log(`dxin => workerInstance.onmessage is: ${data}`)
}
// 注册onALLErrors回调,捕获Worker线程的onmessage回调、timer回调以及文件执行等流程产生的全局异常。该回调在宿主线程执行
workerInstance.onAllErrors = (err: ErrorEvent) => {
console.error('workerInstance onAllErrors message is:' + err.message);
}
//注册onmessageerror回调,当Worker对象接收到无法序列化的消息时被调用,在宿主线程执行
workerInstance.onmessageerror = () => {
console.error('workerInstance onmessageerror');
}
// 注册onexit回调,当Worker销毁时被调用,在宿主线程执行
workerInstance.onexit = (e: number) => {
// Worker正常退出时,code为0;异常退出时,code为1
console.info('workerInstance onexit code is: ', e);
}
// 发送消息给Worker线程
try {
workerInstance.postMessage('dxin MainWorker');
} catch (error) {
// TODO: Implement error handling.
}
})
Text(this.message).fontSize(22)
}
.height('100%')
.width('100%')
.justifyContent(FlexAlign.Center)
}
}

