导读:实战篇我们要学习一些什么样的内容

  • 短信登录

这一块我们会使用redis共享session来实现

  • 商户查询缓存

通过本章节,我们会理解缓存击穿,缓存穿透,缓存雪崩等问题,让小伙伴的对于这些概念的理解不仅仅是停留在概念上,更是能在代码中看到对应的内容

  • 优惠卷秒杀

通过本章节,我们可以学会Redis的计数器功能, 结合Lua完成高性能的redis操作,同时学会Redis分布式锁的原理,包括Redis的三种消息队列

  • 附近的商户

我们利用Redis的GEOHash来完成对于地理坐标的操作

  • UV统计

主要是使用Redis来完成统计功能

  • 用户签到

使用Redis的BitMap数据统计功能

  • 好友关注

基于Set集合的关注、取消关注,共同关注等等功能,这一块知识咱们之前就讲过,这次我们在项目中来使用一下

  • 打人探店

基于List来完成点赞列表的操作,同时基于SortedSet来完成点赞的排行榜功能

img

1、短信登录

1.1、导入黑马点评项目

1.1.1 、导入SQL(Mysql版本大于5.7)

img

1.1.2、有关当前模型

手机或者app端发起请求,请求我们的nginx服务器,nginx基于七层模型走的事HTTP协议,可以实现基于Lua直接绕开tomcat访问redis,也可以作为静态资源服务器,轻松扛下上万并发, 负载均衡到下游tomcat服务器,打散流量,我们都知道一台4核8G的tomcat,在优化和处理简单业务的加持下,大不了就处理1000左右的并发, 经过nginx的负载均衡分流后,利用集群支撑起整个项目,同时nginx在部署了前端项目后,更是可以做到动静分离,进一步降低tomcat服务的压力,这些功能都得靠nginx起作用,所以nginx是整个项目中重要的一环。

在tomcat支撑起并发流量后,我们如果让tomcat直接去访问Mysql,根据经验Mysql企业级服务器只要上点并发,一般是16或32 核心cpu,32 或64G内存,像企业级mysql加上固态硬盘能够支撑的并发,大概就是4000起~7000左右,上万并发, 瞬间就会让Mysql服务器的cpu,硬盘全部打满,容易崩溃,所以我们在高并发场景下,会选择使用mysql集群,同时为了进一步降低Mysql的压力,同时增加访问的性能,我们也会加入Redis,同时使用Redis集群使得Redis对外提供更好的服务。

img

1.1.3、导入后端项目

注意要修改application.yaml文件里的mysql,redis地址信息

1.1.4、导入前端工程

1.1.5 运行前端项目

1
localhost8080

1.2 、基于Session实现登录流程

发送验证码:

用户在提交手机号后,会校验手机号是否合法,如果不合法,则要求用户重新输入手机号

如果手机号合法,后台此时生成对应的验证码,同时将验证码进行保存,然后再通过短信的方式将验证码发送给用户

短信验证码登录、注册:

用户将验证码和手机号进行输入,后台从session中拿到当前验证码,然后和用户输入的验证码进行校验,如果不一致,则无法通过校验,如果一致,则后台根据手机号查询用户,如果用户不存在,则为用户创建账号信息,保存到数据库,无论是否存在,都会将用户信息保存到session中,方便后续获得当前登录信息

校验登录状态:

用户在请求时候,会从cookie中携带者JsessionId到后台,后台通过JsessionId从session中拿到用户信息,如果没有session信息,则进行拦截,如果有session信息,则将用户信息保存到threadLocal中,并且放行

img

1.3实现发送短信验证码

页面流程

img

具体代码如下

  • *发送验证码*

UserController

1
2
3
4
5
6
7
8
/**
* 发送手机验证码
*/
@PostMapping("code")
public Result sendCode(@RequestParam("phone") String phone, HttpSession session) {
// TODO 发送短信验证码并保存验证码
return userService.sendCode(phone, session);
}

IUserService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public interface IUserService extends IService<User> {
/**
* 发送验证码
* @param phone
* @param session
* @return
*/
Result sendCode(String phone, HttpSession session);
}
IUserServiceImpl

/**
* 发送验证码
* @param phone
* @param session
* @return
*/
@Override
public Result sendCode(String phone, HttpSession session) {
// 1.校验手机号
if (RegexUtils.isPhoneInvalid(phone)) {
// 2.如果不符合,返回错误信息
return Result.fail("手机号格式错误!");
}
// 3.符合,生成验证码
String code = RandomUtil.randomNumbers(6);

// 4.保存验证码到 session
session.setAttribute("code",code);
// 5.发送验证码
log.debug("发送短信验证码成功,验证码:{}", code);
// 返回ok
return Result.ok();
}
  • *登录*

UserController

1
2
3
4
5
6
7
8
9
/**
* 登录功能
* @param loginForm 登录参数,包含手机号、验证码;或者手机号、密码
*/
@PostMapping("/login")
public Result login(@RequestBody LoginFormDTO loginForm, HttpSession session){
// 实现登录功能
return userService.login(loginForm, session);
}

IUserService

1
2
3
4
5
6
7
8
   /**
* 登录功能
* @param loginForm
* @param session
* @return
*/
Result login(LoginFormDTO loginForm, HttpSession session);
}

IUserServiceImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
* 用户登录
* @param loginForm
* @param session
* @return
*/
@Override
public Result login(LoginFormDTO loginForm, HttpSession session) {
//1.校验手机号
String phone = loginForm.getPhone();
if (RegexUtils.isPhoneInvalid(phone)) {
//如果不符合,返回错误信息
return Result.fail("手机号格式错误");
}

//2.校验验证码
Object cacheCode = session.getAttribute("code");
String code = loginForm.getCode();
if (cacheCode == null || !cacheCode.toString().equals(code)) {
//3.不一致,直接返回错误信息
return Result.fail("验证码错误");
}

//4.一致,根据手机号查询用户 select * from tb_user where phone = ?
User user = query().eq("phone", phone).one();

//5.判断用户是否存在
if (user == null) {
//6。不存在,创建新用户并保存
user = createUserWithPhone(phone);
}

//7.存在,保存用户到session
session.setAttribute("user", user);

//8.返回登录成功的结果
return Result.ok();
}

private User createUserWithPhone(String phone) {
//1.创建用户
User user = new User();
user.setPhone(phone);
user.setNickName(USER_NICK_NAME_PREFIX+RandomUtil.randomString(10));
//2.保存用户到数据库 insert into tb_user values (null, ?, null)
save(user);
return user;
}

1.4 显示登录拦截

tomcat的运行原理

img

1. 建立连接

  • 用户(客户端) 发起一个 HTTP 请求,首先会通过浏览器或其他客户端发起 TCP 连接
  • 这个连接由用户的 socket 建立,连接到服务器上指定的 端口号(Tomcat 注册的端口,如默认的 8080)。
  • Tomcat 在启动时,会开启一个 监听线程(Acceptor),专门负责监听该端口,等待接入的 socket 连接。

2. 分发请求

  • 一旦监听线程监听到客户端发来的连接请求,它就会:
    • 接收 socket 连接
    • 将这个连接交给 工作线程池(Worker Thread Pool) 中的一个线程去处理。
  • 这一步很关键:
    • 每个请求都被分发给线程池中的一个工作线程,实现了多线程并发处理
    • 这个线程随后会负责整个请求的生命周期。

3. 处理请求

  • 工作线程接手后,它会:

    • 读取客户端发送的 HTTP 请求数据。
    • 把这些数据解析成标准的 HttpServletRequest 对象,并准备好一个空的 HttpServletResponse
  • 然后请求进入你部署在 Tomcat 中的

    WebApp

    (SpringMVC、Servlet 应用等):

    • 依次经过:
      • Controller(接收请求,路由分发)
      • Service(处理业务逻辑)
      • DAO(访问数据库,获取或修改数据)
  • 数据处理完成后,返回值会封装到 response 中。

4. 返回响应

  • 一旦业务处理完毕,线程将响应内容写入 HttpServletResponse
  • 然后再通过最初建立的 socket 连接,把响应数据发送回客户端(用户)。
  • 至此,一个请求-响应生命周期完成,socket 连接可能被关闭或复用(根据是否使用 keep-alive)。

img

preHandle前置拦截:

postHandle后置拦截:

afterCompletion视图渲染之后返回给用户之前:

在utils下面编写一个LoginInterceptor类,实现preHandle和afterCompletion这两个方法(这里User和UserDto的问题,我推荐的是统一使用UserDto,采用BeanUtils里的copy方法即可):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class LoginInterceptor implements HandlerInterceptor {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
//获取session
HttpSession session = request.getSession();
//获取用户
User user = (User) session.getAttribute("user");
//判断用户是否存在
if(user==null){
response.setStatus(401);
return false;
}
UserDTO userDTO = new UserDTO();
BeanUtils.copyProperties(user,userDTO);
//存在,保存用户信息的ThreadLocal
UserHolder.saveUser(userDTO);
//放行
return true;
}

@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
//移除用户
UserHolder.removeUser();
}


在config下面创建一个MvcConfig类:

通过addInterceptors方法来添加拦截器,registry是拦截器的注册器。

用.excludePathPatterns来排除不需要拦截的路径。在这里code、login、bloghot、shop、shopType、upload和voucher等都不需要拦截。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Configuration
public class MvcConfig implements WebMvcConfigurer {

@Resource
private StringRedisTemplate stringRedisTemplate;

@Override
public void addInterceptors(InterceptorRegistry registry) {
// 登录拦截器
registry.addInterceptor(new LoginInterceptor())
.excludePathPatterns(
"/shop/**",
"/voucher/**",
"/shop-type/**",
"/upload/**",
"/blog/hot",
"/user/code",
"/user/login"
).order(1);
// token刷新的拦截器
registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate)).addPathPatterns("/**").order(0);
}
}

输入手机号码点击获取验证码,写入返回后端的验证码,勾选协议之后,登录会直接返回首页,此时看我的个人主页没问题:

img

1.5、隐藏用户敏感信息

我们通过浏览器观察到此时用户的全部信息都在,这样极为不靠谱,所以我们应当在返回用户信息之前,将用户的敏感信息进行隐藏,采用的核心思路就是书写一个UserDto对象,这个UserDto对象就没有敏感信息了,我们在返回前,将有用户敏感信息的User对象转化成没有敏感信息的UserDto对象,那么就能够避免这个尴尬的问题了

在1.4已将User转为UserDTO返回给前端。

1.6、session共享问题

核心思路分析:

每个tomcat中都有一份属于自己的session,假设用户第一次访问第一台tomcat,并且把自己的信息存放到第一台服务器的session中,但是第二次这个用户访问到了第二台tomcat,那么在第二台服务器上,肯定没有第一台服务器存放的session,所以此时 整个登录拦截功能就会出现问题,我们能如何解决这个问题呢?早期的方案是session拷贝,就是说虽然每个tomcat上都有不同的session,但是每当任意一台服务器的session修改时,都会同步给其他的Tomcat服务器的session,这样的话,就可以实现session的共享了

但是这种方案具有两个大问题

1、每台服务器中都有完整的一份session数据,服务器压力过大。

2、session拷贝数据时,可能会出现延迟

所以咱们后来采用的方案都是基于redis来完成,我们把session换成redis,redis数据本身就是共享的,就可以避免session共享的问题了

img

1.7 Redis代替session的业务流程

思考一下利用redis来存储数据,那么到底使用哪种结构呢

想要保存用户的登录信息有2种方法:1.用String类型。2.用Hash类型。

如下图,如果使用String,注意他的value,用多占用一点空间,如果使用哈希,则他的value中只会存储他数据本身,如果不是特别在意内存,其实使用String就可以啦。

String类型是以JSON字符串格式来保存,比较简单直观,但是占用内存比较多(因为有name和age这类的json格式):

img

Hash结构可以将对象中的每个字段独立存储,可以针对单个字段做CRUD,并且内存占用更少:

img

所以我们可以使用String结构,就是一个简单的key,value键值对的方式,但是关于key的处理,session他是每个用户都有自己的session,但是redis的key是共享的,咱们就不能使用code了

1.7.1在设计这个key的时候,需要满足两点

1、key要具有唯一性

2、key要方便携带

如果我们采用phone:手机号这个的数据来存储当然是可以的,但是如果把这样的敏感数据存储到redis中并且从页面中带过来毕竟不太合适,所以我们在后台生成一个随机串token,然后让前端带来这个token就能完成我们的整体逻辑了

1.7.2 整体访问流程

当注册完成后,用户去登录会去校验用户提交的手机号和验证码,是否一致,如果一致,则根据手机号查询用户信息,不存在则新建,最后将用户数据保存到redis,并且生成token作为redis的key,当我们校验用户是否登录时,会去携带着token进行访问,从redis中取出token对应的value,判断是否存在这个数据,如果没有则拦截,如果存在则将其保存到threadLocal中,并且放行。

img

1.8 基于Redis实现短信登录

在UserServiceImpl中写入如下代码(调用StringRedisTemplate中的set方法进行数据插入,最好在key的前面加入业务前缀以示区分,形成区分):

1
2
@Resource
private StringRedisTemplate stringRedisTemplate;

在sendCode这个方法里将保存验证码的代码替换为下面:

1
2
//保存验证码到redis
stringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY+phone,code,LOGIN_CODE_TTL, TimeUnit.MINUTES);

在login这个方法里进行如下2处修改:

首先是校验验证码:

1
2
//校验验证码
String cacheCode = stringRedisTemplate.opsForValue().get(LOGIN_CODE_KEY + phone);

然后是添加把用户信息添加到Redis的逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
 // 7.保存用户信息到 redis中
// 7.1.随机生成token,作为登录令牌
String token = UUID.randomUUID().toString(true);
// 7.2.将User对象转为HashMap存储
UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
Map<String, Object> userMap = BeanUtil.beanToMap(userDTO, new HashMap<>(),
CopyOptions.create()
.setIgnoreNullValue(true)
.setFieldValueEditor((fieldName, fieldValue) -> fieldValue.toString()));
// 7.3.存储
String tokenKey = LOGIN_USER_KEY + token;
stringRedisTemplate.opsForHash().putAll(tokenKey, userMap);
// 7.4.设置token有效期
stringRedisTemplate.expire(tokenKey, LOGIN_USER_TTL, TimeUnit.MINUTES);

// 8.返回token
return Result.ok(token);
}

在MvcConfig类上有@Configuration注解,说明是由Spring来负责依赖注入。

在MvcConfig类中要编写如下的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Configuration
public class MvcConfig implements WebMvcConfigurer {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public void addInterceptors(InterceptorRegistry registry){
registry.addInterceptor(new LoginInterceptor(stringRedisTemplate))
.excludePathPatterns(
"/user/code",
"/user/login",
"/upload/**",
"/blog/hot",
"/shop/**",
"/shop-type/**",
"/voucher/**"
);
}
}

在utils下的LoginInterceptor中写入如下代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class LoginInterceptor implements HandlerInterceptor {
@Resource
private StringRedisTemplate stringRedisTemplate;
public LoginInterceptor(StringRedisTemplate stringRedisTemplate){
this.stringRedisTemplate = stringRedisTemplate;
}
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
//TODO;1.获取请求头中的token
String token = request.getHeader("authorization");
if(StrUtil.isBlank(token)){
//不存在,拦截,返回401状态码
response.setStatus(401);
return false;
}
//TODO:2.基于TOKEN获取redis的用户
Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(LOGIN_USER_KEY + token);
//判断用户是否存在
if(userMap.isEmpty()){
//不存在,拦截,返回401状态码
response.setStatus(401);
return false;
}
//TODO:3.将查询到的Hash数据转化为UserDTO对象
UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
//TODO:4.存在,保存用户信息的ThreadLocal
UserHolder.saveUser(userDTO);
//TODO:5.刷新token有效期
stringRedisTemplate.expire(LOGIN_USER_KEY + token,RedisConstants.LOGIN_USER_TTL, TimeUnit.MINUTES);
//放行
return true;
}

@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
//移除用户
UserHolder.removeUser();
}
}

测试:首先把Redis和数据库都启动。 原始的项目的Redis的服务器ID需要更改为自己的。点击发送验证码,redis中有记录,没问题:

img

但点击登录的时候会报一个无法将Long转String的错误。因为用的是stringRedisTemplate要求所有的字段都是string类型的。

需要对UserServiceImpl中如下的位置进行修改:

img

1
2
3
4
Map<String, Object> userMap = BeanUtil.beanToMap(userDTO,new HashMap<>(),
CopyOptions.create()
.setIgnoreNullValue(true)
.setFieldValueEditor((fieldName,fieldValue)->fieldValue.toString()));

测试:

img

img

1.9 解决状态登录刷新问题

1.9.1 初始方案思路总结:

在这个方案中,他确实可以使用对应路径的拦截,同时刷新登录token令牌的存活时间,但是现在这个拦截器他只是拦截需要被拦截的路径,假设当前用户访问了一些不需要拦截的路径,那么这个拦截器就不会生效,所以此时令牌刷新的动作实际上就不会执行,所以这个方案他是存在问题的

img

1.9.2 优化方案

既然之前的拦截器无法对不需要拦截的路径生效,那么我们可以添加一个拦截器,在第一个拦截器中拦截所有的路径,把第二个拦截器做的事情放入到第一个拦截器中,同时刷新令牌,因为第一个拦截器有了threadLocal的数据,所以此时第二个拦截器只需要判断拦截器中的user对象是否存在即可,完成整体刷新功能。

img

1.9.3 代码

复制LoginInterceptor变成一份新的RefreshTokenInterceptor,把下面几处地方改为return true即可:

img

LoginInterceptor的代码变成如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class LoginInterceptor implements HandlerInterceptor {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
//1.判断是否需要拦截(ThreadLocal中是否有用户)
if(UserHolder.getUser()==null){
//没有,需要拦截,设置状态码
response.setStatus(401);
//拦截
return false;
}
//放行
return true;
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
//移除用户
UserHolder.removeUser();
}
}

现在还需要在MvcConfig里面对拦截器进行更新配置,需要(用order)调整拦截器的执行顺序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Configuration
public class MvcConfig implements WebMvcConfigurer {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public void addInterceptors(InterceptorRegistry registry){
registry.addInterceptor(new LoginInterceptor())
.excludePathPatterns(
"/user/code",
"/user/login",
"/upload/**",
"/blog/hot",
"/shop/**",
"/shop-type/**",
"/voucher/**"
).order(1);
registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate))
.addPathPatterns("/**").order(0);
}
}

2、商户查询缓存

2.1什么是缓存

缓存(Cache),就是数据交换的缓冲区,俗称的缓存就是缓冲区内的数据,一般从数据库中获取,存储于本地代码一般读写性能较高

例如:

1
2
3
4
5
1:Static final ConcurrentHashMap<K,V> map = new ConcurrentHashMap<>(); 本地用于高并发

2:static final Cache<K,V> USER_CACHE = CacheBuilder.newBuilder().build(); 用于redis等缓存

3:Static final Map<K,V> map = new HashMap(); 本地缓存

由于其被Static修饰,所以随着类的加载而被加载到内存之中,作为本地缓存,由于其又被final修饰,所以其引用(例3:map)和对象(例3:new HashMap())之间的关系是固定的,不能改变,因此不用担心赋值(=)导致缓存失效;

缓存作用:降低后端负载;提高读写的效率,降低响应时间。

缓存成本:数据一致性成本(数据库里的数据如果发生变化,容易与缓存中的数据形成不一致)。代码维护成本高(搭建集群)。运营成本高。

如何使用缓存

img

2.2 添加商户缓存

标准的操作方式就是查询数据库之前先查询缓存,如果缓存数据存在,则直接从缓存中返回,如果缓存数据不存在,再查询数据库,然后将数据存入redis。

img

在ShopController类的queryShopById方法中:

1
2
3
4
@GetMapping("/{id}")
public Result queryShopById(@PathVariable("id") Long id) {
return Result.ok(shopService.queryById(id));
}

在IShopService接口中编写如下代码:

1
2
3
public interface IShopService extends IService<Shop> {
Object queryById(Long id);
}

在ShopServiceImpl类的queryById方法中编写具体代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Service
public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public Object queryById(Long id) {
String key = CACHE_SHOP_KEY + id;
//1.从Redis查询缓存
String shopJson = stringRedisTemplate.opsForValue().get(key);
//2.判断是否存在
if(StrUtil.isNotBlank(shopJson)){
//3.存在,直接返回
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return Result.ok(shop);
}
//4.不存在,根据id查询数据库
Shop shop = getById(id);
//5.不存在,返回错误
if(shop==null){
return Result.fail("店铺不存在!");
}
//6.存在,写入Redis
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop));
return Result.ok(shop);
}
}

核心是通过调用hutool工具包中的JSONUtil类来实现对象转JSON(方法:toJsonStr(对象))和JSON转对象(方法:toBean(json,Bean的类型))

TODO:对分类进行缓存。

在ShopController类的queryShopByList方法中:

1
2
3
4
5
6
7
@GetMapping("list")
public Result queryTypeList() {
// List<ShopType> typeList = typeService
// .query().orderByAsc("sort").list();
List<ShopType> typeList = typeService.queryTypeList();
return Result.ok(typeList);
}

在IShopService接口中编写如下代码:

1
2
3
4
public interface IShopTypeService extends IService<ShopType> {

List<ShopType> queryTypeList();
}

ShopTypeServiceImpl:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Service
public class ShopTypeServiceImpl extends ServiceImpl<ShopTypeMapper, ShopType> implements IShopTypeService {

@Resource
private StringRedisTemplate stringRedisTemplate;


@Override
public List<ShopType> queryTypeList() {
String key = "login:type";
// SetOperations<String, String> setOps = redisTemplate.opsForSet();
// 1. 从redis查询商铺缓存
// String shopTypeJson = setOps.members(key).toString();
String shopTypeJson = stringRedisTemplate.opsForValue().get(key);
// 2. 判断是否存在
if (StrUtil.isNotBlank(shopTypeJson)) {
// 3. 存在,转化为List返回
// return objectMapper.readValue(shopTypeJson, new TypeReference<List<ShopType>>(){});
return JSONUtil.toList(shopTypeJson, ShopType.class);
}
// 4. 不存在,查询数据库
List<ShopType> typeList = this.query().orderByAsc("sort").list();
// 5. 不存在,返回空列表
if (typeList.isEmpty()) {
return new ArrayList<>();
}
// 6. 存在,写入redis
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(typeList));
// 7. 返回
return typeList;
}
}

2.3 缓存更新策略

