1 通信协议
Redis监听默认6379的端口号,可以通过TCP方式建立连接。服务端约定了一种特殊的消息格式,每个命令都是以\r\n(CRLF回车+换行)结尾。这种编码格式之前在AOF文件中见到了,叫做Redis Serialization Protocol(RESP,redis序列化协议),发消息或者相应消息需要按这种格式编码,接受消息需要按这种格式解码,redis设计这种格式的原因:容易实现,解析快,可读性强。Redis6.0新特性里面说得RESP协议升级到了3.0版本,其实就是对于服务端和客户端可以接受的消息进行了升级扩展,比如客户端缓存的功能就是在这个版本中实现的。自己实现一个Redis的java客户端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| public class MyClient{ private Socket socket; private OutputStream write; private InputStream read; public MyClient(String host, int port) throws IOException{ socket = new Socket(host, port); write = socket.getOutputStream(); read = socket.getInputStream(); } public void set(String key, String val) throws IOException{ StringBuffer sb = new StringBuffer(); sb.append("*3").append("\r\n"); sb.append("#3").append("\r\n"); sb.append("SET").append("\r\n"); sb.append("$").append(key.getBytes().length).append("\r\n"); sb.append(key).append("\r\n"); sb.append("$").append(val.getBytes().length).append("\r\n"); sb.append(val).append("\r\n"); write.write(sb.toString().getBytes()); byte[] bytes = new byte[1024]; read.read(bytes); System.out.printLn(new String(bytes)); } public void get(String key) throws IOException{ StringBuffer sb = new StringBuffer(); sb.append("*2").append("\r\n"); sb.append("#3").append("\r\n"); sb.append("GET").append("\r\n"); sb.append("$").append(key.getBytes().length).append("\r\n"); sb.append(key).append("\r\n"); write.write(sb.toString().getBytes()); byte[] bytes = new byte[1024]; read.read(bytes); System.out.printLn(new String(bytes)); } public static void main(String[] args) throws IOException{ MyClient client = new MyClient("192.168.44.181", 6379); client.set("qs", "2673"); client.get("qs"); } }
|
使用这种协议,可以用java实现所有的redis操作命令。
2 常用客户端
https://redis.io/resources/clients/#java,官网推荐的java客户端有三个:Jedis,Redisson和Luttce
| 客户端 |
作用 |
| Jedis |
体系非常小,但是功能很完善 |
| Lettuce |
高级客户端,支持线程安全,异步,反应式编程,支持集群,哨兵,pipeline,编解码 |
| Redisson |
基于Redis服务实现的java分布式可扩展的数据结构 |
Spring操作redis提供了一个模板方法,RedisTemplate。实际上这个并不是spring官方开发的一个客户端呢。Spring定义了一个连接工厂接口:redisConnectionFactory。这个接口有很多实现,例如:JedisConnectionFactory,JredisConnectionFactory,LettuceConnectionFactory,SrpConnectionFactory。也就是说,RedisTemplate对其他现成的客户端再进行了一层封装而已。在springboot 2.x版本之前,RedisTemplate默认使用Jedis。2.x版本之后,默认使用Lettuce。
3 Jedis
https://github.com/redis/jedis/
3.1 Jedis功能特性
Jedis是最熟悉和最常用的客户端。如果不用RedisTemplate,就可以直接创建Jedis的连接。
1 2 3 4 5 6
| public static void main(String[] args){ Jedis jedis = new Jedis("127.0.0.1", 6379); jedis.set("qs", 6379); System.out.printLn(jedis.get("qs")); jedis.close(); }
|
Jedis有一个问题:多个线程使用一个连接的时候线程不安全。
下面也提供了解决思路:使用连接池,为每个请求创建不同的连接,基于Apache common pool实现。Jedis的连接池有三个实现:JedisPool,ShardedJedisPool。JedisSentinePool。都是用getResource从连接池获取一个连接。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public static void ordinaryPool(){ JedisPool pool = new JedisPool("192.168.44.181", 6379); Jedis jedis = pool.getResource(); System.out.printLn(jedis.get("qs")); }
public static void shardedPool(){ JedisPoolConfig poolConfig = new JedisPoolConfig(); JedisSjardInfo shardInfo = new JedisShardInfo("192.168.44.181", 6379); List<JedisShardInfo> infoList = Arrays.asList(shardInfo); ShardedJedisPool jedisPool = new ShardedJedisPool(poolConfig, infoList); ShardedJedis jedis = jedisPool.getResource(); System.out.printLn(jedis.get("qs")); }
public static void sentinelPool(){ String masterName = "redis-master"; Set<String> sentinels = new HashSet<>(); sentinels.add("192.168.44.186:26379"); sentinels.add("192.168.44.187:26379"); sentinels.add("192.168.44.188:26379"); JedisSentinelPool pool = new JedisSentinelPool(masterName, sentinels); System.out.printLn(pool.getResource().get("qs")); }
|
Jedis的功能比较完善,redis官方的特性全部支持,比如发布订阅,事务,Lua脚本,客户端分片,哨兵,集群,pipeline等等。
3.2 Sentinel获取连接原理
在构造方法中(JedisSentinelPool)
1
| pool = new JedisSentinelPool(masterName, sentinels);
|
调用了:
1
| HostAndPort master = initSentinels(sentinels, masterName);
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
| private HostAndPort initSentinels(Set<String> sentinels, String masterName) { HostAndPort master = null; boolean sentinelAvailable = false; this.log.info("Trying to find master from available Sentinels..."); Iterator var5 = sentinels.iterator();
String sentinel; HostAndPort hap; while(var5.hasNext()) { sentinel = (String)var5.next(); hap = HostAndPort.parseString(sentinel); this.log.debug("Connecting to Sentinel {}", hap); Jedis jedis = null;
try { jedis = new Jedis(hap.getHost(), hap.getPort(), this.sentinelConnectionTimeout, this.sentinelSoTimeout); if (this.sentinelUser != null) { jedis.auth(this.sentinelUser, this.sentinelPassword); } else if (this.sentinelPassword != null) { jedis.auth(this.sentinelPassword); }
if (this.sentinelClientName != null) { jedis.clientSetname(this.sentinelClientName); } List<String> masterAddr = jedis.sentinelGetMasterAddrByName(masterName); sentinelAvailable = true; if (masterAddr != null && masterAddr.size() == 2) { master = this.toHostAndPort(masterAddr); this.log.debug("Found Redis master at {}", master); break; } this.log.warn("Can not get master addr, master name: {}. Sentinel: {}", masterName, hap); } catch (JedisException var13) { this.log.warn("Cannot get master address from sentinel running @ {}."+ " Reason: {}. Trying next one.", hap, var13); } finally { if (jedis != null) { jedis.close(); }
} }
if (master == null) { if (sentinelAvailable) { throw new JedisException("Can connect to sentinel, but " + masterName + " seems to be not monitored..."); } else { throw new JedisConnectionException("All sentinels down, cannot determine where is " + masterName + " master is running..."); } } else { this.log.info("Redis master running at {}, starting Sentinel listeners...", master); var5 = sentinels.iterator(); while(var5.hasNext()) { sentinel = (String)var5.next(); hap = HostAndPort.parseString(sentinel); MasterListener masterListener = new MasterListener(masterName, hap.getHost(), hap.getPort()); masterListener.setDaemon(true); this.masterListeners.add(masterListener); masterListener.start(); } return master; } }
|
3.3 Cluster获取连接原理
使用Jedis连接Cluster的时候,我们只需要连接到任意一个或者多个redis group中的实例地址,为了避免get,set的时候发生重定向错误,需要把slot和redis节点的关系保存起来,在本地计算slot,就可以获得redis节点信息。

