Skip to main content

黑马点评(速通版)

· 39 min read
ayanami

个人环境 Ubuntu 24.04

项目配置

  • repo: 搜一搜就行 https://github.com/cs001020/hmdp?tab=readme-ov-file

  • idea config: 降java版本到11就能不报错

  • redis, mysql: 搜索即可 systemd 启动

  • nginx 稍微复杂一点, 给的是win下的nginx, 配完systemd之后,用他的nginx.conf替换/etc/nginx/nginx.conf(记得备份) 然后修改

        # 指定前端项目所在的位置
location / {
root /home/ayanami/www/hmdp/html/hmdp; # 修改此处, 改为${下载的nginx文件夹原来位置}/hmdp/html/hmdp
<!--truncate--> index index.html index.htm;
}

即可

可能还需要在顶部修改 user ${Your User Name}

ps: nginx似乎套一层后会让之前的连接的token之类invalid掉, 例如b站 kimi 退出登录

目前不知道除了简单sudo systemctl stop nginx的方法

会不会nginx丢docker里面之类别放本机跑好一点

登录

threadlocal

redis 存 session, 做水平拓展负载均衡

用户校验 phone key, 验证码 value

session信息, value 用hash而不用string(json)

存储session数据 value是一个"HashMap", 支持单字段crud, 内存占用少

key ? 生成一个唯一 token, 返回给客户端

redis key加业务前缀"login:code:"

设置有效期

redisTemplate.opsForValue().set(LOGIN_CODE_KEY + phone, code, 2, TimeUnit.MINUTES);

设置30min有效期

        // 保存用户信息到redis
UserDTO userDTO = new UserDTO();
BeanUtils.copyProperties(user, UserDTO.class);
String token = UUID.randomUUID().toString(true);
Map<String, Object> userMap = BeanUtil.beanToMap(userDTO);
stringRedisTemplate.opsForHash().putAll(LOGIN_USER_KEY + token, userMap);
stringRedisTemplate.expire(LOGIN_USER_KEY + token, 30, TimeUnit.MINUTES);
return Result.ok();

"用户不活跃30min": 在拦截器里面更新token有效期(再调用一次expire就行)

自定义拦截器不能做依赖注入? 老实写构造函数, 在外部@Configuration的Configurer中注入, 再调用registry时构造

快捷键缩写行尾.var

这样redis + threadlocal就绕开了登录中tomcat的session机制,减少session传递开销

前端得到token在请求头里面放

    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
// 1.获取请求头token
// 2.由token获取redis用户
// 3.判断token是否存在
// 4.保存用户到threadlocal
// 5.刷新用户token有效期
String token = request.getHeader("authorization");
if (StrUtil.isBlank(token)){
response.setStatus(401); // unauthorized
return false;
}
Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(LOGIN_USER_KEY + token);
UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);

// exist, save user info
UserHolder.saveUser(userDTO);

stringRedisTemplate.expire(LOGIN_USER_KEY + token, LOGIN_USER_TTL, TimeUnit.MINUTES);
return true;
}

拦截器只拦需要登录的路径->再加一个新的拦截器, 拦所有路径

拦截器顺序, 可以.order调整优先级, 从order小到order大执行, 也可以默认(相同order按照添加顺序)

缓存一致性问题

  • 内存淘汰(redis机制)
  • 超时剔除(用户指定)
  • 主动更新(立刻触发)

低一致性的部分内存淘汰或者超时剔除就行, 如类型

高一致性主动更新+超时兜底, 如详情

主动更新

  • cache aside 缓存调用者负责更新
  • read/write through 缓存和数据库整体作为一个服务维持一致性
  • Write Behind Caching 调用者只操作缓存,其他线程异步将缓存持久化到db(可加批处理)

后两种较复杂

cache aside

删除还是更新?

删除是一种lazy alloc, 如果在多次更新中间没有查询就浪费了,还不如删,一般删除缓存

缓存db原子性, 单体->事务, 分布式->TCC等分布式事务

先后?

先删缓存, 再更新db

并发下数据可以不一致, T1 delete cache,T2 cache miss, query db, update cache, T1 write db

得到缓存和db不一致! 并且触发概率很高, 因为查db慢

先更新db再删除缓存, 大部分时候是一致的, 除了

  1. 缓存已失效
  2. T1 查缓存, 查数据库
  3. T2 更新数据库, 删缓存
  4. T1 更新缓存

这样才会不一致, 但缓存已失效和T1查操作比T2更新操作还慢两个条件同时满足是罕见的, 所以好

所以先更新db再删除缓存, 还有同步问题交给超时时间

同时先db再缓存方便出错时回滚, 如果先缓存再db, db出错滚不了redis缓存(需要手动修改redis)

缓存穿透

客户端恶意请求不存在数据,打崩数据库

  • 缓存空对象: 额外的内存消耗, 短期不一致, 通过expire TTL控制
  • 布隆过滤: 内存少, 但有假阳性 -> redis bitmap
  • 增强id复杂度
  • 做好数据的基础格式校验
  • 用户权限校验, 热点参数限流