缓存更新是redis为了节约内存而设计出来的一个东西,主要是因为内存数据宝贵,当我们向redis插入太多数据,此时就可能会导致缓存中的数据过多,所以redis会对部分数据进行更新,或者把他叫为淘汰更合适。

  • 内存淘汰:redis自动进行,当redis内存达到咱们设定的max-memery的时候,会自动触发淘汰机制,淘汰掉一些不重要的数据(可以自己设置策略方式)
  • **超时剔除:**当我们给redis设置了过期时间ttl之后,redis会将超时的数据进行删除,方便咱们继续使用缓存
  • 主动更新:我们可以手动调用方法把缓存删掉,通常用于解决缓存和数据库不一致问题

img

2.3.1 数据库缓存不一致解决方案:

缓存的数据源来自于数据库,而数据库的数据是会发生变化的,因此,如果当数据库中数据发生变化,而缓存却没有同步,此时就会有一致性问题存在

解决方案:

img

2.3.2 数据库和缓存不一致采用什么方案

选择方案一:但是方案一调用者如何处理呢?这里有几个问题操作缓存和数据库时有三个问题需要考虑:

1.选择删除缓存还是更新缓存?

  • 更新缓存:每次更新数据库都会更新缓存,无效的写操作比较多。
  • 删除缓存:更新数据库时让缓存失效,查询时再更新缓存。

2.如何保证缓存与数据库的操作的同时成功或失败?

  • 单体系统,将缓存与数据库操作放在一个事务
  • 分布式系统,利用TCC等分布式事务方案

3.先操作缓存还是先操作数据库?

  • 先删除缓存,再操作数据库
  • 先操作数据库,再删除缓存

答:我们应当是先操作数据库,再删除缓存,原因在于,如果你选择第一种方案,在两个线程并发来访问时,假设线程1先来,他先把缓存删了,此时线程2过来,他查询缓存数据并不存在,此时他写入缓存,当他写入缓存后,线程1再执行更新动作时,实际上写入的就是旧的数据,新的数据被旧数据覆盖了。

img

img

2.4 实现商铺和缓存与数据库双写一致

给查询商铺的缓存添加超时剔除和主动更新的策略。

修改ShopController中的业务逻辑,满足下面要求:

1.根据id查询商铺时,如果缓存未命中,则查询数据库,将数据库结果写入缓存,并设置超时时间。

2.根据id修改店铺时,先修改数据库,再删除缓存。

首先修改ShopServiceImpl的redis过期时间:

1
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop),CACHE_SHOP_TTL, TimeUnit.MINUTES);

修改ShopController中的updateShop方法:

1
2
3
4
5
@PutMapping
public Result updateShop(@RequestBody Shop shop) {
// 写入数据库
return Result.ok(shopService.update(shop));
}

向IShopService接口中添加update方法:

1
Object update(Shop shop);

向ShopServiceImpl类中添加update方法:

1
2
3
4
5
6
7
8
9
10
@Override
public Object update(Shop shop) {
Long id = shop.getId();
if(id == null){
return Result.fail("商铺id不存在");
}
updateById(shop);
stringRedisTemplate.delete(CACHE_SHOP_KEY + id);
return Result.ok();
}

测试:首先删除缓存中的数据,然后看SQL语句是否执行,是否加上了TTL过期时间。

img

访问http://localhost:8081/shop,然后修改101茶餐厅为102茶餐厅:

注意要发送的是PUT请求,请求的内容如下:

img

然后去数据库看是否名称更新为102茶餐厅,然后看缓存中的数据是否被删除,用户刷新页面看到102茶餐厅,缓存中会有最新的数据。

img

2.5 缓存穿透问题的解决思路

缓存穿透 :缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库。

常见的解决方案有两种:

  • 缓存空对象
  • 优点:实现简单,维护方便
  • 缺点:
    • 额外的内存消耗
    • 可能造成短期的不一致
  • 布隆过滤
  • 优点:内存占用较少,没有多余key
  • 缺点:
    • 实现复杂
    • 存在误判可能

缓存空对象:

当我们客户端访问不存在的数据时,先请求redis,但是此时redis中没有数据,此时会访问到数据库,但是数据库中也没有数据,这个数据穿透了缓存,直击数据库,我们都知道数据库能够承载的并发不如redis这么高,如果大量的请求同时过来访问这种不存在的数据,这些请求就都会访问到数据库,简单的解决方案就是哪怕这个数据在数据库中也不存在,我们也把这个数据存入到redis中去,这样,下次用户过来访问这个不存在的数据,那么在redis中也能找到这个数据就不会进入到缓存了

img

布隆过滤:

布隆过滤器其实采用的是哈希思想来解决这个问题,通过一个庞大的二进制数组,走哈希思想去判断当前这个要查询的这个数据是否存在,如果布隆过滤器判断存在,则放行,这个请求会去访问redis,哪怕此时redis中的数据过期了,但是数据库中一定存在这个数据,在数据库中查询出来这个数据后,再将其放入到redis中,

假设布隆过滤器判断这个数据不存在,则直接返回

这种方式优点在于节约内存空间缺点是存在误判,误判原因在于:布隆过滤器走的是哈希思想,只要哈希思想,就可能存在哈希冲突

img

2.6 编码解决商品查询的缓存穿透问题:

下图是原始的:

img

在原来的逻辑中,我们如果发现这个数据在mysql中不存在,直接就返回404了,这样是会存在缓存穿透问题的

更改后的方案:

img

如果这个数据不存在,我们不会返回404 ,还是会把这个数据写入到Redis中,并且将value设置为空,当再次发起查询时,我们如果发现命中之后,判断这个value是否是null,如果是null,则是之前写入的数据,证明是缓存穿透数据,如果不是,则直接返回数据。

在ShopServiceImpl类里对queryById方法进行修改:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Override
public Object queryById(Long id) {
String key = CACHE_SHOP_KEY + id;
//1.从Redis查询缓存
String shopJson = stringRedisTemplate.opsForValue().get(key);
//2.判断是否存在
if(StrUtil.isNotBlank(shopJson)){
//3.存在,直接返回
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return Result.ok(shop);
}
//上面是有值的情况,下面是无值的2种情况:A:空字符串。B:null。
if(shopJson != null){
return Result.fail("店铺信息不存在!");
}
//4.不存在,根据id查询数据库
Shop shop = getById(id);
//5.不存在,返回错误
if(shop==null){
stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL,TimeUnit.MINUTES);
return Result.fail("店铺不存在!");
}
//6.存在,写入Redis
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop),CACHE_SHOP_TTL, TimeUnit.MINUTES);
return Result.ok(shop);
}

测试:

localhost:8080/api/shop/1此时是命中数据。

img

localhost:8080/api/shop/0此时未命中数据。打开缓存可以看到缓存的是空,并且TTL是200秒

img

小总结:

1.缓存穿透产生的原因是什么?

答:用户请求的数据在缓存中和数据库中都不存在,不断发起这样的请求,给数据库带来巨大压力

2.缓存穿透的解决方案有哪些?

答:

  • 缓存null值
  • 布隆过滤
  • 增强id的复杂度,避免被猜测id规律
  • 做好数据的基础格式校验
  • 加强用户权限校验
  • 做好热点参数的限流

2.7 缓存雪崩问题及解决思路

1.什么是缓存雪崩?

答:缓存雪崩是指在同一时段大量的缓存key同时失效或者Redis服务宕机,导致大量请求到达数据库,带来巨大压力。

2.缓存雪崩的解决方案有哪些?

答:

  • 给业务添加多级缓存
  • 给不同的Key的TTL添加随机值(解决大量缓存key同时失效)
  • 利用Redis集群提高服务的可用性(解决Redis宕机)
  • 给缓存业务添加降级限流策略

img

2.8 缓存击穿问题及解决思路

1.什么是缓存击穿?

答:缓存击穿问题也叫热点Key问题,就是一个被高并发访问并且缓存重建业务较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。

2.缓存击穿的解决方案有哪些?

答:常见的解决方案有两种

  • 互斥锁
  • 逻辑过期

分析:假设线程1在查询缓存之后,本来应该去查询数据库,然后把这个数据重新加载到缓存的,此时只要线程1走完这个逻辑,其他线程就都能从缓存中加载这些数据了,但是假设在线程1没有走完的时候,后续的线程2,线程3,线程4同时过来访问当前这个方法, 那么这些线程都不能从缓存中查询到数据,那么他们就会同一时刻来访问查询缓存,都没查到,接着同一时间去访问数据库,同时的去执行数据库代码,对数据库访问压力过大

img

解决方案一 使用锁互斥锁来解决:

我们可以采用tryLock方法 + double check来解决这样的问题。

分析:假设现在线程1过来访问,他查询缓存没有命中,但是此时他获得到了锁的资源,那么线程1就会一个人去执行逻辑,假设现在线程2过来,线程2在执行过程中,并没有获得到锁,那么线程2就可以进行到休眠,直到线程1把锁释放后,线程2获得到锁,然后再来执行逻辑,此时就能够从缓存中拿到数据了。

img

解决方案二 逻辑过期方案

方案分析:我们之所以会出现这个缓存击穿问题,主要原因是在于我们对key设置了过期时间,假设我们不设置过期时间,其实就不会有缓存击穿的问题,但是不设置过期时间,这样数据不就一直占用我们内存了吗,我们可以采用逻辑过期方案。

我们把过期时间设置在 redis的value中,注意:这个过期时间并不会直接作用于redis,而是我们后续通过逻辑去处理。假设线程1去查询缓存,然后从value中判断出来当前的数据已经过期了,此时线程1去获得互斥锁,那么其他线程会进行阻塞,获得了锁的线程他会开启一个 线程去进行 以前的重构数据的逻辑,直到新开的线程完成这个逻辑后,才释放锁, 而线程1直接进行返回,假设现在线程3过来访问,由于线程线程2持有着锁,所以线程3无法获得锁,线程3也直接返回数据,只有等到新开的线程2把重建数据构建完后,其他线程才能走返回正确的数据。

这种方案巧妙在于,异步的构建缓存,缺点在于在构建完缓存之前,返回的都是脏数据。

img

进行对比

互斥锁方案:由于保证了互斥性,所以数据一致,且实现简单,因为仅仅只需要加一把锁而已,也没其他的事情需要操心,所以没有额外的内存消耗,缺点在于有锁就有死锁问题的发生,且只能串行执行性能肯定受到影响

逻辑过期方案: 线程读取过程中不需要等待,性能好,有一个额外的线程持有锁去进行重构数据但是在重构数据完成前,其他的线程只能返回之前的数据,且实现起来麻烦

img

2.9 利用互斥锁解决缓存击穿问题

思路:相较于原来从缓存中查询不到数据后直接查询数据库而言,现在的方案是进行查询之后,如果从缓存没有查询到数据,则进行互斥锁的获取,获取互斥锁后,判断是否获得到了锁,如果没有获得到,则休眠,过一会再进行尝试,直到获取到锁为止,才能进行查询

如果获取到了锁的线程,再去进行查询,查询后将数据写入redis,再释放锁,返回数据,利用互斥锁就能保证只有一个线程去执行操作数据库的逻辑,防止缓存击穿

img

操作锁的代码:

核心思路就是利用redis的setnx方法来表示获取锁,该方法含义是redis中如果没有这个key,则插入成功,返回1,在stringRedisTemplate中返回true, 如果有这个key则插入失败,则返回0,在stringRedisTemplate返回false,我们可以通过true,或者是false,来表示是否有线程成功插入key,成功插入的key的线程我们认为他就是获得到锁的线程

在ShopServiceImpl类中定义一个tryLock方法(在Redis中的setnx相当于setIfAbsent方法。)

1
2
3
4
public boolean tryLock(String key){
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
return BooleanUtil.isTrue(flag);
}

在ShopServiceImpl类中定义一个unLock方法用于解锁。

1
2
3
public void unLock(String key){
stringRedisTemplate.delete(key);
}

在ShopServiceImpl类中定义一个queryWithPassThrough方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public Shop queryWithPassThrough(Long id){
String key = CACHE_SHOP_KEY + id;
//1.从Redis查询缓存
String shopJson = stringRedisTemplate.opsForValue().get(key);
//2.判断是否存在
if(StrUtil.isNotBlank(shopJson)){
//3.存在,直接返回
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return shop;
}
//上面是有值的情况,下面是无值的2种情况:A:空字符串。B:null。
if(shopJson != null){
return null;
}
//4.不存在,根据id查询数据库
Shop shop = getById(id);
//5.不存在,返回错误
if(shop==null){
stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL,TimeUnit.MINUTES);
return null;
}
//6.存在,写入Redis
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop),CACHE_SHOP_TTL, TimeUnit.MINUTES);
return shop;
}

在ShopServiceImpl类中定义一个queryWithMutex方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public Shop queryWithMutex(Long id){
String key = CACHE_SHOP_KEY + id;
//1.从Redis查询缓存
String shopJson = stringRedisTemplate.opsForValue().get(key);
//2.判断是否存在
if(StrUtil.isNotBlank(shopJson)){
//3.存在,直接返回
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return shop;
}
//上面是有值的情况,下面是无值的2种情况:A:空字符串。B:null。
if(shopJson != null){
return null;
}
//4.实现缓存重建
//4.1 获取互斥锁
String lockKey = LOCK_SHOP_KEY+id;
Shop shop = null;
try {
boolean isLock = tryLock(lockKey);
//4.2 判断是否获取成功
if(!isLock){
//4.3 失败,则休眠并重试
Thread.sleep(50);
return queryWithMutex(id);
}

//4.4 获取互斥锁成功,根据id查询数据库
shop = getById(id);
//模拟重建的延时
Thread.sleep(200);
//5.数据库查询失败,返回错误
if(shop==null){
stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL,TimeUnit.MINUTES);
return null;
}
//6.存在,写入Redis
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop),CACHE_SHOP_TTL, TimeUnit.MINUTES);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}finally {
//7.释放互斥锁
unLock(lockKey);
}
//8.返回
return shop;
}

在ShopServiceImpl类中修改queryById,调用queryWithMutex:

1
2
3
4
5
6
7
public Object queryById(Long id) {
//缓存穿透
//Shop shop = queryWithPassThrough(id);
//互斥锁解决缓存击穿
Shop shop = queryWithMutex(id);
return Result.ok(shop);
}

2.10 利用逻辑过期解决缓存击穿问题

需求:修改根据id查询商铺的业务,基于逻辑过期方式来解决缓存击穿问题

思路分析:当用户开始查询redis时,判断是否命中,如果没有命中则直接返回空数据,不查询数据库,而一旦命中后,将value取出,判断value中的过期时间是否满足,如果没有过期,则直接返回redis中的数据,如果过期,则在开启独立线程后直接返回之前的数据,独立线程去重构数据,重构完成后释放互斥锁。

img

如果封装数据,添加逻辑过期字段:因为现在redis中存储的数据的value需要带上过期时间,此时要么你去修改原来的实体类

答:可以在utils包下定义RedisData类(可以让Shop继承RedisData类),也可以在RedisData中设置一个Shop类的data属性:

1
2
3
4
5
@Data
public class RedisData {
private LocalDateTime expireTime;
private Object data;
}

在ShopServiceImpl类中定义saveShop2Redis方法:

1
2
3
4
5
6
7
8
9
10
public void saveShop2Redis(Long id,Long expireSeconds){
//1.查询店铺数据
Shop shop = getById(id);
//2.封装逻辑过期时间
RedisData redisData = new RedisData();
redisData.setData(shop);
redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds));
//3.写入Redis
stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY+id,JSONUtil.toJsonStr(redisData));
}

单元测试,在test包下的HmDianPingApplicationTests中创建testSaveShop类写入测试代码(这里要注意的是输入alt+insert之后选择Test Method要选择Junit 5来进行测试方法的编写):

1
2
3
4
5
6
7
8
9
10
@SpringBootTest
class HmDianPingApplicationTests {
@Resource
private ShopServiceImpl shopService;

@Test
void testSaveShop() {
shopService.saveShop2Redis(1L,10L);
}
}

可以看到redis中确实存入了数据:

img

在ShopServiceImpl中复制一份缓存穿透的代码,更改名称为queryWithLogicalExpire:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
public Shop queryWithLogicalExpire(Long id){
String key = CACHE_SHOP_KEY + id;
//1.从Redis查询缓存
String shopJson = stringRedisTemplate.opsForValue().get(key);
//2.判断是否存在
if(StrUtil.isBlank(shopJson)){
//3.不存在,返回空
return null;
}
//4.命中,需要先把json反序列化为对象
RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);
JSONObject data = (JSONObject) redisData.getData();
Shop shop = JSONUtil.toBean(data, Shop.class);
//5.判断是否过期
//5.1 未过期直接返回店铺信息
LocalDateTime expireTime = redisData.getExpireTime();
if(expireTime.isAfter(LocalDateTime.now())){
return shop;
}
//5.2 已过期重建缓存
//6.缓存重建
//6.1.获取互斥锁
String lockKey = LOCK_SHOP_KEY + id;
boolean isLock = tryLock(lockKey);
//6.2.判断是否获取互斥锁成功
if(isLock){
//6.3.成功,开启独立线程,实现缓存重建
CACHE_REBUILD_EXECUTOR.submit(()->{
try {
saveShop2Redis(id,20L); //实际中应该设置为30分钟
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
unLock(lockKey);
}
});

}
//6.4.失败,返回过期的商铺信息
return shop;
}

测试:

先到数据库把102茶餐厅改为103茶餐厅(因为Redis之前插入了一条缓存为102茶餐厅,并且已经过期,此时数据库与缓存不一致),新的HTTP请求会将逻辑过期的数据删除,然后更新缓存。

线程数设置为100,Ramp-up时间设置为1

img

在查看结果树里面到中间某个HTTP请求会完成重建,响应数据会改变。

img

1.安全性问题:在高并发情况下是否会有很多线程来做重建。

2.一致性问题:在重建完成之前得到的是否是旧的数据。

2.11 封装Redis 工具类

背景:在高并发系统中,我们通常使用 Redis 缓存数据库查询结果,减轻数据库压力。但如果直接使用 Redis,可能会遇到以下问题:

  1. 缓存穿透:请求的数据既不在缓存里,也不在数据库里,会频繁打数据库。
  2. 缓存击穿:热点key突然失效,导致大量请求同时打到数据库。
  3. 缓存雪崩:大量key同时过期,引起瞬时流量打满数据库。

因此我们基于StringRedisTemplate封装一个缓存工具类,满足下列需求:

  • 方法1:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置TTL过期时间
  • 方法2:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置逻辑过期时间,用于处理缓存击穿问题
  • 方法3:根据指定的key查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题
  • 方法4:根据指定的key查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题

具体实现

在utils包下创建CacheClient类,先写入如下基础的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Slf4j
@Component
public class CacheClient {
private final StringRedisTemplate stringRedisTemplate;

public CacheClient(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}

public void set(String key, Object value, Long time, TimeUnit unit){
stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(value),time,unit);
}
public void setWithLogicalExpire(String key, Object value,Long expire,TimeUnit unit){
//设置逻辑过期
RedisData redisData = new RedisData();
redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(expire)));
redisData.setData(value);
stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(redisData));
}

img

img

在CacheClient类中编写缓存穿透的共性方法queryWithPassThrough:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public <R,ID> R queryWithPassThrough(String keyPrefix, ID id, Class<R> type,
Function<ID,R> dbFallBack,Long time,TimeUnit unit){
String key = keyPrefix + id;
//1.从Redis查询缓存
String shopJson = stringRedisTemplate.opsForValue().get(key);
//2.判断是否存在
if(StrUtil.isNotBlank(shopJson)){
//3.存在,直接返回
return JSONUtil.toBean(shopJson, type);
}
//上面是有值的情况,下面是无值的2种情况:A:空字符串。B:null。
if(shopJson != null){
return null;
}
//4.不存在,根据id查询数据库
R r = dbFallBack.apply(id);
//5.不存在,返回错误
if(r==null){
stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL,TimeUnit.MINUTES);
return null;
}
//6.存在,写入Redis
this.set(key,r,time,unit);
return r;
}

img

编写完queryWithPassThrough之后可以到ShopServiceImpl中直接调用新的方法(记得引入CacheClient类):

1
2
3
4
5
6
7
8
9
10
11
@Resource
private CacheClient cacheClient;
@Override
public Object queryById(Long id) {
//调用工具类解决缓存击穿
Shop shop = cacheClient.queryWithPassThrough(CACHE_SHOP_KEY, id, Shop.class, this::getById, CACHE_SHOP_TTL, TimeUnit.MINUTES);
if(shop==null){
return Result.fail("店铺不存在!");
}
return Result.ok(shop);
}

测试:成功会对不存在的店铺空值进行缓存。

img

img

接下来拷贝queryWithLogicalExpire的代码到CacheClient类中进行改写:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
public <R,ID> R queryWithLogicalExpire(String keyPrefix,ID id,Class<R> type,Function<ID,R> dbFallBack,Long time,TimeUnit unit){
String key = keyPrefix + id;
//1.从Redis查询缓存
String shopJson = stringRedisTemplate.opsForValue().get(key);
//2.判断是否存在
if(StrUtil.isBlank(shopJson)){
//3.不存在,返回空
return null;
}
//4.命中,需要先把json反序列化为对象
RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);
JSONObject data = (JSONObject) redisData.getData();
R r = JSONUtil.toBean(data, type);
//5.判断是否过期
//5.1 未过期直接返回店铺信息
LocalDateTime expireTime = redisData.getExpireTime();
if(expireTime.isAfter(LocalDateTime.now())){
return r;
}
//5.2 已过期重建缓存
//6.缓存重建
//6.1.获取互斥锁
String lockKey = LOCK_SHOP_KEY + id;
boolean isLock = tryLock(lockKey);
//6.2.判断是否获取互斥锁成功
if(isLock){
//6.3.成功,开启独立线程,实现缓存重建
CACHE_REBUILD_EXECUTOR.submit(()->{
try {
//查询数据库
R r1 = dbFallBack.apply(id);
//写入redis
this.setWithLogicalExpire(key,r1,time,unit);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
unLock(lockKey);
}
});

}
//6.4.失败,返回过期的商铺信息
return r;
}
public boolean tryLock(String key){
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
return BooleanUtil.isTrue(flag);
}
public void unLock(String key){
stringRedisTemplate.delete(key);
}

img

img

改写test下的HmDianPingApplicationTests类:

