ThinkChat2.0新版上线,更智能更精彩,支持会话、画图、阅读、搜索等,送10W Token,即刻开启你的AI之旅 广告
> anoyi中的GrpcClient 1. 查看属性 ~~~ private static final Map<String, ServerContext> serverMap = new HashMap<>(); private final GrpcProperties grpcProperties; private final SerializeService serializeService; private ClientInterceptor clientInterceptor; ~~~ 这里四个属性,serverMap 的key值是远程服务端的名字列表,ServerContext是http2.0的请求context,grpcProperties是客户端的配置属性,有端口号,启动时候绑定的,serializeService是序列化的类,clientInterceptor是拦截器,因为grpc本质是http,所以有必要在发送的时候加token或者啥的令牌 2. 构造方法(自己去看) ~~~ public GrpcClient(GrpcProperties grpcProperties, SerializeService serializeService) { this.grpcProperties = grpcProperties; this.serializeService = serializeService; } public GrpcClient(GrpcProperties grpcProperties, SerializeService serializeService, ClientInterceptor clientInterceptor) { this.grpcProperties = grpcProperties; this.serializeService = serializeService; this.clientInterceptor = clientInterceptor; } ~~~ 3. 初始化方法 ~~~ /** * 初始化 */ public void init(){ List<RemoteServer> remoteServers = grpcProperties.getRemoteServers(); if (!CollectionUtils.isEmpty(remoteServers)) { for (RemoteServer server : remoteServers) { ManagedChannel channel = ManagedChannelBuilder.forAddress(server.getHost(), server.getPort()) .defaultLoadBalancingPolicy("round_robin") .nameResolverFactory(new DnsNameResolverProvider()) .idleTimeout(30, TimeUnit.SECONDS) .usePlaintext().build(); if (clientInterceptor != null){ Channel newChannel = ClientInterceptors.intercept(channel, clientInterceptor); serverMap.put(server.getServer(), new ServerContext(newChannel, serializeService)); } else { Class clazz = grpcProperties.getClientInterceptor(); if (clazz == null) { serverMap.put(server.getServer(), new ServerContext(channel, serializeService)); }else { try { ClientInterceptor interceptor = (ClientInterceptor) clazz.newInstance(); Channel newChannel = ClientInterceptors.intercept(channel, interceptor); serverMap.put(server.getServer(), new ServerContext(newChannel, serializeService)); } catch (InstantiationException | IllegalAccessException e) { log.warn("ClientInterceptor cannot use, ignoring..."); serverMap.put(server.getServer(), new ServerContext(channel, serializeService)); } } } } } } ~~~ 4. 解析初始化方法 我们看到该方法里面有这么一段代码 ~~~ ManagedChannel channel = ManagedChannelBuilder.forAddress(server.getHost(), server.getPort()) .defaultLoadBalancingPolicy("round_robin") .nameResolverFactory(new DnsNameResolverProvider()) .idleTimeout(30, TimeUnit.SECONDS) .usePlaintext().build(); ~~~ 这个方法只是实例化客户端连接器,而且这个方法是在循环中,说明我们客户端可以配置多个服务端连接。所以客户端是一对多服务端。才会有上面的serverMap ,其实这个方法很就是生成ServerContext存放到内存中,要访问哪个服务端的时候就拿出来,接下来看看下面的方法 ~~~ /** * 连接远程服务 */ public static ServerContext connect(String serverName) { return serverMap.get(serverName); } ~~~ 看到了吧,就是一对多服务端,要的时候取出相应名字的context,接下来看看ServerContext这个类,这个类是我们封装的 5. 查看ServerContext * [ ] 属性 ~~~ private Channel channel; private final SerializeService defaultSerializeService; private CommonServiceGrpc.CommonServiceBlockingStub blockingStub; ~~~ 这三个属性,有用的是Channel ,这个是通道,SerializeService 是序列化的方法,自己写,记得要跟服务端约定好, blockingStub是grpc的关键,是google生成的方法,实现通信。 * [ ] 构造方法 ~~~ ServerContext(Channel channel, SerializeService serializeService) { this.channel = channel; this.defaultSerializeService = serializeService; blockingStub = CommonServiceGrpc.newBlockingStub(channel); } ~~~ 很简单的构造方法,传参,实例化 * [ ] 请求方法 ~~~ /** * 处理 gRPC 请求 */ public GrpcResponse handle(SerializeType serializeType, GrpcRequest grpcRequest) { SerializeService serializeService = SerializeUtils.getSerializeService(serializeType, this.defaultSerializeService); ByteString bytes = serializeService.serialize(grpcRequest); int value = (serializeType == null ? -1 : serializeType.getValue()); GrpcService.Request request = GrpcService.Request.newBuilder().setSerialize(value).setRequest(bytes).build(); GrpcService.Response response = null; try{ response = blockingStub.handle(request); }catch (Exception exception){ log.warn("rpc exception: {}", exception.getMessage()); if ("UNAVAILABLE: io exception".equals(exception.getMessage().trim())){ response = blockingStub.handle(request); } } return serializeService.deserialize(response); } ~~~ 这个方法很简单就是序列化并发送,由于java的http2.0采用ByteString通讯,所以必须序列化成ByteString,这里我们提供三种方法,一个是FastJSONSerializeService,这个是阿里的fastjson序列化,一个是ProtoStuffSerializeService, 这个是protobuff的包,是谷歌提供的,一个是SofaHessianSerializeService,这个是probuf-java,也是谷歌的,接下来看看往下执行的方法 ~~~ GrpcService.Request request = GrpcService.Request.newBuilder().setSerialize(value).setRequest(bytes).build(); ~~~ 这个很关键了,GrpcService是用proto生成的,也就是protobuf对http2.0的支持,接下来看看我们写的service.proto ~~~ syntax = "proto3"; option java_package = "com.anoyi.rpc"; option java_outer_classname = "GrpcService"; option java_multiple_files = false; // 定义通用的 Grpc 服务 service CommonService { // 处理请求 rpc handle ( Request ) returns ( Response ) {} } // 定义通用的 Grpc 请求体 message Request { int32 serialize = 1; bytes request = 2; } // 定义通用的 Grpc 响应体 message Response { bytes response = 1; } ~~~ service.proto采用proto3协议,java_package 的意思是我们生成的类放的包,我们放在com.anoyi.rpc,java_outer_classname 是我们生成的外部类名字,我们叫GrpcService,java_multiple_files 是我们是否支持分成多个java类,我们集成到一个类,叫做GrpcService,所以false了,然后CommonService是个service,我们com.anoyi.rpc包会生成CommonServiceGrpc这个类,自己去继承重写吧,就一个handle方法,参数是Request ,返回值是Response ,都是我们底下定义的结构体,这是个关键的服务端处理方法,Request 是个message ,也就是结构体,有两个字段, int32 serialize = 1;bytes request = 2;一个是serialize ,是序列化方法,request 是我们序列化后的二进制,也及时byteString,Response 也就是一个byteString而已,因为就是返回。当然service.proto,我们会找个方法生成java文件,下一章节会介绍的,然后客户端调用init就会初始化客户端连接grpc的通道,这一章节就结束了