ThinkChat2.0新版上线,更智能更精彩,支持会话、画图、阅读、搜索等,送10W Token,即刻开启你的AI之旅 广告
## 流程 injvm协议的过程是dubbo中最简单的协议,但除了没有注册中心外,还是可以对其基本流程进行全面的了解。 ![dubbo流程](http://www.uxiaowo.com/dubbo/dubbo.png) ## 1.发布到dubbo协议 在发布流程中,我们已经知道了服务要发布为dubbo协议时,不同点在发布Invoker的不同。非injvm协议都使用了RegistryProtocol的export()来发布服务,RegistryProtocol的内部变量bounds中保存了`<服务,协议>`对应的Exporter,每次发布后会保存到这个map中。 发布的过程如下: ``` // originInvoker是发布服务的公共流程中生成的Invoker对象 public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { // 使用dubbo协议发布 final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); // 获取注册中心 final Registry registry = getRegistry(originInvoker); // 通过invoker的url 获取 providerUrl的地址 final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); // 通知注册中心发布服务 registry.register(registedProviderUrl); // 订阅override数据 final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); //保证每次export都返回一个新的exporter实例 return new Exporter<T>() { public Invoker<T> getInvoker() { } public void unexport() { } }; } ``` ### 1.1 使用dubbo协议发布 dubbo协议发布服务时,会根据发布时生成的Invoker,构建InvokerFilterChain,并添加监听事件,最后,打开协议指定的服务器,等待客户端连接后处理调用。 doLocalExport(originInvoker);中首先根据服务名在bounds之后查找对应的Exporter,如果找到,说明已经发不过了;如果没有找到则使用DubboProtocol协议发布Invoker。在发布之前,会将发布之前生成的Invoker包装为InvokerDelegete对象,这是因为originInvoker的url是注册中心协议的url`registry://xxxx/xxx?xx`;而dubboProtocol发布时需要改为`dubbo://xxx/xx?xxx` ``` private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) { String key = getCacheKey(originInvoker); ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { synchronized (bounds) { exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker)); exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker); bounds.put(key, exporter); } } } return exporter; } ``` 接下来,继续分析dubboProtocol的export,protocol变量与之前相同,会根据协议名称获取协议链: ``` |- ProtocolFilterWrapper |- ProtocolListenerWrapper |- DubboProtocol ``` ProtocolFilterWrapper:主要用来生成调用链,内部的buildInvokerChain方法会查找Filter的实现类,查找group为provider的,并根据order排序,将这些Filter连接成一个调用链 InvokerFilterChain,最终调用上一步生成的InvokerDelegete ``` EchoFilter -> ClassloaderFilter -> GenericFilter -> ContextFilter -> TraceFilter -> TimeoutFilter -> MonitorFilter -> ExceptionFilter -> InvokerDelegete ``` ProtocolListenerWrapper:主要用来添加监听事件。 DubboProtocol:首先调用DubboProtocol的export,内部将InvokerFilterChain的头节点保存到DubboExporter中,最后打开服务器,最终返回DubboExporter。打开服务器的过程见3.1节。 ### 1.2 获取注册中心 在1.1中虽然创建了Dubbo协议的Invoker,但还需要发布到注册中心,发布之前需要获取注册中心,以Zookeeper注册中心为例,获取注册中心时会根据url中的`registry=zookeeper`参数获取RegistryFactory,再由工厂获取注册中心 ``` private Registry getRegistry(final Invoker<?> originInvoker) { URL registryUrl = originInvoker.getUrl(); // 目的是根据修改registry参数值修改url if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) { String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY); registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY); } return registryFactory.getRegistry(registryUrl);// 此次协议为zookeeper://xxx/xxx?xxx } ``` RegistryFactory也是通过ExtensionLoader机制获取的,由于时zookeeper协议,会返回ZookeeperRegistryFactory对象 ``` public class ZookeeperRegistryFactory extends AbstractRegistryFactory { private ZookeeperTransporter zookeeperTransporter; public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) { this.zookeeperTransporter = zookeeperTransporter; } public Registry createRegistry(URL url) { return new ZookeeperRegistry(url, zookeeperTransporter); } } ``` zookeeperTransporter是对zookeeper节点操作的抽象,可以使用`CuratorZookeeperTransporter`或`ZkclientZookeeperTransporter`,创建注册中心对象最终会获得ZookeeperRegistry对象的实例。 ### 1.3 注册服务 注册服务是向注册中心注册,首先使用getRegistedProviderUrl获取服务提供者的url:`dubbo://IP:PORT/接口名?param`,并使用注册中心的register方法注册服务,Zookeeper注册中心会创建`/dubbo/接口名/providers/dubbo://xxxx/xxx?xxx`节点,providers下面都是提供了该服务的协议。 ### 1.4 订阅服务 使用getSubscribedOverrideUrl方法获取订阅服务的url`provider://xxx/xxx?xxx`,这个过程中,会向url中添加参数category;category用来指定该url关心的变化,如configurations、routers、providers和consumers等等; Zookeeper注册中心根据url中category的值configurators创建Zookeeper的节点`/dubbo/接口名/configurators`,并监听该节点,其子节点发生变化后调用ZookeeperRegistry的notify;最后,触发一遍notify,执行OverrideListener事件。 OverrideListener是一个监听事件,在zookeeper的节点发生变化后会调用notify(),目的是服务url发生更新后能够协议的Invoker。 ## 2.引用服务 ### 2.1 创建Invoker 根据第四章引用服务流程中创建Invoker的描述并未看到DubboProtocol创建Invoker的过程,这是因为`RegistryProtocol`的refer中创建了RegistryDirectory对象,并使用`cluster.join(directory)`方法返回了一个失败重试的Invoker就返回了。 其实DubboProtocol refer创建Invoker的过程正是在RegistryDirectory的监听函数中,RegistryDirectory的subscribe结束后会收到触发一遍,由于providers节点下有提供者,refreshInvoker中会使用DubboProtocol引用服务 ``` invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl); ``` protocol与前面章节的类似,会有协议链,ProtocolFilterWrapper用来构建InvokerFilterChain,与发布服务不同的是此时使用的是`@Activate(group = Constants.CONSUMER)`的Filter实现类;ProtocolListenerWrapper用来添加监听函数。 ``` |- ProtocolFilterWrapper |- ProtocolListenerWrapper |- DubboProtocol ``` DubboProtocol的refer时,首先要获取客户端,如果不存在的话就需要打开客户端了,`<dubbo:reference/>`标签中可以设置客户端连接数量,打开客户端时也会根据`connections`中设置的值初始化多个客户端,返回ExchangeClient数组。 之后创建DubboInvoker对象并返回。 客户端的应用中执行服务的方法时,最终会调用DubboInvoker的doInvoke,内部会使用currentClient.request(inv, timeout).get();想服务器发生请求。 ## 3.底层通信 ### 3.1 打开服务器流程 在DubboProtocol协议发布服务时,会打开服务器,DubboProtocol中有一个变量serverMap保存了`<ip:dubboPort>`对应的ExchangeServer,如果map中不存在,需要创建服务器createServer()。 底层通信分为两个层:信息交换层和传输层,传输层封装了netty、mina等服务器的实现。 ``` |- Exchanger 信息交换层 header |- Transporters 传输层 netty |- NettyServer |- HeaderExchangeServer ``` 1. 创建服务器时,`dubbo:protocol`标签中可以设置协议的服务器端实现类型,比如:dubbo协议的mina,netty等,http协议的jetty,servlet等,dubbo协议默认的服务器是netty。 2. Exchangers.bind会根据url启动服务器,首先根据url获取Exchanger,默认的exchanger是header,对应的是HeaderExchanger,exchanger是信息交换层 ``` public class HeaderExchanger implements Exchanger { public static final String NAME = "header"; public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true); } public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); } } ``` 4.Exchanger在执行bind方法时会使用Transporters的bind ``` getTransporter().bind(url, handler); public static Transporter getTransporter() { return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension(); } ``` getTransporter会获取url获取Transporter的实现类,默认的是NettyTransporter,使用的是netty3版本,也可以配置成netty4 ``` <dubbo:protocol name="dubbo" port="20880" transporter="netty4"/> ``` 下面以netty4为例说明, ``` public Server bind(URL url, ChannelHandler listener) throws RemotingException { return new NettyServer(url, listener); } ``` 此时的listener是 ``` |- DecodeHandler |- HeaderExchangeHandler |- DubboProtocol中的requestHandler ``` 在NettyServer中会对这个ChannelHandler链进行再包装 ``` public NettyServer(URL url, ChannelHandler handler) throws RemotingException { super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); } public static ChannelHandler wrap(ChannelHandler handler, URL url) { return ChannelHandlers.getInstance().wrapInternal(handler, url); } protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) { return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class) .getAdaptiveExtension().dispatch(handler, url))); } ``` Dispatcher是线程池配置,默认的是AllDispatcher,内部会返回AllChannelHandler,此时,ChannelHandler-Chain通道处理链如下: ``` |-MultiMessageHandler |-HeartbeatHandler |-AllChannelHandler |- DecodeHandler |- HeaderExchangeHandler |- DubboProtocol中的requestHandler ``` netty的open中会设置netty的子Channel的ChannnelHandler并启动服务器,NettyChannel链为 ``` decoder - NettyCodecAdapter.getDecoder() encoder - NettyCodecAdapter.getEncoder() handler - NettyServerHandler ``` 最后,将NettyServer包装为HeaderExchangeServer并返回。 ### 3.2 打开客户端流程 创建DubboInvoker时会打开客户端,会使用Exchangers的connect方法 ``` client = Exchangers.connect(url, requestHandler); public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange"); return getExchanger(url).connect(url, handler); } ``` 默认的Exchanger是HeaderExchanger,内部的connect使用Transporters的connect方法 ``` public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true); } ``` 使用时,构建了dubboChannel链 ``` |- DecodeHandler |- HeaderExchangeHandler |- DubboProtocol中的requestHandler ``` 在创建NettyClient中,跟打开服务器一样会构建DubboChannel链 ``` |-MultiMessageHandler |-HeartbeatHandler |-AllChannelHandler |- DecodeHandler |- HeaderExchangeHandler |- DubboProtocol中的requestHandler ``` Netty Channel的链为: ``` decoder - NettyCodecAdapter.getDecoder() encoder - NettyCodecAdapter.getEncoder() handler - NettyClientHandler ``` ### 3.2 codec 在打开netty服务器和客户端的过程中,都使用了NettyCodecAdapter获取编解码的ChannelHandler,在NettyCodecAdapter中有一个变量Codec2,会根据url中codec的值获取,默认为dubbo,返回的是DubboCountCodec, ``` protected static Codec2 getChannelCodec(URL url) { String codecName = url.getParameter(Constants.CODEC_KEY, "telnet"); if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) { return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName); } else { return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class) .getExtension(codecName)); } } ``` DubboCountCodec内部有一个编解码的ChannelHandler,内部调用DubboCodec的编解码方法,这个类实现了dubbo协议传输时的byte数组的数据结构,主要会根据使用序列化参数`serialization`的值,默认是`hessian2`获取到序列化对象,根据序列化对象来序列化Request。 与发送类似,服务器端收到请求后,会首先进入decode方法,根据序列化方式讲Request对象序列化出来,然 ### 3.3 数据传输 客户端调用SpringContext返回的代码类的方法时,最终会进入DubboInvoker的doInvoker方法,在内部会使用ExchangeClient发送请求,内部将Invocation包装为Request对象,发送出去,最终会通过codec编码然后传递给 ``` public ResponseFuture request(Object request, int timeout) throws RemotingException { if (closed) { throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!"); } // create request. Request req = new Request(); req.setVersion("2.0.0"); req.setTwoWay(true); req.setData(request); DefaultFuture future = new DefaultFuture(channel, req, timeout); try { channel.send(req); } catch (RemotingException e) { future.cancel(); throw e; } return future; } ``` 服务器端收到Request对象后,会根据选择的序列化方法decode解析出Request对象,然后,在AllChannelHandler中提交给线程池处理,最终调用到DubboProtocol中的requestHandler的reply方法来处理请求,DubboProtocol中的exporterMap保存了服务名与Invoker的映射关系,最后使用Invoker的doInvoker调用Wrapper的invokeMethod方法从而完成调用。