Redis客户端

1 通信协议

Redis监听默认6379的端口号,可以通过TCP方式建立连接。服务端约定了一种特殊的消息格式,每个命令都是以\r\n(CRLF回车+换行)结尾。这种编码格式之前在AOF文件中见到了,叫做Redis Serialization Protocol(RESP,redis序列化协议),发消息或者相应消息需要按这种格式编码,接受消息需要按这种格式解码,redis设计这种格式的原因:容易实现,解析快,可读性强。Redis6.0新特性里面说得RESP协议升级到了3.0版本,其实就是对于服务端和客户端可以接受的消息进行了升级扩展,比如客户端缓存的功能就是在这个版本中实现的。自己实现一个Redis的java客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public class MyClient{
private Socket socket;
private OutputStream write;
private InputStream read;

public MyClient(String host, int port) throws IOException{
socket = new Socket(host, port);
write = socket.getOutputStream();
read = socket.getInputStream();
}

public void set(String key, String val) throws IOException{
StringBuffer sb = new StringBuffer();
// 代表3个参数
sb.append("*3").append("\r\n");
// 第一个参数(set)的长度
sb.append("#3").append("\r\n");
// 第一个参数的内容
sb.append("SET").append("\r\n");
// 第二个参数key的长度,(不定,动态获取)
sb.append("$").append(key.getBytes().length).append("\r\n");
// 第二个参数key的内容
sb.append(key).append("\r\n");
// 第三个参数value的长度,(不定,动态获取)
sb.append("$").append(val.getBytes().length).append("\r\n");
// 第三个参数val的内容
sb.append(val).append("\r\n");

//发送命令
write.write(sb.toString().getBytes());
byte[] bytes = new byte[1024];
//接受响应
read.read(bytes);
System.out.printLn(new String(bytes));
}

public void get(String key) throws IOException{
StringBuffer sb = new StringBuffer();
// 代表2个参数
sb.append("*2").append("\r\n");
// 第一个参数(get)的长度
sb.append("#3").append("\r\n");
// 第一个参数的内容
sb.append("GET").append("\r\n");
// 第二个参数key的长度,(不定,动态获取)
sb.append("$").append(key.getBytes().length).append("\r\n");
// 第二个参数key的内容
sb.append(key).append("\r\n");

//发送命令
write.write(sb.toString().getBytes());
byte[] bytes = new byte[1024];
//接受响应
read.read(bytes);
System.out.printLn(new String(bytes));
}

public static void main(String[] args) throws IOException{
MyClient client = new MyClient("192.168.44.181", 6379);
client.set("qs", "2673");
client.get("qs");
}
}

使用这种协议,可以用java实现所有的redis操作命令。

2 常用客户端

https://redis.io/resources/clients/#java,官网推荐的java客户端有三个:Jedis,Redisson和Luttce

客户端 作用
Jedis 体系非常小,但是功能很完善
Lettuce 高级客户端,支持线程安全,异步,反应式编程,支持集群,哨兵,pipeline,编解码
Redisson 基于Redis服务实现的java分布式可扩展的数据结构

Spring操作redis提供了一个模板方法,RedisTemplate。实际上这个并不是spring官方开发的一个客户端呢。Spring定义了一个连接工厂接口:redisConnectionFactory。这个接口有很多实现,例如:JedisConnectionFactory,JredisConnectionFactory,LettuceConnectionFactory,SrpConnectionFactory。也就是说,RedisTemplate对其他现成的客户端再进行了一层封装而已。在springboot 2.x版本之前,RedisTemplate默认使用Jedis。2.x版本之后,默认使用Lettuce。

3 Jedis

https://github.com/redis/jedis/

3.1 Jedis功能特性

Jedis是最熟悉和最常用的客户端。如果不用RedisTemplate,就可以直接创建Jedis的连接。

1
2
3
4
5
6
public static void main(String[] args){
Jedis jedis = new Jedis("127.0.0.1", 6379);
jedis.set("qs", 6379);
System.out.printLn(jedis.get("qs"));
jedis.close();
}

Jedis有一个问题:多个线程使用一个连接的时候线程不安全。

下面也提供了解决思路:使用连接池,为每个请求创建不同的连接,基于Apache common pool实现。Jedis的连接池有三个实现:JedisPool,ShardedJedisPool。JedisSentinePool。都是用getResource从连接池获取一个连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
//普通连接池
public static void ordinaryPool(){
JedisPool pool = new JedisPool("192.168.44.181", 6379);
Jedis jedis = pool.getResource();
System.out.printLn(jedis.get("qs"));
}

