百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

Netty实战篇-手写DubboRpc框架(netty4核心原理与手写)

ccwgpt 2025-04-01 16:20 31 浏览 0 评论

1. RPC 基本介绍

rpc是远程调用的一种行为,在数据传输过程中涉及到传输协议,http就是一种传输协议。

RPC(Remote Procedure Call)— 远程过程调用,是一个计算机通信协议。

  • 该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程。
  • 两个或多个应用程序都分布在不同的服务器上,它们之间的调用都像是本地方法调用一样

常见的 RPC 框架有:

  • 阿里的Dubbo
  • google的gRPC
  • Go语言的rpc
  • Apache的thrift
  • Spring旗下的 Spring Cloud。

2. RPC 调用流程

说明:

  1. 服务消费方(client)以本地调用方式调用服务
  2. client stub 接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体
  3. client stub 将消息进行编码并发送到服务端
  4. server stub 收到消息后进行解码
  5. server stub 根据解码结果调用本地的服务
  6. 本地服务执行并将结果返回给 server stub
  7. server stub 将返回导入结果进行编码并发送至消费方
  8. client stub 接收到消息并进行解码】
  9. 服务消费方(client)得到结果

RPC 的目标就是将 2-8 这些步骤都封装起来,用户无需关心这些细节,可以像调用本地方法一样即可完成远程服务调用

3. 实现 dubbo RPC(基于 Netty)

3.1 需求说明

  • dubbo 底层使用了 Netty 作为网络通讯框架,要求用 Netty 实现一个简单的 RPC 框架
  • 模仿 dubbo,消费者和提供者约定接口和协议,消费者远程调用提供者的服务,提供者返回一个字符串,消费者打印提供者返回的数据。底层网络通信使用 Netty 4.1.20

    io.netty
    netty-all
    4.1.20.Final

3.2 设计说明

  • 创建一个接口,定义抽象方法。用于消费者和提供者之间的约定。
  • 创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据。
  • 创建一个消费者,该类需要透明的调用自己不存在的方法,内部需要使用 Netty 请求提供者返回数据

3.3 代码实现

项目结构:

客户端启动器ClientBootstrap

public class ClientBootstrap {
    //定义协议头
    public static final String providerName = "HelloService#hello";
    
    public static void main(String[] args) throws InterruptedException {
        NettyClient client = new NettyClient();
        HelloService serviceProxy = (HelloService) client.getBean(HelloService.class, providerName);//拿到代理对象
        //        for (; ; ) {
        //调用客户端的方法
        //            Thread.sleep(2000);
        String result = serviceProxy.hello("阿昌来也");
        System.out.println("客户端调用服务端,结果为:" + result);
        //        }
    }
}

服务端启动器ServerBootstrap

public class ServerBootstrap {
    public static void main(String[] args) throws InterruptedException {
        NettyServer.startServer("127.0.0.1",7000);
    }
}

客户端初始化类NettyClient

public class NettyClient {
    //创建线程池
    private static ExecutorService executors = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    private static NettyClientHandler nettyClientHandler;

    /**
     * 编写方式使用代理模式,获取一个代理对象
     * @param serviceClass service类
     * @param providerName 协议头
     * @return 代理对象
     */
    public Object getBean(final Class serviceClass,final String providerName){
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                                      new Class[]{serviceClass},
                                      ((proxy, method, args) -> {
                                          //客户端每调用一次就会进入该代码块
                                          //第一次调用
                                          if (nettyClientHandler==null){
                                              startClient0("127.0.0.1",7000);
                                          }

                                          //设置要发送给服务器的信息
                                          //providerName协议头,args传入的参数
                                          nettyClientHandler.setParam(providerName+args[0]);
                                          return executors.submit(nettyClientHandler).get();
                                      }
                                      ));
    }

    //初始化客户端
    private static void startClient0(String ipaddr,Integer port){
        nettyClientHandler = new NettyClientHandler();
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            Bootstrap clientBootstrap = bootstrap.group(workerGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY,true)
                .handler(new ChannelInitializer() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        pipeline.addLast(new StringEncoder());
                        pipeline.addLast(new StringDecoder());
                        pipeline.addLast(nettyClientHandler);
                    }
                });
            clientBootstrap.connect(ipaddr,port).sync();
        }catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

