手写RPC远程调用

过去的,未来的
2020-01-16 / 0 评论 / 0 点赞 / 748 阅读 / 0 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2020-01-16,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

RPC 的全称是 Remote Procedure Call 是一种进程间通信方式。它允许程序调用另一个地址空间(通常是共享网络的另一台机器上)的过程或函数,而不用程序员显式编码这个远程调用的细节。

远程调用原理

比如 A (client) 调用 B (server) 提供的方法:

  • 首先A与B之间建立一个TCP连接;

  • 然后A把需要调用的方法名以及方法参数序列化成字节流发送出去;

  • B接受A发送过来的字节流,然后反序列化得到目标方法名,方法参数,接着执行相应的方法调用并把结果返回;

  • A接受远程调用结果,输出。

  • RPC框架就是把我刚才说的这几点些细节给封装起来,给用户暴露简单友好的API使用。

这里我们采用jdk的动态代理实现

话不多说,直接看代码。

本案例分为三个模块rpc-api、rpc-cusmer、rpc-product。

一、首先暴露出来一个接口(rpc-api)

1、提供一个接口
package com.rpc.api;

public interface Api {
	public String testApi(String name);	
}
2、封装一个request的data
package com.rpc.data;

import java.io.Serializable;

public class RequestData implements Serializable{
	
	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;
	private String interfaceName;
	private String methodName;
	private Class<?>[]parameterTypes;
	private Object[] parameter;
	public String getInterfaceName() {
		return interfaceName;
	}
	public void setInterfaceName(String interfaceName) {
		this.interfaceName = interfaceName;
	}
	public String getMethodName() {
		return methodName;
	}
	public void setMethodName(String methodName) {
		this.methodName = methodName;
	}
	public Class<?>[] getParameterTypes() {
		return parameterTypes;
	}
	public void setParameterTypes(Class<?>[] parameterTypes) {
		this.parameterTypes = parameterTypes;
	}
	public Object[] getParameter() {
		return parameter;
	}
	public void setParameter(Object[] parameter) {
		this.parameter = parameter;
	}
	
}

二、服务的提供者(rpc-product)

1、pom依赖rpc-api
2、实现接口
package com.rpc.service;

import com.rpc.api.Api;

public class ApiImp implements Api {

	public String testApi(String name) {
		// TODO Auto-generated method stub
		return "Hello "+name;
	}

}
3、提供一个服务
package com.rpc.service;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.rpc.data.RequestData;

public class RpcServer {
	//zookeper
	private Map<String,Object> serviceMap=new ConcurrentHashMap<String, Object>(32);
	
	private ThreadPoolExecutor excExecutor=new ThreadPoolExecutor(8,20, 200,TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
	
	public void publishService(Class<?>interfaceClass,Object instance){
		this.serviceMap.put(interfaceClass.getName(), instance);
	}
	public void start(int port){
		try {
			ServerSocket serverSocket=new ServerSocket();
			serverSocket.bind(new InetSocketAddress(port));
			System.out.println("Ѿ.....");
			while(true){
				excExecutor.execute(new Tack(serverSocket.accept()));
			}
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	private class Tack implements Runnable{
		private Socket client;
		public Tack(Socket socket){
			this.client=socket;
		}
		public void run() {
			try{
				ObjectOutputStream serializer=new ObjectOutputStream(client.getOutputStream());
				ObjectInputStream deserializer=new ObjectInputStream(client.getInputStream());	
				RequestData requestData=(RequestData) deserializer.readObject();
				Object instance=serviceMap.get(requestData.getInterfaceName());
				Method method= instance.getClass().getDeclaredMethod(requestData.getMethodName(),requestData.getParameterTypes());
				Object result=method.invoke(instance, requestData.getParameter());
				serializer.writeObject(result);
			}catch(Exception e){
				
			}
		
		}
		
	}
	
	
}
4、对外发布接口
package com.rpc.service;

import com.rpc.api.Api;

public class ServerMain {

	public static void main(String[] args) {
		RpcServer rpcServer=new RpcServer();
		rpcServer.publishService(Api.class, new ApiImp());
		rpcServer.start(12345);		
	}

}

三、服务消费者(rpc-cusmer)

1、pom依赖rpc-api
2、客户端远程代理
package com.rpc.client;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.Socket;

import com.rpc.api.Api;
import com.rpc.data.RequestData;

public class RpcClient {
	/***
	 * 客户端远程代理  
	 * @param interfaceClass
	 * @param address
	 * @return
	 */
	@SuppressWarnings("unchecked")
	public static <T> Object getRemoteProxy(final Class<?> interfaceClass,final InetSocketAddress address){
		return  (T)Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[]{interfaceClass}, new InvocationHandler() {
			public Object invoke(Object proxy, Method method, Object[] args)
					throws Throwable {
								try{
									Socket client=new Socket();
									client.connect(address);
									try{
										ObjectOutputStream serializer=new ObjectOutputStream(client.getOutputStream());
										ObjectInputStream deserializer=new ObjectInputStream(client.getInputStream());
										RequestData requestData=new RequestData();
										requestData.setInterfaceName(interfaceClass.getName());
										requestData.setMethodName(method.getName());
										requestData.setParameterTypes(method.getParameterTypes());
										requestData.setParameter(args);
										serializer.writeObject(requestData);
										return deserializer.readObject();
									}catch(Exception e){
										
									}
								}catch(Exception e){
									
								}
				return null;
			}
		});
	}
	
	
	public static void main(String[] args) {
		Api api=(Api) RpcClient.getRemoteProxy(Api.class, new InetSocketAddress("localhost", 12345));
		String result=api.testApi("fwpfjgwwj");
		System.out.println(result);
		/***
		 * 可以将spring框架引入,即可用spring管理对象
		 */
			
	}
	
}

四、说明

1、本案例采用原生Socket,方式,可以优化为nio
2、本案例采用jdk动态代理
3、可以将RpcServer更换为zookeeper,为服务注册中心
4、代码地址为
0

评论区