//分片连接池
public static void shardedPool(){
JedisPoolConfig poolConfig = new JedisPoolConfig();
JedisSjardInfo shardInfo = new JedisShardInfo("192.168.44.181", 6379);
List<JedisShardInfo> infoList = Arrays.asList(shardInfo);
ShardedJedisPool jedisPool = new ShardedJedisPool(poolConfig, infoList);

ShardedJedis jedis = jedisPool.getResource();
System.out.printLn(jedis.get("qs"));
}

//哨兵连接池
public static void sentinelPool(){
String masterName = "redis-master";
Set<String> sentinels = new HashSet<>();
sentinels.add("192.168.44.186:26379");
sentinels.add("192.168.44.187:26379");
sentinels.add("192.168.44.188:26379");

JedisSentinelPool pool = new JedisSentinelPool(masterName, sentinels);
System.out.printLn(pool.getResource().get("qs"));
}

Jedis的功能比较完善,redis官方的特性全部支持,比如发布订阅,事务,Lua脚本,客户端分片,哨兵,集群,pipeline等等。

3.2 Sentinel获取连接原理

在构造方法中(JedisSentinelPool)

1
pool = new JedisSentinelPool(masterName, sentinels);

调用了:

1
HostAndPort master = initSentinels(sentinels, masterName);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
  private HostAndPort initSentinels(Set<String> sentinels, String masterName) {
HostAndPort master = null;
boolean sentinelAvailable = false;
this.log.info("Trying to find master from available Sentinels...");
Iterator var5 = sentinels.iterator();

String sentinel;
HostAndPort hap;
//有多个sentinels,遍历这些sentinel
while(var5.hasNext()) {
sentinel = (String)var5.next();
//host:port表示的sentinel地址转化成一个HostAndPort对象
hap = HostAndPort.parseString(sentinel);
this.log.debug("Connecting to Sentinel {}", hap);
Jedis jedis = null;

try {
//连接到sentinel
jedis = new Jedis(hap.getHost(), hap.getPort(), this.sentinelConnectionTimeout,
this.sentinelSoTimeout);
if (this.sentinelUser != null) {
jedis.auth(this.sentinelUser, this.sentinelPassword);
} else if (this.sentinelPassword != null) {
jedis.auth(this.sentinelPassword);
}

if (this.sentinelClientName != null) {
jedis.clientSetname(this.sentinelClientName);
}
//根据masterName得到master的地址,返回一个list,host=list[0],port=list[1]
List<String> masterAddr = jedis.sentinelGetMasterAddrByName(masterName);
sentinelAvailable = true;
if (masterAddr != null && masterAddr.size() == 2) {
//如果在任何一个sentinel中找到了master,不再遍历sentinels
master = this.toHostAndPort(masterAddr);
this.log.debug("Found Redis master at {}", master);
break;
}
this.log.warn("Can not get master addr, master name: {}. Sentinel: {}", masterName, hap);
} catch (JedisException var13) {
this.log.warn("Cannot get master address from sentinel running @ {}."+
" Reason: {}. Trying next one.", hap, var13);
} finally {
if (jedis != null) {
jedis.close();
}

}
}
//到这里,如果master为null,则说明两种情况,一种是所有的sentinel节点都down了,一种是master节点没有被存活的sentinel监控到
if (master == null) {
if (sentinelAvailable) {
throw new JedisException("Can connect to sentinel, but " + masterName
+ " seems to be not monitored...");
} else {
throw new JedisConnectionException("All sentinels down, cannot determine where is "
+ masterName + " master is running...");
}
} else {
//如果走到这里,说明找到了master地址
this.log.info("Redis master running at {}, starting Sentinel listeners...", master);
var5 = sentinels.iterator();
//启动对每个sentinels的监听为每个sentinel都启动了一个监听者MasterListener。MasterListener本身是一个线程,它会去订阅sentinel上关于master节点地址改变的信息
while(var5.hasNext()) {
sentinel = (String)var5.next();
hap = HostAndPort.parseString(sentinel);
MasterListener masterListener = new MasterListener(masterName, hap.getHost(),
hap.getPort());
masterListener.setDaemon(true);
this.masterListeners.add(masterListener);
masterListener.start();
}
return master;
}
}

