初识RPC

Scroll Down

基本概念

RPC(Remote Procedure Call)远程过程调用,简单来说就是一个节点请求另一个节点提供的服务,像本地方法调用一样调用远程的服务。详细说明:请求方没有服务实现的细节,执行目标行为还是服务提供的节点。请求服务的节点和服务提供的节点以某种方式进行通信,请求方把行为及行为参数传递给服务提供方,服务提供方会根据请求方传递的数据找到对应的服务实现然后执行目标行为,最后再把执行结果返回给请求方。

本地过程调用

发起请求和响应结果都在同一个服务节点上,在Java中就是同一个JVM中的方法调用过程。

远程过程调用

请求的发起者和请求的处理者不在同一个节点上,它们之间需要进行网络通信才能完成请求和响应。

简单RPC实现

说明: 例子是使用梁飞大佬的 技术博客 中的案例

服务接口及实现

package com.alibaba.study.rpc.service;

/**
 * HelloService
 */
public interface HelloService {

    /**
     * 服务方法
     *
     * @param name
     * @return
     */
    String hello(String name);

}
---
package com.alibaba.study.rpc.service.impl;

import com.alibaba.study.rpc.service.HelloService;

/**
 * HelloServiceImpl
 */
public class HelloServiceImpl implements HelloService {

    @Override
    public String hello(String name) {
        return "Hello " + name;
    }
}

RPC框架

package com.alibaba.study.rpc.framework;

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * RpcFramework
 */
public class RpcFramework {
    /**
     * 暴露服务
     *
     * @param service 服务实现
     * @param port    服务端口
     * @throws Exception
     */
    public static void export(final Object service, int port) throws Exception {
        if (service == null) {
            throw new IllegalArgumentException("service instance == null");
        }
        if (port <= 0 || port > 65535) {
            throw new IllegalArgumentException("Invalid port " + port);
        }
        System.out.println("Export service " + service.getClass().getName() + " on port " + port);

        // 以指定端口创建ServerSocket
        ServerSocket server = new ServerSocket(port);

        for (; ; ) {
            try {

                // 等待接收请求
                final Socket socket = server.accept();

                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            try {

                                // 获取请求的数据流
                                ObjectInputStream input = new ObjectInputStream(socket.getInputStream());

                                try {


                                    // 获取客户端请求的方法名
                                    String methodName = input.readUTF();
                                    // 获取客户端请求的参数类型列表
                                    Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
                                    // 获取客户端请求的参数列表
                                    Object[] arguments = (Object[]) input.readObject();

                                    // 创建对象输出流对象,用于响应结果给客户端
                                    ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());

                                    try {

                                        // 通过反射,获取服务接口指定的方法
                                        Method method = service.getClass().getMethod(methodName, parameterTypes);
                                        // 反射调用
                                        Object result = method.invoke(service, arguments);
                                        // 将结果响应给客户端
                                        output.writeObject(result);

                                    } catch (Throwable t) {
                                        output.writeObject(t);
                                    } finally {
                                        output.close();
                                    }
                                } finally {
                                    input.close();
                                }
                            } finally {
                                socket.close();
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }).start();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 引用服务
     *
     * @param <T>            接口泛型
     * @param interfaceClass 接口类型
     * @param host           服务器主机名
     * @param port           服务器端口
     * @return 远程服务
     * @throws Exception
     */
    @SuppressWarnings("unchecked")
    public static <T> T refer(final Class<T> interfaceClass, final String host, final int port) throws Exception {
        if (interfaceClass == null) {
            throw new IllegalArgumentException("Interface class == null");
        }
        if (!interfaceClass.isInterface()) {
            throw new IllegalArgumentException("The " + interfaceClass.getName() + " must be interface class!");
        }
        if (host == null || host.length() == 0) {
            throw new IllegalArgumentException("Host == null!");
        }
        if (port <= 0 || port > 65535) {
            throw new IllegalArgumentException("Invalid port " + port);
        }

        System.out.println("Get remote service " + interfaceClass.getName() + " from server " + host + ":" + port);

        /**
         * 使用JDK的动态代理创建接口的代理对象
         * 说明:
         * 在 InvocationHandler#invoke方法内部实现Socket与ServerSocket的通信。当使用代理对象调用方法时,内部使用Socket进行通信,然后把通信的结果返回。
         */
        return (T) Proxy.newProxyInstance(
                interfaceClass.getClassLoader(),
                new Class<?>[]{interfaceClass},
                new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable {

                        // 创建Socket,用于连接ServerSocket
                        Socket socket = new Socket(host, port);

                        try {
                            // 创建用于发送数据到ServerSocket的输出流
                            ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());

                            try {

                                //--------------------- 数据契约 ----------------------------/

                                // 方法名
                                output.writeUTF(method.getName());
                                // 参数类型
                                output.writeObject(method.getParameterTypes());
                                // 参数值
                                output.writeObject(arguments);

                                //------------------------ 数据契约 --------------------------/

                                // 创建用于接收ServerSocket的输入流
                                ObjectInputStream input = new ObjectInputStream(socket.getInputStream());

                                try {
                                    // 读取ServerSocket响应的数据
                                    Object result = input.readObject();

                                    if (result instanceof Throwable) {
                                        throw (Throwable) result;
                                    }

                                    // 返回结果
                                    return result;
                                } finally {
                                    input.close();
                                }
                            } finally {
                                output.close();
                            }
                        } finally {
                            socket.close();
                        }
                    }
                });
    }
}

服务暴露

package com.alibaba.study.rpc.provider;

import com.alibaba.study.rpc.framework.RpcFramework;
import com.alibaba.study.rpc.service.HelloService;
import com.alibaba.study.rpc.service.impl.HelloServiceImpl;

/**
 * RpcProvider
 */
public class RpcProvider {
    public static void main(String[] args) throws Exception {
        // 服务实现
        HelloService service = new HelloServiceImpl();
        // 暴露服务
        RpcFramework.export(service, 1234);
    }
}

引用服务

package com.alibaba.study.rpc.consumer;

import com.alibaba.study.rpc.framework.RpcFramework;
import com.alibaba.study.rpc.service.HelloService;

/**
 * RpcConsumer
 */
public class RpcConsumer {

    public static void main(String[] args) throws Exception {
        // 引用服务【代理对象】
        HelloService service = RpcFramework.refer(HelloService.class, "127.0.0.1", 1234);
        while (true) {
            String hello = service.hello("World");
            System.out.println(hello);
            Thread.sleep(1000);
        }
    }
}

小结

这个例子中,通信是使用同步阻塞的Socket来实现的,采用端对端的方式。远程调用使用的是JDK的动态代理,在invoke方法中实现网络通信。参数序列化使用的是JDK的ObjectStream。一个完善的RPC框架其实就是在这例子的基础上进行多方位扩展和改进。比如,网络通信可以使用性能更好的NIO框架Netty,动态代理可以使用javaassist字节码生成方式[注意:不是javaassist提供的动态代理接口,该接口比JDK自带的还慢],序列化方式可以采用fastjson、hession2以及kryo等技术。如果服务数量达到一定规模,可以引进注册中心进行服务的治理。节点间的通信方式可以有多种,因此可以扩展多协议。除此之外,性能和健壮性也是一个优秀的RPC框架所必须的,如集群容错、负载均衡、重试机制、服务降级…这些都会在后面分析的Dubbo框架中得到很好的体现。