1
2
3
4
5
6
7
8
9
10
11
12
13
@SpringBootTest
class HmDianPingApplicationTests {
@Resource
private CacheClient cacheClient;
@Resource
private ShopServiceImpl shopService;

@Test
void testSaveShop() throws InterruptedException {
Shop shop = shopService.getById(1L);
cacheClient.setWithLogicalExpire(CACHE_SHOP_KEY+1L,shop,10L,TimeUnit.SECONDS);
}
}

测试:首先运行HmDianPingApplicationTests类里的测试方法,10秒后逻辑过期,此时运行后台程序,修改数据库1号商铺的name字段,此时访问:localhost:8080/api/shop/1 会出现效果第1次访问为缓存旧值,然后发现缓存过期开始重建,第2次访问开始就是新值。数据库也只有1次重建。

img

1

img

3、优惠卷秒杀

3.1 -全局唯一ID

每个店铺都可以发布优惠券,当用户抢购时,就会生成订单并保存到tb_voucher_order这张表中,而订单表如果使用数据库自增ID会存在一些问题。

1.id的规律性太明显。

2.受单表数据量的限制(分表之后每张表都自增长,id会出现重复)。

全局ID生成器:是一种在分布式系统下用来生成全局唯一ID的工具,一般要满足下列特性:

1.唯一性。2.高可用。3.高性能。4.递增性。5.安全性。

img

为了增加ID的安全性,我们可以不直接使用Redis自增的数值,而是拼接一些其它信息:

ID的组成部分:符号位:1bit,永远为0代表整数

时间戳:31bit,以秒为单位,定义了一个起始时间,用当前时间减起始时间,预估可以使用69年。

序列号:32bit,秒内的计数器,支持每秒产生2^32个不同ID

img

3.2 -Redis实现全局唯一Id

在utils包下定义一个RedisWorker类,是一个基于Redis的ID生成器。

如果只使用一个key来自增记录有一个坏处,最终key的自增数量会突破容量的上限,假如自增超过32位彼时便无法再存储新的数据,解决的方案是采用拼接日期。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Component
public class RedisIdWorker {
private static final long BEGIN_TIMESTAMP = 1640995200L;
//序列号的位数
private static final int COUNT_BITS=32;
private StringRedisTemplate stringRedisTemplate;

public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}
public long nextId(String keyPrefix){
//1.生成时间戳
LocalDateTime now = LocalDateTime.now();
long timeStamp = now.toEpochSecond(ZoneOffset.UTC) - BEGIN_TIMESTAMP;
//2.生成序列号
//2.1获取当前日期,精确到天
String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
//2.2自增长
long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
//3.拼接并返回
return timeStamp << COUNT_BITS | count;
}
}

在HmDianPingApplicationTests中写入如下的测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Resource
private ShopServiceImpl shopService;
@Resource
private RedisIdWorker redisIdWorker;
private ExecutorService es = Executors.newFixedThreadPool(500);
@Test
void testIdWorker() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(300);
Runnable task = ()->{
for(int i=0;i<100;i++){
long id = redisIdWorker.nextId("order");
System.out.println("id="+id);
}
latch.countDown();
};
long begin = System.currentTimeMillis();
for(int i=0;i<300;i++){
es.submit(task);
}
latch.await();
long end = System.currentTimeMillis();
System.out.println("Result Time = " + (end-begin));
}

运行之后可以看到以十进制输出的所有编号

可以在Redis中看到自增长的结果,1次是30000:

img

大概2秒可以生成3万条,速度还是可以的。

全局唯一ID生成策略:

1.UUID利用JDK自带的工具类即可生成,生成的是16进制的字符串,无单调递增的特性。

2.Redis自增(每天一个key,方便统计订单量。时间戳+计数器的格式。)

3.snowflake雪花算法(不依赖于Redis,性能更好,对于时钟依赖)

4.数据库自增

🧩 与其他 ID 生成方案对比

方法 特点 优缺点
UUID 通过 JDK 提供 UUID.randomUUID() 保证唯一,但无序、长度大,不适合排序和数据库主键
Redis 自增(本例) 时间戳 + 自增 并发高,易于控制和拆分,但依赖 Redis
雪花算法(Snowflake) Twitter 开源的分布式 ID 方案 性能高,不依赖 Redis,但需要精确的机器时间
数据库自增主键 利用数据库主键自增 简单但性能差,分布式下易冲突或失效

3.3 添加优惠卷

每个店铺都可以发放优惠券,分为平价券和特价券。平价券可以任意抢购,特价券需要秒杀抢购。

tb_voucher:优惠券基本信息,优惠金额,使用规则等。

tb_seckill_voucher:优惠券的库存,开始抢购时间,结束抢购时间,只有特价优惠券才需要填写这些信息。

而代金券由于优惠力度大,所以像第二种卷,就得限制数量,从表结构上也能看出,特价卷除了具有优惠卷的基本信息以外,还具有库存,抢购时间,结束时间等等字段

请求的信息如下:

1
2
3
4
5
6
7
8
9
10
11
12
{
"shopId":1,
"title":"100元代金券",
"subTitle":"周一至周五均可使用",
"rules":"全场通用\n无需预约\n可无限叠加\不兑现、不找零\n仅限堂食",
"payValue":8000,
"actualValue":10000,
"type":1,
"stock":100,
"beginTime":"2024-04-10T10:09:17",
"endTime":"2024-04-11T12:09:04"
}

新增普通卷代码:

VoucherController

1
2
3
4
5
@PostMapping
public Result addVoucher(@RequestBody Voucher voucher) {
voucherService.save(voucher);
return Result.ok(voucher.getId());
}

新增秒杀卷代码:

VoucherController

1
2
3
4
5
@PostMapping("seckill")
public Result addSeckillVoucher(@RequestBody Voucher voucher) {
voucherService.addSeckillVoucher(voucher);
return Result.ok(voucher.getId());
}

VoucherServiceImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
// 保存优惠券
save(voucher);
// 保存秒杀信息
SeckillVoucher seckillVoucher = new SeckillVoucher();
seckillVoucher.setVoucherId(voucher.getId());
seckillVoucher.setStock(voucher.getStock());
seckillVoucher.setBeginTime(voucher.getBeginTime());
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);
// 保存秒杀库存到Redis中
stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
}

3.4 实现秒杀下单

下单核心思路:当我们点击抢购时,会触发右侧的请求,我们只需要编写对应的controller即可

img

秒杀下单应该思考的内容:

下单时需要判断两点:

  • 秒杀是否开始或结束,如果尚未开始或已经结束则无法下单
  • 库存是否充足,不足则无法下单

下单核心逻辑分析:

当用户开始进行下单,我们应当去查询优惠卷信息,查询到优惠卷信息,判断是否满足秒杀条件

比如时间是否充足,如果时间充足,则进一步判断库存是否足够,如果两者都满足,则扣减库存,创建订单,然后返回订单id,如果有一个条件不满足则直接结束。

img

在VouchrOrderController类中:

1
2
3
4
5
6
7
8
9
10
@RestController
@RequestMapping("/voucher-order")
public class VoucherOrderController {
@Resource
private IVoucherService voucherService;
@PostMapping("seckill/{id}")
public Result seckillVoucher(@PathVariable("id") Long voucherId) {
return voucherService.seckillVoucher(voucherId);
}
}

在IVoucherOrderService中写入如下代码:

1
2
3
 public interface IVoucherOrderService extends IService {
Result seckillVoucher(Long voucherId);
}

在VoucherOrderServiceImpl中写入如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@Service
@Transactional
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
@Override
public Result seckillVoucher(Long voucherId) {
//1.查询优惠券信息
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
//2.判断秒杀是否开始
//2.1秒杀尚未开始返回异常
if(voucher.getBeginTime().isAfter(LocalDateTime.now())){
return Result.fail("秒杀尚未开始");
}
//2.2秒杀已结束返回异常
if(voucher.getEndTime().isBefore(LocalDateTime.now())){
return Result.fail("秒杀已经结束");
}
//3.判断库存是否充足
if(voucher.getStock()<1){
//3.1库存不足返回异常
return Result.fail("库存不足!");
}
//3.2库存充足扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId).update();
if(!success){
return Result.fail("库存不足!");
}
//4.创建订单,返回订单id
VoucherOrder voucherOrder = new VoucherOrder();
long orderId = redisIdWorker.nextId("order");//订单id
voucherOrder.setId(orderId);
Long userId = UserHolder.getUser().getId();//用户id
voucherOrder.setUserId(userId);
voucherOrder.setVoucherId(voucherId);//代金券id
save(voucherOrder);
return Result.ok(orderId);
}
}

3.5 库存超卖问题分析

假设线程1过来查询库存,判断出来库存大于1,正准备去扣减库存,但是还没有来得及去扣减,此时线程2过来,线程2也去查询库存,发现这个数量一定也大于1,那么这两个线程都会去扣减库存,最终多个线程相当于一起去扣减库存,此时就会出现库存的超卖问题。

正常逻辑:

img

非正常逻辑:

img

超卖问题是典型的多线程安全问题,针对这一问题的常见解决方案就是加锁:而对于加锁,我们通常有两种解决方案:见下图:

img

悲观锁:

悲观锁可以实现对于数据的串行化执行,比如syn,和lock都是悲观锁的代表,同时,悲观锁中又可以再细分为公平锁,非公平锁,可重入锁,等等

乐观锁:

乐观锁:会有一个版本号,每次操作数据会对版本号+1,再提交回数据时,会去校验是否比之前的版本大1 ,如果大1 ,则进行操作成功,这套机制的核心逻辑在于,如果在操作过程中,版本号只比原来大1 ,那么就意味着操作过程中没有人对他进行过修改,他的操作就是安全的,如果不大1,则数据被修改过,当然乐观锁还有一些变种的处理方式比如cas

乐观锁的典型代表:就是cas,利用cas进行无锁化机制加锁,var5 是操作前读取的内存值,while中的var1+var2 是预估值,如果预估值 == 内存值,则代表中间没有被人修改过,此时就将新值去替换 内存值

其中do while 是为了在操作失败时,再次进行自旋操作,即把之前的逻辑再操作一次。

1
2
3
4
5
6
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

return var5;

课程中的使用方式:

课程中的使用方式是没有像cas一样带自旋的操作,也没有对version的版本号+1 ,他的操作逻辑是在操作时,对版本号进行+1 操作,然后要求version 如果是1 的情况下,才能操作,那么第一个线程在操作后,数据库中的version变成了2,但是他自己满足version=1 ,所以没有问题,此时线程2执行,线程2 最后也需要加上条件version =1 ,但是现在由于线程1已经操作过了,所以线程2,操作时就不满足version=1 的条件了,所以线程2无法执行成功

版本号法:

img

CAS法(版本号法的简化版):查询的时候把库存查出来,更新的时候判断库存和之前查到的库存是否一致,如果一致则更新数据。

3.6 乐观锁解决超卖问题

只需加上下面这段代码即可:.eq(“stock”,voucher.getStock()) 。用于比较当前数据库的库存值和之前查询到的库存值是否相同,只有相同时才可以执行set语句。

1
2
3
4
5
//3.2库存充足扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") //相当于set条件 set stock = stock - 1
.eq("voucher_id", voucherId) //相当于where条件 where id = ? and stock = ?
.eq("stock",voucher.getStock()).update();

但经过测试现在出现了异常值偏高的问题,正常的请求大约只占10%。

原理是因为:假如一次有30个线程涌入,查询到库存值为100,只有1个线程能把值改为99,其它29个线程比对库存值99发现和自己查询到的库存值100不同,所以都认为数据已经被修改过,所以都失败了。

现在只需要保证stock>0即可,只要存量大于0就可以任意扣减。

1
2
3
4
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") //相当于set条件 set stock = stock - 1
.eq("voucher_id", voucherId) //相当于where条件 where id = ? and stock = ?
.gt("stock",0).update();

img

img

乐观锁缺陷:

需要大量对数据库进行访问,容易导致数据库的崩溃。

总结:

img

3.7优惠券秒杀-一人一单

需求:修改秒杀业务,要求同一个优惠券,一个用户只能下一单

问题分析:优惠卷是为了引流,但是目前的情况是,一个人可以无限制的抢这个优惠卷,所以我们应当增加一层逻辑,让一个用户只能下一个单,而不是让一个用户下多个单

具体操作逻辑如下:比如时间是否充足,如果时间充足,则进一步判断库存是否足够,然后再根据优惠卷id和用户id查询是否已经下过这个订单,如果下过这个订单,则不再下单,否则进行下单

img

❌ 错误方式:锁整个方法(比如 synchronized public void createVoucherOrder()

问题:所有用户都会竞争同一个锁,系统性能会很差(串行执行,效率极低)

✅ 正确方式:只对同一个用户加锁

1
synchronized(userId.toString().intern()) { ... }

.intern() 会让字符串引用变成常量池中的共享对象,相同内容就是同一个锁。

为什么用 .intern()

因为 toString() 每次是新对象,锁不生效。

但是因为事务是在函数执行结束之后由Spring进行提交,如果把锁加在createVoucherOrder内部其实有点小——因为如果解锁之后,其它线程可以进入,而此时事务尚未提交,仍然会导致安全性问题。

🔁 事务控制注意事项

问题点:Spring 的 @Transactional 只对代理对象生效。

错误方式:直接在当前类中内部调用 createVoucherOrder(),事务不生效!

解决方式

1
2
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);

这就是用当前线程的代理对象来调用方法,事务才能起作用。

因此最终方案是把synchronized加在createVoucherOrder的方法外部,锁住的是用户id。

seckillVoucher() 方法(外层逻辑 + 加锁 + 事务代理调用)

1
2
3
4
synchronized (userId.toString().intern()) {
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}

createVoucherOrder() 方法(真正创建订单的逻辑,带事务)

  • 先查询是否已经有订单(保证一人一单)。
  • 如果没有,再扣库存。
  • 然后创建订单并保存。

总结:关于代理对象事务的问题:通常情况下,当一个使用了@Transactional注解的方法被调用时,Spring会从上下文中获取一个代理对象来管理事务。

但是如果加@Transactional方法是被同一个类中的另一个方法调用时,Spring不会使用代理对象,而是直接调用该方法,导致事务注解失效。

为避免这种情况,可以使用AopContext.currentProxy方法获取当前的代理对象,然后通过代理对象调用被@Transactional注解修饰的方法,确保事务生效。

在VoucherOrderServiceImpl中写入如下代码(注意:ctrl+alt+m可以把含有return的代码段进行提取):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
@Override
public Result seckillVoucher(Long voucherId) {
//1.查询优惠券信息
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
//2.判断秒杀是否开始
//2.1秒杀尚未开始返回异常
if(voucher.getBeginTime().isAfter(LocalDateTime.now())){
return Result.fail("秒杀尚未开始");
}
//2.2秒杀已结束返回异常
if(voucher.getEndTime().isBefore(LocalDateTime.now())){
return Result.fail("秒杀已经结束");
}
voucher = seckillVoucherService.getById(voucherId);
//3.判断库存是否充足
if(voucher.getStock()<1){
//3.1库存不足返回异常
return Result.fail("库存不足!");
}
Long userId = UserHolder.getUser().getId();
synchronized (userId.toString().intern()){
//获取代理对象
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}
}
@Transactional
public Result createVoucherOrder(Long voucherId) {
//6.一人一单
Long userId = UserHolder.getUser().getId();
//6.1查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
//6.2判断是否存在
if(count>0){
//用户已经购买过了
return Result.fail("用户已经购买过一次!");
}
//3.2库存充足扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") //相当于set条件 set stock = stock - 1
.eq("voucher_id", voucherId) //相当于where条件 where id = ? and stock = ?
.gt("stock",0).update();
if(!success){
return Result.fail("库存不足!");
}
//4.创建订单,返回订单id
VoucherOrder voucherOrder = new VoucherOrder();
long orderId = redisIdWorker.nextId("order");//订单id
voucherOrder.setId(orderId);
voucherOrder.setUserId(userId);
voucherOrder.setVoucherId(voucherId);//代金券id
save(voucherOrder);
return Result.ok(orderId);
}
}

在IVoucherOrderService接口中加入下面这个方法:

1
Result createVoucherOrder(Long voucherId);

在pom.xml中引入如下的依赖:

1
2
3
4
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</dependency>

在启动类HmDianPingApplication上加如下注解:

1
@EnableAspectJAutoProxy(exposeProxy = true)

测试: 成功实现一名用户只能领取一张优惠券。

img

img

img

img

3.8 集群环境下的并发问题

通过加锁可以解决在单机情况下的一人一单安全问题,但是在集群模式下就不行了。

如下图可以设置项目启动的端口号,确保启动的项目之间端口号不同:

img

img

在nginx.conf中放开8082的这个配置:

img

向下面这个页面发送请求:

1
http://localhost:8080/api/voucher/list/1

可以看到请求会分别被8082和8081接收,是轮询的效果:

img

img

首先到tb_voucher_order把之前的订单删除,到tb_seckill_voucher中把stock重新改回100。

准备2个相同的秒杀请求:要注意请求的地址是:http://localhost:8080/api/voucher-order/seckill/13

我这里直接用Jemeter来进行测试,模拟高并发场景:

下面是效果:可以看到并发请求能够同时进入集群的每台结点。

img

img

正常情况:

img

单机下的锁控制(第一张图)

这一张图展示了多个线程在同一个 JVM 中操作的场景,用来说明单机下通过 synchronized 锁对象控制一人一单的机制 是有效的

✅ 关键流程:

  1. 线程1 和 线程2 分别尝试为同一用户下订单。
  2. 线程1 拿到了锁,执行完“查询是否已有订单”,判断没有,于是创建新订单,释放锁。
  3. 线程2 此时等待线程1释放锁,再做同样的流程。
  4. 但线程2执行到查询订单时,已经能查到订单,所以不会重复下单

✅ 结论:

单 JVM 下,同一个用户加锁是生效的,能有效防止并发时的重复下单。

由于现在我们部署了多个tomcat,每个tomcat都有一个属于自己的jvm,那么假设在服务器A的tomcat内部,有两个线程,这两个线程由于使用的是同一份代码,那么他们的锁对象是同一个,是可以实现互斥的,但是如果现在是服务器B的tomcat内部,又有两个线程,但是他们的锁对象写的虽然和服务器A一样,但是锁对象却不是同一个,所以线程3和线程4可以实现互斥,但是却无法和线程1和线程2实现互斥,这就是 集群环境下,syn锁失效的原因,在这种情况下,我们就需要使用分布式锁来解决这个问题。

img

❌ 集群下的锁失效问题(第二张图)

左侧:JVM1(Tomcat A)

  • 线程1 加锁成功,正在下单;
  • 线程2 来了,在等待释放锁。

右侧:JVM2(Tomcat B)

  • 线程3 是同一个用户的请求,但由于 JVM2 的锁对象和 JVM1 的锁对象不是同一个实例,所以也能加锁成功;
  • 同样线程4 跟进等待线程3;

🧨 问题产生:

虽然线程1和线程3锁的代码看起来是一样的,但因为JVM不一样,锁对象不是同一个,所以线程3并不知道线程1已经在处理下单逻辑了。

结果:

  • JVM1 和 JVM2 各自都创建了一份订单,违反了一人一单的规则

现在就要实现让多个JVM使用的是同一把锁。跨JVM、跨进程的锁。

4、分布式锁

4.1 、基本原理和实现方式对比

什么是分布式锁?

满足分布式系统或集群模式下多进程可见并且互斥的锁.(所有服务器都能访问的共享锁系统)

分布式锁的核心思想就是让大家都使用同一把锁,只要大家使用的是同一把锁,那么我们就能锁住线程,不让线程进行,让程序串行执行,这就是分布式锁的核心思路

img

那么分布式锁他应该满足一些什么样的条件呢?

  • 可见性:多个线程都能看到相同的结果,注意:这个地方说的可见性并不是并发编程中指的内存可见性,只是说多个进程之间都能感知到变化的意思
  • 互斥:互斥是分布式锁的最基本的条件,使得程序串行执行
  • 高可用:程序不易崩溃,时时刻刻都保证较高的可用性
  • 高性能:由于加锁本身就让性能降低,所有对于分布式锁本身需要他就较高的加锁性能和释放锁性能
  • 安全性:安全也是程序中必不可少的一环

img

常见的分布式锁有三种

  • Mysql:mysql本身就带有锁机制,但是由于mysql性能本身一般,所以采用分布式锁的情况下,其实使用mysql作为分布式锁比较少见
  • Redis:redis作为分布式锁是非常常见的一种使用方式,现在企业级开发中基本都使用redis或者zookeeper作为分布式锁,利用setnx这个方法,如果插入key成功,则表示获得到了锁,如果有人插入成功,其他人插入失败则表示无法获得到锁,利用这套逻辑来实现分布式锁
  • Zookeeper:zookeeper也是企业级开发中较好的一个实现分布式锁的方案,由于本套视频并不讲解zookeeper的原理和分布式锁的实现,所以不过多阐述

img

4.2 、Redis分布式锁的实现核心思路

你可以把它理解成在 Redis 里“占一个位置”,谁先占上就有执行权,其它人就得等。

实现分布式锁时需要实现的两个基本方法:

4.2.1实现 Redis 锁最核心的就两个指令:

1. SETNX(Set if Not Exists)

1
SETNX lock thread1

意思是:“如果这个 lock 键不存在,就设置为 thread1。”

  • 成功返回 1:说明你拿到锁了;
  • 失败返回 0:说明别人已经抢先拿到锁。

2. DEL:释放锁

1
DEL lock

释放锁,表示任务做完了,别人可以来了。

img

4.2.2设置过期时间,避免死锁

为什么需要加 EXPIRE

假设:

  • 线程1 拿到了锁;
  • 但它崩溃了,没来得及释放锁(DEL 没执行);
  • 那这把锁就永远卡在那里了,别人都拿不到!

解决办法:设置锁的自动过期时间,例如 5 秒钟:

1
EXPIRE lock 5

然后可以用 TTL lock 查看锁剩下多少秒:

  • 正常返回剩余时间;
  • 如果返回 -1,说明没有过期时间,会死锁;
  • 如果返回 -2,说明锁已经不存在。

img

4.2.3避免 SETNXEXPIRE 之间的间隙

问题来了:
如果你分两步执行——先 SETNX 拿锁,再 EXPIRE 设置过期时间,在这中间一旦宕机,就会出问题。

正确做法:用 Redis 的一条原子指令

1
SET lock thread EX 10 NX

解释:

  • EX 10:设置过期时间 10 秒;
  • NX:只有在 key 不存在时才设置;
  • 这是 Redis 的原子操作,设置值+过期+互斥一次完成

