Dubbo源码分析 - 网络传输层

Scroll Down

前言

在 远程通信模块总览 中对 Remoting 层进行了总体说明,下面我们开始详细介绍 Remoting 层的 Transport 网络传输层。本文会从 Transporter 层的 Server、Client、Channel、ChannelHandler、Dispatcher 以及 Codec2 等核心接口出发,分别介绍这些核心接口的实现。

概述

有很多网络库可以实现网络传输的功能,如 Netty、Mina、Grizzly等。但这些 NIO 库对外接口和使用方式不一样,如果使用方直接使用 Netty 或其它通信组件,那么就依赖了具体的NIO库实现,而不是依赖一个有传输能力的抽象,后续要切换其它NIO库实现的话就需要修改依赖和接入的相关代码,这既容易出错也不符合设计模式中的开放-封闭原则。因此,Dubbo Transporter 层就被抽象出来了,它屏蔽了不同的通信框架的异同,封装了统一的对外接口。有了 Transporter 层之后,我们可以通过 Dubbo SPI 动态切换具体的 Transporter 扩展实现,从而切换到不同的 Client 和 Server 实现,达到底层 NIO 库切换的目的。需要注意的是,Dubbo Transporter 层不等于 Transport 扩展接口及其实现,它是对网络传输层的抽象即在NIO库之上的抽象,以 Message 为中心,扩展接口为 Channel, Transporter, Client, Server, Codec, ChannelHandler, Dispatcher 等。

Transport 抽象层代码结构如下:
image
注意, Dubbo 接入具体 NIO 库的代码散落在 dubbo-remoting-* 实现模块中,会在后面的文章中介绍。

Transporter 扩展接口

@SPI("netty")
public interface Transporter {

    /**
     * 创建一个服务器,监听来自客户端的请求。根据 'server','transporter' 确定 Server 扩展实现
     *
     * @param url     服务器地址
     * @param handler 通道处理器
     * @return server 返回服务器
     * @throws RemotingException
     * @see com.alibaba.dubbo.remoting.Transporters#bind(URL, ChannelHandler...)
     */
    @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
    Server bind(URL url, ChannelHandler handler) throws RemotingException;

    /**
     * 连接服务器,即创建一个客户端。根据 'client','transporter' 确定 Client 扩展实现
     *
     * @param url     服务器地址
     * @param handler 通道处理器
     * @return client 客户端
     * @throws RemotingException
     * @see com.alibaba.dubbo.remoting.Transporters#connect(URL, ChannelHandler...)
     */
    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
    Client connect(URL url, ChannelHandler handler) throws RemotingException;

}

Transporter 是在 Client 和 Server 之上封装的统一的对外接口,针对每个支持的NIO库,都有一个 Transporter 接口实现,它们是 Dubbo 接入具体NIO库的实现入口,在各个 dubbo-remoting-* 实现模块中。如,Dubbo 接入 Mina 网络通信库,就会有对应的 dubbo-remoting-mina 模块对抽象api模块的实现,该模块提供了 Transporter、Server、Client、Channel、ChannelHandler 等核心接口的实现。
image-1680244562334
这些 Transporter 接口实现返回的 Client 和 Server 具体实现如下图所示,它们是Dubbo 接入的NIO库对应的 Server和Client实现。

具体NIO库Server的实现
image-1680244576297
具体NIO库Client的实现
image-1680244588159
在 远程通信模块总览 中已经介绍过 Transporter 接口以及该接口的门面类 Transporters ,这里不再重复介绍。关于通信具体实现模块会在后面的文章中介绍,它们也是 Transporter 层的一部分,本篇文章着重分析 Transport 层公用组件及抽象概念。

AbstractPeer 抽象类

public abstract class AbstractPeer implements Endpoint, ChannelHandler {

    /**
     * 通道处理器,AbstractPeer 对 ChannelHandler 接口的所有实现,都是委托给了这个 ChannelHandler 对象来处理
     */
    private final ChannelHandler handler;
    /**
     * 端点自身的 URL 类型的字段
     */
    private volatile URL url;

    /**
     * 正在关闭
     */
    private volatile boolean closing;
    /**
     * 关闭完成
     */
    private volatile boolean closed;

    /**
     * handler 属性,通道处理器,通过构造方法传入。使用 '装饰者模式'
     *
     * @param url
     * @param handler
     */
    public AbstractPeer(URL url, ChannelHandler handler) {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        this.url = url;
        this.handler = handler;
    }

  // ${省略其它代码}
}

AbstractPeer 这个抽象类,它同时实现了 Endpoint 接口和 ChannelHandler 接口,AbstractPeer 对 ChannelHandler 接口的所有实现都是委托给维护的 ChannelHandler 属性来处理。对 Endpoint 接口的实现,包括和Channel有关的,如关闭Channel、开始关闭Channel(做标记关闭)、检查Channel是否关闭,这些都是对其维护的 closing 和 closed 属性进行操作;发送消息 send 方法的实现交给其子类去完成;获取端点自身的 URL;获取 ChannelHandler。需要特别说明的是,上层的 ChannelHandler 在链路的最底层保存的位置就是在 AbstractPeer 这个抽象类中。

AbstractPeer 也是 AbstractChannel、AbstractEndpoint 抽象类的父类,继承关系如下图:
image-1680244626315
红框中的实现类是 Dubbo 接入的具体NIO库实现相关的 Server、Client 和 Channel 实现类,通过继承关系以及前面的描述,我们可以知道 AbstractChannel、AbstractServer、AbstractClient 都会关联一个 ChannelHandler 对象,这个对象很重要,后面会慢慢揭开它的面纱。

AbstractEndpoint 抽象类

上文也提到了,AbstractEndpoint 继承了 AbstractPeer 这个抽象类,因为继承关系因此也会关联一个 ChannelHandler。

public abstract class AbstractEndpoint extends AbstractPeer implements Resetable {

    private static final Logger logger = LoggerFactory.getLogger(AbstractEndpoint.class);

    /**
     * 编解码器
     */
    private Codec2 codec;
    /**
     * 超时时间
     */
    private int timeout;
    /**
     * 连接超时时间 (用于具体子类客户端连接超时时间)
     */
    private int connectTimeout;

    public AbstractEndpoint(URL url, ChannelHandler handler) {
        // 调用父类 AbstractPeer 的构造方法
        super(url, handler);
        // 根据URL中的 codec 参数值 获取Codec2的实现类
        this.codec = getChannelCodec(url);
        // 根据 URL 中的 timeout 参数确定 timeout 字段的值,默认 1000
        this.timeout = url.getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        // 根据URL中的connect.timeout 参数确定connectTimeout 字段值,默认 3000
        this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT);
    }

    /**
     * 基于Dubbo SPI机制,加载对应的Codec实现对象,如:在DubboProtocol中会获得DubboCodec对象
     *
     * @param url
     * @return
     */
    protected static Codec2 getChannelCodec(URL url) {
        String codecName = url.getParameter(Constants.CODEC_KEY, "telnet");
        if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) {
            return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName);
        } else {
            // 注意: Codec接口已经废弃了
            return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class).getExtension(codecName));
        }
    }

 // ${省略其它代码}
}

通过上面的代码可以看到,AbstractEndpoint 中维护了一个编解码对象 Codec2 ,该对象是在 AbstractEndpoint 构造方法中根据传入的URL完成初始化,这个非常重要。除了维护 Codec2 编解码对象外,还维护了超时时间(timeout)和连接超时时间(connectTimeout),它们也是在构造方法中根据传入的URL进行初始化的。

此外,AbstractEndpoint 还实现了 Resetable 接口用来支持重置 AbstractEndpoint 中维护的三个属性,代码实现如下:

--- AbstractEndpoint
 /**
     * 重置属性, 即使用新的 url 重置 codec、timeout、connectTimeout 属性
     *
     * @param url
     */
    @Override
    public void reset(URL url) {
        if (isClosed()) {
            throw new IllegalStateException("Failed to reset parameters "
                    + url + ", cause: Channel closed. channel: " + getLocalAddress());
        }
        try {
            if (url.hasParameter(Constants.TIMEOUT_KEY)) {
                int t = url.getParameter(Constants.TIMEOUT_KEY, 0);
                if (t > 0) {
                    this.timeout = t;
                }
            }
        } catch (Throwable t) {
            logger.error(t.getMessage(), t);
        }
        try {
            if (url.hasParameter(Constants.CONNECT_TIMEOUT_KEY)) {
                int t = url.getParameter(Constants.CONNECT_TIMEOUT_KEY, 0);
                if (t > 0) {
                    this.connectTimeout = t;
                }
            }
        } catch (Throwable t) {
            logger.error(t.getMessage(), t);
        }
        try {
            if (url.hasParameter(Constants.CODEC_KEY)) {
                this.codec = getChannelCodec(url);
            }
        } catch (Throwable t) {
            logger.error(t.getMessage(), t);
        }
    }

