概述
本文主要讲述Dubbo中的核心组件Protocol以及Invoker
Protocol属于Dubbo核心操作类,主要定义了Dubbo框架的两个核心操作,即导出(export)和调用(refer)。
Protocol定义
/**
* Protocol类定义了Dubbo中的核心操作,即服务导出和服务调用。
* Protocol. (API/SPI, Singleton, ThreadSafe)
*/
@SPI("dubbo") // 缺省情况下Protocol扩展点的默认实现是dubbo。
public interface Protocol {
/**
* Get default port when user doesn't config the port.
*
* @return default port
*/
int getDefaultPort();
/**
* Export service for remote invocation: <br>
* 1. Protocol should record request source address after receive a request:
* RpcContext.getContext().setRemoteAddress();<br>
* 2. export() must be idempotent, that is, there's no difference between invoking once and invoking twice when
* export the same URL<br>
* 3. Invoker instance is passed in by the framework, protocol needs not to care <br>
* 该方法定义了Dubbo导出方法。
* 该方法在实现中主要有两种:一种是InJvm内部导出,这种导出不需要启动服务和注册URL到注册中心,
* Invoker是AbstractProxyInvoker,这个类主要是通过Javassist生成一个对于目标类的代理类目的是减少反射调用。
* 另一种是各种协议的导出,主要的实现方式先使用不同协议框架生成对目标类的代理实现对象,并且针对目标代理对象生成AbstractProxyInvoker
* AbstractProxyInvoker所做的逻辑仅仅是识别方法名,并进行代理调用。并执行doExport调用,将Export注册到exporterMap中。
* @param <T> Service type
* @param invoker Service invoker
* @return exporter reference for exported service, useful for unexport the service later
* @throws RpcException thrown when error occurs during export the service, for example: port is occupied
*/
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
/**
* Refer a remote service: <br>
* 1. When user calls `invoke()` method of `Invoker` object which's returned from `refer()` call, the protocol
* needs to correspondingly execute `invoke()` method of `Invoker` object <br>
* 2. It's protocol's responsibility to implement `Invoker` which's returned from `refer()`. Generally speaking,
* protocol sends remote request in the `Invoker` implementation. <br>
* 3. When there's check=false set in URL, the implementation must not throw exception but try to recover when
* connection fails.
* 一种是Dubbo协议调用,通过DubboInvoker实现。
* 另一种是各种协议的导出。
* 主要的实现方式先使用不同协议框架生成对目标类的代理实现对象,并且针对目标代理对象生成AbstractProxyInvoker
* AbstractProxyInvoker所做的逻辑仅仅是识别方法名,并进行代理调用。
* 然后再使用AbstractInvoker生成远程调用对象,该远程调用对象封装了远程调用逻辑。
* @param <T> Service type
* @param type Service class
* @param url URL address for the remote service
* @return invoker service's local proxy
* @throws RpcException when there's any error while connecting to the service provider
*/
@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
/**
* Destroy protocol: <br>
* 1. Cancel all services this protocol exports and refers <br>
* 2. Release all occupied resources, for example: connection, port, etc. <br>
* 3. Protocol can continue to export and refer new service even after it's destroyed.
*/
void destroy();
}
Protocol实现细节
Protocol接口主要有两类实现。
1、Dubbo内部的Protocol,DubboProtocol。这个接口调用了Dubbo内部自定义的调用和导出服务,该类主要继承了AbstractProtocol。
2、三方Protocol,比如HessianProtocol、等。这些类主要继承了AbstractProxyProtocol。
ProxyFactory
1、getProxy: InvokerInvocationHandler通过一个Invoker入参生成一个可以提供用户客户端调用的代理对象,实现远程调用。
2、getInvoker: 根据提供的实例对象生成一个可执行的本地方法的Invoker(AbstractProxyInvoker),供Exporter暴露服务,为每个服务提供的接口实现类对象生成一个Wrapper类型的对象,内部invokeMethod通过判断方法名和方法参数个数来确定具体调用哪个方法,直接通过对象调用,减少反射调用所产生的开销。
InvokerInvocationHandler:顾名思义是生成一个Invoker的代理对象,内部通过回调Invoker来实现远程调用。主要的作用是为客户端的接口提供动态代理调用的逻辑,主要有两步:
1、通过方法名,返回值,入参构建Invocation
2、通过构建的Invocation执行Invoker的invoke函数
Invoker实现细节
public interface Invoker<T> extends Node {
/**
* get service interface.
*
* @return service interface.
*/
Class<T> getInterface();
/**
* invoke. 执行调用:1、远程代理调用 2、本地代理调用
*
* @param invocation
* @return result
* @throws RpcException
*/
Result invoke(Invocation invocation) throws RpcException;
}
Invoker实现主要有两种:
1、AbstractInvoker:发起远程接口调用
2、AbstractProxyInvoker:发起本地方法调用
AbstractInvoker
invoker方法代码如下:
@Override
public Result invoke(Invocation inv) throws RpcException {
// if invoker is destroyed due to address refresh from registry, let's allow the current invoke to proceed
if (destroyed.get()) {
logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, "
+ ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer");
}
RpcInvocation invocation = (RpcInvocation) inv;
// 将发起调用对象放到Invovation中
invocation.setInvoker(this);
if (attachment != null && attachment.size() > 0) {
invocation.addAttachmentsIfAbsent(attachment);
}
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
/**
* invocation.addAttachmentsIfAbsent(context){@link RpcInvocation#addAttachmentsIfAbsent(Map)}should not be used here,
* because the {@link RpcContext#setAttachment(String, String)} is passed in the Filter when the call is triggered
* by the built-in retry mechanism of the Dubbo. The attachment to update RpcContext will no longer work, which is
* a mistake in most cases (for example, through Filter to RpcContext output traceId and spanId and other information).
*/
invocation.addAttachments(contextAttachments);
}
if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {
invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
}
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
try {
// 执行远程调用,由子类实现
return doInvoke(invocation);
} catch (InvocationTargetException e) { // biz exception
Throwable te = e.getTargetException();
if (te == null) {
return new RpcResult(e);
} else {
if (te instanceof RpcException) {
((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
}
return new RpcResult(te);
}
} catch (RpcException e) {
if (e.isBiz()) {
return new RpcResult(e);
} else {
throw e;
}
} catch (Throwable e) {
return new RpcResult(e);
}
}
protected abstract Result doInvoke(Invocation invocation) throws Throwable;
AbstractProxyInvoker
AbstractProxyInvoker中的Invoke方法如下:
@Override
public Result invoke(Invocation invocation) throws RpcException {
RpcContext rpcContext = RpcContext.getContext();
try {
// 由子类实现具体逻辑,这里主要是由JavassistProxyFactory和JdkProxyFactory实现
Object obj = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
if (RpcUtils.isReturnTypeFuture(invocation)) {
return new AsyncRpcResult((CompletableFuture<Object>) obj);
} else if (rpcContext.isAsyncStarted()) { // ignore obj in case of RpcContext.startAsync()? always rely on user to write back.
return new AsyncRpcResult(((AsyncContextImpl)(rpcContext.getAsyncContext())).getInternalFuture());
} else {
return new RpcResult(obj);
}
} catch (InvocationTargetException e) {
// TODO async throw exception before async thread write back, should stop asyncContext
if (rpcContext.isAsyncStarted() && !rpcContext.stopAsync()) {
logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e);
}
return new RpcResult(e.getTargetException());
} catch (Throwable e) {
throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
JavassistProxyFactory
public class JavassistProxyFactory extends AbstractProxyFactory {
// 该方法主要是由refer,服务消费方进行调用。
@Override
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
// 由Dubbo本身实现的Proxy类,主要功能是通过Javassist来动态生成对应的代理对象。
// 同时这里定义了InvokerInvocationHandler的回调逻辑,使得代理对象可以在方法中回调Invoker逻辑。
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
// 该方法主要是由export,服务提供方进行调用。主要的功能是为每个需要对外提供服务的接口生成对应的代理对象,并通过Invoker对外暴露。
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
}
其实从整体上来看,Dubbo的整体设计还是按照服务消费和服务生产两方面来设计的。
Proxy 和 Wrapper 这两个类,Proxy 类用来创建服务接口代理类的实例,Wrapper 类是对服务类进行拆解、包装,对服务方法进行映射处理,避免反射调用。
服务消费者使用Proxy创建的服务代理对象屏蔽了网络通信等细节,服务提供者使用Wrapper将个性化的服务接口实现统一转成Invoker, Proxy 和 Wrapper 实现了 Dubbo 内部和业务接口之间的无缝转换。在之后的服务暴露、服务引用以及服务调用环节中可以看到它们的必要性。