首页 最新 热门 推荐

  • 首页
  • 最新
  • 热门
  • 推荐

摊牌了,我要手写一个RPC

  • 23-11-17 19:42
  • 3747
  • 8892
blog.csdn.net

文章目录

  • 前言
  • 需要解决的问题
  • 手写RPC实战
    • 1、定义通信协议
    • 2、自定义注解
    • 3、定义接口
    • 4、实现接口
    • 5、暴露服务并监听处理请求
    • 6、生成RPC动态代理对象
    • 7、消费者注入RPC动态代理对象
  • 功能测试
  • 尾巴

前言

RPC是远程过程调用(Remote Procedure Call)的缩写形式。SAP系统RPC调用的原理其实很简单,有一些类似于三层构架的C/S系统,第三方的客户程序通过接口调用SAP内部的标准或自定义函数,获得函数返回的数据进行处理后显示或打印。

随着微服务、分布式的大热,开发者慢慢趋向于将一个大的服务拆分成多个独立的小的服务。
服务经过拆分后,服务与服务之间的通信就变得至关重要。

RPC说白了就是节点A去调用节点B的服务,站在Java的角度看,就是像调用本地函数一样调用远程函数。


需要解决的问题

在这里插入图片描述

手写RPC实战

首先看下目录结构:
在这里插入图片描述

1、定义通信协议

消费者发起一个调用请求,服务者必须知道你要调哪个服务,参数是什么,这些需要封装好。

@Data
public class RpcMessage implements Serializable {
	private static final long serialVersionUID = 1L;