img

加锁效果演示

1
SET lock thread EX 10 NXOK

说明加锁成功。

1
SET lock thread EX 10 NXnull

说明锁已存在,加锁失败。

1
2
TTL lock
7

说明锁还有 7 秒就会自动释放。

4.2.4加锁与释放的正确流程总结

  • 获取锁:
    • 互斥性:确保只能有一个线程获取锁成功
    • 非阻塞:尝试一次,成功返回true,失败返回false
    • 自动过期:避免死锁
  • 释放锁:
    • 手动释放(业务完成后 DEL
    • 超时释放:获取锁时添加一个超时时间

4.3 实现分布式锁版本一

在utils下面创建一个ILock接口:

这一步是做接口编程,便于后期扩展(比如换 Redisson 或 ZooKeeper 锁时,不改业务逻辑)

1
2
3
4
5
6
public interface ILock {
//尝试获取锁
boolean tryLock(long timeoutSec);
//释放锁
void unlock();
}

在utils下面实现SimpleRedisLock类:

细节解释:

  • threadId:当前线程的唯一标识,设置为锁的值,便于后期扩展安全性;
  • setIfAbsent:就是 Redis 的 SET key value NX EX 操作;
  • timeoutSec:自动过期时间,防止死锁;
  • delete:释放锁,直接删掉对应 Redis key。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class SimpleRedisLock implements ILock {
private String name;
private StringRedisTemplate stringRedisTemplate;
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
private static final String KEY_PREFIX = "lock:";
@Override
public boolean tryLock(long timeoutSec) {
//获取线程标示
long threadId = Thread.currentThread().getId();
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX+name,threadId+"",timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
@Override
public void unlock() {
//释放锁
stringRedisTemplate.delete(KEY_PREFIX+name);
}
}

更改VoucherOrderServiceImpl类中的seckillVoucher方法的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Resource
private RedisIdWorker redisIdWorker;
@Resource
private StringRedisTemplate stringRedisTemplate;

@Override
public Result seckillVoucher(Long voucherId) {
//1.查询优惠券信息
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
//2.判断秒杀是否开始
//2.1秒杀尚未开始返回异常
if(voucher.getBeginTime().isAfter(LocalDateTime.now())){
return Result.fail("秒杀尚未开始");
}
//2.2秒杀已结束返回异常
if(voucher.getEndTime().isBefore(LocalDateTime.now())){
return Result.fail("秒杀已经结束");
}
voucher = seckillVoucherService.getById(voucherId);
//3.判断库存是否充足
if(voucher.getStock()<1){
//3.1库存不足返回异常
return Result.fail("库存不足!");
}
Long userId = UserHolder.getUser().getId();
SimpleRedisLock lock = new SimpleRedisLock("order:"+userId,stringRedisTemplate);
boolean isLock = lock.tryLock(1200);
//判断是否获取锁成功
if(!isLock) {
return Result.fail("不允许重复下单");
}
try {
//获取代理对象
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}finally {
lock.unlock();
}

}

核心代码:

VoucherOrderServiceImpl 里替换原来的加锁方式,改用 SimpleRedisLock

1
2
SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
boolean isLock = lock.tryLock(1200);

如果加锁成功,就调用代理对象创建订单;失败就说明这个用户已经在处理中,拒绝重复下单。

1
2
3
4
5
6
7
8
9
if (!isLock) {
return Result.fail("不允许重复下单");
}
try {
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
} finally {
lock.unlock();
}

经测试多台节点相同用户只能获取同一张优惠券成功:

img

img

目前这个锁实现 仍有风险

  • ❗️直接 delete key,没有验证是否是自己加的锁就删了(容易误删别人的锁);
  • ❗️如果执行时间超过了过期时间(比如 2 秒),锁会自动释放,但任务还没做完,另一个线程就可能抢到锁并重复执行!

这些问题后面会通过加唯一标识 + Lua 脚本释放锁方式改进。

4.4Redis分布式锁误删情况说明

如果你没有校验“这把锁是不是我加的”,就可能会发生这种错误:

一个线程因为卡顿或网络问题,等它醒来时,这把锁已经被别人拿走了,但它还以为是自己的,就把别人的锁删了!

逻辑说明:

持有锁的线程在锁的内部出现了阻塞,导致他的锁自动释放,这时其他线程,线程2来尝试获得锁,就拿到了这把锁,然后线程2在持有锁执行过程中,线程1反应过来,继续执行,而线程1执行过程中,走到了删除锁逻辑,此时就会把本应该属于线程2的锁进行删除,这就是误删别人锁的情况说明

解决方案:解决方案就是在每个线程释放锁的时候,去判断一下当前这把锁是否属于自己,如果属于自己,则不进行锁的删除,假设还是上边的情况,线程1卡顿,锁自动释放,线程2进入到锁的内部执行逻辑,此时线程1反应过来,然后删除锁,但是线程1,一看当前这把锁不是属于自己,于是不进行删除锁逻辑,当线程2走到删除锁逻辑时,如果没有卡过自动释放锁的时间点,则判断当前这把锁是属于自己的,于是删除这把锁。

总结一句话就是不要用 delete 直接释放锁,一定要检查“这把锁是不是我加的”再删,用 Lua 脚本保证原子性,否则就会在多线程或多机器环境下删错别人的锁!

img

图中流程详解

▶ 线程1:

  • 成功加锁并执行业务逻辑;
  • 中途“卡住了”(可能是网络、GC 等卡顿);
  • 锁过期后,自动释放
  • 线程1 后续恢复,继续执行 delete,但这时锁已经不是它的了!

▶ 线程2:

  • 线程1卡住时,锁过期;
  • 它尝试加锁,成功(因为 Redis 已自动释放);
  • 它开始执行自己的业务。

▶ 问题来了:

  • 线程1 后知后觉地恢复并执行 DEL lock
  • 此时锁的值已变成线程2的标识;
  • 但它仍然执行 delete,误删了线程2的锁
  • 线程3 也进来了,它就能获取锁,造成并发访问,逻辑错乱

4.5 解决Redis分布式锁误删问题

目标:锁释放前先校验“锁是我加的”

新需求:

  • 加锁时:存入线程标识(如:UUID + Thread ID);
  • 解锁时:先取出 Redis 里的标识;
    • 如果和当前线程一致:才能删锁;
    • 否则:不删,防止误删。

核心逻辑:在存入锁时,放入自己线程的标识,在删除锁时,判断当前这把锁的标识是不是自己存入的,如果是,则进行删除,如果不是,则不进行删除。

img

首先要修改SimpleRedisLock里面的如下代码,主要是调用hutool工具包生成UUID(每次线程调用都会生成一个唯一的UUID),让Redis的前缀变成UUID+线程ID:

1
2
3
4
5
6
7
8
private static final String ID_PREFIX = UUID.fastUUID().toString(true)+"-";
@Override
public boolean tryLock(long timeoutSec) {
//获取线程标示
String threadId = ID_PREFIX + Thread.currentThread().getId();
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX+name,threadId,timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}

现在要修改的是SimpleRedisLock类里面的unlock方法,主要是比较当前线程的标示和Redis中锁的标示是否一致,只有标示一致才能释放锁:

1
2
3
4
5
6
7
8
9
10
11
@Override
public void unlock() {
//获取线程标示
String threadId = ID_PREFIX + Thread.currentThread().getId();
//获取锁中的标示
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
if(threadId.equals(id)){
//释放锁
stringRedisTemplate.delete(KEY_PREFIX+name);
}
}

总结一句话:通过加“线程唯一标识”,并在释放锁时比对这个标识,成功解决 Redis 分布式锁的误删问题,确保只有“加锁者”才能“解锁”。

4.6 分布式锁的原子性问题

更为极端的误删逻辑说明:

线程1现在持有锁之后,在执行业务逻辑过程中,他正准备删除锁,而且已经走到了条件判断的过程中,比如他已经拿到了当前这把锁确实是属于他自己的,正准备删除锁,但是此时他的锁到期了,那么此时线程2进来,但是线程1他会接着往后执行,当他卡顿结束后,他直接就会执行删除锁那行代码,相当于条件判断并没有起到作用,这就是删锁时的原子性问题,之所以有这个问题,是因为线程1的拿锁,比锁,删锁,实际上并不是原子性的,我们要防止刚才的情况发生,

img

  • 线程1 获取锁成功(锁标识是线程1的 UUID);
  • 执行业务;
  • 准备释放锁,正在判断锁是不是自己的(这个时候判断成功);
  • 但是!就在这一步之后线程1突然被卡住(阻塞或宕机);
  • 锁超时后被 Redis 自动释放;
  • 线程2、线程3 先后获取锁并执行业务;
  • 线程1 醒来,继续往下执行 delete,把线程2或3的锁删掉了!!

关键问题:判断 + 删除不是原子操作

这两个操作之间一旦出现“卡顿、延迟、调度切换”等非预期情况,就可能导致你误删了别人的锁,即便你之前已经做了线程标识校验。

解决方案:用 Lua 脚本让“判断+删除”原子执行

4.7 Lua脚本解决多条命令原子性问题

为什么用 Lua?

答:Redis 提供的 EVAL 命令支持在服务端运行 Lua 脚本,能让多个 Redis 命令组成 一个不可拆分的原子操作

Redis提供Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。

Lua是一种编程语言,它的基本语法可以参考网站:https://www.runoob.com/lua/lua-tutorial.html

4.7.1Lua 脚本基本语法

这里重点介绍Redis提供的调用函数,语法如下:

1
redis.call('命令名称', 'key', '其它参数', ...)

例如,我们要执行set name jack,则脚本是这样:

1
2
# 执行 set name jack
redis.call('set', 'name', 'jack')

例如,我们要先执行set name Rose,再执行get name,则脚本如下:

1
2
3
4
5
6
# 先执行 set name jack
redis.call('set', 'name', 'Rose')
# 再执行 get name
local name = redis.call('get', 'name')
# 返回
return name

4.7.2 使用 EVAL 调用脚本

写好脚本以后,需要用Redis命令来调用脚本,调用脚本的常见命令如下:

1
EVAL "脚本内容" numkeys key1 [key2 …] arg1 [arg2 …]

img

例如,我们要执行 redis.call(‘set’, ‘name’, ‘jack’) 这个脚本,语法如下:

img

4.7.3. 参数动态传入方式

如果脚本中的key、value不想写死,可以作为参数传递。key类型参数会放入KEYS数组,其它参数会放入ARGV数组,在脚本中可以从KEYS和ARGV数组获取这些参数:

img

  • 1:表示有一个 key 参数
  • KEYS[1] = nameARGV[1] = Rose

接下来我们来回一下我们释放锁的逻辑:

目标:让以下逻辑变成一个原子操作:

  1. 读取锁的值
  2. 比较值是否等于当前线程标识
  3. 如果一致,释放锁
  4. 如果不一致,不做任何操作

如果用Lua脚本来表示则是这样的:

最终我们操作redis的拿锁比锁删锁的lua脚本就会变成这样

1
2
3
4
5
6
7
8
-- 这里的 KEYS[1] 就是锁的key,这里的ARGV[1] 就是当前线程标示
-- 获取锁中的标示,判断是否与当前线程标示一致
if (redis.call('GET', KEYS[1]) == ARGV[1]) then
-- 一致,则删除锁
return redis.call('DEL', KEYS[1])
end
-- 不一致,则直接返回
return 0

总结一句话

把“判断是不是自己 + 删除锁”这两步放进 Lua 脚本中交给 Redis 原子执行,彻底解决分布式锁的并发安全问题。

4.8 利用Java代码调用Lua脚本改造分布式锁

在SimpleRedisLock中写入如下的代码,因为我们希望的是在一开始就将Lua的脚本加载好,而不是等到要调用释放锁的时候再去加载Lua脚本,所以采用静态变量和静态代码块,这些部分在类初始化的时候就会被加载:

1
2
3
4
5
6
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}

说明:

  • DefaultRedisScript 是 Spring 提供的类;
  • setLocation 是加载 Lua 脚本文件;
  • setResultType 是告诉 Redis 你这个脚本要返回什么类型。

在SimpleRedisLock类的unlock方法中写入如下的代码:

SimpleRedisLockunlock() 方法中,替换原来的 delete 逻辑:

1
2
3
4
5
6
@Override
public void unlock() {
stringRedisTemplate.execute(UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX+name),
ID_PREFIX + Thread.currentThread().getId());
}

这段代码在执行 Lua 脚本时:

  • KEYS[1] = lock:order:xxx(锁的 key);
  • ARGV[1] = UUID-threadId(锁的值,用于身份验证);

RedisTemplate 是如何执行 Lua 的?

我们的RedisTemplate中,可以利用execute方法去执行lua脚本,参数对应关系就如下图股

img

测试逻辑:

第一个线程进来,得到了锁,手动删除锁,模拟锁超时了,其他线程会执行lua来抢锁,当第一天线程利用lua删除锁时,lua能保证他不能删除他的锁,第二个线程删除锁时,利用lua同样可以保证不会删除别人的锁,同时还能保证原子性。

在程序1和程序2的下面这个位置打上断点:

img

在测试API中测试访问如下的URL:

1
http://localhost:8080/api/voucher-order/seckill/14

分别测试秒杀优惠券1和2:

img

img

在Redis中能看到程序1获取锁成功,然后直接把lock锁删掉,模拟超时释放的情况:

img

然后让程序2往下走一步,可以看到程序2获取到了锁

img

然后可以直接放行程序1,会看到结果是程序2加的锁没有被删除。

最后放行程序2,会看到程序2加的锁被删除。

小总结:

基于Redis的分布式锁实现思路:

  • 利用set nx ex获取锁,并设置过期时间,保存线程标示
  • 释放锁时先判断线程标示是否与自己一致,一致则删除锁
  • 特性:
    • 利用set nx满足互斥性
    • 利用set ex保证故障时锁依然能释放,避免死锁,提高安全性
    • 利用Redis集群保证高可用和高并发特性

笔者总结:我们一路走来,利用添加过期时间,防止死锁问题的发生,但是有了过期时间之后,可能出现误删别人锁的问题,这个问题我们开始是利用删之前 通过拿锁,比锁,删锁这个逻辑来解决的,也就是删之前判断一下当前这把锁是否是属于自己的,但是现在还有原子性问题,也就是我们没法保证拿锁比锁删锁是一个原子性的动作,最后通过lua表达式来解决这个问题

已经把 Redis 分布式锁从“能用”优化到“高并发可安全用”的版本,使用 Lua 脚本在 Java 中原子释放锁,是实现这一切的关键一步。

但是目前还剩下一个问题锁不住,什么是锁不住呢,你想一想,如果当过期时间到了之后,我们可以给他续期一下,比如续个30s,就好像是网吧上网, 网费到了之后,然后说,来,网管,再给我来10块的,是不是后边的问题都不会发生了,那么续期问题怎么解决呢,可以依赖于我们接下来要学习redission啦

5、分布式锁-redission

5.1 分布式锁-redission功能介绍

基于setnx实现的分布式锁存在下面的问题:

  1. 不可重入
  • 含义:同一个线程在还没释放锁时,又想再次获取这个锁,不能成功(会被自己阻塞);

  • 举例:就像你在方法 A 拿了锁,然后方法 A 又调用了 B,而 B 又要拿同一把锁,就挂住了;

  • 风险:递归调用/多层封装下容易死锁;

  • synchronized/ReentrantLock 是可重入的,而原始 Redis 实现不是。

  • 不可重试

  • 含义:获取失败一次就放弃,没有“等待+重试”机制;

  • 问题:并发环境下,很多线程抢锁失败后立刻返回,不能等待排队,业务体验差。

  • 超时释放有风险

  • 加锁时设置了过期时间,防止死锁;

  • :如果业务执行时间过长,锁先过期了;

  • 别人进来抢锁,你业务还没做完;

  • 即使你用 Lua 判断 + 删除,也只能避免误删,但无法保证锁一定有效到你业务真正执行完毕

  • 结论:需要“锁续期机制”来保持锁有效!

  • 主从一致性问题(分布式集群)

  • Redis 使用主从集群时,写入数据是先写主节点,再同步给从节点;

  • 如果在写成功前主节点挂了,但从节点没有拿到同步数据,从节点会认为锁没被加,导致锁被重复加

  • 结果:多个线程同时进入临界区,锁形同虚设

  • 这个问题是 Redis 本身架构的弱点,单靠你手写逻辑不好处理。

img

总结一句话:

自己写的 Redis 分布式锁,虽然能用,但存在 不可重入、不可续期、无排队、不稳定 等问题,在真正生产环境里不够健壮。

什么是Redission呢

Redisson 是一个基于 Redis 的 Java 客户端框架,它封装了很多分布式服务,其中包括各种类型的 分布式锁实现,还带有“自动续期”、“可重入”、“公平锁”、“读写锁”等高级功能。

img

5.2 分布式锁-Redission快速入门

第1步,先引入依赖:

1
2
3
4
5
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>

第2步,在config包下创建RedissonConfig类,写入如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Configuration
public class RedissonConfig {

@Bean
public RedissonClient redissonClient(){
// 配置
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.150.101:6379")
.setPassword("123321");
// 创建RedissonClient对象
return Redisson.create(config);
}
}

说明:

  • 创建并注册 RedissonClient Bean;
  • 使用 useSingleServer() 表示你使用的是单机 Redis(后期可以切换为集群模式);
  • 注意地址前缀是 redis://
  • 密码如果 Redis 没设可以不写 .setPassword(...)

第3步,引入RedissonClient,调用getLock获取锁对象,然后用tryLock获取锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Resource
private RedissionClient redissonClient;

@Test
void testRedisson() throws Exception{
//获取锁(可重入),指定锁的名称
RLock lock = redissonClient.getLock("anyLock");
//尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位
boolean isLock = lock.tryLock(1,10,TimeUnit.SECONDS);
//判断获取锁成功
if(isLock){
try{
System.out.println("执行业务");
}finally{
//释放锁
lock.unlock();
}

}



}

参数说明:

tryLock(1, 10, TimeUnit.SECONDS)

  • 1 秒内尝试获取锁(抢不到就放弃);
  • 锁持有 10 秒后自动释放

lock.unlock():安全释放锁,防止阻塞;

Redisson 内部已经自动实现了线程标识、防误删、自动续期等逻辑。

在 VoucherOrderServiceImpl

注入RedissonClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@Resource
private RedissonClient redissonClient;

@Override
public Result seckillVoucher(Long voucherId) {
// 1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀尚未开始!");
}
// 3.判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀已经结束!");
}
// 4.判断库存是否充足
if (voucher.getStock() < 1) {
// 库存不足
return Result.fail("库存不足!");
}
Long userId = UserHolder.getUser().getId();
//创建锁对象 这个代码不用了,因为我们现在要使用分布式锁
//SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
RLock lock = redissonClient.getLock("lock:order:" + userId);
//获取锁对象
boolean isLock = lock.tryLock();

//加锁失败
if (!isLock) {
return Result.fail("不允许重复下单");
}
try {
//获取代理对象(事务)
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
} finally {
//释放锁
lock.unlock();
}
}

说明:

  • lock:order:userId:给每个用户设置唯一的锁,防止同一用户重复下单;
  • 使用 tryLock() 获取锁(默认立即尝试获取一次);
  • Redisson 自动处理续期和线程隔离(不需要你手动判断 UUID + 线程 ID);
  • 释放锁直接调用 .unlock(),更简单更安全。

第4步,启动服务

发送下面的请求:

img

在执行释放锁的语句前,可以看到Redis中有锁的记录:

img

用jmeter来测试,可以发现没有出现并发安全问题:

img

img

img

img

5.3 分布式锁-redission可重入锁原理

5.3.1重入锁的概念(本地锁和分布式锁通用)

可重入锁(Reentrant Lock)允许 同一个线程重复获取同一把锁

  • 例如一个线程调用了两次 lock.lock(),只要线程一致,不会死锁。解锁时必须解锁两次才彻底释放。

5.3.2本地锁(如 Java ReentrantLock)的实现机制

  • 通过一个

    1
    volatile state

    状态变量控制重入次数。

    • 初始状态:state = 0 表示没人持有。
    • 第一次获取:state = 1
    • 同一线程再次获取:state += 1
    • 每次释放:state -= 1
    • state == 0,锁释放成功。

5.3.3Redisson 可重入分布式锁实现原理

