首页 最新 热门 推荐

  • 首页
  • 最新
  • 热门
  • 推荐

Redis——10集群——10.5请求路由(集群客户端)

  • 25-03-02 13:43
  • 2641
  • 12217
blog.csdn.net

目前我们已经搭建好Redis集群并且理解了通信和伸缩细节, 但还没有使用客户端去操作集群。 Redis集群对客户端通信协议做了比较大的修改,为了追求性能最大化,并没有采用代理的方式而是采用客户端直连节点的方式。因此对于希望从单机切换到集群环境的应用需要修改客户端代码。本节我们关注集群请求路由的细节,以及客户端如何高效地操作集群。

10.5.1 请求重定向
在集群模式下,Redis接收任何键相关命令时首先计算键对应的槽,再根据槽找出所对应的节点,如果节点是自身,则处理键命令;否则回复MOVED重定向错误,通知客户端请求正确的节点。这个过程称为MOVED重定向,如图10-29所示。
例如, 在之前搭建的集群上执行如下命令:
127.0.0.1:6379> set key:test:1 value-1
OK

执行set命令成功,因为键key:test:1对应槽5191正好位于6379节点负责的槽范围内,可以借助cluster keyslot{key}命令返回key所对应的槽,如下所示:
127.0.0.1:6379> cluster keyslot key:test:1
(integer) 5191
127.0.0.1:6379> cluster nodes
cfb28ef1deee4e0fa78da86abe5d24566744411e 127.0.0.1:6379 myself,master - 0 0 10 connected
1366-4095 4097-5461 12288-13652
...
再执行以下命令,由于键对应槽是9252,不属于6379节点,则回复MOVED{slot}{ip}{port}格式重定向信息:
127.0.0.1:6379> set key:test:2 value-2
(error) MOVED 9252 127.0.0.1:6380
127.0.0.1:6379> cluster keyslot key:test:2
(integer) 9252
重定向信息包含了键所对应的槽以及负责该槽的节点地址,根据这些信息客户端就可以向正确的节点发起请求。在6380节点上成功执行之前的命令:
127.0.0.1:6380> set key:test:2 value-2
OK
使用redis-cli命令时,可以加入-c参数支持自动重定向,简化手动发起重定向操作,如下所示:
#redis-cli -p 6379 -c
127.0.0.1:6379> set key:test:2 value-2
-> Redirected to slot [9252] located at 127.0.0.1:6380
OK
redis-cli自动帮我们连接到正确的节点执行命令, 这个过程是在redis-cli内部维护,实质上是client端接到MOVED信息之后再次发起请求,并不在Redis节点中完成请求转发,如图10-30所示。
节点对于不属于它的键命令只回复重定向响应,并不负责转发。熟悉Cassandra的用户希望在这里做好区分,不要混淆。正因为集群模式下把解析发起重定向的过程放到客户端完成,所以集群客户端协议相对于单机有了很大的变化。
键命令执行步骤主要分两步:计算槽,查找槽所对应的节点。下面分别介绍。
1.计算槽
Redis首先需要计算键所对应的槽。根据键的有效部分使用CRC16函数计算出散列值,再取对16383的余数,使每个键都可以映射到0~16383槽范围内。伪代码如下:
def key_hash_slot(key):
    int keylen = key.length();
    for (s = 0; s < keylen; s++):
        if (key[s] == '{'):
            break;
        if (s == keylen) return crc16(key,keylen) & 16383;
        for (e = s+1; e < keylen; e++):
            if (key[e] == '}') break;
            if (e == keylen || e == s+1) return crc16(key,keylen) & 16383;
    /* 使用{和}之间的有效部分计算槽 */
    return crc16(key+s+1,e-s-1) & 16383;

