扫码阅读
手机扫码阅读

固定QPS压测模式探索

49 2024-04-17

在早前跟测试同行在QQ群聊天的时候,聊过一个固定QPS压测的问题,最近突然有需求,想实现一下,丰富一下自己的性能测试框架,最新的代码请移步我的GitHub,地址: https://github.com/JunManYuanLong/FunTester,gitee地址: https://gitee.com/fanapi/tester

思路

  • 有一个多线程的基类,其他压测任务类继承于基类。
  • 并发执行类由线程池任务发生器补偿器组成。
  • 单线程执行任务发生器将生成的任务对象丢到线程池里面执行。
  • 另起补偿器线程完成缺失的补偿。(由于多种原因,真实发生量小于设定值)

总体的思路与如何mock固定QPS的接口moco固定QPS接口升级补偿机制这两票文章一致,但是没有采取Semaphore的模式,原因是moco是多线程对单线程,压测是单线程对多线程。

  • 语言继续采用Java语言。

基类

写得有点仓促,还未进行大量实践,所以注释少一些。这里依然设计两种子模式: 定量压测定时压测,这里由于两种压测模式,通过一个属性isTimesMode记录,在执行类FixedQpsConcurrent中用到,单次压测任务对象统一isTimesModelimit两个属性。

package com.fun.base.constaint; import com.fun.base.interfaces.MarkThread; import com.fun.config.HttpClientConstant; import com.fun.frame.execute.FixedQpsConcurrent; import com.fun.frame.httpclient.GCThread; import com.fun.utils.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public abstract class FixedQpsThread<T> extends ThreadBase { private static Logger logger = LoggerFactory.getLogger(FixedQpsThread.class); public int qps; public int limit; public boolean isTimesMode; public FixedQpsThread(T t, int limit, int qps, MarkThread markThread) { this.limit = limit; this.qps = qps; this.mark = markThread; this.t = t;
        isTimesMode = limit > 1000 ? true : false;
    } protected FixedQpsThread() { super();
    } @Override public void run() { try {
            before();
            threadmark = mark == null ? EMPTY : this.mark.mark(this); long s = Time.getTimeStamp();
            doing(); long e = Time.getTimeStamp(); long diff = e - s;
            FixedQpsConcurrent.allTimes.add(diff);
            FixedQpsConcurrent.executeTimes.getAndIncrement(); if (diff > HttpClientConstant.MAX_ACCEPT_TIME)
                FixedQpsConcurrent.marks.add(diff + CONNECTOR + threadmark);
        } catch (Exception e) {
            logger.warn("执行任务失败!", e);
            logger.warn("执行失败对象的标记:{}", threadmark);
            FixedQpsConcurrent.errorTimes.getAndIncrement();
        } finally {
            after();
        }
    } @Override public void before() {
        GCThread.starts();
    } /**
     * 子类必需实现改方法,不然调用deepclone方法会报错
     *
     * @return */ public abstract FixedQpsThread clone();
} 

执行类

此处补偿线程设计还待优化,中间有两处休眠:一处是循环检测是否需要补偿,一处是单词补偿间隔。尚未提取配置变量,有待后面实践之后进行优化调整。测试结果对象依然采用了原来的,数值和计算方式保持一致,后期也会根据实践结果进行调整,可以关注我的GitHub及时获取更新。