1. Redis 中如何存储锁?

  • 使用

    Hash 结构

    存储:

    • Key:锁的名称(如 "lock"
    • Field(小key):UUID:ThreadId 表示哪个线程持有锁
    • Value:重入次数,例如 1 表示加锁了一次
1
2
3
lock {
thread1 : 1
}

2.锁的获取逻辑

情况1:锁不存在,直接加锁

1
2
3
4
5
if redis.call('exists', KEYS[1]) == 0 then
redis.call('hset', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end
  • 判断锁是否存在(exists
  • 如果不存在,设置 hash field = threadId,值为 1(加锁一次)
  • 设置过期时间(防止死锁)
  • 返回 nil,表示加锁成功

情况2:锁存在,且是自己线程持有(可重入)

1
2
3
4
5
if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end
  • 判断当前线程是否已持有(hexists
  • 如果是:hincrby 锁计数 +1
  • 续期(pexpire
  • 返回 nil 表示加锁成功(重入)

情况3:锁被其他线程持有

1
return redis.call('pttl', KEYS[1]);
  • 返回剩余过期时间
  • Java 端根据返回值是否是 null 判断是否加锁成功,如果失败会进行 自旋重试(while(true) 循环)

完整代码

1
2
3
4
5
6
7
8
9
10
11
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);"

5.3.4 释放锁的逻辑

  • 判断锁是不是自己加的(判断 field 是否是自己)
  • hincrby 把锁值减1
  • 如果减完为0,使用 del 删除整个锁
  • 若不是自己线程,加锁失败

img

代码说明:

  • 第一次 tryLock() 成功后,调用 method2() 再加一次锁,也成功,体现 可重入性
  • 每次 unlock() 只是让 value -1,直到为0,才释放 Redis 中的锁。

img

img

5.4 分布式锁-redission锁重试和WatchDog机制

5.4.1Redisson 分布式锁中 tryLock 重试 + WatchDog 自动续期的源码机制。

下面是对含有waitTime(等待时间)的tryLock的跟踪:

img

img

主题:tryLock(long waitTime, TimeUnit unit) 的源码调用链 + 默认看门狗时间为 30 秒

你调用的是:

1
tryLock(waitTime, unit)

实际会调用重载方法:

1
tryLock(waitTime, -1L, unit) // -1 表示未指定超时时间

img

img

重点含义:

leaseTime = -1 触发 WatchDog 看门狗机制

默认配置 this.lockWatchdogTimeout = 30000L // 30秒

Lua 脚本说明

return redis.call(‘pttl’, KEYS[1]);

  • 返回当前锁剩余时间(毫秒)
  • 如果锁被占用,则 Redisson 用这个值判断是否自旋重试

img

img

主题:锁的获取流程和释放锁的 Lua 脚本实现逻辑

获取锁逻辑:

1
2
3
if (ttl == null) {
return true; // 加锁成功
}

否则进入剩余时间判断:是否还有等待机会。

释放锁逻辑(Redis 脚本)

1
2
redis.call('del', KEYS[1]);
redis.call('publish', KEYS[2], ARGV[1]); // 发布解锁通知

当锁释放时,通过 Redis publish 发消息,通知其他线程“锁已释放”,以唤醒它们继续抢锁。

img

说明 subscriberFuture.await:

1
subscriberFuture.await(time, TimeUnit.MILLISECONDS);
  • 等待一段时间内是否能收到锁释放的通知(订阅模式)
  • 如果 false,表示超时未收到通知

后续继续抢锁:

1
tryAcquire(waitTime, leaseTime, unit, threadId)

会再调用这个方法尝试重新获取锁。

img

img

Redisson 并非死等,而是结合“等待发布+主动抢锁”,避免资源浪费。

img

主题:触发 WatchDog 续期的逻辑(重点)

条件判断:

1
if (leaseTime == -1) // 表示未指定锁时间

那么 Redisson 将默认使用:

1
getLockWatchdogTimeout() = 30_000 ms

成功加锁后调用:

1
scheduleExpirationRenewal(threadId); // 开启续约机制

img

img

img

主题:WatchDog 定时任务如何每 10 秒自动续期

核心逻辑:

1
this.internalLockLeaseTime / 3

默认锁时间是 30 秒,所以每 10秒调用一次 renewExpiration(),实现持续续期。

Lua脚本中的续期操作:

1
redis.call('pexpire', KEYS[1], ARGV[1]); // 续期锁有效时间

img

img

主题:取消看门狗续期任务(释放锁时触发)

解锁后调用:

1
this.cancelExpirationRenewal(threadId)
  • 作用:停止定时器,释放线程资源
  • 只有当锁真正释放或者线程结束时,才会取消续期逻辑,确保资源不浪费

img

img

img

加锁逻辑总结:

  • ttl == null

    :获取锁成功

    • leaseTime == -1:开启自动续期(WatchDog)
    • leaseTime != -1:定时释放锁,不续期
  • ttl != null

    :获取锁失败

    • 剩余时间 > 0:订阅解锁信号
    • 等待并判断是否超时,进行下一轮尝试

解锁逻辑:

  • 成功释放后,publish 发布消息通知等待线程
  • 取消续约任务,清理 EXIPRATION_RENEWAL_MAP

总结一句话:Redisson 分布式锁 = Redis + Lua + Java定时任务 实现了一个 可重入、自动续期、带发布通知机制的分布式锁系统,健壮且高性能。

5.4.2Redisson 的两种加锁方式

Redisson 的 lock() 方法分为两种情况:

方法 描述
lock(long leaseTime, TimeUnit unit) 指定锁的超时时间,不会启动看门狗
lock()(无参) 未指定锁超时时间,Redisson 会自动启动 WatchDog 定时续约机制

5.4.3lock() 加锁流程回顾

1.获取当前线程 ID

1
long threadId = Thread.currentThread().getId();

2.尝试加锁(tryAcquire)

1
2
3
4
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
if (ttl == null) {
return; // 加锁成功或重入成功
}

若返回值为 null,表示加锁成功(或是当前线程重入)
若返回 ttl(过期时间),表示锁被其他线程占用,进入 while(true) 自旋重试逻辑

5.4.4三种加锁场景逻辑判断(tryAcquire 内部)

情况 Redis 逻辑 说明
锁不存在 hset + pexpire 直接加锁成功
锁是当前线程持有 hincrby + pexpire 可重入,计数+1
锁存在,非当前线程 返回剩余过期时间 加锁失败,等待自旋重试

5.4.5 重试机制(循环抢锁)

如果初始加锁失败,会进入以下逻辑:

1
2
3
4
while (true) {
tryAcquire() // 不断尝试获取锁
...
}

Redisson 会自旋尝试,直到获取锁成功或超时。

5.4.6 看门狗机制(WatchDog)

适用场景:

调用 lock() 无参版本时,不指定 leaseTime,Redisson 默认设置锁有效期为 30秒,并会启动 自动续期机制

WatchDog 实现核心逻辑:

1
2
3
4
5
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
});

如果加锁成功,就调用 scheduleExpirationRenewal(),触发自动续期。

5.4.7 续约机制源码分析(renewExpiration)

1
2
3
4
5
6
7
8
9
10
11
12
13
private void renewExpiration() {
// 每次续期都会重新创建一个定时任务(10秒后触发)
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
public void run(Timeout timeout) {
// 仍持有锁 -> 再续期30s
renewExpirationAsync(threadId).onComplete((res, e) -> {
if (res) {
renewExpiration(); // 递归设置下次续期
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); // 默认是每10秒执行一次(30/3)
}

整体过程:

  1. 默认锁有效时间:30秒
  2. 每 10 秒(30/3)启动一个定时任务续期
  3. 如果线程还活着且仍持有锁 → 重置锁的过期时间为 30秒
  4. 如果线程挂了(如宕机、异常) → 不再续期,锁自动过期释放

5.4.8 总结:Redisson 锁的核心机制

机制 功能
tryAcquire() 尝试加锁逻辑,判断锁状态
while(true) 自旋重试机制,直到加锁成功
WatchDog(看门狗) 自动续期,防止锁提前释放
renewExpiration() 启动定时任务每 10 秒续一次期
锁自动释放 线程挂掉不会续期,锁30秒后自动释放
  • 看门狗机制是 线程持有锁自动续期的保障
  • 使用 lock()(无参)时 不要担心死锁,即使忘了释放锁,30秒后也会释放;
  • 不传 leaseTime 才会触发 WatchDog;
  • Redisson 的重入锁依靠 Hash结构 + 线程id + 计数 实现。

5.5 分布式锁-redission锁的MutiLock原理

为什么需要 MultiLock?Redisson 锁丢失的背景

为了提高redis的可用性,我们会搭建集群或者主从,现在以主从为例

主从同步延迟 + 宕机导致锁丢失

图中描述的是以下场景:

  1. Java 应用尝试获取 Redis 锁:SET lock thread1 NX EX 10(设置锁)
  2. 锁成功设置到 Redis 主节点(Master)
  3. Redis 主节点还未将锁同步到从节点(Slave)
  4. 此时主节点宕机了
  5. Redis 自动从 Slave 中选出一个新的 Master(原来的从节点)
  6. 由于新 Master 上 没有锁数据 → 锁丢失!💥

这意味着:即使你刚加完锁,锁信息也可能因为主从未同步就丢失,造成并发安全隐患!

img

为了避免上述锁丢失问题,Redisson 提出了 MultiLock,也叫“多节点锁”或“联合锁”。

MultiLock 核心原理:

  • 如果任何一个节点失败,则失败的锁会回滚并释放
  • 同时向多个 Redis 主节点加锁(比如 Redis 主机集群)
  • 只有 所有节点都加锁成功,才算真正获取到锁

img

那么MutiLock 加锁原理是什么呢?笔者画了一幅图来说明:当我们去设置了多个锁时,redission会将多个锁添加到一个集合中,然后用while循环去不停去尝试拿锁,但是会有一个总共的加锁时间,这个时间是用需要加锁的个数 * 1500ms ,假设有3个锁,那么时间就是4500ms,假设在这4500ms内,所有的锁都加锁成功, 那么此时才算是加锁成功,如果在4500ms有线程加锁失败,则会再次去进行重试.

img

  1. 收集多个锁
1
LockList.add(lock1, lock2, lock3)
  • 收集所有待加锁的 Redis 实例
  • 设置总加锁时间:
1
总超时时间 = 锁数量 * 1500ms
  • 例如:3 个锁 → 超时时间为 4500ms
  • 加锁流程(伪代码):
1
2
3
4
5
6
while (true) {
if (tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS)) {
return true;
}
// 加锁失败时自动重试
}
  1. 遍历每个锁,逐一尝试加锁
1
2
3
4
5
6
7
8
9
10
for (lock in LockList) {
try {
boolean locked = lock.tryLock(4500ms, -1); // -1:表示触发自动续期
if (locked) {
acquiredLocks.add(lock);
}
} catch (Exception e) {
break; // 加锁失败中断
}
}

5. 成功判定条件:

1
2
如果 acquiredLocks.size == LockList.size // 所有锁都成功
return true;

6. 如果有锁失败,回滚所有已获取的锁:

1
2
unlockInner(acquiredLocks); // 释放所有已加锁的节点
return false;

img

6、秒杀优化

6.1 秒杀优化-异步秒杀思路

我们来回顾一下下单流程

当用户发起请求,此时会请求nginx,nginx会访问到tomcat,而tomcat中的程序,会进行串行操作,分成如下几个步骤

  • 1、查询优惠卷
  • 2、判断秒杀库存是否足够
  • 3、查询订单
  • 4、校验是否是一人一单
  • 5、扣减库存
  • 6、创建订单

在这六步操作中,又有很多操作是要去操作数据库的,而且还是一个线程串行执行, 这样就会导致我们的程序执行的很慢,所以我们需要异步程序执行,那么如何加速呢?

img

原始秒杀流程(未优化)

这张图描述的是传统的秒杀请求处理流程:

流程步骤:

  1. 用户发起秒杀请求,请求被 Nginx 接收并转发到 Tomcat
  2. Tomcat 中执行串行逻辑(每一步都访问数据库):
    • 查询优惠券
    • 判断库存
    • 查询订单(是否重复)
    • 校验一人一单
    • 扣减库存
    • 创建订单

问题:

  • 所有操作都在主线程中串行执行,访问数据库频繁
  • 请求高峰期,数据库压力爆炸,响应速度慢
  • 容易引发 超卖性能瓶颈

优化方案:我们将耗时比较短的逻辑判断放入到redis中,比如是否库存足够,比如是否一人一单,这样的操作,只要这种逻辑可以完成,就意味着我们是一定可以下单完成的,我们只需要进行快速的逻辑判断,根本就不用等下单逻辑走完,我们直接给用户返回成功, 再在后台开一个线程,后台线程慢慢的去执行queue里边的消息,这样程序不就超级快了吗?而且也不用担心线程池消耗殆尽的问题,因为这里我们的程序中并没有手动使用任何线程池,当然这里边有两个难点

第一个难点是我们怎么在redis中去快速校验一人一单,还有库存判断

第二个难点是由于我们校验和tomct下单是两个线程,那么我们如何知道到底哪个单他最后是否成功,或者是下单完成,为了完成这件事我们在redis操作完之后,我们会将一些信息返回给前端,同时也会把这些信息丢到异步queue中去,后续操作中,可以通过这个id来查询我们tomcat中的下单逻辑是否完成了。

img

前端请求流程变更:

  1. 用户请求发送到 Nginx

  2. Nginx 将请求发给 Redis 进行

    快速判断

    • 秒杀库存是否足够
    • 是否一人一单

3.这两个逻辑在 Redis 中执行,响应极快,避免 Tomcat 阻塞。

4.后台 Tomcat 消费消息队列中的数据,执行真正的下单流程(慢慢完成)

5.判断通过后,Redis 将 userId、orderId 等信息 存入 消息队列

优势:

  • 高并发下不会压垮数据库
  • 秒杀操作近似“无锁”,吞吐高
  • 用户体验好(快速响应)
  • 可扩展性强(异步处理易于伸缩)

img

Redis Lua 脚本保障 库存判断 + 一人一单判断 + 写入状态原子性,这是关键!

Redis 数据结构设计

Key Value
stock:vid:7 秒杀库存(例如100)
order:vid:7 用户ID列表(如Set/Stream)记录下单人

Lua 脚本逻辑图(左下):

  1. 判断库存是否充足(if stock <= 0
  2. 判断用户是否已下单(是否在 set 集合中)
  3. 若均通过:
    • 扣减库存
    • 将用户加入 set 集合(防止重复)
    • 返回成功码 0

后续处理(右侧流程):

  1. 执行 Lua 脚本
  2. 如果返回结果为 0,说明可以下单
  3. 将 couponId、userId、orderId 加入消息队列
  4. 后台线程消费消息,执行真正的:
    • 校验库存(可选)
    • 创建订单
    • 持久化入数据库

6.2 秒杀优化-Redis完成秒杀资格判断

需求:

  • 新增秒杀优惠券的同时,将优惠券信息保存到Redis中
  • 基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
  • 如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
  • 开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能

这个阶段我们要实现的是:

功能 技术方案
秒杀资格判断(库存 & 一人一单) 使用 Redis + Lua 脚本,原子判断
异步下单处理 Redis Stream 队列 + 后台线程消费
快速响应用户 Redis 脚本立即返回结果(成功/失败)
延迟落库 后台线程根据消息内容执行真正的下单操作

img

在VoucherServiceImpl的addSeckillVoucher方法的末尾添加下面这段代码把秒杀的库存保存到Redis中:

1
2
//保存秒杀的库存到Redis
stringRedisTemplate.opsForValue().set(RedisConstants.SECKILL_STOCK_KEY +voucher.getId(),voucher.getStock().toString());

说明:

  • SECKILL_STOCK_KEY 前缀常量为 "seckill:stock:"
  • Redis 中会生成一个 Key,例如:seckill:stock:17,值为库存数量
  • 相当于秒杀券一上架,就把库存同步到了 Redis 里,后续不再访问数据库判断库存

发送请求,新增一份优惠券:

img

可以看到在Redis中记录了优惠券的记录:

img

img

完整的VoucherServiceImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
// 保存优惠券
save(voucher);
// 保存秒杀信息
SeckillVoucher seckillVoucher = new SeckillVoucher();
seckillVoucher.setVoucherId(voucher.getId());
seckillVoucher.setStock(voucher.getStock());
seckillVoucher.setBeginTime(voucher.getBeginTime());
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);
// 保存秒杀库存到Redis中
//SECKILL_STOCK_KEY 这个变量定义在RedisConstans中
//private static final String SECKILL_STOCK_KEY ="seckill:stock:"
stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
}

完整lua表达式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
-- 1.参数列表
-- 1.1.优惠券id
local voucherId = ARGV[1]
-- 1.2.用户id
local userId = ARGV[2]
-- 1.3.订单id
local orderId = ARGV[3]

-- 2.数据key
-- 2.1.库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2.订单key
local orderKey = 'seckill:order:' .. voucherId

-- 3.脚本业务
-- 3.1.判断库存是否充足 get stockKey
if(tonumber(redis.call('get', stockKey)) <= 0) then
-- 3.2.库存不足,返回1
return 1
end
-- 3.2.判断用户是否下单 SISMEMBER orderKey userId
if(redis.call('sismember', orderKey, userId) == 1) then
-- 3.3.存在,说明是重复下单,返回2
return 2
end
-- 3.4.扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5.下单(保存用户)sadd orderKey userId
redis.call('sadd', orderKey, userId)
-- 3.6.发送消息到队列中, XADD stream.orders * k1 v1 k2 v2 ...
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 0

解析一下这个lua脚步

(1)判断库存是否足够

1
if tonumber(redis.call('get', stockKey)) <= 0 then return 1 end
  • 库存不足返回 1,结束逻辑

(2)判断是否重复下单

1
if redis.call('sismember', orderKey, userId) == 1 then return 2 end
  • orderKeyseckill:order:<voucherId>,用 Set 保存已下单用户

(3)扣库存、记录下单用户

1
2
redis.call('incrby', stockKey, -1)
redis.call('sadd', orderKey, userId)

(4)写入消息队列 Stream

1
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)

使用 Redis Stream 作为消息队列,后续后台线程从这里消费数据,实现异步下单。

当以上lua表达式执行完毕后,剩下的就是根据步骤3,4来执行我们接下来的任务了

VoucherOrderServiceImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Override
public Result seckillVoucher(Long voucherId) {
//获取用户
Long userId = UserHolder.getUser().getId();
long orderId = redisIdWorker.nextId("order");
// 1.执行lua脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString(), String.valueOf(orderId)
);
int r = result.intValue();
// 2.判断结果是否为0
if (r != 0) {
// 2.1.不为0 ,代表没有购买资格
return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
}
//TODO 保存阻塞队列
// 3.返回订单id
return Result.ok(orderId);
}

脚本执行后返回值含义:

返回值 含义
1 库存不足
2 重复下单
0 有秒杀资格,成功入队,异步下单中

测试:

在Apifox中发送测试数据,秒杀下单,成功后返回订单id:

img

在Redis中库存成功扣减1,order有缓存

img

如果再次发送会提示不能重复下单:

img

准备在Jemeter中测试,首先把缓存中的优惠券库存改为200:

img

测试后库存减为0,新增200条订单记录:

img

img

可以看到平均响应时间减少10倍,最快响应时间减少60倍,最大响应时间缩短:

img

6.3 秒杀优化-基于阻塞队列实现秒杀优化

img

阻塞队列:尝试从队列获取元素,如果没有元素会被阻塞,直到队列中有元素才会被唤醒,获取元素。

只要类一启动,用户随时都有可能来抢购,因此VoucherOrderHandler这个类的初始化必须在类初始化后执行。

在VoucherOrderServiceImpl类中,首先要新增一个orderTasks阻塞队列,然后设置一个线程池和run方法。

在run方法中调用阻塞队列的take方法,orderTasks.take方法是一个阻塞方法,如果队列中有元素会获取,如果队列中无元素则阻塞等待。

这里相当于是开启了一个全新的线程来执行获取队列中订单信息和异步创建订单的任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024*1024);
private static ExecutorService seckill_order_executor = Executors.newSingleThreadExecutor();
/**orderTasks 是一个阻塞队列,用来存储待处理的订单。seckill_order_executor 是一个单线程的执行器,用来处理队列中的订单任务。**/
@PostConstruct
private void init(){
seckill_order_executor.submit(new VoucherOrderHandler());
}//在 @PostConstruct 注解的方法中,启动了一个新线程来不断地从队列中取出订单信息并处理:
private class VoucherOrderHandler implements Runnable{
@SneakyThrows
@Override
public void run() {
while(true){
try {
//1.获取队列中的订单信息
VoucherOrder voucherOrder = orderTasks.take();
//2.创建订单
handleVoucherOrder(voucherOrder);
} catch (InterruptedException e) {
log.debug("处理订单异常",e);
}
}

}
}
//take() 方法是阻塞的,只有队列中有元素时才会返回,如果没有元素,则当前线程会阻塞,直到有新元素加入队列。

然后新增一个handleVoucherOrder方法,,首先通过 Redisson 分布式锁 来确保 一人一单 逻辑的原子性,防止同一个用户多次下单。:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public IVoucherOrderService proxy ;
private void handleVoucherOrder(VoucherOrder voucherOrder) {
//1.获取用户
Long userId = voucherOrder.getUserId();
//2.创建锁对象
RLock lock = redissonClient.getLock("lock:order:"+userId);
//3.获取锁
boolean isLock = lock.tryLock();
//4.判断是否获取锁成功
if(!isLock) {
log.error("不允许重复下单");
return;
}
try {
//获取代理对象
proxy.createVoucherOrder(voucherOrder);
}finally {
lock.unlock();
}
}
  • 获取锁后,调用 createVoucherOrder 方法来扣减库存并保存订单。
  • 如果锁获取失败,说明该用户已经下单,直接返回。

createVoucherOrder方法主要是用来对数据库操作,比如扣减库存,然后保存订单的信息到数据库,会有额外的对一人一单和库存数量的判断,虽然这些在Redis中已经判断过,但这里是双重保险。

异步处理不需要再返回给前端任何东西。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Transactional
public void createVoucherOrder(VoucherOrder voucherOrder) {
//6.一人一单
Long userId = voucherOrder.getUserId();
//6.1查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
//6.2判断是否存在
if(count>0){
//用户已经购买过了
log.error("用户已经购买过一次!");
return;
}
//3.2库存充足扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") //相当于set条件 set stock = stock - 1
.eq("voucher_id", voucherOrder.getVoucherId()) //相当于where条件 where id = ? and stock = ?
.gt("stock",0).update();
if(!success){
log.error("库存不足!");
return;
}

long orderId = redisIdWorker.nextId("order");//订单id
voucherOrder.setId(orderId);
voucherOrder.setUserId(userId);
voucherOrder.setVoucherId(voucherOrder.getVoucherId());//代金券id
save(voucherOrder);

}
下面是对seckillVoucher的简单修改:seckillVoucher 方法在执行 Lua 脚本后,判断用户是否有秒杀资格。如果有,则将订单信息封装并添加到阻塞队列中

public Result seckillVoucher(Long voucherId) {

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
    //1.执行Lua脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(),
UserHolder.getUser().getId().toString()
);
int r = result.intValue();
if(r != 0){ //2.判断结果是否为0,不为0,代表没有购买资格
return Result.fail(r==1 ? "库存不足":"不能重复下单");
}
//2.2.为0,有购买资格,把下单信息保存到阻塞队列
long orderId = redisIdWorker.nextId("order");
//封装
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setId(orderId);//订单id
voucherOrder.setUserId(UserHolder.getUser().getId());//用户id
voucherOrder.setVoucherId(voucherId);//代金券id
//保存阻塞队列
orderTasks.add(voucherOrder);
//获取代理对象
proxy = (IVoucherOrderService) AopContext.currentProxy();
//3.返回订单id
return Result.ok(orderId);
}

流程总结

  • 秒杀资格判断:通过 Lua 脚本判断库存是否充足、是否重复下单。
  • 封装订单信息:如果有资格,封装订单信息并保存到阻塞队列。
  • 异步下单:通过新线程异步处理订单,防止阻塞前端。
  • 双重保险:Redis 和数据库中都进行库存与一人一单的判断,保证数据一致性

测试:

先把tb_voucher_order内容清空。把tb_seckill_voucher的stock库存改为200。

然后把Redis中对应优惠券的库存改为200。清空之前生成的订单。检查是否有1000个用户的token。

先用Apifox进行测试,测试一人一单的情况:第2次下单显示不能重复下单。

img

检查数据库是否多1条订单记录,库存是否减少1,缓存中库存是否减少1。