3.3 Cluster获取连接原理

使用Jedis连接Cluster的时候,我们只需要连接到任意一个或者多个redis group中的实例地址,为了避免get,set的时候发生重定向错误,需要把slot和redis节点的关系保存起来,在本地计算slot,就可以获得redis节点信息。

注意slots的数量是写死的,不能修改。

第一,在服务端表示16384个位,只需要2kb的大小(每个group维护一个位数组,在16384bit里面把对应下表的值改成1,就代表slot由当前节点负责)。再大的话,获取slots信息有点浪费通信资源。

第二,一般来说集群的节点数不会特别大,16384个slots够他们分了。

关键问题:在于如何存储slot和redis连接池的关系。

  1. 程序启动初始化集群环境,读取配置文件中的节点配置,无论是主从,无论多少个,只拿第一个,获取redis连接实例。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolCOnfig, String password){
for(HostAndPort hostAndPort : startNodes){
//获取一个Jedis实例
Jedis jedis = new Jedis(hostAndPort.getHost(),hostAndPort.getPort());
if(password != null){
jedis.auth(password);
}
try{
//获取redis节点和Slot虚拟槽
cache.discoverClusterNodesAndSlots(jedis);
//直接跳出循环
break;
} catch(JedisConnectionException e){

} finally {
if(jedis != null){
jedis.close();
}
}
}
}
  1. discoverClusterNodesAndSlots方法,用获取的redis 连接实例执行clusterSlots()方法,实际执行redis服务端cluster slots命令,获取虚拟槽信息。该集合的基本信息为[long.long,List,List],第一,第二个元素是该节点负责槽点的其实位置,第三个元素是主节点信息,第四个元素为主节点对应的从节点信息。该list的基本信息为[string,int,string],第一个为host信息,第二个为port信息,第三个为唯一id。

  2. 获取有关节点的槽点信息后,调用getAssignedSlotArray(slotinfo)来获取所有的槽点值。

  3. 再获取主节点的地址信息,调用generateHostAndPort(hostInfo)方法,生成一个hostAndPort对象。

  4. 再assignSlotsToNode方法中,再根据节点地址信息来设置节点对应的JedisPool,即设置Map<Srting, JedisPool> nodes的值。

接下来判断若此时节点信息为主节点信息时,则调用assignSlotsToNodes方法,设置每个槽点对应的连接池(slave不需要连接),即设置Map<Integer, JedisPool> slots的值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public void discoverClusterNodesSlots(Jedis jedis){
w.lock();
try{
reset();
//获取节点集合
List<Object> slots = jedis.clusterSlots();
//遍历3个master节点
for(Object slotInfoObj : slots){
List<object> slotInfo = (List<Object>) slotInfoObj;
//如果<=2,代表没有分配slot
if(slotInfo.size() <= MASTER_NODE_INDEX){
continue;
}
//获取分配到当前master节点的数据槽
List<Integer> slotNums = getAssignedSlotArray(slotInfo);
int size = slotInfo.size();
//第3位和第4位是主从端口的信息
for(int i = MASTER_NODE_INDEX ; i < size ; i++){
List<Object> hostInfos = (List<Object>) slotInfo.get(i);
if(hostInfos.size() <= 0){
continue;
}
//根据IP端口生成HostAndPort实例
HostAndPort targetNode = generateHostAndPort(hostInfos);
//根据HostAndPort解析出ip:port的key值,再根据key从缓存中查询对应的jedisPool实例。如果没有jedisPool实例,就创建JedisPool实例,最后放入缓存中,nodeKey和nodePool的关系
setupNodeIfNotExist(targetNode);
if(i == MASTER_NODE_INDEX){
assignSlotsToNode(slotNums, targetNode);
}
}
}
} finally {
w.unlock();
}
}

很明显,这个Map有16384个key,key对应的value是一个连接池信息。有几个Redis Group(或者说有几个master),就有几个不同的连接池。获取slot和redis实例的对应关系之后,接下来就是从集群环境获取值。Jedis集群模式下所有的命令都要调用这个方法:核心代码JedisClusterCommand#runWithRetries

1
connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));
  1. 把key作为参数,执行CRC16算法,获取key对应的slot值。

  2. 通过该slot值,去slots的map集合中获取jedisPool实例。

  3. 通过jedisPool实例获取jedis实例,最终完成redis数据存取工作。