抽象的服务端和客户端

由上面的继承关系图可知,AbstractServer 和 AbstractClient 都继承自 AbstractEndpoint 抽象类,下面我们先来分析 AbstractServer 这个抽象服务的实现。

AbstractServer
属性

public abstract class AbstractServer extends AbstractEndpoint implements Server {

    protected static final String SERVER_THREAD_POOL_NAME = "DubboServerHandler";
    private static final Logger logger = LoggerFactory.getLogger(AbstractServer.class);
    /**
     * 当前Server关联的线程池,是从 DataStore 中取的
     */
    ExecutorService executor;
    /**
     * 当前Server本地地址
     */
    private InetSocketAddress localAddress;
    /**
     * 绑定地址  (默认值与 localAddress 一致)
     */
    private InetSocketAddress bindAddress;
    /**
     * 服务器最大可接受连接数
     */
    private int accepts;
    /**
     * 空闲超时时间
     */
    private int idleTimeout = 600; //600 seconds
   
}

AbstractServer 在继承 AbstractEndpoint 的同时,还实现了 Server 接口,是服务抽象类,重点实现了服务的公用逻辑,Server 接口在 在 远程通信模块总览 中已经介绍,其中的属性已经在代码中详细标注。下面我们接着看它的构造方法,上述的属性字段都是在构造方法中进行初始化的。

构造方法

--- AbstractServer
 public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
        // 调用父类构造方法
        super(url, handler);
        // 服务地址: 本机地址  如:-> /192.168.0.100:20880
        localAddress = getUrl().toInetSocketAddress();
        // 获取ip和端口
        String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
        int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
        if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
            // 设置ip 为 0.0.0.0
            bindIp = NetUtils.ANYHOST;
        }
        // 绑定地址  如: /0.0.0.0:20880
        bindAddress = new InetSocketAddress(bindIp, bindPort);
        // 获取最大可接受连接数
        this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
        // 空闲超时时间
        this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);

        try {
            // 调用模版方法 doOpen 启动服务
            doOpen();
            if (logger.isInfoEnabled()) {
                logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
            }

        } catch (Throwable t) {
            throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
                    + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
        }

        /** 从DataStore中获得线程池 ,来源 {@link com.alibaba.dubbo.remoting.transport.dispatcher.WrappedChannelHandler.WrappedChannelHandler}*/
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
    }

构造方法会根据传入的URL初始化 AbstractServer 中的属性,这也是为了其子类的初始化做准备,其中在构造方法中调用了一个模版方法 doOpen,这个方法就是初始化其子类的关键入口,即启动具体的NIO服务,下篇文章分析具体NIO库是如何接入的就会清晰了。当前Server关联的线程池 executor 是从 DataStore 中取的,下文会对 DataStore 进行介绍并说明线程池的来源。

模版方法

用于子类实现,完成服务的开启和关闭工作。

protected abstract void doOpen() throws Throwable;

protected abstract void doClose() throws Throwable;

发送消息

发送消息方法是对 Endpoint 接口的实现

--- AbstractServer
  /**
     * 发送消息
     *
     * @param message
     * @param sent    true: 会等待消息发出,消息发送失败会抛出异常;  false: 不等待消息发出,将消息放入IO队列,即可返回
     * @throws RemotingException
     */
    @Override
    public void send(Object message, boolean sent) throws RemotingException {
        // 获取连接上服务器的通道列表 【客户端列表】
        Collection<Channel> channels = getChannels();
        // 群发消息
        for (Channel channel : channels) {
            // 如果是已经连接的就发送
            if (channel.isConnected()) {
                channel.send(message, sent);
            }
        }
    }

客户端请求连接

用于客户端连接当前服务,是对父类 AbstractPeer 方法的重写,对 ChannelHandler 的实现,AbstractPeer 中的实现很简单,只是判断服务是否关闭,关闭就不会处理客户端连接请求,没有关闭则会把连接请求交给维护的 ChannelHandler 处理。

--- AbstractServer
 @Override
    public void connected(Channel ch) throws RemotingException {
        // If the server has entered the shutdown process, reject any new connection
        // 调用父类AbstractPeer 中的方法,判读当前这个 Server 端是否正在关闭或关闭了。如果不是启动状态则直接关闭新建的 Client 连接。
        if (this.isClosing() || this.isClosed()) {
            logger.warn("Close new channel " + ch + ", cause: server is closing or has been closed. For example, receive a new connect request while in shutdown process.");
            ch.close();
            return;
        }

        //1 超过上限,关闭新的连接

        //1.1 获取连接上服务器的通道列表 【客户端列表】
        Collection<Channel> channels = getChannels();
        //1.2 判断服务器上连接数是否超过上限
        if (accepts > 0 && channels.size() > accepts) {
            logger.error("Close channel " + ch + ", cause: The server " + ch.getLocalAddress() + " connections greater than max config " + accepts);
            // 服务器上的连接数超过上上限的话,就关闭新的连接
            ch.close();
            return;
        }

        // 处理连接事件,AbstractPeer 中的方法,本质还是委托内部装饰的 ChannelHandler 来处理
        super.connected(ch);
    }

客户端断开连接

用于客户端断开连接当前服务,是对父类 AbstractPeer 方法的重写,对 ChannelHandler 的实现,AbstractPeer 中的实现很简单,直接把断开连接请求交给装饰的 ChannelHandler 处理。

--- AbstractServer
  @Override
    public void disconnected(Channel ch) throws RemotingException {
        Collection<Channel> channels = getChannels();
        if (channels.isEmpty()) {
            logger.warn("All clients has discontected from " + ch.getLocalAddress() + ". You can graceful shutdown now.");
        }
        // 处理断开连接请求
        super.disconnected(ch);
    }

服务关闭

@Override
 public void close() {
     if (logger.isInfoEnabled()) {
         logger.info("Close " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
     }
     // 关闭关联的线程池
     ExecutorUtil.shutdownNow(executor, 100);
     try {
         // 标记关闭
         super.close();
     } catch (Throwable e) {
         logger.warn(e.getMessage(), e);
     }
     try {
         // 子类关闭动作
         doClose();
     } catch (Throwable e) {
         logger.warn(e.getMessage(), e);
     }
 }

还有一些不是很重要的其它方法就不分析了,下面继续分析抽象客户端实现。

AbstractClient

AbstractClient 同样继承了 AbstractEndpoint 抽象类,并且实现了 Client 接口,是客户端的抽象类,实现了公用的逻辑。Client 接口在 在 远程通信模块总览 中已经介绍过,就不再重复说明。

属性

public abstract class AbstractClient extends AbstractEndpoint implements Client {

    private static final Logger logger = LoggerFactory.getLogger(AbstractClient.class);

    /**
     * 连接线程池名
     */
    protected static final String CLIENT_THREAD_POOL_NAME = "DubboClientHandler";

    /**
     * 连接线程池id
     */
    private static final AtomicInteger CLIENT_THREAD_POOL_ID = new AtomicInteger();
    /**
     * 重连定时任务执行器,在客户端连接服务端时,会创建后台任务,定时检查连接,若断开会进行重新连
     */
    private static final ScheduledThreadPoolExecutor reconnectExecutorService = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("DubboClientReconnectTimer", true));
    /**
     * 连接锁,用于实现发起连接和断开连接互斥,避免并发。
     */
    private final Lock connectLock = new ReentrantLock();
    /**
     * 发送消息时,若断开,是否重连
     */
    private final boolean send_reconnect;
    /**
     * 重连次数
     */
    private final AtomicInteger reconnect_count = new AtomicInteger(0);

    /**
     * 重连时,是否已经打印过错误日志。默认没有打印过
     */
    private final AtomicBoolean reconnect_error_log_flag = new AtomicBoolean(false);
    /**
     * 重连warning的间隔,warning多少次之后warning一次
     */
    private final int reconnect_warning_period;
    /**
     * 关闭超时时间
     */
    private final long shutdown_timeout;
    /**
     * 当前客户端对应的线程池
     * 在调用 {@link #wrapChannelHandler(URL, ChannelHandler)} 时,会调用 {@link com.alibaba.dubbo.remoting.transport.dispatcher.WrappedChannelHandler} 创建
     */
    protected volatile ExecutorService executor;
    /**
     * 重连执行任务 Future
     */
    private volatile ScheduledFuture<?> reconnectExecutorFuture = null;
    /**
     * 最后成功连接时间
     */
    private long lastConnectedTime = System.currentTimeMillis();

}

AbstractClient 中的相关属性已经详细标注,因为是客户端,会涉及到重连服务的情况,属性相对比服务端要多些,但是这些属性都是很有用的 。