缓存雪崩:

大量缓存key同时失效(例如同时过期)或者redis宕机, 大量请求打到db, db宕机

  • key TTL 加随机值
  • 降级限流
  • 多级缓存(前端, nginx, jvm)等

缓存击穿:

热点key(被高并发访问且缓存重建业务比较复杂(例如db需要做多表join和运算)的key)失效

N个线程都在请求db并且尝试缓存重建

  • 互斥锁, 重建先拿锁
  • 逻辑过期

互斥锁的不足在于线程还是会空等, 但保证一致性, 并且实现非常简单

逻辑过期解决根本问题, 热点key不采用redis的过期策略不就好了?(顷刻炼化!)

不设置TTL, 而是在value里面加上{expired: timestamp}

在业务代码发现逻辑过期之后, 拿锁, 开启一个新线程(一般实际上是给线程池一个任务)做整个查询和重建缓存的操作

然后在这个新线程做完放锁之前, 其他线程就尝试拿锁-失败-返回旧数据

这里的自定义互斥锁用了个很生草的方法, redis的setnx命令

setnx lock 1只有一个线程能成功

放锁就del lock

估计底层是CAS或者就是简单的mutex之类, 但放在redis里面了, 简化了java逻辑

锁的超时时间? 看业务时间来定

基于锁的查询代码demo如下

@Service
public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService {
@Autowired
private StringRedisTemplate stringRedisTemplate;

private Boolean tryLock(String key){
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
return BooleanUtil.isTrue(flag);
}
private void unlock(String key){
stringRedisTemplate.delete(key);
}
public Result queryShopById(Long id) throws InterruptedException {
Map<Object,Object> shopMap = stringRedisTemplate.opsForHash().entries(CACHE_SHOP_KEY + id);
if (!shopMap.isEmpty()){
Shop shop = BeanUtil.fillBeanWithMap(shopMap, new Shop(), false);
return Result.ok(shop);
}
// cache miss
// 避免缓存击穿, 先拿锁
String CACHE_SHOP_LOCK = "shop:lock:";
if (tryLock(CACHE_SHOP_LOCK + id)){
try {
Shop shop = super.getById(id);
if (shop != null){
stringRedisTemplate.opsForHash().putAll(CACHE_SHOP_KEY + id, BeanUtil.beanToMap(shop, new HashMap<>(),
CopyOptions.create().setIgnoreNullValue(true)
.setFieldValueEditor((fieldname, fieldvalue) -> fieldvalue == null ? "" : fieldvalue.toString())));
stringRedisTemplate.expire(CACHE_SHOP_KEY + id, CACHE_SHOP_TTL, TimeUnit.MINUTES);
return Result.ok(shop);
}
// 缓存穿透, 写入空缓存
HashMap<Object, Object> nullMap = new HashMap<>();
nullMap.put("null", "null");
stringRedisTemplate.opsForHash().putAll(CACHE_SHOP_KEY + id, nullMap);
stringRedisTemplate.expire(CACHE_SHOP_KEY + id, CACHE_NULL_TTL, TimeUnit.MINUTES);
return Result.fail("商铺不存在");
} finally {
unlock(CACHE_SHOP_LOCK + id);
}
}
// 拿不到, 等待后重试
Thread.sleep(100);
return queryShopById(id);
}

}

然后用Jmeter 起1000个线程用200QPS打一打

发现这个锁的逻辑还是没什么问题的, 只查询了一次缓存

还可以这么封装一下

package com.hmdp.utils;

import cn.hutool.core.util.BooleanUtil;
import cn.hutool.json.JSONUtil;
import io.lettuce.core.RedisURI;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.io.Serializable;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;


@Slf4j
@Component
public class CacheClient {
private StringRedisTemplate stringRedisTemplate;


public CacheClient(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}

public void set(String key, Object value, long timeout, TimeUnit unit) {
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value), timeout, unit);
}

public <R> Optional<R> get(String key, Class<R> type){
String json = stringRedisTemplate.opsForValue().get(key);
if (json == null){
return Optional.empty();
}
R result = JSONUtil.toBean(json, type);
if (result == null){
return Optional.empty();
}
return Optional.of(result);
}

public void delete(String key){
stringRedisTemplate.delete(key);
}

public <R, ID extends Serializable> Optional<R> queryWithPassThrough(String keyPrefix, ID id, Class<R> type, Function<ID, Optional<R>> dbQuery, long timeout, TimeUnit unit) {
// 1. 查缓存
Optional<R> cacheVal = get(keyPrefix + id, type);
if(cacheVal.isPresent()){
return cacheVal;
}
// 2. 不存在查db
Optional<R> result = dbQuery.apply(id);
// 3. 存在返回
if (result.isPresent()){
// 3.1 更新缓存
set(keyPrefix + id, result.get(), timeout, unit);
return result;
}
// 4. 均不存在更新缓存为空
set(keyPrefix + id, null, timeout, unit);
return Optional.empty();
}

