Redis -- 延时队列

异步消息队列

Redis的list数据结构常用来作为异步消息队列使用,使用rpush/lpush操作入队,使用lpop/rpop来操作出队

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
> rpush my-queue apple banana pear
(integer) 3
> llen my-queue
(integer) 3
> lpop my-queue
"apple"
> llen my-queue
(integer) 2
> lpop my-queue
"banana"
> llen my-queue
(integer) 1
> lpop my-queue
"pear"
> llen my-queue
(integer) 0
> lpop my-queue
(nil)

空队列

  1. 如果队列为空,客户端会陷入pop的死循环空轮询不仅拉高了客户端的CPURedis的QPS也会被拉高
  2. 如果空轮询的客户端有几十个,Redis的慢查询也会显著增加,可以尝试让客户端线程sleep 1s
  3. 但睡眠会导致消息的延迟增大,可以使用blpop/brpop(blocking,阻塞读
    • 阻塞读在队列没有数据时,会立即进入休眠状态,一旦有数据到来,会立即被唤醒消息延迟几乎为0

空闲连接

  1. 如果线程一直阻塞在那里,Redis的客户端连接就成了闲置连接
  2. 闲置过久,服务器一般会主动断开连接,减少闲置的资源占用,此时blpop/brpop抛出异常

锁冲突处理

  1. 分布式锁加锁失败的处理策略
    • 直接抛出异常,通知用户稍后重试
    • sleep后再重试
    • 将请求转移到延时队列,过一会重试
  2. 抛出异常
    • 这种方式比较适合由用户直接发起的请求
  3. sleep
    • sleep会阻塞当前的消息处理线程,从而导致队列的后续消息处理出现延迟
    • 如果碰撞比较频繁,sleep方案不合适
  4. 延时队列
    • 比较适合异步消息处理的场景,通过将当前冲突的请求转移到另一个队列延后处理避免冲突

延时队列

  1. 可以通过Redis的zset来实现延时队列
  2. 将消息序列化成一个字符串作为zet的value,将该消息的到期处理时间作为score
  3. 然后多线程轮询zset获取到期的任务进行处理
    • 多线程是为了保障可用性,但同时要考虑并发安全,确保任务不能被多次执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
public class RedisDelayingQueue<T> {
@Data
@AllArgsConstructor
@NoArgsConstructor
private static class TaskItem<T> {
private String id;
private T msg;
}

private Type taskType = new TypeReference<TaskItem<T>>() {
}.getType();

private Jedis jedis;
private String queueKey;

public RedisDelayingQueue(Jedis jedis, String queueKey) {
this.jedis = jedis;
this.queueKey = queueKey;
}

public void delay(T msg) {
TaskItem<T> task = new TaskItem<>(UUID.randomUUID().toString(), msg);
jedis.zadd(queueKey, System.currentTimeMillis() + 5000, JSON.toJSONString(task));
}

public void loop() {
// 可以进一步优化,通过Lua脚本将zrangeByScore和zrem统一挪到Redis服务端进行原子化操作,减少抢夺失败出现的资源浪费
while (!Thread.interrupted()) {
// 只取一条
Set<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);
if (values.isEmpty()) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
break;
}
continue;
}
String s = values.iterator().next();
if (jedis.zrem(queueKey, s) > 0) {
// zrem是多线程多进程争夺任务的关键
TaskItem<T> task = JSON.parseObject(s, taskType);
this.handleMsg(task.msg);
}
}
}

private void handleMsg(T msg) {
try {
System.out.println(msg);
} catch (Throwable ignored) {
// 一定要捕获异常,避免因为个别任务处理问题导致循环异常退出
}
}

public static void main(String[] args) {
final RedisDelayingQueue<String> queue = new RedisDelayingQueue<>(new Jedis("localhost", 16379), "q-demo");
Thread producer = new Thread() {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
queue.delay("zhongmingmao" + i);
}
}

};
Thread consumer = new Thread() {
@Override
public void run() {
queue.loop();
}

};

producer.start();
consumer.start();
try {
producer.join();
Thread.sleep(6000);
consumer.interrupt();
consumer.join();
} catch (InterruptedException ignored) {
}
}
}
0%