Dubbo中Protocol和Invoker

Scroll Down

概述

本文主要讲述Dubbo中的核心组件Protocol以及Invoker

Protocol属于Dubbo核心操作类,主要定义了Dubbo框架的两个核心操作,即导出(export)和调用(refer)。

Protocol定义

/**
 * Protocol类定义了Dubbo中的核心操作,即服务导出和服务调用。
 * Protocol. (API/SPI, Singleton, ThreadSafe)
 */
@SPI("dubbo") // 缺省情况下Protocol扩展点的默认实现是dubbo。
public interface Protocol {

    /**
     * Get default port when user doesn't config the port.
     *
     * @return default port
     */
    int getDefaultPort();

    /**
     * Export service for remote invocation: <br>
     * 1. Protocol should record request source address after receive a request:
     * RpcContext.getContext().setRemoteAddress();<br>
     * 2. export() must be idempotent, that is, there's no difference between invoking once and invoking twice when
     * export the same URL<br>
     * 3. Invoker instance is passed in by the framework, protocol needs not to care <br>
     * 该方法定义了Dubbo导出方法。
     * 该方法在实现中主要有两种:一种是InJvm内部导出,这种导出不需要启动服务和注册URL到注册中心,
     * Invoker是AbstractProxyInvoker,这个类主要是通过Javassist生成一个对于目标类的代理类目的是减少反射调用。
     * 另一种是各种协议的导出,主要的实现方式先使用不同协议框架生成对目标类的代理实现对象,并且针对目标代理对象生成AbstractProxyInvoker
     * AbstractProxyInvoker所做的逻辑仅仅是识别方法名,并进行代理调用。并执行doExport调用,将Export注册到exporterMap中。
     * @param <T>     Service type
     * @param invoker Service invoker
     * @return exporter reference for exported service, useful for unexport the service later
     * @throws RpcException thrown when error occurs during export the service, for example: port is occupied
     */
    @Adaptive
    <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

    /**
     * Refer a remote service: <br>
     * 1. When user calls `invoke()` method of `Invoker` object which's returned from `refer()` call, the protocol
     * needs to correspondingly execute `invoke()` method of `Invoker` object <br>
     * 2. It's protocol's responsibility to implement `Invoker` which's returned from `refer()`. Generally speaking,
     * protocol sends remote request in the `Invoker` implementation. <br>
     * 3. When there's check=false set in URL, the implementation must not throw exception but try to recover when
     * connection fails.
     * 一种是Dubbo协议调用,通过DubboInvoker实现。
     * 另一种是各种协议的导出。
     * 主要的实现方式先使用不同协议框架生成对目标类的代理实现对象,并且针对目标代理对象生成AbstractProxyInvoker
     * AbstractProxyInvoker所做的逻辑仅仅是识别方法名,并进行代理调用。
     * 然后再使用AbstractInvoker生成远程调用对象,该远程调用对象封装了远程调用逻辑。
     * @param <T>  Service type
     * @param type Service class
     * @param url  URL address for the remote service
     * @return invoker service's local proxy
     * @throws RpcException when there's any error while connecting to the service provider
     */
    @Adaptive
    <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;

    /**
     * Destroy protocol: <br>
     * 1. Cancel all services this protocol exports and refers <br>
     * 2. Release all occupied resources, for example: connection, port, etc. <br>
     * 3. Protocol can continue to export and refer new service even after it's destroyed.
     */
    void destroy();

}

image.png

Protocol实现细节

Protocol接口主要有两类实现。
1、Dubbo内部的Protocol,DubboProtocol。这个接口调用了Dubbo内部自定义的调用和导出服务,该类主要继承了AbstractProtocol。
2、三方Protocol,比如HessianProtocol、等。这些类主要继承了AbstractProxyProtocol。

ProxyFactory

1、getProxy: InvokerInvocationHandler通过一个Invoker入参生成一个可以提供用户客户端调用的代理对象,实现远程调用。
2、getInvoker: 根据提供的实例对象生成一个可执行的本地方法的Invoker(AbstractProxyInvoker),供Exporter暴露服务,为每个服务提供的接口实现类对象生成一个Wrapper类型的对象,内部invokeMethod通过判断方法名和方法参数个数来确定具体调用哪个方法,直接通过对象调用,减少反射调用所产生的开销。

InvokerInvocationHandler:顾名思义是生成一个Invoker的代理对象,内部通过回调Invoker来实现远程调用。主要的作用是为客户端的接口提供动态代理调用的逻辑,主要有两步:

1、通过方法名,返回值,入参构建Invocation
2、通过构建的Invocation执行Invoker的invoke函数

Invoker实现细节

