mall_SpringBoot项目开发总结

之前使用SSM框架搭建过一个商城项目,但是该项目的全部技术栈都局限在了增删改查操作,虽然涉及到了数据库表的各种设计以及各种交互,但是仍然不能满足当下流行技术栈练手的能力,因此尝试将SSM项目转为SpringBoot项目,并且在其原有的基础上增加RedisRabbitMQ中间件,增加秒杀商品的功能,现记录如下。

SSM项目转为SpringBoot项目

原来的SSM项目可以见SSM整合开发 | HeavyTiger’s Blogs项目总结。该项目以增强自己对Spring、SpringMVC、MyBatis的掌握为目的进行开发,项目地址如下:HeavyTiger/mall: 使用ssm+vue框架实现的商城系统 (github.com),前端项目为室友使用Vue.js框架开发,项目地址如下:tzhiy/mall_FE: 商城系统的前端部分 (github.com)

现在,为了更便捷地进行迭代,添加更多功能模块,将项目转为SpringBoot进行开发(称以下所有原mall项目为mall_SSM,称迭代的SpringBoot项目为mall_SpringBoot)。

mall_SSM项目中,项目的各种类如图所示:

image-20220211211657587

其中controllerinterceptormapperpojoserviceutil为项目的相关业务逻辑,不过多介绍,resources/com/mall/mapper/*.xmlmybatis的相关SQL语句,也不再赘述,这些内容都是可以直接添加到mall_SpringBoot项目中的,之后在修改完包路径后,即可全部添加到新的项目中。

故只用对resources/conf的相关内容进行迁移修改,在SpringBoot中使用application.yml / application.properties以及config配置类进行替代即可

配置log4j2实现日志模块

首先我们使用log4j2作为日志的实现,由于之前出现了log4j2漏洞,因此要求将该依赖spring-boot-starter-log4j2的版本修改为:2.6.2,该版本使用了2.17.1版本的log4j,修复了存在的漏洞。

由于SpringBoot使用logback作为日志实现,因此需要排除该依赖,由于该项目使用Spring Initializr进行构建,其将spring-boot-starter-parent作为parent maven项目,在其中无法使用<exclusion>...</exclusion>来排除依赖,因此我们在下方的依赖导入spring-boot-starter-logging后全局排除所有内容即可。

最后的pom.xml如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<!--全局排除spring-boot-starter-logging内的所有依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--引入 log4j2-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
<version>2.6.2</version>
</dependency>

导入后新建application.yml以后使用该文件做自动配置,在文件中填入以下内容

1
2
3
4
5
6
7
8
9
10
# 日志配置
logging:
config: classpath:config/log4j2.xml
# 服务配置
server:
# 端口号配置
port: 8080
# 访问项目路径配置
servlet:
context-path: /mall

首先服务配置和以往一样,我们使用8080端口,项目访问路径设置为/mall,日志配置则放在classpath:config/log4j2.xml

新建日志配置resources/config/log4j2.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
<?xml version="1.0" encoding="UTF-8"?>
<!-- Configuration后面的status,这个用于设置log4j2自身内部的信息输出,可以不设置,当设置成trace时,
你会看到log4j2内部各种详细输出。可以设置成OFF(关闭)或Error(只输出错误信息)
-->
<Configuration status="WARN" monitorInterval="30">
<Properties>
<Property name="App">mall</Property>
<Property name="logDir">logs</Property>
<Property name="splitSize">30 MB</Property>
</Properties>

<Appenders>
<!-- 输出控制台日志的配置 -->
<Console name="console" target="SYSTEM_OUT">
<!--控制台只输出level及以上级别的信息(onMatch),其他的直接拒绝(onMismatch)-->
<ThresholdFilter level="info" onMatch="ACCEPT" onMismatch="DENY"/>
<!-- 输出日志的格式 -->
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
</Console>

<!-- 打印出所有的信息,每次大小超过size,则这size大小的日志会自动存入按年份-月份建立的文件夹下面并进行压缩,作为存档 -->
<RollingRandomAccessFile name="infoLog" fileName="${logDir}/${App}-info.log" immediateFlush="false"
filePattern="${logDir}/$${date:yyyy-MM}/${App}-info-%d{MM-dd-yyyy}-%i.log.gz"
append="true">
<PatternLayout pattern="%d{yyyy-MM-dd 'at' HH:mm:ss z} [%t] %-5level %logger{36} %L %M - %msg%xEx%n"/>
<Policies>
<TimeBasedTriggeringPolicy interval="6" modulate="true"/>
<SizeBasedTriggeringPolicy size="${splitSize}"/>
</Policies>
<Filters>
<!-- 只记录info和warn级别信息 -->
<ThresholdFilter level="error" onMatch="DENY" onMismatch="NEUTRAL"/>
<ThresholdFilter level="info" onMatch="ACCEPT" onMismatch="DENY"/>
</Filters>
<!-- 指定每天的最大压缩包个数,默认7个,超过了会覆盖之前的 -->
<DefaultRolloverStrategy max="50"/>
</RollingRandomAccessFile>

<!-- 存储所有error信息 -->
<RollingRandomAccessFile name="errorLog" fileName="${logDir}/${App}-error.log" immediateFlush="false"
filePattern="${logDir}/$${date:yyyy-MM}/${App}-error-%d{MM-dd-yyyy}-%i.log.gz"
append="false">
<PatternLayout pattern="%d{yyyy-MM-dd 'at' HH:mm:ss z} [%t] %-5level %logger{36} %L %M - %msg%xEx%n"/>
<Policies>
<TimeBasedTriggeringPolicy interval="6" modulate="true"/>
<SizeBasedTriggeringPolicy size="${splitSize}"/>
</Policies>
<Filters>
<!-- 只记录error级别信息 -->
<ThresholdFilter level="error" onMatch="ACCEPT" onMismatch="DENY"/>
</Filters>
<!-- 指定每天的最大压缩包个数,默认7个,超过了会覆盖之前的 -->
<DefaultRolloverStrategy max="50"/>
</RollingRandomAccessFile>
</Appenders>

<Loggers>
<!-- root logger 配置,全局配置,默认所有的Logger都继承此配置 -->
<!-- AsyncRoot - 异步记录日志 - 需要LMAX Disruptor的支持 -->
<Root includeLocation="true" additivity="true">
<AppenderRef ref="infoLog"/>
<AppenderRef ref="errorLog"/>
<AppenderRef ref="console"/>
</Root>

<!--第三方的软件日志级别 -->
<logger name="org.springframework" level="info" additivity="false">
<AppenderRef ref="infoLog"/>
<AppenderRef ref="errorLog"/>
<AppenderRef ref="console"/>
</logger>
<logger name="com.heavytiger.mall.controller" level="info" additivity="false">
<AppenderRef ref="infoLog"/>
<AppenderRef ref="errorLog"/>
<AppenderRef ref="console"/>
</logger>
<logger name="org.springframework.amqp" level="info" additivity="false">
<AppenderRef ref="infoLog"/>
<AppenderRef ref="errorLog"/>
</logger>
</Loggers>
</Configuration>

若不进行配置,则会使用默认配置,仍能直接使用。

配置完成后,log4j2的日志模块测试无误:

image-20220211214308908

配置数据源、连接池、MyBatis

1. 导入相关依赖

由于实现分页功能时我使用了pagehelper,因此需要导入依赖,但是若SpringBoot的版本高于2.6.X,使用低于1.4.1的版本会报错出现循环依赖,由于选择Spring Boot 2.6.3版本,因此导入1.4.1版本。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<!--MyBatis依赖-->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
<!--MySQL驱动依赖-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.28</version>
<scope>runtime</scope>
</dependency>
<!--Druid数据源-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.17</version>
</dependency>
<!-- pagehelper分页依赖,使用低版本会出现循环依赖问题,至少为1.4.1-->
<dependency>
<groupId>com.github.pagehelper</groupId>
<artifactId>pagehelper-spring-boot-starter</artifactId>
<version>1.4.1</version>
</dependency>

我们使用Druid来作为数据库连接池,数据库的相关配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
spring:
# 数据源配置
datasource:
url: jdbc:mysql://localhost:3306/mall?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=false
driver-class-name: com.mysql.cj.jdbc.Driver
username: mall
password: 123456
# Druid连接池配置
type: com.alibaba.druid.pool.DruidDataSource
initialSize: 5
minIdle: 5
maxActive: 20
maxWait: 60000
timeBetweenEvictionRunsMillis: 60000
minEvictableIdleTimeMillis: 300000
# MyBatis配置
mybatis:
type-aliases-package: com.heavytiger.mall.pojo
mapper-locations: classpath:mapper/*.xml

MyBatis配置中配置了实体类使用别名以及mapper的包所在位置,但是要记得在mapper接口上标记@Mapper注解,否则会报错找不到bean

com.heavytiger.mall.config包下编写druid连接池配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* @author heavytiger
* @version 1.0
* @description 配置数据源
* @date 2022/2/11 11:37
*/
@Configuration
public class DruidConfig {

@ConfigurationProperties(prefix = "spring.datasource")
@Bean
public DataSource druidDataSource() {
return new DruidDataSource();
}
}

注意前缀最好同一写成spring.datasource而不是部分在spring.datasource.druid中,会导致没有注入进实体类的构造中。

配置鉴权拦截器

还是和之前的SSM项目一样,使用JWT技术进行鉴权(原因还是买不起贵的阿里云服务器,之后尝试高并发秒杀的时候若将session全部存在服务器中,会导致只有2G内存的服务器崩溃,因此最好使用JWT技术由客户端进行存储(但是好像1Mbps的带宽限制又很大了…))

导入JWT依赖

1
2
3
4
5
6
<!--JWT依赖-->
<dependency>
<groupId>com.auth0</groupId>
<artifactId>java-jwt</artifactId>
<version>3.3.0</version>
</dependency>

拦截器编写如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* @author heavytiger
* @version 1.0
* @description 实现验证用户的headers中是否存在Authorization
* 中的token,若不存在,返回状态码要求登录,存在放行
* @date 2021/12/23 19:42
*/
public class CheckInterceptor implements HandlerInterceptor {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
String token = request.getHeader("Authorization");
if(JwtUtil.verify(token)) {
// 说明验证成功,允许访问资源
return true;
} else {
response.setCharacterEncoding("utf-8");
response.setContentType("application/json; charset=utf-8");
response.getWriter().write(JsonUtil.objToJson(new ResultBean<>(EnumResult.TOKEN_ERROR)));
return false;
}
}
}