构造方法

--- AbstractClient
  public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);

        // 从URL中,获得重连相关配置,即 send.reconnect 配置属性
        send_reconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);

        // 从URL中获得关闭超时时间 即 shutdown.timeout 配置属性
        shutdown_timeout = url.getParameter(Constants.SHUTDOWN_TIMEOUT_KEY, Constants.DEFAULT_SHUTDOWN_TIMEOUT);

        // The default reconnection interval is 2s, 1800 means warning interval is 1 hour.
        reconnect_warning_period = url.getParameter("reconnect.waring.period", 1800);

        // 初始化客户端
        try {
            doOpen();
        } catch (Throwable t) {
            // 初始化失败,则关闭,并抛出异常
            close();
            throw new RemotingException(url.toInetSocketAddress(), null,
                    "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                            + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
        }

        // 连接服务器
        try {
            connect();
            if (logger.isInfoEnabled()) {
                logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());
            }
        } catch (RemotingException t) {
            // 如果连接失败,并且配置了启动检查,则进行对应的逻辑
            if (url.getParameter(Constants.CHECK_KEY, true)) {
                // 关闭连接
                close();
                throw t;
            } else {
                logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                        + " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);
            }
        } catch (Throwable t) {
            close();
            throw new RemotingException(url.toInetSocketAddress(), null,
                    "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                            + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
        }


        // 从DataStore中获得线程池,这里的线程池就是线程模型中的涉及的线程池
        /**
         * {@link WrappedChannelHandler#WrappedChannelHandler(com.alibaba.dubbo.remoting.ChannelHandler, com.alibaba.dubbo.common.URL)}
         */
        executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension().get(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
        ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension().remove(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
    }

构造方法中不仅初始化了属性,还调用了模版方法,用于完成子类的初始化工作,即完成客户端的初始化并连接上服务。具体的客户端实现同样在后面的文章中说明。

模版方法

--- AbstractClient
 /**
     * Open client.
     *
     * @throws Throwable
     */
    protected abstract void doOpen() throws Throwable;

    /**
     * Close client.
     *
     * @throws Throwable
     */
    protected abstract void doClose() throws Throwable;

    /**
     * Connect to server.
     *
     * @throws Throwable
     */
    protected abstract void doConnect() throws Throwable;

    /**
     * disConnect to server.
     *
     * @throws Throwable
     */
    protected abstract void doDisConnect() throws Throwable;
   /**
     * Get the connected channel.
     *
     * @return channel
     */
    protected abstract Channel getChannel();

与 AbstractServer 类似,AbstractClient 定义了 doOpen()、doClose()、doConnect()、 doDisConnect() 和 getChannel() 抽象方法给子类实现以完成特定的功能。其中 doClose() 方法在 Netty 实现中是个空方法。

连接服务的通用逻辑

--- AbstractClient
/**
     * 连接服务器
     *
     * @throws RemotingException
     */
    protected void connect() throws RemotingException {
        // 获得锁 
        connectLock.lock();
        try {
            // 判断连接状态,若已经连接就不重复连接。
            if (isConnected()) {
                return;
            }

            // 初始化重连线程 【断线重连机制】
            initConnectStatusCheckCommand();

            // 执行连接
            doConnect();

            // 是否已经连接,如过连接失败则抛出异常
            if (!isConnected()) {
                throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
                        + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
                        + ", cause: Connect wait timeout: " + getConnectTimeout() + "ms.");
                // 连接成功,打印日志
            } else {
                if (logger.isInfoEnabled()) {
                    logger.info("Successed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
                            + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
                            + ", channel is " + this.getChannel());
                }
            }

            // 设置重连次数归零
            reconnect_count.set(0);

            // 设置未打印过重连错误日志
            reconnect_error_log_flag.set(false);
        } catch (RemotingException e) {
            throw e;
        } catch (Throwable e) {
            throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
                    + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
                    + ", cause: " + e.getMessage(), e);
        } finally {
            // 释放锁
            connectLock.unlock();
        }
    }

连接服务通用逻辑主要做了以下工作:

获得锁,用于实现在连接和断开连接同时操作时,通过加锁以防止并发问题。
判断是否连接,如果连接了就无需再连接,是否连接逻辑是对Channel接口方法的实现。

--- AbstractClient
 /**
     * Dubbo的Channel 接口中的方法。方法内部调用的是Channel对象
     *
     * @return
     */
    @Override
    public boolean isConnected() {
        Channel channel = getChannel();
        if (channel == null) {
            return false;
        }
        return channel.isConnected();
    }

开启断线重连机制,即初始化重连线程,定时检查连接状态。
调用具体客户端实现的连接服务的方法去连接对应的服务。
连接失败抛出异常,连接成功则打印日志并归零重连次数。

断线重连机制

--- AbstractClient
 /**
     * 初始化重连线程 【以一定频率尝试重连任务】
     */
    private synchronized void initConnectStatusCheckCommand() {
        // 获得重连频率  【注意:默认是开启的,2000毫秒】
        int reconnect = getReconnectParam(getUrl());

        // 若开启重连功能,创建重连线程 
        if (reconnect > 0 && (reconnectExecutorFuture == null || reconnectExecutorFuture.isCancelled())) {
            // 创建重连任务体
            Runnable connectStatusCheckCommand = new Runnable() {
                @Override
                public void run() {
                    try {
                        // 判断是否连接,未连接就重连
                        if (!isConnected()) {
                            connect();
                            // 已连接则记录最后连接时间(确保是连接状态的时间)
                        } else {
                            lastConnectedTime = System.currentTimeMillis();
                        }
                    } catch (Throwable t) {

                        // 符合条件时,打印错误或告警日志。 如果不加节制打印日志,很容易打出满屏日志,严重的可能造成JVM崩溃

                        // 超过一定时间未连接上,才打印异常日志。并且,仅打印一次。默认15分钟
                        String errorMsg = "client reconnect to " + getUrl().getAddress() + " find error . url: " + getUrl();
                        // wait registry sync provider list
                        if (System.currentTimeMillis() - lastConnectedTime > shutdown_timeout) {
                            if (!reconnect_error_log_flag.get()) {
                                reconnect_error_log_flag.set(true);
                                logger.error(errorMsg, t);
                                return;
                            }
                        }
                        // 按照一定的重连次数,打印告警日志
                        if (reconnect_count.getAndIncrement() % reconnect_warning_period == 0) {
                            logger.warn(errorMsg, t);
                        }
                    }
                }
            };
            // 发起重连定时任务,定时检查是否需要重连 [默认两秒检查一次]
            reconnectExecutorFuture = reconnectExecutorService.scheduleWithFixedDelay(connectStatusCheckCommand, reconnect, reconnect, TimeUnit.MILLISECONDS);
        }
    }

断线重连机制就是在客户端连接服务端时,会创建后台任务,定时检查连接,若断开会进行重连。

发送消息

--- AbstractClient
 /**
     * 发送消息
     *
     * @param message
     * @param sent    true: 会等待消息发出,消息发送失败会抛出异常;  false: 不等待消息发出,将消息放入IO队列,即可返回
     * @throws RemotingException
     */
    @Override
    public void send(Object message, boolean sent) throws RemotingException {
        // 未连接时,并且开启了发送消息断开重连功能,则先发起连接
        if (send_reconnect && !isConnected()) {
            connect();
        }
        // 获取通道,如 NettyChannel 实例,该实例内部channel实例就是 NioClientSocketChannel。
        Channel channel = getChannel();
        //TODO Can the value returned by getChannel() be null? need improvement.
        if (channel == null || !channel.isConnected()) {
            throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
        }
        // 发送消息
        channel.send(message, sent);
    }

客户端连接服务时只会有对应的一个 Channel 通道,客户端发送消息时使用的是 Dubbo 接入具体NIO库的 Channel 实例,如 NettyChannel 实例,它内部封装的 Channel 实例是 Netty 的通道实例 NioClientSocketChannel 。这个在后面的文章中详细说明。

断开连接

该方法目前用在 reconnect() 重连方法和 close() 关闭方法中。

public void disconnect() {
     // 加锁
     connectLock.lock();
     try {
         // 1 关闭断线重连任务
         destroyConnectStatusCheckCommand();
         try {
             // 2 关闭连接服务的通道
             Channel channel = getChannel();
             if (channel != null) {
                 channel.close();
             }
         } catch (Throwable e) {
             logger.warn(e.getMessage(), e);
         }
         try {
             // 3 清除通道缓存(Dubbo 层面的 Channel,该Channel 内部封装了NIO库的Channel,它们是一对一关系)
             doDisConnect();
         } catch (Throwable e) {
             logger.warn(e.getMessage(), e);
         }
     } finally {
         connectLock.unlock();
     }
 }

