Dubbo的通信机制

Scroll Down

Dubbo的remote模块是远程通信模块,是Dubbo项目处理底层网络通信层。

  • buffer 主要是针对NIO的Buffer做了一些封装。
  • exchange 信息交换层,这也是整个通信过程的核心层。
  • Telnet 主要是针对Telnet提供编解码转换。
  • transport 网络传输层,抽象Mina和Netty为同一接口,以Message为中心,扩展接口为Channel、Transporter、Client、Server和Codec等。在Dubbo中具体的传输功能实现都继承自Transporter接口,此接口只包含bind和connect两个方法接口。通过SPI的adaptive注解方式进行注解,默认为Netty。

Transport网络传输层

Transport网络传输层主要包括两大部分,一个是基于Codec2的数据编码和解码,还有一个是基于Transport的数据传输封装。

  • AbstractCodec、ThriftCodec、CodecAdapter、DubboCountCodec和ThriftNativeCodec都实现了Codec2接口,而TransportCodec、TelnetCodec、ExchangeCodec和DubboCodec都继承了AbstractCodec。
  • CodeAdapter是Codec2的适配器模式,通过内部的SPI机制加载指定的Code2实现类。而后将CodecAdapter实例返回给AbstractClient构造方法,AbstractClient的实现类包括NettyClient、MinaClient和GrizzlyClient。
  • DubboCountCodec:Dubbo的默认编码和解码实现类。
  • TransportCodec:比较通用并且没有具体的协议编码类。

Dubbo 发送和接受机制

多个线程请求共用一个 socket连接,当数据返回时,怎么区分数据属于哪个线程;以及 dubbo 虽然是 NIO 模型,但默认实现的是同步调用等

发送机制
  DubboInvoker.doInvoke 方法是发送请求的核心方法

 @Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(PATH_KEY, getUrl().getPath());
        inv.setAttachment(VERSION_KEY, version);

        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            // index.getAndIncrement()是通过cas原子操作自增
            // 每次请求相当于轮询方式拿取连接池连接,如果只有一个,每次获取的都是一样的
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = calculateTimeout(invocation, methodName);
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                return AsyncRpcResult.newDefaultAsyncResult(invocation);
            } else {
                // 获取执行器,里面会 new 一个 ThreadlessExecutor对象
                ExecutorService executor = getCallbackExecutor(getUrl(), inv);
                CompletableFuture<AppResponse> appResponseFuture =
                        currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
                // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
                FutureContext.getContext().setCompatibleFuture(appResponseFuture);
                AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);				
                // 将执行器添加到 AsyncRpcResult
                result.setExecutor(executor);
                return result;
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

进入到 HeaderExchangeChannel.request 方法,该方法会封装 Request 请求参数,并调用 NettyClient(底层是 NIO 模型的 Socket连接)发送请求。

public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        // 封装 Request 对象,并生成唯一 ID。
        Request req = new Request();
        req.setVersion(Version.getProtocolVersion());
        req.setTwoWay(true);
        req.setData(request);
    
    	// 这个代码很重要
        DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
        try {
            // 发送请求
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }

唯一ID
  上文我们说过每个 PRC 请求参数会携带一个唯一 ID,用来区分请求和响应是哪个线程,这里唯一ID 跟下面代码有关:

Request req = new Request();

// 构造函数
public Request() {
    mId = newId();
}

private static long newId() {
    // getAndIncrement()方法通过 cas原子操作自增1   
    return INVOKE_ID.getAndIncrement();
}
// INVOKE_ID 是Request 的静态 final 变量。
private static final AtomicLong INVOKE_ID = new AtomicLong(0);

看到这里我们明白了唯一ID是怎么生成了,使用 AtomicLong 从 0 开始自增。

DefaultFuture
  每个线程在进行 RPC 调用时,都拥有自己唯一的 DefaultFuture 对象。我们来看另外一个重要的代码,HeaderExchangeChannel.request 中的 DefaultFuture.newFuture 方法。

......    
DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor)
......    
    
》》》》》》》》》》》    

// 创建 DefaultFuture 对象的方法
// 参数:nettyClient, requet请求参数,超时实际timeout,ThreadlessExecutor执行器
public static DefaultFuture newFuture(Channel channel, Request request, int timeout, ExecutorService executor) {
    	// 调用构造方法
        final DefaultFuture future = new DefaultFuture(channel, request, timeout);
        // 将 ThreadlessExecutor执行器添加进 DefaultFuture
        future.setExecutor(executor);
        // ThreadlessExecutor needs to hold the waiting future in case of circuit return.
        if (executor instanceof ThreadlessExecutor) {
            ((ThreadlessExecutor) executor).setWaitingFuture(future);
        }
        // timeout check
        timeoutCheck(future);
        return future;
    }    

》》》》》》》》》》》

// DefaultFuture 构造方法
private DefaultFuture(Channel channel, Request request, int timeout) {
        this.channel = channel;
        this.request = request;
        this.id = request.getId();
        this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
        // 重点,将唯一ID 和 DefaultFuture 自身添加进 FUTURES(一个Map)
        FUTURES.put(id, this);
        CHANNELS.put(id, channel);
}

每一个 RPC 调用都有唯一的 DefaultFuture 对象 和 唯一ID,它们被添加到全局 FUTURES中

DefaultFuture 类简介如下:

public class DefaultFuture extends CompletableFuture<Object> {
    ......省略
    // FUTURES 是静态 final 变量
    private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>();
    // ThreadlessExecutor 执行器
    private ExecutorService executor;
    
    // 通过 唯一ID 获取 DefaultFuture 对象
    public static DefaultFuture getFuture(long id) {
        return FUTURES.get(id);
    }
    ......省略
}    