接下来用Jemeter进行测试,会发现库存扣减为0,数据库中多200条数据,缓存中的库存也扣减到0。

看聚合报告的结果如下:

img

因为做了异步下单,会占用一定的CPU,所以平均值要比第2次更长。

和下面前2次的结果进行对比可以发现,响应的平均值比最初提高10倍,最快响应时间提高了80倍,最慢响应时间提高了6倍。

img

秒杀业务的优化思路:

1.先利用Redis完成库存量、一人一单的判断,完成抢单业务。

2.将下单业务放入阻塞队列,利用独立线程异步下单。

基于阻塞队列的异步秒杀存在哪些问题:

1.内存限制问题。使用的是jdk提供的阻塞队列,使用的是JVM的内存,在一开始写死了队列空间的大小,如果在高并发的情况下,队列很快会被占满,如果不对队列的空间加以限制,很容易造成内存的溢出。

2.数据安全问题。缺乏持久化机制,是基于内存来保存信息,如果服务突然宕机,内存中保存的信息都会丢失。如果任务被取出,但由于突然发生事故异常,导致任务没有被消费,任务丢失,会造成数据不一致问题。

7、Redis消息队列

7.1 Redis消息队列-认识消息队列

什么是消息队列:

消息队列 是一种以异步通信为基础的机制,在分布式系统中应用广泛。它的作用就是 存储和管理消息,并通过 消息中间件(Message Broker) 来实现消息的传递。最基本的消息队列包含了三个主要角色:

  1. 生产者(Producer):发送消息到队列。
  2. 消息队列(Message Queue):存储和管理消息。
  3. 消费者(Consumer):从队列中获取消息并进行处理。

img

示例说明:

通过生活中的例子来帮助理解消息队列的作用:

  • 生产者:就像一个 快递员,他负责将快递从商家拿到快递站点(消息队列),等待送货。
  • 消息队列:快递站点,在这里所有的快递都暂时存放,等待被派送到消费者。
  • 消费者:类似 收件人,他们接收到快递,处理自己的需求。

为什么使用消息队列?

提高效率:消息队列帮助系统处理并发,生产者可以快速发送消息,而消费者会异步处理。

解耦:生产者和消费者的工作可以解耦开,生产者无需关心消费者的处理速度,反之亦然。

这种场景在我们秒杀中就变成了:

  1. 用户发起秒杀请求:用户点击秒杀按钮,系统作为 生产者 将秒杀请求(消息)发送到 Redis 消息队列。
  2. 系统判断库存:队列中的消息进入处理流程,由 消费者 负责从 Redis 消息队列中取出消息并执行相应的处理,如判断库存、生成订单等。
  3. 异步处理:消费者处理完消息后,最终结果会异步返回到

Redis提供了3种不同的方式来实现消息队列:

  • 1.list结构:基于List结构模拟消息队列。
  • 2.PubSub(发布订阅):基本的点对点消息模型。
  • 3.Stream:比较完善的功能强大的消息队列模型。

7.2 Redis消息队列-基于List实现消息队列

基于List结构模拟消息队列

消息队列(Message Queue),字面意思就是存放消息的队列。而Redis的list数据结构是一个双向链表,因此支持从两端进行操作,能够实现队列的 先进先出(FIFO)模式。

关键命令:

  • LPUSH:将一个或多个元素插入到列表的 头部。用于生产者将消息发送到队列。
  • RPOP:移除并返回列表的 尾部 元素。用于消费者从队列获取消息。
  • BRPOP:当队列为空时,阻塞等待,直到有消息加入队列中

关键点:

  • BRPOPRPOP 命令在队列为空时会返回 null,这与 JVM 阻塞队列 不同,JVM 阻塞队列会直接阻塞线程,直到队列有可用元素。
  • Redis 中使用 BRPOP 来实现 阻塞式队列,使得消费者可以在队列为空时阻塞等待。

队列是入口和出口不在一边,因此我们可以利用:LPUSH 结合 RPOP、或者 RPUSH 结合 LPOP来实现。

img

img

img

基于List的消息队列有哪些优缺点? 优点:

  • 利用 Redis 存储,不依赖于 JVM 内存中的队列,保证了消息队列的 持久性高可用性
  • 跨进程、跨平台:Redis 可以用于多台机器间的消息传递,适用于分布式系统。
  • 安全性:数据存储在 Redis 中,确保了消息不会丢失,特别是在 Redis 持久化(AOF 或 RDB)开启的情况下。

缺点:

  • 无法强制消费者处理消息的顺序:因为 Redis 是基于 List 实现的,所以 多个消费者 可能会错过某些消息,或消费顺序不保证。
  • 没有内建的消息确认机制:消费者可能会消费同一个消息,若处理失败时缺乏补偿机制。

7.3 Redis消息队列-基于PubSub的消息队列

PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。

SUBSCRIBE channel [channel] :订阅一个或多个频道 PUBLISH channel msg :向一个频道发送消息 PSUBSCRIBE pattern[pattern] :订阅与pattern格式匹配的所有频道

img

img

img

img

基于PubSub的消息队列有哪些优缺点? 优点:

  • 支持多生产者、多消费者:一个频道可以有多个生产者发布消息,多个消费者订阅并处理这些消息。
  • 灵活的订阅方式:可以精确订阅某个频道,也可以通过模式匹配(PSUBSCRIBE)订阅多个频道。
  • 实时性高:一旦消息发布,所有订阅者可以即时收到,适用于对实时性要求高的场景。

缺点:

  • 不支持数据持久化:Redis Pub/Sub 不会将消息持久化到磁盘,数据丢失风险较大。
  • 无法避免消息丢失:如果消费者在订阅时没有准备好,或 Redis 服务重启等原因,会丢失消息。
  • 消息堆积有限制:如果消息队列堆积过多,超出了 Redis 的内存限制,可能会导致消息丢失。

7.4 Redis消息队列-基于Stream的消息队列

Stream 是 Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列。

发送消息的命令:

img

  • key:指定 Stream 的名称。
  • fieldvalue:消息的字段和值,每条消息会有多个字段和值。

例如:

img

这条命令会将 "name""age" 这两个字段及其对应的值 "jack""21" 添加到名为 users 的 Stream 中,同时 Redis 会自动为该消息生成一个 唯一的 ID(如 1644085070523-0)。

读取消息的方式之一:XREAD,需要注意的是key和*|ID中间那俩参数是可选参数,一个是用来判断是否自动创建队列,一个是用来设置队列最大消息数量。

img

例如,使用XREAD读取第一个消息:

img

这条命令会从名为 users 的 Stream 中读取消息,从 ID 为 0 的位置开始,最多读取 1 条消息。

XREAD阻塞方式,读取最新的消息:

img

  • BLOCK 1000 表示阻塞等待最多 1000 毫秒,直到队列中有新的消息,或超时。
  • $ 代表读取最新的消息。

在业务开发中,我们可以循环的调用XREAD阻塞方式来查询最新消息,从而实现持续监听队列的效果,伪代码如下

img

  • 该循环会不断执行 XREAD 命令,直到有新消息进入队列。
  • 每次从 Stream 中读取消息并执行 handleMessage() 处理消息。

注意:当我们指定起始ID为$时,代表读取最新的消息,如果我们处理一条消息的过程中,又有超过1条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题

STREAM类型消息队列的XREAD命令特点:

  • 消息可回溯
  • 一个消息可以被多个消费者读取
  • 可以阻塞读取
  • 有消息漏读的风险

7.5 Redis消息队列-基于Stream的消息队列-消费者组

消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。消费者之间是竞争关系。

具备下列特点:

img

创建消费者组:

img

  • key:队列名称
  • groupName:消费者组名称
  • ID:起始ID标示,$代表队列中最后一个消息,0则代表队列中第一个消息
  • MKSTREAM:队列不存在时自动创建队列

其它常见命令:

删除指定的消费者组

1
XGROUP DESTORY key groupName

给指定的消费者组添加消费

1
XGROUP CREATECONSUMER key groupname consumername

删除消费者组中的指定消费者

1
XGROUP DELCONSUMER key groupname consumername

创建消费者组:

img

从消费者组读取消息:

1
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
  • group:消费组名称
  • consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
  • count:本次查询的最大数量
  • BLOCK milliseconds:当没有消息时最长等待时间
  • NOACK:无需手动ACK,获取到消息后自动确认
  • STREAMS key:指定队列名称
  • ID:获取消息的起始ID:

可以发现在同一个消费者组里的消费者对消息不会重复读取,而是依次读取,已被读取的消息不会再次被读取。

img

img

img

消费者确认消息:

XACK key group ID

  • key:是队列名称。
  • group:是消费者组名称。
  • ID:是接收到的消息的ID。

查看Pending-list队列的信息:

img

  • key:是队列名称。
  • group:是组名称。

下面是消息确认:

img

对消息进行确认,确认完消息会被移除:

img

Pending-list队列里面存储的是已经读取,但是还没确认的消息。

img

假如一台节点读取完消息还没却来得及确认就宕机了,可以通过以下的方法解决:

img

正常情况下先用>,如果出现异常,信息会进入到Pending-list,把ID从>改为0,此时取的就是在Pending-list里的消息。

消费者监听消息的基本思路:

img

STREAM类型消息队列的XREADGROUP命令特点:

  • 消息可回溯
  • 可以多消费者争抢消息,加快消费速度
  • 可以阻塞读取
  • 没有消息漏读的风险
  • 有消息确认机制,保证消息至少被消费一次
  • 支持消息持久化

img

7.6 基于Redis的Stream结构作为消息队列,实现异步秒杀下单

需求:在这个场景中,我们需要通过 Redis Stream 来实现一个 异步秒杀下单 的消息队列。步骤如下:

  • 创建一个Stream类型的消息队列,名为stream.orders
  • 修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherId、userId、orderId
  • 项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单

直接通过控制台创建一个stream.orders队列:

img

直接在Lua脚本中编写代码(主要增加一个局部变量)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
--1.参数列表
--1.1.优惠券id
local voucherId = ARGV[1]
--1.2.用户id
local userId = ARGV[2]
--1.3.订单id
local orderId = ARGV[3]

--2.数据key
--2.1.库存key
local stockKey = 'seckill:stock:' .. voucherId
--2.2.订单key
local orderKey = 'seckill:order:' .. voucherId

--3.脚本业务
--3.1.判断库存是否充足 get stockKey
if(tonumber(redis.call('get',stockKey)) <= 0) then
--3.1.2.库存不足,返回1
return 1
end
--3.2.判断用户是否下单 SISMEMBER orderKey userId
if(redis.call('sismember',orderKey,userId)==1) then
--3.2.1.存在,说明是重复下单,返回2
return 2
end
--3.3.扣库存 incrby stockKey -1
redis.call('incrby',stockKey,-1)
--3.4.下单 sadd orderKey userId
redis.call('sadd',orderKey,userId)
--3.5.发送消息到队列中 XADD stream.orders * k1 v1 k2 v2
redis.call('xadd','stream.orders','*','userId',userId,'voucherId',voucherId,'orderId',orderId)
return 0
  • xadd 命令将 userIdvoucherIdorderId 发送到名为 stream.orders 的队列中。
  • 消息被发送到队列后,消费者可以异步处理这个消息。

在VoucherOrderServiceImpl类中修改seckillVoucher方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public Result seckillVoucher(Long voucherId) {
//获取订单id
long orderId = redisIdWorker.nextId("order");
//1.执行Lua脚本(判断用户是否有购买资格,消息发出)
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(),
UserHolder.getUser().getId().toString(),
String.valueOf(orderId)
);
int r = result.intValue();
if(r != 0){ //2.判断结果是否为0,不为0,代表没有购买资格
return Result.fail(r==1 ? "库存不足":"不能重复下单");
}
//获取代理对象
proxy = (IVoucherOrderService) AopContext.currentProxy();
//3.返回订单id
return Result.ok(orderId);
}

通过执行 Lua 脚本来进行秒杀资格的判断,并将订单信息发送到 stream.orders 队列。

在VoucherOrderServiceImpl中修改VoucherOrderHandler方法的代码:

代码思路如下:

1.从消息队列中尝试读消息。

1
1.1.获取失败,继续循环。

2.获取成功,进行解析和转换。

3.调用createVoucherOrder(voucherOrder)方法完成下单。

4.ACK确认

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
    4.1.确认失败,调用handlePendingList()方法进行处理。
private static ExecutorService seckill_order_executor = Executors.newSingleThreadExecutor();
@PostConstruct
private void init(){
seckill_order_executor.submit(new VoucherOrderHandler());
}
private class VoucherOrderHandler implements Runnable{
String queueName = "stream.order";
@SneakyThrows
@Override
public void run() {
while(true){
try {
//1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS streams.order >
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create(queueName, ReadOffset.lastConsumed())
);
//2.判断消息获取是否成功
if(list==null || list.isEmpty()){
//2.1.获取失败,没有消息,继续下一次循环
continue;
}
//3.解析消息中的订单信息
MapRecord<String, Object, Object> record = list.get(0);
//4.获取成功,可以下单
Map<Object, Object> values = record.getValue();
//3.创建订单
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
createVoucherOrder(voucherOrder);
//4.ACK确认
stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
} catch (Exception e) {
log.debug("处理订单异常",e);
handlePendingList();
}
}
}
  • XREAD 命令从 stream.orders 队列中读取消息。
  • 读取到消息后,调用 createVoucherOrder() 方法来处理订单。
  • 使用 ACK 确认 来确保消息被正确消费。
1
  

在VoucherOrderServiceImpl中添加handlePendingList()方法的代码:

下面有几个修改点:1.XREADGROUP语句末尾改为0,表示读Pending-list队列。2.Pending-list消息获取失败结束循环。3.如果抛异常只是暂停一下,然后会继续循环读。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private void handlePendingList() {
while(true){
try {
//1.获取Pending-List中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS streams.order 0
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create(queueName, ReadOffset.from("0"))
);
//2.判断消息获取是否成功
if(list==null || list.isEmpty()){
//2.1.获取失败,说明Pending-list里没有异常消息,结束循环
break;
}
//3.解析消息中的订单信息
MapRecord<String, Object, Object> record = list.get(0);
//4.获取成功,可以下单
Map<Object, Object> values = record.getValue();
//3.创建订单
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
//4.ACK确认
stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
} catch (Exception e) {
log.debug("处理Pending-list异常",e);
try {
Thread.sleep(20);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
}
}

总结:

  • 异步下单:通过 Redis Stream 实现秒杀下单的异步处理,避免了前端用户的阻塞。
  • 消息队列:使用 Redis Stream 存储消息,确保每个秒杀请求可以被异步处理。
  • 消息确认机制:使用 ACK 确认确保消息被成功消费,并通过 Pending List 处理异常消息,防止丢失。

测试:

用Apifox进行测试,测试接口请求发送成功:

img

测试成功后可以看到:tb_voucher_order表多了1条记录,tb_seckill_voucher表对应优惠券的库存-1;在Redis中seckill:order下出现订单记录,在stockill:stock下的库存-1,在stream.orders下出现1条新的记录。

然后用Jmeter进行测试:可以发现相较于未做异步处理的情况性能仍有较大提升。

img

img

8、达人探店

8.1、达人探店-发布探店笔记

发布探店笔记

探店笔记类似点评网站的评价,往往是图文结合。对应的表有两个:
tb_blog:探店笔记表,包含笔记中的标题、文字、图片等
tb_blog_comments:其他用户对探店笔记的评价

img

具体发布流程

img

上传接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Slf4j
@RestController
@RequestMapping("upload")
public class UploadController {

@PostMapping("blog")
public Result uploadImage(@RequestParam("file") MultipartFile image) {
try {
// 获取原始文件名称
String originalFilename = image.getOriginalFilename();
// 生成新文件名
String fileName = createNewFileName(originalFilename);
// 保存文件
image.transferTo(new File(SystemConstants.IMAGE_UPLOAD_DIR, fileName));
// 返回结果
log.debug("文件上传成功,{}", fileName);
return Result.ok(fileName);
} catch (IOException e) {
throw new RuntimeException("文件上传失败", e);
}
}

}

注意:同学们在操作时,需要修改SystemConstants.IMAGE_UPLOAD_DIR 自己图片所在的地址,在实际开发中图片一般会放在nginx上或者是云存储上。

img

BlogController

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@RestController
@RequestMapping("/blog")
public class BlogController {

@Resource
private IBlogService blogService;

@PostMapping
public Result saveBlog(@RequestBody Blog blog) {
//获取登录用户
UserDTO user = UserHolder.getUser();
blog.setUpdateTime(user.getId());
//保存探店博文
blogService.saveBlog(blog);
//返回id
return Result.ok(blog.getId());
}
}

8.2 达人探店-查看探店笔记

探店笔记要包含笔记的内容和博主的相关信息。所以选择在Blog表中添加如下2个字段,这两个字段需要后续我们手动维护(赋值)。

img

img

img

在BlogController类里添加一个queryBlogById方法(虽然Controller里不应该出现业务代码,但鉴于只是简单的查询操作,就不必在意细节了):

1
2
3
4
5
6
7
8
9
10
11
@GetMapping("/{id}")
public Result queryBlogById(@PathVariable("id") Long id){
Blog blog = blogService.getById(id);
if(blog==null){
return Result.fail("笔记不存在");
}
User user = userService.getById(id);
blog.setIcon(user.getIcon());
blog.setName(user.getNickName());
return Result.ok(blog);
}

8.3 达人探店-点赞功能

现在的点赞逻辑是,一个人可以对同一篇笔记点赞无数次。

需求:

  • 1.同一个用户只能点赞一次,如果再次点击则取消点赞。
  • 2.如果当前用户已经点赞,则点赞按钮高亮显示(前端已实现,判断字段Blog类的isLike属性)

分析:

  • 1.给Blog类中添加一个isLike字段,标示是否被当前用户点赞。
  • 2.修改点赞功能,利用Redis的Set集合判断是否点赞过,未点赞则点赞数+1,已点赞则点赞数-1。
  • 3.修改分页查询Blog业务和根据id查询Blog的业务,判断当前用户是否点赞过,赋值给isLike字段。
  • 4.修改分页查询Blog业务,判断当前登录用户是否点赞过,赋值给isLike字段

为什么采用set集合:

Redis 的 Set 是一个不允许重复元素的集合,特别适合用于存储用户是否点赞的状态。

代码如下:

在BlogController类中新增likeBlog方法:

1
2
3
4
@PutMapping("/like/{id}")
public Result likeBlog(@PathVariable("id") Long id) {
return blogService.likeBlog(id);
}
  • 使用 @PutMapping 表示用户点赞或取消点赞。
  • 调用 likeBlog 方法来执行点赞逻辑。

在IBlogService接口中添加方法声明:

1
Result likeBlog(Long id);

在BlogServiceImpl类中添加下面代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private final StringRedisTemplate stringRedisTemplate;
public BlogServiceImpl(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}
@Override
public Result likeBlog(Long id) {
//1.获取登录用户
UserDTO user = UserHolder.getUser();
Long userId = user.getId();
//2.判断当前用户是否已经点赞过
String key = "blog:liked:" +id;
Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
if(BooleanUtil.isFalse(isMember)){
//3.未点赞,可以点赞
//3.1.数据库点赞数+1
boolean isSuccess = update().setSql("liked=liked+1").eq("id", id).update();
//3.2.保存用户到Redis
if(isSuccess){
stringRedisTemplate.opsForSet().add(key,userId.toString());
}
}else{
//4.已点赞,取消点赞
//4.1.数据库点赞数-1
boolean isSuccess = update().setSql("liked=liked-1").eq("id", id).update();
//4.2.把用户从Redis的set集合移除
stringRedisTemplate.opsForSet().remove(key,userId.toString());
}
return Result.ok();
}
  • 点赞:如果 Redis Set 中没有当前用户的 ID,说明用户还未点赞,执行数据库更新(点赞数+1),并将用户 ID 存入 Redis Set。
  • 取消点赞:如果 Redis Set 中有当前用户的 ID,说明用户已点赞,执行数据库更新(点赞数-1),并将用户 ID 从 Redis Set 中移除。

因为我的queryBlogById和queryHotBlog的业务代码都沿用原本的代码写在BlogController中,因此我是直接在BlogController中写入isBlogLiked代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public Boolean isBlogLiked(Blog blog) {
Long userId = null;
try {
//1.获取登录用户
userId = UserHolder.getUser().getId();
} catch (Exception e) {
log.debug("用户未登录!");
return false;
}
//2.判断当前用户是否已经点赞过
String key = "blog:liked:" +blog.getId();
Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
try {
blog.setIsLike(BooleanUtil.isTrue(isMember));
} catch (Exception e) {
log.debug("点赞信息为空!");
return false;
}
return isMember;
}

判断用户是否点赞:通过 Redis 判断当前登录用户是否已点赞,如果点赞,则将 isLike 字段设置为 true,前端根据该字段值来控制点赞按钮的显示状态

img

img

总结:

  • 点赞操作:使用 Redis Set 来存储每个用户对博客的点赞记录,确保每个用户只能对同一篇文章点赞一次。
  • 异步更新:通过 Redis 存储点赞状态,减少数据库操作频率,提高性能。
  • 前端交互:根据 isLike 字段,前端动态更新点赞按钮的状态,实现高亮显示和取消高亮。
  • 缓存优化:使用 Redis 快速检查用户的点赞状态,避免频繁访问数据库,提高系统响应速度。

测试效果:点赞一次高亮,点赞两次取消。在缓存中有相应的记录

img

8.4 达人探店-点赞排行榜

需求:在探店笔记详情页面,按照时间排序,把最早点赞的TOP5列举出来,形成点赞排行榜。

之前的点赞是放到set集合,但是set集合是不能排序的,所以这个时候,咱们可以采用一个可以排序的set集合,就是咱们的sortedSet

SortedSet(有序集合) 是 Redis 中一个重要的数据结构,支持按 score 值排序,这非常适合用于排名等场景。我们需要根据 score(通常是时间戳或者用户的点赞数等)来排序。

img

img

可以用ZADD命令添加元素,ZSCOPE来获得分数对应的元素,ZRANGE来

Redis 中通过 ZADD 命令来向 SortedSet 中添加元素,元素会根据 score 值排序。每个元素有两个部分:

  • Member:表示元素。
  • Score:表示排序值,Redis 会根据这个值对元素进行排序。

img

  • ZADD 添加了 3 个元素,分别为 m1、m2、m3,且分别赋予了不同的 score
  • ZRANGE 命令按 score 升序排列,并返回指定范围的元素。

修改以下几点

img

  • ZADD 用来将用户 ID 和当前时间戳(作为 score)加入到 Redis 的 SortedSet 中。
  • ZREM 用来移除已点赞用户的记录,进行取消点赞操作。

img

在Redis缓存中多了一个score:

img

在BlogController类中添加如下方法:

1
2
3
4
@GetMapping("/likes/{id}")
public Result queryBlogLikes(@PathVariable("id") Long id) {
return blogService.queryBlogLikes(id);
}

BlogController 类中新增查询接口方法 queryBlogLikes

  • GET 请求 /blog/likes/{id} 查询某篇博客的 TOP 5 点赞用户
  • 返回结果是一个 List<UserDTO>,包含了点赞最多的用户。

在IBlogService接口中添加如下方法:

1
Result queryBlogLikes(Long id);

在BlogServiceImpl类中添加如下方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Override
public Result queryBlogLikes(Long id) {
String key = RedisConstants.BLOG_LIKED_KEY +id;
//1.查询top5的点赞用户 zrange key 0 4
Set<String> top5 = stringRedisTemplate.opsForZSet().range(key, 0, 4);
if(top5==null || top5.isEmpty()){
return Result.ok(Collections.emptyList());
}
//2.解析出其中的用户id
List<Long> ids = top5.stream().map(Long::valueOf).collect(Collectors.toList());
//3.根据用户id查询用户
List<UserDTO> userDTOS = userService.listByIds(ids)
.stream()
.map(user -> BeanUtil.copyProperties(user, UserDTO.class))
.collect(Collectors.toList());
//4.返回
return Result.ok(userDTOS);
}

现在会出现左图问题,先点赞的反而被排到后面了:

img

下面是修改后的queryBlogLikes:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Override
public Result queryBlogLikes(Long id) {
String key = RedisConstants.BLOG_LIKED_KEY +id;
//1.查询top5的点赞用户 zrange key 0 4
Set<String> top5 = stringRedisTemplate.opsForZSet().range(key, 0, 4);
if(top5==null || top5.isEmpty()){
return Result.ok(Collections.emptyList());
}
//2.解析出其中的用户id
List<Long> ids = top5.stream().map(Long::valueOf).collect(Collectors.toList());
//3.根据用户id查询用户
String idStr = StrUtil.join(",", ids);
List<UserDTO> userDTOS = userService.query()
.in("id",ids)
.last("ORDER BY FIELD(id,"+idStr+")").list()
.stream()
.map(user -> BeanUtil.copyProperties(user, UserDTO.class))
.collect(Collectors.toList());
//4.返回
return Result.ok(userDTOS);
}

能够正常展示:

img

9、好友关注

9.1 好友关注-关注和取消关注

针对用户的操作:可以对用户进行关注和取消关注功能。

img

实现思路:

需求:基于该表数据结构,实现两个接口:

  • 关注和取关接口
  • 判断是否关注的接口

关注是User之间的关系,是博主与粉丝的关系,数据库中有一张tb_follow表来标示:

img

关注是给表新增记录,取关是删除表中记录。

在FollowController类中写入如下代码:

1
2
3
4
5
6
7
8
9
10
//关注
@PutMapping("/{id}/{isFollow}")
public Result follow(@PathVariable("id") Long followUserId, @PathVariable("isFollow") Boolean isFollow) {
return followService.follow(followUserId, isFollow);
}
//取消关注
@GetMapping("/or/not/{id}")
public Result isFollow(@PathVariable("id") Long followUserId) {
return followService.isFollow(followUserId);
}

关注接口(PUT 请求)

  • 路由:/follow/{id}/{isFollow}
  • 功能:判断是关注还是取消关注,通过 isFollow 参数来区分。如果 isFollowtrue,则表示关注;如果为 false,则表示取消关注。

取消关注接口(GET 请求)

  • 路由:/or/not/{id}
  • 功能:判断当前用户是否已关注目标用户。

在IFollowService中写入如下代码:

1
2
3
4
public interface IFollowService extends IService<Follow> {
Result follow(Long followUserId, Boolean isFollow); // 关注或取关
Result isFollow(Long followUserId); // 判断是否关注
}

在FollowServiceImpl类中写入如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Service
public class FollowServiceImpl extends ServiceImpl<FollowMapper, Follow> implements IFollowService {
// 关注与取消关注操作
@Override
public Result follow(Long followUserId, Boolean isFollow) {
Long userId = UserHolder.getUser().getId();
//1.判断是关注还是取关
if(isFollow){
//2.关注,新增数据
Follow follow = new Follow();
follow.setUserId(userId);
follow.setFollowUserId(followUserId);
save(follow);
}else{
//3.取关,删除记录
remove(new QueryWrapper<Follow>()
.eq("user_id", userId).eq("follow_user_id", followUserId));
}
return Result.ok();
}

// 判断当前用户是否已关注目标用户
@Override
public Result isFollow(Long followUserId) {
Long userId = UserHolder.getUser().getId();
//1.查询是否关注
Integer count = query().eq("user_id", userId).eq("follow_user_id", followUserId).count();
//2.判断是否关注
return Result.ok(count>0);
}
}
  • 关注操作:首先获取当前登录的 userId,然后根据 followUserId 创建一个 Follow 实体对象,并将其保存到数据库中。
  • 取消关注操作:根据当前用户的 userId 和目标用户的 followUserId 删除 tb_follow 表中的记录,表示取消关注。
  • 判断是否已关注
  • isFollow 方法:该方法通过查询 tb_follow 表来判断当前用户是否已经关注某个目标用户。如果存在对应记录,则返回 true,否则返回 false

测试:点击关注显示关注成功,数据库里会有记录。取消关注后删除消息。

9.2 好友关注-共同关注

想要去看共同关注的好友,需要首先进入到这个页面,这个页面会发起两个请求

1、去查询用户的详情

2、去查询用户的笔记

以上两个功能和共同关注没有什么关系,大家可以自行将笔记中的代码拷贝到idea中就可以实现这两个功能了,我们的重点在于共同关注功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// UserController 根据id查询用户
@GetMapping("/{id}")
public Result queryUserById(@PathVariable("id") Long userId){
// 查询详情
User user = userService.getById(userId);
if (user == null) {
return Result.ok();
}
UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
// 返回
return Result.ok(userDTO);
}

// BlogController 根据id查询博主的探店笔记
@GetMapping("/of/user")
public Result queryBlogByUserId(
@RequestParam(value = "current", defaultValue = "1") Integer current,
@RequestParam("id") Long id) {
// 根据用户查询
Page<Blog> page = blogService.query()
.eq("user_id", id).page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));
// 获取当前页数据
List<Blog> records = page.getRecords();
return Result.ok(records);
}

