RPC调用本质上就是一种网络编程,客户端向服务器发送消息,服务器拿到消息之后做后续动作。只是RPC这种消息比较特殊,它封装了方法调用,包括方法名,方法参数。服务端拿到这个消息之后,解码消息,然后要通过方法调用模型来完成实际服务器端业务方法的调用。
这篇讲讲Thrfit的方法调用模型。Thrift的方法调用模型很简单,就是通过方法名和实际方法实现类的注册完成,没有使用反射机制,类加载机制。
和方法调用相关的几个核心类:
1. 自动生成的Iface接口,是远程方法的顶层接口
2. 自动生成的Processor类及相关父类,包括TProcessor接口,TBaseProcess抽象类
3. ProcessFunction抽象类,抽象了一个具体的方法调用,包含了方法名信息,调用方法的抽象过程等
4. TNonblcokingServer,是NIO服务器的默认实现,通过Args参数来配置Processor等信息
5. FrameBuffer类,服务器NIO的缓冲区对象,这个对象在服务器端收到全包并解码后,会调用Processor去完成实际的方法调用
6. 服务器端的方法的具体实现类,实现Iface接口
![](https://box.kancloud.cn/2016-02-19_56c6c62b7baa8.jpg)
下面逐个来分析相关的类。
Iface接口是自动生成的,描述了方法的接口。 服务器端服务提供方DemoService要实现Iface接口
~~~
public class DemoService {
public interface Iface {
public int demoMethod(String param1, Parameter param2, Map<String,String> param3) throws org.apache.thrift.TException;
}
}
public class DemoServiceImpl implements DemoService.Iface{
@Override
public int demoMethod(String param1, Parameter param2,
Map<String, String> param3) throws TException {
return 0;
}
}
~~~
来看TProcess相关类和接口
1. TProcessor就定义了一个顶层的调用方法process,参数是输入流和输出流
2. 抽象类TBaseProcessor提供了TProcessor的process的默认实现,先读消息头,拿到要调用的方法名,然后从维护的一个Map中取ProcessFunction对象。ProcessFunction对象是实际方法的抽象,调用它的process方法,实际是调用了实际的方法。
3. Processor类是自动生成了,它依赖Iface接口,负责把实际的方法实现和方法的key关联起来,放到Map中维护
~~~
public interface TProcessor {
public boolean process(TProtocol in, TProtocol out)
throws TException;
}
public abstract class TBaseProcessor<I> implements TProcessor {
private final I iface;
private final Map<String,ProcessFunction<I, ? extends TBase>> processMap;
protected TBaseProcessor(I iface, Map<String, ProcessFunction<I, ? extends TBase>> processFunctionMap) {
this.iface = iface;
this.processMap = processFunctionMap;
}
@Override
public boolean process(TProtocol in, TProtocol out) throws TException {
TMessage msg = in.readMessageBegin();
ProcessFunction fn = processMap.get(msg.name);
if (fn == null) {
TProtocolUtil.skip(in, TType.STRUCT);
in.readMessageEnd();
TApplicationException x = new TApplicationException(TApplicationException.UNKNOWN_METHOD, "Invalid method name: '"+msg.name+"'");
out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid));
x.write(out);
out.writeMessageEnd();
out.getTransport().flush();
return true;
}
fn.process(msg.seqid, in, out, iface);
return true;
}
}
~~~
~~~
public static class Processor<I extends Iface> extends org.apache.thrift.TBaseProcessor<I> implements org.apache.thrift.TProcessor {
public Processor(I iface) {
super(iface, getProcessMap(new HashMap<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>>()));
}
protected Processor(I iface, Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) {
super(iface, getProcessMap(processMap));
}
private static <I extends Iface> Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> getProcessMap(Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) {
processMap.put("demoMethod", new demoMethod());
return processMap;
}
private static class demoMethod<I extends Iface> extends org.apache.thrift.ProcessFunction<I, demoMethod_args> {
public demoMethod() {
super("demoMethod");
}
protected demoMethod_args getEmptyArgsInstance() {
return new demoMethod_args();
}
protected demoMethod_result getResult(I iface, demoMethod_args args) throws org.apache.thrift.TException {
demoMethod_result result = new demoMethod_result();
result.success = iface.demoMethod(args.param1, args.param2, args.param3);
result.setSuccessIsSet(true);
return result;
}
}
}
~~~
自动生成的demoMethod类继承了ProcessFunction类,它负载把方法参数,iface, 方法返回值这些抽象的概念组合在一起,通过抽象模型来完成实际方法的调用。实际方法的实现者实现了Iface接口。
TNonblockingServer是NIO服务器的实现,它通过Selector来检查IO就绪状态,进而调用相关的Channel。就方法调用而言,它处理的是读事件,用handelRead()来进一步处理
~~~
private void select() {
try {
// wait for io events.
selector.select();
// process the io events we received
Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
while (!stopped_ && selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
selectedKeys.remove();
// skip if not valid
if (!key.isValid()) {
cleanupSelectionKey(key);
continue;
}
// if the key is marked Accept, then it has to be the server
// transport.
if (key.isAcceptable()) {
handleAccept();
} else if (key.isReadable()) {
// deal with reads
handleRead(key);
} else if (key.isWritable()) {
// deal with writes
handleWrite(key);
} else {
LOGGER.warn("Unexpected state in select! " + key.interestOps());
}
}
} catch (IOException e) {
LOGGER.warn("Got an IOException while selecting!", e);
}
}
protected void handleRead(SelectionKey key) {
FrameBuffer buffer = (FrameBuffer) key.attachment();
if (!buffer.read()) {
cleanupSelectionKey(key);
return;
}
// if the buffer's frame read is complete, invoke the method.
<strong>if (buffer.isFrameFullyRead()) {
if (!requestInvoke(buffer)) {
cleanupSelectionKey(key);
}
}</strong>
}
protected boolean requestInvoke(FrameBuffer frameBuffer) {
frameBuffer.invoke();
return true;
}
~~~
非阻塞同步IO的NIO服务器都会使用缓冲区来存放读写的中间状态。FrameBuffer就是这样的一个缓冲区,它由于涉及到方法调用,所以提供了invoke()方法来实现对Processor的调用。
~~~
public void invoke() {
TTransport inTrans = getInputTransport();
TProtocol inProt = inputProtocolFactory_.getProtocol(inTrans);
TProtocol outProt = outputProtocolFactory_.getProtocol(getOutputTransport());
try {
processorFactory_.getProcessor(inTrans).process(inProt, outProt);
responseReady();
return;
} catch (TException te) {
LOGGER.warn("Exception while invoking!", te);
} catch (Throwable t) {
LOGGER.error("Unexpected throwable while invoking!", t);
}
// This will only be reached when there is a throwable.
state_ = FrameBufferState.AWAITING_CLOSE;
requestSelectInterestChange();
}
~~~
FrameBuffer使用了processorFactory来获得Processor。ProcessorFactory是在创建服务器的时候传递过来的,只是对Processor的简单封装。
~~~
protected TServer(AbstractServerArgs args) {
processorFactory_ = args.processorFactory;
serverTransport_ = args.serverTransport;
inputTransportFactory_ = args.inputTransportFactory;
outputTransportFactory_ = args.outputTransportFactory;
inputProtocolFactory_ = args.inputProtocolFactory;
outputProtocolFactory_ = args.outputProtocolFactory;
}
public class TProcessorFactory {
private final TProcessor processor_;
public TProcessorFactory(TProcessor processor) {
processor_ = processor;
}
public TProcessor getProcessor(TTransport trans) {
return processor_;
}
}
public T processor(TProcessor processor) {
this.processorFactory = new TProcessorFactory(processor);
return (T) this;
}
~~~
下面是一个实际的TNonblockingServer的配置实例
除了配置服务器运行的基本参数,最重要的就是把实际的服务提供者通过服务器参数的方式作为Processor传递给TNonblockingServer,供FrameBuffer调用。
~~~
public class DemoServiceImpl implements DemoService.Iface{
@Override
public int demoMethod(String param1, Parameter param2,
Map<String, String> param3) throws TException {
return 0;
}
public static void main(String[] args){
TNonblockingServerSocket socket;
try {
socket = new TNonblockingServerSocket(9090);
TNonblockingServer.Args options = new TNonblockingServer.Args(socket);
TProcessor processor = new DemoService.Processor(new DemoServiceImpl());
options.processor(processor);
options.protocolFactory(new TCompactProtocol.Factory());
TServer server = new TNonblockingServer(options);
server.serve();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
~~~