package com.fun.frame.execute; import com.fun.base.bean.PerformanceResultBean; import com.fun.base.constaint.FixedQpsThread; import com.fun.config.Constant; import com.fun.frame.Save; import com.fun.frame.SourceCode; import com.fun.frame.httpclient.GCThread; import com.fun.utils.Time; import com.fun.utils.WriteRead; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Vector; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static java.util.stream.Collectors.toList; /**
 * 并发类,用于启动压力脚本
 */ public class FixedQpsConcurrent extends SourceCode { private static Logger logger = LoggerFactory.getLogger(FixedQpsConcurrent.class); public static boolean key = false; public static AtomicInteger executeTimes = new AtomicInteger(); public static AtomicInteger errorTimes = new AtomicInteger(); public static Vector marks = new Vector<>(); /**
     * 用于记录所有请求时间
     */ public static Vector allTimes = new Vector<>(); /**
     * 开始时间
     */ public long startTime; /**
     * 结束时间
     */ public long endTime; public int queueLength; /**
     * 任务描述
     */ public String desc = DEFAULT_STRING; /**
     * 任务集
     */ public List threads = new ArrayList<>(); /**
     * 线程池
     */ ExecutorService executorService; /**
     * @param thread 线程任务
     */ public FixedQpsConcurrent(FixedQpsThread thread) { this(thread, DEFAULT_STRING);
    } /**
     * @param threads 线程组
     */ public FixedQpsConcurrent(List threads) { this(threads, DEFAULT_STRING);
    } /**
     * @param thread 线程任务
     * @param desc   任务描述
     */ public FixedQpsConcurrent(FixedQpsThread thread, String desc) { this(); this.queueLength = 1;
        threads.add(thread); this.desc = desc + Time.getNow();
    } /**
     * @param threads 线程组
     * @param desc    任务描述
     */ public FixedQpsConcurrent(List threads, String desc) { this(); this.threads = threads; this.queueLength = threads.size(); this.desc = desc + Time.getNow();
    } private FixedQpsConcurrent() {
        executorService = ThreadPoolUtil.createPool(20, 200, 3);
    } /**
     * 执行多线程任务
     * 默认取list中thread对象,丢入线程池,完成多线程执行,如果没有threadname,name默认采用desc+线程数作为threadname,去除末尾的日期
     */ public PerformanceResultBean start() {
        key = false;
        FixedQpsThread fixedQpsThread = threads.get(0); boolean isTimesMode = fixedQpsThread.isTimesMode; int limit = fixedQpsThread.limit; int qps = fixedQpsThread.qps; long interval = 1_000_000_000 / qps;
        AidThread aidThread = new AidThread(); new Thread(aidThread).start();
        startTime = Time.getTimeStamp(); while (true) {
            executorService.execute(threads.get(limit-- % queueLength).clone()); if (key ? true : isTimesMode ? limit < 1 : Time.getTimeStamp() - startTime > fixedQpsThread.limit) break;
            sleep(interval);
        }
        endTime = Time.getTimeStamp();
        aidThread.stop();
        GCThread.stop(); try {
            executorService.shutdown();
            executorService.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            logger.error("线程池等待任务结束失败!", e);
        }
        logger.info("总计执行 {} ,共用时:{} s,执行总数:{},错误数:{}!", fixedQpsThread.isTimesMode ? fixedQpsThread.limit + "次任务" : "秒", Time.getTimeDiffer(startTime, endTime), executeTimes, errorTimes); return over();
    } private PerformanceResultBean over() {
        key = true;
        Save.saveLongList(allTimes, "data/" + queueLength + desc);
        Save.saveStringListSync(marks, MARK_Path.replace(LONG_Path, EMPTY) + desc);
        allTimes = new Vector<>();
        marks = new Vector<>();
        executeTimes.set(0);
        errorTimes.set(0); return countQPS(queueLength, desc, Time.getTimeByTimestamp(startTime), Time.getTimeByTimestamp(endTime));
    } /**
     * 计算结果
     *

此结果仅供参考

*      * @param name 线程数      */ public PerformanceResultBean countQPS(int name, String desc, String start, String end) {         List strings = WriteRead.readTxtFileByLine(Constant.DATA_Path + name + desc); int size = strings.size();         List data = strings.stream().map(x -> changeStringToInt(x)).collect(toList()); int sum = data.stream().mapToInt(x -> x).sum();         Collections.sort(data);         String statistics = StatisticsUtil.statistics(data, desc, this.queueLength); double qps = 1000.0 * size * name / sum; return new PerformanceResultBean(desc, start, end, name, size, sum / size, qps, getPercent(executeTimes.get(), errorTimes.get()), 0, executeTimes.get(), statistics);     } /**      * 用于做后期的计算      *      * @param name      * @param desc      * @return */ public PerformanceResultBean countQPS(int name, String desc) { return countQPS(name, desc, Time.getDate(), Time.getDate());     } /**      * 后期计算用      *      * @param name      * @return */ public PerformanceResultBean countQPS(int name) { return countQPS(name, EMPTY, Time.getDate(), Time.getDate());     } /**      * 补偿线程      */ class AidThread implements Runnable { private boolean key = true; int i; public AidThread() {         } @Override public void run() {             logger.info("补偿线程开始!"); while (key) { long expect = (Time.getTimeStamp() - startTime) / 1000 * threads.get(0).qps; if (expect > executeTimes.get() + 10) {                     range((int) expect - executeTimes.get()).forEach(x -> {                         sleep(100);                         executorService.execute(threads.get(i++ % queueLength).clone());                     });                 }                 sleep(3);             }             logger.info("补偿线程结束!");         } public void stop() {             key = false;         }     } }

其他配套的标记类统计类还等待修改,比较简单,这里不放代码了。

原文链接: https://mp.weixin.qq.com/s?__biz=MzU4MTE2NDEyMQ==&mid=2247488128&idx=1&sn=292abe046d63242c98a4b2a85015ffe6