根据伪代码, 如果键内容包含{和}大括号字符, 则计算槽的有效部分是括号内的内容;否则采用键的全内容计算槽。
cluster keyslot命令就是采用key_hash_slot函数实现的,例如:
127.0.0.1:6379> cluster keyslot key:test:111
(integer) 10050
127.0.0.1:6379> cluster keyslot key:{hash_tag}:111
(integer) 2515
127.0.0.1:6379> cluster keyslot key:{hash_tag}:222
(integer) 2515
其中键内部使用大括号包含的内容又叫做hash_tag,它提供不同的键可以具备相同slot的功能,常用于Redis IO优化。
例如在集群模式下使用mget等命令优化批量调用时,键列表必须具有相同的slot,否则会报错。这时可以利用hash_tag让不同的键具有相同的slot达到优化的目的。命令如下:
127.0.0.1:6385> mget user:10086:frends user:10086:videos
(error) CROSSSLOT Keys in request don't hash to the same slot
127.0.0.1:6385> mget user:{10086}:friends user:{10086}:videos
1) "friends"

2) "videos"

开发提示

Pipeline同样可以受益于hash_tag,由于Pipeline只能向一个节点批量发送执行命令,而相同slot必然会对应到唯一的节点,降低了集群使用Pipeline的门槛。

2.槽节点查找
Redis计算得到键对应的槽后,需要查找槽所对应的节点。集群内通过消息交换每个节点都会知道所有节点的槽信息,内部保存在clusterState结构中,结构所示:
typedef struct clusterState {
    clusterNode *myself; /* 自身节点,clusterNode代表节点结构体 */
    clusterNode *slots[CLUSTER_SLOTS]; /* 16384个槽和节点映射数组, 数组下标代表对应的槽 */
    ...
} clusterState;
slots数组表示槽和节点对应关系,实现请求重定向伪代码如下:
def execute_or_redirect(key):
    int slot = key_hash_slot(key);
    ClusterNode node = slots[slot];
    if(node == clusterState.myself):
        return executeCommand(key);
    else:
        return '(error) MOVED {slot} {node.ip}:{node.port}';

根据伪代码看出节点对于判定键命令是执行还是MOVED重定向, 都是借助slots[CLUSTER_SLOTS]数组实现。根据MOVED重定向机制,客户端可以随机连接集群内任一Redis获取键所在节点,这种客户端又叫Dummy(傀儡)客户端,它优点是代码实现简单,对客户端协议影响较小,只需要根据重定向信息再次发送请求即可。但是它的弊端很明显,每次执行键命令前都要到Redis上进行重定向才能找到要执行命令的节点,额外增加了IO开销,这不是Redis集群高效的使用方式。正因为如此通常集群客户端都采用另一种实现:Smart(智能)客户端。

10.5.2 Smart客户端

1.smart客户端原理
大多数开发语言的Redis客户端都采用Smart客户端支持集群协议,客户端如何选择见:http://redis.io/clients,从中找出符合自己要求的客户端类库。Smart客户端通过在内部维护slot→node的映射关系,本地就可实现键到节点的查找,从而保证IO效率的最大化,而MOVED重定向负责协助Smart客户端更新slot→node映射。我们以Java的Jedis为例,说明Smart客户端操作集群的流程。

1)首先在JedisCluster初始化时会选择一个运行节点,初始化槽和节点映射关系,使用cluster slots命令完成 如下所示:


2)JedisCluster解析cluster slots结果缓存在本地,并为每个节点创建唯一的JedisPool连接池。映射关系在JedisClusterInfoCache类中, 如下所示:
public class JedisClusterInfoCache {
    private Map nodes = new HashMap();
    private Map slots = new HashMap();
    ...
}
3)JedisCluster执行键命令的过程有些复杂,但是理解这个过程对于开发人员分析定位问题非常有帮助,部分代码如下:
public abstract class JedisClusterCommand {
        // 集群节点连接处理器
        private JedisClusterConnectionHandler connectionHandler;
        // 重试次数, 默认5次
        private int redirections;
        // 模板回调方法
        public abstract T execute(Jedis connection);
        public T run(String key) {
                if (key == null) {
                        throw new JedisClusterException("No way to dispatch this command to Redis Cluster.");
                }
                return runWithRetries(SafeEncoder.encode(key), this.redirections, false, false);
        }
        // 利用重试机制运行键命令
        private T runWithRetries(byte[] key, int redirections, boolean tryRandomNode, boolean asking) {
                if (redirections <= 0) {
                        throw new JedisClusterMaxRedirectionsException("Too many Cluster redirections");
                }
                Jedis connection = null;
                try {
                        if (tryRandomNode) {
                                // 随机获取活跃节点连接
                                connection = connectionHandler.getConnection();
                } else {
                                // 使用slot缓存获取目标节点连接
                                connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));
                }
                return execute(connection);
                } catch (JedisConnectionException jce) {
                        // 出现连接错误使用随机连接重试
                        return runWithRetries(key, redirections - 1, true/*开启随机连接*/, asking);
                } catch (JedisRedirectionException jre) {
                        if (jre instanceof JedisMovedDataException) {
                                // 如果出现MOVED重定向错误,在连接上执行cluster slots命令重新初始化slot缓存
                                this.connectionHandler.renewSlotCache(connection);
                            }
                        // slot初始化后重试执行命令
                        return runWithRetries(key, redirections - 1, false, asking);
                } finally {
                        releaseConnection(connection);
                }
        }
}
键命令执行流程:
1)计算slot并根据slots缓存获取目标节点连接,发送命令。
2)如果出现连接错误, 使用随机连接重新执行键命令, 每次命令重试对redi-rections参数减1。
3)捕获到MOVED重定向错误, 使用cluster slots命令更新slots缓存(renewSlotCache方法)。
4)重复执行1)~3)步,直到命令执行成功,或者当redirections<=0时抛出Jedis ClusterMaxRedirectionsException异常。
整个流程如图10-31所示。