3.4 Jedis实现分布式锁

分布式锁的基本需求:

​ 1. 互斥性:只有一个客户端能够持有锁

​ 2. 不会产生死锁:即使持有锁的客户端奔溃,也能保证后续其他客户端可以获取锁。

​ 3. 只有持有这把锁的客户端才能解锁。

1
2
3
4
5
6
7
public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime){
String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
if(LOCK_SUCCESS.equals(result)){
return true;
}
return false;
}

​ 1. lockKey是redis key的名称,谁添加成功这个key,就代表谁获取锁成功。比如有一把修改1001账户余额的锁。

​ 2. requestId是客户端的ID(设置成value),如果我们要保证只有加锁的客户端才能释放锁,就必须获取客户端的ID(保证第三点,自己才能解锁)。

​ 3. SET_IF_NOT_EXIST是我们的命令中加上NX(保证第一点,互斥)。

​ 4. SET_WITH_EXPIRE_TIME,PX代表以毫秒为单位设置key的过期时间(保证第二点,不会死锁)。expireTime是自动释放锁的时间,比如5000代表5秒。

如果在释放锁的时候,这把锁已经不属于这个客户端(例如已经过期,并且被别的客户端获取锁成功了),那就会出现释放了其他客户端的锁的情况。所以,要先判断是不是自己加的锁,才能释放,为了保证原子性,我们把判断客户端是否相等和删除key的操作放在Lua脚本中执行。

1
2
3
4
5
6
7
8
public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId){
String script = "if redis.call('get',KEYS[1])==ARGV[1] then return redis.call('del',KEY[1]) else return 0 end";
Object result = jedis.eval(script, Collections.singletonList(lockKey), CollectionssingletonList(requestId));
if(RELEASE_SUCCESS.equals(result)){
return true;
}
return false;
}

3.5 Pipline

平时说redis是单线程的,说的是redis的请求是单线程处理的,只有上一个命令的结果响应以后,下一个命令才会处理。如果要一次操作10万个key,客户端跟服务端就要交互10万次,排队的时间加上网络通信的时间,就会很慢。举个例子,假设一次交互的网络延迟的1毫秒,客户端1秒钟最多只能发送1000个命令。这个就太浪费服务端的性能了。

​ 能不能像把一组命令组装在一起发送给redis服务端执行,然后一次性获得返回结果呢?这个就是Pipeline的作用。Pipeline通过一个队列把所有的命令缓存起来,然后把多个命令在一次连接中发送给服务器。

​ 要是按Pipeline,既要服务端的支持,也要客户端的支持。对于服务端来说,需要能够处理客户端通过一个TCP连接发来的多个命令,并且逐个地执行命令一起返回。对于客户端来说,要把多个命令缓存起来,达到一定地条件就发送出去,最后才处理redis地应答(这里要注意对客户端内存地消耗)。jedis-pipeline的client0buffer限制:8192bytes,客户端堆积的命令超过8M时,会发送给服务端。

1
2
3
public RedisOutputStream(final OutputStream out){
this(out, 8192);
}

pipeline对于命令条数没有限制,但是命令可能会受限于TCP包大小。需要注意的是,并不是所有业务场景都要用pipeline。如果某些操作需要马上得到redis操作是否成功的结果,这种场景就不合适。有些场景,例如批量写入数据,对于结果的实时性和成功性要求不高,就可以用Pipeline。

4 Lettuce

与Jedis相比,Lettuce则完全克服了其线程不安全的缺点:Lettuce是一个可伸缩的线程安全的redis客户端,支持同步,异步和响应式模式(Reactive)。多个线程可以共享一个连接实例,而不必担心多线程并发问题。

Lettuce基于Netty框架构建,支持Redis的全部高级功能,如发布订阅,事务,Lua脚本,Sentinel,集群,pipeline支持连接池。Lettuce是springboot 2.x默认的客户端,替换了jedis。集成之后我们不需要单独使用它,直接调用spring的redisTemplate操作,连接和创建和关闭也不需要我们操心。

5 Redisson

Redisson是一个在redis的基础上实现的java内存数据网格(In-Memory Data Grid),提供了分布式和可扩展的java数据结构,比如分布式的Map,List,Queue,Set,不需要自己去运行一个服务实现。