重连

先断开连接,在进行连接。

@Override
public void reconnect() throws RemotingException {
    // 1 先断开连接
    disconnect();
    // 2 连接
    connect();
}

关闭

@Override
  public void close() {
      try {
          // 1 关闭线程池
          if (executor != null) {
              ExecutorUtil.shutdownNow(executor, 100);
          }
      } catch (Throwable e) {
          logger.warn(e.getMessage(), e);
      }
      try {
          // 2 标记通道关闭完成
          super.close();
      } catch (Throwable e) {
          logger.warn(e.getMessage(), e);
      }
      
      try {
          // 3 断开连接
          disconnect();
      } catch (Throwable e) {
          logger.warn(e.getMessage(), e);
      }
      
      try {
          //4 执行关闭
          doClose();
      } catch (Throwable e) {
          logger.warn(e.getMessage(), e);
      }
  }

超时关闭

对线程池超时关闭

@Override
   public void close(int timeout) {
       ExecutorUtil.gracefulShutdown(executor, timeout);
       close();
  }

抽象通道 AbstractChannel

AbstractChannel 同样继承了 AbstractPeer 这个抽象类,同时还实现了 Channel 接口。AbstractChannel 实现非常简单,只是在 send() 方法中检测了底层连接的状态,没有实现具体的发送消息的逻辑。注意,一般情况下 Dubbo 层面的 Channel 和 具体NIO库的通道是一对一的关系,前者会对后者进行装饰,前者的功能本质上是后者的职能。

public abstract class AbstractChannel extends AbstractPeer implements Channel {

    // 关联了 ChannelHandler
    public AbstractChannel(URL url, ChannelHandler handler) {
        super(url, handler);
    }

    /**
     * 发送消息,在这里只做底层连接状态检查,没有实现具体的发送消息的逻辑,具体的发送逻辑由子类实现
     * @param message
     * @param sent    true: 会等待消息发出,消息发送失败会抛出异常;  false: 不等待消息发出,将消息放入IO队列,即可返回
     * @throws RemotingException
     */
    @Override
    public void send(Object message, boolean sent) throws RemotingException {
        if (isClosed()) {
            throw new RemotingException(this, "Failed to send message " + (message == null ? "" : message.getClass().getName()) + ":" + message + ", cause: Channel closed. channel: " + getLocalAddress() + " -> " + getRemoteAddress());
        }
    }
}

继承关系图如下:
image-1680245070240
各子类实现会对 send 方法进行重写。

ChannelHandler

前文介绍的 AbstractEndpoint、AbstractChannel 都是通过对 AbstractPeer 继承间接实现了 ChannelHandler 接口并关联了 ChannelHandler 对象,仅仅是对 ChannelHandler 的装饰,方法都是委托给底层关联的这个 ChannelHandler 对象。下面我们对 Transporter 层相关的 ChannelHandler 进行详细分析。继承关系如下图所示:
image-1680245090177

ChannelHandlerAdapter

ChannelHandlerAdapter 是 ChannelHandler 的一个空实现,TelnetHandlerAdapter 继承了它并实现了 TelnetHandler 接口,用于支持 Dubbo 命令行的服务治理。关于 Telnet 的实现,会在后面单独进行介绍,这里就不展开说明了。

/**
 * ChannelHandlerAdapter.  实现ChannelHandler接口,通道处理器适配器,每个方法都是空实现。子类可根据具体场景选择性实现所需方法。
 */
public class ChannelHandlerAdapter implements ChannelHandler {

    @Override
    public void connected(Channel channel) throws RemotingException {
    }

    @Override
    public void disconnected(Channel channel) throws RemotingException {
    }

    @Override
    public void sent(Channel channel, Object message) throws RemotingException {
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
    }

    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
    }

}

ChannelHandlerDispatcher

在前面的文章中有提到过 ChannelHandlerDispatcher,它维护了一个 CopyOnWriteArraySet 集合,负责将多个 ChannelHandler 对象聚合成一个 ChannelHandler 对象。

public class ChannelHandlerDispatcher implements ChannelHandler {

    private static final Logger logger = LoggerFactory.getLogger(ChannelHandlerDispatcher.class);

    /**
     * 通道处理器集合
     */
    private final Collection<ChannelHandler> channelHandlers = new CopyOnWriteArraySet<ChannelHandler>();

    public ChannelHandlerDispatcher() {
    }

    public ChannelHandlerDispatcher(ChannelHandler... handlers) {
        this(handlers == null ? null : Arrays.asList(handlers));
    }

    public ChannelHandlerDispatcher(Collection<ChannelHandler> handlers) {
        if (handlers != null && !handlers.isEmpty()) {
            this.channelHandlers.addAll(handlers);
        }
    }

  // 省略对 ChannelHandler 接口方法的实现
}

ChannelHandlerDispatcher 实现了 ChannelHandler 接口中的所有方法,每个方法都是循环通道处理器集合调用相应的方法。

ChannelHandlerDelegate

实现 ChannelHandler 接口,通道处理器装饰者接口,即是对其它 ChannelHandler 进行装饰的接口,这个接口非常重要。

public interface ChannelHandlerDelegate extends ChannelHandler {
    /**
     * 获取装饰的ChannelHandler
     *
     * @return
     */
    ChannelHandler getHandler();
}

ChannelHandlerDelegate 有三个直接的实现类,分别是 AbstractChannelHandlerDelegate、WrappedChannelHandler 和 HeaderExchangeHandler ,它们就是对其它 ChannelHandler 的装饰。其中 HeaderExchangeHandler 是 Exchange 层涉及的对象,我们先不讨论。我们先来分析 AbstractChannelHandlerDelegate 继承体系。

AbstractChannelHandlerDelegate

public abstract class AbstractChannelHandlerDelegate implements ChannelHandlerDelegate {
    // 装饰的 ChannelHandler
    protected ChannelHandler handler;

    protected AbstractChannelHandlerDelegate(ChannelHandler handler) {
        Assert.notNull(handler, "handler == null");
        this.handler = handler;
    }

    @Override
    public ChannelHandler getHandler() {
        if (handler instanceof ChannelHandlerDelegate) {
            return ((ChannelHandlerDelegate) handler).getHandler();
        }
        return handler;
    }

    @Override
    public void connected(Channel channel) throws RemotingException {
        handler.connected(channel);
    }

    @Override
    public void disconnected(Channel channel) throws RemotingException {
        handler.disconnected(channel);
    }

    @Override
    public void sent(Channel channel, Object message) throws RemotingException {
        handler.sent(channel, message);
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        handler.received(channel, message);
    }

    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        handler.caught(channel, exception);
    }
}

实现 ChannelHandlerDelegate 接口,在每个实现的方法里都是直接调用被装饰的 ChannelHandler 对象对应的方法,没有其它逻辑。它的三个子类都是在被装饰的 ChannelHandler 的基础上添加了一些增强的功能,使用的是装饰者模式。因为 HeartbeatHandler 属于 Exchange 层的 ChannelHandler ,在分析 Exchange 层时再进行分析,这里不再展开说明。

DecodeHandler

public class DecodeHandler extends AbstractChannelHandlerDelegate {

    private static final Logger log = LoggerFactory.getLogger(DecodeHandler.class);

    public DecodeHandler(ChannelHandler handler) {
        super(handler);
    }

    /**
     * 覆写了 received(channel,message)方法
     *
     * @param channel
     * @param message RpcInvocation 或 RpcResult
     * @throws RemotingException
     * @see com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec#decodeBody(com.alibaba.dubbo.remoting.Channel, java.io.InputStream, byte[])
     */
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        // 当消息是 Decodeable 类型时 进行解码
        if (message instanceof Decodeable) {
            decode(message);
        }

        // 当消息是Request类型时,对 data 字段进行解码
        if (message instanceof Request) {
            decode(((Request) message).getData());
        }

        // 当消息是Response类型时,对 result 字段进行解码
        if (message instanceof Response) {
            decode(((Response) message).getResult());
        }

        // 解码后,调用ChannelHandler#received(channel,message)方法,将消息交给委托的handler继续处理
        handler.received(channel, message);
    }

    /**
     * 解析消息
     *
     * @param message
     */
    private void decode(Object message) {
        /**
         * Decodeable 接口目前有两个实现类:
         * 1 DecodeableRpcInvocation
         * 2 DecodeableRpcResult
         */
        if (message != null && message instanceof Decodeable) {
            try {
                // 解析消息
                ((Decodeable) message).decode();
                if (log.isDebugEnabled()) {
                    log.debug("Decode decodeable message " + message.getClass().getName());
                }
            } catch (Throwable e) {
                if (log.isWarnEnabled()) {
                    log.warn("Call Decodeable.decode failed: " + e.getMessage(), e);
                }
            } // ~ end of catch
        } // ~ end of if
    } // ~ end of method decode
}