img

功能实现的思路是:

  • 关注操作:当用户关注另一个用户时,不仅要在数据库表中插入记录,还需要将关注的用户ID存储在Redis的 Set 集合中。
  • 取消关注操作:当用户取消对另一个用户的关注时,除了删除数据库中的记录,还需要从Redis中移除对应的用户ID。
  • 共同关注操作:通过Redis的 SINTER 命令,获取当前用户与目标用户关注的交集,进而查询这些共同关注的用户信息。

首先要更改FollowServiceImpl代码中的follow方法,主要在关注时把被关注用户的id放入redis,取关时从redis中移除id:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Resource
private final StringRedisTemplate stringRedisTemplate;
@Resource
private IUserService userService;
public FollowServiceImpl(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}
@Override
public Result follow(Long followUserId, Boolean isFollow) {
Long userId = UserHolder.getUser().getId();
String key = "follows:" + userId;
//1.判断是关注还是取关
if(isFollow){
//2.关注,新增数据
Follow follow = new Follow();
follow.setUserId(userId);
follow.setFollowUserId(followUserId);
boolean isSuccess = save(follow);
if(isSuccess){
//把关注用户的id放入redis的set集合
stringRedisTemplate.opsForSet().add(key,followUserId.toString());
}
}else{
//3.取关,删除记录
boolean isSuccess = remove(new QueryWrapper<Follow>()
.eq("user_id", userId).eq("follow_user_id", followUserId));
if(isSuccess){
//把关注用户的id从Redis移除
stringRedisTemplate.opsForSet().remove(key,followUserId.toString());
}
}
return Result.ok();
}
  • 关注操作:当用户关注某人时,我们在数据库中插入关注记录,并将被关注用户的 userId 存入 Redis 的 Set 集合。
  • 取消关注操作:当用户取消关注时,我们从数据库删除对应的关注记录,并将该用户的 userId 从 Redis 中的 Set 集合移除。

下面进行简单测试,关注时数据存入redis没问题:

img

img

点击共同关注报错,但发出了请求,原因是还没编写方法:

img

可以通过SINTER命令求出交集:

img

FollowController 中添加路由,在FollowController中写入下面代码:

1
2
3
4
@GetMapping("/common/{id}")
public Result followCommons(@PathVariable("id") Long id){
return followService.followCommons(id);
}

IFollowService 接口中声明,在IFollowService中写入下面代码:

1
Result followCommons(Long id);

FollowServiceImpl 类中实现,在FollowServiceImpl类中写入下面代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Override
public Result followCommons(Long id) {
//求的是目标用户和当前用户关注的交集
//1.获取key
Long userId = UserHolder.getUser().getId();
String key1 = "follows:"+userId; // 当前用户关注的集合
String key2 = "follows:"+id; // 目标用户关注的集合
//2.使用Redis的SINTER命令求交集
Set<String> intersect = stringRedisTemplate.opsForSet().intersect(key1, key2);
if(intersect==null||intersect.isEmpty()){
return Result.ok(Collections.emptyList());
}
//3.解析id集合,将交集中的用户ID转换成Long类型
List<Long> ids = intersect.stream().map(Long::valueOf).collect(Collectors.toList());
//4. 查询共同关注的用户信息
List<UserDTO> users = userService.listByIds(ids)
.stream()
.map(user -> BeanUtil.copyProperties(user, UserDTO.class))
.collect(Collectors.toList());
return Result.ok(users);
}

获取共同关注用户:首先,我们获取当前用户和目标用户的关注列表(存储在 Redis 的 Set 集合中),然后使用 SINTER 命令获取它们的交集(即共同关注的用户)。接着,我们将这些用户ID查询出来,并返回用户信息。

像我现在关注的是小鱼同学和可可今天不吃肉。接下来换号,换成小鱼同学,电话:13686869696,让小鱼同学关注我和可可今天不吃肉。以小鱼同学的视角来查看我,可以看到我们共同关注了可可今天不吃肉。

img

9.3 好友关注-Feed流实现方案

Feed流,即动态消息流,是社交平台用来为用户持续提供内容的一种方式。通过Feed流,用户能够无需主动查找,系统自动推送符合其兴趣的内容,提供更加沉浸式的体验。常见的例子有社交网络的动态时间线(Timeline),例如Facebook、Instagram、Twitter等。

img

Feed流的两种模式:

  1. Timeline模式

    • 定义:不对内容进行筛选,按内容的发布时间排序,通常用于好友或关注的人发布的内容,例如社交平台的动态。
    • 优点:信息全面,不会缺失任何内容;实现较为简单。
    • 缺点:可能存在信息噪音,用户未必感兴趣,导致内容获取效率低。
  2. 智能排序

    • 定义:通过智能算法筛选出用户感兴趣的内容,同时屏蔽掉违规或不感兴趣的内容。这样,系统推送的内容更符合用户的偏好。
    • 优点:用户能够获得更符合其兴趣的内容,增加用户粘性,可能让用户更沉浸在平台内。
    • 缺点:如果推荐算法不准确,可能会误推送不相关或让用户不满意的内容。

本例中,我们使用的是 Timeline模式,即基于用户关注的好友信息,按照时间顺序展示内容。

因此采用Timeline的模式。该模式的实现方案有三种:

  • 拉模式
  • 推模式
  • 推拉结合

拉模式:也叫做读扩散

该模式的核心含义就是:当用户想查看好友的动态时,系统会从用户关注的好友中拉取最新的内容。系统会根据用户关注的人群,拉取这些人发布的信息,并进行排序。

优点:

  • 节约空间:数据仅在用户需要时才进行读取,减少了重复存储。
  • 用户读取完信息后,可以清空收件箱,避免存储大量未读数据。

缺点:

  • 延迟性:用户在读取信息时才会去拉取,若用户关注了大量的人,系统可能需要一次性拉取大量的内容,增加服务器负担。
  • 在数据量大时,可能会导致延时较高。

img

推模式:也叫做写扩散。

当用户(如博主)发布新的内容时,系统会主动将内容推送到关注他的粉丝的收件箱中。这避免了用户每次都要去拉取数据。

优点:时效快,不用临时拉取

缺点:内存压力大,假设一个大V写信息,很多人关注他, 就会写很多分数据到粉丝那边去

img

推拉结合模式:也叫做读写混合,兼具推和拉两种模式的优点。

推拉模式是一个折中的方案

  • 普通用户:普通用户发布的内容会直接推送到其粉丝的收件箱中。
  • 大V用户:对于大V,系统会先将其内容写入到发件箱,再将内容推送到活跃粉丝的收件箱中;对于不活跃的粉丝,内容会存入发件箱,等他们上线时再拉取。

优点

  • 折中方案:既有推送的实时性,又能避免大V账号产生过大内存压力。

img

9.4 好友关注-推送到粉丝收件箱

需求:

  • 修改新增探店笔记的业务,在保存blog到数据库的同时,推送到粉丝的收件箱
  • 收件箱满足可以根据时间戳排序,必须用Redis的数据结构实现
  • 查询收件箱数据时,可以实现分页查询

Feed流中的数据会不断更新,所以数据的角标也在变化,因此不能采用传统的分页模式。

传统了分页在feed流是不适用的,因为我们的数据会随时发生变化

情景解释

  • 假设你首先请求了动态的第一页(page=1,每页5条数据),然后你会得到类似10、11、12、13、14这样的数据。
  • 接着你请求第二页(page=2),你会期望得到接下来的数据(例如15、16、17等)。
  • 然而,如果在你请求数据的过程中,动态流(feed)有新内容发布,那么你的请求就可能会获取到不一致的数据。

潜在问题

  • 如果第一页的数据是在t1时刻请求的,而新数据(比如“Feed: 11”)是在t3时刻发布的,那么第二页获取的数据可能会和第一页重叠。
  • 这会导致分页数据的重复或者错乱,因为Feed在更新时,数据会发生变化。

img

Feed流的滚动分页

  • 这里提出的解决方法是记录每次请求的最后一个数据ID(lastId),这样可以确保每次分页请求都知道从哪里开始获取数据。
  • 比如,如果你已经获取到的最后一个ID是6,那么第二页的数据应该从ID为6的下一条数据开始,这样可以避免重复获取已读过的数据。

举个例子:我每次请求时,保存当前最后读取的数据ID。

当请求下一页时,通过lastId来确认从哪里继续读取数据,确保不会重复或者遗漏。

这种方式确保了分页的数据不会受动态流中间更新的影响。

img

核心的意思:就是我们在保存完探店笔记后,获得到当前笔记的粉丝,然后把数据推送到粉丝的redis中去。

在BlogController里面写入如下代码:

1
2
3
4
5
6
7
8
9
10
@PostMapping
public Result saveBlog(@RequestBody Blog blog) {
// 获取登录用户
UserDTO user = UserHolder.getUser();
blog.setUserId(user.getId());
// 保存探店博文
blogService.saveBlog(blog);
// 返回id
return Result.ok(blog.getId());
}

在IBlogService接口里面写入如下代码:

1
Result saveBlog(Blog blog);

在BlogServiceImpl类中写入如下代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Override
public Result saveBlog(Blog blog) {
//1.获取登录用户
UserDTO user = UserHolder.getUser();
blog.setUserId(user.getId());
//2.保存探店笔记
boolean isSuccess = save(blog);
if(!isSuccess){
return Result.fail("新增笔记失败!");
}
//3.查询笔记作业的所有粉丝
//select * from tb_follow where follow_user_id = ?
List<Follow> follows = followService.query().eq("follow_user_id", user.getId()).list();
//4.推送笔记id给粉丝
for(Follow follow : follows){
//4.1.获取粉丝id
Long userId = follow.getUserId();
//4.2.推送到粉丝收件箱是sortedSet
String key = "feed::"+userId;
stringRedisTemplate.opsForZSet().add(key,blog.getId().toString(),System.currentTimeMillis());
}
//返回id
return Result.ok(blog.getId());
}

9.5好友关注-实现分页查询收邮箱的思路

ZRANGE是按照角标从小到大排序:

img

这个命令返回一个有序集合中成员的范围,可以根据给定的最小值和最大值来限制返回的元素。

ZREVRANGE是按照角标从大到小排序:

img

这个命令与 ZRANGE 类似,不过它是倒序返回集合中的元素。

ZREVRANGEBYSCORE是按照分数从大到小排序:

img

这个命令根据分数进行过滤,返回分数范围内的成员,通常用来处理时间线或者排名之类的数据。

滚动查询:每次分页查询都会获取上一次查询返回的最小值,并将其作为下一次查询的最大值。这样确保每次查询返回的数据是连续的,不会重复或者遗漏。

img

例如,当你使用 ZRANGEZREVRANGE 进行分页查询时,你可以设置 LIMIT 参数来限制返回的结果数量。第一次查询时,你会从第一个成员开始,后续每次查询会根据上一次查询的结果来调整开始的地方。

规律:分数最小值和查的数量固定不变。最大值为上一次查询的最小值、偏移量第1次给0,第1次后给在上一次的结果中,与最小值一样的元素的个数。

当分数一致出现问题:

img

  • 分数相同的成员将被视为具有相同的顺序。
  • 当分数相同的成员出现在两次分页查询的结果中时,它们会按照第一次查询的结果顺序被返回。
  • 比如,当第一次查询返回分数为 6 的成员时,第二次查询会继续返回分数为 6 的成员,这样保证了分页的连续性。

如果 z1 集合中有几个分数为 6 的元素,第一次查询返回了这些元素后,下一次查询会从上一次查询的结果继续获取数据。

img

9.6好友关注-实现滚动分页查询

需求:在实现滚动分页查询时,目的是在用户的个人主页展示他们关注的博客内容。每次查询返回一定数量的数据,同时将最小时间戳和偏移量作为查询条件传递给下一次查询。

具体操作如下:

1.获取最小时间戳:每次查询结束后,分析出返回数据中的最小时间戳,这个时间戳将作为下一次查询的条件,用于获取比当前查询时间更新的博客。

2.计算偏移量:偏移量是上次查询中返回的数据条目数,目的是跳过已经查询过的数据,从而确保每次查询都能获取新的数据。

3.请求参数:每次请求时需要携带 lastId(即上一次查询的最小时间戳)和 offset(即偏移量)这两个参数。第一次查询时,前端会指定这两个参数,之后的查询由后台根据上一次的查询结果进行传递。

综上:我们的请求参数中就需要携带 lastId:上一次查询的最小时间戳 和偏移量这两个参数。

这两个参数第一次会由前端来指定,以后的查询就根据后台结果作为条件,再次传递到后台。

首先定义出来具体的返回值实体类

1
2
3
4
5
6
@Data
public class ScrollResult {
private List<?> list;
private Long minTime;
private Integer offset;
}

ScrollResult 类用于封装查询结果,包含三个字段:

  • list:存储查询到的博客列表。
  • minTime:本次查询返回数据中的最小时间戳,用于下一次查询的条件。
  • offset:偏移量,表示这次查询返回的数据数量,下一次查询需要跳过这些数据。

在BlogController中写入下面代码:

1
2
3
4
@GetMapping("/of/follow")
public Result queryBlogOfFollow(@RequestParam("lastId") Long max,@RequestParam(value = "offset",defaultValue = "0") Integer offset) {
return blogService.queryBlogOfFollow(max,offset);
}

接口接收两个请求参数:

  • lastId:上次查询的最小时间戳。
  • offset:本次查询的偏移量,默认值为 0。

在IBlogService写入如下代码:

1
Result queryBlogOfFollow(Long max, Integer offset);

在BlogServiceImpl中写入如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@Override
public Result queryBlogOfFollow(Long max, Integer offset) {
//1.获取当前用户
Long userId = UserHolder.getUser().getId();
//2.查询收件箱 ZREVRANGEBYSCORE key Max Min LIMIT offset count
String key = FEED_KEY+userId;
Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet()
.reverseRangeByScoreWithScores(key, 0, max, offset, 3);
//2.1.非空判断
if(typedTuples==null || typedTuples.isEmpty()){
return Result.ok();
}
//3.解析数据:blogId、minTime(时间戳)、offset
List<Long> ids = new ArrayList<>(typedTuples.size());
long minTime = 0;
int os = 1;
for(ZSetOperations.TypedTuple<String> tuple:typedTuples){
//4.1.获取id
String idStr = tuple.getValue();
ids.add(Long.valueOf(idStr));
//4.2.获取分数
long time = tuple.getScore().longValue();
if(time == minTime){
os++;
}else{
minTime = time;
os=1;
}
}
//4.根据id查询blog
String idStr = StrUtil.join(",",ids);
List<Blog> blogs = query().in("id",ids).last("ORDER BY FIELD(id,"+idStr+")").list();
for (Blog blog : blogs) {
isBlogLiked(blog);
User user = userService.getById(blog.getUserId());
blog.setName(user.getNickName());
blog.setIcon(user.getIcon());
}
//5.封装并返回
ScrollResult r = new ScrollResult();
r.setList(blogs);
r.setOffset(os);
r.setMinTime(minTime);
return Result.ok(r);
}

获取当前用户

  • 使用 UserHolder.getUser().getId() 获取当前登录用户的 ID。

查询动态流

  • 使用 Redis 的 reverseRangeByScoreWithScores 命令按时间戳倒序获取动态流中的博客 ID。查询范围是从时间戳 0 到 max(即上次查询的最小时间戳),偏移量为 offset,每次查询 3 条数据。

处理查询结果

  • 遍历查询结果,提取出博客的 ID 和时间戳。根据时间戳判断是否与上一次的最小时间戳相同,若相同,则偏移量加 1,若不同,则更新最小时间戳并重置偏移量。

查询博客信息

  • 使用 query().in("id", ids) 根据查询到的博客 ID 查询博客详细信息,使用 ORDER BY FIELD 确保返回的博客顺序与 ID 列表一致。

返回结果

  • 将查询到的博客信息、最小时间戳和偏移量封装在 ScrollResult 对象中,并返回给前端。

效果图如下(我关注了小鱼同学,于是我可以看到小鱼同学发布的文章):

img

10、附近商户