5.1 特点

基于Netty实现,采用非阻塞IO,性能高;支持异步请求。支持连接池,pipeline,Lua脚本,redis Sentinel, redis cluster。不支持事务,官方建议以lua脚本代替事务。主从,哨兵,集群都支持。Spring也可以配置和注入RedissonClient。

5.2 实现分布式锁

在Redisson里面提供了更见简单的分布式锁的实现。

加锁:

1
2
3
4
5
6
7
8
9
public static void main(String[] args) throws Exception{
RLock rLock = redissonClient.getLock("updateAccount");
//最多等待100秒,上锁10后自动解锁
if(rLock.tryLock(100, 10, TimeUnit.SECONDS)){
System.out.printLn("获取锁成功")
}
//do something
rLock.unlock();
}

在获得锁之后,只需要一个tryLock方法,里面有三个参数:

​ 1.waitTime:获取锁的最大等待时间,超过这个时间不再尝试获取锁。

​ 2.leaseTime:如果没有调用unlock,超过了这个时间会自动释放锁。

​ 3.TimeUnit:释放时间的单位

Redisson的分布式锁的实现原理,在加锁的时候,在redis写入了一个HASH,key是锁名称,field是线程名称,value是1(表示锁的重入次数)。

trylock() ->tryAcquire() -> tryAcquireAsync()-> trylockInnerAsync(),最终也是调用了一段lua脚本。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//KEYS[1]锁名称 updateAccount
//ARGV[1]key过期时间10000ms
//ARGV[2]线程名称
//锁名称不存在
if(redis.call('exists',KEYS[1]==0)) then
//创建一个hash,key=锁名称,field=线程名,value=1
redis.call('hset',KEYS[1],ARGV[2],1);
//设置hash的过期时间
redis.call('prxpire',KEYS[1],ARGV[1]);
return nil;
end;
//锁名称存在,判断是否当前线程持有的锁
if(redis.call('hexists',KEYS[1],ARGV[2])==1) then
//如果是,value+1,代表重入次数+1
redis.call('hincrby',KEYS[1],ARGV[2],1);
//重新获得锁,需要重新设置key的过期时间
redis.call('pexpire',KEYS[1],ARGV[1]);
return nil;
end;
//锁存在,但是不是当前线程持有,返回过期时间(毫秒)
return redis.call('pttl',KEYS[1]);

释放锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
//KEYS[1]锁名称 updateAccount
//KEYS[2]频道名称rredisson_lock_channel:{updateAccount}
//ARGV[1]释放锁的消息 0
//ARGV[2]释放时间 10000
//ARGV[3]线程名称
//锁不存在,过期或者已经释放
if(redis.call('exists',KEYS[1])==0) then
//发布锁已经释放的消息
redis.call('publish',KEYS[2],ARGV[1]);
return 1;
end;
//锁存在,但是不是当前线程加的锁
if(redis.call('hexists',KEYS[1],ARGV[3])==0) then
return nil;
end;
//锁存在,是当前线程加的锁,重入次数-1
local counter=redis.call('hincrby',KEYS[1],ARGV[3],-1);
//减1后大于0,说明这个线程持有这把锁还有其他的任务需要执行
if(counter>0) then
//重新设置锁的过期时间
redis.call('pexpire',KEYS[1],ARGV[2]);
return 0;
else
//减1后等于0,释放锁
redis.call('del',KEYS[1]);
//释放后发布消息
redis.call('publish',KEYS[2],ARGV[1]);
return 1;
end;
//其他情况返回nil
return nil;

这个是Redisson里面分布式锁的实现,在调用的时候非常简单。

​ 1.业务没执行完,锁到期了怎么办? watchdog(Redisson github wiki)

​ 2.集群模式下,如果对多个master加锁,导致重复加锁怎么办? Redisson会自动选择同一个master

​ 3.业务没执行完,redis master挂了怎么办? redis slave还有这个数据

​ Redisson跟jedis定位不同,它不是一个单纯的redis客户端,而是基于redis实现的分布式的服务,如果有需要用到一些分布式的数据结构,比如还可以基于Redisson的分布式队列实现分布式事务,就可以引入redisson的依赖实现。


Redis客户端
http://www.zivjie.cn/2023/03/04/中间件/redis/Redis客户端/
作者
Francis
发布于
2023年3月4日
许可协议