从命令执行流程中发现,客户端需要结合异常和重试机制时刻保证跟Redis集群的slots同步,因此Smart客户端相比单机客户端有了很大的变化和实现难度。了解命令执行流程后,下面我们对Smart客户端成本和可能存在的问题进行分析:
1)客户端内部维护slots缓存表,并且针对每个节点维护连接池,当集群规模非常大时,客户端会维护非常多的连接并消耗更多的内存。
2)使用Jedis操作集群时最常见的错误是:throw new JedisClusterMaxRedirectionsException("Too many Cluster redirections");这经常会引起开发人员的疑惑,它隐藏了内部错误细节,原因是节点宕机或请求超时都会抛出JedisConnectionException,导致触发了随机重试,当重试次数耗尽抛出这个错误。
3)当出现JedisConnectionException时,Jedis认为可能是集群节点故障需要随机重试来更新slots缓存,因此了解哪些异常将抛出JedisConnectionException变得非常重要,有如下几种情况会抛出JedisConnectionException:
·Jedis连接节点发生socket错误时抛出。
·所有命令/Lua脚本读写超时抛出。
·JedisPool连接池获取可用Jedis对象超时抛出。
前两点都可能是节点故障需要通过JedisConnectionException来更新slots缓存,但是第三点没有必要,因此Jedis2.8.1版本之后对于连接池的超时抛出Jedis Exception,从而避免触发随机重试机制。
4)Redis集群支持自动故障转移,但是从故障发现到完成转移需要一定的时间,节点宕机期间所有指向这个节点的命令都会触发随机重试,每次收到MOVED重定向后会调用JedisClusterInfoCache类的renewSlotCache方法。部分代码如下:
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final Lock r = rwl.readLock();
private final Lock w = rwl.writeLock();
public void renewSlotCache(Jedis jedis) {
        try {
                cache.discoverClusterSlots(jedis);
        } catch (JedisConnectionException e) {
                renewSlotCache();
        }
} 
public void discoverClusterSlots(Jedis jedis) {
        // 获取写锁
        w.lock();
        try {
                this.slots.clear();
                // 执行cluster slots
                List slots = jedis.clusterSlots();
                for (Object slotInfoObj : slots) {
                        // 初始化slots缓存代码,忽略细节...
                }
        } finally {
                w.unlock();
        }
} 
public JedisPool getSlotPool(int slot) {
        // 获取读锁
        r.lock();
        try {
                // 返回slot对应的jedisPool
                return slots.get(slot);
        } finally {
                r.unlock();
        }
}
从代码中看到,获得写锁后再执行cluster slots命令初始化缓存,由于集群所有的键命令都会执行getSlotPool方法计算槽对应节点,它内部要求读锁。Reentrant ReadWriteLock是读锁共享且读写锁互斥,从而导致所有的请求都会造成阻塞。对于并发量高的场景将极大地影响集群吞吐。这个现象称为cluster slots风暴,有如下现象:
·重试机制导致IO通信放大问题。比如默认重试5次的情况,当抛出JedisClusterMaxRedirectionsException异常时,内部最少需要9次IO通信:5次发送命令+2次ping命令保证随机节点正常+2次cluster slots命令初始化slots缓存。导致异常判定时间变长。
·个别节点操作异常导致频繁的更新slots缓存,多次调用cluster slots命令,高并发时将过度消耗Redis节点资源,如果集群  slot<->node映射庞大则cluster slots返回信息越多,问题越严重。
·频繁触发更新本地slots缓存操作,内部使用了写锁,阻塞对集群所有的键命令调用。
针对以上问题在Jedis2.8.2版本做了改进:
·当接收到JedisConnectionException时不再轻易初始化slots缓存,大幅降低内部IO次数,伪代码如下:
def runWithRetries(byte[] key, int attempts) :
        if (attempts <= 0) :
                throw new JedisClusterMaxRedirectionsException("Too many Cluster red irections")
        Jedis connection = null;
        try :
                // 获取连接
                connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(k
                return execute(connection);
        except JedisConnectionException,jce :
                if (attempts <= 1) :
                // 当重试到1次时, 更新本地slots缓存
                this.connectionHandler.renewSlotCache();
                // 抛出异常
                throw jce;
        // 递归执行重试
        return runWithRetries(key, attempts - 1);
        except JedisRedirectionException,jre:
        // 如果是MOVED异常, 更新slots缓存
        if (jre instanceof JedisMovedDataException) :
                this.connectionHandler.renewSlotCache(connection);
                // 递归, 执行重试
                return runWithRetries(key, attempts - 1);
        finally:
                releaseConnection(connection);
根据代码看出,只有当重试次数到最后1次或者出现MovedDataException时才更新slots操作,降低了cluster slots命令调用次数。
·当更新slots缓存时,不再使用ping命令检测节点活跃度,并且使用redis covering变量保证同一时刻只有一个线程更新slots缓存,其他线程忽略,优化了写锁阻塞和cluster slots调用次数。伪代码如下:
        def renewSlotCache(Jedis jedis) :
        //使用rediscovering变量保证当有一个线程正在初始化slots时, 其他线程直接忽略。
        if (!rediscovering):
            try :
                w.lock();
                rediscovering = true;
                if (jedis != null) :
                    try :
                        // 更新本地缓存
                        discoverClusterSlots(jedis);
                        return;
                    except JedisException,e:
                    // 忽略异常, 使用随机查找更新slots
                    // 使用随机节点更新slots
                    for (JedisPool jp : getShuffledNodesPool()) :
                        try :
                            // 不再使用ping命令检测节点
                            jedis = jp.getResource();
                            discoverClusterSlots(jedis);
                            return;
                        except JedisConnectionException,e:
                            // try next nodes
                        finally :
                            if (jedis != null) :
                                jedis.close();
                    finally :
                        // 释放锁和rediscovering变量
                        rediscovering = false;
                        w.unlock();

综上所述,Jedis2.8.2之后的版本,当出现JedisConnectionException时,命令发送次数变为5次:4次重试命令+1次cluster slots命令,同时避免了cluster slots不必要的并发调用。

开发提示
建议升级到Jedis2.8.2以上版本防止cluster slots风暴和写锁阻塞问题,但是笔者认为还可以进一步优化,如下所示:
·执行cluster slots的过程不需要加入任何读写锁,因为cluster slots命令执行不需要做并发控制,只有修改本地slots时才需要控制并发,这样降低了写锁持有时间。
·当获取新的slots映射后使用读锁跟老slots比对,只有新老slots不一致时再加入写锁进行更新。防止集群slots映射没有变化时进行不必要的加写锁行为。

这里我们用大量篇幅介绍了Smart客户端Jedis与集群交互的细节,主要原因是针对于高并发的场景,这里是绝对的热点代码。集群协议通过Smart客户端全面高效的支持需要一个过程,因此用户在选择Smart客户端时要重点审核集群交互代码,防止线上踩坑。必要时可以自行优化修改客户端源码。

2.Smart客户端——JedisCluster
(1)JedisCluster的定义

Jedis为Redis Cluster提供了Smart客户端,对应的类是JedisCluster,它的初始化方法如下:
public JedisCluster(Set jedisClusterNode, int connectionTimeout, 
                                        int soTimeout, int maxAttempts, final GenericObjectPoolConfig poolConfig) {
                                        ...
}
其中包含了5个参数:
·SetjedisClusterNode:所有Redis Cluster节点信息(也可以是一部分,因为客户端可以通过cluster slots自动发现)。
·int connectionTimeout:连接超时。
·int soTimeout:读写超时。
·int maxAttempts:重试次数。
·GenericObjectPoolConfig poolConfig:连接池参数,JedisCluster会为Redis Cluster的每个节点创建连接池,有关连接池的详细说明参见第4章。
例如下面代码展示了一次JedisCluster的初始化过程。
// 初始化所有节点(例如6个节点)
Set jedisClusterNode = new HashSet();
jedisClusterNode.add(new HostAndPort("10.10.xx.1", 6379));
jedisClusterNode.add(new HostAndPort("10.10.xx.2", 6379));
jedisClusterNode.add(new HostAndPort("10.10.xx.3", 6379));
jedisClusterNode.add(new HostAndPort("10.10.xx.4", 6379));
jedisClusterNode.add(new HostAndPort("10.10.xx.5", 6379));
jedisClusterNode.add(new HostAndPort("10.10.xx.6", 6379));
// 初始化commnon-pool连接池, 并设置相关参数
GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
// 初始化JedisCluster

JedisCluster jedisCluster = new JedisCluster(jedisClusterNode, 1000, 1000, 5, poolConfig

JedisCluster可以实现命令的调用, 如下所示。
jedisCluster.set("hello", "world");
jedisCluster.get("key");
对于JedisCluster的使用需要注意以下几点:
·JedisCluster包含了所有节点的连接池(JedisPool),所以建议JedisCluster使用单例。
·JedisCluster每次操作完成后,不需要管理连接池的借还,它在内部已经完成。

·JedisCluster一般不要执行close( )操作,它会将所有JedisPool执行destroy操作。

(2)多节点命令和操作
Redis Cluster虽然提供了分布式的特性,但是有些命令或者操作, 诸如keys、 flushall、 删除指定模式的键,需要遍历所有节点才可以完成。下面代码实现了从Redis Cluster删除指定模式键的功能:
// 从RedisCluster批量删除指定pattern的数据
public void delRedisClusterByPattern(JedisCluster jedisCluster, String pattern, int scanCounter) {
        // 获取所有节点的JedisPool
        Map jedisPoolMap = jedisCluster.getClusterNodes();
        for (Entry entry : jedisPoolMap.entrySet()) {
                // 获取每个节点的Jedis连接
                Jedis jedis = entry.getValue().getResource();
                // 只删除主节点数据
                if (!isMaster(jedis)) {
                        continue;
                }
                // 使用Pipeline每次删除指定前缀的数据
                Pipeline pipeline = jedis.pipelined();
                // 使用scan扫描指定前缀的数据
                String cursor = "0";
                // 指定扫描参数: 每次扫描个数和pattern
                ScanParams params = new ScanParams().count(scanCounter).match(pattern);
                while (true) {
                        // 执行扫描
                        ScanResult scanResult = jedis.scan(cursor, params);
                        // 删除的key列表
                        List keyList = scanResult.getResult();
                        if (keyList != null && keyList.size() > 0) {
                                for (String key : keyList) {
                                        pipeline.del(key);
                                }
                                // 批量删除
                                pipeline.syncAndReturnAll();
                        }
                        cursor = scanResult.getStringCursor();
                        // 如果游标变为0, 说明扫描完毕
                        if ("0".equals(cursor)) {
                                break;
                        }
                }
        }
}
// 判断当前Redis是否为master节点
private boolean isMaster(Jedis jedis) {
        String[] data = jedis.info("Replication").split("\r\n");
        for (String line : data) {
                if ("role:master".equals(line.trim())) {
                        return true;
                }
        }
        return false;
}
具体分为如下几个步骤:
1)通过jedisCluster.getClusterNodes()获取所有节点的连接池。
2)使用info replication筛选1)中的主节点。
3)遍历主节点,使用scan命令找到指定模式的key,使用Pipeline机制删除。
例如下面操作每次遍历1000个key,将Redis Cluster中以user开头的key全部删除。
String pattern = "user*";
int scanCounter = 1000;
delRedisClusterByPattern(jedisCluster, pattern, scanCounter);
所以对于keys、flushall等需要遍历所有节点的命令,同样可以参照上面的方法进行相应功能的实现。
(3)批量操作的方法
Redis Cluster中,由于key分布到各个节点上,会造成无法实现mget、mset等功能。但是可以利用CRC16算法计算出key对应的slot,以及Smart客户端保存了slot和节点对应关系的特性,将属于同一个Redis节点的key进行归档,然后分别对每个节点对应的子key列表执行mget或者pipeline操作,具体使用方法可以参考11.5节“无底洞优化”。
(4)使用Lua、事务等特性的方法
Lua和事务需要所操作的key,必须在一个节点上,不过Redis Cluster提供了hashtag,如果开发人员确实要使用Lua或者事务,可以将所要操作的key使用一个hashtag,如下所示:
// hashtag
String hastag = "{user}";
// 用户A的关注表
String userAFollowKey = hastag + ":a:follow";
// 用户B的粉丝表
String userBFanKey = hastag + ":b:fans";
// 计算hashtag对应的slot
int slot = JedisClusterCRC16.getSlot(hastag);
// 获取指定slot的JedisPool
JedisPool jedisPool = jedisCluster.getConnectionHandler().getJedisPoolFromSlot(slot);
// 在当个节点上执行事务
Jedis jedis = null;
try {
        jedis = jedisPool.getResource();
        // 用户A的关注表加入用户B, 用户B的粉丝列表加入用户A
        Transaction transaction = jedis.multi();
        transaction.sadd(userAFollowKey, "user:b");
        transaction.sadd(userBFanKey, "user:a");
        transaction.exec();
} catch (Exception e) {
        logger.error(e.getMessage(), e);
} finally {
        if (jedis!= null)
            jedis.close();
}
具体步骤如下:
1) 将事务中所有的key添加hashtag。
2) 使用CRC16计算hashtag对应的slot。
3) 获取指定slot对应的节点连接池JedisPool。