客户端处理器NettyClientHandler

public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {
    private ChannelHandlerContext channelHandlerContext;//上下文
    private String result;//调用的返回结果
    private String param;//客户端调用方法时的参数

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    //收到服务器的数据后就会被调用
    @Override
    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("channelRead");
        result = msg.toString();
        notify();//唤醒等待的线程
    }

    //与服务器连接成功后就会被调用
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelActive");
        channelHandlerContext = ctx;
    }

    //被代理对象调用,异步发送数据给服务器,然后阻塞,会等待被唤醒
    @Override
    public synchronized Object call() throws Exception {
        System.out.println("call1");
        channelHandlerContext.writeAndFlush(param);
        //进行wait阻塞
        wait();
        System.out.println("call2");

        return result;
    }


    //设置发送的数据
    void setParam(String msg){
        System.out.println("setParam");
        this.param = msg;
    }
    
}
复制代码

服务端初始化类NettyServer

public class NettyServer {

    public static  void startServer(String hostname,Integer port) throws InterruptedException {
        startServer0(hostname,port);
    }


    private static void startServer0(String hostname,Integer port) throws InterruptedException {
        NioEventLoopGroup boosGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup(8);
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            ServerBootstrap serverBootstrap = bootstrap.group(boosGroup, workerGroup)
                //                    .handler(new LoggingHandler())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        pipeline.addLast(new StringDecoder());
                        pipeline.addLast(new StringEncoder());
                        pipeline.addLast(new NettyServerHandler());
                    }
                });
            System.out.println("服务端启动成功....端口:"+port);
            ChannelFuture cf = serverBootstrap.bind(hostname, port).sync();
            cf.channel().closeFuture().sync();
        }finally {
            boosGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
    
}
复制代码

服务端处理器NettyServerHandler

