## 流程
injvm协议的过程是dubbo中最简单的协议,但除了没有注册中心外,还是可以对其基本流程进行全面的了解。
![dubbo流程](http://www.uxiaowo.com/dubbo/dubbo.png)
## 1.发布到dubbo协议
在发布流程中,我们已经知道了服务要发布为dubbo协议时,不同点在发布Invoker的不同。非injvm协议都使用了RegistryProtocol的export()来发布服务,RegistryProtocol的内部变量bounds中保存了`<服务,协议>`对应的Exporter,每次发布后会保存到这个map中。
发布的过程如下:
```
// originInvoker是发布服务的公共流程中生成的Invoker对象
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// 使用dubbo协议发布
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
// 获取注册中心
final Registry registry = getRegistry(originInvoker);
// 通过invoker的url 获取 providerUrl的地址
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
// 通知注册中心发布服务
registry.register(registedProviderUrl);
// 订阅override数据
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
//保证每次export都返回一个新的exporter实例
return new Exporter<T>() {
public Invoker<T> getInvoker() {
}
public void unexport() {
}
};
}
```
### 1.1 使用dubbo协议发布
dubbo协议发布服务时,会根据发布时生成的Invoker,构建InvokerFilterChain,并添加监听事件,最后,打开协议指定的服务器,等待客户端连接后处理调用。
doLocalExport(originInvoker);中首先根据服务名在bounds之后查找对应的Exporter,如果找到,说明已经发不过了;如果没有找到则使用DubboProtocol协议发布Invoker。在发布之前,会将发布之前生成的Invoker包装为InvokerDelegete对象,这是因为originInvoker的url是注册中心协议的url`registry://xxxx/xxx?xx`;而dubboProtocol发布时需要改为`dubbo://xxx/xx?xxx`
```
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
bounds.put(key, exporter);
}
}
}
return exporter;
}
```
接下来,继续分析dubboProtocol的export,protocol变量与之前相同,会根据协议名称获取协议链:
```
|- ProtocolFilterWrapper
|- ProtocolListenerWrapper
|- DubboProtocol
```
ProtocolFilterWrapper:主要用来生成调用链,内部的buildInvokerChain方法会查找Filter的实现类,查找group为provider的,并根据order排序,将这些Filter连接成一个调用链 InvokerFilterChain,最终调用上一步生成的InvokerDelegete
```
EchoFilter -> ClassloaderFilter -> GenericFilter ->
ContextFilter -> TraceFilter -> TimeoutFilter ->
MonitorFilter -> ExceptionFilter -> InvokerDelegete
```
ProtocolListenerWrapper:主要用来添加监听事件。
DubboProtocol:首先调用DubboProtocol的export,内部将InvokerFilterChain的头节点保存到DubboExporter中,最后打开服务器,最终返回DubboExporter。打开服务器的过程见3.1节。
### 1.2 获取注册中心
在1.1中虽然创建了Dubbo协议的Invoker,但还需要发布到注册中心,发布之前需要获取注册中心,以Zookeeper注册中心为例,获取注册中心时会根据url中的`registry=zookeeper`参数获取RegistryFactory,再由工厂获取注册中心
```
private Registry getRegistry(final Invoker<?> originInvoker) {
URL registryUrl = originInvoker.getUrl();
// 目的是根据修改registry参数值修改url
if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {
String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY);
registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY);
}
return registryFactory.getRegistry(registryUrl);// 此次协议为zookeeper://xxx/xxx?xxx
}
```
RegistryFactory也是通过ExtensionLoader机制获取的,由于时zookeeper协议,会返回ZookeeperRegistryFactory对象
```
public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
private ZookeeperTransporter zookeeperTransporter;
public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
this.zookeeperTransporter = zookeeperTransporter;
}
public Registry createRegistry(URL url) {
return new ZookeeperRegistry(url, zookeeperTransporter);
}
}
```
zookeeperTransporter是对zookeeper节点操作的抽象,可以使用`CuratorZookeeperTransporter`或`ZkclientZookeeperTransporter`,创建注册中心对象最终会获得ZookeeperRegistry对象的实例。
### 1.3 注册服务
注册服务是向注册中心注册,首先使用getRegistedProviderUrl获取服务提供者的url:`dubbo://IP:PORT/接口名?param`,并使用注册中心的register方法注册服务,Zookeeper注册中心会创建`/dubbo/接口名/providers/dubbo://xxxx/xxx?xxx`节点,providers下面都是提供了该服务的协议。
### 1.4 订阅服务
使用getSubscribedOverrideUrl方法获取订阅服务的url`provider://xxx/xxx?xxx`,这个过程中,会向url中添加参数category;category用来指定该url关心的变化,如configurations、routers、providers和consumers等等;
Zookeeper注册中心根据url中category的值configurators创建Zookeeper的节点`/dubbo/接口名/configurators`,并监听该节点,其子节点发生变化后调用ZookeeperRegistry的notify;最后,触发一遍notify,执行OverrideListener事件。
OverrideListener是一个监听事件,在zookeeper的节点发生变化后会调用notify(),目的是服务url发生更新后能够协议的Invoker。
## 2.引用服务
### 2.1 创建Invoker
根据第四章引用服务流程中创建Invoker的描述并未看到DubboProtocol创建Invoker的过程,这是因为`RegistryProtocol`的refer中创建了RegistryDirectory对象,并使用`cluster.join(directory)`方法返回了一个失败重试的Invoker就返回了。
其实DubboProtocol refer创建Invoker的过程正是在RegistryDirectory的监听函数中,RegistryDirectory的subscribe结束后会收到触发一遍,由于providers节点下有提供者,refreshInvoker中会使用DubboProtocol引用服务
```
invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl);
```
protocol与前面章节的类似,会有协议链,ProtocolFilterWrapper用来构建InvokerFilterChain,与发布服务不同的是此时使用的是`@Activate(group = Constants.CONSUMER)`的Filter实现类;ProtocolListenerWrapper用来添加监听函数。
```
|- ProtocolFilterWrapper
|- ProtocolListenerWrapper
|- DubboProtocol
```
DubboProtocol的refer时,首先要获取客户端,如果不存在的话就需要打开客户端了,`<dubbo:reference/>`标签中可以设置客户端连接数量,打开客户端时也会根据`connections`中设置的值初始化多个客户端,返回ExchangeClient数组。
之后创建DubboInvoker对象并返回。
客户端的应用中执行服务的方法时,最终会调用DubboInvoker的doInvoke,内部会使用currentClient.request(inv, timeout).get();想服务器发生请求。
## 3.底层通信
### 3.1 打开服务器流程
在DubboProtocol协议发布服务时,会打开服务器,DubboProtocol中有一个变量serverMap保存了`<ip:dubboPort>`对应的ExchangeServer,如果map中不存在,需要创建服务器createServer()。
底层通信分为两个层:信息交换层和传输层,传输层封装了netty、mina等服务器的实现。
```
|- Exchanger 信息交换层 header
|- Transporters 传输层 netty
|- NettyServer
|- HeaderExchangeServer
```
1. 创建服务器时,`dubbo:protocol`标签中可以设置协议的服务器端实现类型,比如:dubbo协议的mina,netty等,http协议的jetty,servlet等,dubbo协议默认的服务器是netty。
2. Exchangers.bind会根据url启动服务器,首先根据url获取Exchanger,默认的exchanger是header,对应的是HeaderExchanger,exchanger是信息交换层
```
public class HeaderExchanger implements Exchanger {
public static final String NAME = "header";
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
}
```
4.Exchanger在执行bind方法时会使用Transporters的bind
```
getTransporter().bind(url, handler);
public static Transporter getTransporter() {
return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
}
```
getTransporter会获取url获取Transporter的实现类,默认的是NettyTransporter,使用的是netty3版本,也可以配置成netty4
```
<dubbo:protocol name="dubbo" port="20880" transporter="netty4"/>
```
下面以netty4为例说明,
```
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}
```
此时的listener是
```
|- DecodeHandler
|- HeaderExchangeHandler
|- DubboProtocol中的requestHandler
```
在NettyServer中会对这个ChannelHandler链进行再包装
```
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
public static ChannelHandler wrap(ChannelHandler handler, URL url) {
return ChannelHandlers.getInstance().wrapInternal(handler, url);
}
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)));
}
```
Dispatcher是线程池配置,默认的是AllDispatcher,内部会返回AllChannelHandler,此时,ChannelHandler-Chain通道处理链如下:
```
|-MultiMessageHandler
|-HeartbeatHandler
|-AllChannelHandler
|- DecodeHandler
|- HeaderExchangeHandler
|- DubboProtocol中的requestHandler
```
netty的open中会设置netty的子Channel的ChannnelHandler并启动服务器,NettyChannel链为
```
decoder - NettyCodecAdapter.getDecoder()
encoder - NettyCodecAdapter.getEncoder()
handler - NettyServerHandler
```
最后,将NettyServer包装为HeaderExchangeServer并返回。
### 3.2 打开客户端流程
创建DubboInvoker时会打开客户端,会使用Exchangers的connect方法
```
client = Exchangers.connect(url, requestHandler);
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
return getExchanger(url).connect(url, handler);
}
```
默认的Exchanger是HeaderExchanger,内部的connect使用Transporters的connect方法
```
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
```
使用时,构建了dubboChannel链
```
|- DecodeHandler
|- HeaderExchangeHandler
|- DubboProtocol中的requestHandler
```
在创建NettyClient中,跟打开服务器一样会构建DubboChannel链
```
|-MultiMessageHandler
|-HeartbeatHandler
|-AllChannelHandler
|- DecodeHandler
|- HeaderExchangeHandler
|- DubboProtocol中的requestHandler
```
Netty Channel的链为:
```
decoder - NettyCodecAdapter.getDecoder()
encoder - NettyCodecAdapter.getEncoder()
handler - NettyClientHandler
```
### 3.2 codec
在打开netty服务器和客户端的过程中,都使用了NettyCodecAdapter获取编解码的ChannelHandler,在NettyCodecAdapter中有一个变量Codec2,会根据url中codec的值获取,默认为dubbo,返回的是DubboCountCodec,
```
protected static Codec2 getChannelCodec(URL url) {
String codecName = url.getParameter(Constants.CODEC_KEY, "telnet");
if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) {
return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName);
} else {
return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class)
.getExtension(codecName));
}
}
```
DubboCountCodec内部有一个编解码的ChannelHandler,内部调用DubboCodec的编解码方法,这个类实现了dubbo协议传输时的byte数组的数据结构,主要会根据使用序列化参数`serialization`的值,默认是`hessian2`获取到序列化对象,根据序列化对象来序列化Request。
与发送类似,服务器端收到请求后,会首先进入decode方法,根据序列化方式讲Request对象序列化出来,然
### 3.3 数据传输
客户端调用SpringContext返回的代码类的方法时,最终会进入DubboInvoker的doInvoker方法,在内部会使用ExchangeClient发送请求,内部将Invocation包装为Request对象,发送出去,最终会通过codec编码然后传递给
```
public ResponseFuture request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// create request.
Request req = new Request();
req.setVersion("2.0.0");
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = new DefaultFuture(channel, req, timeout);
try {
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
```
服务器端收到Request对象后,会根据选择的序列化方法decode解析出Request对象,然后,在AllChannelHandler中提交给线程池处理,最终调用到DubboProtocol中的requestHandler的reply方法来处理请求,DubboProtocol中的exporterMap保存了服务名与Invoker的映射关系,最后使用Invoker的doInvoker调用Wrapper的invokeMethod方法从而完成调用。