4) 在JedisPool上执行事务。

10.5.3 ASK重定向
1.客户端ASK重定向流程
Redis集群支持在线迁移槽(slot)和数据来完成水平伸缩,当slot对应的数据从源节点到目标节点迁移过程中,客户端需要做到智能识别,保证键命令可正常执行。例如当一个slot数据从源节点迁移到目标节点时,期间可能出现一部分数据在源节点,而另一部分在目标节点,如图10-32所示。
当出现上述情况时,客户端键命令执行流程将发生变化,如下所示:
1)客户端根据本地slots缓存发送命令到源节点,如果存在键对象则直接执行并返回结果给客户端。
2)如果键对象不存在,则可能存在于目标节点,这时源节点会回复ASK重定向异常。格式如下:(error)ASK{slot}{targetIP}: {targetPort}。
3)客户端从ASK重定向异常提取出目标节点信息,发送asking命令到目标节点打开客户端连接标识,再执行键命令。如果存在则执行,不存在则返回不存在信息。ASK重定向整体流程如图10-33所示。


ASK与MOVED虽然都是对客户端的重定向控制,但是有着本质区别。ASK重定向说明集群正在进行slot数据迁移,客户端无法知道什么时候迁移完成,因此只能是临时性的重定向,客户端不会更新slots缓存。但是MOVED重定向说明键对应的槽已经明确指定到新的节点,因此需要更新slots缓存。

