异步批处理方案设计:JDK21 + 虚拟线程
前言
在业务系统开发过程中,经常碰到需要大批量处理业务数据,处理不当非常容易出现如下问题:
CPU 飙高,导致整个系统卡顿,甚至 整个系统 Crash Down
触发 OOM,系统无限重启,业务没有办法继续执行
请求量过大,打崩数据库或者外部系统
本文将采用 JDK 21 中正式成为标准功能的虚拟线程(Virtual Thread)来完成异步批处理工作,创建虚拟线程有多种方式,都比较简单,可以自行查阅网上资料。
使用 JDK 21 虚拟线程的最佳实践:
采用同步的方式直接创建虚拟线程和调用,因为虚拟线程不像Java 传统线程不是稀缺资源,可以创建很多
每个并发任务创建一个虚拟线程,没有必要进行池化处理
使用信号量进行并发控制
尽量少使用 Thread Local 对象,可能消耗大量的内存,导致 OOM,并且不是线程安全的
避免出现载体线程挂起的情况,比如:虚拟线程所执行的代码块使用了
synchronized
或者Object.wait()
异步批处理设计
设计要点:
用户或者三方系统发起一系列的业务操作,识别其中可以异步执行的逻辑,比如:短信通知、邮件通知、短信验证码、站内信、财务系统大批量开票结果回传等。
任务消息主要包含任务 ID、任务类型、消息体,其中任务类型设计成枚举值,绑定业务执行的 handler 和 业务类型所对应的信号量 Semaphore;消息体存储业务对象的 JSON String。
封装好任务消息,持久化到消息队列当中,可以根据项目的需要按需实现 JDK BlockingQueue、 Redission BlockingQueue、RocketMQ等;根据中间件的特性以及项目的需要,按需调整成延迟队列,将时间靠前的排在队首,以满足各种延迟执行的效果,比如:明天早上 8 点发送短信,15 分钟后取消订单,会议室开始前 15 分钟通知,会议结束通知等。
启动一个载体线程开启 while 无限循环,从队列的队首弹出任务数据,如果队列中没有数据,block 住等待业务数据。
通过任务类型获取对应的信号量对象,在开启虚拟线程前,申请 Semaphore 许可,如果申请失败,block 住载体线程。
一种任务类型对应一个消息队列的 Topic,并开启一个载体线程。这样被信号量 block 时,不会影响其他任务的并发执行。
启动虚拟线程执行具体的业务任务,由任务类型枚举中进行定义,参考附录图片。执行完成后在 finally 中 release 信号量。
识别任务类型,分别为:CPU 密集型、 IO 密集型、资源限制型等,结合实际情况,调整信号量额度配置,进行并发控制,防止出现 CPU 100%、OOM、外部资源系统崩溃等情况
执行任务出现异常后,尽量不要跳出循环,记录错误问题后,设计自动重试方案。
最后可关注公众号Eric技术圈,私信回复暗号“异步批处理源码“,即可自动领取源码。
附录
代码结构:
任务类型代码设计:
信号量配置:
测试验证:
注意:本次测试采用的是JDK 内存队列容器 LinkedBlockingQueue
,没有实现多 Topic 模式,会导致不同任务类型的信号量相互竞争。
多任务并发获取结果: