How the Client Receives a Response Message (Turning an Asynchronous Call into a Synchronous One)


1 Overall Flow

Call chain when the client receives a response message:
NettyHandler.messageReceived(ChannelHandlerContext ctx, MessageEvent e)
-->MultiMessageHandler.received(Channel channel, Object message)
  -->HeartbeatHandler.received(Channel channel, Object message)
    -->AllChannelHandler.received(Channel channel, Object message)
      -->ExecutorService cexecutor = getExecutorService()
      -->cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message))
        -->ChannelEventRunnable.run()
          -->DecodeHandler.received(Channel channel, Object message)
            -->decode(Object message)
            -->HeaderExchangeHandler.received(Channel channel, Object message)
              -->handleResponse(Channel channel, Response response)
                -->DefaultFuture.received(channel, response)
                  -->doReceived(Response res)// asynchronous-to-synchronous conversion

2 Source Code Analysis

Everything before HeaderExchangeHandler.received(Channel channel, Object message) works the same way as when the server receives a request message, so it is not repeated here.

HeaderExchangeHandler.received(Channel channel, Object message)

public void received(Channel channel, Object message) throws RemotingException {
        ...
        try {
            if (message instanceof Request) {
                ...
            } else if (message instanceof Response) {
                handleResponse(channel, (Response) message);
            } else if (message instanceof String) {
                ...
            } else {
                ...
            }
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }

    static void handleResponse(Channel channel, Response response) throws RemotingException {
        if (response != null && !response.isHeartbeat()) {
            DefaultFuture.received(channel, response);
        }
    }

DefaultFuture.received(Channel channel, Response response)

private final long id;
    private final Request request;
    private final int timeout;
    private volatile Response response;
    private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();
    private final Condition done = lock.newCondition();

    public static void received(Channel channel, Response response) {
        try {
            DefaultFuture future = FUTURES.remove(response.getId());// remove and return the DefaultFuture whose key is response.getId()
            if (future != null) {
                future.doReceived(response);
            } else {
                logger.warn("The timeout response finally returned at "
                        + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                        + ", response " + response
                        + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                        + " -> " + channel.getRemoteAddress()));
            }
        } finally {
            CHANNELS.remove(response.getId());
        }
    }

    private void doReceived(Response res) {
        lock.lock();
        try {
            // store the response
            response = res;
            if (done != null) {
                // wake up the blocked thread
                done.signal();
            }
        } finally {
            lock.unlock();
        }
        if (callback != null) {
            invokeCallback(callback);
        }
    }

This part is hard to follow on its own, so here is the code that runs when the client sends the request: HeaderExchangeChannel.request(Object request, int timeout)

public ResponseFuture request(Object request, int timeout) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        // create request.
        Request req = new Request();
        req.setVersion("2.0.0");
        req.setTwoWay(true);
        req.setData(request);
        DefaultFuture future = new DefaultFuture(channel, req, timeout);
        try {
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }

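Netty is an asynchronous, non-blocking framework, so when channel.send(req) reaches the point where Netty sends the message, it returns immediately without waiting for the result. To turn this asynchronous send into a synchronous call, the code uses the helper class DefaultFuture.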

HeaderExchangeChannel.request(Object request, int timeout) returns the future immediately, before the server's response has come back. Returned to whom? Let's look at the callers of HeaderExchangeChannel.request(Object request, int timeout).

                  -->DubboInvoker.doInvoke(final Invocation invocation)
                     // obtain an ExchangeClient to send the message
                    -->ReferenceCountExchangeClient.request(Object request, int timeout)
                      -->HeaderExchangeClient.request(Object request, int timeout)
                        -->HeaderExchangeChannel.request(Object request, int timeout)

DubboInvoker.doInvoke(final Invocation invocation)

protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);

        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);// whether the call is asynchronous
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);// whether the call is one-way (no return value)
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return new RpcResult();
            } else if (isAsync) {
                ResponseFuture future = currentClient.request(inv, timeout);
                RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                return new RpcResult();
            } else {
                RpcContext.getContext().setFuture(null);
                return (Result) currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException e) {
            throw new RpcException(...);
        } catch (RemotingException e) {
            throw new RpcException(...);
        }
    }
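
The three branches of doInvoke above (one-way, asynchronous, synchronous) differ only in what is done with the future. The following is a minimal sketch of that idea, using the JDK's CompletableFuture as a stand-in for ResponseFuture. The class CallModes and its methods are hypothetical names for illustration and are not Dubbo APIs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch of the three call modes; none of these names are Dubbo APIs. */
class CallModes {
    /** Stand-in for a transport that produces its reply on another thread. */
    static CompletableFuture<String> request(String payload) {
        return CompletableFuture.supplyAsync(() -> "echo:" + payload);
    }

    /** One-way: send and return; any reply is ignored. */
    static void oneway(String payload) {
        request(payload);
    }

    /** Asynchronous: hand the future to the caller, who decides when to wait. */
    static CompletableFuture<String> async(String payload) {
        return request(payload);
    }

    /** Synchronous: block on the future right away, bounded by a timeout. */
    static String sync(String payload, long timeoutMillis)
            throws InterruptedException, ExecutionException, TimeoutException {
        return request(payload).get(timeoutMillis, TimeUnit.MILLISECONDS);
    }
}
```

In this sketch the synchronous mode is just the asynchronous mode followed by an immediate blocking get, which mirrors the last branch of doInvoke.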

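In doInvoke, currentClient.request(inv, timeout) returns a ResponseFuture. DefaultFuture implements ResponseFuture, so what is actually returned is a DefaultFuture instance, the very future that HeaderExchangeChannel.request(Object request, int timeout) returned. In the synchronous branch, DefaultFuture.get() is then called on it: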

public Object get() throws RemotingException {
        return get(timeout);
    }

    public Object get(int timeout) throws RemotingException {
        if (timeout <= 0) {
            timeout = Constants.DEFAULT_TIMEOUT;
        }
        if (!isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                while (!isDone()) {
                    //Causes the current thread to wait until it is signalled or interrupted, or the specified waiting time elapses.
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            if (!isDone()) {
                throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
            }
        }
        return returnFromResponse();
    }

    public boolean isDone() {
        return response != null;
    }

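Here we see that while the response has not yet arrived, await on the condition blocks the current thread until it is signalled, interrupted, or the wait times out. When the client receives the server's response, DefaultFuture.doReceived first stores the response and then calls signal on the condition to wake the blocked thread. get() then leaves its wait loop, releases the lock, and calls returnFromResponse() to return the result.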

private Object returnFromResponse() throws RemotingException {
        Response res = response;
        if (res == null) {
            throw new IllegalStateException("response cannot be null");
        }
        if (res.getStatus() == Response.OK) {
            return res.getResult();
        }
        if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
            throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());
        }
        throw new RemotingException(channel, res.getErrorMessage());
    }
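
The wait/notify handshake between get() and doReceived() can be reduced to a few lines. The following is a minimal sketch, not the Dubbo implementation: MiniFuture is a hypothetical class, it waits against a fixed deadline with awaitNanos instead of the await(timeout) loop shown above, and it uses signalAll so that several waiters would all be woken.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical, simplified stand-in for DefaultFuture; not Dubbo code. */
class MiniFuture<T> {
    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();
    private volatile T response; // null means "not arrived yet"

    boolean isDone() {
        return response != null;
    }

    /** Caller thread: blocks until a response arrives or the timeout elapses. */
    T get(long timeoutMillis) throws InterruptedException, TimeoutException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        lock.lock();
        try {
            while (!isDone()) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    throw new TimeoutException("no response within " + timeoutMillis + " ms");
                }
                // awaitNanos releases the lock while waiting and reacquires it before returning
                done.awaitNanos(remaining);
            }
            return response;
        } finally {
            lock.unlock();
        }
    }

    /** I/O thread: stores the response and wakes any waiting caller. */
    void doReceived(T res) {
        lock.lock();
        try {
            response = res;
            done.signalAll();
        } finally {
            lock.unlock();
        }
    }
}

class MiniFutureDemo {
    public static void main(String[] args) throws Exception {
        MiniFuture<String> future = new MiniFuture<>();
        // Simulate the I/O thread delivering the response from another thread.
        new Thread(() -> future.doReceived("pong")).start();
        System.out.println(future.get(2000));
    }
}
```

The loop around the wait matters: a waiting thread may wake up without a signal, so the condition (isDone) has to be rechecked each time.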

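One question remains. Netty is asynchronous and non-blocking, so suppose I send 10,000 Requests and 10,000 Responses come back later: how is each Response matched to its Request? If they cannot be matched, even the basic wake-up will go wrong. To solve this, both Request and Response carry an id property.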

In HeaderExchangeChannel.request(Object request, int timeout):

Request req = new Request();
        req.setVersion("2.0.0");
        req.setTwoWay(true);
        req.setData(request);
        DefaultFuture future = new DefaultFuture(channel, req, timeout);
        try {
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;

Now look at the Request constructor:

private static final AtomicLong INVOKE_ID = new AtomicLong(0);
    private final long mId;

    public Request() {
        mId = newId();
    }

    private static long newId() {
        // once getAndIncrement() reaches MAX_VALUE, the next increment wraps to MIN_VALUE; negative numbers are also valid IDs
        return INVOKE_ID.getAndIncrement();
    }
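
The wraparound mentioned in the comment above can be checked directly with the JDK. The following is a small sketch; IdWrapDemo is a hypothetical class, and the counter is started at Long.MAX_VALUE only to reach the overflow point immediately.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical demo of AtomicLong overflow; not part of Dubbo. */
class IdWrapDemo {
    /** Returns the two IDs handed out around the overflow point. */
    static long[] idsAroundOverflow() {
        AtomicLong counter = new AtomicLong(Long.MAX_VALUE);
        long last = counter.getAndIncrement(); // Long.MAX_VALUE
        long next = counter.getAndIncrement(); // wraps to Long.MIN_VALUE
        return new long[] {last, next};
    }

    public static void main(String[] args) {
        long[] ids = idsAroundOverflow();
        System.out.println(ids[0] == Long.MAX_VALUE); // true
        System.out.println(ids[1] == Long.MIN_VALUE); // true
    }
}
```

Because the ID is only used as a map key, a negative value is as good as a positive one; what matters is that IDs of requests still in flight do not collide.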

Now look at the DefaultFuture constructor:

private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();
    private final long id;
    private final Request request;
    private volatile Response response;

    public DefaultFuture(Channel channel, Request request, int timeout) {
        ...
        this.request = request;
        this.id = request.getId();
        ...
        FUTURES.put(id, this);
        ...
    }

Next, look at the response side.

HeaderExchangeHandler.handleRequest(ExchangeChannel channel, Request req)

Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
        Response res = new Response(req.getId(), req.getVersion());
        ...
        Object msg = req.getData();
        try {
            // handle data.
            Object result = handler.reply(channel, msg);
            res.setStatus(Response.OK);
            res.setResult(result);
        } catch (Throwable e) {
            res.setStatus(Response.SERVICE_ERROR);
            res.setErrorMessage(StringUtils.toString(e));
        }
        return res;
    }

Now look at the Response constructor:

private long mId = 0;

     public Response(long id, String version) {
         mId = id;
         mVersion = version;
     }

Here the response's id is set to the request's id. Finally, look at how the client handles the response once it arrives:

DefaultFuture.received(Channel channel, Response response)

public static void received(Channel channel, Response response) {
        try {
            DefaultFuture future = FUTURES.remove(response.getId());// remove and return the DefaultFuture whose key is response.getId()
            if (future != null) {
                future.doReceived(response);
            } else {
               ...
            }
        } finally {
            CHANNELS.remove(response.getId());
        }
    }
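
Putting the pieces together, the matching of responses to requests amounts to a concurrent map keyed by request ID. The following is a minimal sketch under that assumption. PendingRequests is a hypothetical class, and CompletableFuture stands in for DefaultFuture.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical registry that pairs responses with pending requests by ID. */
class PendingRequests {
    private final AtomicLong nextId = new AtomicLong(0);
    private final Map<Long, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Registers a pending request and returns the ID to send along with it. */
    long register(CompletableFuture<String> future) {
        long id = nextId.getAndIncrement();
        pending.put(id, future);
        return id;
    }

    /** Completes the matching future; returns false for unknown or late IDs. */
    boolean received(long id, String payload) {
        CompletableFuture<String> future = pending.remove(id);
        if (future == null) {
            return false; // e.g. the request was already removed after a timeout
        }
        future.complete(payload);
        return true;
    }
}

class PendingRequestsDemo {
    public static void main(String[] args) {
        PendingRequests registry = new PendingRequests();
        CompletableFuture<String> a = new CompletableFuture<>();
        CompletableFuture<String> b = new CompletableFuture<>();
        long idA = registry.register(a);
        long idB = registry.register(b);
        // Responses may arrive in any order; the ID routes each to its own future.
        registry.received(idB, "reply-B");
        registry.received(idA, "reply-A");
        System.out.println(a.join() + " " + b.join());
    }
}
```

The null branch corresponds to the warning logged in DefaultFuture.received: a response whose future is no longer in the map has nobody left to wake.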