DecodeHandler 是一个解码处理器,专门用于处理 Decodeable 类型消息的 ChannelHandler实现类,因此该实现类只重写了 received() 接收消息的方法,它的作用和含义如下:

请求解码可在IO线程上执行,也可在线程池中执行,取决于配置。DecodeHandler 存在的意义就是保证请求体或响应体可在线程池中被解码。
在Codec2解码器实现中,如果请求体和响应结果需要在线程池中进行解码,那么就不进行直接解码,而是把解码任务最终交给线程池来处理,最后由 DecodeHandler来处理,因为 DecodeHandler 也参与了对上层 ChannelHandler 的包装。

实现了 Decodeable 接口的类都会提供了一个 decode() 方法实现对自身的解码,DecodeHandler.received() 方法就是通过该方法得到解码后的消息,然后传递给底层的 ChannelHandler 对象继续处理。

MultiMessageHandler

public class MultiMessageHandler extends AbstractChannelHandlerDelegate {

    public MultiMessageHandler(ChannelHandler handler) {
        super(handler);
    }

    /**
     * 覆写了 received方法
     *
     * @param channel
     * @param message
     * @throws RemotingException
     */
    @SuppressWarnings("unchecked")
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        // 消息类型是MultiMessage,即多消息
        if (message instanceof MultiMessage) {
            MultiMessage list = (MultiMessage) message;
            // 循环提交给handler处理
            for (Object obj : list) {
                handler.received(channel, obj);
            }
            // 如果是单消息时,直接提交给handler处理
        } else {
            handler.received(channel, message);
        }
    }
}

MultiMessageHandler 是专门处理 MultiMessage 类型消息的 ChannelHandler 实现类。MultiMessage 是 Exchange 层的一种消息类型,它其中封装了多个消息。在 MultiMessageHandler 收到 MultiMessage 消息的时候,received() 方法会遍历其中的所有消息,并交给底层的 ChannelHandler 对象进行处理。

至此,Transport 层的 AbstractChannelHandlerDelegate 继承体系分析完毕。下面我们继续看 ChannelHandlerDelegate 的另一条继承体系分支。

WrappedChannelHandler

WrappedChannelHandler 也实现了 ChannelHandlerDelegate 接口,也是对其它 ChannelHandler 装饰的类。WrappedChannelHandler 在 ChannelHandler 接口方法实现上和 AbstractChannelHandlerDelegate 基本一致,那为什么又要搞一个新的继承体系而不是直接继承 AbstractChannelHandlerDelegate 呢?因为 WrappedChannelHandler 继承体系不仅是对其它 ChannelHandler 的装饰而且还决定了 Dubbo 的线程模型,有关 Dubbo 中的线程池会单独分析,这里先不展开说明。WrappedChannelHandler 关联体系如下图所示:
image-1680245240830
从上图可知,每个 WrappedChannelHandler 的子类都有一个对应的 Dispatcher 实现类,这些实现类就是用来创建 WrappedChannelHandler 的子类们。 Dispatcher 接口已经在 远程通信模块总览 中已经介绍过,它主要支持了 Dubbo 的线程模型,通过它的实现类可以创建不同的 ChannelHandler 来决定消息是交给线程池处理还是IO线程处理。

WrappedChannelHandler 实现了 ChannelHandlerDelegate 接口,其子类实现了消息派发功能,即决定了 Dubbo 以哪种线程模型处理收到的事件和消息。每个子类都由对应的Dispatcher 实现类创建。

属性

public class WrappedChannelHandler implements ChannelHandlerDelegate {

    protected static final Logger logger = LoggerFactory.getLogger(WrappedChannelHandler.class);

    /**
     * 共享线程池
     */
    protected static final ExecutorService SHARED_EXECUTOR = Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true));
    /**
     * 当前端点关联的线程池
     */
    protected final ExecutorService executor;
    /**
     * 被装饰的通道处理器
     */
    protected final ChannelHandler handler;
    /**
     * URL
     */
    protected final URL url;

}

WrappedChannelHandler 中有四个核心的属性,因为是对 ChannelHandler 的装饰,因此 ChannelHandler 是必须的。需要说明的是共享线程池和当前端点关联的线程池,共享线程池对每个子类公用,当前端点关联的线程池属于每个子类对象独有,它是在构造方法中初始化的。

构造方法

--- WrappedChannelHandler

    public WrappedChannelHandler(ChannelHandler handler, URL url) {
        // 赋值
        this.handler = handler;
        this.url = url;

        // 基于SPI机制创建线程池
        executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);

        // 默认是 ExecutorService 的名称
        String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;

        // 如果是消费端,则 componentKey 为 'consumer'
        if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {
            componentKey = Constants.CONSUMER_SIDE;
        }

        // 基于SPI机制创建线程池存储对象
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();

        // 添加线程池到 DataStore中
        dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
    }

通过 WrappedChannelHandler 的构造方法可知,每个子类对象都会创建一个线程池并添加到 DataStore 缓存起来,我们上面介绍的 AbstractClient 和 AbstractServer 是从 DataStore 获得线程池的,而数据来源正是这里。关于线程池的介绍,会在后面的文章中详细分析,这里先不展开说明。

DataSource 核心就是一个 Map 结构缓存,代码如下:

// SimpleDataStore 是 DataStore 唯一扩展实现
public class SimpleDataStore implements DataStore {
    /**
     * key1: ExecutorService 的名称 或 'consumer'
     * key2: port
     * value: ExecutorService
     */
    private ConcurrentMap<String, ConcurrentMap<String, Object>> data = new ConcurrentHashMap<String, ConcurrentMap<String, Object>>();

    @Override
    public Map<String, Object> get(String componentName) {
        ConcurrentMap<String, Object> value = data.get(componentName);
        if (value == null) return new HashMap<String, Object>();
        return new HashMap<String, Object>(value);
    }

    @Override
    public Object get(String componentName, String key) {
        if (!data.containsKey(componentName)) {
            return null;
        }
        return data.get(componentName).get(key);
    }

    @Override
    public void put(String componentName, String key, Object value) {
        Map<String, Object> componentData = data.get(componentName);
        if (null == componentData) {
            data.putIfAbsent(componentName, new ConcurrentHashMap<String, Object>());
            componentData = data.get(componentName);
        }
        componentData.put(key, value);
    }

    @Override
    public void remove(String componentName, String key) {
        if (!data.containsKey(componentName)) {
            return;
        }
        data.get(componentName).remove(key);
    }
}

获取线程池

获取线程池,供子类使用调用。

--- WrappedChannelHandler
/**
     * 获取当前端点关联的公共线程池,部分子类会使用
     *
     * @return
     */
    public ExecutorService getExecutorService() {
        ExecutorService cexecutor = executor;
        // 当前端点关联的线程池为空或关闭就使用共享的
        if (cexecutor == null || cexecutor.isShutdown()) {
            cexecutor = SHARED_EXECUTOR;
        }
        return cexecutor;
    }

WrappedChannelHandler 实现 ChannelHandler 接口的方法都是直接调用装饰的 ChannelHandler 对应的方法,就不再进行分析。

线程模型

如果事件处理的逻辑能迅速完成,并且不会发起新的 IO 请求,比如只是在内存中记个标识,则直接在 IO 线程上处理更快,因为减少了线程池调度。但如果事件处理逻辑较慢,或者需要发起新的 IO 请求,比如需要查询数据库,则必须派发到线程池,否则 IO 线程阻塞,将导致不能接收其它请求。因此,需要通过不同的派发策略和不同的线程池配置的组合来应对不同的场景:

<dubbo:protocol name="dubbo" dispatcher="all" threadpool="fixed" threads="100" />

image
Dubbo 的线程模型需要具有线程派发能力的 ChannelHandler 和 定制化的线程池来支撑。Dispatcher 的职责就是用来创建具有线程派发能力的 ChannelHandler,其本身并不具备线程派发能力。关于 Dispatcher 在 远程通信模块总览 中已经介绍,这里不再重复说明。

Dispatcher 派发策略:

all: 所有消息都派发到线程池,包括请求,响应,连接事件,断开事件,心跳等。
direct: 所有消息都不派发到线程池,全部在 IO 线程上直接执行。
message: 只有请求响应消息派发到线程池,其它连接断开事件,心跳等消息,直接在 IO 线程上执行。
execution: 只有请求消息派发到线程池,不含响应,响应和其它连接断开事件,心跳等消息,直接在 IO 线程上执行。
connection: 在 IO 线程上,将连接断开事件放入队列,有序逐个执行,其实是交给专门处理连接事件的线程池处理的。其它消息派发到线程池。
关于线程池部分在后面的文章中详细说明,先不在这里展开介绍。