private Boolean tryLock(String key){
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
return BooleanUtil.isTrue(flag);
}
private void unlock(String key){
stringRedisTemplate.delete(key);
}

// 用于热点key(被高并发访问且缓存重建业务比较复杂(例如db需要做多表join和运算)的key), 避免缓存击穿
public <R, ID extends Serializable> Optional<R> queryWithLock(String keyPrefix, ID id, Class<R> type, Function<ID, Optional<R>> dbQuery, long timeout, TimeUnit unit) {
// 1. 查缓存
Optional<R> cacheVal = get(keyPrefix + id, type);
if(cacheVal.isPresent()){
return cacheVal;
}
// 2. 不存在查db
// 避免缓存击穿, 先拿锁
String CACHE_LOCK = "lock:" + keyPrefix + ":" + id;
if (tryLock(CACHE_LOCK)){
try {
Optional<R> result = dbQuery.apply(id);
System.out.println("result = " + result);
// 3. 存在返回
if (result.isPresent()){
// 3.1 更新缓存
set(keyPrefix + id, result.get(), timeout, unit);
return result;
}
// 4. 均不存在更新缓存为空
set(keyPrefix + id, "{}", timeout, unit);
return Optional.empty();
} catch (Exception e) {
throw new RuntimeException(e);
}
finally {
unlock(CACHE_LOCK);
}
}
// 5. 未获取到锁, 等待后重试
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
return queryWithLock(keyPrefix, id, type, dbQuery, timeout, unit);
}
}

订单

id生成器

不采用自增ID, 主要是自增不方便分表, 改为全局唯一ID生成器

如何实现唯一ID? 还是用redis实现并发下的控制, 也方便拓展到分布式, 使用redis的incr

id设计为 时间戳+seqNo, 还有就是把key设置为日期以避免超出32位

package com.hmdp.utils;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

@Component
public class RedisIdWorker {
@Autowired
private StringRedisTemplate stringRedisTemplate;

private static final long BEGIN_TIMESTAMP = 1640995200L;
private static final long COUNT_SEQ_BITS = 32;

public long nextId(String keyPrefix){
LocalDateTime now = LocalDateTime.now();
long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
long timeStamp = nowSecond - BEGIN_TIMESTAMP;
// id: sign bit + timestamp + seqNo
// 为了避免超过2^32, 使用日作为key的后缀
// 获取日期
String yyyyMMdd = now.format(DateTimeFormatter.ofPattern("yyyyMMdd")); // "20241019"
long seqNo = stringRedisTemplate.opsForValue().increment("incr:" + keyPrefix + ":" + yyyyMMdd);
return timeStamp << COUNT_SEQ_BITS | seqNo;
}
}

秒杀超卖问题

简单的getStock查询库存, 不足返回在高并发下是会寄的(jmeter 打一打, 打不出来的多试几次)

image-20241019222129520

    @Transactional
@Override
public Result seckillVoucher(Long voucherId) {
// 1. 是否存在该秒杀优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2. 是否在秒杀时间内
if (LocalDateTime.now().isBefore(voucher.getBeginTime()) || LocalDateTime.now().isAfter(voucher.getEndTime())) {
return Result.fail("不在秒杀时间内");
}
// 3. 是否还有库存
if (voucher.getStock() < 1) {
return Result.fail("库存不足");
}
// 4. 生成订单
VoucherOrder order = new VoucherOrder();
order.setId(redisIdWorker.nextId(SECKILL_STOCK_KEY));
// 5. 扣减库存
seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).update();
return Result.ok(order);
}

这样单单加一个@Transactional只是保证了错误回滚, 并不能保证并发次序(除非比如强指定isolation sequential)

库存减还没有commit

不是原子性导致的

所以要加锁: 悲观锁/乐观锁

乐观锁: 认为线程安全问题不一定会发生, 只在更新的时候判断是否有线程已经改过了

如果没有修改就是安全的, 有可以重试或者异常

判断修改过:

  • 版本号法

在这里就是, set stock = stock - 1 where id = 10 and stock = original_stock

(广义CAS)

这实际上是依赖于并发事务时会重新判断这一行是否成立

pg doc