2.节点内部处理
为了支持ASK重定向,源节点和目标节点在内部的clusterState结构中维护当前正在迁移的槽信息,用于识别槽迁移情况,结构如下:
typedef struct clusterState {
    clusterNode *myself; /* 自身节点 */
    clusterNode *slots[CLUSTER_SLOTS]; /* 槽和节点映射数组 */
    clusterNode *migrating_slots_to[CLUSTER_SLOTS];/* 正在迁出的槽节点数组 */
    clusterNode *importing_slots_from[CLUSTER_SLOTS];/* 正在迁入的槽节点数组*/
    ...

} clusterState;

节点每次接收到键命令时,都会根据clusterState内的迁移属性进行命令处理,如下所示:
·如果键所在的槽由当前节点负责,但键不存在则查找migrating_slots_to数组查看槽是否正在迁出,如果是返回ASK重定向。
·如果客户端发送asking命令打开了CLIENT_ASKING标识,则该客户端下次发送键命令时查找importing_slots_from数组获取clusterNode,如果指向自身则执行命令。
·需要注意的是,asking命令是一次性命令,每次执行完后客户端标识都会修改回原状态,因此每次客户端接收到ASK重定向后都需要发送asking命令。
·批量操作。ASK重定向对单键命令支持得很完善,但是,在开发中我们经常使用批量操作,如mget或pipeline。当槽处于迁移状态时,批量操作会受到影响。
例如,手动使用迁移命令让槽4096处于迁移状态,并且数据各自分散在目标节点和源节点,如下所示:
#6379节点准备导入槽4096数据
127.0.0.1:6379>cluster setslot 4096 importing 1a205dd8b2819a00dd1e8b6be40a8e2abe77b756
OK
#6385节点准备导出槽4096数据
127.0.0.1:6379>cluster setslot 4096 migrating cfb28ef1deee4e0fa78da86abe5d24566744411e
OK
# 查看槽4096下的数据
127.0.0.1:6385> cluster getkeysinslot 4096 100
1) "key:test:5028"
2) "key:test:68253"
3) "key:test:79212"
# 迁移键key:test:68253和key:test:79212到6379节点
127.0.0.1:6385>migrate 127.0.0.1 6379 "" 0 5000 keys key:test:68253 key:test:79212
OK
现在槽4096下3个键数据分别位于6379和6380两个节点,使用Jedis客户端执行批量操作。mget代码如下:
@Test
public void mgetOnAskTest() {
    JedisCluster jedisCluster = new JedisCluster(new HostAndPort("127.0.0.1", 6379));
    List results = jedisCluster.mget("key:test:68253", "key:test:79212");
    System.out.println(results);
    results = jedisCluster.mget("key:test:5028", "key:test:68253", "key:test:79212");
    System.out.println(results);
}
运行mget测试结果如下:
[value:68253, value:79212]
redis.clients.jedis.exceptions.JedisDataException: TRYAGAIN Multiple keys request during rehashing of slot at redis.clients.jedis.Protocol.processError(Protocol.java:127)
...
测试结果分析:
·第1个mget运行成功,这是因为键key:test:68253,key:test:79212已经迁移到目标节点,当mget键列表都处于源节点/目标节点时,运行成功。