DefaultFuture 还有另外一个重要的变量, ThreadlessExecutor。它封装了阻塞队列,是实现请求线程阻塞,响应唤醒的关键所在。

在调用 channel.send(req) 方法发送请求后,线程继续往下执行,回到 DubboInvoker.doInvoke 方法

@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
    ......省略
        // 这个类代表一个未完成的RPC调用,它将为这个调用保留一些上下文信息,例如RpcContext和Invocation,
        AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
    	// AsyncRpcResult 也保存了 ThreadlessExecutor执行器
        result.setExecutor(executor);
    	// 返回 AsyncRpcResult
        return result;
        
    ......省略
}       

代码继续往下执行到,AsyncToSyncInvoker.invoke 方法

@Override
public Result invoke(Invocation invocation) throws RpcException {
    ......省略
    // 跟进去    
	asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);    
    
    ......省略
}

AsyncRpcResult.get 方法,该方法会调用 threadlessExecutor执行器

@Override
    public Result get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        if (executor != null && executor instanceof ThreadlessExecutor) {
            ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;		
            // 执行响应任务
            threadlessExecutor.waitAndDrain();
        }
        return responseFuture.get(timeout, unit);
    }

ThreadlessExecutor

public class ThreadlessExecutor extends AbstractExecutorService {
    // 每个 ThreadlessExecutor 对象都有一个阻塞队列
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

    /**
    	一次RPC调用只会执行这个方法一次,它会通过向 阻塞queue 拿取任务,如果没有,线程则会阻塞。当响应任务到达,线程又会被唤醒。继续执行任务。
    	这里要注意,一次RPC调用是一个线程,这个线程的阻塞和唤醒都是又阻塞队列BlockingQueue完成的。
    */
    public void waitAndDrain() throws InterruptedException {
        
        if (finished) {
            return;
        }
		// 从阻塞队列读取响应任务,如果没有响应任务,等待添加响应任务唤醒线程
        Runnable runnable = queue.take();

        synchronized (lock) {
            waiting = false;
            runnable.run();
        }
		
        // 如果队列后续还有任务 则直到执行完才结束
        runnable = queue.poll();
        while (runnable != null) {
            try {
                runnable.run();
            } catch (Throwable t) {
                logger.info(t);

            }
            runnable = queue.poll();
        }
        // 标记 ThreadlessExecutor执行器 已经完成
        // mark the status of ThreadlessExecutor as finished.
        finished = true;
    }
    
    /**
    如果调用线程仍在等待回调任务,则将任务添加到阻塞队列中以等待调度。
否则,直接提交给共享回调执行器。
    */
    @Override
    public void execute(Runnable runnable) {
        synchronized (lock) {
            
            if (!waiting) {
                // 如果 rpc调用线程不是等待阻塞状态,则用共享线程池来处理响应任务
                sharedExecutor.execute(runnable);
            } else {
                // 如果 rpc调用线程是等待阻塞状态,则通过 阻塞队列add方法添加唤醒
                queue.add(runnable);
            }
        }
    }
}

那么dubbo 消费者在接收到响应数据时,怎么找到对应的 ThreadlessExecutor执行器,调用 execute 方法呢?还记得我们之前将 唯一ID : DefaultFuture 存入 FUTURES(Map)中,而 DefaultFuture 对象中有 ThreadlessExecutor执行器变量。所以只需要通过 唯一ID,能找到对应的 ThreadlessExecutor执行器执行execute,就能唤醒对应的线程。我们来看 dubbo 是不是这样做的。

接收机制
  Dubbo 是基于 Netty 实现的 NIO 网络模型,我们直接找 NettyClientHandler.channelRead 方法。

@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
        // 跟进去
        handler.received(channel, msg);
}

一直 debug 到 AllChannelHandler.received 方法

@Override
    public void received(Channel channel, Object message) throws RemotingException {
        // 获取 ThreadlessExecutor执行器
        ExecutorService executor = getPreferredExecutorService(message);
        try {
            // 调用 ThreadlessExecutor.execute 方法。这个方法我们上面介绍过。添加任务、唤醒阻塞线程
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
        	if(message instanceof Request && t instanceof RejectedExecutionException){
                sendFeedback(channel, (Request) message, t);
                return;
        	}
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

WrappedChannelHandler.getPreferredExecutorService 方法获取 ThreadlessExecutor执行器

public ExecutorService getPreferredExecutorService(Object msg) {
        if (msg instanceof Response) {
            Response response = (Response) msg;
            // 这个方法上文介绍过,通过 唯一ID 获取 DefaultFuture对象
            DefaultFuture responseFuture = DefaultFuture.getFuture(response.getId());
            // a typical scenario is the response returned after timeout, the timeout response may has completed the future
            if (responseFuture == null) {
                return getSharedExecutorService();
            } else {
                // 通过 DefaultFuture对象 获取成员变量 ThreadlessExecutor执行器
                ExecutorService executor = responseFuture.getExecutor();
                if (executor == null || executor.isShutdown()) {
                    executor = getSharedExecutorService();
                }
                // 返回 ThreadlessExecutor执行器
                return executor;
            }
        } else {
            return getSharedExecutorService();
        }
    }

跟到这里是不是跟我们上文猜想的一样。

总结,每个线程在进行 RPC 调用时,都拥有自己唯一的 DefaultFuture 对象,每个 DefaultFuture 对象对应一个 唯一ID 添加到全局 FUTURES 中。发送数据、响应数据都会携带 唯一ID ,当响应数据的时候,也会以 唯一ID 获取对应的 DefaultFuture 对象。而 DefaultFuture 实现了,线程获取任务阻塞,添加任务唤醒的线程功能,本质是靠成员 ThreadlessExecutor 执行器完成,而 ThreadlessExecutor 对象则是封装了阻塞队列来完成这功能。