UPDATE, DELETE, SELECT FOR UPDATE, and SELECT FOR SHARE commands behave the same as SELECT in terms of searching for target rows: they will only find target rows that were committed as of the command start time. However, such a target row might have already been updated (or deleted or locked) by another concurrent transaction by the time it is found. In this case, the would-be updater will wait for the first updating transaction to commit or roll back (if it is still in progress). If the first updater rolls back, then its effects are negated and the second updater can proceed with updating the originally found row. If the first updater commits, the second updater will ignore the row if the first updater deleted it, otherwise it will attempt to apply its operation to the updated version of the row. The search condition of the command (the WHERE clause) is re-evaluated to see if the updated version of the row still matches the search condition. If so, the second updater proceeds with its operation using the updated version of the row. In the case of SELECT FOR UPDATE and SELECT FOR SHARE, this means it is the updated version of the row that is locked and returned to the client. UPDATEDELETESELECT FOR UPDATESELECT FOR SHARE命令在搜索目标行方面与SELECT的行为相同:它们只会查找截至命令开始时间已提交的目标行。然而,这样的目标行在被发现时可能已经被另一个并发事务更新(或删除或锁定)。在这种情况下,潜在的更新程序将等待第一个更新事务提交或回滚(如果仍在进行中)。如果第一个更新程序回滚,则其效果将被否定,第二个更新程序可以继续更新最初找到的行。如果第一个更新程序提交,则第二个更新程序将忽略第一个更新程序删除的行,否则它将尝试将其操作应用于该行的更新版本。重新评估命令的搜索条件( WHERE子句),以查看该行的更新版本是否仍然与搜索条件匹配。如果是,则第二更新器使用该行的更新版本继续其操作。在SELECT FOR UPDATESELECT FOR SHARE的情况下,这意味着它是锁定并返回给客户端的行的更新版本。

mysql 默认是更高一档的 repeatable read

https://dev.mysql.com/doc/refman/8.4/en/innodb-consistent-read.html

update的set部分本身是一定有原子性的(只要支持锁和事务, 无非是锁整表还是行级锁)

但update需要一个read的过程, 这部分在低一致性下是可以不同的(不加读锁), 这也是这里需要检查的原因

最后就是简单加一个 eq判断

        boolean succcess = seckillVoucherService.update().setSql("stock = stock - 1").
eq("voucher_id", voucherId).
eq("stock", voucher.getStock()).
update();
if (!succcess){
return Result.fail("库存不足");
}

可以试试, 高qps一样打不动的

实际上有更好的方案, 可以观察到, 在还有库存的时候, 这样的判断方式也会导致部分请求由于触发了冲突而失败

所以用

        boolean succcess = seckillVoucherService.update().setSql("stock = stock - 1").
eq("voucher_id", voucherId).
gt("stock", 0). // 大于0就行
update();

一人一单

一个用户只能买一个优惠券 有以下几点需要注意:

  1. 这里用悲观锁, 由于是不停的insert, 所以乐观锁不好做
  2. 悲观锁的synchronized 的key用userId.toString().intern()
    • 不同user应该有不同的锁, 而不是一把大锁退化成串行.
    • synchronized必须放在(事务的)外面, 也就是调用的地方而不是被调用函数内部锁, 否则sychronized先退出,相当于先放锁再commit
    • 在锁的时候, 需要注意到userId是一个本地变量, 不能锁在对象上, 需要锁在字符串的字面值上, 所以有toString().intern(), 锁在了常量池的对象上
  3. 调用的时候, 需要注意到事务的底层实现是加了一个proxy代理类, 如果直接CheckAndSaveOrder是在this上调用, 就没有了事务的效果, 所以使用AopContext.currentProxy()获取代理类后调用.

(在依赖里面加上

        <dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
<version>1.9.7</version>
</dependency>

在Application处加上@EnableAspectJAutoProxy(exposeProxy = true) 暴露代理

@EnableAspectJAutoProxy(exposeProxy = true)
@MapperScan("com.hmdp.mapper")
@SpringBootApplication
public class HmDianPingApplication {
//...
}

)

最后代码这样(定死id是方便测试)

    @Override
public Result seckillVoucher(Long voucherId) {
// 1. 是否存在该秒杀优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2. 是否在秒杀时间内
if (LocalDateTime.now().isBefore(voucher.getBeginTime()) || LocalDateTime.now().isAfter(voucher.getEndTime())) {
return Result.fail("不在秒杀时间内");
}
// 3. 是否还有库存
if (voucher.getStock() < 1) {
return Result.fail("库存不足");
}
// Long userId = UserHolder.getUser().getId();
Long userId = 1L;
VoucherOrder order;
synchronized (userId.toString().intern()) {
// get proxy object
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
order = proxy.CheckAndSaveOrder(voucherId);
if (order == null) {
return Result.fail("您已经抢购过了");
}
}

// 5. 扣减库存
boolean succcess = seckillVoucherService.update().setSql("stock = stock - 1").
eq("voucher_id", voucherId).
gt("stock", 0).
update();
if (!succcess) {
return Result.fail("库存不足");
}
return Result.ok(order);
}

@Transactional
@Override
public VoucherOrder CheckAndSaveOrder(Long voucherId){
// 一人一单校验
// boolean exist =super.lambdaQuery().eq(VoucherOrder::getUserId, UserHolder.getUser().getId()).
// eq(VoucherOrder::getVoucherId, voucherId).count() > 0;
boolean exist =super.lambdaQuery().eq(VoucherOrder::getUserId, 1).
eq(VoucherOrder::getVoucherId, voucherId).count() > 0;
if (exist) {
return null;
}
// 4. 生成订单
VoucherOrder order = new VoucherOrder();
order.setId(redisIdWorker.nextId(SECKILL_STOCK_KEY));
order.setVoucherId(voucherId);
order.setCreateTime(LocalDateTime.now());
order.setUpdateTime(LocalDateTime.now());
order.setStatus(1);
// Long userId = UserHolder.getUser().getId();
Long userId = 1L;
order.setUserId(userId);
boolean success = super.save(order);
if (!success) {
return null;
}
return order;

}