10.1、附近商户-GEO数据结构的基本用法

GEO是Geolocation的简写形式,代表地理坐标,在Redis的3.2版本后加入了GEO的支持,允许存储地理坐标信息,帮助我们根据经纬度来检索数据。

常见的命令有:

  1. GEOADD:添加一个地理空间信息,包含:经度(longitude)、纬度(latitude)、值(member)
  2. GEODIST:计算指定的两个点之间的距离并返回
  3. GEOHASH:将指定member的坐标转为hash字符串形式并返回
  4. GEOPOS:返回指定member的坐标
  5. GEORADIUS:指定圆心、半径,找到该圆内包含的所有member,并按照与圆心之间的距离排序后返回。6.以后已废弃
  6. GEOSEARCH:在指定范围内搜索member,并按照与指定点之间的距离排序后返回。范围可以是圆形或矩形。6.2.新功能
  7. GEOSEARCHSTORE:与GEOSEARCH功能一致,不过可以把结果存储到一个指定的key。 6.2.新功能

10.2、 附近商户-导入店铺数据到GEO

img

为什么使用 Redis GEO

Redis 提供的 GEO 数据结构支持存储基于地理位置的成员,可以通过经纬度来存储地理坐标,并支持非常高效的基于距离的查询。这使得我们可以非常方便地进行以下操作:

  • 插入商户的地理位置信息:将商户的经纬度和商户ID存入 Redis。
  • 根据距离查询商户:基于用户的当前位置,查询指定范围内的商户。

具体场景说明:

当用户点击“美食”后,后台会根据用户的地理位置查询出周边的商户。查询的方式是:通过用户提供的地理坐标作为圆心,在 Redis 中根据经纬度存储商户信息,按距离来排序商户。

img

我们要做的事情是:将数据库表中的数据导入到redis中去,redis中的GEO,GEO在redis中就一个menber和一个经纬度,我们把x和y轴传入到redis做的经纬度位置去,但我们不能把所有的数据都放入到menber中去,毕竟作为redis是一个内存级数据库,如果存海量数据,redis还是力不从心,所以我们在这个地方存储他的id即可。

但是这个时候还有一个问题,就是在redis中并没有存储type,所以我们无法根据type来对数据进行筛选,所以我们可以按照商户类型做分组,类型相同的商户作为同一组,以typeId为key存入同一个GEO集合中即可

在src/test/java/com/hmdp的HmDianPingApplicationTests类中写入如下的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Resource
StringRedisTemplate stringRedisTemplate;
@Test
void loadShopData(){
//1.查询店铺信息
List<Shop> list = shopService.list();
//2.把店铺分组,按照typeId分组,typeId一致的放到一个集合
Map<Long,List<Shop>> map = list.stream().collect(Collectors.groupingBy(Shop::getTypeId));
//3.分批完成写入Redis
for (Map.Entry<Long, List<Shop>> entry : map.entrySet()) {
//3.1.获取类型id
Long typeid = entry.getKey();
String key = "shop:geo:"+typeid;
//3.2.获取同类型的店铺的集合
List<Shop> value = entry.getValue();
//3.3.写入redis GEOADD key 经度 纬度 member
List<RedisGeoCommands.GeoLocation<String>> locations = new ArrayList<>(value.size());
for(Shop shop : value){
// stringRedisTemplate.opsForGeo().add(key,new Point(shop.getX(),shop.getY()),shop.getId().toString());
locations.add(new RedisGeoCommands.GeoLocation<>(
shop.getId().toString(),
new Point(shop.getX(),shop.getY())
));
}
stringRedisTemplate.opsForGeo().add(key,locations);
}
}

步骤详解

1. 查询商户信息

首先,从数据库中查询出所有的商户数据。

shopService.list() 返回的是所有商户的信息,包括商户的经纬度(xy)、商户类型(typeId)等。

  1. 将商户按类型分组

商户数据按 typeId(商户类型)进行分组,确保同类型的商户会存储在同一个 GEO 集合中。

通过 Java 8 的 Collectors.groupingBy() 方法,按照 typeId 对商户进行分组,返回一个以 typeId 为键的 Map,每个键对应一个商户类型的商户列表。

3. 将商户数据分批写入 Redis

对于每一组商户,使用 Redis 的 GEOADD 命令将商户的经纬度数据添加到对应的 GEO 集合中。这样,我们就可以在 Redis 中为每种类型的商户创建一个单独的 GEO 集合。

  • key:以商户类型的 typeId 为键,创建一个对应的 Redis GEO 集合(如 shop:geo:1 对应类型为 1 的商户)。
  • locations:这是一个 List,包含每个商户的 GeoLocation,每个商户的位置信息包括商户的 IDPoint(经纬度)。
  • stringRedisTemplate.opsForGeo().add(key, locations):通过 stringRedisTemplateGeo 操作将商户的位置信息批量写入 Redis 中。

关于 Redis GEO 的存储和查询

  • 存储数据:每个商户的 ID 作为 Redis GEO 集合的 member,商户的经纬度作为对应的 Point 存储在 Redis 中。
  • 查询数据:存储完成后,我们可以使用 Redis 提供的 GEO 命令来进行基于地理位置的查询。例如,可以查询给定坐标范围内的商户,或者根据用户的位置,查询附近的商户。

测试:点击运行之后,在Redis中能够看到导入的数据:

img

10.3 附近商户-实现附近商户功能

SpringDataRedis的2.3.9版本并不支持Redis 6.2提供的GEOSEARCH命令,因此我们需要提示其版本,修改自己的POM

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<exclusions>
<exclusion>
<groupId>spring-data-redis</groupId>
<artifactId>org.springframework.data</artifactId>
</exclusion>
<exclusion>
<groupId>lettuce-core</groupId>
<artifactId>io.lettuce</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>2.6.2</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>6.1.6.RELEASE</version>
<scope>compile</scope>
</dependency>
</dependencies>
  • spring-data-redis 升级至 2.6.2 版本,以支持 Redis 6.2 新特性。
  • lettuce-core 升级至 6.1.6 版本,确保支持 GEOSEARCH 命令。

在ShopController中修改queryShopByType方法:

1
2
3
4
5
6
7
8
9
@GetMapping("/of/type")
public Result queryShopByType(
@RequestParam("typeId") Integer typeId,
@RequestParam(value = "current", defaultValue = "1") Integer current,
@RequestParam(value="x",required=false) Double x,
@RequestParam(value="y",required=false) Double y
) {
return shopService.queryShopByType(typeId,current,x,y);
}
  • typeId:商户类型ID,用于区分不同的商户。
  • current:当前页,默认为1,用于分页。
  • xy:可选的经纬度,用于查询附近的商户。如果为空,则不按距离查询。

在IShopService接口中写入如下方法:

1
Result queryShopByType(Integer typeId, Integer current, Double x, Double y);

在ShopServiceImpl类中写入如下方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@Override
public Result queryShopByType(Integer typeId, Integer current, Double x, Double y) {
//1.判断是否需要根据坐标查询
if(x==null || y==null){
//不需要查询坐标,按数据库查
Page<Shop> page = query()
.eq("type_id",typeId)
.page(new Page<>(current, SystemConstants.DEFAULT_PAGE_SIZE));
return Result.ok(page.getRecords());
}
//2.计算分页参数
int from = (current - 1)*SystemConstants.DEFAULT_PAGE_SIZE;
int end = current*SystemConstants.DEFAULT_PAGE_SIZE;
//3.查询redis,按照距离排序、分页。结果:shopId,distance
String key = SHOP_GEO_KEY+typeId;
GeoResults<RedisGeoCommands.GeoLocation<String>> results = stringRedisTemplate.opsForGeo()
.search(
key,
GeoReference.fromCoordinate(x, y),
new Distance(5000),
RedisGeoCommands.GeoSearchCommandArgs.newGeoSearchArgs().includeDistance().limit(end)
);
//4.解析出id
if(results==null){
return Result.ok(Collections.emptyList());
}
List<GeoResult<RedisGeoCommands.GeoLocation<String>>> list = results.getContent();
//4.1.截取from-end的部分
List<Long> ids = new ArrayList<>(list.size());
Map<String,Distance> distanceMap = new HashMap<>(list.size());
if(list.size()<=from){
return Result.ok(Collections.emptyList());
}
list.stream().skip(from).forEach(result->{ //跳过可能把所有数据跳过了
//4.2.获取店铺id
String shopIdStr = result.getContent().getName();
ids.add(Long.valueOf(shopIdStr));
//4.3.获取距离
Distance distance = result.getDistance();
distanceMap.put(shopIdStr,distance);
});
//5.根据id查询shop
String idStr = StrUtil.join(",", ids);
List<Shop> shops = query().in("id", ids).last("ORDER BY FIELD ( id," + idStr + ")").list();
for(Shop shop : shops){
shop.setDistance(distanceMap.get(shop.getId().toString()).getValue());
}
//6、返回
return Result.ok(shops);
}

步骤 1:判断是否需要根据坐标查询

首先检查 xy 是否为空,如果为空,则不需要根据坐标查询,直接从数据库中查询商户信息。如果 xy 不为空,则表示需要根据经纬度查询附近的商户。

步骤 2:计算分页参数

根据当前页数 current,计算分页的起始位置 from 和结束位置 end。这将用于从查询结果中提取对应的数据。

步骤 3:查询 Redis 中的商户数据

使用 Redis 的 GEOSEARCH 命令查询商户,根据用户提供的经纬度(x, y)计算距离,筛选出距离用户位置在一定范围内的商户。

  • SHOP_GEO_KEY + typeId:Redis 中存储商户位置信息的键,以商户类型 typeId 为区分。
  • GeoReference.fromCoordinate(x, y):通过用户提供的经纬度创建 GeoReference
  • new Distance(5000):设置查询的半径为 5000 米,即查询 5 公里范围内的商户。
  • limit(end):限制返回的商户数量。

步骤 4:解析查询结果

从 Redis 返回的结果中提取商户 ID 和距离信息,并根据分页参数截取需要的数据。

  • 使用 skip(from) 跳过不需要的商户数据,确保从正确的起始位置开始提取商户。
  • 提取商户的 ID 和距离信息,存入 idsdistanceMap

步骤 5:根据商户 ID 查询商户详情

根据提取到的商户 ID,从数据库中查询商户的详细信息,并将查询到的距离信息添加到商户数据中。

步骤 6:返回结果

最终返回商户的详细信息以及距离。

效果:

img

11、用户签到

11.1、用户签到-BitMap功能演示

我们针对签到功能完全可以通过mysql来完成,比如说以下这张表

img

用户一次签到,就是一条记录,假如有1000万用户,平均每人每年签到次数为10次,则这张表一年的数据量为 1亿条

每签到一次需要使用(8 + 8 + 1 + 1 + 3 + 1)共22 字节的内存,一个月则最多需要600多字节

我们如何能够简化一点呢?其实可以考虑小时候一个挺常见的方案,就是小时候,咱们准备一张小小的卡片,你只要签到就打上一个勾,我最后判断你是否签到,其实只需要到小卡片上看一看就知道了

我们可以采用类似这样的方案来实现我们的签到需求。

在实际场景中,可能有很多用户每月的签到记录。例如,假设每个用户每月有 30 天的签到记录,我们可以为每个用户使用一个 BitMap 来记录他们的签到情况。每位用户的签到信息会对应一个 1 或 0,1 表示签到,0 表示未签到。

把每一个bit位对应当月的每一天,形成了映射关系。用0和1标示业务状态,这种思路就称为位图(BitMap)。这样我们就用极小的空间,来实现了大量数据的表示

Redis中是利用string类型数据结构实现BitMap,因此最大上限是512M,转换为bit则是 2^32个bit位。

img

每天的签到是一个二进制位,1 表示签到,0 表示未签到。每个用户每月的签到数据就对应一个长度为 30 或 31 的二进制数。

假设一个用户的签到记录是:
1110101001001010111011001...
每个位置对应一个日期,1 表示该日期签到,0 表示未签到。

BitMap的操作命令有:

  • SETBIT:向指定位置(offset)存入一个0或1
  • GETBIT :获取指定位置(offset)的bit值
  • BITCOUNT :统计BitMap中值为1的bit位的数量
  • BITFIELD :操作(查询、修改、自增)BitMap中bit数组中的指定位置(offset)的值
  • BITFIELD_RO :获取BitMap中bit数组,并以十进制形式返回
  • BITOP :将多个BitMap的结果做位运算(与 、或、异或)
  • BITPOS :查找bit数组中指定范围内第一个0或1出现的位置

img

img

BITFIELD key GET(代表查询) u(u代表无符号,i代表有符号)截取几位作为结果 开始的位置

img

11.2 、用户签到-实现签到功能

需求:实现签到接口,将当前用户当天签到信息保存到Redis中

思路:

  • 将每个用户的签到数据以 BitMap 的形式存储。BitMap 通过位来表示数据,签到的位为 1,未签到的位为 0
  • 每次签到时,更新对应日期的位置(即每个用户的签到信息)。

我们通过接口文档发现,此接口并没有传递任何的参数,没有参数怎么确实是哪一天签到呢?这个很容易,可以通过后台代码直接获取即可,然后到对应的地址上去修改bitMap。

img

代码

UserController

1
2
3
4
@PostMapping("/sign")
public Result sign(){
return userService.sign();
}

UserServiceImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
public Result sign() {
// 1.获取当前登录用户
Long userId = UserHolder.getUser().getId();
// 2.获取日期
LocalDateTime now = LocalDateTime.now();
// 3.拼接key
String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
String key = USER_SIGN_KEY + userId + keySuffix;
// 4.获取今天是本月的第几天
int dayOfMonth = now.getDayOfMonth();
// 5.写入Redis SETBIT key offset 1
stringRedisTemplate.opsForValue().setBit(key, dayOfMonth - 1, true);
return Result.ok();
}

UserServiceImpl 中,具体实现签到的功能。代码执行的步骤如下:

  • 获取当前用户 ID:通过 UserHolder.getUser().getId() 获取当前用户的 ID。
  • 获取当前日期:使用 LocalDateTime.now() 获取当前日期,用于计算当天的日期。
  • 构建 Redis key:根据用户 ID 和当前年月(格式:yyyyMM)构建 Redis 的 key,即 USER_SIGN_KEY + userId + keySuffix,其中 keySuffix 为年月。
  • 计算今天是本月的第几天:通过 now.getDayOfMonth() 获取今天是本月的第几天(例如 1、2、3…)。
  • 更新 Redis 中的 BitMap:通过 stringRedisTemplate.opsForValue().setBit(key, dayOfMonth - 1, true) 将对应日期的签到状态设置为 1,表示该用户已签到。

SETBIT key offset 1:将 key 对应的 BitMap 中的 offset 位置设置为 1,表示用户在该日期已签到。

  • key:用户某个月的签到记录。
  • offset:签到的日期,dayOfMonth - 1(如 1 号签到对应的 offset 为 0)。
  • true:表示该日期已签到(1),false 表示未签到(0)。

用Apifox进行测试:

img

11.3 用户签到-签到统计

img

在UserController类中写入如下代码:

1
2
3
4
@GetMapping("/sign/count")
public Result signCount(){
return userService.signCount();
}

在IUserService类中写入如下代码:

1
Result signCount();

在UserServiceImpl类中写入如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Override
public Result signCount() {
//1.获取当前登录用户
Long userId = UserHolder.getUser().getId();
//2.获取日期
LocalDateTime now = LocalDateTime.now();
//3.拼接key
String keySuffix = now.format(DateTimeFormatter.ofPattern("yyyyMM"));
String key = USER_SIGN_KEY + userId + keySuffix;
//4.获取今天是本月第几天
int dayOfMonth = now.getDayOfMonth();
//5.获取本月截止今天为止的所有的签到记录,返回的是一个十进制的数字 BITFIELD sign:5:202203 GET u14 0
List<Long> result = stringRedisTemplate.opsForValue().bitField(
key, BitFieldSubCommands.create()
.get(BitFieldSubCommands.BitFieldType.unsigned(dayOfMonth)).valueAt(0)
);
if(result==null || result.isEmpty()){
//没有任何签到结果
return Result.ok(0);
}
Long num = result.get(0);
if(num==null || num==0){
return Result.ok(0);
}
//6.循环遍历
int count=0;
while(true){
//6.1.让这个数字与1做与运算,得到数字的最后一个bit位
if((num&1)==0){//6.2.判断这个bit位是否为0
//6.3.如果为0,说明未签到结束
break;
}else{
//6.4.如果不为0,说明已签到,计数器+1
count++;
}
//6.5.把数字右移一位,抛弃最后一个bit位,继续下一个bit位
num >>>= 1;
}

return Result.ok(count);
}
  • 步骤 1:获取当前登录用户的 ID。
  • 步骤 2:获取当前日期。
  • 步骤 3:拼接 Redis key,格式为 USER_SIGN_KEY + userId + "yyyyMM",用于标识每个月的签到数据。
  • 步骤 4:获取今天是本月的第几天,即 dayOfMonth
  • 步骤 5:使用 BITFIELD 命令获取当前月截至今天的签到数据。通过 BitFieldSubCommands 设置获取签到信息的位数。
  • 步骤 6:遍历签到数据,通过位操作(与运算)统计连续签到的天数。

img

img

11.4 额外补充-关于使用bitmap来解决缓存穿透的方案

回顾缓存穿透

缓存穿透问题概述

  • 典型的解决办法是 缓存空值,即当数据库查询返回空数据时,将这个空数据也存入缓存,以防止相同的无效查询频繁访问数据库。
  • 缓存穿透指的是用户请求的数据既不在缓存中,也不在数据库中。每次用户请求的都可能是一个不存在的数据。攻击者可以通过不断请求不存在的数据来绕过缓存,直接访问数据库,造成数据库负担过重。

传统解决方案

在传统解决方案中,如果数据库中没有数据,缓存中也没有,我们就会把该数据标记为空并存入缓存,这样下一次请求该数据时直接从缓存中获取。但这个方法存在的问题是:

  • 如果查询的是不同的 ID 数据,缓存会逐渐变得非常大,占用较多内存。
  • 如果请求的是某个不存在的数据,缓存无法从一开始解决。

所以我们如何解决呢?

为了更有效地解决缓存穿透问题并优化存储空间,我们可以使用 BitMap 代替 List 来存储 ID 数据。

思路

  1. 数据存储方式:将所有合法的 ID 存储到一个非常大的 BitMap 中。每个 ID 的位置(即索引)对应于 BitMap 的某一位。当用户访问某个 ID 时,我们通过哈希算法(如 id % bitmap.size)计算出该 ID 应该存储在 BitMap 的哪个位置,并将该位置的值设置为 1,表示该数据已经存在。
  2. 查询过程:当用户查询某个数据时,我们同样通过哈希算法计算该 ID 应该落在 BitMap 的哪个位置。如果该位置的值为 1,表示该数据存在;如果为 0,则表示该数据不存在,可以直接返回,不访问数据库。

步骤

  1. 将 ID 数据存储到 BitMap

    :例如,使用 Redis 的

    1
    SETBIT

    命令将某个 ID 对应的位设置为

    1
    1

    ,表示该 ID 对应的数据存在。

    • SETBIT key offset 1:设置 key 对应的位(由 ID 计算得出)为 1,表示数据存在。
  2. 查询时通过哈希计算位置:当用户请求某个 ID 时,使用哈希算法(id % bitmap.size)计算出该 ID 在 BitMap 中的偏移位置。如果该位置为 1,则表示该数据存在,反之为 0,则说明该数据不存在。

优点

  • 节省内存:BitMap 只用一个比特位来表示一个数据的存在与否,相比 List 或 HashMap 等数据结构,它能够显著减少内存占用。
  • 高效查询:BitMap 的查询操作非常快速,通过直接访问对应的位来判断数据是否存在,查询复杂度为 O(1)。

误差率

由于使用哈希算法来映射 ID 到 BitMap 中的位,因此可能会发生 哈希冲突。如果多个 ID 被映射到 BitMap 的同一位上,可能会出现误判的情况(即认为某个数据存在,但实际并不存在)。这种情况称为误差率。

误差率会随着哈希冲突的增加而增大,通常我们可以通过增加 BitMap 的大小来降低误差率。

img

  • 用户发起请求,查询某个数据(如 ID 为 1、2、3 的数据)。
  • 数据被存储到 Redis 中,通过 List 存储 ID 数据。查询时,通过检查 List 是否包含对应的 ID 数据来判断缓存是否命中。

img

  • List 数据结构替换成 BitMap,并使用哈希算法将每个 ID 映射到 BitMap 的某一位。
  • 使用 Redis 的 BitMap(SETBITGETBIT)操作,减少了 List 存储空间,并且提高了查询效率。

具体操作

  • SETBIT:设置某个位为 1,表示数据存在。
  • GETBIT:获取某个位的值,判断数据是否存在。

12、UV统计

12.1 、UV统计-HyperLogLog

首先我们搞懂两个概念:

  • UV:全称Unique Visitor,也叫独立访客量,是指通过互联网访问、浏览这个网页的自然人。1天内同一个用户多次访问该网站,只记录1次。
  • PV:全称Page View,也叫页面访问量或点击量,用户每访问网站的一个页面,记录1次PV,用户多次打开页面,则记录多次PV。往往用来衡量网站的流量。

通常来说UV会比PV大很多,所以衡量同一个网站的访问量,我们需要综合考虑很多因素,所以我们只是单纯的把这两个值作为一个参考值

UV统计在服务端做会比较麻烦,因为要判断该用户是否已经统计过了,需要将统计过的用户信息保存。但是如果每个访问的用户都保存到Redis中,数据量会非常恐怖,那怎么处理呢?

Hyperloglog(HLL)是从Loglog算法派生的概率算法,用于确定非常大的集合的基数,而不需要存储其所有值。相关算法原理大家可以参考:https://juejin.cn/post/6844903785744056333#heading-0 Redis中的HLL是基于string结构实现的,单个HLL的内存永远小于16kb内存占用低的令人发指!作为代价,其测量结果是概率性的,有小于0.81%的误差。不过对于UV统计来说,这完全可以忽略。

img

不论添加几次,永远只记录一次。

img

12.2 UV统计-测试百万数据的统计

测试思路:我们直接利用单元测试,向HyperLogLog中添加100万条数据,看看内存占用和统计效果如何

img

经过测试:我们会发生他的误差是在允许范围内,并且内存占用极小

100万数据成功写入:

img

测试前:

img

测试后:

img

大约占用11kb,确实是小于16kb。

Redis(实战篇-黑马点评) 完结