·第2个mget抛出异常,当键列表中任何键不存在于源节点时,抛出异常。

综上所处,当在集群环境下使用mget、mset等批量操作时,slot迁移数据期间由于键列表无法保证在同一节点, 会导致大量错误。

Pipeline代码如下:
@Test
public void pipelineOnAskTest() {

        JedisSlotBasedConnectionHandler connectionHandler = new JedisCluster(new Host AndPort ("127.0.0.1", 6379)) {

                        public JedisSlotBasedConnectionHandler getConnectionHandler() {
                                return (JedisSlotBasedConnectionHandler) super.connectionHandler;
                        }
            }.getConnectionHandler();
     List keys = Arrays.asList("key:test:68253", "key:test:79212", "key:test:5028");
    Jedis jedis = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.get Slot(keys.get(2)));
    try {
                Pipeline pipelined = jedis.pipelined();
                for (String key : keys) {
                        pipelined.get(key);
                }
                List results = pipelined.syncAndReturnAll();
                for (Object result : results) {
                        System.out.println(result);
                }
    } finally {
                jedis.close();
    }
}
Pipeline的代码中,由于Jedis没有开放slot到Jedis的查询,使用了匿名内部类暴露JedisSlotBasedConnectionHandler。通过Jedis获取Pipeline对象组合3条get命令一次发送。运行结果如下:
redis.clients.jedis.exceptions.JedisAskDataException: ASK 4096 127.0.0.1:6379
redis.clients.jedis.exceptions.JedisAskDataException: ASK 4096 127.0.0.1:6379
value:5028
结果分析:返回结果并没有直接抛出异常,而是把ASK异常JedisAskDataException包含在结果集中。但是使用Pipeline的批量操作也无法支持由于slot迁移导致的键列表跨节点问题。
得益于Pipeline并没有直接抛出异常,可以借助于JedisAskDataException内返回的目标节点信息,手动重定向请求给目标节点, 修改后的程序如下:
@Test
public void pipelineOnAskTestV2() {
        JedisSlotBasedConnectionHandler connectionHandler = new JedisCluster(new HostAndPort("127.0.0.1", 6379)) {
                public JedisSlotBasedConnectionHandler getConnectionHandler() {
                        return (JedisSlotBasedConnectionHandler) super.connectionHandler;
                }
        }.getConnectionHandler();
        List keys = Arrays.asList("key:test:68253", "key:test:79212", "key:test:5028");
        Jedis jedis = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(keys.get(2)));
        try {
                Pipeline pipelined = jedis.pipelined();
                for (String key : keys) {
                        pipelined.get(key);
                }
                List results = pipelined.syncAndReturnAll();
                for (int i = 0; i < keys.size(); i++) {
                        // 键顺序和结果顺序一致
                        Object result = results.get(i);
                        if (result != null && result instanceof JedisAskDataException) {
                                JedisAskDataException askException = (JedisAskDataException) result;
                                HostAndPort targetNode = askException.getTargetNode();
                                Jedis targetJedis = connectionHandler.getConnectionFromNode(targetNode);
                                try {
                                        // 执行asking
                                        targetJedis.asking();
                                        // 获取key并执行
                                        String key = keys.get(i);
                                        String targetResult = targetJedis.get(key);
                                        System.out.println(targetResult);
                                } finally {
                                        targetJedis.close();
                                }
                        } else {
                                System.out.println(result);
                        }
                }
        } finally {
                jedis.close();
        }
}
修改后的Pipeline运行结果以下:
value:68253
value:79212
value:5028
根据结果,我们成功获取到了3个键的数据。以上测试能够成功的前提是:
1)Pipeline严格按照键发送的顺序返回结果,即使出现异常也是如此(更多细节见3.3节“Pipeline”)。
2)理解ASK重定向之后,可以手动发起ASK流程保证Pipeline的结果正确性。