集群安全问题

加锁可以解决单机一人一单, 但不能解决集群一人一单

如下就寄

    upstream backend {
server 127.0.0.1:8081 max_fails=5 fail_timeout=10s weight=1;
server 127.0.0.1:8082 max_fails=5 fail_timeout=10s weight=1;
}

(可以copy一个application再启动(vmoptions:-Dserver.port=8082改端口)就让idea又启动了一个tomcat)

经典把加锁逻辑丢到redis

分布式锁

synchronized调用本地JVM的锁监视器

分布式锁:多进程可见且互斥

  • 高可用

  • 高性能

  • 安全性

  • 可重入?公平?...

分布式锁:

mysqlrediszookeeper
互斥mysql本身的互斥锁setnx这样的互斥命令节点唯一性和有序性实现互斥
高可用
高性能一般一般(强一致性)
安全性断开连接自动释放需要手动删key,但可以利用过期时间断开连接自动释放

redis setnx: set if not exists

setnx key value
expire key 10
del key

setnx和expire中间宕机?-> 原子性

合并成一条命令

SET lock thread1 EX 10 NX
del lock

一个大概的代码像这样

public class RedisDistributedLock implements ILock{
@Resource
private StringRedisTemplate redisTemplate;
private static final String LOCK_PREFIX = "lock:";

private static String getKey(String name){
return LOCK_PREFIX + name;
}

@Override
public boolean tryLock(String name, long Timeout) {
String key = getKey(name);
long value = Thread.currentThread().getId();
Boolean success = redisTemplate.opsForValue().setIfAbsent(key, String.valueOf(value), Timeout, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success); // success != null && success
}

@Override
public void unlock(String name) {
redisTemplate.delete(getKey(name));
}
}

存在的问题:

业务A阻塞超过超时时间->另一个线程B拿锁->阻塞线程A结束阻塞,把锁释放->寄,这时候C可以抢B锁了

解决方案,释放前检查这锁还是不是自己的

获取锁->业务超时或者服务宕机, 自动释放锁/成功执行业务,判断锁的持有者还是否是自己,是自己手动释放锁

锁标识-> UUID(as jvm id) + 线程id

public class RedisDistributedLock implements ILock {
@Resource
private StringRedisTemplate redisTemplate;
private static final String LOCK_PREFIX = "lock:";
// JVM唯一标识
private static final String JVMPrefix = UUID.randomUUID().toString(true);

private static String getKey(String name){
return LOCK_PREFIX + name;
}
private static String getUniqueValue(){
return JVMPrefix + "-" + Thread.currentThread().getId();
}
@Override
public boolean tryLock(String name, long Timeout) {
String key = getKey(name);
String value = getUniqueValue();
Boolean success = redisTemplate.opsForValue().setIfAbsent(key, value, Timeout, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success); // success != null && success
}

@Override
public void unlock(String name) {
String whoTakes = redisTemplate.opsForValue().get(getKey(name));
if (getUniqueValue().equals(whoTakes)){
redisTemplate.delete(getKey(name));
}
}
}

但还有问题

在判断完的时候,触发了GC阻塞,然后超时释放了,然后其他线程拿走,然后就是误删

提供原子性: redis 的 lua 脚本模式

多条redis命令写到一行lua里面,从而让redis保证原子性

redis的EVAL命令执行脚本

help @scripting

例如

EVAL "return redis.call('set', KEYS[1], ARGV[1])" 1 name Rose

1:有1个key类型参数, 放入KEYS[1]

local key = KEYS[1]
local me = ARGV[1]

-- 获取锁的持有者
local whoTakes = redis.call('get', key)
if(whoTakes == me) then
return redis.call('del', key)
end
return 0

最后这样就可以引入了

private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setResultType(Long.class);
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua")); // classpath:unlock.lua
// classpath, 类路径, 默认是当前目录,但是springboot + maven会将resources,依赖等目录自动加入classpath简化配置
}
public void unlock(String name) {
String whoTakes = redisTemplate.opsForValue().get(getKey(name));
// if (getUniqueValue().equals(whoTakes)){
// redisTemplate.delete(getKey(name));
// }
// 为了避免误删,保证GC或者其他java内部堵塞下的原子性,使用lua脚本
redisTemplate.execute(UNLOCK_SCRIPT, Collections.singletonList(getKey(name)), getUniqueValue());
}

这个简单的分布式锁剩下的问题:

  • 不可重入->改造脚本
  • 不可重试
  • 超时释放
  • 主从一致 主节点获取锁尚未同步给从节点,然后主节点挂了

使用成熟框架 Redisson

各种分布式锁实现:

可重入锁、公平锁、联锁、读写锁、可过期信号量、...

package com.hmdp.config;

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RedissonConfig {
@Value("${spring.redis.host}")
private String host;

@Value("${spring.redis.port}")
private String port;

@Value("${spring.redis.password}")
private String password;
@Bean
public RedissonClient redissonClient(){
Config config = new Config();
String address = "redis://" + host + ":" + port;
config.useSingleServer().setAddress(address).setPassword(password);
return Redisson.create(config);
}
}

tryLock 尝试获取锁间隔时间,自动释放时间, 时间单位

配置完之后可以简单的demo测试

import com.hmdp.HmDianPingApplication;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;

import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = HmDianPingApplication.class)
public class RedissonDemoTest {
@Resource
private RedissonClient redissonClient;

@Test
public void TryLockTest() throws InterruptedException {
RLock lock = redissonClient.getLock("redisson:anylock");
// 注意这里的key不能和redisTemplate的key重复,否则影响类型判断会寄
boolean isLock = lock.tryLock(1, 10, TimeUnit.SECONDS);
if(isLock){
try {
System.out.println("Get lock success");
} finally {
lock.unlock();
}
}
}
}

之前的api大概修改成

da//    @Resource
// private RedisDistributedLock lock;
@Resource
private RedissonClient redissonClient;

