扫码阅读
手机扫码阅读
固定QPS异步任务功能再探
56 2024-04-13
前几天分享过固定QPS异步任务功能初探使用了缓存线程池,利用java.util.concurrent.Semaphore
实现了固定QPS的异步任务。
昨天录制视频的时候,谈到第二种放弃的实现思路,就是通过将任务丢到一个队列中,然后通过一个线程从线程池取任务,丢到线程池里面执行。今天早上仔细想了想还是很值得实现了,能够很好地降低线程数,刚好我之前在Java自定义异步功能实践 2021-10-19
中用到了守护线程,正好充当这个任务管理线程。
下面就是具体的实现:
入口方法
这里依然写到了com.funtester.frame.SourceCode
工具类中,关键字我用看funer
,区别去之前的funner
。
代码如下:
/**
* 以固定QPS,异步执行,默认16,调整方法{@link SourceCode#setMaxQps(int)}
* 改种方式在main线程结束后会以QPS /5 +5执行等待任务
*
* @param f
*/ public static void funer(Closure f) {
fun(JToG.toClosure(() -> {
ThreadPoolUtil.addQPSTask(f); return null;
}));
}
实现细节
这里队列我选用了java.util.concurrent.LinkedBlockingQueue
,用来存储需要执行的异步任务。添加任务的方法:
/**
* 添加异步固定QPS的任务
* {@link java.util.concurrent.LinkedBlockingQueue#offer(java.lang.Object)}是非阻塞,返回Boolean值
* @param closure
* @return */ static def addQPSTask(Closure closure) {
asyncQueue.offer(closure)
}
守护线程添加新执行逻辑:
static boolean daemon() {
def thread = new Thread(new Runnable() { @Override void run() {
SourceCode.noError { while (checkMain()) {
SourceCode.sleep(1.0)
ASYNC_QPS.times {executeCacheSync()}
}
waitAsyncIdle()
}
ThreadPoolUtil.shutPool()
}
})
thread.setDaemon(true)
thread.setName("Deamon")
thread.start()
}
其中执行异步任务功能:
/**
* 使用缓存线程池执行任务,适配第二种方式{@link com.funtester.frame.SourceCode#funer(groovy.lang.Closure)}
* @return */ static def executeCacheSync() {
def poll = asyncQueue.poll() if (poll != null) executeCacheSync({poll()})
}
针对本地版本,还有一个关闭线程池的功能,这个主要是防止程序停不下来,跟固定线程的异步任务功能类似,所以写到了一起。
/**
* 等待异步线程池空闲
*/ static void waitAsyncIdle() { if (asyncPool == null) return SourceCode.time({
SourceCode.waitFor {
((int) (ASYNC_QPS / 5) + 1).times {executeCacheSync()}
asyncPool.getQueue().size() == 0 && asyncPool.getActiveCount() == 0 && asyncQueue.size() == 0 }
}, "异步线程池等待")
} /**
* 关闭异步线程池,不然会停不下来*/ static void shutPool() { if (!getFunPool().isShutdown()) {
log.info(Output.rgb("异步线程池关闭!"))
getFunPool().shutdown()
} if (cachePool != null && !cachePool.isShutdown()) {
cachePool.shutdown()
}
}
自测
用例:
static void main(String[] args) {
setMaxQps(1) 10.times {
funer {
output(Time.getDate())
}
}
sleep(5.0)
output("FunTester")
}
控制台输出:
22:02:36.770 main
###### # # # # ####### ###### ##### ####### ###### #####
# # # ## # # # # # # # #
#### # # # # # # #### ##### # #### #####
# # # # # # # # # # # # #
# ##### # # # ###### ##### # ###### # # 22:02:38.300 C-1 2022-10-26 22:02:38 22:02:39.294 C-1 2022-10-26 22:02:39 22:02:40.298 C-1 2022-10-26 22:02:40 22:02:41.299 C-1 2022-10-26 22:02:41 22:02:42.261 main FunTester 22:02:42.301 C-1 2022-10-26 22:02:42 22:02:42.320 C-1 2022-10-26 22:02:42 22:02:42.525 C-1 2022-10-26 22:02:42 22:02:42.730 C-1 2022-10-26 22:02:42 22:02:42.931 C-1 2022-10-26 22:02:42 22:02:43.133 C-1 2022-10-26 22:02:43 22:02:43.133 Deamon 异步线程池等待执行耗时:825 ms 22:02:43.153 Deamon 异步线程池关闭!
进程已结束,退出代码0
原文链接:
http://mp.weixin.qq.com/s?__biz=MzU4MTE2NDEyMQ==&mid=2247498404&idx=1&sn=b3937dd6db6156edb9ccbf612ef7e685&chksm=fd497792ca3efe84bf7dd0e4b05b3df75e369509a6b3af335b4fc684163da96b79cc7233d796#rd
FunTester的其他文章
手动测试依然很重要
市场上看到的数百种新的应用程序和产品,在向用户?
JsonPath验证类既Groovy重载操作符实践
在使用JsonPath工具类封装进行接口响应的验证过程中,由于使用原生的JsonPath的API获取到的值默认是object,如果需要转成其他类型需要多些一些代码。
如何列举测试点
测试人员需要能够在软件开发过程中,基于软件的需?
Selenium自动化的JUnit参数化实践
作为自动化测试人员,经常会遇到这样Selenium测试自动化场景:需要一次又一次地执行相同的测试用例,只是使用不同的输入和环境配置,从而使工作变得冗长且多余。
通用池化框架commons-pool2功能拓展
最近真真实实用到了通用池化框架commons-pool2,又学到
加入社区微信群
与行业大咖零距离交流学习
软件研发质量管理体系建设
白皮书上线