Dispatcher 实现类用来创建 WrappedChannelHandler 的子类对象,每个子类对象代表不同的派发策略,同时子类对象在创建的时候会初始化一个线程池。下面我们来分析 Dispatcher 扩展实现和对应的 WrappedChannelHandler 的子类。

AllDispatcher & AllChannelHandler

AllDispatcher 用来创建 AllChannelHandler 对象,代码如下:

public class AllDispatcher implements Dispatcher {
    public static final String NAME = "all";
    /**
     * 创建 AllChannelHandler 对象
     * @param handler
     * @param url
     * @return
     */
    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new AllChannelHandler(handler, url);
    }
}

AllChannelHandler 实现 WrappedChannelHandler 抽象类,所有消息都派发到线程池,包括请求,响应,连接事件,断开事件,心跳等,代码如下:

public class AllChannelHandler extends WrappedChannelHandler {

    // 构造方法调用父类方法,创建独享的线程池
    public AllChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    /**
     * 处理连接事件
     *
     * @param channel
     * @throws RemotingException
     */
    @Override
    public void connected(Channel channel) throws RemotingException {
        // 获取线程池
        ExecutorService cexecutor = getExecutorService();
        try {
            // 将CONNECTED 事件的处理封装成ChannelEventRunnable提交到线程池中执行
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
        }
    }

    /**
     * 处理断开连接事件
     *
     * @param channel
     * @throws RemotingException
     */
    @Override
    public void disconnected(Channel channel) throws RemotingException {
        // 获取线程池
        ExecutorService cexecutor = getExecutorService();
        try {
            // 创建ChannelEventRunnable对象,用于将断开连接事件任务派发到线程池执行
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);
        }
    }

    /**
     * 接收到的所有消息都派发到线程池。注意这里的message 可能是 Request也可能是 Response。
     * 流程大概是:消息先由IO线程(Netty 中的EventLoopGroup )从二进制流中解码出来,然后执行到该方法会把请求提交给线程池处理,处理完后调用send 方法用于向对端写回结果。
     *
     * @param channel
     * @param message
     * @throws RemotingException
     */
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            // 将请求/响应消息派发到线程池中处理,ChannelEventRunnable对象作为任务体
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            // 如果是请求消息,并且出现了线程池满了的异常
            if (message instanceof Request && t instanceof RejectedExecutionException) {
                Request request = (Request) message;
                // 如果通信方式为双向通信,将错误信息封装到Response 中,并返回给服务消费方。防止消费端等待超时
                if (request.isTwoWay()) {
                    String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
                    Response response = new Response(request.getId(), request.getVersion());
                    response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                    response.setErrorMessage(msg);
                    // 返回包含错误信息的 Response 对象
                    channel.send(response);
                    return;
                }
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

    /**
     * 处理异常信息
     *
     * @param channel
     * @param exception
     * @throws RemotingException
     */
    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
        }
    }
}

AllChannelHandler 重写了 WrappedChannelHandler 中除了发送消息的 sent() 方法之外的其它方法,执行底层的 ChannelHandler 的逻辑都交给线程池处理,请求执行完毕后发送消息 AllChannelHandler 会直接在 IO 线程中进行处理。

ExecutionDispatcher & ExecutionChannelHandler

ExecutionDispatcher 用来创建 ExecutionChannelHandler 对象,代码如下:

public class ExecutionDispatcher implements Dispatcher {

    public static final String NAME = "execution";
    /**
     * 创建 ExecutionChannelHandler 对象
     *
     * @param handler 通道处理
     * @param url     url
     * @return
     */
    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new ExecutionChannelHandler(handler, url);
    }
}

ExecutionChannelHandler 实现 WrappedChannelHandler 抽象类,只会将请求消息派发到线程池进行处理。对于响应消息以及其他网络事件(例如,连接建立事件、连接断开事件、心跳消息等),ExecutionChannelHandler 会直接在 IO 线程中进行处理,代码如下:

public class ExecutionChannelHandler extends WrappedChannelHandler {
    // 构造方法调用父类方法,创建独享的线程池
    public ExecutionChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        // 请求消息
        if (message instanceof Request) {
            try {
                cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
            } catch (Throwable t) {
                if (t instanceof RejectedExecutionException) {
                    Request request = (Request) message;
                    if (request.isTwoWay()) {
                        String msg = "Server side(" + url.getIp() + "," + url.getPort()
                                + ") thread pool is exhausted, detail msg:" + t.getMessage();
                        Response response = new Response(request.getId(), request.getVersion());
                        response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                        response.setErrorMessage(msg);
                        channel.send(response);
                        return;
                    }
                }
                throw new ExecutionException(message, channel, getClass() + " error when process received event.", t);
            }
        } else {
            // 直接交给装饰的 ChannelHandler 处理
            handler.received(channel, message);
        }
    }
}

由上面代码可知,ExecutionChannelHandler 只重写了 received() 方法并且只处理请求消息,其它方法的调用直接调用父类的,是直接在 IO 线程中进行处理。

DirectDispatcher & DirectChannelHandler

direct 类型,所有消息都不派发到线程池,全部在 IO 线程上直接执行,相关代码如下:

public class DirectDispatcher implements Dispatcher {
    public static final String NAME = "direct";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return handler;
    }
}

MessageOnlyDispatcher & MessageOnlyChannelHandler

MessageOnlyDispatcher 用来创建 MessageOnlyChannelHandler 对象,代码如下:

public class MessageOnlyDispatcher implements Dispatcher {

    public static final String NAME = "message";

    /**
     * 创建 MessageOnlyChannelHandler
     * @param handler 通道处理
     * @param url     url
     * @return
     */
    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new MessageOnlyChannelHandler(handler, url);
    }
}

MessageOnlyChannelHandler 实现 WrappedChannelHandler 抽象类,会将所有收到的消息(请求/响应)提交到线程池处理,其他网络事件(连接断开事件,心跳等消息)则是由 IO 线程直接处理,代码如下:

public class MessageOnlyChannelHandler extends WrappedChannelHandler {

    // 构造方法调用父类方法,创建独享的线程池
    public MessageOnlyChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    /**
     * 处理读取到的数据
     *
     * @param channel
     * @param message request/response
     * @throws RemotingException
     */
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }
}

由上面代码可知,ExecutionChannelHandler 只重写了 received() 方法,其它方法的调用是直接调用父类的方法,直接在 IO 线程中进行处理。

ConnectionOrderedDispatcher & ConnectionOrderedChannelHandler

ConnectionOrderedDispatcher 用来创建 ConnectionOrderedChannelHandler 对象,代码如下:

public class ConnectionOrderedDispatcher implements Dispatcher {

    public static final String NAME = "connection";

    /**
     * 创建 ConnectionOrderedChannelHandler 对象
     *
     * @param handler 通道处理
     * @param url     url
     * @return
     */
    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new ConnectionOrderedChannelHandler(handler, url);
    }
}

ConnectionOrderedChannelHandler 实现 WrappedChannelHandler 抽象类,会将收到的消息交给线程池进行处理,对于连接建立以及断开事件是通过 IO 线程将连接、断开事件交给 connectionExecutor 线程池排队处理的,代码如下:

public class ConnectionOrderedChannelHandler extends WrappedChannelHandler {

    /**
     * 处理连接建立和断开事件的线程池,线程池线程数只有一个,因此任务多的情况会先堆积到阻塞队列进行排队,有序执行
     */
    protected final ThreadPoolExecutor connectionExecutor;
    /**
     * 线程池阻塞队列告警阈值
     */
    private final int queuewarninglimit;

    public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) {
        // 调用父类构造方法,创建独享的线程池
        super(handler, url);
        // 从 'threadname' 配置项获取线程池名,默认为 Dubbo
        String threadName = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        // 该线程池只有一个线程,并且阻塞队列的长度也是固定的,由配置参数决定
        connectionExecutor = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(url.getPositiveParameter(Constants.CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)),
                new NamedThreadFactory(threadName, true),
                new AbortPolicyWithReport(threadName, url)
        ); 

        // 从 'connect.queue.warning.size' 配置项获取线程池阻塞队列告警阈值,默认大小为 1000
        queuewarninglimit = url.getParameter(Constants.CONNECT_QUEUE_WARNING_SIZE, Constants.DEFAULT_CONNECT_QUEUE_WARNING_SIZE);
    }

    @Override
    public void connected(Channel channel) throws RemotingException {
        try {
            checkQueueLength();
            connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
        }
    }

    @Override
    public void disconnected(Channel channel) throws RemotingException {
        try {
            checkQueueLength();
            connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("disconnected event", channel, getClass() + " error when process disconnected event .", t);
        }
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            //fix, reject exception can not be sent to consumer because thread pool is full, resulting in consumers waiting till timeout.
            if (message instanceof Request && t instanceof RejectedExecutionException) {
                Request request = (Request) message;
                if (request.isTwoWay()) {
                    String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
                    Response response = new Response(request.getId(), request.getVersion());
                    response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                    response.setErrorMessage(msg);
                    channel.send(response);
                    return;
                }
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
        }
    }

    // 检查阈值
    private void checkQueueLength() {
        // 排队任务超过阈值打印告警日志
        if (connectionExecutor.getQueue().size() > queuewarninglimit) {
            logger.warn(new IllegalThreadStateException("connectionordered channel handler `queue size: " + connectionExecutor.getQueue().size() + " exceed the warning limit number :" + queuewarninglimit));
        }
    }
}