拦截器配置类编写如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* @author heavytiger
* @version 1.0
* @description Web应用配置类
* @date 2022/2/11 14:10
*/
@Configuration
public class WebConfig implements WebMvcConfigurer {
@Override
public void addInterceptors(InterceptorRegistry registry) {
// 添加鉴权拦截器,拦截所有请求,排除登录和注册相关的API调用
registry.addInterceptor(new CheckInterceptor())
.addPathPatterns("/**")
.excludePathPatterns("/login", "/register", "/userExist");
WebMvcConfigurer.super.addInterceptors(registry);
}
}

配置Redis集群

配置为Redis集群的原因是之前学习Redis的时候使用过单机和主从复制模式,但是没有用java和集群交互过,因此想尝试使用Redis集群,至少使用了主从读写分离或多或少能提高秒杀的高并发下的读效率。

导入依赖

1
2
3
4
5
6
7
8
9
10
<!--Spring Data Redis依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!--commons-pool2依赖-->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>

配置连接及连接池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
spring:
# Redis相关配置
redis:
database: 0
password: 6o.......................orO
timeout: 6000ms
cluster:
# 失败重连次数
max-redirects: 2
# 集群配置,三组一主一从,--cluster-replicas 1模式
nodes:
- 121.xxx.xxx.31:6379
- 121.xxx.xxx.31:6380
- 121.xxx.xxx.31:6381
- 121.xxx.xxx.31:6389
- 121.xxx.xxx.31:6390
- 121.xxx.xxx.31:6391
# Redis连接池设置
lettuce:
pool:
enabled: true
# 连接池最大连接数(使用负值表示没有限制)
max-active: 1000
# 连接池中的最大空闲连接
max-idle: 10
# 连接池中的最小空闲连接
min-idle: 5
# 连接池最大阻塞等待时间(使用负值表示没有限制)
max-wait: -1

编写获取连接的配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
/**
* @author heavytiger
* @version 1.0
* @description 配置Redis连接池
* @date 2022/2/11 15:06
*/
@Configuration
@EnableCaching // 开启缓存支持
public class RedisConfig {
@Resource
private LettuceConnectionFactory lettuceConnectionFactory;

@Bean
public KeyGenerator keyGenerator() {
return new KeyGenerator() {
@Override
public Object generate(Object target, Method method, Object... params) {
StringBuffer sb = new StringBuffer();
sb.append(target.getClass().getName());
sb.append(method.getName());
for (Object obj : params) {
sb.append(obj.toString());
}
return sb.toString();
}
};
}

// 缓存管理器
@Bean
public CacheManager cacheManager() {
RedisCacheManager.RedisCacheManagerBuilder builder = RedisCacheManager.RedisCacheManagerBuilder
.fromConnectionFactory(lettuceConnectionFactory);
@SuppressWarnings("serial")
Set<String> cacheNames = new HashSet<String>() {
{
add("codeNameCache");
}
};
builder.initialCacheNames(cacheNames);
return builder.build();
}

/**
* RedisTemplate配置
*/
@Bean
public RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory) {
// 设置序列化
Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(
Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
// 配置redisTemplate
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<String, Object>();
redisTemplate.setConnectionFactory(lettuceConnectionFactory);
RedisSerializer<?> stringSerializer = new StringRedisSerializer();
// key序列化
redisTemplate.setKeySerializer(stringSerializer);
// value序列化
redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
// Hash key序列化
redisTemplate.setHashKeySerializer(stringSerializer);
// Hash value序列化
redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
}

编写测试类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.heavytiger.mall;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/**
* @author heavytiger
* @version 1.0
* @description 测试redis的功能是否正常
* @date 2022/2/11 15:17
*/
@SpringBootTest
@Component
public class RedisTemplateTest {
@Autowired
private RedisTemplate<String, Object> redisTemplate;

@Test
public void redisTemplateTest() {
Map<String, Object> temp, map = new HashMap<>();
map.put("hash1", "value1");
map.put("hash2", "123");
map.put("hash3", Arrays.asList("1", 2, 3, "abc"));
redisTemplate.opsForValue().set("key_hash", map);
temp = (Map<String, Object>) redisTemplate.opsForValue().get("key_hash");
System.out.println("if value set == get" + map.equals(temp));
for (Map.Entry<String, Object> entry : temp.entrySet()) {
System.out.println(entry.getKey() + " : " + entry.getValue());
}
}
}

测试结果如下所示:

1
2
3
4
if value set == get true
hash3 : [1, 2, 3, abc]
hash2 : 123
hash1 : value1

redis中数据如下:

image-20220211221041301

可以看到,集群中的数据能被正常获取到。

注意事项:

在使用springboot连接集群时会出现超时的情况,且多次连接仍然无法解决,经查询,是因为在创建集群时使用127.0.0.1,没有走TCP连接,若要允许外网访问,应该设置为真实IP地址,完成后外网即可访问。

redis-cli --cluster create --cluster-replicas 1 -a 6o5FLS9bSJmdTGlyqJWxiEGf56vE1orO 121.xxx.xxx.31:6379 121.xxx.xxx.31:6380 121.xxx.xxx.31:6381 121.xxx.xxx.31:6389 121.xxx.xxx.31:6390 121.xxx.xxx.31:6391

需要重新停止服务后,删除所有rdb以及node文件,此外,由于开放了外网连接,需要在防火墙中进行相关的配置。若出现Waiting for the cluster to join提示,且之后一直无响应,原因是每个 Redis 集群节点都需要打开两个 TCP 连接。服务客户端的普通Redis TCP端口,比如6379,加上数据端口加上10000得到的端口,所以也要开放是16379

在开放完所有端口后,即可正常使用。

详见stackoverflow如下题问:

How to solve redis cluster “Waiting for the cluster to join” issue? - Stack Overflow

[修改]将集群部署到虚拟机的docker中

在1.4节部署到服务器上时没有考虑到服务器的带宽,”青春版”阿里云服务器带宽只有1Mbps,外网访问会担心因为带宽过小产生瓶颈使得QPS不高,(其实也可以直接在服务器内网跑压测,但是服务器上还挂了别的服务,所以还是用虚拟机加docker部署更得我心)

1. 首先准备集群的conf文件

和之间一样,组建3主3从Redis集群,一样使用之前的文件进行创建,这样的缺点是,不太方便指定主从结点,我们在虚拟机中同样的路径下创建redis相关的配置文件夹。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
[root@localhost redis]# pwd
/usr/local/dbBackup/redis
[root@localhost redis]# ll
总用量 116
-rw-r--r--. 1 root root 345 2月 9 17:11 redis6379.conf
-rw-r--r--. 1 root root 345 2月 9 17:11 redis6380.conf
-rw-r--r--. 1 root root 345 2月 9 17:11 redis6381.conf
-rw-r--r--. 1 root root 345 2月 9 17:11 redis6389.conf
-rw-r--r--. 1 root root 345 2月 9 17:11 redis6390.conf
-rw-r--r--. 1 root root 345 2月 9 17:11 redis6391.conf
-rw-r--r--. 1 root root 93748 2月 9 17:11 redis.conf
[root@localhost dbBackup]# pwd
/usr/local/dbBackup
[root@localhost dbBackup]# ll
总用量 0
drwxr-xr-x. 2 root root 156 2月 9 17:14 redis
drwxr-xr-x. 2 root root 6 2月 9 17:08 redis_cluster

redis6379.conf为例说明:

1
2
3
4
5
6
7
8
include /usr/local/dbBackup/redis/redis.conf
port 6379
pidfile "/var/run/redis_6379.pid"
dbfilename "dump6379.rdb"
cluster-enabled yes
cluster-config-file nodes-6379.conf
cluster-node-timeout 15000
masterauth 6o5..........................orO

2. 安装docker-compose以支持对容器集群的快速编排

使用如下命令下载docker-compose

1
2
3
4
# curl -L https://get.daocloud.io/docker/compose/releases/download/1.29.2/docker-compose-`uname -s`-`uname -m` > /usr/local/bin/docker-compose

# 使用如下命令赋予可执行权限
# sudo chmod +x /usr/local/bin/docker-compose

3. 创建docker-compose.yml文件,其中完成镜像,网络,容器中内容的相关声明配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
version: '3.8'

x-images:
&default-image
redis

services:
redis6379:
image: *default-image
container_name: redis6379
command:
["redis-server", "/usr/local/dbBackup/redis/redis6379.conf"]
restart: always
volumes:
- ./redis6379.conf:/usr/local/dbBackup/redis/redis6379.conf
- ./redis.conf:/usr/local/dbBackup/redis/redis.conf
- /usr/local/dbBackup/redis_cluster:/usr/local/dbBackup/redis_cluster
- ./data/6379:/data
ports:
- 6379:6379
- 16379:16379

redis6380:
image: *default-image
container_name: redis6380
command:
["redis-server", "/usr/local/dbBackup/redis/redis6380.conf"]
restart: always
volumes:
- ./redis6380.conf:/usr/local/dbBackup/redis/redis6380.conf
- ./redis.conf:/usr/local/dbBackup/redis/redis.conf
- /usr/local/dbBackup/redis_cluster:/usr/local/dbBackup/redis_cluster
- ./data/6380:/data
ports:
- 6380:6380
- 16380:16380

redis6381:
image: *default-image
container_name: redis6381
command:
["redis-server", "/usr/local/dbBackup/redis/redis6381.conf"]
restart: always
volumes:
- ./redis6381.conf:/usr/local/dbBackup/redis/redis6381.conf
- ./redis.conf:/usr/local/dbBackup/redis/redis.conf
- /usr/local/dbBackup/redis_cluster:/usr/local/dbBackup/redis_cluster
- ./data/6381:/data
ports:
- 6381:6381
- 16381:16381

redis6389:
image: *default-image
container_name: redis6389
command:
["redis-server", "/usr/local/dbBackup/redis/redis6389.conf"]
restart: always
volumes:
- ./redis6389.conf:/usr/local/dbBackup/redis/redis6389.conf
- ./redis.conf:/usr/local/dbBackup/redis/redis.conf
- /usr/local/dbBackup/redis_cluster:/usr/local/dbBackup/redis_cluster
- ./data/6389:/data
ports:
- 6389:6389
- 16389:16389

redis6390:
image: *default-image
container_name: redis6390
command:
["redis-server", "/usr/local/dbBackup/redis/redis6390.conf"]
restart: always
volumes:
- ./redis6390.conf:/usr/local/dbBackup/redis/redis6390.conf
- ./redis.conf:/usr/local/dbBackup/redis/redis.conf
- /usr/local/dbBackup/redis_cluster:/usr/local/dbBackup/redis_cluster
- ./data/6390:/data
ports:
- 6390:6390
- 16390:16390

redis6391:
image: *default-image
container_name: redis6391
command:
["redis-server", "/usr/local/dbBackup/redis/redis6391.conf"]
restart: always
volumes:
- ./redis6391.conf:/usr/local/dbBackup/redis/redis6391.conf
- ./redis.conf:/usr/local/dbBackup/redis/redis.conf
- /usr/local/dbBackup/redis_cluster:/usr/local/dbBackup/redis_cluster
- ./data/6391:/data
ports:
- 6391:6391
- 16391:16391

使用docker-compose ps可以显示启动的容器,使用docker-compose down将停止并删除所有的容器

4. 使用docker-compose up -d命令启动容器集群

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
[root@localhost redis]# docker-compose up -d
Creating network "redis_default" with the default driver
Creating redis6389 ... done
Creating redis6379 ... done
Creating redis6381 ... done
Creating redis6390 ... done
Creating redis6380 ... done
Creating redis6391 ... done
[root@localhost redis]# docker-compose ps
Name Command State Ports
------------------------------------------------------------------------------------------------
redis6379 docker-entrypoint.sh redis ... Up 0.0.0.0:16379->16379/tcp,:::16379->16379/tc
p, 0.0.0.0:6379->6379/tcp,:::6379->6379/tcp
redis6380 docker-entrypoint.sh redis ... Up 0.0.0.0:16380->16380/tcp,:::16380->16380/tc
p, 6379/tcp,
0.0.0.0:6380->6380/tcp,:::6380->6380/tcp
redis6381 docker-entrypoint.sh redis ... Up 0.0.0.0:16381->16381/tcp,:::16381->16381/tc
p, 6379/tcp,
0.0.0.0:6381->6381/tcp,:::6381->6381/tcp
redis6389 docker-entrypoint.sh redis ... Up 0.0.0.0:16389->16389/tcp,:::16389->16389/tc
p, 6379/tcp,
0.0.0.0:6389->6389/tcp,:::6389->6389/tcp
redis6390 docker-entrypoint.sh redis ... Up 0.0.0.0:16390->16390/tcp,:::16390->16390/tc
p, 6379/tcp,
0.0.0.0:6390->6390/tcp,:::6390->6390/tcp
redis6391 docker-entrypoint.sh redis ... Up 0.0.0.0:16391->16391/tcp,:::16391->16391/tc
p, 6379/tcp,
0.0.0.0:6391->6391/tcp,:::6391->6391/tcp

5. 随便进入任意主节点,执行命令分配slots以及主从关系

1
2
3
4
[root@localhost redis]# docker exec -it redis6379 /bin/bash

# 输入以下命令
redis-cli --cluster create --cluster-replicas 1 -a 6o5.......................E1orO 192.168.205.200:6379 192.168.205.200:6380 192.168.205.200:6381 192.168.205.200:6389 192.168.205.200:6390 192.168.205.200:6391

连接redis结点查看cluster信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
>>> Performing Cluster Check (using node 192.168.205.200:6379)
M: c6412d958f8f4900940d6adb3cc693437bd0193c 192.168.205.200:6379
slots:[0-5460] (5461 slots) master
1 additional replica(s)
M: 2566111fdd7f42592720294ce9a6feb56de307d8 172.21.0.1:6380
slots:[5461-10922] (5462 slots) master
1 additional replica(s)
M: 5db0a9aabe5627a3803d9debda7661fdafab6bfa 172.21.0.1:6381
slots:[10923-16383] (5461 slots) master
1 additional replica(s)
S: e9469f171253da669f773fe220c822872084460e 172.21.0.1:6389
slots: (0 slots) slave
replicates 2566111fdd7f42592720294ce9a6feb56de307d8
S: b7658d5ea3c357892471658a45a2ce6deaf3a7a4 172.21.0.1:6390
slots: (0 slots) slave
replicates 5db0a9aabe5627a3803d9debda7661fdafab6bfa
S: 012ab8349ee18675dfcba192ce179571d6ce8f31 172.21.0.1:6391
slots: (0 slots) slave
replicates c6412d958f8f4900940d6adb3cc693437bd0193c
[OK] All nodes agree about slots configuration.
>>> Check for open slots...
>>> Check slots coverage...
[OK] All 16384 slots covered.

root@77a5bddfd6ab:/data# redis-cli -a 6o5F.........................1orO -c -p 6379
Warning: Using a password with '-a' or '-u' option on the command line interface may not be safe.
127.0.0.1:6379> ping
PONG
127.0.0.1:6379> CLUSTER INFO
cluster_state:ok
cluster_slots_assigned:16384
cluster_slots_ok:16384
cluster_slots_pfail:0
cluster_slots_fail:0
cluster_known_nodes:6
cluster_size:3
cluster_current_epoch:6
cluster_my_epoch:1
cluster_stats_messages_ping_sent:54
cluster_stats_messages_pong_sent:60
cluster_stats_messages_sent:114
cluster_stats_messages_ping_received:55
cluster_stats_messages_pong_received:54
cluster_stats_messages_meet_received:5
cluster_stats_messages_received:114

可以看到,集群创建成功

测试可以内网访问,外网无法ping

6379创建一个键keyTest,转到了6381服务,在其对应的从机6390可以读到该键

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
127.0.0.1:6379> set keyTest helloworld
-> Redirected to slot [15774] located at 172.21.0.1:6381
OK
172.21.0.1:6381> KEYS *
1) "keyTest"
# 在其他的服务中无法找到,因为slot据哈希机制分配槽位
127.0.0.1:6380> KEYS *
(empty array)

