T O P

[资源分享]     Tars | 第6篇 基于TarsGo Subset路由规则的Java JDK实现方式(下)

  • By - 楼主

  • 2021-09-12 22:00:01

  • 前言

    利开园导师用Go语言实现了Subset路由规则,并在中期汇报分享会里介绍出来;这篇文章将基于利导师的实现方式,对Subset路由规则的细节做些理解与补充。

    此篇文章为下半部分,将对上半部分提到的TarsGo对Subset路由规则的实现做一一分析,重点放在“如果开发语言是Java,对应功能将如何实现”问题上。

    上下部分文章在目录上一一对应,上半注重TarsGo分析,下半部分注重TarsJava实现方式。如上篇文章第一点修改.tars协议文件记录利导师在TarsGo的代码修改,下片文章第一点也是修改.tars协议文件,侧重点在如何用Java语言实现。上下文章相辅相成,建议对照学习。

    一些资源链接如下:

    上半部分文章链接
    https://www.cnblogs.com/dlhjw/p/15245113.html

    TarsJava 实现Subset路由规则JDK链接地址
    https://github.com/TarsCloud/TarsJava/commit/cc2fe884ecbe8455a8e1f141e21341f4f3dd98a3

    TarsGo 实现Subset路由规则JDK链接地址

    https://github.com/defool/TarsGo/commit/136878e9551d68c4b54c402df564729f51f3dd9c#


    1. 修改.tars协议文件

    需要修改两处.tars协议文件;

    1.1 Java源码位置及逻辑分析

    该部分的含义是:增加Subset配置增加获取Subset信息

    通过上半文章的分析,增加的配置是在EndpointF.tarsQueryF.tars协议文件里面添加,而tars协议文件在所有语言中是统一的,一样的;在Java中,EndpointF协议文件在src/main/resources/EndpointF.tars;QueryF协议文件在src/main/resources/QueryF.tars;

    因此,我们可以得到以下信息:

    • 定位对应源码位置如下:
    Go语言 Java
    tars/protocol/res/EndpointF.tars TarsJava-1.7.x\core\src\main\resources\EndpointF.tars
    tars/protocol/res/QueryF.tars TarsJava-1.7.x\core\src\main\resources\QueryF.tars
    • 直接添加subset配置即可;

    1.2 Java语言实现方式

    module tars
    {
        /**
         * Port information
         */
        struct EndpointF
        {
            0  require  string host;
            1  require  int    port;
            2  require  int    timeout;
            3  require  int    istcp;
            4  require  int    grid;
            5  optional int    groupworkid;
            6  optional int    grouprealid;
            7  optional string setId;
            8  optional int    qos;
            9  optional int    bakFlag;
            11 optional int    weight;
            12 optional int    weightType;
            13 optional string subset;
        };
        key[EndpointF, host, port, timeout, istcp, grid, qos, weight, weightType];
    
    
    };
    
    

    QueryF.tars

    1.3 通过协议文件自动生成代码

    Tars有个强大的功能,它能根据.tars里的配置文件自动生成相应Bean代码;

    在Java语言里,具体操作如下:

    1. 在项目的pom.xml里配置对应插件

    <build>
        <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
            <plugins>
                <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
                <plugin>
                    <groupId>com.tencent.tars</groupId>
                    <artifactId>tars-maven-plugin</artifactId>
                    <version>1.7.2</version>
                    <configuration>
                        <tars2JavaConfig>
                            <!-- tars文件位置 -->
                            <tarsFiles>
                                <tarsFile>${basedir}/src/main/resources/EndpointF.tars</tarsFile>
                            </tarsFiles>
                            <!-- 源文件编码 -->
                            <tarsFileCharset>UTF-8</tarsFileCharset>
                            <!-- 生成服务端代码 -->
                            <servant>false</servant>
                            <!-- 生成源代码编码 -->
                            <charset>UTF-8</charset>
                            <!-- 生成的源代码目录 -->
                            <srcPath>${basedir}/src/main/java</srcPath>
                            <!-- 生成源代码包前缀 -->
                            <packagePrefixName>com.qq.tars.common.support.</packagePrefixName>
                        </tars2JavaConfig>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
    

    我们仅需要修改的地方在 tars文件位置生成源代码包前缀

    2. 在项目根路径下执行mvn tars:tars2java命令

    项目根路径输入cmd
    接着输入mvn tars:tars2java命令后出现下面日志则说明生成成功;

    BUILD SUCCESS

    3. 检查生成代码

    我们回到项目代码,经检查,EndpointF类发生了修改,新增SubsetConf类。(因为笔者在第一步生成源代码包前缀没有配置好,所有将生成后的代码直接复制黏贴到源代码路径里了,影响不大。)

    检查代码
    4. 用同样的方法可以自动生成QueryF代码

    1.4 变更代码的路径

    通过上述操作,以下路径的代码发生改变,需要在github上提交:

    • TarsJava-1.7.x\core\src\main\resources\EndpointF.tars
    • TarsJava-1.7.x\core\src\main\resources\QueryF.tars
    • TarsJava-1.7.x\core\src\main\java\com\qq\tars\support\query\prx\EndpointF.java
    • TarsJava-1.7.x\core\src\main\java\com\qq\tars\support\query\prx\QueryFPrx.java
    • TarsJava-1.7.x\core\src\main\java\com\qq\tars\support\query\prx\QueryFPrxCallback.java

    2. 【核心】增添Subset核心功能

    这部分是核心功能,不需要在源码里更改,属于新增的内容。

    2.1 Java源码位置及逻辑分析

    该部分的含义是:增添Subset核心功能

    由于Subset路由业务与客户端相关,在Tars中的地位是:Tars支持(support)的功能之一,因此,笔者打算在参照原来的项目结构,在TarsJava-1.7.x\core\src\main\java\com\qq\tars\client路径下新建包subset,包内实现以下功能:

    新增类型 新增内容
    结构体 新增Subset配置项的结构体 subsetConf
    结构体 新增路由规则配置项的结构体ratioConfig
    结构体 新增染色路径的结构体keyRoute
    结构体 新增染色配置项的结构体keyConfig
    结构体 新增subset管理者的结构体subsetManager
    方法 新增获取subset配置项的方法getSubsetConfig
    方法 新增获取比例 / 染色路由配置项的方法getSubset
    方法 新增根据subset规则过滤节点的方法subsetEndpointFilter
    方法 新增根据一致hash的subset规则过滤节点的方法subsetHashEpFilter
    方法 新增按比例路由路由路径的方法findSubet
    方法 新增按默认路由路径findSubet

    因此,我们可以得到以下信息:

    • 定位对应源码位置如下:
    Go语言 Java
    tars/subset.go TarsJava-1.7.x\core\src\main\java\com\qq\tars\client\subset\

    2.2 Java语言实现方式

    笔者的理解是五个结构体各自新建成一个类,此外新建Subset类;根据TarsGo实现逻辑:

    • SubsetConf类里定义一些属性,并生成对应getter与setter方法;
    • RatioConfig类里实现findSubet()方法;
      • *在KeyRoute类里实现getRouteKey()setRouteKey()setRouteKeyToRequest()方法;
      • 这里提到的方法请见《3. 添加常量与获取染色key的方法》与《5. 实现透传染色Key功能》分析;
    • KeyConfig类里实现findSubet()方法;
    • SubsetManager类里实现getSubsetConfig()getSubset()方法;
    • Subset类里实现subsetEndpointFilter()subsetHashEpFilter()方法

    具体的实现代码如下:

    SubsetConf

    public class SubsetConf {
    
        private boolean enanle;
        private String ruleType;
        private RatioConfig ratioConf;
        private KeyConfig keyConf;
    
        private Instant lastUpdate;
    
        public SubsetConf() {
            lastUpdate =  Instant.now();
        }
    
    
        public SubsetConf(boolean enanle, String ruleType, RatioConfig ratioConf, KeyConfig keyConf) {
            this.enanle = enanle;
            this.ruleType = ruleType;
            this.ratioConf = ratioConf;
            this.keyConf = keyConf;
            lastUpdate =  Instant.now();
        }
    
        public boolean isEnanle() {
            return enanle;
        }
    
        public void setEnanle(boolean enanle) {
            this.enanle = enanle;
        }
    
        public String getRuleType() {
            return ruleType;
        }
    
        public void setRuleType(String ruleType) {
            this.ruleType = ruleType;
        }
    
        public RatioConfig getRatioConf() {
            return ratioConf;
        }
    
        public void setRatioConf(RatioConfig ratioConf) {
            this.ratioConf = ratioConf;
        }
    
        public KeyConfig getKeyConf() {
            return keyConf;
        }
    
        public void setKeyConf(KeyConfig keyConf) {
            this.keyConf = keyConf;
        }
    
        public Instant getLastUpdate() {
            return lastUpdate;
        }
    
        public void setLastUpdate(Instant lastUpdate) {
            this.lastUpdate = lastUpdate;
        }
    }
    
    

    RatioConfig

    public class RatioConfig {
    
        private Map<String, Integer> rules;
    
    
        //进行路由规则的具体实现,返回subset字段
        public String findSubet(String routeKey){
            //routeKey为空时随机
            if( "".equals(routeKey) ){
                //赋值routeKey为获取的随机值
                Random random = new Random();
                int r = random.nextInt( rules.size() );
                routeKey = String.valueOf(r);
                int i = 0;
                for (String key : rules.keySet()) {
                    if(i == r){
                        return key;
                    }
                    i++;
                }
            }
    
            //routeKey不为空时实现按比例算法
            int totalWeight = 0;
            int supWeight = 0;
            String subset = null;
            //获得总权重
            for (Integer value : rules.values()) {
                totalWeight+=value;
            }
            //获取随机数
            Random random = new Random();
            int r = random.nextInt(totalWeight);
            //根据随机数找到subset
            for (Map.Entry<String, Integer> entry : rules.entrySet()){
                supWeight+=entry.getValue();
                if( r < supWeight){
                    subset = entry.getKey();
                    return subset;
                }
            }
            return null;
        }
    
        public Map<String, Integer> getRules() {
            return rules;
        }
    
        public void setRules(Map<String, Integer> rules) {
            this.rules = rules;
        }
    }
    

    KeyRoute

    • 这里提到的方法请见《3. 添加常量与获取染色key的方法》分析;
    public class KeyRoute {
    
        private String action = null;
        private String value = null;
        private String route = null;
    
        public static final String TARS_ROUTE_KEY = "TARS_ROUTE_KEY";
    
        private static final Logger logger = LoggerFactory.getClientLogger();
    
    
        //根据分布式上下文信息获取KeyRoute
        public static String getRouteKey(DistributedContext distributedContext){
            if( distributedContext == null ){
                logger.info("无分布式上下文信息distributedContext");
            }
            String routeValue = "";
            if(distributedContext != null){
                TarsServantRequest tarsServantRequest = distributedContext.get(DyeingSwitch.REQ);
                if( tarsServantRequest != null){
                    routeValue = tarsServantRequest.getStatus().get(TARS_ROUTE_KEY);
                }
            }
            return routeValue;
        }
    
        //根据分布式上下文信息设置KeyRoute
        public static void setRouteKey(DistributedContext distributedContext, String routeKey){
    
            if(distributedContext != null && routeKey != null ){
                TarsServantRequest tarsServantRequest = distributedContext.get(DyeingSwitch.REQ);
                tarsServantRequest.getStatus().put(TARS_ROUTE_KEY, routeKey);
            }
        }
    
        public static void setRouteKeyToRequest(DistributedContext distributedContext, TarsServantRequest request){
            if( distributedContext == null ){
                logger.info("无分布式上下文信息distributedContext");
            }
            String routeValue = KeyRoute.getRouteKey(distributedContext);
            if( routeValue != null && !"".equals(routeValue)){
                if(request.getStatus() != null){
                    request.getStatus().put(KeyRoute.TARS_ROUTE_KEY ,routeValue);
                } else {
                    HashMap<String, String> status = new HashMap<>();
                    status.put(KeyRoute.TARS_ROUTE_KEY ,routeValue);
                    request.setStatus(status);
                }
            }
        }
    
        //将分布式上下文信息的routeValue 设置到KeyRoute.value
        public void setValue(DistributedContext distributedContext){
            String routeKey = getRouteKey(distributedContext);
            if( !"".equals(routeKey) && routeKey != null){
                this.value = routeKey;
            }
        }
    
        public KeyRoute() {
        }
    
        public KeyRoute(String action, String value, String route) {
            this.action = action;
            this.value = value;
            this.route = route;
        }
    
        public String getValue() {
            return value;
        }
    
        public String getAction() {
            return action;
        }
    
        public void setAction(String action) {
            this.action = action;
        }
    
        public String getRoute() {
            return route;
        }
    
        public void setRoute(String route) {
            this.route = route;
        }
    }
    

    KeyConfig

    • 因为这里涉及正则匹配,所有在StringUtils工具类里有正则算法的实现,详情见《8. 正则算法的实现》;
    public class KeyConfig {
    
        private String defaultRoute;
    
        private List<KeyRoute> rules;
    
        private DistributedContext distributedContext = DistributedContextManager.getDistributedContext();
    
        private static final Logger logger = LoggerFactory.getClientLogger();
    
        public String findSubet(String routeKey){
            //非空校验
            if( routeKey == null || "".equals(routeKey) || rules == null){
                return null;
            }
            for ( KeyRoute rule: rules) {
                //根据根据分布式上下文信息获取 “请求的染色的key”
                String routeKeyReq;
                if( distributedContext != null){
                    routeKeyReq = KeyRoute.getRouteKey(distributedContext);
                } else {
                    logger.info("无分布式上下文信息distributedContext");
                    return null;
                }
                //精确匹配
                if( "match".equals(rule.getAction())  ){
                    if( routeKeyReq.equals(rule.getValue()) ){
                        return rule.getRoute();
                    } else {
                        logger.info("染色key匹配不上,请求的染色key为:" + routeKeyReq + "; 规则的染色key为:" + rule.getValue());
                    }
                }
                //正则匹配
                if( "equal".equals(rule.getAction()) ){
                    if( StringUtils.matches(routeKeyReq, rule.getValue()) ){
                        return rule.getRoute();
                    } else {
                        logger.info("正则匹配失败,请求的染色key为:" + routeKeyReq + "; 规则的染色key为:" + rule.getValue());
                    }
    
                }
                //默认匹配
                if( "default".equals(rule.getAction()) ){
                    //默认路由无需考虑染色key
                    return rule.getRoute();
                }
            }
            return null;
        }
    
        public KeyConfig() {
        }
    
        public KeyConfig(String defaultRoute, List<KeyRoute> rules) {
            this.defaultRoute = defaultRoute;
            this.rules = rules;
        }
    
        public String getDefaultRoute() {
            return defaultRoute;
        }
    
        public void setDefaultRoute(String defaultRoute) {
            this.defaultRoute = defaultRoute;
        }
    
        public List<KeyRoute> getRules() {
            return rules;
        }
    
        public void setRules(List<KeyRoute> rules) {
            this.rules = rules;
        }
    }
    

    SubsetManager

    public class SubsetManager {
    
        private Map<String, SubsetConf> cache = new HashMap<>();
    
        private QueryFPrx queryProxy;
    
        //获取Subset路由规则,并存到subsetConf配置项
        public SubsetConf getSubsetConfig(String servantName){
            SubsetConf subsetConf = new SubsetConf();
            if( cache.containsKey(servantName) ){
                subsetConf = cache.get(servantName);
    
                //小于10秒从缓存中取
                if( Duration.between(subsetConf.getLastUpdate() , Instant.now()).toMillis() < 1000 ){
                    return subsetConf;
                }
            }
            // get config from registry
            Holder<SubsetConf> subsetConfHolder = new Holder<SubsetConf>(subsetConf);
            int ret = queryProxy.findSubsetConfigById(servantName, subsetConfHolder);
            SubsetConf newSubsetConf = subsetConfHolder.getValue();
            if( ret == TarsHelper.SERVERSUCCESS ){
                return newSubsetConf;
            }
            //从registry中获取失败时,更新subsetConf添加进缓存
            subsetConf.setRuleType( newSubsetConf.getRuleType() );
            subsetConf.setLastUpdate( Instant.now() );
            cache.put(servantName, subsetConf);
            //解析subsetConf
            if( !newSubsetConf.isEnanle() ){
                subsetConf.setEnanle(false);
                return subsetConf;
            }
            if( "ratio".equals(newSubsetConf.getRuleType())){
                subsetConf.setRatioConf( newSubsetConf.getRatioConf() );
            } else {
                //按参数匹配
                KeyConfig newKeyConf = newSubsetConf.getKeyConf();
                List<KeyRoute> keyRoutes = newKeyConf.getRules();
                for ( KeyRoute kr: keyRoutes) {
                    KeyConfig keyConf = new KeyConfig();
                    //默认
                    if("default".equals(kr.getAction())){
                        keyConf.setDefaultRoute(newKeyConf.getDefaultRoute());
                        subsetConf.setKeyConf(keyConf);
                    }
                    //精确匹配
                    if("match".equals(kr.getAction())){
                        List<KeyRoute> rule = new ArrayList<>();
                        rule.add(new KeyRoute("match", kr.getValue() , kr.getRoute()));
                        keyConf.setRules( rule );
                    }
                    //正则匹配
                    if("equal".equals(kr.getAction())){
                        List<KeyRoute> rule = new ArrayList<>();
                        rule.add(new KeyRoute("equal", kr.getValue() , kr.getRoute()));
                        keyConf.setRules( rule );
                    }
                }
                subsetConf.setKeyConf(newKeyConf);
            }
            return subsetConf;
        }
    
        // 根据路由规则先获取到比例 / 染色路由的配置,再通过配置获取String的subset字段
        public String getSubset(String servantName, String routeKey){
            //check subset config exists
            SubsetConf subsetConf = getSubsetConfig(servantName);
            if( subsetConf == null ){
                return null;
            }
            // route key to subset
            if("ratio".equals(subsetConf.getRuleType())){
                RatioConfig ratioConf = subsetConf.getRatioConf();
                if(ratioConf != null){
                    return ratioConf.findSubet(routeKey);
                }
            }
            KeyConfig keyConf = subsetConf.getKeyConf();
            if ( keyConf != null ){
                return keyConf.findSubet(routeKey);
            }
            return null;
        }
    
        public SubsetManager() {
        }
    
        public SubsetManager(Map<String, SubsetConf> cache) {
            if(cache == null){
                this.cache = new HashMap<>();
            }
        }
    
        public Map<String, SubsetConf> getCache() {
            return cache;
        }
    
        public void setCache(Map<String, SubsetConf> cache) {
            this.cache = cache;
        }
    
    }
    
    

    Subset

    public class Subset {
    
        private String hashString;
    
        private SubsetConf subsetConf;
    
        private KeyConfig keyConfig;
        private KeyRoute keyRoute;
        private RatioConfig ratioConfig;
    
        private SubsetManager subsetManager;
    
    
        //获取到规则后的subset,与节点的subset比较,过滤不匹配节点
        public Holder<List<EndpointF>> subsetEndpointFilter(String servantName, String routeKey, Holder<List<EndpointF>> eps){
    
            if( subsetConf==null || !subsetConf.isEnanle() ){
                return eps;
            }
    
            if(eps.value == null || eps.value.isEmpty()){
                return eps;
            }
    
            //调用subsetManager,根据比例/匹配等规则获取到路由规则的subset
            String subset = subsetManager.getSubset(servantName, routeKey);
            if( "".equals(subset) || subset == null){
                return eps;
            }
            //和每一个eps的subset比较,淘汰不符合要求的
    
            Holder<List<EndpointF>> epsFilter = new Holder<>(new ArrayList<EndpointF>());
            for (EndpointF ep : eps.value) {
                if( subset.equals(ep.getSubset())){
                    epsFilter.getValue().add(ep);
                }
            }
            return epsFilter;
        }
    
        public Subset() {
        }
    
        public Subset(String hashString, SubsetConf subsetConf, KeyConfig keyConfig, KeyRoute keyRoute, RatioConfig ratioConfig) {
            this.hashString = hashString;
            this.subsetConf = subsetConf;
            this.keyConfig = keyConfig;
            this.keyRoute = keyRoute;
            this.ratioConfig = ratioConfig;
        }
    
        public String getHashString() {
            return hashString;
        }
    
        public void setHashString(String hashString) {
            this.hashString = hashString;
        }
    
        public SubsetConf getSubsetConf() {
            return subsetConf;
        }
    
        public void setSubsetConf(SubsetConf subsetConf) {
            this.subsetConf = subsetConf;
        }
    
        public KeyConfig getKeyConfig() {
            return keyConfig;
        }
    
        public void setKeyConfig(KeyConfig keyConfig) {
            this.keyConfig = keyConfig;
        }
    
        public KeyRoute getKeyRoute() {
            return keyRoute;
        }
    
        public void setKeyRoute(KeyRoute keyRoute) {
            this.keyRoute = keyRoute;
        }
    
        public RatioConfig getRatioConfig() {
            return ratioConfig;
        }
    
        public void setRatioConfig(RatioConfig ratioConfig) {
            this.ratioConfig = ratioConfig;
        }
    
        public SubsetManager getSubsetManager() {
            return subsetManager;
        }
    
        public void setSubsetManager(SubsetManager subsetManager) {
            this.subsetManager = subsetManager;
        }
    }
    
    

    2.3 变更代码的路径

    通过上述操作,新增了以下代码,需要在github上提交:

    • TarsJava-1.7.x\core\src\main\java\com\qq\tars\client\subset\SubsetConf.java
    • TarsJava-1.7.x\core\src\main\java\com\qq\tars\client\subset\KeyConfig.java
    • TarsJava-1.7.x\core\src\main\java\com\qq\tars\client\subset\KeyRoute.java
    • TarsJava-1.7.x\core\src\main\java\com\qq\tars\client\subset\RatioConfig.java
    • TarsJava-1.7.x\core\src\main\java\com\qq\tars\client\subset\Subset.java
    • TarsJava-1.7.x\core\src\main\java\com\qq\tars\client\subset\SubsetManager.java

    3. 添加常量与获取染色key的方法

    3.1 Java源码位置及逻辑分析

    该部分的含义是:添加常量添加获取染色key的方法

    在TarsJava中,染色相关的逻辑在DyeingKeyCacheDyeingSwitch类里;但我们新增的TARS_ROUTE_KEY染色key与原染色逻辑相关性不大,这里的TARS_ROUTE_KEY是随着Tars的请求体TarsServantRequest里的中获取status参数(map类型)传递而来的;

    • Tars的请求体路径:TarsJava-1.7.x\core\src\main\java\com\qq\tars\rpc\protocol\tars\TarsServantRequest.java

    因此设置 / 获取染色key的逻辑应该是:通过分布式上下文信息DistributedContext获取到TarsServantRequest请求体,再从请求体里的statusmap数据设置 / 获取染色key相关;

    因此,我们可以得到以下信息:

    • 定位对应源码位置如下:
    Go语言 Java
    tars/subset.go TarsJava-1.7.x\core\src\main\java\com\qq\tars\client\subset\KeyRoute.java

    3.2 Java语言实现方式

    跟《2.2 Java语言实现方式》中的KeyRoute一样

    3.3 变更代码的路径

    通过上述操作,改变了以下代码,需要在github上提交:

    • TarsJava-1.7.x\core\src\main\java\com\qq\tars\client\subset\KeyRoute.java

    4.【核心】修改获取服务IP规则

    4.1 Java源码位置及逻辑分析

    该部分的含义是:节点管理

    在Go语言中,我们点进去tars/endpointmanager.go查看源码发现该代码的作用是:创建一个结点管理器,通过管理器可以实现查看节点状态checkEpStatus()、更新节点信息updateEndpoints()等功能。

    修改的方法为SelectAdapterProxy()选择适配器代理,原逻辑为获取服务端节点列表,新增逻辑为subsetEndpointFilter()为根据subset规则过滤节点;

    而在Java语言中,类似功能在ObjectProxyFactory类里,该类的功能主要是:创建代理对象,通过代理对象实现更新节点updateServantEndpoints()、创建服务代理配置项createServantProxyConfig()等功能。

    其中在updateServantEndpoints()方法里涉及到更新服务节点列表,但在Java中使用了一个QueryHelper查询工具,里面有个getServerNodes()方法获取服务端节点列表,我们要修改的地方就在这里。

    因此,我们可以得到以下信息:

    • 定位对应源码位置如下:
    Go语言 Java语言
    tars/endpointmanager.go TarsJava-1.7.x\core\src\main\java\com\qq\tars\support\query\QueryHelper.java
    • 增加的方法逻辑

    由于在java里节点的储存是使用Holder<List<EndpointF>>对象而并不是LIst,因此对应参数类型改成Holder;

    项目 说明
    方法名 subsetEndpointFilter
    实现逻辑 根据subset规则过滤节点
    传入参数 服务名String、染色状态String、存活的节点Holder
    返回参数 过滤后的节点Holder

    这里的染色逻辑

    新添加的获取染色key的方法与原来染色逻辑类似,可以参照相应实现逻辑;

    在TarsGo里,通过msg.Req.状态[current.STATUS_ROUTE_KEY]获取routeKey字段;通过msg.Req.SServantName获取服务名;

    而在TarsJava里,通过ServantProxyConfig.getSimpleObjectName()获取服务名,获取routeKey字段则比较复杂;我们需要的最终染色字段在Tars请求体TarsServantRequest里的status参数(map类型);

    获取的逻辑是:通过分布式上下文信息DistributedContext获取到TarsServantRequest请求体,再从请求体里的statusmap获取染色key;

    4.2 Java语言实现方式

    public String getServerNodes(ServantProxyConfig config) {
        QueryFPrx queryProxy = getPrx();
    
        //【新增】通过KeyRoute类与分布式上下文信息获取routeKey
        String routeKey = getRouteKeyByContext();
        String name = config.getSimpleObjectName();
    
        //存活的节点
        Holder<List<EndpointF>> activeEp = new Holder<List<EndpointF>>(new ArrayList<EndpointF>());
        //挂掉的节点
        Holder<List<EndpointF>> inactiveEp = new Holder<List<EndpointF>>(new ArrayList<EndpointF>());
        int ret = TarsHelper.SERVERSUCCESS;
        //判断是否为启用集
        if (config.isEnableSet()) {
            ret = queryProxy.findObjectByIdInSameSet(name, config.getSetDivision(), activeEp, inactiveEp);
        } else {
            ret = queryProxy.findObjectByIdInSameGroup(name, activeEp, inactiveEp);
        }
    
        if (ret != TarsHelper.SERVERSUCCESS) {
            return null;
        }
        Collections.sort(activeEp.getValue());
        
        //【新增】根据Subset规则过滤节点
        Holder<List<EndpointF>> activeEpFilter = subset.subsetEndpointFilter(name, routeKey, activeEp);
        
        //将获取到的节点列表格式化为一个字符串格式
        StringBuilder value = new StringBuilder();
        if (activeEpFilter.value != null && !activeEpFilter.value.isEmpty()) {
            for (EndpointF endpointF : activeEpFilter.value) {
                if (value.length() > 0) {
                    value.append(":");
                }
                value.append(ParseTools.toFormatString(endpointF, true));
            }
        }
        //个格式化后的字符串加上Tars的服务名
        if (value.length() < 1) {
            return null;
        }
        value.insert(0, Constants.TARS_AT);
        value.insert(0, name);
        return value.toString();
    }
    
    //【新增】根据分布式上下文信息获取RouteKey
    public String getRouteKeyByContext(){
        KeyRoute routeKey = new KeyRoute();
        return KeyRoute.getRouteKey(DistributedContextManager.getDistributedContext())
    }
    

    4.3 变更代码的路径

    通过上述操作,改变了以下代码,需要在github上提交:

    • TarsJava-1.7.x\core\src\main\java\com\qq\tars\support\query\QueryHelper.java

    5. 实现透传染色Key功能(客户端)

    5.1 Java源码位置及逻辑分析

    该部分的含义是:透传染色Key

    是指染色key和value放到tars请求结构体的status参数,需要透传给下游。这里讨论客户端。

    在TarsGo里,这部分代码位置在tars/servant.go,通过阅读源码上下文,我们可以得知这个类主要围绕ServantProxy服务代理器而工作的;透传染色Key是在ServantProxyTars_invoke方法里实现的,invoke方法一般是最终要执行的方法;

    在TarsJava里,对Tars_invoke类似的方法进行了层层封装;通过之前分析的客户端负载均衡源码分析可知,最终的执行方法在TarsInvoker类的doInvokeServant方法里,而该方法又对异步调用、同步调用、协程调用三种形式,这三个调用才是最终执行方法。

    因此,我们可以得到以下信息:

    • 定位对应源码位置如下:
    Go语言 Java
    tars/servant.go TarsJava-1.7.x\core\src\main\java\com\qq\tars\client\rpc\tars\TarsInvoker.java

    5.2 Java语言实现方式

    在KeyRoute类里添加一个静态方法setRouteKeyToRequest(),逻辑是通过分布式上下文信息,判断Tars请求体的status(map类型)是否存在TARS_ROUTE_KEY键值对,存在则设置到Tars的响应体透传给下游,不存在则不处理;

    之所以添加到KeyRoute类,是因为该方法需要在多处地方重用,如《6.2 Java语言实现方式》;

    public static void KeyRoute.setRouteKeyToRequest(DistributedContext distributedContext, TarsServantRequest request){
        String routeKey = KeyRoute.getRouteKey(distributedContext);
        if( routeKey != null && !"".equals(routeKey)){
            if(request.getStatus() != null){
                request.getStatus().put(KeyRoute.TARS_ROUTE_KEY ,routeKey);
            } else {
                HashMap<String, String> status = new HashMap<>();
                status.put(KeyRoute.TARS_ROUTE_KEY ,routeKey);
                request.setStatus(status);
            }
        }
    }
    

    然后在同步调用方法invokeWithSync()、异步调用方法invokeWithAsync()和协程调用方法invokeWithPromiseFuture()里,调用上述方法即可。

    调用setRouteKey方法(客户端)

    5.3 变更代码的路径

    通过上述操作,改变了以下代码,需要在github上提交:

    • TarsJava-1.7.x\core\src\main\java\com\qq\tars\client\rpc\tars\TarsInvoker.java
    • TarsJava-1.7.x\core\src\main\java\com\qq\tars\client\subset\KeyRoute.java

    6. 实现透传染色Key功能(服务端)

    6.1 Java源码位置及逻辑分析

    该部分的含义是:透传染色Key

    是指染色key和value放到tars请求结构体的status参数,需要透传给下游。这里讨论服务端。

    在TarsGo里,这部分代码位置在tars/tarsprotocol.go,通过阅读源码上下文,我们可以得知这个类主要围绕TarsProtocolTars服务端协议而工作的;透传染色Key是在TarsProtocolInvoke方法里实现的,其主要功能是将request请求作为字节数组,调用dispather,然后以字节数组返回response响应;

    在TarsJava中,Tars服务处理器为TarsServantProcessor,其中的process()方法逻辑是处理request请求到response响应转换;

    因此,我们可以得到以下信息:

    • 定位对应源码位置如下:
    Go语言 Java
    tars/tarsprotocol.go TarsJava-1.7.x\core\src\main\java\com\qq\tars\server\core\TarsServantProcessor.java

    6.2 Java语言实现方式

    直接调用setRouteKeyToRequest()方法即可;

    调用setRouteKey方法(服务端)

    6.3 变更代码的路径

    通过上述操作,改变了以下代码,需要在github上提交:

    • TarsJava-1.7.x\core\src\main\java\com\qq\tars\server\core\TarsServantProcessor.java

    7. 给节点信息增添Subset字段

    7.1 Java源码位置及逻辑分析

    该部分的含义是:增添Subset字段

    在TarsGo中,这部分代码位置在endpoint.go,比较简单,增加了一个String类型的Subset字段属性;

    在TarsJava中,endpoint的源码位置很容易找到,直接修改即可;主要修改两处,增加一个subset字段以及修改解析方法;

    因此,我们可以得到以下信息:

    • 定位对应源码位置如下:
    Go语言 Java
    tars/util/endpoint/endpoint.go和tars/util/endpoint/convert.go TarsJava-1.7.x\core\src\main\java\com\qq\tars\common\support\Endpoint.java

    7.2 Java语言实现方式

    public class Endpoint {
    
        private final String type;
        private final String host; 
        private final int port; 
    
        private final int timeout;
        private final int grid; 
        private final int qos; 
        private final String setDivision;
        //新增
        private String subset;
        ……
    }
    

    7.3 变更代码的路径

    通过上述操作,改变了以下代码,需要在github上提交:

    • TarsJava-1.7.x\core\src\main\java\com\qq\tars\common\support\Endpoint.java

    * 8. 正则算法的实现

    8.1 Java源码位置及逻辑分析

    该部分的含义是:正则算法匹配

    因为在参数匹配里要求正则匹配,因此在String工具类里新增一个算法实现正则匹配;

    8.2 Java语言实现方式

    public static Boolean matches(String regex, String input){
        //非空校验
        if(regex==null || "".equals(regex) || input == null){
            return false;
        }
        char[] chars = regex.toCharArray();
        boolean flage = true;
        if( chars[0] == '*'){
            //如果regex是*开头,如:*d123等。从d往后匹配;
            if( regex.length() < 2){
                return true;
            }
            int i;
            flage = false;
            for (i = 0; i < input.length(); i++) {
                if( input.charAt(i) == regex.charAt(1)){
                    flage = true;
                    for (int j = 1; j < regex.length(); j++) {
    
                        if( i > input.length() -1 && regex.charAt(j) != '*' ){
                            return false;
                        }
    
                        if( regex.charAt(j) == '*' || input.charAt(i) == regex.charAt(j)  ){
                            i++;
                        } else {
                            flage = false;
                        }
    
    
                    }
                }
            }
        }else {
            if( chars[chars.length-1] == '*'){
                //如果regex是*结尾,如uid12*。从第一个字符开始匹配
                for (int i = 0; i < Math.min(regex.length(), input.length()); i++) {
                    if(regex.charAt(i) == input.charAt(i) || regex.charAt(i) == '*'){
                        if( i == Math.min(regex.length(), input.length()) -1 && regex.length() > input.length()+1 ){
                            flage = false;
                        }
    
                    } else {
                        flage = false;
                    }
                }
            } else {
                //如果没有*,如uid123。
                flage = regex.equals(input);
            }
        }
    
        return flage;
    }
    

    8.3 变更代码的路径

    通过上述操作,改变了以下代码,需要在github上提交:

    • TarsJava-1.7.x\core\src\main\java\com\qq\tars\common\util\StringUtils.java

    * 9. 添加测试代码

    9.1 Java源码位置及逻辑分析

    该部分的含义是:主要流量路由规则测试

    测试中包含按比例路由单次测试、按比例路由多次测试、按参数精确路由测试、按参数路由正则测试,以及registry测试;

    由于其他同学部分的相关registry接口功能还未完成,故registry测试会失败。

    9.2 Java语言实现方式

    public class TestSubset {
    
        //创建Subset过滤器
        Subset subsetFilter = new Subset();
    
        //模拟objectName
        String objectName = "objectName";
    
        //模拟routeKey
        String routeKey = "routeKey";
    
        //存活节点list列表
        List<EndpointF> endpointFList = new ArrayList<EndpointF>();
        Holder<List<EndpointF>> activeEp = new Holder<List<EndpointF>>(new ArrayList<EndpointF>());
    
        //定义一个Session域,用来构建Tars请求体
        Session session;
    
    
        /**
         * 按比例路由规则 - 单次测试
         * 没有测试registry获取subsetConf功能
         */
        @Test
        public void testRatioOnce() {
    
            //1. 给过滤器设置过滤规则
            //1.1 创建SubsetManager管理器
            SubsetManager subsetManager = new SubsetManager();
    
    
            //1.1 设置比例路由规则
            RatioConfig ratioConf = new RatioConfig();
            Map<String , Integer> map = new HashMap<>();
            map.put("v1",20);
            map.put("v2",80);
            //map.put("v3",20);
            ratioConf.setRules(map);
    
            //1.2 设置subsetConf,并加入缓存
            SubsetConf subsetConf = new SubsetConf();
            subsetConf.setEnanle(true);
            subsetConf.setRuleType("ratio");
            subsetConf.setRatioConf(ratioConf);
            subsetConf.setLastUpdate( Instant.now() );
    
            Map<String, SubsetConf> cache = new HashMap<>();
            cache.put(objectName,subsetConf);
            subsetManager.setCache(cache);
    
            //1.3 给过滤器设置过滤规则和管理者
            subsetFilter.setSubsetConf(subsetConf);
            subsetFilter.setSubsetManager(subsetManager);
    
    
            //2. 模拟存活节点
            endpointFList.add(new EndpointF("host1",1,2,3,4,5,6,"setId1",7,8,9,10,"v1"));
            endpointFList.add(new EndpointF("host2",1,2,3,4,5,6,"setId2",7,8,9,10,"v1"));
            endpointFList.add(new EndpointF("host3",1,2,3,4,5,6,"setId3",7,8,9,10,"v2"));
            endpointFList.add(new EndpointF("host4",1,2,3,4,5,6,"setId4",7,8,9,10,"v2"));
            endpointFList.add(new EndpointF("host5",1,2,3,4,5,6,"setId5",7,8,9,10,"v2"));
            endpointFList.add(new EndpointF("host5",1,2,3,4,5,6,"setId5",7,8,9,10,"v3"));
            activeEp.setValue(endpointFList);
    
    
            //3. 输出过滤前信息
            System.out.println("过滤前节点信息如下:");
            for( EndpointF endpoint : endpointFList){
                System.out.println(endpoint.toString());
            }
    
            //4. 对存活节点按subset规则过滤
            Holder<List<EndpointF>> filterActiveEp = subsetFilter.subsetEndpointFilter(objectName, routeKey, activeEp);
    
            //5. 输出过滤结果
    
            System.out.println("过滤后节点信息如下:");
            for( EndpointF endpoint : filterActiveEp.getValue() ){
                System.out.println(endpoint.toString());
            }
        }
    
    
        /**
         * 按比例路由规则 - 多次测试
         * 没有测试registry获取subsetConf功能
         */
        @Test
        public void testRatioTimes() {
    
            //1. 给过滤器设置过滤规则
            //1.1 创建SubsetManager管理器
            SubsetManager subsetManager = new SubsetManager();
    
    
            //1.1 设置比例路由规则
            RatioConfig ratioConf = new RatioConfig();
            Map<String , Integer> map = new HashMap<>();
            map.put("v1",20);
            map.put("v2",80);
            map.put("v3",20);
            ratioConf.setRules(map);
    
            //1.2 设置subsetConf,并加入缓存
            SubsetConf subsetConf = new SubsetConf();
            subsetConf.setEnanle(true);
            subsetConf.setRuleType("ratio");
            subsetConf.setRatioConf(ratioConf);
            subsetConf.setLastUpdate( Instant.now() );
    
            Map<String, SubsetConf> cache = new HashMap<>();
            cache.put(objectName,subsetConf);
            subsetManager.setCache(cache);
    
            //1.3 给过滤器设置过滤规则和管理者
            subsetFilter.setSubsetConf(subsetConf);
            subsetFilter.setSubsetManager(subsetManager);
    
    
            //2. 模拟存活节点
            endpointFList.add(new EndpointF("host1",1,2,3,4,5,6,"setId1",7,8,9,10,"v1"));
            endpointFList.add(new EndpointF("host2",1,2,3,4,5,6,"setId2",7,8,9,10,"v1"));
            endpointFList.add(new EndpointF("host3",1,2,3,4,5,6,"setId3",7,8,9,10,"v2"));
            endpointFList.add(new EndpointF("host4",1,2,3,4,5,6,"setId4",7,8,9,10,"v2"));
            endpointFList.add(new EndpointF("host5",1,2,3,4,5,6,"setId5",7,8,9,10,"v2"));
            endpointFList.add(new EndpointF("host5",1,2,3,4,5,6,"setId5",7,8,9,10,"v3"));
            activeEp.setValue(endpointFList);
    
    
            //3. 循环times次
            int times = 1000000;
            int v1Times = 0;
            int v2Times = 0;
            int v3Times = 0;
            int errTimes = 0;
            for (int i = 0; i < times; i++) {
                //对存活节点按subset规则过滤
                Holder<List<EndpointF>> filterActiveEp = subsetFilter.subsetEndpointFilter(objectName, routeKey, activeEp);
                String subsetValue = filterActiveEp.getValue().get(0).getSubset();
                if("v1".equals(subsetValue)){
                    v1Times++;
                } else if("v2".equals(subsetValue)){
                    v2Times++;
                } else if("v3".equals(subsetValue)){
                    v3Times++;
                } else {
                    errTimes++;
                }
    
            }
            //输出结果
            System.out.println("一共循环次数:" + times);
            System.out.println("路由到v1次数:" + v1Times);
            System.out.println("路由到v2次数:" + v2Times);
            System.out.println("路由到v3次数:" + v3Times);
            System.out.println("路由异常次数:" + errTimes);
        }
    
    
        /**
         * 测试参数匹配 - 精确匹配
         * 没有测试registry获取subsetConf功能
         * 注意要成功必须routeKey和match匹配上
         */
        @Test
        public void testMatch() {
    
            //1. 给过滤器设置过滤规则
            //1.1 创建SubsetManager管理器
            SubsetManager subsetManager = new SubsetManager();
    
    
            //1.1 设置参数路由规则,这里的KeyRoute的value为 “规则的染色key”
            KeyConfig keyConf = new KeyConfig();
            List<KeyRoute> krs = new LinkedList<>();
            krs.add(new KeyRoute("match","routeKey","v1"));
            keyConf.setRules(krs);
    
            //1.2 设置subsetConf,并加入缓存
            SubsetConf subsetConf = new SubsetConf();
            subsetConf.setEnanle(true);
            subsetConf.setRuleType("key");
            subsetConf.setKeyConf(keyConf);
            subsetConf.setLastUpdate( Instant.now() );
    
            Map<String, SubsetConf> cache = new HashMap<>();
            cache.put(objectName,subsetConf);
            subsetManager.setCache(cache);
    
            //1.3 给过滤器设置过滤规则和管理者
            subsetFilter.setSubsetConf(subsetConf);
            subsetFilter.setSubsetManager(subsetManager);
    
            //1.4 模拟Tars “请求的染色key” TARS_ROUTE_KEY,但请求染色key和规则染色key匹配时,才能精确路由
            //1.4.1 创建Tars的请求体TarsServantRequest
            TarsServantRequest request = new TarsServantRequest( session );
            //1.4.2 往请求体的status添加{TARS_ROUTE_KEY, "routeKey"}键值对
            Map<String, String> status = new HashMap<>();
            status.put("TARS_ROUTE_KEY", "routeKey");
            request.setStatus(status);
            //1.4.3 构建分布式上下文信息,将请求放入分布式上下文信息中,因为getSubset()的逻辑是从分布式上下文信息中取
            DistributedContext distributedContext = new DistributedContextImpl();
            distributedContext.put(DyeingSwitch.REQ,request);
    
            //2. 模拟存活节点
            endpointFList.add(new EndpointF("host1",1,2,3,4,5,6,"setId1",7,8,9,10,"v1"));
            endpointFList.add(new EndpointF("host2",1,2,3,4,5,6,"setId2",7,8,9,10,"v1"));
            endpointFList.add(new EndpointF("host3",1,2,3,4,5,6,"setId3",7,8,9,10,"v2"));
            endpointFList.add(new EndpointF("host4",1,2,3,4,5,6,"setId4",7,8,9,10,"v2"));
            endpointFList.add(new EndpointF("host5",1,2,3,4,5,6,"setId5",7,8,9,10,"v2"));
            endpointFList.add(new EndpointF("host5",1,2,3,4,5,6,"setId5",7,8,9,10,"v3"));
            activeEp.setValue(endpointFList);
    
    
            //3. 输出过滤前信息
            System.out.println("过滤前节点信息如下:");
            for( EndpointF endpoint : endpointFList){
                System.out.println(endpoint.toString());
            }
    
            //4. 对存活节点按subset规则过滤
            Holder<List<EndpointF>> filterActiveEp = subsetFilter.subsetEndpointFilter(objectName, routeKey, activeEp);
    
            //5. 输出过滤结果
    
            System.out.println("过滤后节点信息如下:");
            for( EndpointF endpoint : filterActiveEp.getValue() ){
                System.out.println(endpoint.toString());
            }
        }
    
    
        /**
         * 测试参数匹配 - 正则匹配
         * 没有测试registry获取subsetConf功能
         * 注意要成功必须routeKey和match匹配上
         */
        @Test
        public void testEqual() {
    
            //1. 给过滤器设置过滤规则
            //1.1 创建SubsetManager管理器
    
    

    9.3 变更代码的路径

    通过上述操作,改变了以下代码,需要在github上提交:

    • TarsJava-1.7.x\core\src\test\java\com\qq\tars\client\subset\TestSubset.java

    最后

    新人制作,如有错误,欢迎指出,感激不尽!
    欢迎关注公众号,会分享一些更日常的东西!
    如需转载,请标注出处!

    本帖子中包含资源

    您需要 登录 才可以下载,没有帐号?立即注册