和 AllChannelHandler 一样,发送消息由 ConnectionOrderedChannelHandler 直接在 IO 线程中进行处理,区别在于后者的连接建立、断开事件不是通过父类中创建的线程池处理,而是创建了一个排队线程池。之所以叫它排队线程池,是该线程池只有一个线程,并且使用的阻塞队列是有序的。

ChannelEventRunnable 线程派发任务体

实现Runnable接口,该任务体被不同的线程派发机制使用。

属性

public class ChannelEventRunnable implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(ChannelEventRunnable.class);

    /**
     * 装饰的通道处理器,会在线程池中处理消息
     */
    private final ChannelHandler handler;
    /**
     * 通道
     */
    private final Channel channel;
    /**
     * 通道状态
     */
    private final ChannelState state;
    /**
     * 消息(可能为空,如连接断开事件)
     */
    private final Object message;
    /**
     * 处理异常时,捕获的异常
     */
    private final Throwable exception;

   public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state) {
        this(channel, handler, state, null);
    }

    public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state, Object message) {
        this(channel, handler, state, message, null);
    }

    public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state, Throwable t) {
        this(channel, handler, state, null, t);
    }

    public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state, Object message, Throwable exception) {
        this.channel = channel;
        this.handler = handler;
        this.state = state;
        this.message = message;
        this.exception = exception;
    }
}

ChannelEventRunnable 中的属性都是由线程派发相关的 ChannelHandler 传入的,不同的派发策略传入的属性不同,通过不同的构造方法也可以看出。

任务体

public class ChannelEventRunnable implements Runnable {

    @Override
    public void run() {

        // 检测通道状态,如果是请求或响应消息, 那么state = RECEIVED
        if (state == ChannelState.RECEIVED) {
            try {

                // 将 channel 和 message 传递给 ChannelHandler 对象用于后续的调用。
                handler.received(channel, message);
            } catch (Exception e) {
                logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                        + ", message is " + message, e);
            }

            // 其它通道状态
        } else {
            switch (state) {
                // 连接事件
                case CONNECTED:
                    try {
                        handler.connected(channel);
                    } catch (Exception e) {
                        logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                    }
                    break;
                // 断开连接事件
                case DISCONNECTED:
                    try {
                        handler.disconnected(channel);
                    } catch (Exception e) {
                        logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                    }
                    break;
                // 发送消息
                case SENT:
                    try {
                        handler.sent(channel, message);
                    } catch (Exception e) {
                        logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                                + ", message is " + message, e);
                    }
                    // 异常处理
                case CAUGHT:
                    try {
                        handler.caught(channel, exception);
                    } catch (Exception e) {
                        logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                                + ", message is: " + message + ", exception is " + exception, e);
                    }
                    break;
                default:
                    logger.warn("unknown state: " + state + ", message is " + message);
            }
        }

    }

    /**
     * 通道状态
     */
    public enum ChannelState {

        /**
         * CONNECTED  - 连接
         */
        CONNECTED,

        /**
         * DISCONNECTED - 断开连接
         */
        DISCONNECTED,

        /**
         * SENT - 发送消息
         */
        SENT,

        /**
         * RECEIVED - 接收请求/响应消息
         */
        RECEIVED,

        /**
         * CAUGHT - 异常
         */
        CAUGHT
    }

}

该任务体功能和作用如下:

1 请求和响应消息出现频率比其他类型消息高,因此这里对消息类型进行了针对性判断,便于提前处理。
2 ChannelEventRunnable 仅是一个中转站,它的 run 方法中并不包含具体的调用逻辑,只是判断对应的通道状态,然后将参数传给装饰的 ChannelHandler 对象进行针对性处理。

至此,ChannelHandlerDelegate 的另一条继承体系分析完毕,Transport 层的主要 ChannelHandler 分析到此结束。

ChannelHandlers

ChannelHandler 的工具类,主要是对传入的 ChannelHandler 进行层层包装,具体怎么包装的我们看下面的代码。

public class ChannelHandlers {
    /**
     * 单例
     */
    private static ChannelHandlers INSTANCE = new ChannelHandlers();
    protected ChannelHandlers() {
    }

    /**
     * 包装
     *
     * @param handler
     * @param url
     * @return
     */
    public static ChannelHandler wrap(ChannelHandler handler, URL url) {
        return ChannelHandlers.getInstance().wrapInternal(handler, url);
    }

    protected static ChannelHandlers getInstance() {
        return INSTANCE;
    }

    /**
     * 无论是Client还是Server,在构造方法中都会将传入的ChannelHandler进行包装,为该 ChannelHandler 增加了 Dubbo 消息派发、心跳处理以及多消息处理的功能。
     * @param handler
     * @param url
     * @return
     */
    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        return new MultiMessageHandler( // 多消息处理
                new HeartbeatHandler( // 心跳处理
                        ExtensionLoader.getExtensionLoader(Dispatcher.class)
                                .getAdaptiveExtension() 
                                .dispatch(handler, url) // 返回的是一个 ChannelHandlerDelegate 类型的对象,默认是 AllChannelHandler,确定了具体的线程模型
                )
        );
    }
}

很容易发现,包装器其实就是前文介绍的 ChannelHandlerDelegate 类型的 ChannelHandler。该包装逻辑无论在 Client 端还是 Server 端都会使用,也就意味着上层传入的 ChannelHandler 会增加很多的逻辑,即支持多消息处理、心跳处理以及支持 Dubbo 线程模型机制。我们在下一篇文章中还会再次介绍,这里先以 netty4 实现的网络通信简单说明。

NettyServer

public class NettyServer extends AbstractServer implements Server {
    public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        // ChannelHandlers.wrap方法,用来包装 ChannelHandler,实现Dubbo 线程模型等功能
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }
}

NettyClient

public class NettyClient extends AbstractClient {
    public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
        // wrapChannelHandler方法,用来包装 ChannelHandler,实现Dubbo 线程模型等功能
        super(url, wrapChannelHandler(url, handler));
    }
}

编解码

关于 Codec2 扩展接口已经在 远程通信模块总览 中进行了介绍,下面介绍在 Transport 层相关的实现和扩展。

编解码工具类 CodecSupport

public class CodecSupport {

    private static final Logger logger = LoggerFactory.getLogger(CodecSupport.class);

    /**
     * 序列化对象集合
     * key: 序列化类型编号  {@link Serialization#getContentTypeId()}
     * value: 序列化对象,如: Hessian2Serialization
     */
    private static Map<Byte, Serialization> ID_SERIALIZATION_MAP = new HashMap<Byte, Serialization>();
    /**
     * 序列化名集合
     * key: 序列化类型编号 {@link Serialization#getContentTypeId()}
     * value: 序列化拓展名,如:hessian2
     */
    private static Map<Byte, String> ID_SERIALIZATIONNAME_MAP = new HashMap<Byte, String>();

    static {
        // 基于 Dubbo SPI,获取 Serialization 的扩展名列表
        Set<String> supportedExtensions = ExtensionLoader.getExtensionLoader(Serialization.class).getSupportedExtensions();
        for (String name : supportedExtensions) {
            // 根据扩展名获取对应的扩展实现
            Serialization serialization = ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(name);
            // 内容类型编号
            byte idByte = serialization.getContentTypeId();

            if (ID_SERIALIZATION_MAP.containsKey(idByte)) {
                logger.error("Serialization extension " + serialization.getClass().getName()
                        + " has duplicate id to Serialization extension "
                        + ID_SERIALIZATION_MAP.get(idByte).getClass().getName()
                        + ", ignore this Serialization extension");
                continue;
            }

            // 以内容编号作为 key,分别缓存序列化扩展实现和扩展实现名
            ID_SERIALIZATION_MAP.put(idByte, serialization);
            ID_SERIALIZATIONNAME_MAP.put(idByte, name);
        }
    }

    private CodecSupport() {
    }

