基于Docker + Go+ Kafka + Redis + MySQL的秒杀已经Jmeter压力测试

harryluo163 · · 1501 次点击 · 开始浏览    置顶
这是一个创建于 的主题,其中的信息可能已经有所发展或是发生改变。

# 业务特点![在这里插入图片描述](https://img-blog.csdnimg.cn/20190524113552967.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3EzNTg1OTE0,size_16,color_FFFFFF,t_70) # 技术点 ![在这里插入图片描述](https://img-blog.csdnimg.cn/20190527151835977.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3EzNTg1OTE0,size_16,color_FFFFFF,t_70) JMeter:用JMeter来模拟秒杀活动中大量并发的用户请求 Seckill Service:基于 Go语言使用beego实现的秒杀service,图中的步骤2,3,4都是在这个service中处理的 Redis:一个Redis的docker container,在其中保存一个名为counter的数据来表示当前剩余的库存大小 Kafka: 一个Kafka的docker container,其实这里还有一个zookeeper的docker container,Kafka用zookeeper来存放一些元数据,在程序中并没有涉及到,所以也就不单独列出来说了。Seckill service在更新完Redis之后,会发送一条消息给Kafka表示一次成功的秒杀 Seckill Kafka Consumer: 会从Kafka中去获取秒杀成功的消息,处理并且存储到MySQL中 MySQL:一个MySQL的docker container,最终秒杀成功的请求都会对应着数据库表中的一条记录 # 前端页面 ![在这里插入图片描述](https://img-blog.csdnimg.cn/20190524131855332.gif) # 环境搭建 ## 1.安装JMeter,进行接口压力测试 下载地址[添加链接描述](http://mirrors.tuna.tsinghua.edu.cn/apache//jmeter/binaries/apache-jmeter-5.1.1.tgz) 首先更改为中文,右击左边菜单,添加->线程(用户)->线程组-> 线程数->2000->时间5秒 右击线程组 ->添加 ->取样器 ->http请求 设置 127.0.0.1 端口 3030 post请求 请求路径seckill/seckill ![在这里插入图片描述](https://img-blog.csdnimg.cn/20190524132654764.gif) ## 2.使用Docker安装Redis ```shell mkdir -p /var/www/redis /var/www/redis/data docker pull redis cd /var/www/redis 创建 redis.conf 下载http://download.redis.io/redis-stable/redis.conf 找到bind 127.0.0.1,把这行前面加个#注释掉 再查找protected-mode yes 把yes修改为no docker run -p 6379:6379 --name myredis -v /var/www/redis/redis.conf:/etc/redis/redis.conf -v /var/www/redis/data:/data -d redis redis-server /etc/redis/redis.conf --appendonly yes 进入redis docker exec -i -t e60da5191243 /bin/bash 加载配置 redis-server redis.conf 设置密码在配置中修改或者直接 requirepass 密码 redis-cli -a test123 config set requirepass 新密码 ``` ## 3.使用Docker安装mysql 第二种docker pull mysql:5.6 我们新建一个目录,自己随意 ``` mkdir -p /var/www/mysql/data /var/www/mysql/logs /var/www/mysql/conf ``` **第二步然后新建my.cnf**, 这个是mysql的配置文件,在使用docker创建mysql,当容器删除,mysql的数据就会清空,这个时候我们需要把mysql的配置、数据、日志从容器内映射到容器外,这样数据就保持下来了 ``` [mysqld] datadir=/var/lib/mysql socket=/var/lib/mysql/mysql.sock symbolic-links=0 sql_mode=NO_ENGINE_SUBSTITUTION,STRICT_TRANS_TABLES character_set_server=utf8mb4 init_connect='SET NAMES utf8mb4' default-storage-engine=INNODB collation-server=utf8mb4_general_ci user=mysql port=3306 bind-address=0.0.0.0 [mysqld_safe] log-error=/var/log/mysqld.log pid-file=/var/run/mysqld/mysqld.pid [client] default-character-set=utf8mb4 ``` 第三步启动容器设置外网访问 ``` docker run -p 3306:3306 --name mymysql -v $PWD/conf/my.cnf:/var/www/mysql/my.cnf -v $PWD/logs:/var/www/mysql/logs -v $PWD/data:/var/www/mysql/data -e MYSQL_ROOT_PASSWORD=pass1234 -d mysql:5.6 docker exec -it mymysql bash grant all privileges on *.* to root@"%" identified by "password" with grant optio ``` ## 4.安装Kafka和zookeeper 这个单独安装有点坑,查询了官网,执行docker-compose.yml来安装吧,可以外网访问 还有几种yml安装方法 最新[github的docker-compose.yml](https://github.com/wurstmeister/kafka-docker/blob/master/docker-compose.yml) [docker-compose-swarm.yml](https://github.com/wurstmeister/kafka-docker/blob/master/docker-compose-swarm.yml) ``` version: '3.2' services: zookeeper: image: wurstmeister/zookeeper ports: - "2181:2181" kafka: image: wurstmeister/kafka:latest ports: - "9092:9092" environment: KAFKA_ADVERTISED_HOST_NAME: 47.105.36.188 KAFKA_CREATE_TOPICS: "test:1:1" KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 volumes: - /var/run/docker.sock:/var/run/docker.sock ``` 然后使用 docker-compose up -d 后台运行启动,如果想测试,你可以在服务器上测试代码如下或者下载Kafka Tool 工具查看 ``` // 进入kafka (提示没找到 kafka ,使用docker ps 看id) docker exec -ti kafka /bin/bash cd /opt/kafka_2.12-2.2.0 创建主题 topic ./bin/kafka-topics.sh --create --zookeeper 47.105.36.188:2181 --replication-factor 1 --partitions 1 --topic mykafka #查看主题 bin/kafka-topics.sh --list --zookeeper 47.105.36.188:2181 发送消息 ./bin/kafka-console-producer.sh --broker-list 47.105.36.188:9092 --topic mykafka 接受 bin/kafka-console-producer.sh --broker-list 47.105.36.188:9092 --topic mykafka ``` ## 5.创建必要数据 1.MySQL容器中创建一个名为seckill的数据表 2.Redis容器中创建一个名为counter的计数器(设置值为1000,代表库存初始值为1000) 3.需要去Kafka容器中创建一个名为CAR_NUMBER的topic(可以不需要) ``` SET NAMES utf8mb4; SET FOREIGN_KEY_CHECKS = 0; -- ---------------------------- -- Table structure for seckill -- ---------------------------- DROP TABLE IF EXISTS `seckill`; CREATE TABLE `seckill` ( `Id` int(11) NOT NULL AUTO_INCREMENT, `info` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `date` timestamp(0) NULL DEFAULT NULL, `offset` int(255) NULL DEFAULT NULL, PRIMARY KEY (`Id`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 59964 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic; SET FOREIGN_KEY_CHECKS = 1; ``` ## 6.代码解析 # 1.前端页面接口和设置了定时器查询数量 ``` <section class="_ms_box"> <div class="uf _uf_center img_box">第一个商品</div> <div class="but_box"> <div id="num" class="uf _uf_center number">剩余数量:--</div> <div id="button" class="submit uf _uf_center button rosy" onclick="buy()">秒杀</button> </div> </section> var job = setInterval(function () { $.ajax({ type: 'get', url: 'seckill/getCount', success: function (result) { if (result.num > 0) { $("#num").html("剩余数量:" + result.num); $("#button").css("background", "#e77005") } else { $("#num").html("剩余数量:0"); $("#button").css("background", "#ccc") window.clearInterval(job) } } }) }, 1000) function buy() { if (isbuy) { alert("不能购买") } else { $.ajax({ type: 'post', url: 'seckill/seckill', success: function (result) { alert(result.messages) } }) } } ``` # 2.Kafka 消费者 ``` func KafkaConsumer() { var ( offset int64 = 0 config = sarama.NewConfig() o = orm.NewOrm() ) //获取最大offset值,因为kafka中每个message在有一个唯一值 就是offset,避免 o.Raw("select max(offset) as offset from seckill").QueryRow(&offset) //接收失败通知 config.Consumer.Return.Errors = true // 根据给定的代理地址和配置创建一个消费者 consumer, err := sarama.NewConsumer([]string{"47.105.36.188:9092"}, config) if err != nil { panic(err) } defer consumer.Close() //根据消费者获取指定的主题分区的消费者,Offset为0为获取最新的消息,默认设置数据库最大.offset partitionConsumer, err := consumer.ConsumePartition("CAR_NUMBER_GO", 0, offset) if err != nil { fmt.Println("error get partition consumer", err) } defer partitionConsumer.Close() //循环等待接受消息. for { select { //接收消息通道和错误通道的内容. case msg := <-partitionConsumer.Messages(): //先检查是否存在再插入 num := 0 value := string(msg.Value) o.Raw("select count(1) num from seckill where info =?", value).QueryRow(&num) if num == 0 { _, err := o.Raw("insert into seckill (date,info,offset) values (?,?,?)", time.Now(), value, msg.Offset).Exec() if err == nil { fmt.Println("mysql : ", value+"插入成功") } } case err := <-partitionConsumer.Errors(): fmt.Println(err.Err) } } } ``` 1. 因为kafka是批量发送消息过来,我这边先获取数据库中最大的偏移值 2. 插入时候就先查再插 ## 3.秒杀接口 ``` //声明一些全局变量 var ( pool *redis.Pool redisServer = flag.String("redisServer", "47.105.36.188:6379", "") redisPassword = flag.String("redisPassword", "test123", "") topic = "CAR_NUMBER_GO" config *sarama.Config kafkaServer = flag.String("kafkaServer", "47.105.36.188:9092", "") ) func init() { flag.Parse() pool = newPool(*redisServer, *redisPassword) } func (this *SeckillController) Post() { conn := pool.Get() defer conn.Close() conn.Send("MULTI") conn.Send("GET", "counter") conn.Send("DECR", "counter") num, err := redis.Int64s(conn.Do("EXEC")) if err != nil { fmt.Println(err) return } if num[1] >= 0 { messages := "go_购买成功,还剩下" + fmt.Sprintf("%d", num[1]) + "个" fmt.Println(messages) this.Data["json"] = map[string]interface{}{"messages": messages} //设置kafka配置 config := sarama.NewConfig() //是否等待成功和失败后的响应,只有上面的RequireAcks设置不是NoReponse这里才有用. config.Producer.Return.Successes = true config.Producer.Return.Errors = true //使用配置,新建一个异步生产者 producer, e := sarama.NewAsyncProducer([]string{*kafkaServer}, config) if e != nil { panic(e) } defer producer.AsyncClose() //发送的消息,主题,key msg := &sarama.ProducerMessage{ Topic: topic, Value: sarama.StringEncoder(messages), } producer.Input() <- msg fmt.Println("开始发送") //循环判断哪个通道发送过来数据. select { case suc := <-producer.Successes(): fmt.Println("发送成功 offset: ", suc.Offset, "timestamp: ", suc.Timestamp.String(), "partitions: ", suc.Partition) case fail := <-producer.Errors(): fmt.Println("发送失败 err: ", fail.Err) } } else { fmt.Print("抢完了") this.Data["json"] = map[string]interface{}{"messages": "抢完了"} } this.ServeJSON() } ``` 1. .redis不需要每次都创建,所以我使用了Pool, redis取数据使用MULTI 批量发送,然后原子减 2. 新建一个异步生产者 NewAsyncProducer,可以看看官方文档很详细的例子 https://godoc.org/github.com/Shopify/sarama#example-AsyncProducer--Goroutines 也可以使用同步方式发送数据 ``` producer, err := NewSyncProducer([]string{"localhost:9092"}, nil) if err != nil { log.Fatalln(err) } defer func() { if err := producer.Close(); err != nil { log.Fatalln(err) } }() msg := &ProducerMessage{Topic: "my_topic", Value: StringEncoder("testing 123")} partition, offset, err := producer.SendMessage(msg) if err != nil { log.Printf("FAILED to send message: %s\n", err) } else { log.Printf("> message sent to partition %d at offset %d\n", partition, offset) } ``` 接下来是测试了 1.redis设置库存1000 ![我](https://img-blog.csdnimg.cn/20190524135833153.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3EzNTg1OTE0,size_16,color_FFFFFF,t_70) 2.Jmeter设置2000并发 ![在这里插入图片描述](https://img-blog.csdnimg.cn/20190524135924280.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3EzNTg1OTE0,size_16,color_FFFFFF,t_70) 启动Jmeter ![在这里插入图片描述](https://img-blog.csdnimg.cn/20190527150155673.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3EzNTg1OTE0,size_16,color_FFFFFF,t_70) ![在这里插入图片描述](https://img-blog.csdnimg.cn/2019052414211946.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3EzNTg1OTE0,size_16,color_FFFFFF,t_70) 最后redis中的counter变成0,seckill数据表中会插入1000条记录 ![在这里插入图片描述](https://img-blog.csdnimg.cn/20190527150218378.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3EzNTg1OTE0,size_16,color_FFFFFF,t_70) 项目源码地址:[miaosha-go](https://github.com/harryluo163/miaosha-go) 只是抛砖迎玉,一个小测试。另外做了一个小测试, Go语言: 项目启动时间:15:15:41 2000并发请求开始:15:15:42 2000并发请求结束:15:15:53 数据库全部插入: 15:17:32 node.js语言: 项目启动时间:15:26:41 2000并发请求开始:15:26:42 2000并发请求结束:15:26:53 数据库全部插入: 15:27:00 其他相同,只是消费者接收消息然后插入到数据库有点差异。。。。。。

有疑问加站长微信联系(非本文作者)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1501 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传