@Override
public Result seckillVoucher(Long voucherId) {
// 1. 是否存在该秒杀优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2. 是否在秒杀时间内
if (LocalDateTime.now().isBefore(voucher.getBeginTime()) || LocalDateTime.now().isAfter(voucher.getEndTime())) {
return Result.fail("不在秒杀时间内");
}
// 3. 是否还有库存
if (voucher.getStock() < 1) {
return Result.fail("库存不足");
}
// Long userId = UserHolder.getUser().getId();
Long userId = 1L;
VoucherOrder order;
// synchronized (userId.toString().intern()) {
// // get proxy object
// IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
// order = proxy.CheckAndSaveOrder(voucherId);
// if (order == null) {
// return Result.fail("您已经抢购过了");
// }
// }
RLock lock = redissonClient.getLock("redisson:order:lock:" + userId);
boolean success = false;
try{
success = lock.tryLock(1, 10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
if(lock.isHeldByCurrentThread()){
lock.unlock();
}
throw new RuntimeException(e);
}

if(!success){
// 此用户并发下单
return Result.fail("您已经抢购过了");
}
// 拿到锁
try{
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
order = proxy.CheckAndSaveOrder(voucherId);
if (order == null) {
return Result.fail("您已经抢购过了");
}
// 5. 扣减库存
success = seckillVoucherService.update().setSql("stock = stock - 1").
eq("voucher_id", voucherId).
gt("stock", 0).
update();
} finally {
lock.unlock();
}

if (!success) {
return Result.fail("库存不足");
}
return Result.ok(order);
}

打一打发现确实ok

redisson是符合java线程规范的,只有拿锁的线程可以放锁

redisson可重入锁原理

redis的hset, hset的field是threadId, value是计数值

之后lua脚本, 拿锁放锁修改计数即可(hset, hexists, hincrby)

redisson的lock有类似futex的设计,利用了redis的pub/sub和future(底层的park/unpark/回调)机制达到让出cpu而不是自旋的机制(在等待锁的时候订阅解锁事件,然后await让出)

还有一个watch dog机制, 如果指定leaseTime为-1,则让redisson启动一个后台线程,在要过期的时候自动续约延长锁的expire时间(每30s)

没有解决的问题也就是主从复制主宕机导致的从重新选举主,导致主锁信息丢失,从拿锁的问题

image-20241027002737472

主从同步问题怎么解决?

不要主从, 全部变成redis node(node可以有slave)

获取锁变成需要在全部的redis node上拿到锁才行->只要有一个node存活,就可以避免slave错误拿到锁

放锁的时候简单能放都放(草),放不了(比如挂了)要么依赖超时,要么依赖重试

总之不负责节点挂了或者心跳方案之类,就是redis已有的主从机制,一个节点挂了就等redis给他切换到slave, 才能再次成功拿锁

超时是有用的(雾

拿锁是原子的, 如果有拿不到会回滚

这叫multilock

缺陷:运维成本高

运维成本和主节点宕机的情况下分布式锁正常运行这个edge case的trade off

优化秒杀

jmetor测QPS,准备好token文件,在请求里面引用变量后看聚合报告

现在:db密集+分布式锁,性能差

改造为异步操作,queue, db不要堵在这里

查询->判断库存->查询订单->校验一人一单->减库存->创建订单

判断库存&校验 放在redis? 结果存到阻塞队列

减库存和创建订单是update拿锁串行写的->交给后台线程异步刷, 甚至批量写

同时redis内部一段逻辑也需要是原子的->lua脚本

lua脚本逻辑

--- param: userId, voucherId
--- return: result(int): 0 success, 1 库存不足, 2 一人一单
--- 注意orderid是java里面的全局id生成器(redisIdWorker)生成的, 不需要lua管
--- 判断时间可以放在脚本里面,但没有必要,不管在里面还是外面都需要到时清掉redis, 不如redis直接把相关key的超时设置成时间(或者起额外的定时任务),简化脚本逻辑
local userId = ARGV[1]
local voucherId = ARGV[2]

local stockKey = "seckill:stock:" .. voucherId
local orderKey = "seckill:order:" .. voucherId --- order set(记录买过的userId集合)

--- 判断库存
if(tonumber(redis.call('get', stockKey) <= 0)) then
return 1
end

--- 判断用户是否下过单
if(tonumber(redis.call('sismember', orderKey, userId)) == 1) then
return 2
end

--- 成功下单,减少库存,记录已下单
redis.call('incrby', stockKey, -1)
redis.call('sadd', orderKey, userId)

整个异步的代码流程

  • 注入一个线程池,例如单个线程的
  • 创建一个任务,无限地从阻塞队列之中读取信息,并进行处理。使用@PostConstruct注解,在初始化结束的时候,将这个任务提交给线程池
  • 在任务里面需要注意,此时userId和proxy都不能再使用(都是从threadlocal里面拿的),proxy提前保存在外面,userId从传入的VoucherOrder对象拿

最后的代码示例

package com.hmdp.service.impl;

import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.dto.Result;
import com.hmdp.entity.SeckillVoucher;
import com.hmdp.entity.Voucher;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.mapper.VoucherOrderMapper;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IUserService;
import com.hmdp.service.IVoucherOrderService;
import com.hmdp.utils.CacheClient;
import com.hmdp.utils.RedisDistributedLock;
import com.hmdp.utils.RedisIdWorker;
import com.hmdp.utils.UserHolder;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.aop.framework.AopContext;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.*;

import static com.hmdp.utils.RedisConstants.SECKILL_STOCK_KEY;
import static com.hmdp.utils.RedisConstants.SECKILL_VOUCHER_KEY;


@Slf4j
@Service
public class VoucherOrderServiceOptImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
@Resource
private RedissonClient redissonClient;
@Resource
private CacheClient cacheClient;
@Resource
private StringRedisTemplate stringRedisTemplate;

private static ArrayBlockingQueue<VoucherOrder> orderQueue;
private static ExecutorService orderExecutor;
static {
orderQueue = new ArrayBlockingQueue<VoucherOrder>(1024 * 1024);
orderExecutor = Executors.newSingleThreadExecutor();
}
private static DefaultRedisScript<Long> seckillScript;
static {
seckillScript = new DefaultRedisScript<>();
seckillScript.setResultType(Long.class);
seckillScript.setLocation(new ClassPathResource("classpath:seckill.lua"));
}
private final IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
private final Runnable orderTask = ()->{
while(true){
try {
VoucherOrder order = orderQueue.take(); // 当队列为空时,会阻塞
handleOrder(order);
} catch (Exception e) {
log.error("下单异常", e);
}
}
};
@PostConstruct
public void init(){
orderExecutor.submit(orderTask);
}

private void handleOrder(VoucherOrder order){
Long userId = order.getUserId();
RLock lock = redissonClient.getLock("redisson:order:lock:" + userId);
boolean success = false;
try{
success = lock.tryLock(1, 10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.error("获取锁异常", e);
}

if(!success){
// 此用户并发下单, 这里因为redis已经是原子性脚本,原则上不应该出现这种情况
log.error(userId + "已经抢购过了");
}
// 拿到锁
try{
success = proxy.CheckAndSaveOrder(order);
if (!success) {
log.error("下单失败");
}
// 5. 扣减库存
success = seckillVoucherService.update().setSql("stock = stock - 1").
eq("voucher_id", order.getVoucherId()).
gt("stock", 0).
update();
if (!success) {
log.error("下单失败");
}
} finally {
lock.unlock();
}
}

@Override
public Result seckillVoucher(Long voucherId) {
// 1. 是否存在该秒杀优惠券
// SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
Optional<SeckillVoucher> voucherOptional = cacheClient.get(SECKILL_VOUCHER_KEY + voucherId, SeckillVoucher.class);
if (voucherOptional.isPresent()) {
return Result.fail("秒杀优惠券不存在");
}
SeckillVoucher voucher = voucherOptional.get();
// 2. 是否在秒杀时间内
if (LocalDateTime.now().isBefore(voucher.getBeginTime()) || LocalDateTime.now().isAfter(voucher.getEndTime())) {
return Result.fail("不在秒杀时间内");
}
// 3. 是否还有库存和一人一单(校验逻辑在lua脚本中)
Long userId = 1L;
Long resultLong = stringRedisTemplate.execute(seckillScript, Collections.emptyList(), voucherId.toString(), userId.toString());
int result = resultLong.intValue();
if (result == 1) {
return Result.fail("库存不足");
}
if (result == 2) {
return Result.fail("您已经抢购过了");
}
// 生成任务,返回结果
VoucherOrder order = new VoucherOrder();
order.setId(redisIdWorker.nextId(SECKILL_STOCK_KEY));
order.setVoucherId(voucherId);
order.setCreateTime(LocalDateTime.now());
order.setUpdateTime(LocalDateTime.now());
order.setStatus(1);
order.setUserId(userId);
orderQueue.offer(order);
return Result.ok(order);
}

@Transactional
public boolean CheckAndSaveOrder(VoucherOrder order){
boolean exist = super.lambdaQuery().eq(VoucherOrder::getUserId, 1).
eq(VoucherOrder::getVoucherId, order.getVoucherId()).count() > 0;
if (exist) {
return false;
}
return super.save(order);
}
}

平均值大概加速了几倍,而最小值加速非常多

问题:

  • jdk阻塞队列可能不够用(高并发打爆/超出队列容量)
  • 秒杀的数据安全问题,假如挂了就寄了

消息代理 message broker

redis消息队列:

  • list模拟
  • PubSub 点对点
  • Stream 比较完善的消息队列模型

list模拟需要用BRPOP, BRPOSH, BLPOP, BLPUSH得到阻塞效果

但无法避免消息丢失,并且只支持单消费者

pubsub

  • SUBSCRIBE channel [channel]订阅一个或者多个频道
  • PUBLISH channel msg向一个频道发消息
  • PSUBSCRIBE pattern [pattern] 订阅通配符的多个频道
    • 支持*, ?,[]

支持多生产多消费,但

  • 不支持数据持久化
  • 堆积有上限->放在消费者的缓存区
  • 无法避免消息丢失

Stream: 数据类型

最简单用法

XADD写

XADD users * name jack age 21
# XADD [key] * [field value [field value...]]
# * 表示redis生成id, 格式是 时间戳+seq ID(0,1,2,...),读的时候用seqID就行

XLEN查看key里面的消息长度

XREAD读

# XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key...] ID [ID...]
XREAD COUNT 1 STREAMS users 0

可以多读,每个client有自己的计数

XREAD COUNT 1 BLOCK 0 STREAMS users $
# $ 表示最新消息, BLOCK 0 永久阻塞

但$的行为还和offset不一样,他始终是“这条命令开始后”的最新,不是offset += count而是offset = queue.lastIndex();

  • 消息可回溯
  • 一发多读
  • 可以阻塞读
  • 但有漏读风险

消费者组 Consumer Group

  • 消息分流
  • 消息标识(offset) >从下一个未消费的消息开始
  • 消息确认,拿到消息后会进入pending-list, XACK标识处理完成前不移除

redistemplate opsForStream就行,和命令是对应的

得到的是list, 再得到单个{id, entries}的对象,最后把entries转换回消息体

实战

点赞:放用户ID在redis集合

还是先改db再改redis

点赞排行榜("xxx, ...等赞了",微信按时间排序)改成sortedset就行, 然后取出用户的时候加个order by

关注和取关 db层级是follow表,但缓存呢?全部消息队列?前端ws?

之前看到一个方案是给用户建一个未读消息表,后端登录后定时拉取一遍自己关注的所有人的新消息(在关注表里面 user_id, follow_id, last_update 比较最后更新时间和关注的人的新帖子就行),并通过ws/轮询等发给前端

这样的优点在于避免一个被几万几十万用户关注的动态对数据库造成大压力

关注列表应该放redis, 这个没有必要一直查表

共同关注: 由于已经有redis的关注列表了,直接redis求交就行

关注推送 Feed流

  • timeline 不做内容筛选
  • 智能排序 依赖于算法

timeline:

  • 拉模式(读扩散),只有“发件箱”没有“收件箱”,只有在读的时候才会去读副本(不落盘),消息只在发送端存,延时较高省内存,实现复杂
  • 推模式 (写扩散),只有“收件箱”没有“发件箱”,新消息直接存到所有关注者的收件箱,延时低,不需要拉取排序,但占磁盘,实现简单
  • 推拉结合(读写混合):
    • 推送数少(例如<1000)推,推送数多拉
    • 还可以判活,只给活跃用户推,不活跃用户拉,这样减少推的数量,又优化了活跃用户频繁读的内存开销
    • ...其他策略
    • 实现非常复杂

黑马的意思是千万以下的用户数推模式都不会有大问题草

推模式:

feed流的分页问题:读第一页->插入->读第二页???(感觉不是什么特别的case...别的分页一般也不考虑这个啊,可能高速更新的流需要)

解决方法倒是很简单,滚动分页,记录上一次的最后一条。之后从它向后N条

redis sortedset 按照 score range查询 (zrange byscore)

opsForZSet

附近商户 redis GEO

用户签到 BitMap

UV 统计hyperloglog:

  • UV 独立访问量(访客量)
  • PV 点击量

loglog算法派生的概率算法,不需要存值; 误差小于1%, 单个HLL的内存永远小于16KB

PFADD PFCOUNT PFMERGE

算法本身自动去重

Loading Comments...