博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
dubbo集群之Router模块
阅读量:6706 次
发布时间:2019-06-25

本文共 8945 字,大约阅读时间需要 29 分钟。

dubbo如何集成路由

1.默认路由是怎么设置的

消费者服务启动时,会监听注册中心的变更。所以我调用下面的方法

//RegistryDirectorypublic synchronized void notify(List
urls) { // 根据 URL 的分类或协议,分组成三个集合 。 //.......省略代码 // 2.routers if (routerUrls != null && !routerUrls.isEmpty()) { List
routers = toRouters(routerUrls); if (routers != null) { // setRouters(routers); } } //.... }protected void setRouters(List
routers) { //... // append mock invoker selector routers.add(new MockInvokersSelector());//这里设置MockInvokersSelector Collections.sort(routers); this.routers = routers; }//经过上面的代码,我们已经拿到的默认路由复制代码

2.下面看下我们是怎么使用默认路由的

当消费者引用服务时会走到这里

//AbstractDirectorypublic List
> list(Invocation invocation) throws RpcException { //抽象方法,子类实现 List
> invokers = doList(invocation);//5 RegistryDirectory.doList-->6 7 // 根据路由规则,找出 Invoker 集合 List
localRouters = this.routers; //拿到路由MockInvokersSelector if (localRouters != null && !localRouters.isEmpty()) { for (Router router : localRouters) { if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) { invokers = router.route(invokers, getConsumerUrl(), invocation);//6 开始使用路由 } } } return invokers; }复制代码

3.MockInvokersSelector

整个流程简单,看下代码注释就ok了

public class MockInvokersSelector implements Router {    @Override    public 
List
> route(final List
> invokers, URL url, final Invocation invocation) throws RpcException { if (invocation.getAttachments() == null) { // 获得普通 Invoker 集合 return getNormalInvokers(invokers); } else { // 获得 "invocation.need.mock" 配置项 String value = invocation.getAttachments().get("invocation.need.mock"); if (value == null) return getNormalInvokers(invokers);//7 else if (Boolean.TRUE.toString().equalsIgnoreCase(value)) { return getMockedInvokers(invokers); } }// 其它,不匹配,直接返回 `invokers` 集合 return invokers; } private
List
> getMockedInvokers(final List
> invokers) { if (!hasMockProviders(invokers)) { return null; }// 过滤掉普通 kInvoker ,创建 MockInvoker 集合 List
> sInvokers = new ArrayList
>(1); for (Invoker
invoker : invokers) { if (invoker.getUrl().getProtocol().equals("mock")) { sInvokers.add(invoker); } } return sInvokers; } private
List
> getNormalInvokers(final List
> invokers) { if (!hasMockProviders(invokers)) { return invokers; } else {// 若包含 MockInvoker,过滤掉 MockInvoker ,创建普通 Invoker 集合 List
> sInvokers = new ArrayList
>(invokers.size()); for (Invoker
invoker : invokers) { if (!invoker.getUrl().getProtocol().equals("mock")) { sInvokers.add(invoker); } } return sInvokers; } } private
boolean hasMockProviders(final List
> invokers) { boolean hasMockProvider = false; for (Invoker
invoker : invokers) { if (invoker.getUrl().getProtocol().equals("mock")) { hasMockProvider = true; break; } } return hasMockProvider; }}复制代码

新加路由是怎么设置进去的

注册中心服务变更的时候-->RegistryDirectory.notify()设置新的路由

怎么筛选出Invoker

//RegistryDirectory private void refreshInvoker(List
invokerUrls) { //... Map
> newUrlInvokerMap = toInvokers(invokerUrls);//1 Map
>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); //2 //.. this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap; this.urlInvokerMap = newUrlInvokerMap; }复制代码
  • 1处:url转成调用者Invoker的列表,这里会有服务禁用、启用功能的实现
private Map
> toInvokers(List
urls) { Map
> newUrlInvokerMap = new HashMap
>(); if (urls == null || urls.isEmpty()) { return newUrlInvokerMap; } Set
keys = new HashSet
(); String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY); for (URL providerUrl : urls) { //如果reference端配置了protocol,则只选择匹配的protocol if (queryProtocols != null && queryProtocols.length() > 0) { boolean accept = false; String[] acceptProtocols = queryProtocols.split(","); for (String acceptProtocol : acceptProtocols) { if (providerUrl.getProtocol().equals(acceptProtocol)) { accept = true; break; } } if (!accept) { continue; } } if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) { continue; } if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) { logger); continue; } URL url = mergeUrl(providerUrl); String key = url.toFullString(); // URL参数是排序的 if (keys.contains(key)) { // 重复URL continue; } keys.add(key); // 缓存key为没有合并消费端参数的URL,不管消费端如何合并参数,如果服务端URL发生变化,则重新refer Map
> localUrlInvokerMap = this.urlInvokerMap; // local reference Invoker
invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key); if (invoker == null) { // 缓存中没有,重新refer try { boolean enabled = true; if (url.hasParameter("disabled")) { enabled = !url.getParameter("disabled", false); } else { enabled = url.getParameter("enabled", true); } if (enabled) { invoker = new InvokerDelegate
(protocol.refer(serviceType, url), url, providerUrl); } } catch (Throwable t) { logger; } if (invoker != null) { // 将新的引用放入缓存 newUrlInvokerMap.put(key, invoker); } } else { newUrlInvokerMap.put(key, invoker); } } keys.clear(); return newUrlInvokerMap; }复制代码
  • 2处:
// 按提供者URL所声明的methods分类,兼容注册中心执行路由过滤掉的methods    //即消费者调用了某个方法,会根据方法映射到提供者    private Map
>> toMethodInvokers(Map
> invokersMap) { Map
>> newMethodInvokerMap = new HashMap
>>(); List
> invokersList = new ArrayList
>(); if (invokersMap != null && invokersMap.size() > 0) { for (Invoker
invoker : invokersMap.values()) { String parameter = invoker.getUrl().getParameter("methods"); if (parameter != null && parameter.length() > 0) { String[] methods = Constants.COMMA_SPLIT_PATTERN.split(parameter); if (methods != null && methods.length > 0) { for (String method : methods) { if (method != null && method.length() > 0 && !Constants.ANY_VALUE.equals(method)) { List
> methodInvokers = newMethodInvokerMap.get(method); if (methodInvokers == null) { methodInvokers = new ArrayList
>(); newMethodInvokerMap.put(method, methodInvokers); } methodInvokers.add(invoker); } } } } invokersList.add(invoker); } } List
> newInvokersList = route(invokersList, null); newMethodInvokerMap.put(Constants.ANY_VALUE, newInvokersList); if (serviceMethods != null && serviceMethods.length > 0) { for (String method : serviceMethods) { List
> methodInvokers = newMethodInvokerMap.get(method); if (methodInvokers == null || methodInvokers.isEmpty()) { methodInvokers = newInvokersList; } newMethodInvokerMap.put(method, route(methodInvokers, method));//路由功能实现 } } // sort and unmodifiable for (String method : new HashSet
(newMethodInvokerMap.keySet())) { List
> methodInvokers = newMethodInvokerMap.get(method); Collections.sort(methodInvokers, InvokerComparator.getComparator()); newMethodInvokerMap.put(method, Collections.unmodifiableList(methodInvokers)); } return Collections.unmodifiableMap(newMethodInvokerMap); }复制代码
我们进入路由功能实现的源码     private List
> route(List
> invokers, String method) { Invocation invocation = new RpcInvocation(method, new Class
[0], new Object[0]); List
routers = getRouters(); if (routers != null) {// MockInvokersSelector和ConditionRouter for (Router router : routers) { if (router.getUrl() != null && !router.getUrl().getParameter("runtime", false)) { //从invocation参数可看出:是用方法一个一个到路由表里找的 invokers = router.route(invokers, getConsumerUrl(), invocation);//--进ConditionRouter } } } return invokers; }从上面的注释,进入到进ConditionRouter // 1处,判断消费者是否匹配路由规则的左边 // 比如,路由规则method = find*,list*,get*,is* => host = 11.22.3.91,12.221.3.95,12.232.3.16, // 规则左边,即判断消费者调用的方法是读方法 // 2处,判断提供者是否匹配路由规则的右边 // 判断提供者是否这几个host的服务,否则不会被映射到方法的提供列表 @Override public
List
> route(List
> invokers, URL url, Invocation invocation) try { if (!matchWhen(url, invocation)) {//1 return invokers; } List
> result = new ArrayList
>(); for (Invoker
invoker : invokers) { if (matchThen(invoker.getUrl(), url)) {//2 result.add(invoker); } } return invokers; }好了。路由结束了。貌似也没那么高大上。呵呵下一篇会进入负载均衡。复制代码

转载于:https://juejin.im/post/5bf4c3f6e51d4505ae62e6df

你可能感兴趣的文章
C# COM Object for Use In JavaScript / HTML, Including Event Handling
查看>>
svn权限设置
查看>>
MVC验证11-对复杂类型使用jQuery异步验证
查看>>
C++static关键字用法
查看>>
excel在msdn上的说明文档
查看>>
指尖下的js ——多触式web前端开发之一:对于Touch的处理(转)
查看>>
visual studio 2013使用技巧
查看>>
Sublime Text 相关
查看>>
深入理解css优先级
查看>>
Android MediaPlayer状态机
查看>>
Material Design Animation
查看>>
ASP.NET MVC搭建项目后台UI框架—3、面板折叠和展开
查看>>
(C语言)memcpy函数原型的实现
查看>>
Theano2.1.1-基础知识之准备工作
查看>>
FreeBSd ports 安装软件
查看>>
DevExpress.Build
查看>>
ACCESS-如何多数据库查询(跨库查询)
查看>>
iOS:转载sqlite3
查看>>
java并发编程学习:用 Semaphore (信号量)控制并发资源
查看>>
HDU 2070 Fibbonacci Number
查看>>