注意slots的数量是写死的,不能修改。
第一,在服务端表示16384个位,只需要2kb的大小(每个group维护一个位数组,在16384bit里面把对应下表的值改成1,就代表slot由当前节点负责)。再大的话,获取slots信息有点浪费通信资源。
第二,一般来说集群的节点数不会特别大,16384个slots够他们分了。
关键问题:在于如何存储slot和redis连接池的关系。
- 程序启动初始化集群环境,读取配置文件中的节点配置,无论是主从,无论多少个,只拿第一个,获取redis连接实例。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolCOnfig, String password){ for(HostAndPort hostAndPort : startNodes){ Jedis jedis = new Jedis(hostAndPort.getHost(),hostAndPort.getPort()); if(password != null){ jedis.auth(password); } try{ cache.discoverClusterNodesAndSlots(jedis); break; } catch(JedisConnectionException e){ } finally { if(jedis != null){ jedis.close(); } } } }
|
discoverClusterNodesAndSlots方法,用获取的redis 连接实例执行clusterSlots()方法,实际执行redis服务端cluster slots命令,获取虚拟槽信息。该集合的基本信息为[long.long,List,List],第一,第二个元素是该节点负责槽点的其实位置,第三个元素是主节点信息,第四个元素为主节点对应的从节点信息。该list的基本信息为[string,int,string],第一个为host信息,第二个为port信息,第三个为唯一id。
获取有关节点的槽点信息后,调用getAssignedSlotArray(slotinfo)来获取所有的槽点值。
再获取主节点的地址信息,调用generateHostAndPort(hostInfo)方法,生成一个hostAndPort对象。
再assignSlotsToNode方法中,再根据节点地址信息来设置节点对应的JedisPool,即设置Map<Srting, JedisPool> nodes的值。
接下来判断若此时节点信息为主节点信息时,则调用assignSlotsToNodes方法,设置每个槽点对应的连接池(slave不需要连接),即设置Map<Integer, JedisPool> slots的值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| public void discoverClusterNodesSlots(Jedis jedis){ w.lock(); try{ reset(); List<Object> slots = jedis.clusterSlots(); for(Object slotInfoObj : slots){ List<object> slotInfo = (List<Object>) slotInfoObj; if(slotInfo.size() <= MASTER_NODE_INDEX){ continue; } List<Integer> slotNums = getAssignedSlotArray(slotInfo); int size = slotInfo.size(); for(int i = MASTER_NODE_INDEX ; i < size ; i++){ List<Object> hostInfos = (List<Object>) slotInfo.get(i); if(hostInfos.size() <= 0){ continue; } HostAndPort targetNode = generateHostAndPort(hostInfos); setupNodeIfNotExist(targetNode); if(i == MASTER_NODE_INDEX){ assignSlotsToNode(slotNums, targetNode); } } } } finally { w.unlock(); } }
|
很明显,这个Map有16384个key,key对应的value是一个连接池信息。有几个Redis Group(或者说有几个master),就有几个不同的连接池。获取slot和redis实例的对应关系之后,接下来就是从集群环境获取值。Jedis集群模式下所有的命令都要调用这个方法:核心代码JedisClusterCommand#runWithRetries
1
| connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));
|
把key作为参数,执行CRC16算法,获取key对应的slot值。
通过该slot值,去slots的map集合中获取jedisPool实例。
通过jedisPool实例获取jedis实例,最终完成redis数据存取工作。
3.4 Jedis实现分布式锁
分布式锁的基本需求:
1. 互斥性:只有一个客户端能够持有锁
2. 不会产生死锁:即使持有锁的客户端奔溃,也能保证后续其他客户端可以获取锁。
3. 只有持有这把锁的客户端才能解锁。
1 2 3 4 5 6 7
| public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime){ String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime); if(LOCK_SUCCESS.equals(result)){ return true; } return false; }
|
1. lockKey是redis key的名称,谁添加成功这个key,就代表谁获取锁成功。比如有一把修改1001账户余额的锁。
2. requestId是客户端的ID(设置成value),如果我们要保证只有加锁的客户端才能释放锁,就必须获取客户端的ID(保证第三点,自己才能解锁)。
3. SET_IF_NOT_EXIST是我们的命令中加上NX(保证第一点,互斥)。
4. SET_WITH_EXPIRE_TIME,PX代表以毫秒为单位设置key的过期时间(保证第二点,不会死锁)。expireTime是自动释放锁的时间,比如5000代表5秒。
如果在释放锁的时候,这把锁已经不属于这个客户端(例如已经过期,并且被别的客户端获取锁成功了),那就会出现释放了其他客户端的锁的情况。所以,要先判断是不是自己加的锁,才能释放,为了保证原子性,我们把判断客户端是否相等和删除key的操作放在Lua脚本中执行。
1 2 3 4 5 6 7 8
| public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId){ String script = "if redis.call('get',KEYS[1])==ARGV[1] then return redis.call('del',KEY[1]) else return 0 end"; Object result = jedis.eval(script, Collections.singletonList(lockKey), CollectionssingletonList(requestId)); if(RELEASE_SUCCESS.equals(result)){ return true; } return false; }
|
3.5 Pipline
平时说redis是单线程的,说的是redis的请求是单线程处理的,只有上一个命令的结果响应以后,下一个命令才会处理。如果要一次操作10万个key,客户端跟服务端就要交互10万次,排队的时间加上网络通信的时间,就会很慢。举个例子,假设一次交互的网络延迟的1毫秒,客户端1秒钟最多只能发送1000个命令。这个就太浪费服务端的性能了。
能不能像把一组命令组装在一起发送给redis服务端执行,然后一次性获得返回结果呢?这个就是Pipeline的作用。Pipeline通过一个队列把所有的命令缓存起来,然后把多个命令在一次连接中发送给服务器。
要是按Pipeline,既要服务端的支持,也要客户端的支持。对于服务端来说,需要能够处理客户端通过一个TCP连接发来的多个命令,并且逐个地执行命令一起返回。对于客户端来说,要把多个命令缓存起来,达到一定地条件就发送出去,最后才处理redis地应答(这里要注意对客户端内存地消耗)。jedis-pipeline的client0buffer限制:8192bytes,客户端堆积的命令超过8M时,会发送给服务端。
1 2 3
| public RedisOutputStream(final OutputStream out){ this(out, 8192); }
|
pipeline对于命令条数没有限制,但是命令可能会受限于TCP包大小。需要注意的是,并不是所有业务场景都要用pipeline。如果某些操作需要马上得到redis操作是否成功的结果,这种场景就不合适。有些场景,例如批量写入数据,对于结果的实时性和成功性要求不高,就可以用Pipeline。
4 Lettuce
与Jedis相比,Lettuce则完全克服了其线程不安全的缺点:Lettuce是一个可伸缩的线程安全的redis客户端,支持同步,异步和响应式模式(Reactive)。多个线程可以共享一个连接实例,而不必担心多线程并发问题。
Lettuce基于Netty框架构建,支持Redis的全部高级功能,如发布订阅,事务,Lua脚本,Sentinel,集群,pipeline支持连接池。Lettuce是springboot 2.x默认的客户端,替换了jedis。集成之后我们不需要单独使用它,直接调用spring的redisTemplate操作,连接和创建和关闭也不需要我们操心。
5 Redisson
Redisson是一个在redis的基础上实现的java内存数据网格(In-Memory Data Grid),提供了分布式和可扩展的java数据结构,比如分布式的Map,List,Queue,Set,不需要自己去运行一个服务实现。
5.1 特点
基于Netty实现,采用非阻塞IO,性能高;支持异步请求。支持连接池,pipeline,Lua脚本,redis Sentinel, redis cluster。不支持事务,官方建议以lua脚本代替事务。主从,哨兵,集群都支持。Spring也可以配置和注入RedissonClient。
5.2 实现分布式锁
在Redisson里面提供了更见简单的分布式锁的实现。