127.0.0.1:6391> CLUSTER NODES
2566111fdd7f42592720294ce9a6feb56de307d8 172.21.0.1:6380@16380 master - 0 1644420185000 2 connected 5461-10922
012ab8349ee18675dfcba192ce179571d6ce8f31 172.21.0.3:6391@16391 myself,slave c6412d958f8f4900940d6adb3cc693437bd0193c 0 1644420183000 1 connected
c6412d958f8f4900940d6adb3cc693437bd0193c 172.21.0.1:6379@16379 master - 0 1644420186353 1 connected 0-5460
# 可以看到,6390从机对应6381主机,因此设置的数据可以被查到
b7658d5ea3c357892471658a45a2ce6deaf3a7a4 172.21.0.1:6390@16390 slave 5db0a9aabe5627a3803d9debda7661fdafab6bfa 0 1644420185000 3 connected
e9469f171253da669f773fe220c822872084460e 172.21.0.1:6389@16389 slave 2566111fdd7f42592720294ce9a6feb56de307d8 0 1644420185345 2 connected
5db0a9aabe5627a3803d9debda7661fdafab6bfa 172.21.0.1:6381@16381 master - 0 1644420184338 3 connected 10923-16383

# 可以看到,对应的从机可以读到主机的数据
127.0.0.1:6390> keys *
1) "keyTest"
127.0.0.1:6391> keys *
(empty array)

安装nmap测试工具,查看端口开放情况

1
2
# 安装nmap
[root@localhost redis]# yum install nmap

查看端口开放情况

1
2
3
4
5
6
7
8
9
10
11
12
13
[root@localhost redis]# nmap 127.0.0.1

