基于netty的网络传输实现远程本地调用,无感知远程调用。
实现过程:
当前版本的实现思路:
该版本涉及:
该版本实现到的功能:服务注册、服务发现、服务断开注册中心自动剔除服务、多个提供者可对应一个消费者。
注册中心针对服务提供者的数据保存目前保存到本地文件中。
提供者引入咱们自己写的RPC框架,项目启动,加上 @EnableRpc 启动咱们的RPC框架应用,启动的时候,在上下文初始化的时候提供者向注册中心注册服务。 接着扫描咱们自己的 @RpcService 将其注册到Spring中。在Bean初始化之后将其接口暴露出去。
消费者引入接口,使用@RpcRefence 注解,消费者端不需要实现接口。项目启动也加上@EnableRpc。开始处理@RpcRefence 依赖注入。
消费者端调用的时候,在代理类invoke的方法去注册中心拉取可用服务,使用随机算法获得其中一个服务地址。通过netty连接传输我们的协议对象。
提供服务方收到连接请求的时候,读取协议参数,解析出接口名、参数类型、参数值通过反射执行该对应的方法,获取结果值。将结果再次回传给消费者。
基于SpringBoot进行开发,尽可能实现接近Dubbo
项目结构图:
YJRpc 父项目
service-frame:
注解:
@EnableYJRpc
/**
* RPC启动注解,加载配置类
* @author 永健
* @since 2022-01-18 14:58
*/
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import({YJRpcServiceScanRegistrar.class, RegisterConfig.class, BeanConfig.class})
public @interface EnableYJRpc { }
@RpcReference
/**
* 调用方消费者使用
* @author 永健
* @since 2022-01-18 17:09
*/
@Target({ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RpcReference {
String name() default "";
}
@RpcService
/**
* 暴露服务
* 将其该注解的类接口发布出去
* @author 永健
* @since 2022-01-18 15:28
*/
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RpcService { }
服务注册:在我们的提供者启动的时候,当上下文初始化好了,我们进行事件监听(ApplicationContextInitializedEvent)。在这个时候我们开始注册服务,并且启动netty服务
/**
* 在上下文初始化的时候注册服务
* @author 永健
* @since 2022-01-19 10:23
*/
public class ApplicationStartingListener implements ApplicationListener<ApplicationContextInitializedEvent> {
@Override
public void onApplicationEvent(ApplicationContextInitializedEvent applicationContextInitializedEvent) {
ConfigurableEnvironment env = applicationContextInitializedEvent.getApplicationContext().getEnvironment();
// 配置文件里读取
String host = env.getProperty("spring.rpc.provider.host");
String applicationName = env.getProperty("spring.application.name");
if ("".equals(host) || host == null) {
return;
}
String[] split = host.split(":");
InetSocketAddress inetSocketAddress = new InetSocketAddress(split[0], Integer.parseInt(split[1]));
// 启动netty,提供者服务启动,注册服务
RegisterConfig.register(applicationName, host);
new NettyServer().start(inetSocketAddress);
}
}
接口暴露:接口暴露在@RpcService的Bean初始化好后,将其暴露出去
/**
* @author 永健
* @since 2022-01-18 16:03
*/
public class RpcAnnotationBean implements BeanPostProcessor {
/**
* bean初始化好后,暴露出去,将其注册到注册中心
*
* @param bean
* @param beanName
*
* @throws BeansException
*/
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
RpcService service = bean.getClass().getAnnotation(RpcService.class);
if (service != null) {
// 暴露接口
RegisterConfig.export(bean, beanName);
}
return bean;
}
}
接口剔除: 当我们的提供者服务不可用时,我们将其从注册中心剔除
/**
* 在上下文关闭的时候(服务停止)
*
* @author 永健
* @since 2022-01-19 10:23
*/
public class ApplicationCloseListener implements ApplicationListener<ContextClosedEvent> {
@Override
public void onApplicationEvent(ContextClosedEvent contextClosedEvent) {
// 移除注册中心的服务
RegisterConfig.logout();
// 关闭netty
NettyServer.closeServer();
}
}
远程调用实现(核心部分):远程调用的实现:调用方使用的是代理对象(接口),通过invoke方法,在此方法中,去注册中心拉取服务,获得服务地址后,通过netty传输该接口对应的协议参数,提供者拿到协议参数后,通过反射调用该接口的实现方法获得返回结果,将结果又返回给消费者。这就是全部过程,这个对于使用方来说是无感知的,即RPC远程本地调用。
消费者使用费例子:
api接口
消费者: 没有实现该api接口
提供者:提供者分别实现api接口
消费者的invoke方法
/**
* Rpc调用方代理对象
* @author 永健
* @since 2022-01-18 17:11
*/
public class RpcReferenceProxy implements InvocationHandler {
private Class<?> aClass;
private String beanName;
public RpcReferenceProxy(Class<?> aClass, String beanName) {
this.beanName = beanName;
this.aClass = aClass;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
ResponseHandler responseHandler = new ResponseHandler();
EventLoopGroup group = new NioEventLoopGroup(1);
Bootstrap bootstrap =
new Bootstrap().group(group).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 心跳监测机制
socketChannel.pipeline().addLast(new IdleStateHandler(3, 3, 3, TimeUnit.SECONDS));
// 编码处理器
socketChannel.pipeline().addLast("encoder", new ObjectEncoder());
// 解码处理器
socketChannel.pipeline().addLast("decoder", new ObjectDecoder(1024,
ClassResolvers.cacheDisabled(this.getClass().getClassLoader())));
// 消息处理器
socketChannel.pipeline().addLast(responseHandler);
}
});
// 向注册中心拉取服务列表
String ip = RegisterConfig.loadBalanceIp(this.beanName);
checkIsNoExits(ip);
// 连接提供者
ChannelFuture connect = bootstrap.connect(ip.split(":")[0], Integer.parseInt(ip.split(":")[1])).sync();
Channel channel = connect.channel();
// 包装协议对象
RpcProtocol rpcProtocol = new RpcProtocol();
rpcProtocol.setMethodName(method.getName());
rpcProtocol.setMethodParams(args);
rpcProtocol.setMethodParamsType(method.getParameterTypes());
rpcProtocol.setBeanName(this.beanName);
// 传输
channel.writeAndFlush(rpcProtocol);
channel.closeFuture().sync();
return responseHandler.getResult();
}
}
打包多一个提供者启动:
启动两个提供者:一个端口是8082 一个是8081
消费者在成功启动的时候就会直接调用接口。
/**
* @author 永健
* @since 2022-01-18 17:27
*/
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
@EnableYJRpc
@RestController
@RequestMapping("/consumer")
public class App {
@Resource
private TestService testService;
@GetMapping
public Object ok(){
return testService.test();
}
public static void main(String[] args) {
SpringApplication.run(App.class,args);
}
}
/**
* @author 永健
* @since 2022-01-18 17:27
*/
@Service
public class TestService implements ApplicationRunner {
private static final Logger log = LoggerFactory.getLogger(TestService.class);
@RpcReference
IHelloService helloService;
@RpcReference
IYJRpcService yjRpcService;
public YJRpc test() {
return yjRpcService.getYjRpc();
}
@Override
public void run(ApplicationArguments args) throws Exception {
log.info("{}",yjRpcService.getYjRpc());
log.info(helloService.getHello());
log.info(helloService.sayHello("你好"));
}
}
调用结果:
我们可以通过http不断调用 接口 /consumer 来查看 随机的负载均衡的情况,也可以突然关掉其中一个服务的情况去测试查看。
第一次调用:
查看我们的日志:也是在随机的变化中
从中你可以了解到Spring的强大之处,Spring的更多扩展接口,开闭原则运用的微妙微翘
从中你还可以了解到 jdk代理
从中你还可以了解到反射
从中你还可以了学习到netty的使用
从中你还可以 身临其境的感受到 Dubbo 的RPC原理过程,服务暴露,服务注册(与其有出入,本编只是学习、实践、感受其原理,真正感受到什么叫RPC远程调用如同本地一样调用的神秘面纱)
下次更新注册中心:zookeper尽请期待
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。