1. Overall flow
The client receives the response message
NettyHandler.messageReceived(ChannelHandlerContext ctx, MessageEvent e)
-->MultiMessageHandler.received(Channel channel, Object message)
-->HeartbeatHandler.received(Channel channel, Object message)
-->AllChannelHandler.received(Channel channel, Object message)
-->ExecutorService cexecutor = getExecutorService()
-->cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message))
-->ChannelEventRunnable.run()
-->DecodeHandler.received(Channel channel, Object message)
-->decode(Object message)
-->HeaderExchangeHandler.received(Channel channel, Object message)
-->handleResponse(Channel channel, Response response)
-->DefaultFuture.received(channel, response)
-->doReceived(Response res) // turns the asynchronous call into a synchronous one
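The call chain above is a chain of wrappers: each handler does one job and forwards the message to the next, and one of them (AllChannelHandler) hands the rest of the chain to a thread pool. The following is only a minimal sketch of that shape. `MessageHandler`, `TracingHandler`, `DispatchingHandler` and `HandlerChainDemo` are hypothetical names invented for illustration and are not Dubbo's real types.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a channel handler; only the received() path is modelled.
interface MessageHandler {
    void received(Object message);
}

// Records its own name, then forwards to the next handler (null marks the end of the chain).
class TracingHandler implements MessageHandler {
    private final String name;
    private final MessageHandler next;
    private final List<String> trace;

    TracingHandler(String name, MessageHandler next, List<String> trace) {
        this.name = name;
        this.next = next;
        this.trace = trace;
    }

    @Override
    public void received(Object message) {
        trace.add(name);
        if (next != null) {
            next.received(message);
        }
    }
}

// Moves the rest of the chain off the calling (I/O) thread onto a thread pool.
class DispatchingHandler implements MessageHandler {
    private final MessageHandler next;
    private final ExecutorService executor;

    DispatchingHandler(MessageHandler next, ExecutorService executor) {
        this.next = next;
        this.executor = executor;
    }

    @Override
    public void received(Object message) {
        executor.execute(() -> next.received(message));
    }
}

class HandlerChainDemo {
    // Builds heartbeat -> dispatch -> decode -> exchange and pushes one message through it.
    static List<String> run() {
        List<String> trace = new CopyOnWriteArrayList<>();
        ExecutorService pool = Executors.newSingleThreadExecutor();
        MessageHandler exchange = new TracingHandler("exchange", null, trace);
        MessageHandler decode = new TracingHandler("decode", exchange, trace);
        MessageHandler dispatch = new DispatchingHandler(decode, pool);
        MessageHandler heartbeat = new TracingHandler("heartbeat", dispatch, trace);

        heartbeat.received("response");

        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("handlers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this sketch everything after the dispatching step runs on the pool thread, which is why the I/O thread is free again as soon as it has submitted the task.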
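2. Source code analysis
Everything before HeaderExchangeHandler.received(Channel channel, Object message) works the same way as when the server receives a request message, so it is not repeated here.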
HeaderExchangeHandler.received(Channel channel, Object message)
public void received(Channel channel, Object message) throws RemotingException {
...
try {
if (message instanceof Request) {
...
} else if (message instanceof Response) {
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
...
} else {
...
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
static void handleResponse(Channel channel, Response response) throws RemotingException {
if (response != null && !response.isHeartbeat()) {
DefaultFuture.received(channel, response);
}
}
DefaultFuture.received(Channel channel, Response response)
private final long id;
private final Request request;
private final int timeout;
private volatile Response response;
private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();
private final Lock lock = new ReentrantLock();
private final Condition done = lock.newCondition();
public static void received(Channel channel, Response response) {
try {
DefaultFuture future = FUTURES.remove(response.getId()); // remove and return the DefaultFuture whose key is response.getId()
if (future != null) {
future.doReceived(response);
} else {
logger.warn("The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
+ ", response " + response
+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()
+ " -> " + channel.getRemoteAddress()));
}
} finally {
CHANNELS.remove(response.getId());
}
}
private void doReceived(Response res) {
lock.lock();
try {
// store the response
response = res;
if (done != null) {
// wake up the blocked thread
done.signal();
}
} finally {
lock.unlock();
}
if (callback != null) {
invokeCallback(callback);
}
}
This part is hard to follow on its own, so here is the code that runs when the client sends the request: HeaderExchangeChannel.request(Object request, int timeout)
public ResponseFuture request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// create request.
Request req = new Request();
req.setVersion("2.0.0");
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = new DefaultFuture(channel, req, timeout);
try {
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
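Netty is an asynchronous, non-blocking framework, so when channel.send(req) reaches the point where Netty writes the message, it does not wait for a result and returns immediately. To turn this asynchronous call into a synchronous one, Dubbo uses the helper class DefaultFuture.
HeaderExchangeChannel.request(Object request, int timeout) returns the future right away, before the response from the server has come back. Who receives it? Look at the callers of HeaderExchangeChannel.request(Object request, int timeout).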
-->DubboInvoker.doInvoke(final Invocation invocation)
// obtain an ExchangeClient to send the message
-->ReferenceCountExchangeClient.request(Object request, int timeout)
-->HeaderExchangeClient.request(Object request, int timeout)
-->HeaderExchangeChannel.request(Object request, int timeout)
DubboInvoker.doInvoke(final Invocation invocation)
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); // whether the call is asynchronous
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); // whether the call is one-way (no return value expected)
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {
ResponseFuture future = currentClient.request(inv, timeout);
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
} else {
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
} catch (TimeoutException e) {
throw new RpcException(...);
} catch (RemotingException e) {
throw new RpcException(...);
}
}
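The return type of currentClient.request(inv, timeout) is ResponseFuture, and DefaultFuture is an implementation of ResponseFuture. What is actually returned here is a DefaultFuture instance, the same future instance that HeaderExchangeChannel.request(Object request, int timeout) returned. In the synchronous branch, DefaultFuture.get() is then called on it.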
public Object get() throws RemotingException {
return get(timeout);
}
public Object get(int timeout) throws RemotingException {
if (timeout <= 0) {
timeout = Constants.DEFAULT_TIMEOUT;
}
if (!isDone()) {
long start = System.currentTimeMillis();
lock.lock();
try {
while (!isDone()) {
//Causes the current thread to wait until it is signalled or interrupted, or the specified waiting time elapses.
done.await(timeout, TimeUnit.MILLISECONDS);
if (isDone() || System.currentTimeMillis() - start > timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
if (!isDone()) {
throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
}
}
return returnFromResponse();
}
public boolean isDone() {
return response != null;
}
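Here we see that while the response has not yet come back, the condition calls await and blocks the current thread until it is signalled, interrupted, or the wait times out. When the client receives the server's response, DefaultFuture.doReceived first assigns it to the response field and then calls signal on the condition to wake the blocked thread.
The woken thread in get() reacquires the lock, sees that isDone() is now true, leaves the loop, releases the lock in the finally block, and calls returnFromResponse() to return the value.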
private Object returnFromResponse() throws RemotingException {
Response res = response;
if (res == null) {
throw new IllegalStateException("response cannot be null");
}
if (res.getStatus() == Response.OK) {
return res.getResult();
}
if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());
}
throw new RemotingException(channel, res.getErrorMessage());
}
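The await/signal hand-off described above can be sketched in isolation. This is a minimal sketch, not Dubbo's DefaultFuture: `SimpleFuture` and `SimpleFutureDemo` are hypothetical names, the payload is a plain String, and the wait is written against a deadline with `awaitNanos` instead of the `await(timeout, ...)` plus elapsed-time check shown in the source.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified future: one thread waits in get(), another calls receive().
class SimpleFuture {
    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();
    private volatile String response;

    boolean isDone() {
        return response != null;
    }

    // Caller side: blocks until a response is stored or the timeout elapses.
    String get(long timeoutMillis) throws TimeoutException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        lock.lock();
        try {
            while (!isDone()) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    throw new TimeoutException("no response within " + timeoutMillis + " ms");
                }
                // awaitNanos releases the lock while waiting and reacquires it before returning.
                done.awaitNanos(remaining);
            }
            return response;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            lock.unlock();
        }
    }

    // I/O side: stores the response and wakes up any thread blocked in get().
    void receive(String res) {
        lock.lock();
        try {
            response = res;
            done.signalAll();
        } finally {
            lock.unlock();
        }
    }
}

class SimpleFutureDemo {
    // A second thread delivers the response while the caller is blocked in get().
    static String roundTrip() {
        SimpleFuture future = new SimpleFuture();
        Thread io = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            future.receive("pong");
        });
        io.start();
        try {
            return future.get(2000);
        } catch (TimeoutException e) {
            return "timeout";
        }
    }

    // Nobody ever calls receive(), so get() must give up with a TimeoutException.
    static boolean timesOut() {
        try {
            new SimpleFuture().get(50);
            return false;
        } catch (TimeoutException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip());
        System.out.println(timesOut());
    }
}
```

The loop around the wait matters: a thread may wake up without the response being set, so the condition is rechecked every time before returning.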
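One question remains. Netty is asynchronous and non-blocking, so suppose I send 10,000 Requests and 10,000 Responses come back later: how is each Response matched to its Request? If they cannot be matched, even waking up the right thread goes wrong. To solve this, Request and Response each carry an id attribute.
In HeaderExchangeChannel.request(Object request, int timeout):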
Request req = new Request();
req.setVersion("2.0.0");
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = new DefaultFuture(channel, req, timeout);
try {
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
Look at the Request constructor:
private static final AtomicLong INVOKE_ID = new AtomicLong(0);
private final long mId;
public Request() {
mId = newId();
}
private static long newId() {
// when getAndIncrement() grows past MAX_VALUE it wraps around to MIN_VALUE; negative numbers can also be used as IDs
return INVOKE_ID.getAndIncrement();
}
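The wrap-around mentioned in the comment above is easy to check with a small experiment. This is a standalone sketch; `IdWrapDemo` is a hypothetical class name and the counter is started at Long.MAX_VALUE only to reach the boundary immediately.

```java
import java.util.concurrent.atomic.AtomicLong;

class IdWrapDemo {
    // Returns the id handed out right after Long.MAX_VALUE has been used.
    static long idAfterMax() {
        AtomicLong counter = new AtomicLong(Long.MAX_VALUE);
        counter.getAndIncrement();        // hands out Long.MAX_VALUE, counter overflows
        return counter.getAndIncrement(); // hands out Long.MIN_VALUE, no exception is thrown
    }

    public static void main(String[] args) {
        System.out.println(idAfterMax() == Long.MIN_VALUE); // prints "true"
    }
}
```

Because the id is only used as a map key, a negative value works just as well as a positive one.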
Look at the DefaultFuture constructor:
private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();
private final long id;
private final Request request;
private volatile Response response;
public DefaultFuture(Channel channel, Request request, int timeout) {
...
this.request = request;
this.id = request.getId();
...
FUTURES.put(id, this);
...
}
Now look at how the server builds the response.
HeaderExchangeHandler.handleRequest(ExchangeChannel channel, Request req)
Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
Response res = new Response(req.getId(), req.getVersion());
...
Object msg = req.getData();
try {
// handle data.
Object result = handler.reply(channel, msg);
res.setStatus(Response.OK);
res.setResult(result);
} catch (Throwable e) {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
}
return res;
}
Look at the Response constructor:
private long mId = 0;
public Response(long id, String version) {
mId = id;
mVersion = version;
}
Here the id of the response is set to the id of the request. Finally, look at what the client does after it receives the response:
DefaultFuture.received(Channel channel, Response response)
public static void received(Channel channel, Response response) {
try {
DefaultFuture future = FUTURES.remove(response.getId()); // remove and return the DefaultFuture whose key is response.getId()
if (future != null) {
future.doReceived(response);
} else {
...
}
} finally {
CHANNELS.remove(response.getId());
}
}
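Putting the pieces together, the id-based matching can be sketched as a map of pending futures keyed by request id. This is a minimal sketch under assumptions: `PendingRequests` and `CorrelationDemo` are hypothetical names, and `CompletableFuture<String>` stands in for DefaultFuture and Response so that the example stays self-contained.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical registry playing the role that the FUTURES map plays in the source above.
class PendingRequests {
    private final AtomicLong nextId = new AtomicLong(0);
    private final Map<Long, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Sending side: allocate an id and remember the future under it.
    long register(CompletableFuture<String> future) {
        long id = nextId.getAndIncrement();
        pending.put(id, future);
        return id;
    }

    // Receiving side: look the future up by the id carried in the response.
    boolean complete(long id, String result) {
        CompletableFuture<String> future = pending.remove(id);
        if (future == null) {
            return false; // unknown id, or the entry was already removed
        }
        return future.complete(result);
    }
}

class CorrelationDemo {
    // Two requests are registered; their responses arrive in the opposite order.
    static String run() {
        PendingRequests registry = new PendingRequests();
        CompletableFuture<String> first = new CompletableFuture<>();
        CompletableFuture<String> second = new CompletableFuture<>();
        long firstId = registry.register(first);
        long secondId = registry.register(second);

        registry.complete(secondId, "reply-to-second");
        registry.complete(firstId, "reply-to-first");

        return first.join() + "," + second.join();
    }

    // A second response for the same id finds nothing in the map and is ignored.
    static boolean lateResponseIgnored() {
        PendingRequests registry = new PendingRequests();
        long id = registry.register(new CompletableFuture<>());
        registry.complete(id, "on-time");
        return !registry.complete(id, "late-duplicate");
    }

    public static void main(String[] args) {
        System.out.println(run());
        System.out.println(lateResponseIgnored());
    }
}
```

Removing the entry during lookup, as the source does with FUTURES.remove, keeps the map from growing and makes a late or duplicate response fall into the "not found" branch.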