Starting Nmap 6.40 ( http://nmap.org ) at 2022-02-09 23:28 CST
Nmap scan report for localhost (127.0.0.1)
Host is up (0.0000060s latency).
Not shown: 996 closed ports
PORT STATE SERVICE
22/tcp open ssh
25/tcp open smtp
3306/tcp open mysql
6389/tcp open clariion-evr01

Nmap done: 1 IP address (1 host up) scanned in 1.59 seconds

也可以查看端口开放情况

1
2
[root@localhost redis]# firewall-cmd --query-port=6379/tcp
no

因此,需要开放其他使用到的端口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
[root@localhost redis]# firewall-cmd --zone=public --add-port=6379/tcp --permanent
success
[root@localhost redis]# firewall-cmd --zone=public --add-port=6380/tcp --permanent
success
[root@localhost redis]# firewall-cmd --zone=public --add-port=6381/tcp --permanent
success
[root@localhost redis]# firewall-cmd --zone=public --add-port=6389/tcp --permanent
success
[root@localhost redis]# firewall-cmd --zone=public --add-port=6390/tcp --permanent
success
[root@localhost redis]# firewall-cmd --zone=public --add-port=6391/tcp --permanent
success
[root@localhost redis]# firewall-cmd --zone=public --add-port=16379/tcp --permanent
success
[root@localhost redis]# firewall-cmd --zone=public --add-port=16380/tcp --permanent
success
[root@localhost redis]# firewall-cmd --zone=public --add-port=16381/tcp --permanent
success
[root@localhost redis]# firewall-cmd --zone=public --add-port=16389/tcp --permanent
success
[root@localhost redis]# firewall-cmd --zone=public --add-port=16390/tcp --permanent
success
[root@localhost redis]# firewall-cmd --zone=public --add-port=16391/tcp --permanent
success
[root@localhost redis]# firewall-cmd --zone=public --list-ports
6379/tcp 6380/tcp 6381/tcp 6389/tcp 6390/tcp 6391/tcp 16391/tcp 16379/tcp 16380/tcp 16381/tcp 16389/tcp 16390/tcp
[root@localhost redis]# firewall-cmd --reload
success

再次测试,即可在外网正常访问

image-20220219141106992

6. 测试是否可以正常连接到redis集群

多次尝试,并检查端口,发现所有涉及的端口均可以访问,但是在SpringBoot中尝试了一整天仍然无法连接到redis数据库,但是阿里云服务器上同样方式部署的服务就可以正常连接,无果。最后考虑到lua脚本无法自动切换连接的端口,因此更换方式采用连接docker中部署的单redis

流程如下:

使用如下命令启动容器:

1
2
3
4
5
docker run --name redis \
-v /usr/local/dbBackup/redis/data:/data \
-v /usr/local/dbBackup/redis/redis_single.conf:/etc/redis/redis.conf \
-p 6379:6379 \
-d redis redis-server /etc/redis/redis.conf

启动后,测试是否可以正常访问:

修改yml配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#  单机redis配置
redis:
host: 192.168.205.200
port: 6379
database: 0
password: 6o5FLS9bSJmdTGlyqJWxiEGf56vE1orO
# Redis连接池设置
lettuce:
pool:
enabled: true
# 连接池最大连接数(使用负值表示没有限制)
max-active: 1000
# 连接池中的最大空闲连接
max-idle: 10
# 连接池中的最小空闲连接
min-idle: 5
# 连接池最大阻塞等待时间(使用负值表示没有限制)
max-wait: -1

测试如下:

image-20220220095508459

这次测试终于没有问题了,之后就采用该模式连接。

Docker中部署RabbitMQ

1. 首先在线拉取RabbitMQ镜像

1
docker pull rabbitmq:3-management

2. 安装RabbitMQ,即执行容器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
docker run \
-e RABBITMQ_DEFAULT_USER=heavytiger \
-e RABBITMQ_DEFAULT_PASS=a.........m \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management

[root@localhost /]# docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
8a7012d4a213 rabbitmq:3-management "docker-entrypoint.s…" 10 seconds ago Up 10 seconds 4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcp mq
4ba1eea6fba3 redis "docker-entrypoint.s…" 42 hours ago Up 42 hours 0.0.0.0:6379->6379/tcp, :::6379->6379/tcp redis
757d2d305710 mysql:8.0.28 "docker-entrypoint.s…" 43 hours ago Up 43 hours 0.0.0.0:3306->3306/tcp, :::3306->3306/tcp, 33060/tcp mysql

3. 访问RabbitMQ Management

在浏览器中访问地址:your virtual machine ip:15672

在输入启动容器时配置的帐号密码,即可正常访问: -e RABBITMQ_DEFAULT_USER=heavytiger-e RABBITMQ_DEFAULT_PASS=a.........m

image-20220218195919400

4. 在SpringBoot中配置RabbitMQ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
spring:
# RabbitMQ配置,消息队列
rabbitmq:
host: 192.168.205.200
username: heavytiger
password: 37628981mm
virtual-host: /
port: 5672
listener:
simple:
# 消费者最小数量
concurrency: 10
# 消费者最大数量
max-concurrency: 10
# 限制一次处理一条消息,不要预取,默认预取250条
prefetch: 1
# 启动时是否默认启动容器
auto-startup: true
# 被拒绝时选择重新入队
default-requeue-rejected: true
template:
# 如果被拒绝了,进行重试
retry:
# 开启重试
enabled: true
# 重试时间
initial-interval: 1000ms
# 默认重试次数
max-attempts: 3
# 重试最大间隔时间
max-interval: 10000ms
# 重试的间隔乘数,例如2.0,第一次等10s,第二次20s,第三次40s
multiplier: 1

秒杀模块的实现

数据库表的设计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
DROP TABLE IF EXISTS seckill_order;

DROP TABLE IF EXISTS seckill_products;

CREATE TABLE `seckill_order` (
`id` INT UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '秒杀订单ID',
`customer_id` INT UNSIGNED NOT NULL COMMENT '用户ID',
`order_id` INT UNSIGNED NOT NULL COMMENT '订单ID',
`product_id` INT UNSIGNED NOT NULL COMMENT '商品ID',
PRIMARY KEY ( `id` )
) ENGINE = INNODB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8mb4;

CREATE TABLE `seckill_products` (
`id` INT UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '秒杀商品ID',
`product_id` INT UNSIGNED NOT NULL COMMENT '商品ID',
`seckill_price` DECIMAL ( 10, 2 ) DEFAULT ( 0.01 ) NOT NULL COMMENT '秒杀价格',
`stock` INT UNSIGNED NOT NULL COMMENT '库存数量',
`start_date` datetime NOT NULL COMMENT '秒杀开始时间',
`end_date` datetime NOT NULL COMMENT '秒杀结束时间',
PRIMARY KEY ( `id` )
) ENGINE = INNODB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8mb4;

INSERT INTO `mall`.`seckill_products` (`id`, `product_id`, `seckill_price`, `stock`, `start_date`, `end_date`) VALUES (1, 1, 99.00, 10, '2022-02-11 14:00:00', '2022-02-11 14:10:00');

编写pojo和mapper层

秒杀商品表以及秒杀订单表要实现增删改查等功能

比较简单,和之前SSM框架搭建的操作步骤一样即可,不再赘述

只展示SeckillOrderMapper.xmlSeckillProductMapper.xml文件的代码

SeckillOrderMapper.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.heavytiger.mall.mapper.SeckillOrderMapper">

<insert id="addSeckillOrder" useGeneratedKeys="true" keyProperty="id" keyColumn="id"
parameterType="SeckillOrder">
INSERT INTO mall.seckill_order (id, customer_id, order_id, product_id)
VALUES (#{id}, #{customerId}, #{orderId}, #{productId})
</insert>

<delete id="deleteSeckillOrder" parameterType="int">
DELETE
FROM mall.seckill_order
WHERE id = #{id}
</delete>

<update id="updateSeckillOrder" parameterType="SeckillOrder">
UPDATE mall.seckill_order
<set>
<if test="customerId != null">customer_id=#{customerId},</if>
<if test="OrderId != null">order_id=#{orderId},</if>
<if test="ProductId != null">product_id=#{productId}</if>
</set>
WHERE id = #{id}
</update>

<resultMap id="SeckillOrderMap" type="SeckillOrder">
<id column="id" property="id"/>
<result column="customer_id" property="customerId"/>
<result column="order_id" property="orderId"/>
<result column="product_id" property="productId"/>
</resultMap>

<select id="querySeckillOrderById" parameterType="int" resultMap="SeckillOrderMap">
SELECT *
FROM mall.seckill_order
WHERE id = #{id}
</select>

<select id="querySeckillOrders" resultMap="SeckillOrderMap">
SELECT *
FROM mall.seckill_order
</select>

<select id="querySeckillOrdersByCustomerId" parameterType="int" resultMap="SeckillOrderMap">
SELECT *
FROM mall.seckill_order
WHERE customer_id = #{customerId}
</select>
</mapper>

SeckillProductMapper.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.heavytiger.mall.mapper.SeckillProductMapper">

<insert id="addSeckillProduct" useGeneratedKeys="true" keyProperty="id" keyColumn="id"
parameterType="SeckillProduct">
INSERT INTO mall.seckill_products(id, product_id, seckill_price, stock, start_date, end_date)
VALUES (#{id}, #{productId}, #{seckillPrice}, #{stock}, #{startDate}, #{endDate})
</insert>

<delete id="deleteSeckillProduct" parameterType="int">
DELETE
FROM mall.seckill_products
WHERE id = #{id}
</delete>

<update id="updateSeckillProduct" parameterType="SeckillProduct">
UPDATE mall.seckill_products
<set>
<if test="productId != null">product_id=#{productId},</if>
<if test="seckillPrice != null">seckill_price=#{seckillPrice},</if>
<if test="stock != null">stock=#{stock},</if>
<if test="startDate != null">start_date=#{startDate},</if>
<if test="endDate != null">end_date=#{endDate}</if>
</set>
WHERE id = #{id}
</update>

<resultMap id="SeckillProductMap" type="SeckillProduct">
<id column="id" property="id"/>
<result column="product_id" property="productId"/>
<result column="seckill_price" property="seckillPrice"/>
<result column="stock" property="stock"/>
<result column="start_date" property="startDate"/>
<result column="end_date" property="endDate"/>
</resultMap>

<select id="querySeckillProductById" parameterType="int" resultMap="SeckillProductMap">
SELECT *
FROM mall.seckill_products
WHERE id = #{id}
</select>

<select id="querySeckillProducts" resultMap="SeckillProductMap">
SELECT *
FROM mall.seckill_products
</select>

<select id="querySeckillProductByProductId" resultMap="SeckillProductMap">
SELECT *
FROM mall.seckill_products
WHERE product_id = #{productId}
</select>

</mapper>

编写Service层

之后会编写代码实现以下接口,有些接口可能没有使用过,有些为了不在service中返回controller层需要返回给前台的格式或者抛出异常,直接在controller层实现了,避免代码量比较大,mapper中的一些方法也没有使用过,仅作为参考

SeckillProductService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package com.heavytiger.mall.service;

import com.heavytiger.mall.dto.SeckillExposer;
import com.heavytiger.mall.pojo.SeckillProduct;
import org.springframework.stereotype.Service;

import java.util.List;

/**
* @author heavytiger
* @version 1.0
* @description 秒杀商品的Service层接口
* @date 2022/2/13 23:17
*/
@Service
public interface SeckillProductService {
/**
* 通过商品的ID获取秒杀的物品信息
* @param id 秒杀的商品信息
* @return 查询到的可以秒杀的商品
*/
public SeckillProduct querySeckillProductById(Integer id);

/**
* 查询秒杀商品表中的商品编号,在秒杀结束后删除秒杀商品表中的记录,因此商品编号有唯一性
* @param pid 需要查询的秒杀商品
* @return 获得当前商品的秒杀信息
*/
public SeckillProduct querySeckillProductByProductId(Integer pid);

/**
* 进行秒杀某个物品.
* @param id 秒杀的商品ID
* @param userId 顾客的ID
* @param pathMD5 校验秒杀接口地址,防止接口暴露
* @return 是否秒杀成功
*/
public Integer doSeckill(Integer id, Integer userId, String pathMD5);


/**
* 查询全部秒杀商品
* @return 返回查询到的所有秒杀商品
*/
public List<SeckillProduct> querySeckillProducts();

/**
* 获取商品详情信息
* @param id
* @return
*/
public SeckillExposer getDetail(Integer id);

/**
* 删除秒杀商品
* @param id
* @return
*/
public Integer deleteSeckillProduct(Integer id);

/**
* 创建唯一的秒杀地址,防止地址提前暴露
* @param userId
* @param seckillId
* @return
*/
public String createPath(Integer userId, Integer seckillId);

/**
* 修改秒杀商品表
* @param seckillProduct
* @return
*/
public Integer updateSeckillProduct(SeckillProduct seckillProduct);
}

SeckillOrderService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.heavytiger.mall.service;

import com.heavytiger.mall.pojo.SeckillOrder;

/**
* @author heavytiger
* @version 1.0
* @description 秒杀商品订单的Service层接口
* @date 2022/2/13 23:18
*/
public interface SeckillOrderService {

/**
* 添加秒杀订单
*
* @param userId 用户的id
* @param seckillId 需要添加的秒杀订单
* @return 返回影响的行数
*/
public Integer addSeckillOrder(Integer seckillId, Integer userId);

/**
* 根据订单号查询订单
* @param id 需要查询的订单号
* @return 返回根据订单号查询到的订单
*/
public SeckillOrder querySeckillOrderById(Integer id);
}

秒杀模块实现的思路

秒杀商品的预热

首先可以秒杀的商品相关信息应该存储在数据库中,可以使得数据持久化,后台管理也可以方便地添加秒杀商品的预热记录,但是如何将预热的数据导入到秒杀模块中呢?

我使用了Spring提供的定时任务,将数据库中存储的秒杀商品数据导入到本地,使用Redis进行缓存,库存缓存一份,商品数据缓存一份,先在Redis中预减库存,并将成功秒杀到的商品生成用户订单 ,这将极大地提升QPS,并且降低数据源的压力

使用Spring的注解,每隔5分钟扫描一次数据库秒杀商品表,如果Redis中已经存在库存信息则不重复添加,否则将数据导入后再次更新数据

之后为了再次提高QPS,设置了一个HashMap类型的库存售罄的标志,在库存清空的时候,将该标记设置为True,之后用户访问API将访问标志后直接返回库存不足的信息,可以将QPS提高近一倍

瞬时高并发的处理

一般在秒杀开始的瞬间,用户对接口请求的并发量急速增加,可能在一瞬间达到顶峰,例如本次测试时使用2000个用户同时进行秒杀,此时这种大部分用户会秒杀失败的场景,可能会在一瞬间将后台的数据库干掉,因此不能让这些流量在一瞬间全部进入后台进行磁盘IO,我们需要改进系统使其能应付该瞬时高并发场景

可以尝试以下的方式:

  1. 页面静态化
  2. CDN加速
  3. 缓存预减库存
  4. 异步消息处理订单
  5. 限流
  6. 分布式锁等

(由于本人不熟悉前端,因此没有做用户端的内容展示。使用前后端分离的方案,只用Jmeter压测了API请求)

缓存预减库存

在秒杀过程中,系统会检查库存是否充足,若充足则允许下单,写数据库,若不够,则直接返回该商品已经被抢购完,但是只有很少的人能够成功秒杀商品,因此这是很典型的读多写少型场景。假如数十万的请求查数据库库存是否充足,数据库会挂掉,因为连接资源有限无法支撑如此大的流量。

因此我们使用Redis进行库存的缓存,我的思路是首先在预热阶段将商品的库存和秒杀商品的信息(id,物品id,秒杀总数,开始时间,结束时间)加入Redis,在访问秒杀API接口时,通过Redis预减库存,之后通过RabbitMQ异步下订单,提高QPS。

缓存击穿,缓存穿透问题

使用Redis时会存在缓存击穿以及缓存穿透等问题,由于我使用了定时任务,并且保证所有的秒杀活动设置都需要提前1h,因此能保证在秒杀开始时,Redis中一定存在相关商品的库存缓存,若不存在一定是因为没有该商品,这样考虑理论上可以避免缓存击穿穿透问题。

解决缓存击穿的方案

但是,仍然会存一些bug,比如扫描表挂掉了,阻塞地情况,因此我们可以考虑使用分布式锁来解决缓存击穿的问题,每次访问时,如果没有查到缓存库存,让该线程尝试持有使用Redis实现的自旋锁,如果没有抢到锁则阻塞,抢到锁的将秒杀商品数据导入Redis缓存并释放锁,其他的线程自旋再次尝试获取缓存(由于已经设计了定时任务预热商品的步骤,因此没有实现分布式锁)也可以使用redission实现分布式锁

解决缓存穿透的方案

即使解决了缓存击穿,仍然可能因为秒杀商品表中没有商品而导致线程不停地重复获得锁,但是从数据库中又没有取到值,最后仍导致效率极低,可以使用布隆过滤器解决,我使用哈希表作为标志解决这个问题。

库存预减流程

我使用Redis中的lua脚本实现伪事务的效果,保证操作的原子性,这样就不会导致提交覆盖的问题产生。

RabbitMQ异步生成订单

秒杀场景中,并发量巨大的是秒杀功能,下单和支付功能实际的并发量很小,所以在设计秒杀模块时,应该将下单支付模块解耦出来,作为异步任务处理

在这里我们使用了RabbitMQ消息队列来实现异步生成订单的功能(主要是熟悉下RabbitMQ的使用)。

消息队列信息丢失

若消息队列发送订单信息时出现了信息丢失,有三种可能:

  1. 生产者往MQ写数据丢失了
    可能因为网络原因数据传输丢失。
  2. MQ本身数据丢失了
    数据放在内存中,MQ机器宕机了,那么内存中的数据丢失了。
  3. 消费者消费的时候丢失了
    消费者接受到消息,马上给MQ返回已经接受到消息的回复,消费者接着处理逻辑,在处理过程中,消费者宕机了;下次重启的时候消费者会获取到下一条数据消费,这样数据就丢失了。

解决方案:

  1. 解决生产者丢失数据

生产者开启confirm模式,每个消息分配唯一id,生产者发送完消息之后就不用管了,MQ如果成功接收到消息,那么会调用生产者的ack(String messageId)方法;MQ如果接收失败,那么会调用生产者nack(String messageId)方法,生产者根据情况判断是否要重试;

  1. 解决MQ丢失数据

开启持久化,把内存中的数据写入磁盘,这样可以使得服务器宕机后重启时仍然能恢复消息数据

  1. 解决消费者丢失数据

    消费者关闭autoAck,等自己全部处理完成后再发送ackRabbitMQ,这样可以使得数据再消费者端不丢失

消息队列重复消费

再消费者消费消息时,若ack应答时出现了网络丢失等情况,可能会导致重复消费的情况,因为MQ有重试机制,会重新发送消息给消费者端,导致生成多条订单。

我的方案是在Redis中建立一个set保存秒杀成功的用户id,如果消费者端要生成该用户的订单,则先尝试删除set中的id值,若删除成功正常执行,若不成功则提示重复生成订单,若之后的操作存在问题,Spring事务会回滚。

秒杀模块的相关实现

秒杀商品的预热实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/**
* @author heavytiger
* @version 1.0
* @description 扫描数据库,将秒杀信息储存到Redis中
* @date 2022/2/16 11:59
*/

@Component
public class DatabaseScanTask implements InitializingBean {

@Autowired
private SeckillProductService seckillProductService;

@Autowired
private RedisTemplate<String, Object> redisTemplate;

@Autowired
private HashMap<String, Boolean> seckillOverMap;

@Scheduled(cron = "30 */5 * * * ?")
@Transactional
public void databaseScanTask() {
// 商品预热,每隔5分钟从将未添加的秒杀商品数据添加到Redis中,例如12:00:30, 12:05:30, 12:10:30
ValueOperations<String, Object> valueOperations = redisTemplate.opsForValue();
List<SeckillProduct> products = seckillProductService.querySeckillProducts();
for (SeckillProduct product : products) {
Integer curStock = (Integer) valueOperations.get("seckillStock:" + product.getId());
if (curStock == null) {
// 表示redis中的该秒杀商品为空,应该将秒杀商品添加进去
long curTimestamp = System.currentTimeMillis(); // 当前时间
long endTimestamp = product.getEndDate().getTime(); // 结束时间

// 如果当前时间大于秒杀时间,说明秒杀已经结束了,将秒杀商品表中的商品删除,商品售罄标记删除,直接返回
if (endTimestamp < curTimestamp) {
seckillProductService.deleteSeckillProduct(product.getId());
seckillOverMap.remove("seckillProduct:" + product.getId());
return;
}
// 设置秒杀商品的库存
valueOperations.setIfAbsent("seckillStock:" + product.getId(), product.getStock(),
endTimestamp - curTimestamp, TimeUnit.MILLISECONDS);
// 将秒杀商品信息缓存
valueOperations.setIfAbsent("seckillProduct:" + product.getId(), product,
endTimestamp - curTimestamp, TimeUnit.MILLISECONDS);
}
// 设置秒杀商品的售罄状态为False,表示未售罄
seckillOverMap.putIfAbsent("seckillProduct:" + product.getId(), Boolean.FALSE);
}
}

@Override
public void afterPropertiesSet() throws Exception {
// 在项目启动初始化所有的Bean时进行初始化系统,加载所有的秒杀商品
databaseScanTask();
}
}

使用语句:@Scheduled(cron = "30 */5 * * * ?")创建定时任务,每5分钟的第30s执行定时任务,因为秒杀一般会选在整点开启,因此,特意错开到30s的时候执行,且每隔5min执行一次,在设置秒杀活动时,对开始时间设置要求,这样就不会出现秒杀时无法找到商品的情况。

注意此时在Redis中设置的库存键的名称以及商品键的名称

注意,库存售罄标志使用@Bean单例模式注入,方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.heavytiger.mall.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

/**
* @author heavytiger
* @version 1.0
* @description 标记秒杀结束
* @date 2022/2/21 16:31
*/
@Configuration
public class SeckillOverConfig {
/**
* @return 获得全局标记hashmap,标记商品是否卖完
*/
@Bean
public HashMap<String, Boolean> seckillOverMap() {
return new HashMap<>();
}
}

秒杀的相关设计实现

获取秒杀的信息

前端会需要与后端同步信息,获取服务器的时间,秒杀开始的时间,结束的时间等信息,因此编写如下dto类和接口:

dto类SeckillExposer:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* @author heavytiger
* @version 1.0
* @description 秒杀商品的详情页面获取服务器的时间,秒杀的相关状态及秒杀的请求地址
* @date 2022/2/16 17:11
*/
public class SeckillExposer {
// 当前是否允许秒杀 true允许,false不允许
private Integer status;
// 开始秒杀时间
private Long start;
// 秒杀结束时间
private Long end;
// 当前服务器时间
private Long now;
...
}

获取秒杀信息接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@GetMapping("/getSeckillDetail/{seckillId}")
public ResultBean<Object> getSeckillDetail(@PathVariable Integer seckillId) {
// 获取当前的产品秒杀状态
// 修改,因为可能被很多人请求,不应该请求数据库,防止业务崩溃,故将该数据缓存到redis中
SeckillProduct seckillProduct =
(SeckillProduct) redisTemplate.opsForValue().get("seckillProduct:" + seckillId);
SeckillExposer seckillExposer = new SeckillExposer();
if (seckillProduct == null) {
// 说明该商品无法被秒杀
seckillExposer.setStatus(2);
} else {
// 获取当前服务器时间
seckillExposer.setNow(System.currentTimeMillis());
// 获取秒杀商品开始时间
seckillExposer.setStart(seckillProduct.getStartDate().getTime());
// 获取秒杀商品借书时间
seckillExposer.setEnd(seckillProduct.getEndDate().getTime());
if (seckillExposer.getNow() < seckillExposer.getStart()) {
// 表示秒杀未开始
seckillExposer.setStatus(0);
} else {
// 表示秒杀开始
seckillExposer.setStatus(1);
}
}
return new ResultBean<>(EnumResult.SUCCESS, seckillExposer);
}

其实为了展示秒杀商品的剩余数量,可以给在秒杀商品表中设置预设库存和实际库存两个参数,我设计系统时没有考虑到,只在秒杀成功后减了商品表中的库存,加入此时服务器宕机,理论上没有办法恢复之前的状态,即订单已经生成,但是重启后又会读出预设库存作为可以秒杀的库存数,这样会造成数据不一致。

获取秒杀地址

为了避免提前暴露秒杀的接口,要求对地址API进行访问,在访问后如果一切正常(秒杀商品存在,用户的权限正常,商品仍有库存可以秒杀等),即可随机生成一段编码,将这段编码存储在Redis中,设置过期时间,在编码过期之前,可以将编码作为秒杀接口的请求体发送请求,若检测正常,即可以进行秒杀操作。

获取秒杀地址的接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@GetMapping("/getSeckillPath/{seckillId}")
public ResultBean<Object> getSeckillPath(HttpServletRequest request, @PathVariable Integer seckillId) {
Boolean seckillOver = seckillOverMap.get("seckillProduct:" + seckillId);
if(seckillOver == null) {
// 迭代为先进行内存访问,用于提升QPS,避免不必要的Redis访问
// 如果没有获取到值,直接返回不存在商品
return new ResultBean<>(EnumResult.SECKILL_NO_PRODUCT);
} else if (seckillOver) {
// 若售罄标记为真,表示已经售罄
return new ResultBean<>(EnumResult.SECKILL_STOCK_EMPTY);
}
String token = request.getHeader("Authorization");
Integer userId = JwtUtil.getUserId(token);
// 判断是否存在该商品的秒杀信息
SeckillProduct seckillProduct =
(SeckillProduct) redisTemplate.opsForValue().get("seckillProduct:" + seckillId);
if (seckillProduct == null) {
// 说明该商品不存在
return new ResultBean<>(EnumResult.SECKILL_NO_PRODUCT);
}
if (System.currentTimeMillis() < seckillProduct.getStartDate().getTime()) {
// 说明还没有到抢购时间,不允许进行抢购,不暴露秒杀地址
return new ResultBean<>(EnumResult.SECKILL_NO_START);
}
String path = seckillProductService.createPath(userId, seckillId);
return new ResultBean<>(EnumResult.SUCCESS, path);
}

后来迭代时加入了秒杀商品售罄的标志,如果商品售罄,将直接返回售罄的信息,不会再进行相关业务的判断,这样可以提高该接口的QPS,同时前端也可以通过返回的售罄消息禁用用户的秒杀接口,降低秒杀接口的负载,实际测试后,QPS可以提高一倍。

获取秒杀地址的接口的Service层代码:

1
2
3
4
5
6
7
8
@Override
public String createPath(Integer userId, Integer seckillId) {
String pathMD5 = UUID.randomUUID().toString().replace("-", "");
// 设置30s内允许访问路径进行秒杀
redisTemplate.opsForValue().set("seckillPath:" + seckillId + ":" + userId, pathMD5,
30, TimeUnit.SECONDS);
return pathMD5;
}

设置30s后键过期,保证Redis不至于内存开销过大。

获取当前用户是否秒杀成功

1
2
3
4
5
6
7
8
9
10
11
12
@GetMapping("/getSeckillResult/{seckillId}")
public ResultBean<Object> getSeckillResult(HttpServletRequest request, @PathVariable Integer seckillId) {
String token = request.getHeader("Authorization");
Integer userId = JwtUtil.getUserId(token);
// 从Redis中查找当前的顾客是否存在记录
Boolean hasKey = redisTemplate.opsForSet().isMember("seckillOrder:" + seckillId, userId);
if (hasKey != null && hasKey) {
// 表示存在当前用户的秒杀记录
return new ResultBean<>(EnumResult.SUCCESS);
}
return new ResultBean<>(EnumResult.SECKILL_NO_ORDER);
}

其实这个接口后来就没有使用过了,因为Redis中储存的秒杀用户记录都在消息队列接受到消息后被清空了,以避免重复创建订单的情况发生。

其实可以去掉这个API接口,使用websocket技术实现秒杀后的信息回传通知,这样可也提高用户体验,缺点是需要存放用户的session,会占用服务器资源,与我使用jwt的理念不相符。

秒杀功能的实现

这个接口为重中之重,本来不应该在controller层实现Service层应该实现的内容,但是由于我偷懒没有去制定controller层和Service层之间的返回值规范,因此就直接在controller层中返回ResultBean封装对象了。

Controller层的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
/**
* 秒杀业务.
*
* @param request 获取userid
* @param pathMD5 秒杀地址,防止接口提前暴露
* @param seckillId 秒杀id
* @return 返回秒杀状态
*/
@PostMapping("/doSeckill/{seckillId}")
public ResultBean<Object> doSeckill(HttpServletRequest request,
@RequestBody(required = false) String pathMD5,
@PathVariable Integer seckillId) {
// 用于判断库存是否已经卖完
Boolean seckillOver = seckillOverMap.get("seckillProduct:" + seckillId);
if (seckillOver == null) {
// 迭代为先进行内存访问,用于提升QPS,避免不必要的Redis访问
// 如果没有获取到值,直接返回不存在商品
// 迭代后,获取秒杀地址的QPS提升到1394,执行秒杀的QPS提升到1292
return new ResultBean<>(EnumResult.SECKILL_NO_PRODUCT);
} else if (seckillOver) {
// 若售罄标记为真,表示已经售罄
return new ResultBean<>(EnumResult.SECKILL_STOCK_EMPTY);
}
// 获取userId
String token = request.getHeader("Authorization");
Integer userId = JwtUtil.getUserId(token);
// 判断抢购地址是否一致
String pathInRedis = (String) redisTemplate.opsForValue().get("seckillPath:" + seckillId + ":" + userId);
if (!pathMD5.equals(pathInRedis)) {
// 说明抢购的地址不一致,不允许继续抢购
return new ResultBean<>(EnumResult.SECKILL_PATH_ERROR);
}
// 判断是否重复抢购
Boolean hasKey = redisTemplate.opsForSet().isMember("seckillOrder:" + seckillId, userId);
if (hasKey != null && hasKey) {
// 说明该用户存在重复秒杀
return new ResultBean<>(EnumResult.SECKILL_ORDER_REPEAT);
}
// 使用lua脚本进行减库存以及存入订单的操作,必须使用Long,Integer会产生报错
Long execute = redisTemplate.execute(seckillScript, Collections.emptyList(), userId, seckillId);
if (execute == null) {
// 避免提示空指针
return new ResultBean<>(EnumResult.INTERNAL_SERVER_ERROR);
} else if (execute == 0) {
// 说明此时库存为空
// 此时测试秒杀服务的QPS为740,获取MD5秒杀路径的QPS为686
// 之后可以再次改进,在内存中设置库存清零标记,当标记后,访问秒杀及路径接口直接返回库存为空,可以提高QPS
seckillOverMap.replace("seckillProduct:" + seckillId, Boolean.TRUE);
return new ResultBean<>(EnumResult.SECKILL_STOCK_EMPTY);
} else if (execute == 1) {
// 说明此时秒杀成功,使用消息队列异步下订单
mqSender.sendSeckillMessage(userId, seckillId);
// System.out.println(userId + ":创建订单成功!");
return new ResultBean<>(EnumResult.SUCCESS);
} else {
// 说明已经秒杀成功,提示禁止重复秒杀
// System.out.println("禁止重复秒杀!");
return new ResultBean<>(EnumResult.SECKILL_ORDER_REPEAT);
}
}

已经在其中写了比较详尽的注释,就不再解释了,Redis预减库存需要使操作具有原子性,因此我使用了Redis支持的lua脚本的方式,lua脚本在使用时会阻塞别的线程访问Redis,因此不会出现并发的各种问题。

lua脚本如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
local userId = ARGV[1];
local seckillId = ARGV[2];
local stockKey = "seckillStock:" .. seckillId;
local usersKey = "seckillOrder:" .. seckillId;
local userExists = redis.call("sIsMember", usersKey, userId);
if tonumber(userExists) == 1 then
return 2;
end
local num = redis.call("get", stockKey);
if tonumber(num) <= 0 then
return 0;
else
redis.call("decr", stockKey);
redis.call("sAdd", usersKey, userId);
end
return 1;

消息队列处理订单业务

我使用了RabbitMQtopic模式实现消息队列,原因是因为该模式用处比较广,如果之后还要添加其他的模块,也可以方便地进行添加,使用通配符匹配即可。

SeckillControllerdoSeckill业务执行预减库存操作时,如果lua脚本的返回值表示秒杀成功,则会向生产者RabbitMQ发送使用哈希表封装的消息,Exchange接受到消息后,则会根据topic模式定义的规则发送给相应的队列,这些队列再将消息发送给消费者进行处理,生成订单。

首先定义Bean,进行QUEUEEXCHANGE的绑定

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* @author heavytiger
* @version 1.0
* @description RabbitMQ配置类
* @date 2022/2/21 20:10
*/
@Configuration
public class RabbitMQConfig {
// 配置队列名,使用Topic模式
public static final String QUEUE = "seckillQueue";
// 配置交换器名
public static final String EXCHANGE = "seckillExchange";

@Bean
public Queue queue() {
return new Queue(QUEUE);
}

@Bean
public TopicExchange topicExchange() {
return new TopicExchange(EXCHANGE);
}

@Bean
public Binding binding() {
// 匹配路径为seckill.#的队列
return BindingBuilder.bind(queue()).to(topicExchange()).with("seckill.#");
}
}

生产者代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* @author heavytiger
* @version 1.0
* @description 发送消息
* @date 2022/2/21 20:30
*/
@Service
public class MQSender {

@Autowired
private RabbitTemplate rabbitTemplate;

/**
* 发送消息
*/
public void sendSeckillMessage(Integer userId, Integer seckillId) {
Map<String, Integer> message = new HashMap<>();
message.put("userId", userId);
message.put("seckillId", seckillId);
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE, "seckill.message", JsonUtil.objToJson(message));
}
}

消费者的代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* @author heavytiger
* @version 1.0
* @description 接收消息
* @date 2022/2/21 20:30
*/
@Service
public class MQReceiver {

private static final Logger logger = LoggerFactory.getLogger(MQReceiver.class);

@Autowired
private SeckillOrderService seckillOrderService;

@Autowired
private RedisTemplate<String, Object> redisTemplate;

@RabbitListener(queues = RabbitMQConfig.QUEUE)
@Transactional(isolation = Isolation.READ_COMMITTED, propagation = Propagation.REQUIRED)
public void receiveSeckillMessage(String message) {
HashMap<String, Integer> map = JsonUtil.jsonToObj(message, HashMap.class);
Integer userId = map.get("userId");
Integer seckillId = map.get("seckillId");
// 将秒杀成功的订单与Redis中的记录进行比较,如果存在说明该记录正确,并将Redis中的订单数据删除
Long result = redisTemplate.opsForSet().remove("seckillOrder:" + seckillId, userId);
if (result == null) {
// 说明此时集合不存在,其中的元素已经全被删除
logger.info("Redis中秒杀商品{}的订单集合已被全部删除!", seckillId);
throw new RuntimeException("Redis中" + seckillId + "商品的订单集合已被全部删除!");
} else if (result == 1) {
// 说明只删除了一个,进行记录
logger.info("用户{}秒杀商品{}成功!", userId, seckillId);
} else {
// 说明没有删除,没有删除则证明订单已经被创建了,直接返回以免出现重复消费的情况
logger.warn("用户{}已经秒杀商品{}!请勿重复创建订单!", userId, seckillId);
}
// 添加秒杀订单,此操作会先将商品加入购物车,再将购物车中的商品结算,结算完成后生成订单,再将订单数据回填到秒杀订单中
if(seckillOrderService.addSeckillOrder(seckillId, userId) == 1) {
// 说明订单创建成功
logger.info("用户{}秒杀商品{}号的订单被成功创建!", userId, seckillId);
}
}
}

消费者和之前描述的设计思路一致,使用Redis中键名为"seckillOrder:" + seckillId集合的value标记秒杀成功的用户,如果能正常地删除数据,则说明是第一次生成订单,可以正常生成。

之后调用SeckillOrderServiceImpl中的addSeckillOrder方法,添加秒杀订单,注意此处出现了一个比较严重的bug,严重影响程序运行效率,此处我使用了注解式事务,以确保ACID,但是出现了死锁的问题,之后我会在第六章进行解释。

addSeckillOrder代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* 添加秒杀订单
*
* @param userId 用户的id
* @param seckillId 需要添加的秒杀订单
* @return 返回影响的行数
*/
@Override
@Transactional(isolation = Isolation.READ_COMMITTED, propagation = Propagation.REQUIRED)
public Integer addSeckillOrder(Integer seckillId, Integer userId) {
// 1. 首先需要将商品添加到购物车中
// 获取Redis中存储的商品的秒杀价格
SeckillProduct seckillProduct =
(SeckillProduct) redisTemplate.opsForValue().get("seckillProduct:" + seckillId);
if(seckillProduct == null) {
throw new RuntimeException("Redis中不存在相关商品!");
}
OrderCart tempOrderCart = new OrderCart(null, userId,
seckillProduct.getProductId(), 1, seckillProduct.getSeckillPrice(), 1,
null, null, null);
orderCartService.addOrderCart(tempOrderCart);
// 2. 将购物车中的商品进行结算
Order tempOrder = new Order(null, null, userId, null,
null, null, null, null);
orderService.addOrder(tempOrder);
// 3. 将数据回写到秒杀订单表
SeckillOrder seckillOrder = new SeckillOrder(null, userId, tempOrder.getOrderId(),
seckillProduct.getProductId());
seckillOrder.setOrderId(tempOrder.getOrderId());
return seckillOrderMapper.addSeckillOrder(seckillOrder);
}

由于这是我之前学习SSM框架时自己写的系统,可能数据库的设计不是那么好,我处理订单的逻辑是:

  1. 将用户选择的商品添加到购物车(即,将商品的id、价格、数量、用户的信息等作为一条记录加入到数据库order_cart表中)
  2. 对该用户的购物车中的所有物品进行结算,结算之后生成订单,之后将订单的id回填至购物车表中,之后即可查看该用户的某一次订单的具体内容。(实际上也存在设计不合理的地方,譬如用户在进行秒杀结算时只想结算秒杀的商品,购物车中的其他商品用户不一定考虑结算,不过毕竟系统设计时就没有考虑做成很大型的平台,只作为学习练手使用)
  3. 之后将订单表的数据(id)获取后,填入秒杀订单表,生成秒杀订单

之中的所有操作,都用了read_committed的隔离级别以及required的传播级别,这也导致了之后的bug,详见之后的说明

添加购物车addOrderCart的业务代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 添加购物车
* @param orderCart 封装添加购物车
* @return 返回影响的行数
*/
@Override
@Transactional(isolation = Isolation.READ_COMMITTED, propagation = Propagation.REQUIRES_NEW)
public Integer addOrderCart(OrderCart orderCart) {
if (customerMapper.queryCustomerById(orderCart.getCustomerId()) != null &&
productMapper.queryProductById(orderCart.getProductId()) != null) {
// 默认加入购物车时标识为1,即下单时购买该商品
orderCart.setStatus(1);
return orderCartMapper.addOrderCart(orderCart);
}
return null;
}

注意思考这里的传播级别为什么是Propagation.REQUIRES_NEW,之前不是才说了全设置的REQUIRED吗?

添加订单addOrder的业务代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/**
* 添加订单,若需要回滚会抛出运行时异常,需要在Controller层捕获运行时异常
*
* @param order 需要增加的订单
*/
@Transactional(isolation = Isolation.READ_COMMITTED, propagation = Propagation.REQUIRED)
@Override
public void addOrder(Order order) {
// 开启事务支持,隔离级别可重复读,若被调用方法中存在事务则加入事务,否则创建新事务
// 生成订单SN码
order.setOrderSn(UUID.randomUUID().toString().replace("-", ""));
// 查询当前用户购物车内的所有未购买商品
List<OrderCart> orderCartList = orderCartMapper.queryNewOrderCartsByCustomerId(order.getCustomerId());
if (orderCartList == null || orderCartList.isEmpty()) {
// 抛出异常,@Transactional捕获异常后自动回滚,Controller层需要捕获异常
throw new RuntimeException("购物车中没有商品!");
}
// 计算总金额
double orderTotalMoney = 0;
for (OrderCart item : orderCartList) {
Double curPrice = item.getProductPrice();
Integer curAmount = item.getProductAmount();
orderTotalMoney += (curPrice == null || curAmount == null) ? 0 : curPrice * curAmount;
}
order.setOrderMoney(orderTotalMoney);
// 新建一条订单,该方法将回填主键(即orderId)
orderMapper.addOrder(order);
Integer curOrderId = order.getOrderId();
// 更新order_carts购物车表
for (OrderCart item : orderCartList) {
// 将购物车表中的status状态标记为0,即已经生成订单,状态为删除
item.setStatus(0);
// 将库存中的数量进行更改,若数量过多,则直接抛出运行时异常
Product curProduct = productMapper.queryProductDetailForUpdate(item.getProductId());
// System.out.println(curProduct);
Integer curStock = curProduct.getStock();
Integer curAmount = item.getProductAmount() == null ? 0 : item.getProductAmount();
if (curStock < curAmount) {
// 抛出异常,@Transactional捕获异常后自动回滚,Controller层需要捕获异常
throw new RuntimeException("库存数量不够,请检查购物车中添加的商品数量!");
} else {
item.setOrderId(curOrderId);
// 回写购物车内的OrderId数据
orderCartMapper.updateOrderCart(item);
// 更新产品中的库存stock
curProduct.setStock(curStock - curAmount);
productMapper.updateProduct(curProduct);
}
}
}

业务比较复杂,不再过多阐述实现的思路了,代码中的注释很齐全

这里有个并发减商品库存的操作,我使用悲观锁实现SELECT FOR UPDATE语句加X lock,保证了并发减库存不至于库存与订单数量对不上

最后添加秒杀订单的代码省略,因为是Mapper层的方法,很简单。

秒杀相关接口压力测试

简易接口测试

使用ApiFox对接口进行简易的测试,主要用于测试接口功能是否正常,大致截图如下,可以参考以下文章将接口A返回值用于接口B参数:

接口之间如何传递数据 | Apifox 使用文档

登录态(Auth)如何处理 | Apifox 使用文档

测试秒杀的流程是否正常:

秒杀的流程如下:

  1. 登录账户,获得自己的Token,之后每次访问接口都需要在Authorization中附上Token值鉴权
  2. 获得秒杀物品的秒杀信息
  3. 获得秒杀物品的秒杀地址,之后通过秒杀地址参与秒杀
  4. 发送秒杀的请求,调用秒杀API

因此,使用ApiFox对这些流程进行简易的测试,验证接口是否正常

获取秒杀信息的接口:

image-20220227164150457

获取秒杀地址的接口:

image-20220227164215253

执行秒杀操作的接口:

image-20220227164240828

创建完成后,创建自动化测试:

image-20220227164342405

将其中下一操作所需的信息设置为环境变量,方便获取值进行接下来的测试,如下所示:

image-20220227164453186

之后将其集成为秒杀套件,进行联合测试:

image-20220227164624889

举例说明,接口测试后,可以看到执行的结果:

image-20220227164742233

image-20220227164829449

image-20220227164841464

image-20220227164918514

JMeter进行压力测试

由于本项目使用JWT鉴权,因此直接使用测试类注册10000个帐号,供测试使用,将这些帐号密码调用接口生成Token存储到user_tokens.txt文件中,之后进行测试时,使用该文件验证身份

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* 添加10000个秒杀用户,用于测试秒杀效果
*/
@Test
public void addSeckillCustomer() {
for (int i = 0; i < 10000; i++) {
Customer temp = new Customer(null, "seckill" + i, "123456", null, null,
null, "秒杀模块测试用户" + i);
customerService.addCustomer(temp);
System.out.println("插入的customer主键id为:" + temp.getUserId());
}
}

/**
* 登录秒杀测试用户并获得用户的token
* 将测试的10000个用户的token值放入txt文件中
*/
@Test
public void createTokens(){
File file = new File("C:\\Users\\DCM\\Desktop\\user_tokens.txt");
if(file.exists()) {
// 如果文件存在则直接删除
file.delete();
}
try(FileWriter fileWriter = new FileWriter(file)) {
for(int i = 0; i < 10000; i++) {
Customer customer = customerService.customerLogin("seckill" + i, "123456");
String token = JwtUtil.sign(customer.getUserId(), customer.getUsername());
fileWriter.write(token + "\n");
fileWriter.flush();
}
} catch (IOException e) {
e.printStackTrace();
}
}

创建的包含Token信息的文件:

image-20220227165453713

秒杀测试套件的创建:

由于直接生成了token,可以直接使用,就不需要再调用登录接口了

设置用户登录信息的csv数据文件,选择文件所在的位置,以及提取为什么变量名,此时我将变量名设置为token,之后可以使用${token}进行变量的调用

image-20220227223105990

秒杀的商品设置为全局变量存储,方便进行修改

image-20220227223405920

创建信息头管理器,在Authorization中使用${token}获取token值并附在header中一并发送,用于鉴权

image-20220227223321782

之后使用JSON提取器获取返回值json中的MD5秒杀地址,获取地址后,将其添加到秒杀商品操作的请求体中,即可完成请求

image-20220302090027111

image-20220302090051415

压力测试一:

对秒杀编号3的50个商品进行测试,测试使用2000个线程,在1s内发起请求,一共循环5次,测试出系统的QPS

image-20220302090551454

测试结果如下:

可以看到此时秒杀操作的QPS1514,比较合理

image-20220302093227630

检查数据库的结果:

此时Redis中的数据正常,50个商品全部被秒杀完成

image-20220302093437091

数据库中的库存原来是500,在秒杀操作完成后,减库存至450

image-20220302093727229

image-20220302093816365

查看购物车表、订单表、秒杀订单表中的详情:

image-20220302093927977

可以看到,有50行被选中,证明有50个顾客秒杀了商品

image-20220302094053172

有50行被选中,说明共生成了50条订单,正确

image-20220302094241264

此时秒杀商品表中有50条记录,且order_id,customer_id,均与订单表购物车表中的信息一致,正确

且此时的order_id生成是连续的,说明没有产生回滚(由于之前提到的产生死锁的操作,会导致出现很多回滚,虽然id是自增的,id每次却跨越5-6增加,说明基本5-6次回滚才成功插入一行数据,效率极低,QPS也特别低)

压力测试二

秒杀商品5,一共100件,测试使用3000线程在1s发起请求,共请求4次,实际测得的秒杀接口的QPS在1484左右

image-20220302110659354

压力测试三

秒杀商品6,一共10件,测试使用4000线程在1s发起请求,共请求3次,实际测得的秒杀接口的QPS在1364左右

image-20220302112214600

测试二和测试三就不演示数据库中的内容了,可以和旁边的异常率对照,加了过滤器,只把秒杀成功的视作正常

记录出现的问题

使用SELECT FOR UPDATE时出现死锁

为了解决mysql中,商品表的库存在并发减的情况下会出现覆盖提交的问题,如果使用乐观锁会增加重试的成本,并且需要手写重试的代码,因此考虑使用悲观锁解决问题,使用SELECT FOR UPDATE实现悲观锁,但是在并发的情况下出现了大量的死锁情况,甚至每一个事务都要回滚五六次才能成功的修改数据,因此我尝试解决该问题

使用sql语句,打开死锁日志,可以记录最后一次发生的死锁:

1
2
3
set GLOBAL innodb_print_all_deadlocks=ON;
# 查询innodb日志
show engine innodb status;

日志相关情况(仅截取了死锁部分的日志):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
------------------------
LATEST DETECTED DEADLOCK
------------------------
2022-02-22 21:12:03 139829317871360
*** (1) TRANSACTION:
TRANSACTION 29602, ACTIVE 0 sec starting index read
mysql tables in use 3, locked 3
LOCK WAIT 8 lock struct(s), heap size 1128, 3 row lock(s), undo log entries 2
MySQL thread id 26, OS thread handle 139828862592768, query id 14275 192.168.205.1 mall statistics
SELECT *
FROM mall.product_detail
WHERE product_id = 2
FOR UPDATE

*** (1) HOLDS THE LOCK(S):
RECORD LOCKS space id 90 page no 4 n bits 88 index PRIMARY of table `mall`.`products` trx id 29602 lock mode S locks rec but not gap

*** (1) WAITING FOR THIS LOCK TO BE GRANTED:
RECORD LOCKS space id 90 page no 4 n bits 88 index PRIMARY of table `mall`.`products` trx id 29602 lock_mode X locks rec but not gap waiting


*** (2) TRANSACTION:
TRANSACTION 29603, ACTIVE 0 sec starting index read
mysql tables in use 3, locked 3
LOCK WAIT 8 lock struct(s), heap size 1128, 3 row lock(s), undo log entries 2
MySQL thread id 23, OS thread handle 139828865763072, query id 14278 192.168.205.1 mall statistics
SELECT *
FROM mall.product_detail
WHERE product_id = 2
FOR UPDATE

*** (2) HOLDS THE LOCK(S):
RECORD LOCKS space id 90 page no 4 n bits 88 index PRIMARY of table `mall`.`products` trx id 29603 lock mode S locks rec but not gap

*** (2) WAITING FOR THIS LOCK TO BE GRANTED:
RECORD LOCKS space id 90 page no 4 n bits 88 index PRIMARY of table `mall`.`products` trx id 29603 lock_mode X locks rec but not gap waiting

*** WE ROLL BACK TRANSACTION (2)

查看死锁日志:

TRANSACTION表示两个事务,分别为(1)(2),简单的英文翻译,可以知道出现死锁的原因是,有两个事务都已经持有mall.products数据库的s锁,s锁是共享锁,允许当前持有该锁的事务读取行,如果事务(1)持有s锁其他的事务也可以对改行上s锁,但是使用SELECT FOR UPDATE语句时,当准确命中索引的时候(即此时使用主键进行索引查找,加行级排他锁),会锁行,对该行加x锁,即:

  • 如果事务(1)持有了行上的s锁,则其他任何事务也可以持有该行的s锁,但是想加x锁必须等其他事务的S锁释放。

  • 如果事务(1)持有了行上的X锁,则其他任何事务不能持有该行的X锁,必须等待(1)在该行上的X锁释放。

因此,此时两个事务都持有了S锁,但是同时又都在尝试X锁,互相不释放S锁,导致了死锁的产生,MySQL检测到后就自动回滚了。

解决方案:

查看之前在什么地方使用了SELECT语句导致上了S锁,经过检查,是在添加购物车时,使用了SELECT语句查询商品保证商品不为空,此时如果要修改该行的Stock库存,则需要再加上X锁,并发下多事务会导致死锁。

我将事务的传播级别由REQUIRED修改为了REQUIRED_NEW,自己新建立一个事务即可,这样可以保证两个操作不使用同一个事务,这样就不会出现死锁的情况,解决了出现的问题。

docker中时间与服务器时间不一致的问题

部署在docker中的服务器时间与服务器外的时间不一致,解决方案如下:

首先同步服务器的时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 安装ntp服务
yum install -y ntp

# 修改配置
vi /etc/ntp.conf

# 修改如下,截选部分
# Permit all access over the loopback interface. This could
# be tightened as well, but to do so would effect some of
# the administrative functions.
restrict 127.0.0.1
restrict ::1
# Hosts on local network are less restricted.
restrict 192.168.205.0 mask 255.255.255.0 nomodify notrap
restrict 192.168.1.0 mask 255.255.255.0 nomodify notrap
restrict 192.168.0.0 mask 255.255.0.0 nomodify notrap
restrict cn.pool.ntp.org
# Use public servers from the pool.ntp.org project.
# Please consider joining the pool (http://www.pool.ntp.org/join.html).
server cn.pool.ntp.org prefer # 以这台主机为优先
server ntp.sjtu.edu.cn
server 0.centos.pool.ntp.org iburst
server 1.centos.pool.ntp.org iburst
server 2.centos.pool.ntp.org iburst
server 3.centos.pool.ntp.org iburst

# 启动服务
[root@localhost ~]# ntpdate cn.pool.ntp.org
2 Mar 13:53:11 ntpdate[103109]: step time server 84.16.67.12 offset 468657.069197 sec
[root@localhost ~]# date
2022年 03月 02日 星期三 13:53:14 CST

之后同步虚拟机中的时间

1
2
3
4
5
6
# 方法1:启动时添加时间参数
docker run -p 3306:3306 --name mysql -v /etc/localtime:/etc/localtime

# 方法2:直接在宿主机操作
docker cp /etc/localtime 【容器ID或者NAME】:/etc/localtime
docker cp -L /usr/share/zoneinfo/Asia/Shanghai 【容器ID或者NAME】:/etc/localtime

修改之后,即可正常同步时间

1
2
3
[root@localhost ~]# docker exec -it mysql /bin/bash
root@757d2d305710:/# date
Wed Mar 2 13:57:52 +08 2022

参考资料

[1] SSM整合开发 | HeavyTiger’s Blogs


-------------本文到此结束 感谢您的阅读-------------
谢谢你请我喝肥宅快乐水(๑>ڡ<) ☆☆☆