    /**
     * 从缓存中,根据序列化号查找Serialization对象
     *
     * @param id
     * @return
     */
    public static Serialization getSerializationById(Byte id) {
        return ID_SERIALIZATION_MAP.get(id);
    }

    /**
     * 通过URL根据SPI机制查找Serialization对象,默认使用 hessian2
     *
     * @param url
     * @return
     */
    public static Serialization getSerialization(URL url) {
        return ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(url.getParameter(Constants.SERIALIZATION_KEY, Constants.DEFAULT_REMOTING_SERIALIZATION));
    }

    /**
     * 查找Serialization对象
     *
     * @param url
     * @param id
     * @return
     * @throws IOException
     */
    public static Serialization getSerialization(URL url, Byte id) throws IOException {
        Serialization serialization = getSerializationById(id);
        // 序列化扩展名
        String serializationName = url.getParameter(Constants.SERIALIZATION_KEY, Constants.DEFAULT_REMOTING_SERIALIZATION);

        // 出于安全的目的,针对 JDK 类型的序列化方式,检查连接到服务器的 URL 和实际传输的数据协议是否一致。
        if (serialization == null
                || ((id == 3 || id == 7 || id == 4) && !(serializationName.equals(ID_SERIALIZATIONNAME_MAP.get(id))))) {
            throw new IOException("Unexpected serialization id:" + id + " received from network, please check if the peer send the right id.");
        }
        return serialization;
    }

    /**
     * 获取反序列化对应的 ObjectInput
     *
     * @param url
     * @param is
     * @param proto
     * @return
     * @throws IOException
     */
    public static ObjectInput deserialize(URL url, InputStream is, byte proto) throws IOException {
        Serialization s = getSerialization(url, proto);
        return s.deserialize(url, is);
    }
}

上面代码已经详细注释,整个逻辑分为两点,Dubbo 应用启动时缓存序列化并提供获取序列化的方法。 关于序列化在之前的文章中已经详细介绍过,这里就不再重复说明。

编解码适配器

public class CodecAdapter implements Codec2 {
    /**
     * 被适配的对象
     */
    private Codec codec;

    /**
     * 通过构造方法设置被适配的对象
     *
     * @param codec
     */
    public CodecAdapter(Codec codec) {
        Assert.notNull(codec, "codec == null");
        this.codec = codec;
    }

    @Override
    public void encode(Channel channel, ChannelBuffer buffer, Object message)
            throws IOException {
        UnsafeByteArrayOutputStream os = new UnsafeByteArrayOutputStream(1024);
        codec.encode(channel, os, message);
        buffer.writeBytes(os.toByteArray());
    }

    @Override
    public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
        byte[] bytes = new byte[buffer.readableBytes()];
        int savedReaderIndex = buffer.readerIndex();
        buffer.readBytes(bytes);
        UnsafeByteArrayInputStream is = new UnsafeByteArrayInputStream(bytes);
        Object result = codec.decode(channel, is);
        buffer.readerIndex(savedReaderIndex + is.position());
        return result == Codec.NEED_MORE_INPUT ? DecodeResult.NEED_MORE_INPUT : result;
    }

    public Codec getCodec() {
        return codec;
    }
}

CodecAdapter 使用对象适配模式完成对 Codec 类型的适配工作,即将 Codec 适配成 Codec2 。关于适配器模式可以参考 适配器模式 。

编解码继承关系

编解码 Codec2 的继承关系如下图所示:
image-1680245723347
继承关系中包含了各层的编解码实现,本篇文章只介绍 Transport 层相关的实现,其它层相关的实现会在对应的层进行介绍。需要注意的是,Exchange 层的编解码实现依赖了 Transport 层的编解码实现,Protocol 层又依赖了 Exchange 层的编解码实现。可以发现,编解码器的实现通过继承的方式以获得更多的功能,每个编码器实现类编解码消息的逻辑都不一样。

AbstractCodec

public abstract class AbstractCodec implements Codec2 {

    private static final Logger logger = LoggerFactory.getLogger(AbstractCodec.class);

    /**
     * 静态方法,校验消息长度
     *
     * @param channel
     * @param size
     * @throws IOException
     */
    protected static void checkPayload(Channel channel, long size) throws IOException {
        //  8M
        int payload = Constants.DEFAULT_PAYLOAD;
        if (channel != null && channel.getUrl() != null) {
            // 获取配置允许最大的消息大小,默认 为 8 * 1024 * 1024;  8M
            payload = channel.getUrl().getParameter(Constants.PAYLOAD_KEY, Constants.DEFAULT_PAYLOAD);
        }
        // 超过允许最大的消息大小,则抛出异常
        if (payload > 0 && size > payload) {
            ExceedPayloadLimitException e = new ExceedPayloadLimitException("Data length too large: " + size + ", max payload: " + payload + ", channel: " + channel);
            logger.error(e);
            throw e;
        }
    }

    /**
     * 获得Serialization对象
     *
     * @param channel
     * @return
     */
    protected Serialization getSerialization(Channel channel) {
        return CodecSupport.getSerialization(channel.getUrl());
    }

    /**
     * 是否为客户端的通道
     *
     * @param channel
     * @return
     */
    protected boolean isClientSide(Channel channel) {
        String side = (String) channel.getAttribute(Constants.SIDE_KEY);
        if ("client".equals(side)) {
            return true;
        } else if ("server".equals(side)) {
            return false;
        } else {
            InetSocketAddress address = channel.getRemoteAddress();
            URL url = channel.getUrl();
            boolean client = url.getPort() == address.getPort()
                    && NetUtils.filterLocalHost(url.getIp()).equals(
                    NetUtils.filterLocalHost(address.getAddress()
                            .getHostAddress()));
            channel.setAttribute(Constants.SIDE_KEY, client ? "client"
                    : "server");
            return client;
        }
    }

    /**
     * 是否为服务端的通道
     *
     * @param channel
     * @return
     */
    protected boolean isServerSide(Channel channel) {
        return !isClientSide(channel);
    }
}

是 Codec2 的抽象实现,提供了公用的一些方法,如校验消息长度是否超过阈值,根据URL获取 Serialization 扩展实现,判断当前通道属于客户端侧还是服务端侧。

TransportCodec

TransportCodec 的逻辑简单、粗暴,使用 Serialize 对所有消息直接序列化或者反序列化。

public class TransportCodec extends AbstractCodec {

    @Override
    public void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException {
        // 对 ChannelBuffer 进行装饰获得 Dubbo 输出流
        OutputStream output = new ChannelBufferOutputStream(buffer);
        // 获得用于序列化的ObjectOutput对象
        ObjectOutput objectOutput = getSerialization(channel).serialize(channel.getUrl(), output);
        // 将消息写入 ObjectOutput
        encodeData(channel, objectOutput, message);
        objectOutput.flushBuffer();
        // 释放,kryo 的 KryoObjectInput 和 KryoObjectOutput 实现了 Cleanable 接口,需要释放资源。
        if (objectOutput instanceof Cleanable) {
            ((Cleanable) objectOutput).cleanup();
        }
    }

    @Override
    public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
        //  对 ChannelBuffer 进行装饰获得 Dubbo 输入流
        InputStream input = new ChannelBufferInputStream(buffer);
        // 获得用于反序列的 ObjectInput 对象
        ObjectInput objectInput = getSerialization(channel).deserialize(channel.getUrl(), input);
        // 从 ObjectInput 读取消息并反序列化为 对象
        Object object = decodeData(channel, objectInput);
        if (objectInput instanceof Cleanable) {
            ((Cleanable) objectInput).cleanup();
        }
        return object;
    }

    protected void encodeData(Channel channel, ObjectOutput output, Object message) throws IOException {
        encodeData(output, message);
    }

    protected Object decodeData(Channel channel, ObjectInput input) throws IOException {
        return decodeData(input);
    }

    protected void encodeData(ObjectOutput output, Object message) throws IOException {
        output.writeObject(message);
    }

    protected Object decodeData(ObjectInput input) throws IOException {
        try {
            return input.readObject();
        } catch (ClassNotFoundException e) {
            throw new IOException("ClassNotFoundException: " + StringUtils.toString(e));
        }
    }
}

小结

本篇文章简单介绍了 Transport 层及其必要性,然后从端点抽象类 AbstractPeer、AbstractEndpoint,语义端点抽象类 AbstractServer、AbstractClient,抽象通道 AbstractChannel 以及 通道关联的 ChannelHandler 多方面介绍了 Transport 层的实现,最后介绍了编解码的继承体系。不难发现,作为底层的 Transport,支持了消息/事件发送、处理、响应以及编解码,涉及的接口和类在功能层面上已经是一个闭环了。后面两篇文章会对本篇文章的抽象进行具体化。