dubbo如何集成路由
1.默认路由是怎么设置的
消费者服务启动时,会监听注册中心的变更。所以我调用下面的方法
//RegistryDirectorypublic synchronized void notify(Listurls) { // 根据 URL 的分类或协议,分组成三个集合 。 //.......省略代码 // 2.routers if (routerUrls != null && !routerUrls.isEmpty()) { List routers = toRouters(routerUrls); if (routers != null) { // setRouters(routers); } } //.... }protected void setRouters(List routers) { //... // append mock invoker selector routers.add(new MockInvokersSelector());//这里设置MockInvokersSelector Collections.sort(routers); this.routers = routers; }//经过上面的代码,我们已经拿到的默认路由复制代码
2.下面看下我们是怎么使用默认路由的
当消费者引用服务时会走到这里
//AbstractDirectorypublic List> list(Invocation invocation) throws RpcException { //抽象方法,子类实现 List > invokers = doList(invocation);//5 RegistryDirectory.doList-->6 7 // 根据路由规则,找出 Invoker 集合 List localRouters = this.routers; //拿到路由MockInvokersSelector if (localRouters != null && !localRouters.isEmpty()) { for (Router router : localRouters) { if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) { invokers = router.route(invokers, getConsumerUrl(), invocation);//6 开始使用路由 } } } return invokers; }复制代码
3.MockInvokersSelector
整个流程简单,看下代码注释就ok了
public class MockInvokersSelector implements Router { @Override publicList > route(final List > invokers, URL url, final Invocation invocation) throws RpcException { if (invocation.getAttachments() == null) { // 获得普通 Invoker 集合 return getNormalInvokers(invokers); } else { // 获得 "invocation.need.mock" 配置项 String value = invocation.getAttachments().get("invocation.need.mock"); if (value == null) return getNormalInvokers(invokers);//7 else if (Boolean.TRUE.toString().equalsIgnoreCase(value)) { return getMockedInvokers(invokers); } }// 其它,不匹配,直接返回 `invokers` 集合 return invokers; } private List > getMockedInvokers(final List > invokers) { if (!hasMockProviders(invokers)) { return null; }// 过滤掉普通 kInvoker ,创建 MockInvoker 集合 List > sInvokers = new ArrayList >(1); for (Invoker invoker : invokers) { if (invoker.getUrl().getProtocol().equals("mock")) { sInvokers.add(invoker); } } return sInvokers; } private List > getNormalInvokers(final List > invokers) { if (!hasMockProviders(invokers)) { return invokers; } else {// 若包含 MockInvoker,过滤掉 MockInvoker ,创建普通 Invoker 集合 List > sInvokers = new ArrayList >(invokers.size()); for (Invoker invoker : invokers) { if (!invoker.getUrl().getProtocol().equals("mock")) { sInvokers.add(invoker); } } return sInvokers; } } private boolean hasMockProviders(final List > invokers) { boolean hasMockProvider = false; for (Invoker invoker : invokers) { if (invoker.getUrl().getProtocol().equals("mock")) { hasMockProvider = true; break; } } return hasMockProvider; }}复制代码
新加路由是怎么设置进去的
注册中心服务变更的时候-->RegistryDirectory.notify()设置新的路由
怎么筛选出Invoker
//RegistryDirectory private void refreshInvoker(ListinvokerUrls) { //... Map > newUrlInvokerMap = toInvokers(invokerUrls);//1 Map >> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); //2 //.. this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap; this.urlInvokerMap = newUrlInvokerMap; }复制代码
- 1处:url转成调用者Invoker的列表,这里会有服务禁用、启用功能的实现
private Map> toInvokers(List urls) { Map > newUrlInvokerMap = new HashMap >(); if (urls == null || urls.isEmpty()) { return newUrlInvokerMap; } Set keys = new HashSet (); String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY); for (URL providerUrl : urls) { //如果reference端配置了protocol,则只选择匹配的protocol if (queryProtocols != null && queryProtocols.length() > 0) { boolean accept = false; String[] acceptProtocols = queryProtocols.split(","); for (String acceptProtocol : acceptProtocols) { if (providerUrl.getProtocol().equals(acceptProtocol)) { accept = true; break; } } if (!accept) { continue; } } if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) { continue; } if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) { logger); continue; } URL url = mergeUrl(providerUrl); String key = url.toFullString(); // URL参数是排序的 if (keys.contains(key)) { // 重复URL continue; } keys.add(key); // 缓存key为没有合并消费端参数的URL,不管消费端如何合并参数,如果服务端URL发生变化,则重新refer Map > localUrlInvokerMap = this.urlInvokerMap; // local reference Invoker invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key); if (invoker == null) { // 缓存中没有,重新refer try { boolean enabled = true; if (url.hasParameter("disabled")) { enabled = !url.getParameter("disabled", false); } else { enabled = url.getParameter("enabled", true); } if (enabled) { invoker = new InvokerDelegate (protocol.refer(serviceType, url), url, providerUrl); } } catch (Throwable t) { logger; } if (invoker != null) { // 将新的引用放入缓存 newUrlInvokerMap.put(key, invoker); } } else { newUrlInvokerMap.put(key, invoker); } } keys.clear(); return newUrlInvokerMap; }复制代码
- 2处:
// 按提供者URL所声明的methods分类,兼容注册中心执行路由过滤掉的methods //即消费者调用了某个方法,会根据方法映射到提供者 private Map>> toMethodInvokers(Map > invokersMap) { Map >> newMethodInvokerMap = new HashMap >>(); List > invokersList = new ArrayList >(); if (invokersMap != null && invokersMap.size() > 0) { for (Invoker invoker : invokersMap.values()) { String parameter = invoker.getUrl().getParameter("methods"); if (parameter != null && parameter.length() > 0) { String[] methods = Constants.COMMA_SPLIT_PATTERN.split(parameter); if (methods != null && methods.length > 0) { for (String method : methods) { if (method != null && method.length() > 0 && !Constants.ANY_VALUE.equals(method)) { List > methodInvokers = newMethodInvokerMap.get(method); if (methodInvokers == null) { methodInvokers = new ArrayList >(); newMethodInvokerMap.put(method, methodInvokers); } methodInvokers.add(invoker); } } } } invokersList.add(invoker); } } List > newInvokersList = route(invokersList, null); newMethodInvokerMap.put(Constants.ANY_VALUE, newInvokersList); if (serviceMethods != null && serviceMethods.length > 0) { for (String method : serviceMethods) { List > methodInvokers = newMethodInvokerMap.get(method); if (methodInvokers == null || methodInvokers.isEmpty()) { methodInvokers = newInvokersList; } newMethodInvokerMap.put(method, route(methodInvokers, method));//路由功能实现 } } // sort and unmodifiable for (String method : new HashSet (newMethodInvokerMap.keySet())) { List > methodInvokers = newMethodInvokerMap.get(method); Collections.sort(methodInvokers, InvokerComparator.getComparator()); newMethodInvokerMap.put(method, Collections.unmodifiableList(methodInvokers)); } return Collections.unmodifiableMap(newMethodInvokerMap); }复制代码
我们进入路由功能实现的源码 private List> route(List > invokers, String method) { Invocation invocation = new RpcInvocation(method, new Class [0], new Object[0]); List routers = getRouters(); if (routers != null) {// MockInvokersSelector和ConditionRouter for (Router router : routers) { if (router.getUrl() != null && !router.getUrl().getParameter("runtime", false)) { //从invocation参数可看出:是用方法一个一个到路由表里找的 invokers = router.route(invokers, getConsumerUrl(), invocation);//--进ConditionRouter } } } return invokers; }从上面的注释,进入到进ConditionRouter // 1处,判断消费者是否匹配路由规则的左边 // 比如,路由规则method = find*,list*,get*,is* => host = 11.22.3.91,12.221.3.95,12.232.3.16, // 规则左边,即判断消费者调用的方法是读方法 // 2处,判断提供者是否匹配路由规则的右边 // 判断提供者是否这几个host的服务,否则不会被映射到方法的提供列表 @Override public List > route(List > invokers, URL url, Invocation invocation) try { if (!matchWhen(url, invocation)) {//1 return invokers; } List > result = new ArrayList >(); for (Invoker invoker : invokers) { if (matchThen(invoker.getUrl(), url)) {//2 result.add(invoker); } } return invokers; }好了。路由结束了。貌似也没那么高大上。呵呵下一篇会进入负载均衡。复制代码