public interface Invoker<T> extends Node {

    /**
     * get service interface.
     *
     * @return service interface.
     */
    Class<T> getInterface();

    /**
     * invoke. 执行调用:1、远程代理调用 2、本地代理调用
     *
     * @param invocation
     * @return result
     * @throws RpcException
     */
    Result invoke(Invocation invocation) throws RpcException;

}

Invoker实现主要有两种:
1、AbstractInvoker:发起远程接口调用
2、AbstractProxyInvoker:发起本地方法调用

AbstractInvoker

invoker方法代码如下:

@Override
public Result invoke(Invocation inv) throws RpcException {
    // if invoker is destroyed due to address refresh from registry, let's allow the current invoke to proceed
    if (destroyed.get()) {
        logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, "
                + ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer");
    }
    RpcInvocation invocation = (RpcInvocation) inv;
    // 将发起调用对象放到Invovation中
    invocation.setInvoker(this);
    if (attachment != null && attachment.size() > 0) {
        invocation.addAttachmentsIfAbsent(attachment);
    }
    Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
    if (contextAttachments != null && contextAttachments.size() != 0) {
        /**
         * invocation.addAttachmentsIfAbsent(context){@link RpcInvocation#addAttachmentsIfAbsent(Map)}should not be used here,
         * because the {@link RpcContext#setAttachment(String, String)} is passed in the Filter when the call is triggered
         * by the built-in retry mechanism of the Dubbo. The attachment to update RpcContext will no longer work, which is
         * a mistake in most cases (for example, through Filter to RpcContext output traceId and spanId and other information).
         */
        invocation.addAttachments(contextAttachments);
    }
    if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {
        invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
    }
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);


    try {
        // 执行远程调用,由子类实现
        return doInvoke(invocation);
    } catch (InvocationTargetException e) { // biz exception
        Throwable te = e.getTargetException();
        if (te == null) {
            return new RpcResult(e);
        } else {
            if (te instanceof RpcException) {
                ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
            }
            return new RpcResult(te);
        }
    } catch (RpcException e) {
        if (e.isBiz()) {
            return new RpcResult(e);
        } else {
            throw e;
        }
    } catch (Throwable e) {
        return new RpcResult(e);
    }
}

protected abstract Result doInvoke(Invocation invocation) throws Throwable;

AbstractProxyInvoker

AbstractProxyInvoker中的Invoke方法如下:

@Override
public Result invoke(Invocation invocation) throws RpcException {
    RpcContext rpcContext = RpcContext.getContext();
    try {
        // 由子类实现具体逻辑,这里主要是由JavassistProxyFactory和JdkProxyFactory实现
        Object obj = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
        if (RpcUtils.isReturnTypeFuture(invocation)) {
            return new AsyncRpcResult((CompletableFuture<Object>) obj);
        } else if (rpcContext.isAsyncStarted()) { // ignore obj in case of RpcContext.startAsync()? always rely on user to write back.
            return new AsyncRpcResult(((AsyncContextImpl)(rpcContext.getAsyncContext())).getInternalFuture());
        } else {
            return new RpcResult(obj);
        }
    } catch (InvocationTargetException e) {
        // TODO async throw exception before async thread write back, should stop asyncContext
        if (rpcContext.isAsyncStarted() && !rpcContext.stopAsync()) {
            logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e);
        }
        return new RpcResult(e.getTargetException());
    } catch (Throwable e) {
        throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

JavassistProxyFactory

public class JavassistProxyFactory extends AbstractProxyFactory {

    // 该方法主要是由refer,服务消费方进行调用。
    @Override
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        // 由Dubbo本身实现的Proxy类,主要功能是通过Javassist来动态生成对应的代理对象。
        // 同时这里定义了InvokerInvocationHandler的回调逻辑,使得代理对象可以在方法中回调Invoker逻辑。
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }

    // 该方法主要是由export,服务提供方进行调用。主要的功能是为每个需要对外提供服务的接口生成对应的代理对象,并通过Invoker对外暴露。
    @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }

}

其实从整体上来看,Dubbo的整体设计还是按照服务消费和服务生产两方面来设计的。

image

Proxy 和 Wrapper 这两个类,Proxy 类用来创建服务接口代理类的实例,Wrapper 类是对服务类进行拆解、包装,对服务方法进行映射处理,避免反射调用。
服务消费者使用Proxy创建的服务代理对象屏蔽了网络通信等细节,服务提供者使用Wrapper将个性化的服务接口实现统一转成Invoker, Proxy 和 Wrapper 实现了 Dubbo 内部和业务接口之间的无缝转换。在之后的服务暴露、服务引用以及服务调用环节中可以看到它们的必要性。