dubbo-remoting 模块提供了多种客户端和服务端通信的功能,该模块内部可以再划分为 Transport 传输层和 Exchange 信息交换层,Transport 层只负责单向消息传输,是对 Mina, Netty 等网络传输组件的抽象,它也可以扩展 UDP 传输,而 Exchange 层是在传输层之上封装了 Request-Response 语义。
Dubbo 框架并没有自己实现一套完整的网络库,而是使用第三方开源网络库。dubbo-remoting-api 子模块中定义了远程通信的抽象概念,具体通信功能需要 dubbo-remoting-* 模块来实现,它们依赖第三方 NIO 库实现 dubbo-remoting-api 模块。如 dubbo-remoting-netty 模块依赖 Netty 3 实现 Dubbo 的远程通信。需要说明的是,dubbo-remoting-zookeeper 模块是实现注册中心功能的模块。
下面对各个包进行简单说明:
buffer 包
定义了缓冲区相关的接口、抽象类以及实现。缓存区对于通信框架是一个不可或缺的功能,几乎每个通信框架都有自己的缓存区实现。Dubbo 中的该包是对各个通信框架的缓存区进行了统一的抽象,同时实现了一些基础能力。
exchange 包
建立Request-Response模型,封装请求响应模式,以 Request, Response 为中心。
telnet 包
Dubbo 支持通过 telnet 命令进行服务治理。
transport 包
将网络传输抽象为统一接口,屏蔽了不同网络库的差异,只负责抽象单向消息的传输,以 Message 为中心。即请求消息由 Client 端发出,Server 端接收;响应消息由 Server 端发出,Client端接收。
其它接口
顶层接口放到了 remoting 包下,这些接口是 Dubbo Remoting 的核心接口。
远程通信抽象相关UML图如下:
端点 Endpoint
public interface Endpoint {
/**
* 关联的 URL 信息
*
* @return url
*/
URL getUrl();
/**
* 底层 Channel 关联的 ChannelHandler
*
* @return channel handler
*/
ChannelHandler getChannelHandler();
/**
* 获取本地地址
*
* @return local address.
*/
InetSocketAddress getLocalAddress();
/**
* 发送消息
*
* @param message
* @throws RemotingException
*/
void send(Object message) throws RemotingException;
/**
* 发送消息
*
* @param message
* @param sent true: 会等待消息发出,消息发送失败会抛出异常; false: 不等待消息发出,将消息放入IO队列,即可返回
*/
void send(Object message, boolean sent) throws RemotingException;
/**
* 关闭底层Channel
*/
void close();
/**
* 优雅关闭底层Channel
*/
void close(int timeout);
/**
* 开始关闭
*/
void startClose();
/**
* 检测底层Channel是否已经关闭
*
* @return closed
*/
boolean isClosed();
}
Dubbo 中抽象了端点(Endpoint)的概念,通过 ip + port 能够唯一确定一个端点,两个端点之间可以建立 TCP 连接,用于双向传输数据。Dubbo 将 Endpoint 之间的 TCP 连接抽象为通道(Channel),将发起请求的 Endpoint 抽象为客户端(Client),将接收请求的 Endpoint 抽象为服务端(Server)。本质上 Client 和 Server 都是一个端点。
通道 Channel
public interface Channel extends Endpoint {
/**
* 获取远程地址 (注意,父类中是获取本地地址)
*
* @return remote address.
*/
InetSocketAddress getRemoteAddress();
/**
* 是否已经链接
*
* @return connected
*/
boolean isConnected();
//------------- Channel 中属性相关接口,可以对Channel 中属性进行操作 -----------------/
/**
* has attribute.
*
* @param key key.
* @return has or has not.
*/
boolean hasAttribute(String key);
/**
* get attribute.
*
* @param key key.
* @return value.
*/
Object getAttribute(String key);
/**
* set attribute.
*
* @param key key.
* @param value value.
*/
void setAttribute(String key, Object value);
/**
* remove attribute.
*
* @param key key.
*/
void removeAttribute(String key);
}
Channel 是对两个 Endpoint 连接的抽象,消息发送端会往 Channel 写入消息,而接收端会从 Channel 读取消息。
Channel 接口继承了 Endpoint 接口,也具备开关 Channel 以及发送数据的能力。此外,Channel 支持附加键值对属性。Dubbo 的 Channel 和 Netty 中的 Channel 一致,是通信的载体,Dubbo 的 Channel 的工作最终是要委托给底层 NIO 连接完成,如 Netty 的 Channel 来完成的 。
通道处理器 ChannelHandler
@SPI
public interface ChannelHandler {
/**
* 处理 Channel 的连接建立事件 - Channel 已经被创建
*
* @param channel channel.
*/
void connected(Channel channel) throws RemotingException;
/**
* 处理 Channel 的连接断开事件 - Channel 已经被断开
*
* @param channel channel.
*/
void disconnected(Channel channel) throws RemotingException;
/**
* 处理发送的数据 - 消息被发送
*
* @param channel channel.
* @param message message.
*/
void sent(Channel channel, Object message) throws RemotingException;
/**
* 处理读取到的数据 - 消息被接收
*
* @param channel channel.
* @param message message.
*/
void received(Channel channel, Object message) throws RemotingException;
/**
* 处理捕获到的异常
*
* @param channel channel.
* @param exception exception.
*/
void caught(Channel channel, Throwable exception) throws RemotingException;
}
ChannelHandler 是注册在 Channel 上的消息处理器,和 Netty 的 ChannelHandler 一致,负责 Channel 中的逻辑处理,如连接、断开、发送消息、收到消息和出现异常等。需要说明的是,ChannelHandler 被 @SPI注解标注,表示是一个 Dubbo 扩展点。ChannelHandler 中定义了 5 个方法,对应着它的 5 种状态:
connected - Channel 已经被创建
disconnected - Channel 已经被断开
sent - 消息被发送
received - 消息被接收
caught - 捕获到异常
Dubbo 中提供了大量的 ChannelHandler 去承载特性和扩展,这些 Handler 最终会和底层通信框架进行关联,如 Netty、Mina 等。一次完整的 RPC 调用贯穿了一系列的 ChannelHandler,如果直接挂载到 Netty 这样的底层通信框架,因为整个调用链路比较长,需要触发大量链式查找和事件,不仅效率低而且消耗资源。因此,Dubbo 框架内部使用了大量的 ChannelHandler 组成链式结构(类似过滤器Filter链),根据 ChannelHandler 的特性依次处理具体的逻辑,Dubbo 这种将多个 ChannelHandler 聚合成一个 ChannelHandler 使用的是装饰者模式,在后面的具体实现中可以看到大量装饰者模式的使用。
语义端点
Dubbo 中抽象了端点 Endpoint 的概念,将发起请求的 Endpoint 抽象为客户端(Client),将接收请求的 Endpoint 抽象为服务端(Server),Client 和 Server 本身都是 Endpoint,只不过在语义上区分了请求和响应的职责,两者都具备发送消息的能力,所以都继承了 Endpoint 接口。UML 图如下:
客户端 Client
public interface Client extends Endpoint, Channel, Resetable {
/**
* 重连
*
* @throws RemotingException
*/
void reconnect() throws RemotingException;
/**
* 重置
*
* @param parameters
*/
@Deprecated
void reset(com.alibaba.dubbo.common.Parameters parameters);
}
服务端 Server
public interface Server extends Endpoint, Resetable {
/**
* 是否绑定本地端口,即是否启动成功,可连接、接收消息
*
* @return bound
*/
boolean isBound();
/**
* 获取连接上服务的通道列表。 多个Client 可以连接同一个Server
*
* @return channels
*/
Collection<Channel> getChannels();
/**
* 根据地址获取连接上服务的通道
*
* @param remoteAddress
* @return channel
*/
Channel getChannel(InetSocketAddress remoteAddress);
/**
* 重置,已废弃
*
* @param parameters
*/
@Deprecated
void reset(com.alibaba.dubbo.common.Parameters parameters);
}
Client 和 Server 的主要区别是 Client 只能关联一个 Channel,而 Server 可以接收多个 Client 发起的 Channel 连接。
网络传输 Transporter
由远程通信抽象相关UML图可知,网络传输 Transporter 是在 Client 和 Server 之上封装的接口。
@SPI("netty")
public interface Transporter {
/**
* 创建一个服务器。根据 'server','transporter' 确定 Server 扩展实现
*
* @param url 服务器地址
* @param handler 通道处理器
* @return server 返回服务器
* @throws RemotingException
* @see com.alibaba.dubbo.remoting.Transporters#bind(URL, ChannelHandler...)
*/
@Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
Server bind(URL url, ChannelHandler handler) throws RemotingException;
/**
* 连接服务器,即创建一个客户端。根据 'client','transporter' 确定 Client 扩展实现
*
* @param url 服务器地址
* @param handler 通道处理器
* @return client 客户端
* @throws RemotingException
* @see com.alibaba.dubbo.remoting.Transporters#connect(URL, ChannelHandler...)
*/
@Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
Client connect(URL url, ChannelHandler handler) throws RemotingException;
}
Transporter 接口上标注 @SPI 注解,表示它是一个扩展点,默认扩展名为 netty ,bind 方法 和 connect 方法 都使用 @Adaptive 注解标注,表示会生成自适应扩展实现。
网络传输门面 Transporters
public class Transporters {
static {
// check duplicate jar package
Version.checkDuplicate(Transporters.class);
Version.checkDuplicate(RemotingException.class);
}
private Transporters() {
}
/**
* 静态方法,创建一个服务器
*
* @param url
* @param handler
* @return
* @throws RemotingException
*/
public static Server bind(String url, ChannelHandler... handler) throws RemotingException {
return bind(URL.valueOf(url), handler);
}
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
// 创建handler
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
// 如果handlers 元素数量大于1,则创建分发器 ChannelHandlerDispatcher【分发器会循环调用handlers】
handler = new ChannelHandlerDispatcher(handlers);
}
// 获取自适应 Transporter 实例,由具体的Transporter 来创建Server 。默认是NettyTransporter创建NettyServer
return getTransporter().bind(url, handler);
}
/**
* 静态方法,连接服务器,即创建一个客户端
*
* @param url
* @param handler
* @return
* @throws RemotingException
*/
public static Client connect(String url, ChannelHandler... handler) throws RemotingException {
return connect(URL.valueOf(url), handler);
}
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
ChannelHandler handler;
// 没有传入通道处理器 ChannelHandler,则会创建ChannelHandlerAdapter 作为通道处理器
if (handlers == null || handlers.length == 0) {
handler = new ChannelHandlerAdapter();
} else if (handlers.length == 1) {
handler = handlers[0];
} else {
// 传入多个 ChanenlHandler,则会创建分发器 ChannelHandlerDispatcher【分发器会循环调用handlers】
handler = new ChannelHandlerDispatcher(handlers);
}
//获取自适应 Transporter 实例,由具体的Transporter 来创建 Client 。默认是NettyTransporter创建 NettyClient
return getTransporter().connect(url, handler);
}
/**
* 获取自适应 Transporter 实例
*
* @return
*/
public static Transporter getTransporter() {
return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
}
}
Transporters 是一个门面类,其中封装了通过 Dubbo SPI 获取 Transporter 对象、ChannelHandler 的处理、服务器 Sever 的创建以及客户端 Client 的创建。属于外观模式。
编解码器
Codec2相比较Codec的变化是,将OutputStream和InputStream,替换成了ChannelBuffer,更好的以 ChannelBuffer 为核心,与其他框架整合。
Codec2
@SPI
public interface Codec2 {
/**
* 编码
*
* @param channel 通道
* @param buffer Buffer
* @param message 消息
* @throws IOException
*/
@Adaptive({Constants.CODEC_KEY})
void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException;
/**
* 解码
*
* @param channel 通道
* @param buffer Buffer
* @return 消息
* @throws IOException
*/
@Adaptive({Constants.CODEC_KEY})
Object decode(Channel channel, ChannelBuffer buffer) throws IOException;
/**
* 解码过程中,需要解决TCP拆包,粘包的场景。解码结果如下:
*/
enum DecodeResult {
/**
* 需要更多输入
*/
NEED_MORE_INPUT,
/**
* 忽略一些输入
*/
SKIP_SOME_INPUT
}
}
Codec
@Deprecated
@SPI
public interface Codec {
/**
* Need more input poison.
*
* @see #decode(Channel, InputStream)
*/
Object NEED_MORE_INPUT = new Object();
/**
* Encode message.
*
* @param channel channel.
* @param output output stream.
* @param message message.
*/
@Adaptive({Constants.CODEC_KEY})
void encode(Channel channel, OutputStream output, Object message) throws IOException;
/**
* Decode message.
*
* @param channel channel.
* @param input input stream.
* @return message or <code>NEED_MORE_INPUT</code> poison.
* @see #NEED_MORE_INPUT
*/
@Adaptive({Constants.CODEC_KEY})
Object decode(Channel channel, InputStream input) throws IOException;
}
Codec 是老的编解码器接口,目前已经被Codec2取代,可以通过CodecAdapter将Codec适配成Codec2。
可解码接口 Decodeable
public interface Decodeable {
/**
* 解码接口
*
* @throws Exception
*/
public void decode() throws Exception;
}
该接口在消息解码的过程中扮演重要角色,是对 Dubbo 协议下的请求和响应消息体解码的支持。在后面的文章中会详细说明其作用。
派发器 Dispatcher
/**
* ChannelHandlerWrapper (SPI, Singleton, ThreadSafe)
* <span>说明:</span>
* 1 调度器接口,被 @SPI(AllDispatcher.NAME)注解标注,是Dubbo 的拓展点,默认扩展名为 'all'
* 2 如果事件处理的逻辑能迅速完成,并且不会发起新的 IO 请求,比如只是在内存中记个标识,则直接在 IO 线程上处理更快,因为减少了线程池调度。
* 如果事件处理逻辑较慢,或者需要发起新的 IO 请求,比如需要查询数据库,则必须派发到线程池,否则 IO 线程阻塞,将导致不能接收其它请求。
* 3 通过不同的派发策略和不同的线程池配置的组合来应对不同的场景。注意,派发策略和线程池的联系
*
* <span>在dubbo 中,有多种Dispatcher的实现</span>
* <ul>
* <li>all: 所有消息都派发到线程池,包括请求,响应,连接事件,断开事件,心跳等</li>
* <li>direct: 所有消息都不派发到线程池,全部在IO线程上直接执行</li>
* <li>message: 只有请求/响应消息派发到线程池,其他的消息直接在IO线程上执行</li>
* <li>execution: 只有请求消息派发到线程池,其他的消息直接在IO线程上执行</li>
* <li>connection: 在IO线程上,将连接/断开事件放入队列,有序逐个执行。其他消息派发到线程池</li>
* </ul>
* 注意:每个Dispatcher实现类,都对应一个ChannelHandler实现类。默认情况下,使用AllDispatcher调度
*/
@SPI(AllDispatcher.NAME)
public interface Dispatcher {
/**
* 派发消息到线程池处理还是IO线程直接处理
*
* @param handler 通道处理
* @param url url
* @return channel handler
*/
@Adaptive({Constants.DISPATCHER_KEY, "dispather", "channel.handler"})
ChannelHandler dispatch(ChannelHandler handler, URL url);
}
Dispatcher 主要支持了 Dubbo 的线程模型,可以创建不同的 ChannelHandler 来决定消息是交给线程池处理还是IO线程处理,因此我们可以在不同的场景中选择不同的派发策略实现消息或者事件的处理。
本篇文章主要介绍了 dubbo-remoting-api 中核心接口以及类,重点介绍了 端点 Endpoint、通道 Channel、通道处理器 ChannelHandler 、以及编解码器 。在语义上将端点 Endpoint 区分为 Client 和 Server。接着又介绍了 Server 和 Client 之上的 网络传输层Transporter 以及其门面类 Transporters 。
总结起来,上层使用方通过 Transporters 门面获取具体的 Transporter 实现,然后通过该 Transporter 创建相应的 Server 和 Client 实现,接着 Client 和 Server 之间建立连接即通道 Channel,并使用 ChannelHandler 处理 Channel相关事件和消息,这个过程还会涉及到编解码的处理,Codec2 正是用来解决编解码问题的。需要注意的是,这里上层指的其实就是信息交互层 Exchange ,我们会在之后的文章中介绍。