在之前的内容中,我们讲解了消费者端服务发现与提供者端服务暴露的相关内容,同时也知道消费者端通过内置的负载均衡算法获取合适的调用invoker进行远程调用。那么,本章节重点关注的就是远程调用过程即网络通信。

dubbo异步线程模型(dubbo源码解析-网络通信)(1)

网络通信位于Remoting模块:

网络通信的问题:

通信协议

dubbo内置,dubbo协议 ,rmi协议,Hessian协议,http协议,WebService协议,Thrift协议,rest协议,grpc协议,memcached协议,Redis协议等10种通讯协议。各个协议特点如下

dubbo协议

Dubbo 缺省协议采用单一长连接和 NIO 异步通讯,适合于小数据量大并发的服务调用,以及服务消费者机器数远大于服务提供者机器数的情况。

缺省协议,使用基于 mina 1.1.7 和 hessian 3.2.1 的 tbremoting 交互。

rmi协议

RMI 协议采用 JDK 标准的 Java.rmi.* 实现,采用阻塞式短连接和 JDK 标准序列化方式。

hessian协议

Hessian 协议用于集成 Hessian 的服务,Hessian 底层采用 Http 通讯,采用 Servlet 暴露服务,Dubbo 缺省内嵌 Jetty 作为服务器实现。

Dubbo 的 Hessian 协议可以和原生 Hessian 服务互操作,即:

http协议

基于 HTTP 表单的远程调用协议,采用 Spring 的 HttpInvoker 实现

webservice协议

基于 WebService 的远程调用协议,基于 Apache CXF 实现](http://dubbo.apache.org/zh-cn/docs/user/references/protocol/webservice.html#fn2)。

可以和原生 WebService 服务互操作,即:

thrift协议

当前 dubbo 支持 [1]的 thrift 协议是对 thrift 原生协议 [2] 的扩展,在原生协议的基础上添加了一些额外的头信息,比如 service name,magic number 等。

rest协议

基于标准的Java REST API——JAX-RS 2.0(Java API for RESTful Web Services的简写)实现的REST调用支持

gRpc协议

Dubbo 自 2.7.5 版本开始支持 gRPC 协议,对于计划使用 HTTP/2 通信,或者想利用 gRPC 带来的 Stream、反压、Reactive 编程等能力的开发者来说, 都可以考虑启用 gRPC 协议。

memcached协议

基于 memcached实现的 RPC 协议

redis协议

基于 Redis 实现的 RPC 协议

序列化

序列化就是将对象转成字节流,用于网络传输,以及将字节流转为对象,用于在收到字节流数据后还原成对象。序列化的优势有很多,例如安全性更好、可跨平台等。我们知道dubbo基于netty进行网络通讯,在NettyClient.doOpen()方法中可以看到Netty的相关类

bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this); ChannelPipeline pipeline = Channels.pipeline(); pipeline.addLast("decoder", adapter.getdecoder()); pipeline.addLast("encoder", adapter.getEncoder()); pipeline.addLast("handler", nettyHandler); return pipeline; } });

然后去看NettyCodecAdapter 类最后进入ExchangeCodec类的encodeRequest方法,如下:

protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException { serialization serialization = getSerialization(channel); // header. byte[] header = new byte[HEADER_LENGTH];

是的,就是Serialization接口,默认是Hessian2Serialization序列化接口。

dubbo异步线程模型(dubbo源码解析-网络通信)(2)

Dubbo序列化支持java、compactedjava、nativejava、fastjson、dubbo、fst、hessian2、kryo,protostuff其中默认hessian2。其中java、compactedjava、nativejava属于原生java的序列化。

最近几年,各种新的高效序列化方式层出不穷,不断刷新序列化性能的上限,最典型的包括:

这些序列化方式的性能多数都显著优于 hessian2 (甚至包括尚未成熟的dubbo序列化)。所以我们可以为 dubbo 引入 Kryo 和 FST 这两种高效 Java 来优化 dubbo 的序列化。

使用Kryo和FST非常简单,只需要在dubbo RPC的XML配置中添加一个属性即可:

<dubbo:protocol name="dubbo" serialization="kryo"/>

网络通信dubbo中数据格式

解决socket中数据粘包拆包问题,一般有三种方式

Dubbo 框架定义了私有的RPC协议,其中请求和响应协议的具体内容我们使用表格来展示。

dubbo异步线程模型(dubbo源码解析-网络通信)(3)

Dubbo 数据包分为消息头和消息体,消息头用于存储一些元信息,比如魔数(Magic),数据包类型(Request/Response),消息体长度(Data Length)等。消息体中用于存储具体的调用消息,比如方法名称,参数列表等。下面简单列举一下消息头的内容。

偏移量(Bit)

字段

取值

0 ~ 7

魔数高位

0xda00

8 ~ 15

魔数低位

0xbb

16

数据包类型

0 - Response, 1 - Request

17

调用方式

仅在第16位被设为1的情况下有效,0 - 单向调用,1 - 双向调用

18

事件标识

0 - 当前数据包是请求或响应包,1 - 当前数据包是心跳包

19 ~ 23

序列化器编号

2 - Hessian2Serialization 3 - JavaSerialization 4 - CompactedJavaSerialization 6 - FastJsonSerialization 7 - NativeJavaSerialization 8 - KryoSerialization 9 - FstSerialization

24 ~ 31

状态

20 - OK 30 - CLIENT_TIMEOUT 31 - SERVER_TIMEOUT 40 - BAD_REQUEST 50 - BAD_RESPONSE ......

32 ~ 95

请求编号

共8字节,运行时生成

96 ~ 127

消息体长度

运行时计算

消费方发送请求

(1)发送请求

为了便于大家阅读代码,这里以 DemoService 为例,将 sayHello 方法的整个调用路径贴出来。

proxy0#sayHello(String) —> InvokerInvocationHandler#invoke(Object, Method, Object[]) —> mockClusterInvoker#invoke(Invocation) —> abstractClusterInvoker#invoke(Invocation) —> FailoverClusterInvoker#doInvoke(Invocation, List<Invoker<T>>, LoadBalance) —> Filter#invoke(Invoker, Invocation) // 包含多个 Filter 调用 —> ListenerInvokerWrapper#invoke(Invocation) —> AbstractInvoker#invoke(Invocation) —> DubboInvoker#doInvoke(Invocation) —> ReferenceCountExchangeClient#request(Object, int) —> HeaderExchangeClient#request(Object, int) —> HeaderExchangeChannel#request(Object, int) —> AbstractPeer#send(Object) —> AbstractClient#send(Object, Boolean) —> NettyChannel#send(Object, boolean) —> NioClientSocketChannel#write(Object)

dubbo消费方,自动生成代码对象如下

public class proxy0 implements ClassGenerator.DC, EchoService, DemoService { private InvocationHandler handler; public String sayHello(String string) { // 将参数存储到 Object 数组中 Object[] arrobject = new Object[]{string}; // 调用 InvocationHandler 实现类的 invoke 方法得到调用结果 Object object = this.handler.invoke(this, methods[0], arrobject); // 返回调用结果 return (String)object; } }

InvokerInvocationHandler 中的 invoker 成员变量类型为 MockClusterInvoker,MockClusterInvoker 内部封装了服务降级逻辑。下面简单看一下:

public Result invoke(Invocation invocation) throws RpcException { Result result = null; // 获取 mock 配置值 String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim(); if (value.length() == 0 || value.equalsIgnoreCase("false")) { // 无 mock 逻辑,直接调用其他 Invoker 对象的 invoke 方法, // 比如 FailoverClusterInvoker result = this.invoker.invoke(invocation); } else if (value.startsWith("force")) { // force:xxx 直接执行 mock 逻辑,不发起远程调用 result = doMockInvoke(invocation, null); } else { // fail:xxx 表示消费方对调用服务失败后,再执行 mock 逻辑,不抛出异常 try { result = this.invoker.invoke(invocation); } catch (RpcException e) { // 调用失败,执行 mock 逻辑 result = doMockInvoke(invocation, e); } } return result; }

考虑到前文已经详细分析过 FailoverClusterInvoker,因此本节略过 FailoverClusterInvoker,直接分析 DubboInvoker。

public abstract class AbstractInvoker<T> implements Invoker<T> { public Result invoke(Invocation inv) throws RpcException { if (destroyed.get()) { throw new RpcException("Rpc invoker for service ..."); } RpcInvocation invocation = (RpcInvocation) inv; // 设置 Invoker invocation.setInvoker(this); if (attachment != null && attachment.size() > 0) { // 设置 attachment invocation.addAttachmentsIfAbsent(attachment); } Map<String, String> contextAttachments = RpcContext.getContext().getAttachments(); if (contextAttachments != null && contextAttachments.size() != 0) { // 添加 contextAttachments 到 RpcInvocation#attachment 变量中 invocation.addAttachments(contextAttachments); } if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) { // 设置异步信息到 RpcInvocation#attachment 中 invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString()); } RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); try { // 抽象方法,由子类实现 return doInvoke(invocation); } catch (InvocationTargetException e) { // ... } catch (RpcException e) { // ... } catch (Throwable e) { return new RpcResult(e); } } protected abstract Result doInvoke(Invocation invocation) throws Throwable; // 省略其他方法 }

上面的代码来自 AbstractInvoker 类,其中大部分代码用于添加信息到 RpcInvocation#attachment 变量中,添加完毕后,调用 doInvoke 执行后续的调用。doInvoke 是一个抽象方法,需要由子类实现,下面到 DubboInvoker 中看一下。

@Override protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); //将目标方法以及版本好作为参数放入到Invocation中 inv.setAttachment(PATH_KEY, getUrl().getPath()); inv.setAttachment(VERSION_KEY, version); //获得客户端连接 ExchangeClient currentClient; //初始化invoker的时候,构建的一个远程通信连接 if (clients.length == 1) { //默认 currentClient = clients[0]; } else { //通过取模获得其中一个连接 currentClient = clients[index.getAndIncrement() % clients.length]; } try { //表示当前的方法是否存在返回值 boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); int timeout = getUrl().getMethodParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT); //isOneway 为 true,表示“单向”通信 if (isOneway) {//异步无返回值 boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null); return AsyncRpcResult.newDefaultAsyncResult(invocation); } else { //存在返回值 //是否采用异步 AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv); CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout); responseFuture.whenComplete((obj, t) -> { if (t != null) { asyncRpcResult.completeExceptionally(t); } else { asyncRpcResult.complete((AppResponse) obj); } }); RpcContext.getContext().setFuture(new FutureAdapter(asyncRpcResult)); return asyncRpcResult; } } //省略无关代码 }

最终进入到HeaderExchangeChannel#request方法,拼装Request并将请求发送出去

public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException { if (closed) { throw new RemotingException(this.getLocalAddress(), null, "Failed tosend request " request ", cause: The channel " this " is closed!"); } // 创建请求对象 Request req = new Request(); req.setVersion(Version.getProtocolVersion()); req.setTwoWay(true); req.setData(request); DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout); try { //NettyClient channel.send(req); } catch (RemotingException e) { future.cancel(); throw e; } return future; }

(2)请求编码

在netty启动时,我们设置了编解码器,其中通过ExchangeCodec完成编解码工作如下:

public class ExchangeCodec extends TelnetCodec { // 消息头长度 protected static final int HEADER_LENGTH = 16; // 魔数内容 protected static final short MAGIC = (short) 0xdabb; protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0]; protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1]; protected static final byte FLAG_REQUEST = (byte) 0x80; protected static final byte FLAG_TWOWAY = (byte) 0x40; protected static final byte FLAG_EVENT = (byte) 0x20; protected static final int SERIALIZATION_MASK = 0x1f; private static final Logger logger = LoggerFactory.getLogger(ExchangeCodec.class); public Short getMagicCode() { return MAGIC; } @Override public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException { if (msg instanceof Request) { // 对 Request 对象进行编码 encodeRequest(channel, buffer, (Request) msg); } else if (msg instanceof Response) { // 对 Response 对象进行编码,后面分析 encodeResponse(channel, buffer, (Response) msg); } else { super.encode(channel, buffer, msg); } } protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException { Serialization serialization = getSerialization(channel); // 创建消息头字节数组,长度为 16 byte[] header = new byte[HEADER_LENGTH]; // 设置魔数 Bytes.short2bytes(MAGIC, header); // 设置数据包类型(Request/Response)和序列化器编号 header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId()); // 设置通信方式(单向/双向) if (req.isTwoWay()) { header[2] |= FLAG_TWOWAY; } // 设置事件标识 if (req.isEvent()) { header[2] |= FLAG_EVENT; } // 设置请求编号,8个字节,从第4个字节开始设置 Bytes.long2bytes(req.getId(), header, 4); // 获取 buffer 当前的写位置 int savedWriteIndex = buffer.writerIndex(); // 更新 writerIndex,为消息头预留 16 个字节的空间 buffer.writerIndex(savedWriteIndex HEADER_LENGTH); ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer); // 创建序列化器,比如 Hessian2ObjectOutput ObjectOutput out = serialization.serialize(channel.getUrl(), bos); if (req.isEvent()) { // 对事件数据进行序列化操作 encodeEventData(channel, out, req.getData()); } else { // 对请求数据进行序列化操作 encodeRequestData(channel, out, req.getData(), req.getVersion()); } out.flushBuffer(); if (out instanceof Cleanable) { ((Cleanable) out).cleanup(); } bos.flush(); bos.close(); // 获取写入的字节数,也就是消息体长度 int len = bos.writtenBytes(); checkPayload(channel, len); // 将消息体长度写入到消息头中 Bytes.int2bytes(len, header, 12); // 将 buffer 指针移动到 savedWriteIndex,为写消息头做准备 buffer.writerIndex(savedWriteIndex); // 从 savedWriteIndex 下标处写入消息头 buffer.writeBytes(header); // 设置新的 writerIndex,writerIndex = 原写下标 消息头长度 消息体长度 buffer.writerIndex(savedWriteIndex HEADER_LENGTH len); } // 省略其他方法 }

以上就是请求对象的编码过程,该过程首先会通过位运算将消息头写入到 header 数组中。然后对 Request 对象的 data 字段执行序列化操作,序列化后的数据最终会存储到 ChannelBuffer 中。序列化操作执行完后,可得到数据序列化后的长度 len,紧接着将 len 写入到 header 指定位置处。最后再将消息头字节数组 header 写入到 ChannelBuffer 中,整个编码过程就结束了。本节的最后,我们再来看一下 Request 对象的 data 字段序列化过程,也就是 encodeRequestData 方法的逻辑,如下:

public class DubboCodec extends ExchangeCodec implements Codec2 { protected void encodeRequestData(Channel channel, ObjectOutput out, Object data, String version) throws IOException { RpcInvocation inv = (RpcInvocation) data; // 依次序列化 dubbo version、path、version out.writeUTF(version); out.writeUTF(inv.getAttachment(Constants.PATH_KEY)); out.writeUTF(inv.getAttachment(Constants.VERSION_KEY)); // 序列化调用方法名 out.writeUTF(inv.getMethodName()); // 将参数类型转换为字符串,并进行序列化 out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes())); Object[] args = inv.getArguments(); if (args != null) for (int i = 0; i < args.length; i ) { // 对运行时参数进行序列化 out.writeObject(encodeInvocationArgument(channel, inv, i)); } // 序列化 attachments out.writeObject(inv.getAttachments()); } }

至此,关于服务消费方发送请求的过程就分析完了,接下来我们来看一下服务提供方是如何接收请求的。

,