Redis实战
视频学习:黑马Redis入门到实战
课程资料:链接 提取码:eh11
学习路线及部分内容参考:Kyle’s Blog
内容概述
短信登陆
项目导入
- 资料中导入SQL文件,所提供表的内容如下:
表 | 说明 |
---|---|
tb_user | 用户表 |
tb_user_info | 用户详情表 |
tb_shop | 商户信息表 |
tb_shop_type | 商户类型表 |
tb_blog | 用户日记表(达人探店日记) |
tb_follow | 用户关注表 |
tb_voucher | 优惠券表 |
tb_voucher_order | 优惠券的订单表 |
导入后端项目:同样已经放在资源中了,修改Mysql和Redis的相关配置。启动项目后访问http://localhost:8081/shop-type/list ,如果可以看到JSON数据,则说明导入成功
这里我修改了数据库连接驱动为8以后版本,所以对应修改了maven依赖和驱动名称
1
2
3
4
5<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<version>8.0.33</version>
</dependency>1
driver-class-name: com.mysql.cj.jdbc.Driver
页面显示:
导入前端项目:这里也是直接拿资源中的nginx项目解压即可,其中已经包含了hmdp的前端资源,位于html文件夹下。
nginx文件结构:
在
nginx所在目录
打开一个cmd窗口,输入命令:start nginx.exe
,即可启动项目访问http://localhost:8080/ ,打开开发者模式,就可以看到页面
基于Session实现登录
登录流程
- 发送验证码
用户在提交手机号后,会校验手机号是否合法,如果不合法,则要求用户重新输入手机号
如果手机号合法,后台此时生成对应的验证码,同时将验证码进行保存,然后再通过短信的方式将验证码发送给用户 - 短信验证码登录、注册
用户将验证码和手机号进行输入,后台从session中拿到当前验证码,然后和用户输入的验证码进行校验,如果不一致,则无法通过校验,如果一致,则后台根据手机号查询用户,如果用户不存在,则为用户创建账号信息,保存到数据库,无论是否存在,都会将用户信息保存到session中,方便后续获得当前登录信息 - 校验登录状态
用户在请求的时候,会从cookie中携带JsessionId到后台,后台通过JsessionId从session中拿到用户信息,如果没有session信息,则进行拦截,如果有session信息,则将用户信息保存到threadLocal中,并放行
发送验证码
首先进入我的
页面,第一次进入会显示手机验证码登录,这里点击发送验证码按钮,发送如下请求:
Request URL: http://localhost:8080/api/user/code?phone=13264429299
Request Method: POST
可以看出,操作的内容是UserController,且方法拦截方法为code
,携带参数phone
1 |
|
修改以上方法,这里我们通过调用service中的实现进行,不要把业务实现放在controller中,不便于重用和阅读。
1 |
|
然后再对应的serviceImpl中实现该方法
1 |
|
注意:这里没有去真的发送验证码,想要改成邮箱验证参考瑞吉外卖部分
登录
上一步填上验证码后,点击登录发送请求和携带参数如下:
Request URL:http://localhost:8080/api/user/login
Request Method:POST
{phone: “13264429299”, code: “404849”}
可以看出是UserController中的login方法
1 |
|
并且使用的Dto来封装手机号和验证码。然后根据上述逻辑进行代码实现。依旧是写在serviceImpl中
1 |
|
其中
1 | private User createUserWithPhone(String phone) { |
登录完后,数据库中就新增了用户数据:
在上述功能完成期间,直接用课程资料中给的maven依赖会报错,我这里经过查阅,将mybatis-plus版本从3.4.3降到3.4.2后就正常运行
校验登录状态
目前登录功能还有问题,当我们登录成功后,再点击我的,会发现还是让我们验证登录。这是因为我们还没完成登录校验功能,所以直接跳转到登录页面。
并且,不仅仅这个登录页面需要进行校验,很多controller都需要这个功能,所以这里可以通过设置一个拦截器,再所有的controller执行之前进行相应地操作。
这里创建一个LoginInterceptor
类,实现HandlerInterceptor
接口,重写其中的两个方法(快捷键shift+alt+p
),前置拦截器和完成处理方法,前置拦截器主要用于登陆之前的权限校验,完成处理方法是用于处理登录后的信息,避免内存泄露
LoginInterceptor
:登录拦截器方法,将session中的信息保存在ThreadLocal中。
1 | public class LoginInterceptor implements HandlerInterceptor { |
UserHolder
:使用ThreadLocal保存用户信息
1 | public class UserHolder { |
MvcConfig
:添加配置类,配置拦截器,并指定放行的请求
1 |
|
/user/me
请求:这里返回登录用户信息,从而完成登录校验(登录校验的逻辑应该在前端实现的,对/me
的ajax请求的返回进行判断,如果没有登录则跳转到指定页面)
1 |
|
此时登录后就能正常显示我的
页面:
番外:ThreadLocal
ThreadLocal可以解释成线程的局部变量,也就是说一个ThreadLocal的变量只有当前自身线程可以访问,别的线程都访问不了,那么自然就避免了线程竞争。
番外:拦截器和过滤器
在上面我们用到了拦截器实现登录校验。但是一直不太清楚拦截器和过滤器具体使用场景和区别。所以这里进行了对比和总结。
这里参考:https://blog.csdn.net/xinzhifu1/article/details/106356958、https://zhuanlan.zhihu.com/p/408809649
过滤器Filter
过滤器的配置比较简单,直接实现Filter 接口即可,也可以通过@WebFilter注解实现对特定URL拦截,看到Filter 接口中定义了三个方法。
init()
:该方法在容器启动初始化过滤器时被调用,它在 Filter 的整个生命周期只会被调用一次。注意:这个方法必须执行成功,否则过滤器会不起作用。doFilter()
:容器中的每一次请求都会调用该方法, FilterChain 用来调用下一个过滤器 Filter。destroy()
: 当容器销毁 过滤器实例时调用该方法,一般在方法中销毁或关闭资源,在过滤器 Filter 的整个生命周期也只会被调用一次
1 |
|
拦截器Interceptor
拦截器它是链式调用,一个应用中可以同时存在多个拦截器Interceptor
, 一个请求也可以触发多个拦截器 ,而每个拦截器的调用会依据它的声明顺序依次执行。需要实现HandlerInterceptor接口。接口中也定义了三个方法:
preHandle()
:这个方法将在请求处理之前进行调用。注意:如果该方法的返回值为false ,将视为当前请求结束,不仅自身的拦截器会失效,还会导致其他的拦截器也不再执行。postHandle()
:只有在preHandle()
方法返回值为true 时才会执行。会在Controller 中的方法调用之后,DispatcherServlet 返回渲染视图之前被调用。postHandle()
方法被调用的顺序跟preHandle()
是相反的,先声明的拦截器preHandle()
方法先执行,而postHandle()
方法反而会后执行。afterCompletion()
:只有在preHandle()
方法返回值为true 时才会执行。在整个请求结束之后, DispatcherServlet 渲染了对应的视图之后执行。
1 | public class MyInterceptor implements HandlerInterceptor { |
然后可以创建一个config类实现WebMvcConfigurer
接口,重写addInterceptors
方法,在其中通过addPathPatterns
、excludePathPatterns
等属性设置需要拦截或需要排除的 URL
1 |
|
区别
过滤器是基于函数回调,拦截器是基于java的反射机制
过滤器属于Servlet级别,拦截器属于Spring级别 ****
- Filter是在javax.servlet包中定义的,要依赖于网络容器,因此只能在web项目中使用。
- Interceptor是SpringMVC中实现的,拦截器是一个Spring组件,由Spring容器进行管理。
执行顺序不同:
- 当一个请求进入Servlet之前,过滤器的
doFilter
方法进行过滤,进入Servlet容器之后,执行Controller方法之前,拦截器的preHandle
方法进行拦截,执行Controller方法之后,视图渲染之前,拦截器的postHandle
方法进行拦截,请求结束之后,执行拦截器的postHandle
方法。
- 当一个请求进入Servlet之前,过滤器的
二者实际开发中的应用场景:
- 拦截器:权限控制(登录校验),日志打印,参数校验
- 过滤器:跨域问题解决,编码转换
隐藏用户敏感信息
在上面的内容中,我们可以在/user/me
的请求返回值中看到如下用户的全部信息,这样会泄露用户的信息。所以这里在返回用户信息前,将其敏感信息进行隐藏。
1 | { |
核心思想就是实现一个UserDto
类,其中只包含用户名,头像和id即可
1 |
|
然后修改UserHolder,将其User类型都换为UserDto
1 | public class UserHolder { |
并且对应修改/login
方法,在将用户信息存入session中时只存UserDto类型
1 | @Override |
同时也要修改拦截器中的方法
1 | @Override |
此时再查看返回的内容,可见不包含敏感信息:
1 | { |
基于Redis实现共享Session登录
集群的Session共享问题
session共享问题:多台Tomcat并不共享session的存储空间,当请求切换到不同tomcat服务器上时导致数据丢失的问题
早期的方案是session拷贝,就是说虽然每个tomcat上都有不同的session,但是每当任意一台服务器的session修改时,都会同步给其他的Tomcat服务器的session,这样的话,就可以实现session的共享了。但是这种方法存在很大的缺点:每台服务器中都有完整的一份session数据,服务器压力过大;session拷贝数据时,可能会出现延迟。
session的替代方案应该满足:数据共享、内存存储和key-value结构。这自然想到Redis
结构设计
- 保存登录的用户信息,可以使用String结构,以JSON字符串来保存,比较直观
- Hash结构可以将对象中的每个字段独立存储,可以针对单个字段做CRUD,并且内存占用更少。(本项目以Hash存储)
对于保存用户手机号和验证码,采用String结构,以手机号为key,以验证码为value。
对于保存用户登录状态,采用Hash结构,以随机token为key存储用户数据,而不是将手机号作为key,因为这会造成用户信息的泄露。
基于Redis实现验证码登录
注入StringRedisTemplate
修改
sendCode
key以手机号存储,同时加上业务逻辑便于查看。并且可以定义一个常量类来替换
login:code:
和2
。
1 |
|
- 常量类
1 | public class RedisConstants { |
- 修改
login
注意:直接按照老师代码会报错,这里将userMap中的value全转化为String类型才行
1 |
|
经过这样的修改后,就可以在redis中看到缓存的数据。但是此时就会校验登录失败,因为之前实现是使用session,这里修改成redis,需要对应的修改。
此外,注意返回时要带上token,因为前端进行了数据判断,如果不携带token就会重新验证码登录
解决状态登录刷新问题
之前设置了用户信息在redis中存活30分钟,但是此时无论用户是否有操作,30分钟一到都会被删除,这是不合理的。这里可以通过拦截器拦截到的请求,来证明用户是否在操作,如果用户没有任何操作30分钟,则token会消失,用户需要重新登录
初步版本
首先通过查看前端请求,发现存的token在请求头里(这部分前端实现)
authorization: 6867061d-a8d0-4e60-b92f-97f7d698a1ca
修改
LoginInterceptor
类注意:这里不能直接注入StringRedisTemplate,因为这个类并没有给String容器管理,所以使用以下方法进行赋值实现。
而在
MvcConfig
中调用时进行依赖的注入,因为这个类标注了@Configuration
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44public class LoginInterceptor implements HandlerInterceptor {
private StringRedisTemplate stringRedisTemplate;
public LoginInterceptor(StringRedisTemplate stringRedisTemplate){
this.stringRedisTemplate = stringRedisTemplate;
}
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
//1.获取请求头中的token
String token = request.getHeader("authorization");
if (StrUtil.isBlank(token)){
//不存在,返回401状态码
response.setStatus(401);
return false;
}
//2.基于token获取redis中的用户
String key = RedisConstants.LOGIN_USER_KEY + token;
Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(key);
//3.判断用户是否存在
if (userMap.isEmpty()){
//不存在,返回401状态码
response.setStatus(401);
return false;
}
//4.将查询到的Hash数据转为UserDto对象
UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
//5.将userDto保存在TheadLocal中
UserHolder.saveUser(userDTO);
//6.刷新token的有效期
stringRedisTemplate.expire(key, RedisConstants.LOGIN_USER_TTL, TimeUnit.MINUTES);
return true;
}
public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {
//移除用户
UserHolder.removeUser();
}
}在上述操作6中,会设置token数据的有效期,而每一次操作页面就会触发拦截器,所以只要用户在活跃状态,这个用户信息就不会过期。
MvcConfig
类1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class MvcConfig implements WebMvcConfigurer {
private StringRedisTemplate stringRedisTemplate;
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new LoginInterceptor(stringRedisTemplate))
.excludePathPatterns(
"/user/code",
"/user/login",
"/blog/hot",
"/shop/**",
"/shop-type/**",
"/upload/**",
"/voucher/**"
);
}
}
此时再运行,可以看到redis数据库中的数据和前端请求头中的authorization
一致。
目前这个拦截器只是拦截需要被拦截的路径,假设当前用户访问了一些不需要拦截的路径,那么这个拦截器就不会生效,所以此时令牌刷新的动作实际上就不会执行,所以这个方案他是存在问题的。
优化版本
可以添加一个拦截器,在第一个拦截器中拦截所有的路径,把第二个拦截器做的事情放入到第一个拦截器中,同时刷新令牌,因为第一个拦截器有了threadLocal的数据,所以此时第二个拦截器只需要判断拦截器中的user对象是否存在即可,完成整体刷新功能
首先新建一个拦截器
RefreshTokenInterceptor
,将之前的拦截器主要逻辑粘在这里,但是并不做拦截操作,而是全部放行,主要功能是获取token将用户信息保存在Threadloacl并且刷新token的有效期。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39public class RefreshTokenInterceptor implements HandlerInterceptor {
private StringRedisTemplate stringRedisTemplate;
public RefreshTokenInterceptor(StringRedisTemplate stringRedisTemplate){
this.stringRedisTemplate = stringRedisTemplate;
}
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
//1.获取请求头中的token
String token = request.getHeader("authorization");
if (StrUtil.isBlank(token)){
return true;
}
//2.基于token获取redis中的用户
String key = RedisConstants.LOGIN_USER_KEY + token;
Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(key);
//3.判断用户是否存在
if (userMap.isEmpty()){
return true;
}
//4.将查询到的Hash数据转为UserDto对象
UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
//5.将userDto保存在TheadLocal中
UserHolder.saveUser(userDTO);
//6.刷新token的有效期
stringRedisTemplate.expire(key, RedisConstants.LOGIN_USER_TTL, TimeUnit.MINUTES);
return true;
}
public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {
//移除用户
UserHolder.removeUser();
}
}修改原来的拦截器,只需要对ThreadLocal中的数据判断是否为空即可,为空则拦截。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21public class LoginInterceptor implements HandlerInterceptor {
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
//判断是否需要拦截(ThreadLocal中是否有用户)
if (UserHolder.getUser()==null){
//没有则需要拦截,设置状态码
response.setStatus(401);
//拦截
return false;
}
//有用户则放行
return true;
}
public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {
//移除用户
UserHolder.removeUser();
}
}此时修改对应的配置类
MvcConfig
,此时LoginInterceptor
不需要传入stringRedisTemplate了,并且设置ReefreshTokenInterceptor
的注册,设置拦截所有路径。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class MvcConfig implements WebMvcConfigurer {
private StringRedisTemplate stringRedisTemplate;
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new LoginInterceptor())
.excludePathPatterns(
"/user/code",
"/user/login",
"/blog/hot",
"/shop/**",
"/shop-type/**",
"/upload/**",
"/voucher/**"
).order(1);
registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate))
.addPathPatterns("/**").order(0);
}
}注意:这里可以使用
order
函数设置拦截器的执行顺序,我们把RefreshTokenInterceptor
的优先级设为最高。
此时,通过Redis图行界面查看数据的TTL,每次更换访问页面,token的TTL就会刷新。
商户查询缓存
什么是缓存
缓存就是数据交换的缓存区,是存储数据的临时地方,一般读写性能较高。
缓存的作用:
- 降低后端负载
- 提高读写效率,降低响应时间
缓存的成本:
- 数据一致性成本
- 代码维护成本
- 运维成本
添加商户缓存
原本查询方法
启动前端和后端的项目,登陆之后访问一个商户,查看浏览器发送的请求
Request URL: http://localhost:8080/api/shop/1
Request Method: GET
可以看出在ShopController
中的实现,原本的代码直接从数据库进行查询,如下:
1 |
|
添加缓存模型
可以在客户端与数据库之间加上一个Redis缓存,先从Redis中查询,如果没有查到,再去MySQL中查询,同时查询完毕之后,将查询到的数据也存入Redis,这样当下一个用户来进行查询的时候,就可以直接从Redis中获取到数据
对应的代码逻辑如下所示:
代码实现
在Service中实现业务逻辑。
ShopController
1 |
|
IShopService
1 | public interface IShopService extends IService<Shop> { |
小技巧:快捷键
ctrl+alt+B
:直接跳转对应的实现类
ShopServiceImpl
1 |
|
此时商铺信息就会存储在Redis数据库中:
添加商户类型缓存
代码实现
商户类型指的是首页上方的几种类型:
此时点击任意一种类型,会发送如下请求
Request URL: http://localhost:8080/api/shop-type/list
Request Method: GET
可以看出是属于ShopTypeController
中的内容。
1 |
|
这里主要需要缓存的就是查询出的商户列表。像之前一样,还是将业务逻辑写在Service中。
ShopTypeController
1 |
|
ShopTypeServiceImpl
1 |
|
此时可以看到redis数据库中缓存到了商户类型
番外:JSONUtil工具类
这里对JSONUtil
的几个常用方法进行记录:
toBean(JSON json, Type beanType)
:将json转化为指定的实体类toList(String jsonArray, Class<T> elementType )
:将JSONArray字符串转换为Bean的ListtoJsonStr(JSON json)
:将实体类或实体类列表转为JSON字符串
测试:
1 |
|
1 | [{"name":"wzy","age":12},{"name":"yzw","age":23}] |
缓存更新策略
常见更新策略
因为Redis数据在内存中存储,插入太多数据,会导致缓存中数据过多,所以需要对缓存进行更新。常见的更新策略如下:
内存淘汰 | 超时剔除 | 主动更新 | |
---|---|---|---|
说明 | 不用自己维护, 利用Redis的内存淘汰机制, 当内存不足自动淘汰部分数据。 下次查询时更新缓存。 | 给缓存数据添加TTL时间, 到期后自动删除缓存。 下次查询时更新缓存。 | 编写业务逻辑, 在修改数据库的同时, 更新缓存。 |
一致性 | 差 | 一般 | 好 |
维护成本 | 无 | 低 | 高 |
业务场景
- 低一致性需求:使用内存淘汰机制,例如店铺类型的查询缓存)
- 高一致性需求:主动更新,并以超时剔除作为兜底方案,例如店铺详情查询的缓存、优惠券缓存
主动更新策略
缓存数据源来自数据库,如果当数据库中数据发生变化,而缓存却没有同步,此时就导致一致性问题。有以下三种方案:
Cache Aside Pattern
人工编码方式:缓存调用者在更新完数据库之后再去更新缓存,也称之为双写方案Read/Write Through Pattern
:缓存与数据库整合为一个服务,由服务来维护一致性。调用者调用该服务,无需关心缓存一致性问题。但是维护这样一个服务很复杂,市面上也不容易找到这样的一个现成的服务,开发成本高Write Behind Caching Pattern
:调用者只操作缓存,其他线程去异步处理数据库,最终实现一致性。但是维护这样的一个异步的任务很复杂,需要实时监控缓存中的数据更新,其他线程去异步更新数据库也可能不太及时,而且缓存服务器如果宕机,那么缓存的数据也就丢失了
在以上三种方法中,更多采用方案一。
操作缓存和数据库时有三个问题需要考虑:
删除缓存还是更新缓存?
- 更新缓存:每次更新数据库都更新缓存,无效写操作较多。❎
- 删除缓存:更新数据库时让缓存失效,查询时再更新缓存。✅
如何保证缓存与数据库的操作的同步成功或失败?
- 单体系统,将缓存与数据库放在一个事务
- 分布式系统,使用TTC等分布式事务
先操作缓存还是先操作数据库?
先删除缓存:
删除缓存的操作很快,更新数据库的操作相对较慢。在线程1进行修改时,如果此时有一个线程2进来查询缓存,由于刚刚删除缓存,所以线程2需要查询数据库,并写入缓存,但是更新数据库的操作还未完成,所以线程2此时缓存的数据和数据库中的数据不一致,出现线程安全问题。
先删除数据库
线程1在查询缓存的时候,缓存TTL刚好失效,需要查询数据库并写入缓存,这个操作耗时相对较短,但是就在这么短的时间内,线程2进来了,更新数据库,删除缓存,但是线程1虽然查询完了数据(更新前的旧数据),但是还没来得及写入缓存,所以线程2的更新数据库与删除缓存,并没有影响到线程1的查询旧数据,写入缓存,造成线程安全问题
二者相比,后者出现线程安全问题的概率相对较低,所以通常采用先操作数据库,再删除缓存的方案
实现商铺缓存与数据库双写一致
修改ShopController中的业务逻辑,满足以下要求
- 根据id查询店铺时,如果缓存未命中,则查询数据库,并将数据库结果写入缓存,并设置超时时间
- 根据id修改店铺时,先修改数据库,再删除缓存
修改:
修改ShopService的queryById方法,写入缓存时设置一下超时时间
1
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), RedisConstants.CACHE_SHOP_TTL, Time)
修改Controller中的
updateShop
方法1
2
3
4
5
public Result updateShop( { Shop shop)
// 写入数据库
return shopService.update(shop);
}serviceImpl实现对应的update方法
1
2
3
4
5
6
7
8
9
10
11
12
13
public Result update(Shop shop) {
Long id = shop.getId();
if (id==null){
return Result.fail("店铺id不能为空");
}
//1.更新数据库
updateById(shop);
//2.删除缓存
stringRedisTemplate.delete(RedisConstants.CACHE_SHOP_KEY+id);
return Result.ok();
}注意:该方法需要声明为事务,因为涉及sql数据库操作和redis数据库操作,一旦其中有一个错误就回滚。
然后启动项目,点击餐厅,将其数据缓存到Redis中。但是由于目前没有后端提供修改功能,所以这里使用POSTMAN发送PUT请求,请求路径http://localhost:8081/shop,携带JSON数据如下
1
2
3
4
5
6
7
8
9
10
11
12{
"area": "大关",
"openHours": "10:00-22:00",
"sold": 4215,
"address": "金华路锦昌文华苑29号",
"comments": 3035,
"avgPrice": 80,
"score": 37,
"name": "369茶餐厅",
"typeId": 1,
"id": 1
}此时数据库更新成功,同时redis中相关的缓存也被清理了,并且再次点击详情,对应数据的缓存就被保存在redis中。并且
缓存穿透
问题描述和解决方案
缓存穿透:缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远都不会生效(只有数据库查到了,才会让redis缓存,但现在的问题是查不到),会频繁的去访问数据库。
常见的解决方案有两种
缓存空对象(可添加TTL)
优点:实现简单,维护方便
缺点:额外的内存消耗;可能造成短期的不一致
实现思路:当客户端访问不存在的数据时,会先请求redis,但是此时redis中也没有数据,就会直接访问数据库,但是数据库里也没有数据,那么这个数据就穿透了缓存,直击数据库。但是数据库能承载的并发不如redis这么高,所以如果大量的请求同时都来访问这个不存在的数据,那么这些请求就会访问到数据库,简单的解决方案就是哪怕这个数据在数据库里不存在,我们也把这个这个数据存在redis中去,存为空对象,这样下次用户过来访问这个不存在的数据时,redis缓存中也能找到这个数据,不用去查数据库,但这样就造成了额外的内存消耗。短期不一致是指在空对象的存活期间,如果更新了数据库,但由于空对象的TTL还没过,所以当用户来查询的时候,查询到的还是空对象,等TTL过了之后,才能访问到正确的数据。
布隆过滤
- 优点:内存占用较少,没有多余的key
- 缺点:实现复杂;可能存在误判
- 实现思路:布隆过滤器采用的是哈希思想,并不是将数据库中的内容都保存到这里,而是通过一个二进制数组,根据哈希思想去判断当前这个要查询的数据是否存在,如果布隆过滤器判断存在,则放行,这个请求会去访问redis,从数据库中查询到数据之后,再将其放到redis中。如果布隆过滤器判断这个数据不存在,则直接返回。优点在于节约内存空间,但存在误判,误判的原因在于:布隆过滤器基于哈希思想,只要是哈希思想,都可能存在哈希冲突
解决商铺查询缓存穿透
商铺查询缓存穿透流程图:
思路:如果这个数据不存在,将这个数据写入到Redis中,将value设置为空字符串,然后设置一个较短的TTL,返回错误信息。当再次发起查询时,先去Redis中判断value是否为空字符串,如果是空字符串,则说明是不存在的数据,直接返回错误信息
1 |
|
抽取为一个方法
这里先把之前写的缓存穿透代码修改一下,提取成一个独立的方法queryWithPassThrough
,以免全写在queryById中,不便于阅读。同时方便后续功能的增加,更有区分性。
1 | public Shop queryWithPassThrough(Long id){ |
修改后的的queryById
1 |
|
总结
缓存穿透产生的原因是什么?
- 用户请求的数据在缓存中和在数据库中都不存在,不断发起这样的请求,会给数据库带来巨大压力
缓存穿透的解决方案有哪些?
- 缓存null值
- 布隆过滤
- 增强id复杂度,避免被猜测id规律(可以采用雪花算法)
- 做好数据的基础格式校验
- 加强用户权限校验
- 做好热点参数的限流
缓存雪崩
缓存雪崩是指在同一时间段,大量缓存的key同时失效,或者Redis服务宕机,导致大量请求到达数据库,带来巨大压力。
解决方案:
- 给不同的Key的TTL添加随机值。
- 让其在不同时间段分批失效
- 利用Redis集群提高服务的可用性。
- 使用一个或者多个哨兵实例组成的系统,对redis节点进行监控,在主节点出现故障的情况下,能将从节点中的一个升级为主节点,进行故障转义,保证系统的可用性。
- 给缓存业务添加降级限流策略。
- 给业务添加多级缓存。
- 浏览器访问静态资源时,优先读取浏览器本地缓存;访问非静态资源(ajax查询数据)时,访问服务端;请求到达Nginx后,优先读取Nginx本地缓存;如果Nginx本地缓存未命中,则去直接查询Redis(不经过Tomcat);如果Redis查询未命中,则查询Tomcat;请求进入Tomcat后,优先查询JVM进程缓存;如果JVM进程缓存未命中,则查询数据库。
缓存击穿
缓存击穿也叫热点Key问题,就是一个被高并发访问并且缓存重建业务较复杂的key突然失效了,那么无数请求访问就会在瞬间给数据库带来巨大的冲击
例如:秒杀商品的key突然失效了,大家都在疯狂抢购,那么这个瞬间就会有无数的请求访问去直接抵达数据库,从而造成缓存击穿
解决方案
常见的解决方案:
互斥锁
流程:
在第一个线程缓存未命中时,首先会获取到一个互斥锁,然后继续后续的数据库查询重建任务,在此期间,如果有其他线程进来,在缓存查询未命中时,首先会尝试获取互斥锁,此时因为线程1还没释放锁,所以线程2获取失败,然后休眠一段时间后循环上述操作,直到线程1释放了锁,此时线程2就能获取到缓存,从而解决了上述问题
原理示意:
逻辑过期
流程:
之所以会出现缓存击穿问题,主要原因是对key设置了TTL,如果不设置TTL,就不会出现缓存击穿,但是就会占用过多内存,所以采用逻辑过期方案。也就是实际上没有设置TTL,但是在缓存的字段中新加了
expire
并指定了时间。在查询时,会判断逻辑时间是否过期,如果过期就会重新查询数据库进行重建。在重建完成前,其他线程都直接获取旧的缓存,直到锁的释放原理示意:
解决方案 | 优点 | 缺点 |
---|---|---|
互斥锁 | 没有额外的内存消耗; 保证一致性; 实现简单 | 线程需要等待,性能受影响; 可能有死锁风险 |
逻辑过期 | 线程无需等待,性能较好 | 不保证一致性; 有额外内存消耗; 实现复杂 |
互斥锁使用示例
修改根据id查询商铺信息的业务,基于互斥锁方式来解决缓存击穿的问题。流程如下:
这里锁的获取和释放采用的是Redis中的setnx
命令实现:
- 获取锁:
setnx lock [value] [TTL]
- 当一个线程使用该命令进行指定key(lock)缓存的写入时,其他线程就无法操作这个lock
- 这里设置TTL是防止忘记释放锁而导致无法后续操作
- 释放锁:
del lock
- 当前线程执行完后,需要释放锁,可以直接将该缓存数据删除即可。
这里我们在ShopServiceImpl
中新建两个方法,实现获取锁和释放锁的方法:
这里的
setIfAbsent
就对应Redis命令setnx
1 | //缓存击穿:获取锁 |
类似之前的缓存穿透,我们这里也把互斥锁解决缓存击穿抽离为一个方法。
1 | public Shop queryWithMutex(Long id){ |
这里用到了try-catch去捕获线程sleep的异常。可以在选择需要包括的code后使用快捷键
ctrl+alt+T
快速实现功能。
此时queryById
方法如下:
1 |
|
JMeter并发测试
使用教程参考:https://zhuanlan.zhihu.com/p/64847409
官网:https://jmeter.apache.org/download_jmeter.cgi
操作如下:
- 首先将Redis中的热点商品数据删除,模拟TTL到期,然后用Jmeter进行压力测试,开1000个线程来访问这个没有缓存的热点数据
- 创建线程组,设置线程数和间隔时间。
右键点击“Test Plan测试计划”-“Add添加”-“Threads(Users)线程(用户)”-“Thread Group线程组”
- 然后设置HTTP请求,设定服务器协议,请求ip及端口号,设置请求方式和请求路径,修改编码为UTF-8
右键点击“Thread Group线程组” - “Add添加”-“Sampler” - “HTTP Request”
- 然后添加添加结果树视图/聚合视图
右键点击“Thread线程组”-“Add添加”-“Listener监听器”-“View Results Tree结果树视图”及“Summary Report聚合视图”
然后点击上方小三角即可进行并发测试。此时查看结果树视图、聚合视图结果如下:
结果树视图:
可以看出,请求都顺利执行
聚合视图:
可以看到请求的各项参数
此时查看控制台输出,可以看出,执行1000个线程查询请求,后台只输出了一次sql数据库查询。这说明互斥锁有效,降低了数据库的查询量。
1 | ==> Preparing: SELECT id,name,type_id,images,area,address,x,y,avg_price,sold,comments,score,open_hours, create_time,update_time FROM tb_shop WHERE id=? |
逻辑过期使用示例
需求:修改根据id查询商铺的业务,基于逻辑过期方式来解决缓存击穿的问题。
过程:从Redis中查询缓存判断是否命中。如果没有命中则直接返回空数据,不查询数据库;如果命中,则将缓存中数据的value取出,判断value中的过期时间是否满足。如果没有过期,则直接返回Redis中的数据;如果过期,该线程直接返回之前的数据并获取互斥锁,然后新创建一个线程独去查询数据库以重构缓存数据,重构完成后再释放互斥锁。
流程:
因为原本的Shop实体类并没有这个过期时间属性,为了不对原本的代码进行入侵(直接在Shop中添加字段),所以这里新建一个实体类
RedisData
,包含原有数据和过期时间1
2
3
4
5
public class RedisData {
private LocalDateTime expireTime;
private Object data;
}然后再ShopServiceImlp中实现将shop数据和过期时间封装到redisData中
1
2
3
4
5
6
7
8
9
10public void saveShop2Redis(Long id, Long expireSeconds){
//1.查询店铺数据
Shop shop = getById(id);
//2.封装逻辑过期时间
RedisData redisData = new RedisData();
redisData.setData(shop);
redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds));
//3.写入Redis
stringRedisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP_KEY+id, JSONUtil.toJsonStr(redisData));
}然后这里进行测试,看看是否能成功写入redis,这里在测试类中进行:
1
2
3
4
5
6
7
private ShopServiceImpl shopService;
void testSaveShop(){
shopService.saveShop2Redis(1L, 100L); //id=1,过期时间=当前时间+100s
}Redis数据库中的缓存
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20{
"data": {
"area": "大关",
"openHours": "10:00-22:00",
"sold": 4215,
"images": "https://qcloud.dpfile.com/pc/jiclIsCKmOI2arxKN1Uf0Hx3PucIJH8q0QSz-Z8llzcN56-_QiKuOvyio1OOxsRtFoXqu0G3iT2T27qat3WhLVEuLYk00OmSS1IdNpm8K8sG4JN9RIm2mTKcbLtc2o2vfCF2ubeXzk49OsGrXt_KYDCngOyCwZK-s3fqawWswzk.jpg,https://qcloud.dpfile.com/pc/IOf6VX3qaBgFXFVgp75w-KKJmWZjFc8GXDU8g9bQC6YGCpAmG00QbfT4vCCBj7njuzFvxlbkWx5uwqY2qcjixFEuLYk00OmSS1IdNpm8K8sG4JN9RIm2mTKcbLtc2o2vmIU_8ZGOT1OjpJmLxG6urQ.jpg",
"address": "金华路锦昌文华苑29号",
"comments": 3035,
"avgPrice": 80,
"updateTime": 1689337040000,
"score": 37,
"createTime": 1640167839000,
"name": "369茶餐厅",
"x": 120.149192,
"y": 30.316078,
"typeId": 1,
"id": 1
},
"expireTime": 1689424146614
}可以看出,保存的缓存中多了过期时间字段
然后就是类似上面互斥锁,也新建一个方法单独实现逻辑过期功能
注意:因为这里代码实现是如果reids中不存在缓存,则直接返回null,这样会导致前端加载商铺信息失败,所以需要提前进行redis的缓存存入,具体操作如上述2操作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51//这里需要声明一个线程池,因为下面需要新建一个现成来完成重构缓存
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
/**
* queryById:解决缓存击穿:使用逻辑过期
* @param id
* @return
*/
public Shop queryWithExpire(Long id){
//1.从Redis中查询商铺缓存
String key = RedisConstants.CACHE_SHOP_KEY + id;
String shopJson = stringRedisTemplate.opsForValue().get(key);
//2.判断是否存在
if (StrUtil.isBlank(shopJson)){
//不存在直接返回null
return null;
}
//3.命中,将json反序列化为对象
RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);
Shop shop = JSONUtil.toBean((JSONObject) redisData.getData(), Shop.class);
//4.判断是否过期
if (LocalDateTime.now().isBefore(redisData.getExpireTime())){
//4.1如果未过期则直接返回数据
return shop;
}
//4.2 过期,则需要缓存重建
//5.缓存重建
//5.1 获取互斥锁
String lockKey = RedisConstants.LOCK_SHOP_KEY + id;
boolean isLock = tryLock(lockKey);
//5.2 判断是否获取锁成功
if (isLock){
//成功则开启独立线程,并返回旧的店铺信息
CACHE_REBUILD_EXECUTOR.submit(()->{
try {
//重建缓存(这里过期时间设置得短,为了便于测试)
this.saveShop2Redis(id, 30L);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
//释放锁
unlock(lockKey);
}
});
//直接返回商铺信息
return shop;
}
//未获取到锁,直接返回商铺信息
return shop;
}ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
这里用到的是Java中的一个常用线程池类——newFixedThreadPool。创建的线程池是一个固定大小的线程池,线程池中的线程数量是固定的,由构造函数传入的参数指定,而任务队列的大小则由内部的阻塞队列来决定。
这里我们模拟场景,在此之前,我们已经将餐厅数据缓存到redis,但是缓存时间早已过期,这时,我们去数据库修改餐厅的数据,(这里是修改餐厅名称)。这样逻辑过期前和逻辑过期后的数据就不一致,当用户来访问数据的时候,需要花时间来进行重构缓存数据,但是在重构完成之前,都只能获得脏数据(也就是修改前的数据),只有当重构完毕之后,才能获得新数据(修改后的数据)。
然后此时在缓存重建部分内容人为添加上延时,便于观察重建期间其他线程访问到的数据内容
1 | public void saveShop2Redis(Long id, Long expireSeconds) throws InterruptedException { |
此时使用JMeter进行100个线程访问测试:
在前面部分线程访问到的都是旧数据,在线程25以后访问到的就是更新后的数据。并且通过IDEA的控制台输出可以看出只进行了一次SQL查询。
缓存工具封装
无论是解决缓存穿透还是缓存击穿,其逻辑都是复杂的,如果每次开发都重写这些逻辑,就会有些复杂,所以这里将其封装为工具类
基于StringRedisTemplate封装一个缓存工具类,需满足下列要求
- 方法1:将任意Java对象序列化为JSON,并存储到String类型的Key中,并可以设置TTL过期时间
- 方法2:将任意Java对象序列化为JSON,并存储在String类型的Key中,并可以设置逻辑过期时间,用于处理缓存击穿问题
- 方法3:根据指定的Key查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题
- 方法4:根据指定的Key查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题
流程:
这里在util包中创建
CacheClinet
让后将上述内容写在该类中即可
1
2
3
4
5
6
7
8
9
public class CacheClient {
private final StringRedisTemplate stringRedisTemplate;
public CacheClient(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}
}方法一
1
2
3
4//方法1:将**任意**Java对象序列化为JSON,并存储到String类型的Key中,并可以设置TTL过期时间
public void set(String key, Object value, Long time, TimeUnit unit){
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value), time, unit);
}方法二
1
2
3
4
5
6
7
8
9// 方法2:将任意Java对象序列化为JSON,并存储在String类型的Key中,并可以设置逻辑过期时间,用于处理缓存击穿问题
public void setWithLogicalExpire(String key, Object value, Long time, TimeUnit unit) {
//设置逻辑过期
RedisData redisData = new RedisData();
redisData.setData(value);
redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(time)));
//写入redis
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData));
}方法三
该方法直接在之前
queryWithPassThrough
的基础上进行修改。- 因为需要改成通用方法,所以这里返回类型不能是Shop,而是需要返回一个泛型,所以方法上定义泛型和返回类型
<R> R
,并且通过参数Class<R> type
传递需要处理的类型 - 传入id的类型也不一定是Long,所以也是用泛型,定义在前面。
- 然后此时缓存前缀也会随着任务的不同而不同,所以也需要调用者自己输入,所以这里也抽离称为一个参数
String keyPrefix
- 因为具体的数据库查询逻辑也不同,所以在参数列表中加入一个查询数据库逻辑的函数
Function<ID, R> dbFallback
,传递id和处理的类型。然后调用apply
方法调用传递的函数,返回对应类型的数据r。 - 以及传入TTL的两个参数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30// 方法3:根据指定的Key查询缓存,并反序列化为指定类型,利用**缓存空值**的方式解决**缓存穿透**问题
public <R, ID> R queryWithPassThrough(String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit unit) {
//1.从Redis中查询商铺缓存
String key = keyPrefix + id;
String json = stringRedisTemplate.opsForValue().get(key);
//2.判断是否存在
if (StrUtil.isNotBlank(json)) { //isNotBlank中如果是null、""都会返回false
//3.存在,直接返回
return JSONUtil.toBean(json, type);
}
//[new]判断命中的是否为空值。
if (json != null) {
return null;
}
//4.不存在,根据id查询数据库
R r = dbFallback.apply(id);
//5.不存在,返回错误
if (r == null) {
//[new]将空值写入redis
stringRedisTemplate.opsForValue().set(key, "", RedisConstants.CACHE_NULL_TTL, TimeUnit.MINUTES);
return null;
}
//6.存在,写入Redis
this.set(key, r, time, unit);
//7.返回
return r;
}然后修改
ShopServiceImpl
中的queryById
方法,修改缓存击穿的实现1
2
3
4
5
6
7
8
9
10
11
12
13
14
private CacheClient cacheClient;
public Result queryById(Long id) {
//调用封装工具解决缓存击穿
Shop shop = cacheClient.queryWithPassThrough(RedisConstants.CACHE_SHOP_KEY, id, Shop.class, this::getById, RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES);
if (shop == null){
return Result.fail("店铺不存在");
}
return Result.ok(shop);
}- 因为需要改成通用方法,所以这里返回类型不能是Shop,而是需要返回一个泛型,所以方法上定义泛型和返回类型
方法四
类似上面的逻辑,传递的参数都一致。只不过这里需要将线程获取、获取锁和释放锁的代码拿过来。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61// 方法4:根据指定的Key查询缓存,并反序列化为指定类型,需要利用**逻辑过期**解决**缓存击穿**问题
//这里需要声明一个线程池,因为下面需要新建一个现成来完成重构缓存
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
//[缓存击穿-互斥锁]:获取锁
private boolean tryLock(String key) {
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
//避免返回值为null,我们这里使用了BooleanUtil工具类
return BooleanUtil.isTrue(flag);
}
//[缓存击穿-互斥锁]:释放锁
private void unlock(String key) {
stringRedisTemplate.delete(key);
}
public <R, ID> R queryWithLogicalExpire(String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit unit){
//1.从Redis中查询商铺缓存
String key = keyPrefix + id;
String json = stringRedisTemplate.opsForValue().get(key);
//2.判断是否存在
if (StrUtil.isBlank(json)){
//不存在直接返回null
return null;
}
//3.命中,将json反序列化为对象
RedisData redisData = JSONUtil.toBean(json, RedisData.class);
R r = JSONUtil.toBean((JSONObject) redisData.getData(), type);
//4.判断是否过期
if (LocalDateTime.now().isBefore(redisData.getExpireTime())){
//4.1如果未过期则直接返回数据
return r;
}
//4.2 过期,则需要缓存重建
//5.缓存重建
//5.1 获取互斥锁
String lockKey = RedisConstants.LOCK_SHOP_KEY + id;
boolean isLock = tryLock(lockKey);
//5.2 判断是否获取锁成功
if (isLock){
//成功则开启独立线程,并返回旧的店铺信息
CACHE_REBUILD_EXECUTOR.submit(()->{
try {
//[]重建缓存:查询数据库
R r1 = dbFallback.apply(id);
//[]重建缓存:写入Redis
this.setWithLogicalExpire(key, r1, time, unit);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
//释放锁
unlock(lockKey);
}
});
//直接返回商铺信息
return r;
}
//未获取到锁,直接返回商铺信息
return r;
}注意:这部分内容不能直接执行,因为这个方法会判断过期时间字段,如果缓存没有这个字段就会查询失败。所以执行前先使用测试方法添加一个缓存信息。
1
2
3
4
5
6
7
8
private CacheClient cacheClient;
public void testLogicalExpire(){
Shop shop = shopService.getById(1L);
cacheClient.setWithLogicalExpire(RedisConstants.CACHE_SHOP_KEY+1L, shop, 10L, TimeUnit.SECONDS);
}
至此,之前在ShopServiceImpl
中实现的缓存击穿、缓存穿透的代码就不需要了,直接调用写好的CacheClient
工具类即可,并且更加灵活,应对不同的输入类型和操作逻辑。
优惠券秒杀
全局唯一ID
背景:这里以购物app中的优惠券为例,当用户下单抢购优惠券后,对应的订单就会保存在 user_voucher_order
,因为对应的订单号的id需要展示给用户,如果使用数据库自增id就会出现一些问题:
- id的规律性太明显
- 如果id有明显的规律,会被用户或者竞争对手从中获取到敏感信息,比如商城一天之内的销售量
- 受单表数据量的限制
- 随着商城规模的扩大,mysql单表数据量不宜超过500W,这时过多的订单就要通过拆库拆表实现,从逻辑上讲,这些属于一张表,所以id不能重复,因此要保证id的唯一性
这就引入了全局ID生成器,这是一种在分布式系统下用来生成全局唯一ID的工具,一般需要满足下列特征:
- 唯一性
- 高可用
- 高性能
- 递增性
- 安全性
全局唯一ID生成策略:
- UUID
- Redis自增
- snowflake算法
- 数据库自增
ID组成部分
- 符号位:1bit,永远为0
- 时间戳:31bit,以秒为单位,可以使用69年(2^31秒约等于69年)
- 序列号:32bit,秒内的计数器,支持每秒传输2^32个不同ID
Redis自增
Redis自增策略:
- 每天一个key,方便统计销售量;同时也避免超过存储数据上限(incr自增的值限制在64位)
- ID构造是时间戳+计数器
代码实现:
这里首先在utils包下新建一个
RedisIdWorker
类
这里使用Redis的INCR
递增函数实现,该命令会将key中存储的数字值增加一。如果key不存在,那么key值就会先被初始化为0,再执行incr命令。本操作的值限制在 64 位(bit)有符号数字表示之内。对应的SpringRedisTemplate中对应的是opsForString.increment()
- 首先获取起始时间的秒数
1 | public static void main(String[] args) { |
然后进行唯一id的获取,将上面得到的其实时间定义为一个常量。
注意:在生成序列号的部分,之所以获取当天日期,是为了便于后续统计当天、当月或当年的销售量,便于区分
1 | public class RedisIdWorker { |
实现优惠券秒杀下单
每个店铺都可以发布优惠券,分为平价券和特价券。平价券可以随意购买,特价券需要秒杀抢购
这两种代金券对应数据库中的两个表:
平价券
tb_voucher
表结构如下:
特价券
tb_seckill_voucher
表结构如下:
对比:
- 可以看出平价券并没有库存字段,而特价券除了具有优惠券的基本信息外(通过关联优惠券id),还有库存、生效时间、失效时间等字段
- 并且平价券的字段中包含了type,0代表普通券,1代表秒杀券
添加优惠券
因为这时数据库中还没有秒杀券,这里进行添加逻辑分析和数据添加(这部分代码项目中已经实现好了)
VoucherController
中的优惠券添加方法。新增普通券,只是将普通券的信息保存到表中
1 | /** |
新增秒杀券位于addSeckillVoucher
中的业务逻辑
1 | /** |
这里对应的addSeckillVoucher
方法如下:
秒杀券可以看做是一种特殊的普通券,将普通券信息保存到普通券表中,同时将秒杀券的数据保存到秒杀券表中,通过券的ID进行关联
1 |
|
因为该项目没有实现后端管理界面,所以使用postman模拟发送请求来新增秒杀券,请求路径为localhost:8081/voucher/seckill
, 请求方式POST,JSON数据如下。注意优惠券的截止日期设置,若优惠券过期,则不会在页面上显示。
1 | { |
页面显示如下:
秒杀实现
此时点击限时抢购
,发送如下请求
Request URL:http://localhost:8080/api/voucher-order/seckill/10
Request Method: POST
可以看出是VoucherOrderController
中的seckill/{id}
请求
1 |
|
下单时需要判断两点:
- 秒杀是否开始或者结束,如果尚未开始或已经结束则无法下单
- 库存是否充足,不足则无法下单
代码实现:
依旧是修改controller中的方法,调用service中的实现,然后在serviceImpl中实现对应功能:
1 |
|
注意:这里扣减库存逻辑采用
setSql
实现:在此之前,我们如果想要更新数据库中的某条数据的某个字段,需要前先查询出来再更新。但若直接使用setSql
则相当于直接更新。以下述为例:
1
2
3
4 seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id",voucherId)
.update();相当于sql语句:
1 UPDATE 'voucher' SET 'stock' = stock - 1 WHERE 'voucher_id' = voucherId;
结果:
可以看到抢购成功,并且刷新后库存减一
超卖问题
场景模拟
之前的代码其实是有问题的,当遇到高并发场景时,会出现超卖现象,可以用Jmeter开200个线程来模拟抢优惠券的场景(此时券只有100个),URL为 localhost:8081/voucher-order/seckill/10
,请求方式为POST
但是这样直接去请求会出现问题,所有的http请求都失败了。这是因为没有携带登录的token ,可以从redis缓存或者浏览器中获取,添加到http请求中。
执行完后,会发现出现了109个订单,并且秒杀券的数量变成了-9,说明超卖了9张。
问题分析|悲、乐观锁
之所以出现超卖问题,是因为代码中首先会查询库存数量,然后进行库存判断和扣减,在这个过程中,可能涉及多个线程同时操作,造成了多线程并发问题。如下图所示:
针对这一问题的常见解决方案就是加锁:而对于加锁,我们通常有两种解决方案:
- 悲观锁
- 悲观锁认为线程安全问题一定会发生,因此在操作数据之前先获取锁,确保线程串行执行
- 例如Synchronized、Lock等,都是悲观锁
- 优点:简单粗暴
- 缺点:性能一般
- 乐观锁
- 乐观锁认为线程安全问题不一定会发生,因此不加锁,只是在更新数据的时候再去判断有没有其他线程对数据进行了修改
- 如果没有修改,则认为自己是安全的,自己才可以更新数据
- 如果已经被其他线程修改,则说明发生了安全问题,此时可以重试或者异常
- 优点:性能好
- 缺点:存在成功率低的问题
- 乐观锁认为线程安全问题不一定会发生,因此不加锁,只是在更新数据的时候再去判断有没有其他线程对数据进行了修改
悲观锁比较常用,这里不做赘述,这里具体介绍乐观锁
乐观锁的关键是判断之前查询得到的数据是否被修改过,常见有两种方式:
版本号法:
乐观锁会有一个版本号,每次操作数据会对版本号+1,再提交回数据时,会比较之前查询到的版本号和目前保存的版本号,如果一致,则进行操作成功,如果不一致,则数据被修改过。
CAS(Compare-And-Set)法:
类似上面的方法,只不过不需要版本号,而是判断查询到的库存是否和实际库存一致。
代码实现
初步实现:
1 | @Override |
上述逻辑:扣减库存时的库存和之前查询到的库存是一样的,就意味着没有人在中间修改过库存,那么此时就是安全的,但是以上这种方式通过测试发现会有很多失败的情况,失败的原因在于:在使用乐观锁过程中假设100个线程同时都拿到了100的库存,然后大家一起去进行扣减,但是100个人中只有1个人能扣减成功,其他的人在处理时,他们在扣减时,库存已经被修改过了,所以此时其他线程都会失败。但实际上有其他线程修改是没有问题的,只要stock的数目大于0就行。
完善版本:
实际上就是在前面判断了库存大于0的基础上,在后面准备修改数据时在判断库存是否大于0.
1 | @Override |
这时再测试,就会发现优惠券正好被抢光。
一人一单
所谓一人一单就是修改秒杀业务,要求同一个优惠券,一个用户只能抢一张
实现逻辑也很简单,在判断库存是否充足之后,根据用户id和优惠券id查询订单,判断用户订单是否已存在。如果已存在,则不能下单,返回错误信息;如果不存在,则继续下单,获取优惠券。
初步实现
1 | @Override |
此时使用JMeter进行多线程购买测试,但是因为http头携带的token是同一个用户的,所以按说执行200个线程只有一个订单。但实际运行后,发现还是会出现多个订单。其实发生的原因是一样的,还是因为在一人一单逻辑之前,如果进来了多个线程,还是可以抢多张优惠券的,这里使用悲观锁来解决这个问题。
进一步优化(加锁)
核心思想:把一人一单逻辑之后的代码都提取到一个createVoucherOrder
方法中,然后给这个方法加锁
1 |
|
注意:
- 这里获取锁并没有放在方法上,而是放在获取了用户id之后
- 这是因为如果放在方法上,所有的用户都要公用这一把锁,导致每个线程进来都会被锁住,串行执行,效率很低
- 这里的锁采用用户的id实现,但是为什么要使用
userId.toString().intern()
?
- 这是因为
toString
的源码是new String,所以如果只用userId.toString()
会拿到不同的地址引用,所以需要使用intern()
:如果字符串常量池中已经包含了一个等于这个string对象的字符串(由equals(object)方法确定),那么将返回池中的字符串。否则,将此String对象添加到池中,并返回对此String对象的引用。- 具体解释:https://blog.csdn.net/weixin_44201525/article/details/120897052
原来的方法变为:
此时这个方法就不需要事务注解了
1 |
|
最终版本(事务)
但是以上代码还是存在问题,问题的原因在于当前方法被Spring事务控制,如果在内部加锁,可能会导致当前方法事务还没有提交,但是锁已经释放了,这样也会导致其他线程的进入,从而引发线程安全问题,所以需要使用锁将当前方法整体包裹起来,确保事务不会出现问题。
所以还是在函数外加锁,外部传递用户id给锁:
createVoucherOrder
函数
1 |
|
在调用函数处加锁
1 |
|
此时,只有在事务完成后才会释放锁,因此解决了线程安全问题。但是,此时还存在事务的问题。因为调用的函数声明了事务,但是上述方法不涉及事务。这是因为调用的createVoucherOrder
方法,其实是this.的方式调用的,事务想要生效,需要利用代理来生效,所以这个地方,需要获得原始的事务对象, 来操作事务,这里可以使用AopContext.currentProxy()
来获取当前对象的代理对象,然后再用代理对象调用方法,从而使这个方法被spring事务管理。
注意:这里需要去
IVoucherOrderService
中创建createVoucherOrder
方法
1 | Long userId = UserHolder.getUser().getId(); |
该方法会对应依赖,需要导入。导入完成记得刷新!
1 | <!--创建代理事务--> |
同时需要启动类上加上@EnableAspectJAutoProxy(exposeProxy = true)
注解,来暴露这个代理对象
1 |
|
重启服务器,再次使用JMeter测试,100个线程并发,但是只能抢到一张优惠券,实现一人一单.
并发安全问题
通过加锁可以解决在单机情况下的一人一单安全问题,但是在集群模式下就不行了
我们将服务启动两份,端口分别为8081和8082:
- 首先IEAD锤子图标右边选择
edit Configuration
- 然后点击本项目,然后点击复制上方图标
- 然后点击
Modify options
–>Add VM option
,填写指定端口即可-Dserver.port=8081
和-Dserver.port=8082
- 首先IEAD锤子图标右边选择
然后修改nginx的config目录下的nginx.conf文件,配置反向代理和负载均衡(默认轮询就行)
这部分内容在瑞吉外卖优化篇有介绍,参考以前博客
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51worker_processes 1;
events {
worker_connections 1024;
}
http {
include mime.types;
default_type application/json;
sendfile on;
keepalive_timeout 65;
server {
listen 8080;
server_name localhost;
# 指定前端项目所在的位置
location / {
root html/hmdp;
index index.html index.htm;
}
error_page 500 502 503 504 /50x.html;
location = /50x.html {
root html;
}
location /api {
default_type application/json;
#internal;
keepalive_timeout 30s;
keepalive_requests 1000;
#支持keep-alive
proxy_http_version 1.1;
rewrite /api(/.*) $1 break;
proxy_pass_request_headers on;
#more_clear_input_headers Accept-Encoding;
proxy_next_upstream error timeout;
#proxy_pass http://127.0.0.1:8081; # modify-1
proxy_pass http://backend;
}
}
upstream backend {
server 127.0.0.1:8081 max_fails=5 fail_timeout=10s weight=1;
server 127.0.0.1:8082 max_fails=5 fail_timeout=10s weight=1; # modify-2
}
}
具体测试:使用POSTMAN
发送两次请求,header携带同一用户的token,尝试用同一账号抢两张优惠券,结果出现了同一用户下了两次单。
原因分析:部署了多个Tomcat时,每个Tomcat都有一个属于自己的JVM
,假设在JVM1
的内部,有两个线程:线程1和线程2,在JVM
内部有一个锁监视器来管理锁的线程所有者,所以线程1和线程2可以实现互斥的。但是如果在JVM2
,又有两个线程,但是他们的锁对象虽然写的和JVM1
一样,但是锁对象却不是同一个,因为此时锁监视器变了,所以线程3和线程4可以实现互斥,但是却无法和线程1和线程2互斥。
这就是集群环境下,syn锁失效的原因,在这种情况下,需要使用分布式锁来解决这个问题,让锁不存在于每个jvm的内部,而是让所有jvm公用外部的一把锁(Redis)
分布式锁
分布式锁:满足分布式系统或集群模式下多线程可见并且可以互斥的锁。此外,还需要具有高可用、高性能、安全性等特性。
分布式锁的核心思想就是让大家共用同一把锁,那么就能锁住线程,不让线程进行,让程序串行执行,以下是原理示意:
常见的分布式锁有三种:
MySQL | Redis | Zookeeper | |
---|---|---|---|
互斥 | 利用mysql本身的互斥锁机制 | 利用setnx这样的互斥命令 | 利用节点的唯一性和有序性实现互斥 |
高可用 | 好 | 好 | 好 |
高性能 | 一般 | 好 | 一般 |
安全性 | 断开连接,自动释放锁 | 利用锁超时时间,到期释放 | 临时节点,断开连接自动释放 |
分布式锁获取和释放
实现分布式锁时需要实现两个基本方法:
获取锁:
互斥:确保只能有一个线程获取锁
非阻塞:尝试一次,成功返回true,失败返回false
实现:
老方法,这种方法添加锁和设置过期时间是两步实现的,有可能在设置过期时间前就出现问题,还会造成死锁。
1
2
3
4# 添加锁
SETNX lock thread01
# 添加锁过期时间,避免服务器宕机引起的死锁
EXPIRE lock 10新方法,使用redis中的方法,同时添加锁和过期时间
1
2# 添加锁的同时设置过期时间 NX是互斥,EX是设置过期时间
SET lock thread01 NX EX 10
释放锁:
手动释放
超时释放:获取锁的时候添加一个超时时间
实现:
1
2# 释放锁,删除即可
DEL lock
初步实现分布式锁
首先给出ILock接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14public interface ILock {
/**
* 尝试获取锁
*
* @param timeoutSec 锁持有的超时时间,过期自动释放
* @return true表示获取锁成功,false表示获取锁失败
*/
boolean tryLock(long timeoutSec);
/**
* 释放锁
*/
void unlock();
}新建
SimpleRedisLock
类集成ILock,实现对应方法1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29public class SimpleRedisLock implements ILock{
private StringRedisTemplate stringRedisTemplate;
//锁的前缀
private static final String KEY_PREFIX = "lock:";
//具体业务名称,将前缀和业务名拼接之后当做Key
private String name;
public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name) {
this.stringRedisTemplate = stringRedisTemplate;
this.name = name;
}
public boolean tryLock(long timeoutSec) {
//获取线程标识
long threadId = Thread.currentThread().getId();
//获取锁
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX+name, threadId+"", timeoutSec, TimeUnit.SECONDS);
//自动拆箱可能会出现null,这样写避免空指针
return success.TRUE.equals(success);
}
public void unlock() {
stringRedisTemplate.delete(KEY_PREFIX+name);
}
}修改之前一人一单的业务逻辑,替换
synchronized
为自己写的redis锁1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41@Override
public Result seckillVoucher(Long voucherId) {
//1.查询优惠券
SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);
//2.判断秒杀是否开始
if (seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())) {
return Result.fail("秒杀尚未开始");
}
//3.判断秒杀是否结束
if (seckillVoucher.getEndTime().isBefore(LocalDateTime.now())) {
return Result.fail("秒杀已经结束");
}
//4.判断库存是否充足
if (seckillVoucher.getStock() < 1) {
return Result.fail("库存不足");
}
Long userId = UserHolder.getUser().getId();
- synchronized (userId.toString().intern()){
- IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy();
- return proxy.createVoucherOrder(voucherId);
- }
+ //[redis实现分布式锁]
+ SimpleRedisLock lock = new SimpleRedisLock(stringRedisTemplate, "order:" + userId);
- boolean tryLock = lock.tryLock(5);
+ if (!tryLock){
+ return Result.fail("一人只能下一单");
+ }
+ //获取代理对象(事务)
+ try {
+ IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy();
+ return proxy.createVoucherOrder(voucherId);
+ } catch (IllegalStateException e) {
+ throw new RuntimeException(e);
+ } finally {
+ lock.unlock();
+ }
}
锁的超时释放问题
问题分析
上述完成了redis分布式锁的功能,在大部分场景下都能正常工作,但也会出现极端情况:
- 线程1首先获取到锁,但是由于业务问题,造成了阻塞,所以Redis锁超时释放,然后此时线程2过来就能获取到锁,执行自己的业务逻辑,但线程2未完成前,线程1业务完成,于是释放了锁,注意:此时线程1将线程2的锁释放了。接着线程3过来,发现锁被释放了,于是也获取了锁,因此,相当于线程2和线程3都获取到了锁。
- 上述问题主要是线程1释放了线程2的锁,关键在于缺少锁的线程标识。解决这个问题就是在释放锁的时候,都判断一下这个锁是不是自己的,如果不属于自己,则不进行删除操作。
代码实现
主要逻辑:
- 在获取锁时存入线程标识
- 在释放锁的时候先获取锁的线程标识,判断与当前线程标识是否一致。一致则可以释放锁
1 | public class SimpleRedisLock implements ILock { |
分布式锁的原子性问题
虽然上面已经对超时释放问题进行了解决,但是极端情况下还会出现下面的问题:
- 线程1执行完任务后进行锁标识的判断,判断一致,刚准备释放锁的时候出现了阻塞,此时redis锁超时释放,然后线程2获取到了锁,执行业务时线程1的阻塞结束,于是直接释放了锁,导致线程3再来也能获取到锁。这时因为判断锁标识和释放锁两个方法缺乏原子性所致
Lua脚本解决原子性问题
Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。Lua是一种编程语言,它的基本语法可以上菜鸟教程看看,链接:https://www.runoob.com/lua/lua-tutorial.html
这里重点介绍Redis提供的调用函数,可以使用Lua去操作Redis,而且还能保证它的原子性,这样就可以实现获取锁,判断线程标识,释放锁是一个原子性动作了
使用如下:
调用函数:
redis.call('命令名称','key','其他参数', ...)
- 比如:
redis.call('set', 'name', 'wzy')
编写脚本:
# 先执行set name wzy redis.call('set', 'name', 'wzy') # 再执行get name local name = redis.call('get', 'name') # 返回 return name
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
- 写好脚本以后,需要用Redis命令来调用脚本
- `EVAL script numkeys key [key ...] arg [arg ...]`
- 比如:`EVAL "return redis.call('set', 'name', 'wzy')" 0`
- 如果脚本中的key和value不想写死,可以作为参数传递,key类型参数会放入KEYS数组,其他参数会放入ARGV数组,在脚本中可以从KEYS和ARGV数组中获取这些参数
> 在Lua中,数组下标从1开始
- `EVAL "return redis.call('set', KEYS[1], ARGV[1])" 1 name wzy`
**使用Lua脚本来代替释放锁的逻辑:**
- 原始的释放锁的逻辑
```java
@Override
public void unlock() {
// 获取当前线程的标识
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁中的标识
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
// 判断标识是否一致
if (threadId.equals(id)) {
// 释放锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
初步改写成Lua脚本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15-- 锁的key
local key = "lock:order:userId"
-- 线程标识
local threadId = "UUID-33"
-- 获取锁中线程标识
local id = redis.call('get', key)
-- 比较线程标识与锁的标识是否一致
if (threadId == id) then
-- 一致则释放锁 del key
return redis.call('del', key)
end
return 0但是这里的参数都写死了,所以过传参的方式来变成动态的Lua脚本
动态传参的Lua脚本
1
2
3
4
5
6
7
8-- 这里的KEYS[1]就是传入锁的key
-- 这里的ARGV[1]就是线程标识
-- 比较锁中的线程标识与线程标识是否一致
if (redis.call('get', KEYS[1]) == ARGV[1]) then
-- 一致则释放锁
return redis.call('del', KEYS[1])
end
return 0
Java调用Lua脚本改造分布式锁
在RedisTemplate中,可以利用execute方法去执行lua脚本
1 | public <T> T execute(RedisScript<T> script, List<K> keys, Object... args) { |
该方法与Redis中的 EVAL
方法一一对应:
实际操作:
首先在resource资源夹下新建
unlock.lua
文件,将之前写的粘贴进去替换原本的
unlock
函数脚本不是调用函数时进行加载,而是在类加载时就加载脚本。这里通过静态代码块实现相应设定
1
2
3
4
5
6
7
8
9
10
11
12
13
14private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT = new DefaultRedisScript();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));//即resource资源文件夹下
UNLOCK_SCRIPT.setResultType(Long.class);
}
public void unlock() {
stringRedisTemplate.execute(UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX + name),
ID_PREFIX + Thread.currentThread().getId());
}
Redisson
基于SETNX实现的分布式锁存在以下问题:
- 不可重入
- 同一个线程无法多次获得同一把锁
- 可重入锁的意义在于防止死锁,例如在HashTable这样的代码中,它的方法都是使用synchronized修饰的,加入它在一个方法内调用另一个方法,如果此时是不可重入的,那就死锁了。所以可重入锁的主要意义是防止死锁,synchronized和Lock锁都是可重入的
- 不可重试
- 编写的分布式锁只能尝试一次,失败了就返回false,没有重试机制。
- 但合理的情况应该是:当线程获取锁失败后,他应该能再次尝试获取锁
- 超时释放
- 在加锁的时候增加了TTL,这样可以防止死锁,但是如果业务执行耗时过长,也会导致锁的释放,存在安全隐患。
- 虽然采用Lua脚本来防止删锁的时候,误删别人的锁,但现在的新问题是没锁住,也有安全隐患
- 主从一致性
- 如果Redis提供了主从集群,那么当向集群写数据时,主机需要异步的将数据同步给从机,万一在同步之前,主机宕机了(主从同步存在延迟,虽然时间很短,但还是发生了),那么又会出现死锁问题
上述问题可一通过Redisson解决:
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现
Redis提供了分布式锁的多种多样功能
- 可重入锁(Reentrant Lock)
- 公平锁(Fair Lock)
- 联锁(MultiLock)
- 红锁(RedLock)
- 读写锁(ReadWriteLock)
- 信号量(Semaphore)
- 可过期性信号量(PermitExpirableSemaphore)
- 闭锁(CountDownLatch)
Redisson入门
导入依赖
1
2
3
4
5<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>配置Redisson客户端,在config包下新建
RedissonConfig
类1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class RedissonConfig {
public RedissonClient redissonClient() {
//配置类
Config config = new Config();
//添加redis地址,这里添加的是单点地址,也可以使用config.useClusterServers()添加集群地址
config.useSingleServer()
.setAddress("redis://192.168.186.128:6379")
.setPassword("xxxxxx");
//创建客户端
return Redisson.create(config);
}
}使用Redisson的分布式锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private RedissonClient redissonClient;
void testRedisson() throws InterruptedException {
//获取可重入锁
RLock lock = redissonClient.getLock("anyLock");
//尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁的自动释放时间,时间单位
boolean isLock = lock.tryLock(1,10, TimeUnit.SECONDS);
//判断获取锁成功
if (isLock) {
try {
System.out.println("执行业务");
} finally {
//释放锁
lock.unlock();
}
}
}替换之前自己写的分布式锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43+@Autowired
+private RedissonClient redissonClient;
@Override
public Result seckillVoucher(Long voucherId) {
//1.查询优惠券
SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);
//2.判断秒杀是否开始
if (seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())) {
return Result.fail("秒杀尚未开始");
}
//3.判断秒杀是否结束
if (seckillVoucher.getEndTime().isBefore(LocalDateTime.now())) {
return Result.fail("秒杀已经结束");
}
//4.判断库存是否充足
if (seckillVoucher.getStock() < 1) {
return Result.fail("库存不足");
}
Long userId = UserHolder.getUser().getId();
// synchronized (userId.toString().intern()){
//[redis实现分布式锁]
- SimpleRedisLock lock = new SimpleRedisLock(stringRedisTemplate, "order:" + userId);
+ RLock lock = redissonClient.getLock("order:" + userId);
- boolean tryLock = lock.tryLock(5);
+ boolean tryLock = lock.tryLock();
if (!tryLock){
return Result.fail("一人只能下一单");
}
//获取代理对象(事务)
try {
IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
} catch (IllegalStateException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
Redisson锁重入
案例背景:如下所示,method1在方法内部调用method2,method1和method2出于同一个线程,那么method1已经拿到一把锁了,想进入method2中拿另外一把锁,必然是拿不到的,从而出现死锁
1 |
|
根据上述问题,容易想到获取锁时不能只看线程,可以设定一个属性state进行重用次数计量,对于同一个线程,可以拿到锁,但是state会+1,之后执行method2中的方法,释放锁,释放锁的时候也只是将state进行-1,只有减至0,才会真正释放锁
因为需要新增一个属性,所以不能使用原本的String类型了,这里可以使用Hash结构存储:
此时代码逻辑如下:
Lua脚本实现
为了保证原子性,流程图中的业务逻辑也需要用Lua来实现
- 获取锁的逻辑
1 | local key = KEYS[1]; -- 锁的key |
- 释放锁的逻辑
1 | local key = KEYS[1]; |
对应Redisson中的源码,可以看出逻辑跟上述类似:
- 获取锁
1 | <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { |
- 释放锁
1 | protected RFuture<Boolean> unlockInnerAsync(long threadId) { |
Redisson锁重试和WatchDog机制
这里是对带参数的tryLock
方法的分析
逻辑流程:
Redisson锁的MutiLock原理
上面解决了SETNX
锁的不可重入、不可重试、延时释放的缺点,但是还存在主从一致性的问题,如下图所示,当主节点宕机后,其两个从节点中的其中一个会变成主节点,但是由于主从同步失败,会导致新的主节点不存在锁,因此此时其他应用进来也能获取到锁,造成了主从节点的不一致性。
Redisson中使用MultiLock机制解决这个问题,如下图所示,会设定多个主节点,对应的主节点可以建立从节点。但是获取锁时必须依次向多个主节点获取锁,如果有一个没获取到都获取失败。需要将加锁的逻辑写入到每一个主从节点上,只有所有的服务器都写入成功,此时才是加锁成功,若其中一个节点宕机并且主从同步失败导致丢失锁,那么他去获取锁的时候,只要有一个节点拿不到,都不能算是加锁成功,就保证了加锁的可靠性。
代码测试
- 这里先配置多个redis节点
这里可以通过配置多个id或者同一id的多个端口进行配置
1 |
|
- 注入三个client,使用
redissonClient.getMultiLock
获取多个锁
1 |
|
Redisson分布式锁总结
- 可重入:利用hash结构记录线程id和重入次数
- 可重试:利用信号量和PubSub功能实现等待、唤醒,获取锁失败的重试机制
- 超时续约:利用WatchDog,每隔一段时间,重置超时时间。
- 不可重入
- 原理:利用SETNX的互斥性;利用EX避免死锁;释放锁时判断线程标识
- 缺陷:不可重入、无法重试、锁超时失效
- 可重入
- 原理:利用Hash结构,记录线程标识与重入次数;利用WatchDog延续锁时间;利用信号量控制锁重试等待
- 缺陷:Redis宕机引起锁失效问题
- Redisson的multiLock
- 原理:多个独立的Redis节点,必须在所有节点都获取重入锁,才算获取锁成功
- 缺陷:运维成本高、实现复杂
秒杀优化
异步秒杀
回顾下单流程:
- 当用户发起请求,此时会先请求Nginx,Nginx反向代理到Tomcat,而Tomcat中的程序,会进行串行操作,这样就会导致程序执行很慢,再加上分布式锁,整体执行就会很慢。
优化方案:
将耗时较短的逻辑判断放到Redis中,例如:判断秒杀库存,校验一人一单这样的操作,只要满足这两条操作,那么是一定可以下单成功的,不用等数据真的写进数据库,可以直接告诉用户下单成功就好了。然后后台同时开一个线程,后台线程异步读取阻塞队列里的消息,完成下单业务。逻辑如下所示:
如何实现:
判断秒杀库存
- 因为需要在redis中判断秒杀的库存,所以需要提前将库存缓存在redis中,直接使用String类型。
校验一人一单
- 需要在redis中记录该商品被哪些用户购买过,当有用户购买时,先判断是否存在,如果存在就不能购买。其特点是一个key可以对应多个value,并且value不能重复。自然想到使用Set结构存储。
额外注意
- 在判断了库存充足和一人一单后,在redis中进行库存的预减,并且把对应的用户id存入set集合
- 将两个过程使用Lua脚本执行,从而使得具有原子性。
- 根据Lua脚本执行的结果进行后续判断,如果具有购买资格,就将优惠券id、用户id和订单id存入阻塞队列,并将订单id返回给用户。后续另一个线程进行数据库的读写操作执行异步下单。
秒杀资格判断
需求:
- 新增秒杀优惠券的同时,将优惠券信息保存到Redis中
- 基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
- 如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
- 开启线程任务,不断从阻塞队列中获取信息,实现异步下单。
代码实现
修改保存优惠券逻辑,添加库存到redis
VoucherServiceImpl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void addSeckillVoucher(Voucher voucher) {
// 保存优惠券
save(voucher);
// 保存秒杀信息
SeckillVoucher seckillVoucher = new SeckillVoucher();
seckillVoucher.setVoucherId(voucher.getId());
seckillVoucher.setStock(voucher.getStock());
seckillVoucher.setBeginTime(voucher.getBeginTime());
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);
//[异步秒杀] 保存秒杀库存到redis中
stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY+voucher.getId(), voucher.getStock().toString());
}然后使用PostMan发送优惠券添加请求:localhost:8081/voucher/seckill
1
2
3
4
5
6
7
8
9
10
11
12{
"shopId":1,
"title":"99元代金券",
"subTitle":"周一至周五可用",
"rules":"全场通用\\n无需预约\\n可无限叠加",
"payValue":5000,
"actualValue":10000,
"type":1,
"stock":100,
"beginTime":"2023-07-22T19:00:00",
"endTime":"2023-07-22T23:59:59"
}此时可以在Mysql数据库和Redis中看到新增的优惠券数据。redis中缓存了优惠券的库存
编写Lua脚本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29-- 1.参数列表
-- 1.1 优惠券id
local voucherId = ARGV[1]
-- 1.2 用户id
local userId = ARGV[2]
-- 2.数据key
-- 2.1 库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2 订单key
local orderKey = 'seckill:order:' .. voucherId
-- 3.脚本业务
-- 3.1 判断库存是否充足 get stockKey
if (tonumber(redis.call('get', stockKey)) <= 0) then
return 1
end
-- 3.2 判断用户是否下单 SISMEMBER orderKey userId
if (redis.call('sismember', orderKey, userId) == 1) then
-- 3.3 存在说明是重复下单
return 2
end
-- 3.4 扣库存 incrby stockKey -1
redis.call('incrby' ,stockKey, -1)
-- 3.5 下单(保存用户) sadd orderKey userId
redis.call('sadd', orderKey, userId)
return 0注意:
- lua的字符串拼接使用
..
,字符串转数字是tonumber()
- lua的字符串拼接使用
修改下单逻辑(保存阻塞队列部分后续实现)
VoucherOrderServiceImpl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30//这部分是导入lua脚本
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
public Result seckillVoucher(Long voucherId) {
//1. 执行lua脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), UserHolder.getUser().getId().toString()
);
//2. 判断返回值是否为0
int r = result.intValue();
if (r != 0){
//2.1 不为0代表没有购买资格
return Result.fail(r==1?"库存不足":"不能重复下单");
}
//2.2 为0,有购买资格,把下单信息保存在阻塞队列
long orderId = redisIdWorker.nextId("order");
// TODO 保存阻塞队列
//3. 返回订单id
return Result.ok(orderId);
}
此时使用JMeter进行高并发测试,可以看出平均耗时只有197ms
并且redis中也保存了下单信息,保存了用户的id
阻塞队列实现
需求:
- 新增秒杀优惠券的同时,将优惠券信息保存到Redis中
- 基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
- 如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
- 开启线程任务,不断从阻塞队列中获取信息,实现异步下单。
代码实现:
将优惠券,用户等信息封装并存入阻塞队列
阻塞队列特点:当一个线程尝试从阻塞队列里获取元素的时候,如果没有元素,那么该线程就会被阻塞,直到队列中有元素,才会被唤醒,并去获取元素
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33//阻塞队列
private final BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);
public Result seckillVoucher(Long voucherId) {
//1. 执行lua脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), UserHolder.getUser().getId().toString()
);
//2. 判断返回值是否为0
int r = result.intValue();
if (r != 0) {
//2.1 不为0代表没有购买资格
return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
}
//2.2 为0,有购买资格,把下单信息保存在阻塞队列
long orderId = redisIdWorker.nextId("order");
// TODO 保存阻塞队列
//封装到voucherOrder中
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setVoucherId(voucherId);
voucherOrder.setUserId(UserHolder.getUser().getId());
voucherOrder.setId(orderId);
//加入到阻塞队列
orderTasks.add(voucherOrder);
//3. 返回订单id
return Result.ok(orderId);
}实现异步下单功能
- 创建一个线程池
1
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
- 开启一个线程,这里使用内部类实现
这里的异步下单需要在项目一启动就执行从阻塞队列中获取信息。所以这里就用到了
@PostConstruct
注解,项目一启动就调用这个方法,在其中开启一个线程任务,不断从队列中获取订单信息1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private void init() {
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
private class VoucherOrderHandler implements Runnable {
public void run() {
while (true) {
try {
//1. 获取队列中的订单信息
VoucherOrder voucherOrder = orderTasks.take();
//2. 创建订单
handleVoucherOrder(voucherOrder);
} catch (Exception e) {
log.error("订单处理异常", e);
}
}
}
}- 创建订单逻辑handleVoucherOrder
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19private void handleVoucherOrder(VoucherOrder voucherOrder) {
//1. 获取用户
Long userId = voucherOrder.getUserId();
//2. 创建锁对象,作为兜底方案
RLock redisLock = redissonClient.getLock("order:" + userId);
//3. 获取锁
boolean isLock = redisLock.tryLock();
//4. 判断是否获取锁成功
if (!isLock) {
log.error("不允许重复下单!");
return;
}
try {
//5. 使用代理对象,由于这里是另外一个线程,
proxy.createVoucherOrder(voucherOrder);
} finally {
redisLock.unlock();
}
}注意:
这里同样需要创建代理进行,后续进行解释;并且这里的锁已经不需要了,因为我们在前面秒杀资格判断中已经进行了判断,这里只是作为兜底
这里用到了
createVoucherOrder
方法,传入的整个订单(原本是传入订单id),该方法后续会进行部分修改。- 获取代理对象是通过ThreadLocal进行获取的,由于这里是异步下单,和主线程不是一个线程,所以不能获取成功
- 可以将proxy放在成员变量的位置,然后在主线程中获取代理对象。然后如上述所示
handleVoucherOrder
调用proxy执行方法即可
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34+private IVoucherOrderService proxy;
@Override
public Result seckillVoucher(Long voucherId) {
//1. 执行lua脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), UserHolder.getUser().getId().toString()
);
//2. 判断返回值是否为0
int r = result.intValue();
if (r != 0) {
//2.1 不为0代表没有购买资格
return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
}
//2.2 为0,有购买资格,把下单信息保存在阻塞队列
long orderId = redisIdWorker.nextId("order");
// TODO 保存阻塞队列
//封装到voucherOrder中
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setVoucherId(voucherId);
voucherOrder.setUserId(UserHolder.getUser().getId());
voucherOrder.setId(orderId);
//加入到阻塞队列
orderTasks.add(voucherOrder);
+ //主线程获取代理对象
+ proxy = (IVoucherOrderService) AopContext.currentProxy();
//3. 返回订单id
return Result.ok(orderId);
}修改创建订单的逻辑
原本代码:
1 |
|
这里的创建订单逻辑已经不需要了,可以删去
因为这部分逻辑是通过异步执行,不能再使用
UserHolder
中获取userId,所以这里通过传入的订单信息获取用户id,并且订单id也通过传入的订单获取订单id
修改后:
1 |
|
此时,下单后redis中的缓存stock会-1,数据库中的stock也会-1,并且出现订单,重复下单就会提示一人一单。
总结
秒杀业务的优化思路是什么?
- 先利用Redis完成库存容量、一人一单的判断,完成抢单业务
- 再将下单业务放入阻塞队列,利用独立线程异步下单
基于阻塞队列的异步秒杀存在哪些问题?
- 内存限制问题:
- 我们现在使用的是JDK里的阻塞队列,它使用的是JVM的内存,如果在高并发的条件下,无数的订单都会放在阻塞队列里,可能就会造成内存溢出,所以我们在创建阻塞队列时,设置了一个长度,但是如果真的存满了,再有新的订单来往里塞,那就塞不进去了,存在内存限制问题
- 数据安全问题:
- 经典服务器宕机了,用户明明下单了,但是数据库里没看到
完整代码
1 |
|
消息队列
什么是消息队列
消息队列(Message Queue),字面意思就是存放消息的队列,最简单的消息队列模型包括3个角色
- 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
- 生产者:发送消息到消息队列
- 消费者:从消息队列获取消息并处理消息
举例:
快递员把快递放到驿站/快递柜里去(Message Queue)去,消费者从快递柜/驿站去拿快递,这就是一个异步,如果耦合,那么快递员必须亲自上楼把快递送到你手里,服务当然好,但是万一我不在家,快递员就得一直等我,浪费了快递员的时间。所以解耦还是非常有必要的
但这样看,跟之前的阻塞队列不一样吗?这里有两个区别:
- 消息队列独立于JVM之外的服务,不受JVM内存的限制
- 消息队列不仅仅是做数据存储,还要确保数据安全,存入消息队列中的数据做持久化。而且将消息投递给消费者后需要消费者确认,如果不确认消息队列中还会保存该信息。
一些现成的(MQ)消息队列:如kafka,rabbitmq等,如果没有安装MQ,也可以使用Redis提供的MQ方案。Redis提供了三种不同的方式实现消息队列:
- list结构:基于List结构模拟消息队列
- PubSub:基本的点对点消息模型
- Stream:比较完善的消息队列模型
基于List实现消息队列
消息队列(Message Queue),字面意思就是存放消息的队列,而Redis的list数据结构是一个双向链表,很容易模拟出队列的效果。队列的入口和出口不在同一边,所以我们可以利用:LPUSH结合RPOP或者RPUSH结合LPOP来实现消息队列。
注意:当队列中没有消息时,RPOP和LPOP操作会返回NULL,而不像JVM阻塞队列那样会阻塞,并等待消息,所以我们这里应该使用BRPOP或者BLPOP来实现阻塞效果
基于List的消息队列有哪些优缺点?
- 优点
- 利用Redis存储,不受限于JVM内存上限
- 基于Redis的持久化机制,数据安全性有保障
- 可以满足消息有序性
- 缺点
- 无法避免消息丢失(服务器宕机)
- 只支持单消费者(一个消费者把消息拿走了,其他消费者就看不到这条消息了)
基于PubSub的消息队列
PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费和可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息
常用命令:
SUBSCRIBE channel [channel]
:订阅一个或多个频道PUBLISH channel msg
:向一个频道发送消息PSUBSCRIBE pattern [pattern]
:订阅与pattern格式匹配的所有频道Subscribes the client to the given patterns.
Supported glob-style patterns:
h?llo
subscribes tohello
,hallo
andhxllo
h*llo
subscribes tohllo
andheeeello
h[ae]llo
subscribes tohello
andhallo,
but nothillo
Use
\
to escape special characters if you want to match them verbatim.
基于PubSub的消息队列有哪些优缺点
- 优点:
- 采用发布订阅模型,支持多生产,多消费
- 缺点:
- 不支持数据持久化
- 无法避免消息丢失(如果向频道发送了消息,却没有人订阅该频道,那发送的这条消息就丢失了)
- 消息堆积有上限,超出时数据丢失(消费者拿到数据的时候处理的太慢,而发送消息发的太快)
基于Stream的消息队列
Stream是Redis 5.0引入的一种新数据类型,可以时间一个功能非常完善的消息队列
命令:
发送消息:
使用示例:
1
2# 创建名为users的队列,并向其中发送一个消息,内容是{name=jack, age=21},并且使用Redis自动生成ID
XADD users * name jack age 21
读取消息:
使用示例:
读取第一个消息:
1
2
3
4
5
6
7127.0.0.1:6379>XREAD COUNT 1 STREAMS users 0
1) 1) "users"
2) 1) 1) "1667119621804-0"
2) 1) "name"
2) "jack"
3) "age"
4) "21"并且可以重复读取,消息不会删除,一直保存在消息队列中
读取最后一个消息:
1
127.0.0.1:6379>XREAD COUNT 1 STREAMS users $
阻塞读取:
使用阻塞读取最新消息:
1
XREAD COUNT 1 BLOCK 1000 STREAMS users $
在业务开发中,可以循环调用XREAD阻塞方式来查询最新消息,从而实现持续监听队列的效果,伪代码如下:
1
2
3
4
5
6
7
8
9
10while (true){
//尝试读取队列中的消息,最多阻塞2秒
Object msg = redis.execute("XREAD COUNT 1 BLOCK 2000 STREAMS users $");
//没读取到,跳过下面的逻辑
if(msg == null){
continue;
}
//处理消息
handleMessage(msg);
}注意:当我们指定其实ID为$时,代表只能读取到最新消息,如果当我们在处理一条消息的过程中,又有超过1条以上的消息到达队列,那么下次获取的时候,也只能获取到最新的一条,会出现漏读消息的问题
STREAM类型消息队列的XREAD命令特点
- 消息可回溯
- 一个消息可以被多个消费者读取
- 可以阻塞读取
- 有漏读消息的风险
基于Stream的消息队列—消费者组
消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列,具备以下特点
消息分流:队列中的消息会分留给组内的不同消费者,而不是重复消费者,从而加快消息处理的速度
消息标识:消费者会维护一个标识,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标识之后读取消息,确保每一个消息都会被消费
消息确认:消费者获取消息后,消息处于pending状态(待处理),并存入一个pending-list,当处理完成后,需要通过XACK命令来确认消息,标记消息为已处理,才会从pending-list中移除
命令:
创建消费者组:
1
XGROUP CREATE key groupName ID [MKSTREAM]
- key:队列名称
- groupName:消费者组名称
- ID:起始ID标识,$代表队列中的最后一个消息,0代表队列中的第一个消息
- MKSTREAM:队列不存在时自动创建队列
从消费者组中读取消息
1
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [keys ...] ID [ID ...]
- group:消费者组名称
- consumer:消费者名,如果消费者不存在,会自动创建一个消费者
- count:本次查询的最大数量
- BLOCK milliseconds:当前没有消息时的最大等待时间
- NOACK:无需手动ACK,获取到消息后自动确认(一般不配置,们都是手动确认)
- STREAMS key:指定队列名称
- ID:获取消息的起始ID
>
:从下一个未消费的消息开始(pending-list中)- 其他:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始
其他常见命令
1
2
3
4
5
6
7
8# 删除指定的消费者组
XGROUP DESTORY key groupName
# 给指定的消费者组添加消费者
XGROUP CREATECONSUMER key groupName consumerName
# 删除消费者组中指定的消费者
XGROUP DELCONSUMER key groupName consumerName
STREAM类型消息队列的XREADGROUP命令的特点
- 消息可回溯
- 可以多消费者争抢消息,加快消费速度
- 可以阻塞读取
- 没有消息漏读风险
- 有消息确认机制,保证消息至少被消费一次
List | PubSub | Stream | |
---|---|---|---|
消息持久化 | 支持 | 不支持 | 支持 |
阻塞读取 | 支持 | 支持 | 支持 |
消息堆积处理 | 受限于内存空间, 可以利用多消费者加快处理 | 受限于消费者缓冲区 | 受限于队列长度, 可以利用消费者组提高消费速度,减少堆积 |
消息确认机制 | 不支持 | 不支持 | 支持 |
消息回溯 | 不支持 | 不支持 | 支持 |
消费者监听消息的基本思路
1 | while(true){ |
后续的内容我没有继续看了,因为实际生产中用的更多的还是MQ中间件,这里只是作为了解下即可,后续可以看SpringCloud课程学习消息队列
达人探店
发布探店笔记
探店笔记类似点评网站的评价,往往是图文结合,对应的表有两个:
tb_blog:探店店笔记表,包含笔记中的标题、文字、图片等
tb_blog_comments:其他用户对探店笔记的评价
点击首页最下方菜单栏中的+按钮,即可发布探店笔记。(因为这部分的代码不涉及Redis,已经实现好了,可以直接使用。)
这里上传图片和发布评论是独立实现的:
图片上传代码
UploadController
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public Result uploadImage( { MultipartFile image)
try {
// 获取原始文件名称
String originalFilename = image.getOriginalFilename();
// 生成新文件名
String fileName = createNewFileName(originalFilename);
// 保存文件
image.transferTo(new File(SystemConstants.IMAGE_UPLOAD_DIR, fileName));
// 返回结果
log.debug("文件上传成功,{}", fileName);
return Result.ok(fileName);
} catch (IOException e) {
throw new RuntimeException("文件上传失败", e);
}
}这里通过
SystemConstants.IMAGE_UPLOAD_DIR
制定了图片的本地路径1
2
3
4
5
6public class SystemConstants {
public static final String IMAGE_UPLOAD_DIR = "D:\\lesson\\nginx-1.18.0\\html\\hmdp\\imgs\\";
public static final String USER_NICK_NAME_PREFIX = "user_";
public static final int DEFAULT_PAGE_SIZE = 5;
public static final int MAX_PAGE_SIZE = 10;
}但是这里我们需要修改图片路径为自己的,也就是之前下载的Nginx目录下的imgs文件夹下
发布笔记代码:
BlogController
1
2
3
4
5
6
7
8
9
10
public Result saveBlog( { Blog blog)
// 获取登录用户
UserDTO user = UserHolder.getUser();
blog.setUserId(user.getId());
// 保存探店博文
blogService.save(blog);
// 返回id
return Result.ok(blog.getId());
}
查看探店笔记
需求:点击首页的探店笔记,会进入详情页面,现在需要实现页面的查询接口
点击对应的笔记后,首先会发送如下请求:
Request URL: http://localhost:8080/api/blog/23
Request Method: GET
此时BlogController
中没有对应的方法,首先在Controller中进行调用
Controller层
1
2
3
4
public Result getBlog({ Long id)
return blogService.queryById(id);
}ServiceImpl
在Service类中创建对应方法之后,在Impl类中实现,查看用户探店笔记的时候,需要额外设置用户名和其头像,由于设置用户信息这个操作比较通用,所以这里封装成了一个方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public Result queryBlogById(Long id) {
//查询blog
Blog blog = getById(id);
if (blog==null){
return Result.fail("笔记不存在");
}
//查询blog相关用户
queryBlogUser(blog);
return Result.ok(blog);
}
public void queryBlogUser(Blog blog){
Long userId = blog.getUserId();
User user = userService.getById(userId);
blog.setName(user.getNickName());
blog.setIcon(user.getIcon());
}
顺手将queryHotBlog
也修改一下,原始代码将业务逻辑写到了Controller中,此时整体代码如下:
Controller
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class BlogController {
private IBlogService blogService;
public Result saveBlog( { Blog blog)
// 获取登录用户
UserDTO user = UserHolder.getUser();
blog.setUserId(user.getId());
// 保存探店博文
blogService.save(blog);
// 返回id
return Result.ok(blog.getId());
}
public Result queryById({ Long id)
return blogService.queryBlogById(id);
}
public Result likeBlog( { Long id)
// 修改点赞数量
blogService.update()
.setSql("liked = liked + 1").eq("id", id).update();
return Result.ok();
}
public Result queryMyBlog( { Integer current)
// 获取登录用户
UserDTO user = UserHolder.getUser();
// 根据用户查询
Page<Blog> page = blogService.query()
.eq("user_id", user.getId()).page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));
// 获取当前页数据
List<Blog> records = page.getRecords();
return Result.ok(records);
}
public Result queryHotBlog( { Integer current)
return blogService.queryHotBlog(current);
}
}ServiceImpl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class BlogServiceImpl extends ServiceImpl<BlogMapper, Blog> implements IBlogService {
private IUserService userService;
public Result queryBlogById(Long id) {
//查询blog
Blog blog = getById(id);
if (blog==null){
return Result.fail("笔记不存在");
}
//查询blog相关用户
queryBlogUser(blog);
return Result.ok(blog);
}
public Result queryHotBlog(Integer current) {
// 根据用户查询
Page<Blog> page = query()
.orderByDesc("liked")
.page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));
// 获取当前页数据
List<Blog> records = page.getRecords();
// 查询用户
records.forEach(this::queryBlogUser);
return Result.ok(records);
}
public void queryBlogUser(Blog blog){
Long userId = blog.getUserId();
User user = userService.getById(userId);
blog.setName(user.getNickName());
blog.setIcon(user.getIcon());
}
}这里L28用到了java8新特性中的lambda表达式,后续进行详细了解
此时点击查看探店笔记,可以看到详细内容
点赞功能
原本的代码已经有了初步的实现。点击点赞按钮,查看发送的请求
请求网址: http://localhost:8080/api/blog/like/23
请求方法: PUT
BlogController中的like方法,代码如下
1 |
|
但是目前存在问题,也就是同一个用户可以无限制点赞,这明显是不合理的。造成这个问题的原因是现在的逻辑发起请求只是简单地给数据库+1,并没有用户判断,所以才会出现这个问题
功能完善
需求
- 同一个用户只能对同一篇笔记点赞一次,再次点击则取消点赞
- 如果当前用户已经点赞,则点赞按钮高亮显示(前端已实现,判断字段Blog类的isLike属性)
实现步骤
- 给Blog类添加一个isLike字段,表示是否被当前用户点赞过。
- 这里可以利用Redis中的set集合来存储点赞后的用户id表明该用户已经点赞过。
- 修改点赞功能,利用Redis中的set集合来判断是否点赞过,未点赞则点赞数
+1
,已点赞则点赞数-1
- 修改根据id查询的业务,判断当前登录用户是否点赞过,赋值给isLike字段
- 修改分页查询Blog业务,判断当前登录用户是否点赞过,赋值给isLike字段
代码实现
首先在Blog类中添加字段
1
2
3
4
5/**
* 是否点赞过了
*/
//表明不是数据库中的字段
private Boolean isLike;修改controller逻辑
1
2
3
4
public Result likeBlog( { Long id)
return blogService.likeBlog(id);
}将对应业务逻辑写在serviceImpl中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public Result likeBlog(Long id) {
//获取登录用户
Long userId = UserHolder.getUser().getId();
//判断当前用户是否已经点赞
String key = RedisConstants.BLOG_LIKED_KEY + id;
Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
if (Boolean.FALSE.equals(isMember)){
//如果未点赞,则数据库对应点赞数+1
boolean isSuccess = update().setSql("liked = liked + 1").eq("id", id).update();
//保存用户id到redis
if (isSuccess){
stringRedisTemplate.opsForSet().add(key, userId.toString());
}
}else {
//如果已点赞,取消点赞,并数据库点赞数-1
boolean isSuccess = update().setSql("liked = liked - 1").eq("id", id).update();
//并从redis中移除用户id
if (isSuccess){
stringRedisTemplate.opsForSet().remove(key, userId.toString());
}
}
return Result.ok();
}修改查询blog的逻辑,判断Blog是否被当前用户点赞过,并设置blog的isLiked字段
因为这里按照id查询和分页查询都要用到这个功能,所以这里也抽离成一个方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public Result queryBlogById(Long id) {
//查询blog
Blog blog = getById(id);
if (blog==null){
return Result.fail("笔记不存在");
}
//查询blog相关用户
queryBlogUser(blog);
//[new]查询blog是否被点赞
isBlogLiked(blog);
return Result.ok(blog);
}
public Result queryHotBlog(Integer current) {
// 根据用户查询
Page<Blog> page = query()
.orderByDesc("liked")
.page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));
// 获取当前页数据
List<Blog> records = page.getRecords();
records.forEach(blog -> {
// 查询用户
this.queryBlogUser(blog);
//[new]查询blog是否被点赞
this.isBlogLiked(blog);
});
return Result.ok(records);
}
private void isBlogLiked(Blog blog) {
//获取登录用户
Long userId = UserHolder.getUser().getId();
//判断当前用户是否已经点赞
String key = RedisConstants.BLOG_LIKED_KEY + blog.getId();
Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
blog.setIsLike(BooleanUtil.isTrue(isMember));
}
此时再点赞笔记,点赞后就会高亮,再次点赞就会取消。并且对应Redis中也保存了指定笔记的点赞用户id信息
点赞排行榜
在探店笔记的详情页面,应该把给该笔记点赞的用户显示出来,比如最早点赞的Top5,形成点赞排行榜。参考微信、QQ的动态点赞
这里进行Redis中集合的对比:
List | Set | SortedSet | |
---|---|---|---|
排序方式 | 按添加顺序排序 | 无法排序 | 根据score值排序 |
唯一性 | 不唯一 | 唯一 | 唯一 |
查找方式 | 按索引查找或首尾查找 | 根据元素查找 | 根据元素查找 |
之前的点赞是放到Set集合中,但是Set集合又不能排序,所以这个时候,就可以改用SortedSet(Zset),可以用时间戳作为score,从而实现排序
- 但是ZSet没有isMember方法,所以这里使用
ZSCORE
命令查询score来判断集合中是否有该元素,如果有该元素,则返回值是对应的score,如果没有该元素,则返回值为null - 获取点赞排行可以通过
ZRANGE
命令获取指定范围内的数据
功能实现
使用ZSET替换SET:
修改
BlogServiceImpl
中likeBlog
的点赞逻辑1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29@Override
public Result likeBlog(Long id) {
//获取登录用户
Long userId = UserHolder.getUser().getId();
//判断当前用户是否已经点赞
String key = RedisConstants.BLOG_LIKED_KEY + id;
- Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
+ Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
- if (Boolean.FALSE.equals(isMember)){
+ if (score == null){
//如果未点赞,则数据库对应点赞数+1
boolean isSuccess = update().setSql("liked = liked + 1").eq("id", id).update();
//保存用户id到redis
if (isSuccess){
- stringRedisTemplate.opsForSet().add(key, userId.toString());
+ stringRedisTemplate.opsForZSet().add(key, userId.toString(), System.currentTimeMillis());
}
}else {
//如果已点赞,取消点赞,并数据库点赞数-1
boolean isSuccess = update().setSql("liked = liked - 1").eq("id", id).update();
//并从redis中移除用户id
if (isSuccess){
- stringRedisTemplate.opsForSet().remove(key, userId.toString());
+ stringRedisTemplate.opsForZSet().remove(key, userId.toString());
}
}
return Result.ok();
}修改
isBlogLiked
1
2
3
4
5
6
7
8
9
10private void isBlogLiked(Blog blog) {
//获取登录用户
Long userId = UserHolder.getUser().getId();
//判断当前用户是否已经点赞
String key = RedisConstants.BLOG_LIKED_KEY + blog.getId();
- Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
- blog.setIsLike(BooleanUtil.isTrue(isMember));
+ Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
+ blog.setIsLike(score != null);
}
点赞排行实现:
目前点击笔记详情,会有一个请求报错,内容如下:
Request URL: http://localhost:8080/api/blog/likes/23
Request Method: GET
在controller中实现对应方法
1
2
3
4
public Result queryBlogLikes({ Long id)
return blogService.queryBlogLikes(id);
}在serviceImpl中实现具体逻辑
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public Result queryBlogLikes(Long id) {
// 获取top点赞用户 zrange key 0 4
String key = RedisConstants.BLOG_LIKED_KEY + id;
Set<String> top5 = stringRedisTemplate.opsForZSet().range(key, 0, 4);
if (top5 == null || top5.isEmpty()){
return Result.ok(Collections.emptyList());
}
// 解析其中的用户
List<Long> ids = top5.stream().map(Long::valueOf).collect(Collectors.toList());
// 根据id查询用户
List<UserDTO> userDTOS = userService.listByIds(ids)
.stream()
.map(user -> BeanUtil.copyProperties(user, UserDTO.class))
.collect(Collectors.toList());
return Result.ok(userDTOS);
}又是stream流,后需要好好学java8新特性!
这里使用userDTO还是防止用户敏感信息泄露,只返回用户name、头像和id
此时点赞后,可以在详情页面看到点赞用户的头像
解决BUG
不登陆访问首页
此时,若没有进行登录,直接访问首页,就会发生空指针异常,并且获取不到笔记详情。但是首页的信息并不需要登录,所以就无法获取到用户id,因此在后续根据id查询笔记和分页查询笔记中,根据用户id查询是否点赞就会报出异常。
所以这里需要对isBlogLiked
方法进行修改,如果获取用户为空,直接返回即可,不在进行后续是否点赞的判断。
1 | private void isBlogLiked(Blog blog) { |
此时不登陆也能正常访问首页。
点赞顺序问题
此时,我们更换一个新的账号,对笔记进行点赞,会发现我们是后点赞的,但是却排到了第一个,这是有问题的
并且,此时sql语句查询的顺序和redis中存储的用户顺序也都是正确的
1 | SELECT id,phone,password,nick_name,icon,create_time,update_time FROM tb_user WHERE id IN ( ? , ? ) |
那是因为什么呢?这是因为sql语句查询使用的IN
关键字,我们来演示一下:
1 | SELECT id,phone,password,nick_name,icon,create_time,update_time |
可以看出,我们在in中首先是1014,然后才是1,但查到的数据却是反的。
那如何解决呢?这里可以通过添加ORDER BY FIELD(id, 1, 1014)
人为指定顺序
1 | SELECT id,phone,password,nick_name,icon,create_time,update_time |
对应代码修改:
1 | @Override |
这里不再使用listByIds
根据ids直接返回userDTO的list了,而是使用自定义的sql语句:
- 使用
in()
指定查询范围 - 使用
last()
即最后拼接指定的sql语句(因为默认没有order by field),然后通过动态拼接的方法传入点赞用户的ids,从而实现按照指定顺序输出。
此时效果如下:可以看出点赞排行已经按照先后顺序显示了
好友关注
关注和取关
进入到笔记详情页面时,会发送一个请求,判断当前登录用户是否关注了笔记博主
Request URL: http://localhost:8080/api/follow/or/not/2
Request Method: GET
点击关注按钮时,会发送一个请求,实现关注/取关
Request URL: http://localhost:8080/api/follow/2/true
Request Method: PUT
关注是User之间的关系,是博主与粉丝的关系,数据库中有一张tb_follow表来表示:
代码实现
controller中实现以上两个方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class FollowController {
private IFollowService followService;
public Result follow({ Long id, Boolean isFollow)
return followService.follow(id, isFollow);
}
public Result checkFollw({ Long id)
return followService.checkFollow(id);
}
}serviceImpl中实现对应逻辑
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class FollowServiceImpl extends ServiceImpl<FollowMapper, Follow> implements IFollowService {
/**
* 关注取关功能
* @param id
* @param isFollow
* @return
*/
public Result follow(Long id, Boolean isFollow) {
Long userId = UserHolder.getUser().getId();
//判断关注还是取关
if (isFollow){
//新增数据
Follow follow = new Follow();
follow.setUserId(userId);
follow.setFollowUserId(id);
save(follow);
}else {
//删除数据 delete from tb_follow where userId = ? and followUserId = ?
remove(new LambdaQueryWrapper<Follow>()
.eq(Follow::getUserId, userId)
.eq(Follow::getFollowUserId, id));
}
return Result.ok();
}
/**
* 进入笔记详情页后判断是否关注
* @param id
* @return
*/
public Result checkFollow(Long id) {
Long userId = UserHolder.getUser().getId();
Follow follow = getOne(new LambdaQueryWrapper<Follow>()
.eq(Follow::getUserId, userId)
.eq(Follow::getFollowUserId, id));
return Result.ok(follow!=null);
}
}
共同关注
在本项目中,点击用户头像,进入到用户详情页,可以查看用户发布的笔记,和共同关注列表。但是目前项目还没有实现这些功能,此时点击进去都是空的。
用户信息
查询用户信息
请求网址: http://localhost:8080/api/user/2
请求方法: GET
此时controller中没有这个方法,所以进行实现:
1 |
|
此时就能看到用户头像和用户name
用户笔记
在上面的基础上,访问用户主页后还会触发一下请求获取笔记内容
请求网址: http://localhost:8080/api/blog/of/user?&id=2¤t=1
请求方法: GET
代码实现:
1 |
|
此时页面显示效果
共同关注
此时点击共同关注,发送如下请求:
请求网址: http://localhost:8080/api/follow/common/2
请求方法: GET
需求:利用Redis中恰当的数据结构,实现共同关注功能,在博主个人页面展示出当前用户与博主的共同关注
思路:在set集合中,有交集并集补集的api,可以把二者关注的人放入到set集合中,然后通过SINTER
命令查询两个set集合的交集
修改关注逻辑
修改之前的关注逻辑,在关注博主的同时,需要将数据放到set集合中,方便后期实现共同关注,当取消关注时,也需要将数据从set集合中删除
1 | @Override |
此时关注两个用户,可以看到redis中缓存了数据
功能实现
controller请求方法
1
2
3
4
public Result followCommons({ Long id)
return followService.followCommons(id);
}serviceImpl功能实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public Result followCommons(Long id) {
Long userId = UserHolder.getUser().getId();
String key_prefix = RedisConstants.FOLLOW_KEY;
Set<String> intersect = stringRedisTemplate.opsForSet().intersect(key_prefix + userId, key_prefix + id);
if (intersect == null | intersect.isEmpty()){
return Result.ok(Collections.emptyList());
}
//解析出用户id
List<Long> ids = intersect.stream().map(Long::valueOf).collect(Collectors.toList());
//查询用户(转换为UserDTO隐藏隐私信息)
List<UserDTO> userDTOS = userService.listByIds(ids)
.stream()
.map(user -> BeanUtil.copyProperties(user, UserDTO.class))
.collect(Collectors.toList());
return Result.ok(userDTOS);
}
此时查看一个和我有共同关注的用户,可以看到:
关注推送
概念介绍
关注推送也叫作Feed流,直译为投喂,为用户提供沉浸式体验,通过无限下拉刷新获取新的信息。
传统模式内容检索:用户需要主动通过搜索引擎或者是其他方式去查找想看的内容
新型Feed流:系统分析用户到底想看什么,然后直接把内容推送给用户,从而使用户能更加节约时间,不用去主动搜素
Feed流实现的两种模式
Timeline:不做内容筛选,简单的按照内容发布时间排序,常用于好友或关注(朋友圈等)
优点:信息全面,不会有缺失,并且实现也相对简单
缺点:信息噪音较多,用户不一定感兴趣,内容获取效率低
智能排序:利用智能算法屏蔽掉违规的、用户不感兴趣的内容,推送用户感兴趣的信息来吸引用户
优点:投喂用户感兴趣的信息,用户粘度很高,容易沉迷
缺点:如果算法不精准,可能会起到反作用
本项目针对好友的操作,采用的是Timeline方式,只需要拿到关注用户的信息,然后按照时间排序即可
采用Timeline模式,有三种具体的实现方案
拉模式(也叫读扩散)
含义:当张三和李四、王五发了消息之后,都会保存到自己的发件箱中,如果赵六要读取消息,那么他会读取他自己的收件箱,此时系统会从他关注的人群中,将他关注人的信息全都进行拉取,然后进行排序
优点:比较节约空间,因为赵六在读取信息时,并没有重复读取,并且读取完之后,可以将他的收件箱清除
缺点:有延迟,当用户读取数据时,才会去关注的人的时发件箱中拉取信息,假设该用户关注了海量用户,那么此时就会拉取很多信息,对服务器压力巨大
推模式(也叫写扩散)
含义:推模式是没有发件箱的,当用户写了一个内容,会主动把用户写的内容发送到它粉丝的收件箱中,假设此时粉丝再来读取,就不用再去临时拉取了
优点:时效快,不用临时拉取
缺点:内存压力大,假设一个大V发了一个动态,很多人关注他,那么就会写很多份数据到粉丝那边去
推拉结合(也叫读写混合,兼具推和拉的优点,是一个折中的方案)
站在发件人这边来看:
- 如果是普通人,那么采用推模式,直接把数据写入到他的粉丝收件箱中,因为普通人的粉丝数量较少,所以这样不会产生太大压力。
- 但如果是大V,那么他是直接将数据写入一份到发件箱中去(拉模式),再直接写一份到活跃粉丝的收件箱中(推模式)
站在收件人这边来看:
- 如果是活跃粉丝,那么大V和普通人发的都会写到自己的收件箱里
- 如果是普通粉丝,由于上线不是很频繁,所以等他们上线的时候,再从发件箱中去拉取信息。
推送功能
鉴于点评软件几乎没有大V,所以直接使用推模式实现推送功能
Feed流的分页问题
Feed流中的数据会不断更新,所以数据的角标也会不断变化,所以我们不能使用传统的分页模式
传统分页
假设在t1时刻,取读取第一页,此时page = 1,size = 5,那么拿到的就是10~6
这几条记录,假设t2时刻有发布了一条新数据,那么在t3时刻来读取第二页,此时page = 2,size = 5,那么此时读取的数据是从6开始的,读到的是6~2
,那么就读到了重复的数据,所以要使用Feed流的分页,不能使用传统的分页
Feed流的滚动分页
记录每次操作的最后一条,然后从这个位置去开始读数据
从t1时刻开始,拿到第一页数据,拿到了10~6
,然后记录下当前最后一次读取的记录,就是6,t2时刻发布了新数据,此时这个11在最上面,但不会影响之前拿到的6,此时t3时刻来读取第二页,第二页读数据的时候,从6-1=5
开始读,这样就拿到了5~1
的记录。在这个地方可以使用SortedSet来做,使用时间戳来充当表中的1~10
推送到收件箱
需求:
- 修改新增探店笔记的业务,在保存blog到数据库的同时,推送到粉丝的收件箱
- 收件箱满足可以根据时间戳排序,必须使用Redis的数据结构实现
- 查询收件箱数据时,可以实现分页查询
实现:
BlogServiceImpl.saveBlog
1 |
|
测试:用户发一篇笔记后,其粉丝的收件箱(redis数据)中记录了其笔记的id和发送的时间戳
滚动分页查询仿真
Redis中实现:
- 命令:
ZREVRANGEBYSCORE key max min WITHSCORE LIMITE offset count
ZREVRANGEBYSCORE
指的是按照score降序排列max
、min
指的是查询的score范围,这里是时间戳WITHSCORE
指的是结果带上socreLIMITE offset count
有点类似分页查询,但是这里的offset指的是max值的偏移量。- 若第一次查询,max默认当前时间戳,则offset为0即可,再一次查询时,max是上一次查询的最后一条数据的score,所以此时需要offset向后偏移一位,也就是从比最后一条数据score小的数据开始查询(这时正常情况下,下面介绍了特殊情况)
具体步骤:
- 每次查询之后,需要获取上次查询出的最小时间戳,这个值作为下一次的查询条件
- 需要找到与上一次查询最小score相同的查询个数,并作为偏移量,下次查询的时候,才能跳过这些查询过的数据,拿到需要的数据
- 例如时间戳为8 6 6 5 4 3,每次查询3个,第一次是8 6 6,此时最小时间戳是6,如果不设置偏移量,会从第一个6之后开始查询,那么查询到的就是6 5 5,而不是5 4 3,所以需要设置offset为2,则从第一个6向后偏移两个。
滚动分页查询参数:
- max:当前时间戳(第一次) | 上一次查询的最小时间戳(后续)
- min:0
- offset:0 (第一次)**|** 在上一次的结果中,与最小值一样的元素的个数(后续)
- count:每页笔记条数(固定的)
分页查询收件箱
需求:
- 修改新增探店笔记的业务,在保存blog到数据库的同时,推送到粉丝的收件箱
- 收件箱满足可以根据时间戳排序,必须使用Redis的数据结构实现
- 查询收件箱数据时,可以实现分页查询
分析:
到自己主页下,点击关注,这里会发送如下请求:
请求网址: http://localhost:8080/api/blog/of/follow?&lastId=1690179906849
请求方法: GET
并且可以发现这个ladtId就是当前的时间戳,是第一次查询的max值;这里缺少参数offset,这是因为是第一次查询,默认为0
方法的返回值需要包括分页查询数据、上一次查询中的最小时间戳、offset偏移量
实现:
编写一个通用的实体类,用泛型做一个通用的分页查询,list是封装返回的结果,minTime是记录的最小时间戳,offset是记录偏移量
dto.ScrollResult
1
2
3
4
5
6
public class ScrollResult {
private List<?> list;
private Long minTime;
private Integer offset;
}controller方法
1
2
3
4
public Result queryBlogOfFollow({ Long max, Integer offset)
return blogService.queryBlogOfFollow(max, offset);
}serviceImpl实现具体逻辑
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public Result queryBlogOfFollow(Long max, Integer offset) {
//1. 获取当前用户
Long userId = UserHolder.getUser().getId();
//2. 查询该用户收件箱(之前存的key固定前缀 + 粉丝id),所以根据当前用户id就可以查询是否有关注的人发了笔记
String key = RedisConstants.FEED_KEY + userId;
Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet()
.reverseRangeByScoreWithScores(key, 0, max, offset, 3);
if (typedTuples == null || typedTuples.isEmpty()) {
return Result.ok();
}
//3.解析数据:blogId、minTime(时间戳)、offset、
List<Long> ids = new ArrayList<>(typedTuples.size());
long minTime = 0;
int off = 1;
for (ZSetOperations.TypedTuple<String> tuple : typedTuples) {
//3.1 获取id
ids.add(Long.valueOf(tuple.getValue()));
//3.2 获取score(时间戳)
long time = tuple.getScore().longValue();
if (time == minTime) {
off++;
} else {
minTime = time;
off = 1;
}
}
//4.根据id查询blog (SQL的in不能排序问题,手动指定排序为传入的ids)
String idsStr = StrUtil.join(",",ids);
List<Blog> blogs = query().in("id", idsStr).last("ORDER BY FIELD(id," + idsStr + ")").list();
for (Blog blog : blogs) {
//4.1 查询发布该blog的用户信息
queryBlogUser(blog);
//4.2 查询当前用户是否给该blog点过赞
isBlogLiked(blog);
}
//5.封装返回
ScrollResult r = new ScrollResult();
r.setList(blogs);
r.setOffset(off);
r.setMinTime(minTime);
return Result.ok(r);
}
显示效果:
附近商铺
GEO数据结构
GEO就是Geolocation的简写形式,代表地理坐标。Redis在3.2版本中加入了对GEO的支持,允许存储地理坐标信息,帮助我们根据经纬度来检索数据,常见的命令有:
GEOADD:添加一个地理空间信息,包含:经度(longitude)、纬度(latitude)、值(member)
GEODIST:计算指定的两个点之间的距离并返回
GEOHASH:将指定member的坐标转化为hash字符串形式并返回
GEOPOS:返回指定member的坐标
GEOGADIUS:指定圆心、半径,找到该园内包含的所有member,并按照与圆心之间的距离排序后返回。6.2以后已经弃用
GEOSEARCH:在指定范围内搜索member,并按照与制定点之间的距离排序后返回,范围可以使圆形或矩形。6.2新功能
GEOSEARCHSTORE:与GEOSEARCH功能一致,不过可以把结果存储到一个指定的key。6.2新功能
GEOADD
添加一个地理空间信息,包含:经度(longitude)、纬度(latitude)、值(member)
命令格式:
GEOADD key longitude latitude member [longitude latitude member …]
举例
1
GEOADD china 13.361389 38.115556 "shanghai" 15.087269 37.502669 "beijing"
GEODIST
计算指定的两个点之间的距离并返回
- 命令格式:
GEODIST key member1 member2 [m|km|ft|mi]
- 如果两个位置之间的其中⼀个不存在, 那么命令返回空值。
- 指定单位的参数 unit 必须是以下单位的其中⼀个:( 默认使用米作为单位)
- m 表示单位为米。
- km 表示单位为千米。
- mi 表示单位为英⾥。
- ft 表示单位为英尺。
GEODIST
命令在计算距离时会假设地球为完美的球形, 在极限情况下, 这⼀假设最⼤会造成 0.5% 的误差
举例
1
GEODIST china beijing shanghai km
GEOHASH
将指定member的坐标转化为hash字符串形式并返回
命令格式:
GEOHASH key member [member …]
举例
1
2
3>GEOHASH china beijing shanghai
1) "sqdtr74hyu0"
2) "sqc8b49rny0"
GEOPOS
返回指定member的坐标
格式:
GEOPOS key member [member …]
示例:
1
2
3
4
5
6>geopos china beijing shanghai
1) 1) "15.08726745843887329"
2) "37.50266842333162032"
2) 1) "13.36138933897018433"
2) "38.11555639549629859"
GEOGADIUS
指定圆心、半径,找到该园内包含的所有member,并按照与圆心之间的距离排序后返回,6.2之后已废弃
命令格式
1
2GEORADIUS key longitude latitude radius m|km|ft|mi [WITHCOORD] [WITHDIST] [WITHHASH]
[COUNT count [ANY]] [ASC|DESC] [STORE key] [STOREDIST key]范围可以使用以下其中一个单位:
- m 表示单位为米。
- km 表示单位为千米。
- mi 表示单位为英里。
- ft 表示单位为英尺。
在给定以下可选项时, 命令会返回额外的信息:
- WITHDIST: 在返回位置元素的同时, 将位置元素与中心之间的距离也一并返回。 距离的单位和用户给定的范围单位保持一致。
- WITHCOORD: 将位置元素的经度和维度也一并返回。
- WITHHASH: 以 52 位有符号整数的形式, 返回位置元素经过原始 geohash 编码的有序集合分值。 这个选项主要用于底层应用或者调试, 实际中的作用并不大。
命令默认返回未排序的位置元素。 通过以下两个参数, 用户可以指定被返回位置元素的排序方式:
- ASC: 根据中心的位置, 按照从近到远的方式返回位置元素。
- DESC: 根据中心的位置, 按照从远到近的方式返回位置元素。
举例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16>GEORADIUS china 15 37 200 km WITHDIST WITHCOORD
1) 1) "shanghai"
2) "190.4424"
3) 1) "13.36138933897018433"
2) "38.11555639549629859"
2) 1) "beijing"
2) "56.4413"
3) 1) "15.08726745843887329"
2) "37.50266842333162032"
>GEORADIUS china 15 37 200 km WITHDIST
1) 1) "shanghai"
2) "190.4424"
2) 1) "beijing"
2) "56.4413"
GEOSEARCH
在指定范围内搜索member,并按照与制定点之间的距离排序后返回,范围可以使圆形或矩形,6.2的新功能
命令格式
1
2GEOSEARCH key [FROMMEMBER member] [FROMLONLAT longitude latitude] [BYRADIUS radius m|km|ft|mi]
[BYBOX width height m|km|ft|mi] [ASC|DESC] [COUNT count [ANY]] [WITHCOORD] [WITHDIST] [WITHHASH]举例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21>geosearch china FROMLONLAT 15 37 BYRADIUS 200 km ASC WITHCOORD WITHDIST
1) 1) "beijing"
2) "56.4413"
3) 1) "15.08726745843887329"
2) "37.50266842333162032"
2) 1) "shanghai"
2) "190.4424"
3) 1) "13.36138933897018433"
2) "38.11555639549629859"
>geosearch china FROMLONLAT 15 37 BYBOX 400 400 km DESC WITHCOORD WITHDIST
1) 1) "shanghai"
2) "190.4424"
3) 1) "13.36138933897018433"
2) "38.11555639549629859"
2) 1) "beijing"
2) "56.4413"
3) S1) "15.08726745843887329"
2) "37.50266842333162032"
GEOSEARCHSTORE
与GEOSEARCH功能一致,不过可以把结果存储到一个指定的key,也是6.2的新功能
命令格式
1
2
3GEOSEARCHSTORE destination source [FROMMEMBER member] [FROMLONLAT longitude latitude]
[BYRADIUS radius m|km|ft|mi] [BYBOX width height m|km|ft|mi]
[ASC|DESC] [COUNT count [ANY]] [STOREDIST]这个命令和 GEORADIUS 命令一样, 都可以找出位于指定范围内的元素, 但是 GEORADIUSBYMEMBER 的中心点是由给定的位置元素决定的, 而不是像 GEORADIUS 那样, 使用输入的经度和纬度来决定中心点指定成员的位置被用作查询的中心。
示例:
1
2
3>GEORADIUSBYMEMBER china beijing 200 km
1) "shanghai"
2) "beijing"
附近商户搜索
导入店铺数据到GEO
思路:
- 将数据库中的数据导入到Redis中去,GEO在Redis中就是一个member和一个经纬度,经纬度对应的就是tb_shop中的x和y,而member用shop_id来存,因为Redis只是一个内存级数据库,如果存海量的数据占用资源过多,所以只存一个id,用的时候再拿id去SQL数据库中查询shop信息
- 此时还有一个问题,在redis中没有存储shop_type,无法根据店铺类型来对数据进行筛选,解决办法就是将type_id作为key,存入同一个GEO集合即可
示例:
Key | Value | Score |
---|---|---|
shop:geo:美食 | 海底捞 | 40691512240174598 |
吉野家 | 40691519846517915 | |
shop:geo:KTV | KTV 01 | 40691165486458787 |
KTV 02 | 40691514154651657 |
数据库中的对应数据如下:
实现上述功能:
1 |
|
上述代码是是一条一条写入redis的,效率较低。这里进行改进,根据key一次写入对应的数据
1 | @Test |
进入redis中查看,可以看出保存成功
功能实现
进入到频道页后会发送如下请求:
请求网址: http://localhost:8080/api/shop/of/type?&typeId=1¤t=1&x=120.149993&y=30.334229
请求方法: GET
其中typeId就是频道类型;current是当前页码;xy就是经纬度
代码实现:
SpringDataRedis的2.3.9版本并不支持Redis 6.2提供的
GEOSEARCH
命令,因此我们需要提示其版本,修改自己的pom.xml文件这里主要是将spring-data-redis和lettuce-core从spring-boot-starter-data-redis中排除,然后导入新版本的依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<exclusions>
<exclusion>
<artifactId>spring-data-redis</artifactId>
<groupId>org.springframework.data</groupId>
</exclusion>
<exclusion>
<artifactId>lettuce-core</artifactId>
<groupId>io.lettuce</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>2.6.2</version>
</dependency>
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>6.1.6.RELEASE</version>
</dependency>修改ShopController中的方法,传入参数除了typeId,分页码,还需要其坐标
1
2
3
4
5
6
7
8
9
public Result queryShopByType(
Integer typeId,
Integer current,
Double x,
Double y
) {
return shopService.queryShopByType(typeId,current,x,y);
}对应的逻辑实现
这里传入的经纬度是前端写死的,实际业务是获取定位处的经纬度
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public Result queryShopByType(Integer typeId, Integer current, Double x, Double y) {
//1.判断是否需要根据坐标查询
if (x == null || y == null) {
//不需要坐标查询,直接数据库查询
Page<Shop> page = query().eq("type_id", typeId)
.page(new Page<>(current, SystemConstants.DEFAULT_PAGE_SIZE));
return Result.ok(page.getRecords());
}
//2.计算分页参数
int from = (current - 1) * SystemConstants.MAX_PAGE_SIZE;
int end = current * SystemConstants.MAX_PAGE_SIZE;
//3.查询redis、按照距离排序、分页:shopId、distance
//GEOSEARCH key FROMLONLAT x y BYRADIUS 5000 m WITHDIST
String key = RedisConstants.SHOP_GEO_KEY + typeId;
GeoResults<RedisGeoCommands.GeoLocation<String>> results = stringRedisTemplate.opsForGeo().search(
key,
GeoReference.fromCoordinate(x, y), //搜索的圆心
new Distance(5000), //搜索半径
RedisGeoCommands.GeoSearchCommandArgs.newGeoSearchArgs().includeDistance().limit(end));
//4.解析出id
if (results == null) {
return Result.ok(Collections.emptyList());
}
List<GeoResult<RedisGeoCommands.GeoLocation<String>>> list = results.getContent();
// 判断集合的长度是否小于等于下一页的from,如果符合则说明没有下一页的数据了
if (list.size() <= from) {
return Result.ok(Collections.emptyList());
}
//4.1 截取从from到end的内容
List<Long> ids = new ArrayList<>(list.size());//保存查询到的店铺id,后续根据id查询shop
Map<String, Distance> distanceMap = new HashMap<>(list.size());//保存id和距离对
list.stream().skip(from).forEach(result -> {
//4.2 获取店铺id
String shopIdStr = result.getContent().getName();
ids.add(Long.valueOf(shopIdStr));
//4.3 获取距离
Distance distance = result.getDistance();
distanceMap.put(shopIdStr, distance);
});
//5.根据id查询shop
String idsStr = StrUtil.join(",", ids);
List<Shop> shops = query().in("id", ids).last("ORDER BY FIELD( id," + idsStr + ")").list();
//遍历shop,将距离set到shop中
for (Shop shop : shops) {
shop.setDistance(distanceMap.get(shop.getId().toString()).getValue());
}
return Result.ok(shops);
}这里用到了stream流的
skip()
,可以截取指定位置后的所有数据
这里我直接运行会报错,如下:
1 | Error in execution; nested exception is io.lettuce.core.RedisCommandExecutionException: ERR unknown command 'GEOSEARCH' |
经过查阅,出现问题的原因是服务端的redis版本过低。这里我的版本确实低,只有4.0(x_x),需要6.2以上版本,因为GEOSEARCH
是6.2的新功能。所以这里从官网下载了6.2.5版本,参考以前的笔记Redis入门中进行相关设置。
此时查看商户列表,可以看到包含了距离,并且按照距离排序:
我在上面逻辑实现的L21中,开始将
includeDistance()
写成了includeCoordination()
,导致得到的距离都是0,这里bug找了好久。
includeDistance()
相当于GEOSEARCH中的[WITHCOORD],返回的是经纬度坐标- 而
includeCoordination()
则相当于 [WITHDIST],返回的是距离信息
用户签到
BitMap用法
假如我们使用一张表来存储用户签到的信息,其结构如下:
假如有1000W用户,平均没人每年签到10次,那这张表一年的数据量就有1亿条。这样即消耗内存又影响效率。
有什么更好地方法?类似签到表,这里可以使用二进制位来记录每个月的签到情况,签到记录为1,未签到记录为0。把每一个bit位对应当月的每一天,形成映射关系,用0和1标识业务状态,这种思路就成为位图(BitMap)。这样我们就能用极小的空间,来实现大量数据的表示。
Redis中是利用String类型数据结构实现BitMap,因此最大上限是512M,转换为bit则是2^32个bit位
BitMap的操作命令:
- SETBIT:向指定位置(offset)存入一个0或1
- GETBIT:获取指定位置(offset)的bit值
- BITCOUNT:统计BitMap中值为1的bit位的数量
- BITFIELD:操作(查询、修改、自增)BitMap中bit数组中的指定位置(offset)的值
- BITFIELD_RO:获取BitMap中bit数组,并以十进制形式返回
- BITOP:将多个BitMap的结果做位运算(与、或、异或)
- BITPOS:查找bit数组中指定范围内第一个0或1出现的位置
签到功能
需求:实现签到接口,将当前用户当天签到信息保存到Redis中
代码实现:
把用户id、年和月作为BitMap的key,然后保存到一个BitMap中,每次签到获取今天是当前月的第几天,就把对应位上的0变成1,只要是1就说明这一天已经签到了,反之则没有签到
控制器方法
1
2
3
4
public Result sign(){
return userService.sign();
}serviceImlp逻辑实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public Result sign() {
//获取当前用户
Long userId = UserHolder.getUser().getId();
//获取日期
LocalDateTime now = LocalDateTime.now();
//拼接key
String date = now.format(DateTimeFormatter.ofPattern("yyyyMM"));
String key = USER_SIGN_KEY + userId + date;
//获取今天是这个月的第几天
int dayOfMonth = now.getDayOfMonth();
//写入redis setbit key offset 0|1
stringRedisTemplate.opsForValue().setBit(key,dayOfMonth-1,true);
return Result.ok();
}注意:getDayOfMonth()返回的是1-31,但是位图从0开始,所以需要dayOfMonth-1
因为这里前端没有实现功能额,所以使用PostMan发送请求测试,注意请求头中需携带登录用户的token
然后在Redis中可以看到对应生成的BitMap(今天是25号,所以第25位为1)
签到统计
Q1:什么叫连续签到天数?从最后一天签到到开始向前统计,直到遇到第一次未签到为止,统计总的签到次数,就是连续签到天数。
Q2:如何获取本月到今天为止的所有签到数据?BITFIELD key GET u[dayOfMonth] 0
Q3:如何从后往前遍历每个bit位,获取连续签到天数?与1做与运算,就能得到最后一个bit位;随后右移一位,下一个bit位就成最后一个bit位
需求:实现下面接口,统计当前用户截止当前时间在本月的连续签到天数
代码实现:
控制器方法
1
2
3
4
public Result signCount(){
return userService.signCount();
}逻辑实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public Result signCount() {
//获取当前用户
Long userId = UserHolder.getUser().getId();
//获取日期
LocalDateTime now = LocalDateTime.now();
//拼接key
String date = now.format(DateTimeFormatter.ofPattern("yyyyMM"));
String key = USER_SIGN_KEY + userId + date;
//获取今天是这个月的第几天
int dayOfMonth = now.getDayOfMonth();
//获取本月截至今天为止所有的签到记录,返回的是一个十进制数字 BITFIELD key GET dayOfMonth 0
List<Long> result = stringRedisTemplate.opsForValue().bitField(
key,
BitFieldSubCommands.create()
.get(BitFieldSubCommands.BitFieldType.unsigned(dayOfMonth)).valueAt(0)
);
if (result == null | result.isEmpty()) {
return Result.ok(0);
}
Long num = result.get(0);
if (num == 0 || num == null) {
return Result.ok(0);
}
//循环遍历,
int count = 0;
while (true){
if ((num & 1) == 0){
break;
}else {
count++;
}
//把数字右移一位,抛弃最后一个bit位,继续下一个bit位
num >>>= 1;
}
return Result.ok(count);
}补充:按位移位运算符:
>>
表示算术右移,>>>
表示逻辑右移。参考:Java中>>和>>>的区别>>
,它在右移n位(扩展符号)后保留符号(正数或负数)。>>>
,它会在右移n位(零扩展)后忽略符号,都在高位插入0。
UV统计
HyperLogLog用法
首先搞懂两个概念:
- UV:全称Unique Visitor,也叫独立访客量,是指通过互联网访问、浏览这个网页的自然人。1天内同一个用户多次访问该网站,只记录1次。
- PV:全称Page View,也叫页面访问量或点击量,用户每访问网站的一个页面,记录1次PV,用户多次打开页面,则记录多次PV。往往用来衡量网站的流量。
UV统计在服务端做会很麻烦,因为要判断该用户是否已经统计过了,需要将统计过的信息保存,但是如果每个访问的用户都保存到Redis中,那么数据库会非常恐怖,那么该如何处理呢?
**HyperLogLog(HLL)**是从Loglog算法派生的概率算法,用户确定非常大的集合基数,而不需要存储其所有值,算法相关原理可以参考下面这篇文章:https://juejin.cn/post/6844903785744056333#heading-0
Redis中的HLL是基于string结构实现的,单个HLL的内存永远小于16kb,内存占用低的令人发指!作为代价,其测量结果是概率性的,有小于0.81%的误差。不过对于UV统计来说,这完全可以忽略。
常用方法:(就这三个)
1 | PFADD key element [element...] |
实现UV统计
使用单元测试,向HyperLogLog中添加100万条数据,看看内存占用和统计效果如何:
1 |
|
得到的count为997593,误差率为0.002407%。在Redis中使用info memory
命令查看占用情况为:12.3K字节(插入前和插入后相减)