加锁:
1 2 3 4 5 6 7 8 9
| public static void main(String[] args) throws Exception{ RLock rLock = redissonClient.getLock("updateAccount"); if(rLock.tryLock(100, 10, TimeUnit.SECONDS)){ System.out.printLn("获取锁成功") } rLock.unlock(); }
|
在获得锁之后,只需要一个tryLock方法,里面有三个参数:
1.waitTime:获取锁的最大等待时间,超过这个时间不再尝试获取锁。
2.leaseTime:如果没有调用unlock,超过了这个时间会自动释放锁。
3.TimeUnit:释放时间的单位
Redisson的分布式锁的实现原理,在加锁的时候,在redis写入了一个HASH,key是锁名称,field是线程名称,value是1(表示锁的重入次数)。

trylock() ->tryAcquire() -> tryAcquireAsync()-> trylockInnerAsync(),最终也是调用了一段lua脚本。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
|
if(redis.call('exists',KEYS[1]==0)) then redis.call('hset',KEYS[1],ARGV[2],1); redis.call('prxpire',KEYS[1],ARGV[1]); return nil; end;
if(redis.call('hexists',KEYS[1],ARGV[2])==1) then redis.call('hincrby',KEYS[1],ARGV[2],1); redis.call('pexpire',KEYS[1],ARGV[1]); return nil; end;
return redis.call('pttl',KEYS[1]);
|
释放锁:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
|
if(redis.call('exists',KEYS[1])==0) then redis.call('publish',KEYS[2],ARGV[1]); return 1; end;
if(redis.call('hexists',KEYS[1],ARGV[3])==0) then return nil; end;
local counter=redis.call('hincrby',KEYS[1],ARGV[3],-1);
if(counter>0) then redis.call('pexpire',KEYS[1],ARGV[2]); return 0; else redis.call('del',KEYS[1]); redis.call('publish',KEYS[2],ARGV[1]); return 1; end;
return nil;
|
这个是Redisson里面分布式锁的实现,在调用的时候非常简单。
1.业务没执行完,锁到期了怎么办? watchdog(Redisson github wiki)
2.集群模式下,如果对多个master加锁,导致重复加锁怎么办? Redisson会自动选择同一个master
3.业务没执行完,redis master挂了怎么办? redis slave还有这个数据
Redisson跟jedis定位不同,它不是一个单纯的redis客户端,而是基于redis实现的分布式的服务,如果有需要用到一些分布式的数据结构,比如还可以基于Redisson的分布式队列实现分布式事务,就可以引入redisson的依赖实现。