2 Star 10 Fork 3

Rotten Code/netty

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README

一、手动实现RPC远程本地调用

1.0 版本

1.1 实现思路:

​ 基于netty的网络传输实现远程本地调用,无感知远程调用。

​ 实现过程:

当前版本的实现思路:

该版本涉及:

  • netty
  • JDK动态代理。
  • Spring 的扫描原理(@CompanScan,@MapperScan 同理)
  • Spring的事件监听与处理,
  • 反射,SpringListener 配置,负债均衡随机算法。
  • 通过Spring注解形式启用RPC,接口暴露扫描,Rpc接口注解引用。
  • 等等

​ 该版本实现到的功能:服务注册、服务发现、服务断开注册中心自动剔除服务、多个提供者可对应一个消费者。

​ 注册中心针对服务提供者的数据保存目前保存到本地文件中。

​ 提供者引入咱们自己写的RPC框架,项目启动,加上 @EnableRpc 启动咱们的RPC框架应用,启动的时候,在上下文初始化的时候提供者向注册中心注册服务。 接着扫描咱们自己的 @RpcService 将其注册到Spring中。在Bean初始化之后将其接口暴露出去。

​ 消费者引入接口,使用@RpcRefence 注解,消费者端不需要实现接口。项目启动也加上@EnableRpc。开始处理@RpcRefence 依赖注入。

​ 消费者端调用的时候,在代理类invoke的方法去注册中心拉取可用服务,使用随机算法获得其中一个服务地址。通过netty连接传输我们的协议对象。

​ 提供服务方收到连接请求的时候,读取协议参数,解析出接口名、参数类型、参数值通过反射执行该对应的方法,获取结果值。将结果再次回传给消费者。

1.2 开发实现

​ 基于SpringBoot进行开发,尽可能实现接近Dubbo

项目结构图:

  • YJRpc 父项目

    • server-consumer-one: 消费者一个
    • server-provider-one: 提供者一个
    • service-api: 我们的API接口(提供者实现,消费者不需要)
    • service-common: 公共模块
    • service-frame: RPC核心模块
    • service-register-center: 注册中心
  • service-frame:

    • 注解:

      • @EnableYJRpc

        /**
         * RPC启动注解,加载配置类
         * @author 永健
         * @since 2022-01-18 14:58
         */
        @Target({ElementType.TYPE})
        @Retention(RetentionPolicy.RUNTIME)
        @Documented
        @Import({YJRpcServiceScanRegistrar.class, RegisterConfig.class, BeanConfig.class})
        public @interface EnableYJRpc { }
        
      • @RpcReference

        /**
         * 调用方消费者使用
         * @author 永健
         * @since 2022-01-18 17:09
         */
        @Target({ElementType.FIELD})
        @Retention(RetentionPolicy.RUNTIME)
        @Documented
        public @interface RpcReference {
            String name() default "";
        }
      • @RpcService

        /**
         * 暴露服务
         * 将其该注解的类接口发布出去
         * @author 永健
         * @since 2022-01-18 15:28
         */
        @Target({ElementType.TYPE})
        @Retention(RetentionPolicy.RUNTIME)
        @Documented
        public @interface RpcService { }
    • 服务注册:在我们的提供者启动的时候,当上下文初始化好了,我们进行事件监听(ApplicationContextInitializedEvent)。在这个时候我们开始注册服务,并且启动netty服务

      /**
       * 在上下文初始化的时候注册服务
       * @author 永健
       * @since 2022-01-19 10:23
       */
      public class ApplicationStartingListener implements ApplicationListener<ApplicationContextInitializedEvent> {
      
          @Override
          public void onApplicationEvent(ApplicationContextInitializedEvent applicationContextInitializedEvent) {
              ConfigurableEnvironment env = applicationContextInitializedEvent.getApplicationContext().getEnvironment();
              // 配置文件里读取
              String host = env.getProperty("spring.rpc.provider.host");
              String applicationName = env.getProperty("spring.application.name");
              if ("".equals(host) || host == null) {
                  return;
              }
              String[] split = host.split(":");
              InetSocketAddress inetSocketAddress = new InetSocketAddress(split[0], Integer.parseInt(split[1]));
              // 启动netty,提供者服务启动,注册服务
              RegisterConfig.register(applicationName, host);
              new NettyServer().start(inetSocketAddress);
          }
      }

    • 接口暴露:接口暴露在@RpcService的Bean初始化好后,将其暴露出去

      /**
       * @author 永健
       * @since 2022-01-18 16:03
       */
      public class RpcAnnotationBean implements BeanPostProcessor {
      
          /**
           * bean初始化好后,暴露出去,将其注册到注册中心
           *
           * @param bean
           * @param beanName
           *
           * @throws BeansException
           */
          @Override
          public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
              RpcService service = bean.getClass().getAnnotation(RpcService.class);
              if (service != null) {
                  // 暴露接口
                  RegisterConfig.export(bean, beanName);
              }
              return bean;
          }
      }

    • 接口剔除: 当我们的提供者服务不可用时,我们将其从注册中心剔除

      /**
       * 在上下文关闭的时候(服务停止)
       *
       * @author 永健
       * @since 2022-01-19 10:23
       */
      public class ApplicationCloseListener implements ApplicationListener<ContextClosedEvent> {
      
          @Override
          public void onApplicationEvent(ContextClosedEvent contextClosedEvent) {
              // 移除注册中心的服务
              RegisterConfig.logout();
      
              // 关闭netty
              NettyServer.closeServer();
          }
      }
    • 远程调用实现(核心部分):远程调用的实现:调用方使用的是代理对象(接口),通过invoke方法,在此方法中,去注册中心拉取服务,获得服务地址后,通过netty传输该接口对应的协议参数,提供者拿到协议参数后,通过反射调用该接口的实现方法获得返回结果,将结果又返回给消费者。这就是全部过程,这个对于使用方来说是无感知的,即RPC远程本地调用。

      • 消费者使用费例子:

        • api接口

        • 消费者: 没有实现该api接口

        • 提供者:提供者分别实现api接口

      • 消费者的invoke方法

        /**
         * Rpc调用方代理对象
         * @author 永健
         * @since 2022-01-18 17:11
         */
        public class RpcReferenceProxy implements InvocationHandler {
        
            private Class<?> aClass;
        
            private String beanName;
        
            public RpcReferenceProxy(Class<?> aClass, String beanName) {
                this.beanName = beanName;
                this.aClass = aClass;
            }
        
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                ResponseHandler responseHandler = new ResponseHandler();
                EventLoopGroup group = new NioEventLoopGroup(1);
                Bootstrap bootstrap =
                        new Bootstrap().group(group).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        // 心跳监测机制
                        socketChannel.pipeline().addLast(new IdleStateHandler(3, 3, 3, TimeUnit.SECONDS));
                        // 编码处理器
                        socketChannel.pipeline().addLast("encoder", new ObjectEncoder());
                        // 解码处理器
                        socketChannel.pipeline().addLast("decoder", new ObjectDecoder(1024,
                                ClassResolvers.cacheDisabled(this.getClass().getClassLoader())));
                        // 消息处理器
                        socketChannel.pipeline().addLast(responseHandler);
                    }
                });
        
                // 向注册中心拉取服务列表
                String ip = RegisterConfig.loadBalanceIp(this.beanName);
                checkIsNoExits(ip);
        
                // 连接提供者
                ChannelFuture connect = bootstrap.connect(ip.split(":")[0], Integer.parseInt(ip.split(":")[1])).sync();
                Channel channel = connect.channel();
        
        
                // 包装协议对象
                RpcProtocol rpcProtocol = new RpcProtocol();
                rpcProtocol.setMethodName(method.getName());
                rpcProtocol.setMethodParams(args);
                rpcProtocol.setMethodParamsType(method.getParameterTypes());
                rpcProtocol.setBeanName(this.beanName);
                // 传输
                channel.writeAndFlush(rpcProtocol);
                channel.closeFuture().sync();
                return responseHandler.getResult();
            }
        }

1.3 运行效果

1、启动提供者

打包多一个提供者启动:

启动两个提供者:一个端口是8082 一个是8081

2、启动消费者

消费者在成功启动的时候就会直接调用接口。

/**
 * @author 永健
 * @since 2022-01-18 17:27
 */
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
@EnableYJRpc
@RestController
@RequestMapping("/consumer")
public class App {

    @Resource
    private TestService testService;

    @GetMapping
    public Object ok(){
        return testService.test();
    }

    public static void main(String[] args) {
        SpringApplication.run(App.class,args);
    }
}
/**
 * @author 永健
 * @since 2022-01-18 17:27
 */
@Service
public class TestService implements ApplicationRunner {

    private static final Logger log = LoggerFactory.getLogger(TestService.class);

    @RpcReference
    IHelloService helloService;

    @RpcReference
    IYJRpcService yjRpcService;


    public YJRpc test() {
        return yjRpcService.getYjRpc();
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info("{}",yjRpcService.getYjRpc());
       log.info(helloService.getHello());
       log.info(helloService.sayHello("你好"));
    }
}

调用结果:

我们可以通过http不断调用 接口 /consumer 来查看 随机的负载均衡的情况,也可以突然关掉其中一个服务的情况去测试查看。

第一次调用:

查看我们的日志:也是在随机的变化中

  • 从中你可以了解到Spring的强大之处,Spring的更多扩展接口,开闭原则运用的微妙微翘

  • 从中你还可以了解到 jdk代理

  • 从中你还可以了解到反射

  • 从中你还可以了学习到netty的使用

  • 从中你还可以 身临其境的感受到 Dubbo 的RPC原理过程,服务暴露,服务注册(与其有出入,本编只是学习、实践、感受其原理,真正感受到什么叫RPC远程调用如同本地一样调用的神秘面纱

下次更新注册中心:zookeper尽请期待

空文件

简介

仿照dubbo手动实现RPC远程本地无感知调用。基于netty + SpringBoot 展开 收起
Java
取消

发行版

暂无发行版

贡献者

全部

近期动态

加载更多
不能加载更多了
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Java
1
https://gitee.com/lyj08/netty.git
git@gitee.com:lyj08/netty.git
lyj08
netty
netty
master

搜索帮助

Cb406eda 1850385 E526c682 1850385