低配终端环境下如何模拟大规模负载

xreztento · · 1455 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

什么束缚了你的手脚

很多时候我会听到这样的抱怨声“我的终端设备配置太差了,跑测试工具只能模拟很少量的虚拟用户”。是的,我只想说有时候就算你使用高配置的服务器在某些场景下也只能模拟出十位数字的用户。

这不是危言耸听,比如LoadRunner一些特殊的协议,像Microsoft .Net协议这样可以录制一个.Net的Client程序的通信交互逻辑,通过引用Client程序的dll动态链接库,利用代理到的通信交互过程,可以分析并反向工程出一套客户端程序代码的C#.Net脚本。利用这样的脚本运行并发测试,就好像你的一台设备上同时启动了几十个真实的Client(因为受限于反向工程时的一些技术限制,甚至比真实的Client还要笨重),可想其消耗资源之大,会跑到操作系统不能自理。

因此,并不完全是硬件配置原因限制了你的目标,还有其他很多原因,这里面可能最直接的几点原因都是来源于测试工具本身:

(1)有些优秀的工具在安装时会帮助你完成对操作系统内核参数的优化,以及工具本身可以根据所搭载设备的硬件配置进行自适应式的优化调整,而你所使用的大部分工具不具备这样的能力;

(2)要知道目前你所获得的绝大数性能测试工具历史都相当悠久,工具的核心框架仍然保持着它最初的模样,比如这些工具对线程的控制与应用方面,虽然随着升级进行了一定的优化,但仍沿袭了“同步阻塞IO模型”;

(3)许多版本的迭代并没有有效地提升其在模拟执行测试时的性能,反而使得工具变得“厚重”。比如JMeter这样的工具,10几年来不断增加额外的新特性,这些特性并没有显著地优化其自身性能,有时反而适得其反,比如BeanShell甚至在使用不当的情况下在测试时会搅乱工具的整体性能表现。

很多时候工具会限制你的思维方式、束缚住你的手脚,你无法怪罪于工具,比如,“同步阻塞IO模型”的应用是最易驾驭且在线程级别上最能体现“并发”概念的,对于一个通用型多协议支持的测试工具这样的模型最为稳妥而且具备普适性,这当然值得去牺牲一些性能。

当享受使用这些工具带来便利的同时,你可能根本不会进一步去思考如何来优化,从而改良对于一些特定场景下的测试手段,最终来达到一些惊人的效果。

线程

需要达到惊人的效果,我们需要了解是什么限制住了这些工具的发挥,前面提到了“同步阻塞IO模型”的概念,是的,很大程度上你可以将性能问题的产生归咎于这类线程应用模型的使用上。

消耗内存

我们知道比如在JVM这样的运行环境下,每创建一个线程是需要为其分配一定大小额度的线程栈内存的,在一般条件下默认值为1M字节。这些线程一旦数量太多就会占用大量内存,给GC带来回收压力,当把内存耗尽时,你将得到以下异常:

java.lang.OutOfMemoryError: unable to create new native thread

我们可以通过配置的方式尽力挽回:

当你创建一个线程的时候,虚拟机会在JVM内存创建一个Thread对象同时创建一个操作系统线程,而这个系统级线程的内存用的并不是JVM分配内存,而是系统中剩下的内存,一个普遍认可的最大创建线程数的计算公式如下:

Max number of threads(最大创建线程数) = (进程最大可用内存 - JVM分配内存 - OS Reserved Memory(JNI,本地方法栈)) / thread stack size(线程栈大小)

对于如JMeter这样由Java语言所编写的性能测试工具来说,简单的通过减小JVM分配内存数,来增加最大创建线程数是不可靠的,每一个创建线程都是要完成特定的任务,而在任务生命周期中将会创建大量的Java对象,这些对象将会很快耗尽JVM分配内存。因此,你需要倍加小心地配合着调节以下参数来扩大理论上的最大创建线程数:

-Xmx:设置JVM最大堆大小
-Xss或-XX:ThreadStackSize=<value>:设置线程的栈大小(字节数)(0表示默认)

当然,操作系统不会任由单个进程创建无数的线程,大部分情况下会有内核级参数的限制,比如Linux操作系统可以通过调节以下内核参数从理论上来消除这些限制:

/proc/sys/kernel/pid_max 
/proc/sys/kernel/thread-max
max_user_process(ulimit -u)
/proc/sys/vm/max_map_count

高额的系统开销

创建和使用一个线程的代价是十分昂贵的,如果针对每个任务都启动一条线程进行处理,当可运行的线程(Runnable状态)数量大于可用处理器数量(核心数),那么有些线程会得不到调度,CPU将会耗费大把的精力来协调管理这群线程,在频繁的线程上下文切换中度过余生。

“同步阻塞IO模型”将这种场景发挥得淋漓尽致,它特别不适合处理IO密集型的业务场景,在大规模并行环境下,经常性的等待和切换耗费着大量的系统资源,却换来的是低效的性能,上面说了由于历史原因和易于控制,大部分测试工具都采用了这种模型完成并发的模拟。

是时候改变些什么了

以下我们将以Java语言及其运行环境为例,讨论如何利用优化线程模型的方法来改变在低配终端环境下以往使用测试工具(如JMeter)时所面临的无法模拟大规模负载的窘境。

首先需要明确的是,在模拟大规模负载的场景前,你需要逆向思考,将压力生成终端如同服务器一样对待,因为它即将成为一台反向生成大规模TCP/IP请求的设备(安装Linux操作系统)。

操作系统TCP优化

(1)我们知道“Linux操作系统一切皆文件”,一切系统IO操作都会最终被抽象为对应的文件,并可以通过“文件描述符”建立关联。作为IO操作的Socket,每创建一个连接都会创建一个对应的文件并通过文件操作完成具体工作,因此,首先需要调整用户进程可打开文件数限制的系统参数,来增加创建Socket的数量,主要方法如下:

使用ulimit设置系统允许当前用户进程打开的文件数限制:

ulimit -n 65535

(2)当模拟大量TCP/IP请求时,需要修改网络内核对TCP连接的有关限制,并优化相关参数,主要方法如下:

修改/etc/sysctl.conf文件,在文件中添加如下行(一些经验参数):

net.ipv4.ip_local_port_range = 1024 65535
fs.file-max = 65535
kernel.pid_max = 65536   
net.ipv4.tcp_syncookies = 1  
net.ipv4.tcp_synack_retries = 2  
net.ipv4.tcp_syn_retries = 2  
net.ipv4.tcp_timestsmps = 0  
net.ipv4.tcp_tw_reuse = 1  
net.ipv4.tcp_tw_recycle = 1  
net.ipv4.tcp_fin_timeout = 30  
net.ipv4.tcp_keepalive_time = 1200  
net.ipv4.ip_local_port_range = 10000 65535  
net.ipv4.tcp_max_syn_backlog = 8192  
net.ipv4.tcp_max_tw_buckets = 5000  
net.ipv4.tcp_wmem = 8192 436600 873200  
net.ipv4.tcp_rmem  = 32768 436600 873200  
net.ipv4.tcp_mem = 94500000 91500000 92700000  
net.ipv4.tcp_max_orphans = 3276800  
net.core.netdev_max_backlog = 32768  
net.core.somaxconn = 32768  
net.core.wmem_default = 8388608  
net.core.rmem_default = 8388608  
net.core.rmem_max = 16777216  
net.core.wmem_max = 16777216

之后,执行sysctl -p使其生效。

探索优化方法

我们按照往常的惯用策略,在讨论优化方法前,首先构建一个Mock环境来测试所研究方法的效果,之后通过分析得到我们的结论。

一个Mock环境

我们这里使用resteasy构建一个简单的RESTful API 服务,程序清单参考以下内容:
(1)maven的pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.xreztento.mock</groupId>
    <artifactId>restful</artifactId>
    <packaging>war</packaging>
    <version>0.0.1-SNAPSHOT</version>
    <name>restful Maven Webapp</name>
    <url>http://maven.apache.org</url>
    <repositories>
        <repository>
            <id>JBoss repository</id>
            <url>https://repository.jboss.org/nexus/content/groups/public-jboss/</url>
        </repository>
    </repositories>
    <dependencies>
        <dependency>
            <groupId>org.jboss.resteasy</groupId>
            <artifactId>resteasy-jaxrs</artifactId>
            <version>2.2.1.GA</version>
        </dependency>
        <dependency>
            <groupId>org.jboss.resteasy</groupId>
            <artifactId>resteasy-jackson-provider</artifactId>
            <version>3.0.19.Final</version>
        </dependency>
        <dependency>
            <groupId>org.jboss.resteasy</groupId>
            <artifactId>resteasy-jaxb-provider</artifactId>
            <version>3.0.19.Final</version>
        </dependency>
    </dependencies>
    <build>
        <finalName>restful</finalName>
    </build>
</project>

(2)web.xml

<!DOCTYPE web-app PUBLIC
 "-//Sun Microsystems, Inc.//DTD Web Application 2.3//EN"
 "http://java.sun.com/dtd/web-app_2_3.dtd" >

<web-app>
    <display-name>Mock Restful Web Application</display-name>

    <context-param>
        <param-name>resteasy.resources</param-name>
        <param-value>
            org.xreztento.mock.restful.MockService
        </param-value>
    </context-param>

    <context-param>
        <param-name>resteasy.servlet.mapping.prefix</param-name>
        <param-value>/</param-value>
    </context-param>

    <listener>
        <listener-class>
            org.jboss.resteasy.plugins.server.servlet.ResteasyBootstrap
        </listener-class>
    </listener>

    <servlet>
        <servlet-name>resteasy-servlet</servlet-name>
        <servlet-class>org.jboss.resteasy.plugins.server.servlet.HttpServletDispatcher</servlet-class>
    </servlet>

    <servlet-mapping>
        <servlet-name>resteasy-servlet</servlet-name>
        <url-pattern>/*</url-pattern>
    </servlet-mapping>

</web-app>

(3)一个用于接口方法返回的数据对象类MockResult

public class MockResult {
    public String getResult() {
        return result;
    }

    public void setResult(String result) {
        this.result = result;
    }

    private String result = null;
}

(4)一个RESTful API服务实现类

@Path("/api") 
public class MockService {
    @GET  
    @Path("/{mock}")
    @NoCache
    @Produces(MediaType.APPLICATION_JSON)
    public MockResult getByUsername(@PathParam("mock") String mock, @Context HttpServletResponse response) throws InterruptedException {
        MockResult result = new MockResult();
        result.setResult(mock);
        return result;
    }  
}

启动服务后,我们可以通过浏览器访问测试接口服务,结果如下:


之后,我们就以该接口作为被测试接口对象进行各种优化方法的比较。

基准方法(Thread)

我们按照“同步阻塞IO模型”来实现一个性能最糟糕的基准方法(甚至每一个Thread下都会新创建一个HttpClient对象),用于验证优化方法到底能够达到什么效果,对于HttpClient我们保持与JMeter一致,使用Apache-HttpClient,可以参考以下代码:
(1)一个用于记录结果的类TestResult

public class TestResult {
    private long min;
    private long max;
    private long time90;
    private long avg;
    private long last;

    public long getMin() {
        return min;
    }
    public void setMin(long min) {
        this.min = min;
    }
    public long getMax() {
        return max;
    }
    public void setMax(long max) {
        this.max = max;
    }
    public long getTime90() {
        return time90;
    }
    public void setTime90(long time90) {
        this.time90 = time90;
    }
    public long getAvg() {
        return avg;
    }
    public void setAvg(long avg) {
        this.avg = avg;
    }
    public long getLast() {
        return last;
    }
    public void setLast(long last) {
        this.last = last;
    }

    @Override
    public String toString(){
        return "avg : " + getAvg() + "\n"
                + "min : " + getMin() + "\n"
                + "max : " + getMax() + "\n"
                + "90% : " + getTime90() + "\n"
                + "last : " + getLast() + "\n";
    }
}

(2)一个用于结果统计和计算的类TestResultComputer

import java.util.Arrays;
import java.util.Vector;
import java.util.concurrent.atomic.AtomicInteger;

public class TestResultComputer {
    private Vector<Long> result = new Vector<Long>();
    private AtomicInteger error = new AtomicInteger(0);

    public TestResult compute(){
        TestResult tr = new TestResult();
        Vector<Long> times = result;
        tr.setLast(times.get(times.size() - 1));
        tr.setAvg(sum(times) / times.size());
        Object[] sort = times.toArray();
        Arrays.sort(sort);
        tr.setMin(Long.valueOf(sort[0].toString()));
        tr.setMax(Long.valueOf(sort[sort.length - 1].toString()));
        tr.setTime90(Long.valueOf(sort[(int)Math.floor(sort.length * 0.9)].toString()));

        return tr;
    }

    private long sum(Vector<Long> times){
        long value = 0;
        for(long time : times){
            value += time;
        }
        return value;
    }

    public void addResult(long time) {
        result.add(time);
    }

    public Vector<Long> getResult() {
        return result;
    }

    public int getError() {
        return error.get();
    }

    public void addError() {
        error.incrementAndGet();
    }
}

(3)测试实现类ThreadTester

import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.BasicResponseHandler;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;

import java.io.IOException;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;

public class ThreadTester {
    private static final BasicResponseHandler BASIC_RESPONSE_HANDLER = new BasicResponseHandler();

    public static void main(String[] args) throws InterruptedException {
        final int concurrencyLevel = 1000;
        final String url = "http://192.168.156.7:8080/restful/api/mock";


        final CountDownLatch cdl = new CountDownLatch(concurrencyLevel);
        final CyclicBarrier cb = new CyclicBarrier(concurrencyLevel, ()->{
            System.out.println("Start test......");

        });

        final TestResultComputer computer = new TestResultComputer();
        for (int i = 0; i < concurrencyLevel; i++){
            new Thread(new Runnable() {
                @Override
                public void run() {
                    CloseableHttpClient client = null;

                    try {
                        client = HttpClients.createDefault();
                        HttpGet get = new HttpGet(url);
                        cb.await();
                        long startTime = System.currentTimeMillis();
                        String response = client.execute(get, BASIC_RESPONSE_HANDLER);

                        computer.addResult(System.currentTimeMillis() - startTime);
                        System.out.println(response);

                    } catch (IOException e) {

                        computer.addError();
                        System.out.println(e.getMessage());

                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    } finally {
                        cdl.countDown();
                        try {
                            client.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }).start();
        }

        cdl.await();
        System.out.println("success request : " + computer.getResult().size());
        System.out.println(computer.compute());
        System.out.println("error request : " + computer.getError());

    }
}

我们会通过设置concurrencyLevel来设定测试模拟的HTTP请求数,其中利用CyclicBarrier完成线程创建后的集合以达到实现最大并发模拟的效果,通过CountDownLatch完成对线程任务控制(等待所有线程执行完成后统计计算测试结果)。

对于另外一个基准方法我们选择直接使用JMeter来完成上面“最糟糕方法”的同样逻辑,虽然JMeter也在不懈的优化其自身性能,但很难摆脱其对于线程模型的应用限制,一个jmx脚本参考如下:

<?xml version="1.0" encoding="UTF-8"?>
<jmeterTestPlan version="1.2" properties="3.2" jmeter="3.2 r1790748">
  <hashTree>
    <TestPlan guiclass="TestPlanGui" testclass="TestPlan" testname="测试计划" enabled="true">
      <stringProp name="TestPlan.comments"></stringProp>
      <boolProp name="TestPlan.functional_mode">false</boolProp>
      <boolProp name="TestPlan.serialize_threadgroups">false</boolProp>
      <elementProp name="TestPlan.user_defined_variables" elementType="Arguments" guiclass="ArgumentsPanel" testclass="Arguments" testname="用户定义的变量" enabled="true">
        <collectionProp name="Arguments.arguments"/>
      </elementProp>
      <stringProp name="TestPlan.user_define_classpath"></stringProp>
    </TestPlan>
    <hashTree>
      <ThreadGroup guiclass="ThreadGroupGui" testclass="ThreadGroup" testname="线程组" enabled="true">
        <stringProp name="ThreadGroup.on_sample_error">continue</stringProp>
        <elementProp name="ThreadGroup.main_controller" elementType="LoopController" guiclass="LoopControlPanel" testclass="LoopController" testname="循环控制器" enabled="true">
          <boolProp name="LoopController.continue_forever">false</boolProp>
          <stringProp name="LoopController.loops">1</stringProp>
        </elementProp>
        <stringProp name="ThreadGroup.num_threads">1000</stringProp>
        <stringProp name="ThreadGroup.ramp_time">1</stringProp>
        <longProp name="ThreadGroup.start_time">1498439590000</longProp>
        <longProp name="ThreadGroup.end_time">1498439590000</longProp>
        <boolProp name="ThreadGroup.scheduler">false</boolProp>
        <stringProp name="ThreadGroup.duration"></stringProp>
        <stringProp name="ThreadGroup.delay"></stringProp>
      </ThreadGroup>
      <hashTree>
        <SyncTimer guiclass="TestBeanGUI" testclass="SyncTimer" testname="Synchronizing Timer" enabled="true">
          <intProp name="groupSize">1000</intProp>
          <longProp name="timeoutInMs">0</longProp>
        </SyncTimer>
        <hashTree/>
        <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="HTTP请求" enabled="true">
          <elementProp name="HTTPsampler.Arguments" elementType="Arguments" guiclass="HTTPArgumentsPanel" testclass="Arguments" testname="用户定义的变量" enabled="true">
            <collectionProp name="Arguments.arguments"/>
          </elementProp>
          <stringProp name="HTTPSampler.domain"></stringProp>
          <stringProp name="HTTPSampler.port"></stringProp>
          <stringProp name="HTTPSampler.protocol"></stringProp>
          <stringProp name="HTTPSampler.contentEncoding"></stringProp>
          <stringProp name="HTTPSampler.path">http://192.168.156.7:8080/restful/api/mock</stringProp>
          <stringProp name="HTTPSampler.method">GET</stringProp>
          <boolProp name="HTTPSampler.follow_redirects">true</boolProp>
          <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
          <boolProp name="HTTPSampler.use_keepalive">true</boolProp>
          <boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
          <stringProp name="HTTPSampler.embedded_url_re"></stringProp>
          <stringProp name="HTTPSampler.connect_timeout"></stringProp>
          <stringProp name="HTTPSampler.response_timeout"></stringProp>
        </HTTPSamplerProxy>
        <hashTree>
          <ResultCollector guiclass="StatVisualizer" testclass="ResultCollector" testname="聚合报告" enabled="true">
            <boolProp name="ResultCollector.error_logging">false</boolProp>
            <objProp>
              <name>saveConfig</name>
              <value class="SampleSaveConfiguration">
                <time>true</time>
                <latency>true</latency>
                <timestamp>true</timestamp>
                <success>true</success>
                <label>true</label>
                <code>true</code>
                <message>true</message>
                <threadName>true</threadName>
                <dataType>true</dataType>
                <encoding>false</encoding>
                <assertions>true</assertions>
                <subresults>true</subresults>
                <responseData>false</responseData>
                <samplerData>false</samplerData>
                <xml>false</xml>
                <fieldNames>true</fieldNames>
                <responseHeaders>false</responseHeaders>
                <requestHeaders>false</requestHeaders>
                <responseDataOnError>false</responseDataOnError>
                <saveAssertionResultsFailureMessage>true</saveAssertionResultsFailureMessage>
                <assertionsResultsToSave>0</assertionsResultsToSave>
                <bytes>true</bytes>
                <sentBytes>true</sentBytes>
                <threadCounts>true</threadCounts>
                <idleTime>true</idleTime>
                <connectTime>true</connectTime>
              </value>
            </objProp>
            <stringProp name="filename"></stringProp>
          </ResultCollector>
          <hashTree/>
        </hashTree>
      </hashTree>
    </hashTree>
    <WorkBench guiclass="WorkBenchGui" testclass="WorkBench" testname="工作台" enabled="true">
      <boolProp name="WorkBench.save">true</boolProp>
    </WorkBench>
    <hashTree/>
  </hashTree>
</jmeterTestPlan>

线程池(Pool)

为避免创建大量线程所造成的调度瓶颈与资源浪费,第一时间你应该想到线程池,是的,在服务端这是一种常用的策略,比如HTTP线程池、数据库线程池等。但遗憾的是,如果使用线程池优化方法,你很快会发现它一次只能模拟一个线程池大小的HTTP请求数,其他请求被安排在长长的队列中等待,要命的是它还具备一定的欺骗性,由于其模型的轻量性,在模型创建过程中资源开销极少,如果被请求的接口服务响应很快,它甚至可以得到一个非常理想的优化结果,我们将在后面的优化方法比对中给出结论。

一个线程池优化方法可以参考以下代码:

(1)测试实现类PoolTester

import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.ResponseHandler;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.impl.client.BasicResponseHandler;
import org.apache.http.impl.client.FutureRequestExecutionService;
import org.apache.http.impl.client.HttpClientBuilder;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PoolTester {

    private final static class CustomizeHandler implements ResponseHandler<String> {
        private static final BasicResponseHandler BASIC_RESPONSE_HANDLER = new BasicResponseHandler();


        private long startTime = 0L;
        private TestResultComputer computer = null;
        private CountDownLatch cdl = null;

        CustomizeHandler(long startTime,
                         TestResultComputer computer,
                         CountDownLatch cdl){
            this.startTime = startTime;
            this.computer = computer;
            this.cdl = cdl;
        }
        @Override
        public String handleResponse(HttpResponse httpResponse) {
            String response = null;
            try {
                response = BASIC_RESPONSE_HANDLER.handleResponse(httpResponse);

                computer.addResult(System.currentTimeMillis() - startTime);
                System.out.println(response);

            } catch (IOException e) {
                e.printStackTrace();
                computer.addError();
            } finally {
                this.cdl.countDown();
            }
            return response;
        }
    };


    public static void main(String[] args) throws InterruptedException {

        final int concurrencyLevel = 1000;
        final String url = "http://192.168.156.7:8080/restful/api/mock";
        final CountDownLatch cdl = new CountDownLatch(concurrencyLevel);

        final TestResultComputer computer = new TestResultComputer();

        HttpClient httpClient = HttpClientBuilder.create()
                .setMaxConnPerRoute(concurrencyLevel)
                .build();
        ExecutorService executorService = Executors.newFixedThreadPool(20);
        FutureRequestExecutionService futureRequestExecutionService =
                new FutureRequestExecutionService(httpClient, executorService);


        System.out.println("Start test......");
        for(int i = 0; i < concurrencyLevel; i++) {
            HttpGet get = new HttpGet(url);
            long startTime = System.currentTimeMillis();
            futureRequestExecutionService.execute(get,
                    HttpClientContext.create(),
                    new CustomizeHandler(startTime, computer, cdl));
        }

        cdl.await();
        System.out.println("success request : " + computer.getResult().size());
        System.out.println(computer.compute());
        System.out.println("error request : " + computer.getError());

    }
}

线程池方法很难做到同步这些线程,你在大部分实现失去对它们的控制而把控制权交给ExecutorService。

IO多路复用模型

在IO密集型业务场景下,IO多路服用模型的应用是一种最常见的选择。许多编程语言从核心层天然的实现了对这种模型的支持,比如JavaScript(NodeJS)和Golang。

其他一些编程语言如C/C++在Linux环境下提供的poll、epoll,Java语言中NIO的selector以及比较成熟的netty、grizzly框架都可以从不同程度上利用设计理论模式(如Reactor、Proactor)实现IO多路复用模型。

IO多路复用模型支持你在很少的线程下创建大量的异步任务,通过类似于EventLoop的机制来调度和监视这些非阻塞任务,一旦有异步任务得到响应便会得到处理,因此,可以实现大量的IO操作吞吐量。

Reactor模型(Async)

Reactor模型是一种最常见的IO多路复用模型,Apache-HttpClient最新版本已经提供了实现,我们利用这一特性完成优化方法的构建。

一个线程池优化方法可以参考以下代码:

(1)一个用于处理异步任务的回调方法类ResultCallback

import org.apache.http.concurrent.FutureCallback;
import org.apache.http.impl.client.BasicResponseHandler;
import org.apache.http.HttpResponse;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;


public class ResultCallback implements FutureCallback<HttpResponse> {
    private static final BasicResponseHandler BASIC_RESPONSE_HANDLER = new BasicResponseHandler();

    private long startTime = 0L;
    private TestResultComputer computer = null;
    private CountDownLatch cdl = null;

    public ResultCallback(long startTime,
                          TestResultComputer computer,
                          CountDownLatch cdl){
        this.startTime = startTime;
        this.computer = computer;
        this.cdl = cdl;
    }

    @Override
    public void completed(HttpResponse httpResponse) {
        try {
            String response = BASIC_RESPONSE_HANDLER.handleResponse(httpResponse);

            computer.addResult(System.currentTimeMillis() - startTime);
            System.out.println(response);

        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            this.cdl.countDown();
        }

    }

    @Override
    public void failed(Exception e) {
        this.computer.addError();
        System.out.println(e.getMessage());
        this.cdl.countDown();

    }

    @Override
    public void cancelled() {

    }
}

(2)测试实现类AsyncTester

import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.nio.reactor.ConnectingIOReactor;

import java.io.IOException;
import java.util.concurrent.*;

public class AsyncTester {

    public static void main(String[] args){
        final int concurrencyLevel = 1000;
        final String url = "http://192.168.156.7:8080/restful/api/mock";

        final TestResultComputer computer = new TestResultComputer();
        final CountDownLatch cdl = new CountDownLatch(concurrencyLevel);

        CloseableHttpAsyncClient httpClient = null;

        try {
            IOReactorConfig ioReactorConfig = IOReactorConfig
                    .custom()
                    .setIoThreadCount(20)
                    .build();
            ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);
            PoolingNHttpClientConnectionManager connManager = new PoolingNHttpClientConnectionManager(
                    ioReactor);
            connManager.setMaxTotal(concurrencyLevel);
            connManager.setDefaultMaxPerRoute(concurrencyLevel);
            httpClient = HttpAsyncClients.custom()
                    .setConnectionManager(connManager)
                    .build();
            httpClient.start();
            System.out.println("Start test......");
            for(int i = 0; i < concurrencyLevel; i++) {
                HttpGet get = new HttpGet(url);
                long startTime = System.currentTimeMillis();
                httpClient.execute(get, new ResultCallback(startTime, computer, cdl));
            }
            cdl.await();

            System.out.println("success request : " + computer.getResult().size());
            System.out.println(computer.compute());
            System.out.println("error request : " + computer.getError());


        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (httpClient != null) {
                try {
                    httpClient.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
Java纤程库Quasar(Fiber)

Quasar提供了高性能轻量级的线程,提供了类似Go的channel,Erlang风格的actor,以及其它的异步编程的工具,可以用在Java和Kotlin编程语言中(目前只支持Linux和Mac OS)。

Quasar里的Fiber其实是一个Continuation,他可以被Quasar定义的Scheduler调度,一个Continuation记录着运行实例的状态,而且会被随时中断,并且也会随后在他被中断的地方恢复。Fiber的功能和使用类似Thread, API接口也类似,所以使用起来没有违和感,但是它们不是被操作系统管理的,它们是由一个或者多个ForkJoinPool调度。一个idle fiber只占用400字节内存,切换的时候占用更少的CPU。

你可以从http://www.paralleluniverse.co/quasar和http://www.paralleluniverse.co/comsat 了解到相关知识。

Quasar的实现依赖Java字节码增强技术,最简单的方法是利用Quasar所提供的JavaAgent在ClassLoad阶段利用Instrument完成增强工作,在运行时,需要加入JVM参数如-javaagent:quasar-core-0.7.2-jdk8.jar指定JavaAgent。

在使用纤程库时,你需要特别注意,如果你的代码中出现了任何可能造成线程同步阻塞(Thread-blocking)的代码片段(如sychronized关键字、Lock、Thread.sleep等),Quasar默认都会发出警告,从而无法完成字节码增强工作。

一个纤程库优化方法可以参考以下代码:

(1)一个为纤程库实现的CyclicBarrier类

由于纤程在使用上与线程十分相似,甚至无缝支持ThreadLocal,因此,可以实现集合,但由于java所提供的CyclicBarrier类主要造成了线程的阻塞会无法完成纤程化,因此,我们需要利用纤程库的Fiber.park和Fiber.unpark完成控制工作,之后通过回调方法的方式实现集合与释放。

参考代码如下:

import co.paralleluniverse.fibers.SuspendExecution;

public interface Parkable {

    public void unstop();
    public void stop() throws SuspendExecution;
}
import co.paralleluniverse.fibers.SuspendExecution;

import java.util.concurrent.atomic.AtomicInteger;

public class CyclicBarrier {
    private AtomicInteger count;
    private Runnable runnable;
    private Parkable[] fibers;

    public CyclicBarrier(int count){
        this.count = new AtomicInteger(count);
        fibers = new Parkable[count];
    }

    public CyclicBarrier(int count, Runnable runnable){
        this.count = new AtomicInteger(count);
        this.runnable = runnable;
        fibers = new Parkable[count];
    }

    public void await(Parkable fiber) throws SuspendExecution {

        fibers[this.count.decrementAndGet()] = fiber;

        if(this.count.get() == 0){
            System.out.println("Start testing......");

            for(int i = 1; i < fibers.length; i++){
                fibers[i].unstop();
            }
            return;
        }

        fiber.stop();
    }
}

(2)一个HttpFiber类完成HTTP请求任务

import co.paralleluniverse.fibers.Fiber;
import co.paralleluniverse.fibers.SuspendExecution;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.BasicResponseHandler;
import org.apache.http.impl.client.CloseableHttpClient;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;


public class HttpFiber extends Fiber<Void> implements Parkable {

    private static final BasicResponseHandler BASIC_RESPONSE_HANDLER = new BasicResponseHandler();

    private final String url = "http://192.168.156.7:8080/restful/api/mock";
    private CloseableHttpClient client = null;
    private CountDownLatch cdl = null;
    private CyclicBarrier cb = null;
    private TestResultComputer computer = null;

    public HttpFiber(final CloseableHttpClient client,
                     final CountDownLatch cdl,
                     final CyclicBarrier cb,
                     final TestResultComputer computer){
        this.client = client;
        this.cdl = cdl;
        this.cb = cb;
        this.computer = computer;
    }

    @Override
    public void unstop(){

        Fiber.unpark(this);
    }

    @Override
    public void stop() throws SuspendExecution {

        Fiber.park();
    }

    @Override
    protected Void run() throws SuspendExecution, InterruptedException{
        HttpGet get = new HttpGet(url);
        this.cb.await(this);
        long startTime = System.currentTimeMillis();
        try {

            String response = this.client.execute(get, BASIC_RESPONSE_HANDLER);
            this.computer.addResult(System.currentTimeMillis() - startTime);
            System.out.println(response);

        } catch (IOException e) {

            this.computer.addError();
            System.out.println(e.getMessage());

        } finally {
            this.cdl.countDown();
        }
        return null;
    }
}

(3)测试实现类FiberTester

import co.paralleluniverse.fibers.httpclient.FiberHttpClientBuilder;
import org.apache.http.impl.client.CloseableHttpClient;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

public class FiberTester {

    public static void main(String[] args) throws InterruptedException {
        final int concurrencyLevel = 1000;

        final CloseableHttpClient client = FiberHttpClientBuilder.
                create(20).
                setMaxConnPerRoute(concurrencyLevel).
                setMaxConnTotal(concurrencyLevel).build();

        final CountDownLatch cdl = new CountDownLatch(concurrencyLevel);

        final TestResultComputer computer = new TestResultComputer();

        final CyclicBarrier cb = new CyclicBarrier(concurrencyLevel);
        for (int i = 0; i < concurrencyLevel; i++){

            new HttpFiber(client, cdl, cb, computer).start();
        }

        cdl.await();
        System.out.println("success request : " + computer.getResult().size());
        System.out.println(computer.compute());
        System.out.println("error request : " + computer.getError());
        try {
            client.close();
        } catch (IOException e) {
            e.printStackTrace();
        }

    }


}

结论

调度性能优化

比较这些方法我们需要预先规定一些前提条件:
(1)JVM内存参数: -Xms512M -Xmx512M;
(2)JMeter使用3.2版本;
(3)异步IO或池大小均设置为20;
(4)每个方法跑3次,为了简单表述我们取平均响应时间的最小值进行比较得到结论。
可大致参考以下运行脚本:

sh jmeter.sh -n -t mock.jmx -l mock.jtl
java -Xms512M -Xmx512M -javaagent:quasar-core-0.7.2-jdk8.jar -jar FiberTester.jar

结果分析

分别测试了100并发、500并发、1000并发、2000并发和5000并发,各方法之间的平均响应时间比较如下图所示:


和之间讨论的理论情况一样,Pool方法得到了一个最理想的响应时间值,但它只是一个假象,JMeter与Thread方法由于采用了类似的线程模型,因此结果基本一致。

Async方法最优秀,尽量做到了模拟并发与自身调度性能上的最优适配,但由于采用了Reactor模型,面对业务复杂任务的时候,很难驾驭协调各个任务的关系,就比如前面说道的同步各个任务。

Fiber方法虽然在整体性能上不如Async方法,但就编程模式上与Thread几乎无异,并能够完美兼容ThreadLocal的使用,这是一个折中的方法,你甚至可以用它直接改造如JMeter这样的工具,将线程纤程化。

一个试金石

想要探查是否是由于并发压力不够(如Pool方法)而造成响应时间偏低,我们可以采用一种简单的方法,在服务器端制造一些延时,在并发压力不够的情况下想要完成任务总量就会浪费大量时间。

只需要在RESTful API服务实现类中增加如下代码:

Thread.sleep(500);

此验证同样可以证明Async方法和Fiber方法压力是足够的。

一个100并发的比较图如下:


可以看到Pool方法一下子就露出了压力不足的真实面貌,其他方法与无服务器延时时表现基本一致。

资源苛刻条件

前面我们讨论过对于内存的苛刻条件是对线程使用的
在相当苛刻的内存条件下,如VM内存参数: -Xms128M -Xmx128M,我们对主要优化方法进行比较可以发现:
JMeter在模拟2000 HTTP请求的场景下,将会发生OOM:

Uncaught Exception java.lang.OutOfMemoryError: Java heap space.

Async方法在5000 HTTP请求的场景下仍可使用正常。


有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:xreztento

查看原文:低配终端环境下如何模拟大规模负载

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1455 次点击  
加入收藏 微博
上一篇:关于学习
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传