	private String interfaceName;//调用的Service接口名
	private String methodName;//调用的方法名
	private Class<?>[] argsType;//参数类型列表
	private Object[] args;//参数
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

2、自定义注解

分别是服务的提供者和消费者。

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Service//引入Spring Service,自动注入IOC容器
// 服务提供者
public @interface MyRpcService {

}


@Target({ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
// 服务消费者
public @interface MyRpcReference {

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

3、定义接口

public interface UserService {

	// 根据UserId查找用户
	R<UserResp> findById(Long userId);
}
  • 1
  • 2
  • 3
  • 4
  • 5

4、实现接口

加上自定义注解@MyRpcService,后续需要扫描这些实现类,并暴露服务。

@MyRpcService
public class UserServiceImpl implements UserService{

	@Override
	public R<UserResp> findById(Long userId) {
		UserResp userResp = new UserResp();
		userResp.setId(userId);
		userResp.setName("张三");
		userResp.setPwd("root@abc");
		return R.ok(userResp);
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

5、暴露服务并监听处理请求

应用程序启动后,从Spring的IOC容器中,找到加了@MyRpcService注解的服务,并暴露出去。

/**
 * @author: pch
 * @description: 程序启动,暴露Service服务
 * @date: 2020/10/13
 **/
@Component
public class ProviderListener implements ApplicationListener<ApplicationStartedEvent> {

	@Override
	public void onApplicationEvent(ApplicationStartedEvent applicationStartedEvent) {
		ConfigurableApplicationContext context = applicationStartedEvent.getApplicationContext();
		for (Object bean : context.getBeansWithAnnotation(MyRpcService.class).values()) {
			ProviderHolder.addService(bean);
		}
		try {
			ProviderHolder.start();
		} catch (Exception e) {
			e.printStackTrace();
		}
		System.err.println("provider...启动");
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

暴露服务,处理消费者请求的核心代码

/**
 * @author: pch
 * @description: 服务持有者
 * @date: 2020/10/13
 **/
public class ProviderHolder {
	// 缓存所有的服务提供者
	private static final Map<String, Provider> SERVICES = new ConcurrentHashMap<>();
	// 起一个线程池,处理消费者的请求
	private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();

	// 添加服务
	public static void addService(Object bean) {
		Class<?> beanClass = bean.getClass();
		String interfaceName = beanClass.getInterfaces()[0].getName();
		SERVICES.put(interfaceName, new Provider(bean));
	}

	/**
	 * 启动服务
	 * @throws Exception
	 */
	public static void start() throws Exception {
		if (SERVICES.isEmpty()) {
			return;
		}
		// 开启ServerSocket,端口3333,监听消费者发起的请求。
		ServerSocket serverSocket = new ServerSocket(3333);
		while (true) {
			// 当有请求到达,提交一个任务到线程池
			Socket socket = serverSocket.accept();
			EXECUTOR_SERVICE.submit(() -> {
				try {
					// 从网络IO中读取消费者发送的参数
					Object o = new ObjectInputStream(socket.getInputStream()).readObject();
					if (o instanceof RpcMessage) {
						RpcMessage message = (RpcMessage) o;
						// 找到消费者要调用的服务
						Provider provider = SERVICES.get(message.getInterfaceName());
						if (provider == null) {
							return;
						}
						// 利用反射调用服务
						Object result = provider.invoke(message.getMethodName(), message.getArgsType(), message.getArgs());
						OutputStream outputStream = socket.getOutputStream();
						// 将返回结果序列化为字节数组并通过Socket写回
						outputStream.write(ObjectUtil.serialize(result));
						outputStream.flush();
					}
				} catch (Exception e) {
					e.printStackTrace();
				}
			});
		}
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56

6、生成RPC动态代理对象

/**
 * @author: pch
 * @description: 基于JDK动态代理生成代理对象,发起RPC调用
 * @date: 2020/10/13
 **/
public class RpcProxy implements InvocationHandler {
	private Object origin = new Object();

	@Override
	public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
		if (Object.class.equals(method.getDeclaringClass())) {
			return method.invoke(origin, args);
		}
		// 开启一个Socket
		Socket socket = new Socket("127.0.0.1", 3333);
		// 封装请求协议
		RpcMessage message = new RpcMessage();
		message.setInterfaceName(method.getDeclaringClass().getName());
		message.setMethodName(method.getName());
		message.setArgsType(method.getParameterTypes());
		message.setArgs(args);
		// 将请求参数序列化成字节数组通过网络IO写回
		OutputStream outputStream = socket.getOutputStream();
		outputStream.write(ObjectUtil.serialize(message));
		outputStream.flush();
		// 阻塞,等待服务端处理完毕返回结果
		Object o = new ObjectInputStream(socket.getInputStream()).readObject();
		// 返回给调用者
		return o;
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

7、消费者注入RPC动态代理对象

/**
 * @author: pch
 * @description: 注入加了@MyRpcReference注解的属性
 * @date: 2020/10/13
 **/
@Component
public class RpcBeanPostProcessor implements BeanPostProcessor {

	@Override
	public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
		Class<?> beanClass = bean.getClass();
		Field[] fields = ClassUtil.getDeclaredFields(beanClass);
		for (Field field : fields) {
			if (field.getAnnotation(MyRpcReference.class) == null) {
				continue;
			}
			Object proxy = Proxy.newProxyInstance(beanClass.getClassLoader(), new Class[]{field.getType()}, new RpcProxy());
			field.setAccessible(true);
			try {
				field.set(bean, proxy);
			} catch (IllegalAccessException e) {
				e.printStackTrace();
			}
		}
		return bean;
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

功能测试

核心代码写好了,那就可以开始测试功能是否符合预期了。

1、启动服务提供者
在这里插入图片描述
2、启动消费者,并发起一个请求
在这里插入图片描述

尾巴

基于篇幅原因,本文只是实现了RPC最基本最简单的功能,主要是理解RPC的思想。
当然,还有很多可以优化的点:

  1. Service暴露的所有方法缓存起来,每次调用再反射查找开销还是很大的。
  2. 使用Netty提升网络IO的通信性能。
  3. 连接池的引入。
  4. 注册中心的加入。
  5. 写回的数据没有包装协议。
  6. 数据格式的扩展,请求头的加入。

你可能感兴趣的文章:

  • AQS源码导读
  • Java锁的膨胀过程以及一致性哈希对锁膨胀的影响
  • ThreadLocal源码解析
  • CMS与三色标记算法
  • 大白话理解可达性分析算法
程序员小潘
微信公众号
专注于Java后端技术分享~
注:本文转载自blog.csdn.net的程序员小潘的文章"https://javap.blog.csdn.net/article/details/109141994"。版权归原作者所有,此博客不拥有其著作权,亦不承担相应法律责任。如有侵权,请联系我们删除。
复制链接
复制链接
相关推荐
发表评论
登录后才能发表评论和回复 注册

/ 登录

评论记录:

未查询到任何数据!
回复评论:

分类栏目

后端 (14832) 前端 (14280) 移动开发 (3760) 编程语言 (3851) Java (3904) Python (3298) 人工智能 (10119) AIGC (2810) 大数据 (3499) 数据库 (3945) 数据结构与算法 (3757) 音视频 (2669) 云原生 (3145) 云平台 (2965) 前沿技术 (2993) 开源 (2160) 小程序 (2860) 运维 (2533) 服务器 (2698) 操作系统 (2325) 硬件开发 (2492) 嵌入式 (2955) 微软技术 (2769) 软件工程 (2056) 测试 (2865) 网络空间安全 (2948) 网络与通信 (2797) 用户体验设计 (2592) 学习和成长 (2593) 搜索 (2744) 开发工具 (7108) 游戏 (2829) HarmonyOS (2935) 区块链 (2782) 数学 (3112) 3C硬件 (2759) 资讯 (2909) Android (4709) iOS (1850) 代码人生 (3043) 阅读 (2841)

热门文章

101
推荐
关于我们 隐私政策 免责声明 联系我们
Copyright © 2020-2025 蚁人论坛 (iYenn.com) All Rights Reserved.
Scroll to Top