文章

异步批处理方案设计:JDK21 + 虚拟线程

image-20240817111945355

前言

在业务系统开发过程中,经常碰到需要大批量处理业务数据,处理不当非常容易出现如下问题:

  • CPU 飙高,导致整个系统卡顿,甚至 整个系统 Crash Down

  • 触发 OOM,系统无限重启,业务没有办法继续执行

  • 请求量过大,打崩数据库或者外部系统

本文将采用 JDK 21 中正式成为标准功能的虚拟线程(Virtual Thread)来完成异步批处理工作,创建虚拟线程有多种方式,都比较简单,可以自行查阅网上资料。

使用 JDK 21 虚拟线程的最佳实践:

  • 采用同步的方式直接创建虚拟线程和调用,因为虚拟线程不像Java 传统线程不是稀缺资源,可以创建很多

  • 每个并发任务创建一个虚拟线程,没有必要进行池化处理

  • 使用信号量进行并发控制

  • 尽量少使用 Thread Local 对象,可能消耗大量的内存,导致 OOM,并且不是线程安全的

  • 避免出现载体线程挂起的情况,比如:虚拟线程所执行的代码块使用了synchronized 或者 Object.wait()

异步批处理设计

image-20240819113435696

设计要点:

  1. 用户或者三方系统发起一系列的业务操作,识别其中可以异步执行的逻辑,比如:短信通知、邮件通知、短信验证码、站内信、财务系统大批量开票结果回传等。

  2. 任务消息主要包含任务 ID、任务类型、消息体,其中任务类型设计成枚举值,绑定业务执行的 handler 和 业务类型所对应的信号量 Semaphore;消息体存储业务对象的 JSON String。

  3. 封装好任务消息,持久化到消息队列当中,可以根据项目的需要按需实现 JDK BlockingQueue、 Redission BlockingQueue、RocketMQ等;根据中间件的特性以及项目的需要,按需调整成延迟队列,将时间靠前的排在队首,以满足各种延迟执行的效果,比如:明天早上 8 点发送短信,15 分钟后取消订单,会议室开始前 15 分钟通知,会议结束通知等。

  4. 启动一个载体线程开启 while 无限循环,从队列的队首弹出任务数据,如果队列中没有数据,block 住等待业务数据。

  5. 通过任务类型获取对应的信号量对象,在开启虚拟线程前,申请 Semaphore 许可,如果申请失败,block 住载体线程。

  6. 一种任务类型对应一个消息队列的 Topic,并开启一个载体线程。这样被信号量 block 时,不会影响其他任务的并发执行。

  7. 启动虚拟线程执行具体的业务任务,由任务类型枚举中进行定义,参考附录图片。执行完成后在 finally 中 release 信号量

  8. 识别任务类型,分别为:CPU 密集型、 IO 密集型、资源限制型等,结合实际情况,调整信号量额度配置,进行并发控制,防止出现 CPU 100%、OOM、外部资源系统崩溃等情况

  9. 执行任务出现异常后,尽量不要跳出循环,记录错误问题后,设计自动重试方案。

最后可关注公众号Eric技术圈,私信回复暗号“异步批处理源码“,即可自动领取源码。

附录

代码结构:

image-20240818235038247

任务类型代码设计:

image-20240818235432002

信号量配置:

image-20240818235848606

测试验证:

注意:本次测试采用的是JDK 内存队列容器 LinkedBlockingQueue,没有实现多 Topic 模式,会导致不同任务类型的信号量相互竞争。

image-20240819113148149

多任务并发获取结果:

image-20240819113027937

License:  CC BY 4.0