Dubbo的remote模块是远程通信模块,是Dubbo项目处理底层网络通信层。
- buffer 主要是针对NIO的Buffer做了一些封装。
- exchange 信息交换层,这也是整个通信过程的核心层。
- Telnet 主要是针对Telnet提供编解码转换。
- transport 网络传输层,抽象Mina和Netty为同一接口,以Message为中心,扩展接口为Channel、Transporter、Client、Server和Codec等。在Dubbo中具体的传输功能实现都继承自Transporter接口,此接口只包含bind和connect两个方法接口。通过SPI的adaptive注解方式进行注解,默认为Netty。
Transport网络传输层
Transport网络传输层主要包括两大部分,一个是基于Codec2的数据编码和解码,还有一个是基于Transport的数据传输封装。
- AbstractCodec、ThriftCodec、CodecAdapter、DubboCountCodec和ThriftNativeCodec都实现了Codec2接口,而TransportCodec、TelnetCodec、ExchangeCodec和DubboCodec都继承了AbstractCodec。
- CodeAdapter是Codec2的适配器模式,通过内部的SPI机制加载指定的Code2实现类。而后将CodecAdapter实例返回给AbstractClient构造方法,AbstractClient的实现类包括NettyClient、MinaClient和GrizzlyClient。
- DubboCountCodec:Dubbo的默认编码和解码实现类。
- TransportCodec:比较通用并且没有具体的协议编码类。
Dubbo 发送和接受机制
多个线程请求共用一个 socket连接,当数据返回时,怎么区分数据属于哪个线程;以及 dubbo 虽然是 NIO 模型,但默认实现的是同步调用等
发送机制
DubboInvoker.doInvoke 方法是发送请求的核心方法
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(PATH_KEY, getUrl().getPath());
inv.setAttachment(VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
// index.getAndIncrement()是通过cas原子操作自增
// 每次请求相当于轮询方式拿取连接池连接,如果只有一个,每次获取的都是一样的
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = calculateTimeout(invocation, methodName);
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {
// 获取执行器,里面会 new 一个 ThreadlessExecutor对象
ExecutorService executor = getCallbackExecutor(getUrl(), inv);
CompletableFuture<AppResponse> appResponseFuture =
currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
// save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
FutureContext.getContext().setCompatibleFuture(appResponseFuture);
AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
// 将执行器添加到 AsyncRpcResult
result.setExecutor(executor);
return result;
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
进入到 HeaderExchangeChannel.request 方法,该方法会封装 Request 请求参数,并调用 NettyClient(底层是 NIO 模型的 Socket连接)发送请求。
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// 封装 Request 对象,并生成唯一 ID。
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
// 这个代码很重要
DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
try {
// 发送请求
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
唯一ID
上文我们说过每个 PRC 请求参数会携带一个唯一 ID,用来区分请求和响应是哪个线程,这里唯一ID 跟下面代码有关:
Request req = new Request();
// 构造函数
public Request() {
mId = newId();
}
private static long newId() {
// getAndIncrement()方法通过 cas原子操作自增1
return INVOKE_ID.getAndIncrement();
}
// INVOKE_ID 是Request 的静态 final 变量。
private static final AtomicLong INVOKE_ID = new AtomicLong(0);
看到这里我们明白了唯一ID是怎么生成了,使用 AtomicLong 从 0 开始自增。
DefaultFuture
每个线程在进行 RPC 调用时,都拥有自己唯一的 DefaultFuture 对象。我们来看另外一个重要的代码,HeaderExchangeChannel.request 中的 DefaultFuture.newFuture 方法。
......
DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor)
......
》》》》》》》》》》》
// 创建 DefaultFuture 对象的方法
// 参数:nettyClient, requet请求参数,超时实际timeout,ThreadlessExecutor执行器
public static DefaultFuture newFuture(Channel channel, Request request, int timeout, ExecutorService executor) {
// 调用构造方法
final DefaultFuture future = new DefaultFuture(channel, request, timeout);
// 将 ThreadlessExecutor执行器添加进 DefaultFuture
future.setExecutor(executor);
// ThreadlessExecutor needs to hold the waiting future in case of circuit return.
if (executor instanceof ThreadlessExecutor) {
((ThreadlessExecutor) executor).setWaitingFuture(future);
}
// timeout check
timeoutCheck(future);
return future;
}
》》》》》》》》》》》
// DefaultFuture 构造方法
private DefaultFuture(Channel channel, Request request, int timeout) {
this.channel = channel;
this.request = request;
this.id = request.getId();
this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
// 重点,将唯一ID 和 DefaultFuture 自身添加进 FUTURES(一个Map)
FUTURES.put(id, this);
CHANNELS.put(id, channel);
}
每一个 RPC 调用都有唯一的 DefaultFuture 对象 和 唯一ID,它们被添加到全局 FUTURES中
DefaultFuture 类简介如下:
public class DefaultFuture extends CompletableFuture<Object> {
......省略
// FUTURES 是静态 final 变量
private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>();
// ThreadlessExecutor 执行器
private ExecutorService executor;
// 通过 唯一ID 获取 DefaultFuture 对象
public static DefaultFuture getFuture(long id) {
return FUTURES.get(id);
}
......省略
}
DefaultFuture 还有另外一个重要的变量, ThreadlessExecutor。它封装了阻塞队列,是实现请求线程阻塞,响应唤醒的关键所在。
在调用 channel.send(req) 方法发送请求后,线程继续往下执行,回到 DubboInvoker.doInvoke 方法
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
......省略
// 这个类代表一个未完成的RPC调用,它将为这个调用保留一些上下文信息,例如RpcContext和Invocation,
AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
// AsyncRpcResult 也保存了 ThreadlessExecutor执行器
result.setExecutor(executor);
// 返回 AsyncRpcResult
return result;
......省略
}
代码继续往下执行到,AsyncToSyncInvoker.invoke 方法
@Override
public Result invoke(Invocation invocation) throws RpcException {
......省略
// 跟进去
asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
......省略
}
AsyncRpcResult.get 方法,该方法会调用 threadlessExecutor执行器
@Override
public Result get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
if (executor != null && executor instanceof ThreadlessExecutor) {
ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
// 执行响应任务
threadlessExecutor.waitAndDrain();
}
return responseFuture.get(timeout, unit);
}
ThreadlessExecutor
public class ThreadlessExecutor extends AbstractExecutorService {
// 每个 ThreadlessExecutor 对象都有一个阻塞队列
private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
/**
一次RPC调用只会执行这个方法一次,它会通过向 阻塞queue 拿取任务,如果没有,线程则会阻塞。当响应任务到达,线程又会被唤醒。继续执行任务。
这里要注意,一次RPC调用是一个线程,这个线程的阻塞和唤醒都是又阻塞队列BlockingQueue完成的。
*/
public void waitAndDrain() throws InterruptedException {
if (finished) {
return;
}
// 从阻塞队列读取响应任务,如果没有响应任务,等待添加响应任务唤醒线程
Runnable runnable = queue.take();
synchronized (lock) {
waiting = false;
runnable.run();
}
// 如果队列后续还有任务 则直到执行完才结束
runnable = queue.poll();
while (runnable != null) {
try {
runnable.run();
} catch (Throwable t) {
logger.info(t);
}
runnable = queue.poll();
}
// 标记 ThreadlessExecutor执行器 已经完成
// mark the status of ThreadlessExecutor as finished.
finished = true;
}
/**
如果调用线程仍在等待回调任务,则将任务添加到阻塞队列中以等待调度。
否则,直接提交给共享回调执行器。
*/
@Override
public void execute(Runnable runnable) {
synchronized (lock) {
if (!waiting) {
// 如果 rpc调用线程不是等待阻塞状态,则用共享线程池来处理响应任务
sharedExecutor.execute(runnable);
} else {
// 如果 rpc调用线程是等待阻塞状态,则通过 阻塞队列add方法添加唤醒
queue.add(runnable);
}
}
}
}
那么dubbo 消费者在接收到响应数据时,怎么找到对应的 ThreadlessExecutor执行器,调用 execute 方法呢?还记得我们之前将 唯一ID : DefaultFuture 存入 FUTURES(Map)中,而 DefaultFuture 对象中有 ThreadlessExecutor执行器变量。所以只需要通过 唯一ID,能找到对应的 ThreadlessExecutor执行器执行execute,就能唤醒对应的线程。我们来看 dubbo 是不是这样做的。
接收机制
Dubbo 是基于 Netty 实现的 NIO 网络模型,我们直接找 NettyClientHandler.channelRead 方法。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
// 跟进去
handler.received(channel, msg);
}
一直 debug 到 AllChannelHandler.received 方法
@Override
public void received(Channel channel, Object message) throws RemotingException {
// 获取 ThreadlessExecutor执行器
ExecutorService executor = getPreferredExecutorService(message);
try {
// 调用 ThreadlessExecutor.execute 方法。这个方法我们上面介绍过。添加任务、唤醒阻塞线程
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
if(message instanceof Request && t instanceof RejectedExecutionException){
sendFeedback(channel, (Request) message, t);
return;
}
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
WrappedChannelHandler.getPreferredExecutorService 方法获取 ThreadlessExecutor执行器
public ExecutorService getPreferredExecutorService(Object msg) {
if (msg instanceof Response) {
Response response = (Response) msg;
// 这个方法上文介绍过,通过 唯一ID 获取 DefaultFuture对象
DefaultFuture responseFuture = DefaultFuture.getFuture(response.getId());
// a typical scenario is the response returned after timeout, the timeout response may has completed the future
if (responseFuture == null) {
return getSharedExecutorService();
} else {
// 通过 DefaultFuture对象 获取成员变量 ThreadlessExecutor执行器
ExecutorService executor = responseFuture.getExecutor();
if (executor == null || executor.isShutdown()) {
executor = getSharedExecutorService();
}
// 返回 ThreadlessExecutor执行器
return executor;
}
} else {
return getSharedExecutorService();
}
}
跟到这里是不是跟我们上文猜想的一样。
总结,每个线程在进行 RPC 调用时,都拥有自己唯一的 DefaultFuture 对象,每个 DefaultFuture 对象对应一个 唯一ID 添加到全局 FUTURES 中。发送数据、响应数据都会携带 唯一ID ,当响应数据的时候,也会以 唯一ID 获取对应的 DefaultFuture 对象。而 DefaultFuture 实现了,线程获取任务阻塞,添加任务唤醒的线程功能,本质是靠成员 ThreadlessExecutor 执行器完成,而 ThreadlessExecutor 对象则是封装了阻塞队列来完成这功能。