综上所处,使用smart客户端批量操作集群时,需要评估mget/mset、Pipeline等方式在slot迁移场景下的容错性,防止集群迁移造成大量错误和数据丢失的情况。

开发提示
集群环境下对于使用批量操作的场景,建议优先使用Pipeline方式,在客户端实现对ASK重定向的正确处理,这样既可以受益于批量操作的IO优化,又可以兼容slot迁移场景。
注:本文转载自blog.csdn.net的HoldBelief的文章"https://blog.csdn.net/HoldBelief/article/details/79796558"。版权归原作者所有,此博客不拥有其著作权,亦不承担相应法律责任。如有侵权,请联系我们删除。
复制链接
复制链接
相关推荐
发表评论
登录后才能发表评论和回复 注册

/ 登录

评论记录:

未查询到任何数据!
回复评论:

分类栏目

后端 (14832) 前端 (14280) 移动开发 (3760) 编程语言 (3851) Java (3904) Python (3298) 人工智能 (10119) AIGC (2810) 大数据 (3499) 数据库 (3945) 数据结构与算法 (3757) 音视频 (2669) 云原生 (3145) 云平台 (2965) 前沿技术 (2993) 开源 (2160) 小程序 (2860) 运维 (2533) 服务器 (2698) 操作系统 (2325) 硬件开发 (2492) 嵌入式 (2955) 微软技术 (2769) 软件工程 (2056) 测试 (2865) 网络空间安全 (2948) 网络与通信 (2797) 用户体验设计 (2592) 学习和成长 (2593) 搜索 (2744) 开发工具 (7108) 游戏 (2829) HarmonyOS (2935) 区块链 (2782) 数学 (3112) 3C硬件 (2759) 资讯 (2909) Android (4709) iOS (1850) 代码人生 (3043) 阅读 (2841)

热门文章

101
推荐
关于我们 隐私政策 免责声明 联系我们
Copyright © 2020-2025 蚁人论坛 (iYenn.com) All Rights Reserved.
Scroll to Top