public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //获取客户端发送来的消息,并调用服务
        System.out.println("msg="+msg);

        //客户端想要调用服务器的api时,想要满足一定协议的要求才能调用
        //比如,我们这里要求,每次发送消息时,都必须要求以"HelloService#hello开头"
        if (msg.toString().startsWith("HelloService#hello")){
            String result = new HelloServiceImpl().hello(msg.toString().split("HelloService#hello")[1]);
            ctx.writeAndFlush(result);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

客户端接口的真正实现Impl:HelloServiceImpl

public class HelloServiceImpl implements HelloService {
    private static int count = 0;
    @Override
    public String hello(String message) {
        System.out.println("客户端发来的消息为:【"+message+"】");
        if (message!=null){
            return "你好客户端,服务端已经收到了消息"+"调用次数为:【"+(++count)+"】";
        }else {
            return "消息不能为空";
        }
    }
}

服务提供方 和 服务消费放 公共部分,约定的接口规范 HelloService

public interface HelloService {
    String hello(String message);
}




作者:hsfxuebao
链接:
https://juejin.cn/post/7102020346861060109


相关推荐

一个基于.Net Core遵循Clean Architecture原则开源架构

今天给大家推荐一个遵循CleanArchitecture原则开源架构。项目简介这是基于Asp.netCore6开发的,遵循CleanArchitecture原则,可以高效、快速地构建基于Ra...

AI写代码翻车无数次,我发现只要提前做好这3步,bug立减80%

写十万行全是bug之后终于找到方法了开发"提示词管理助手"新版本那会儿,我差点被bug整崩溃。刚开始两周,全靠AI改代码架构,结果十万行程序漏洞百出。本来以为AI说没问题就稳了,结果...

OneCode低代码平台的事件驱动设计:架构解析与实践

引言:低代码平台的事件驱动范式在现代软件开发中,事件驱动架构(EDA)已成为构建灵活、松耦合系统的核心范式。OneCode低代码平台通过创新性的注解驱动设计,将事件驱动理念深度融入平台架构,实现了业务...

国内大厂AI插件评测:根据UI图生成Vue前端代码

在IDEA中安装大厂的AI插件,打开ruoyi增强项目:yudao-ui-admin-vue31.CodeBuddy插件登录腾讯的CodeBuddy后,大模型选择deepseek-v3,输入提示语:...

AI+低代码技术揭秘(二):核心架构

本文档介绍了为VTJ低代码平台提供支持的基本架构组件,包括Engine编排层、Provider服务系统、数据模型和代码生成管道。有关UI组件库和widget系统的信息,请参阅UI...

GitDiagram用AI把代码库变成可视化架构图

这是一个名为gitdiagram的开源工具,可将GitHub仓库实时转换为交互式架构图,帮助开发者快速理解代码结构。核心功能一键可视化:替换GitHubURL中的"hub...

30天自制操作系统:第六天:代码架构整理与中断处理

1.拆开bootpack.c文件。根据设计模式将对应的功能封装成独立的文件。2.初始化pic:pic(可编程中断控制器):在设计上,cpu单独只能处理一个中断。而pic是将8个中断信号集合成一个中断...

AI写代码越帮越忙?2025年研究揭露惊人真相

近年来,AI工具如雨后春笋般涌现,许多人开始幻想程序员的未来就是“对着AI说几句话”,就能轻松写出完美的代码。然而,2025年的一项最新研究却颠覆了这一期待,揭示了一个令人意外的结果。研究邀请了16位...

一键理解开源项目:两个自动生成GitHub代码架构图与说明书工具

一、GitDiagram可以一键生成github代码仓库的架构图如果想要可视化github开源项目:https://github.com/luler/reflex_ai_fast,也可以直接把域名替换...

5分钟掌握 c# 网络通讯架构及代码示例

以下是C#网络通讯架构的核心要点及代码示例,按协议类型分类整理:一、TCP协议(可靠连接)1.同步通信//服务器端usingSystem.Net.Sockets;usingTcpListene...

从复杂到优雅:用建造者和责任链重塑代码架构

引用设计模式是软件开发中的重要工具,它为解决常见问题提供了标准化的解决方案,提高了代码的可维护性和可扩展性,提升了开发效率,促进了团队协作,提高了软件质量,并帮助开发者更好地适应需求变化。通过学习和应...

低代码开发当道,我还需要学习LangChain这些框架吗?| IT杂谈

专注LLM深度应用,关注我不迷路前两天有位兄弟问了个问题:当然我很能理解这位朋友的担忧:期望效率最大化,时间用在刀刃上,“不要重新发明轮子”嘛。铺天盖地的AI信息轰炸与概念炒作,很容易让人浮躁与迷茫。...

框架设计并不是简单粗暴地写代码,而是要先弄清逻辑

3.框架设计3.框架设计本节我们要开发一个UI框架,底层以白鹭引擎为例。框架设计的第一步并不是直接撸代码,而是先想清楚设计思想,抽象。一个一个的UI窗口是独立的吗?不是的,...

大佬用 Avalonia 框架开发的 C# 代码 IDE

AvalonStudioAvalonStudio是一个开源的跨平台的开发编辑器(IDE),AvalonStudio的目标是成为一个功能齐全,并且可以让开发者快速使用的IDE,提高开发的生产力。A...

轻量级框架Lagent 仅需20行代码即可构建自己的智能代理

站长之家(ChinaZ.com)8月30日消息:Lagent是一个专注于基于LLM模型的代理开发的轻量级框架。它的设计旨在简化和提高这种模型下代理的开发效率。LLM模型是一种强大的工具,可以...

取消回复欢迎 发表评论: