百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

Netty实战篇-手写DubboRpc框架(netty4核心原理与手写)

ccwgpt 2025-04-01 16:20 22 浏览 0 评论

1. RPC 基本介绍

rpc是远程调用的一种行为,在数据传输过程中涉及到传输协议,http就是一种传输协议。

RPC(Remote Procedure Call)— 远程过程调用,是一个计算机通信协议。

  • 该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程。
  • 两个或多个应用程序都分布在不同的服务器上,它们之间的调用都像是本地方法调用一样

常见的 RPC 框架有:

  • 阿里的Dubbo
  • google的gRPC
  • Go语言的rpc
  • Apache的thrift
  • Spring旗下的 Spring Cloud。

2. RPC 调用流程

说明:

  1. 服务消费方(client)以本地调用方式调用服务
  2. client stub 接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体
  3. client stub 将消息进行编码并发送到服务端
  4. server stub 收到消息后进行解码
  5. server stub 根据解码结果调用本地的服务
  6. 本地服务执行并将结果返回给 server stub
  7. server stub 将返回导入结果进行编码并发送至消费方
  8. client stub 接收到消息并进行解码】
  9. 服务消费方(client)得到结果

RPC 的目标就是将 2-8 这些步骤都封装起来,用户无需关心这些细节,可以像调用本地方法一样即可完成远程服务调用

3. 实现 dubbo RPC(基于 Netty)

3.1 需求说明

  • dubbo 底层使用了 Netty 作为网络通讯框架,要求用 Netty 实现一个简单的 RPC 框架
  • 模仿 dubbo,消费者和提供者约定接口和协议,消费者远程调用提供者的服务,提供者返回一个字符串,消费者打印提供者返回的数据。底层网络通信使用 Netty 4.1.20

    io.netty
    netty-all
    4.1.20.Final

3.2 设计说明

  • 创建一个接口,定义抽象方法。用于消费者和提供者之间的约定。
  • 创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据。
  • 创建一个消费者,该类需要透明的调用自己不存在的方法,内部需要使用 Netty 请求提供者返回数据

3.3 代码实现

项目结构:

客户端启动器ClientBootstrap

public class ClientBootstrap {
    //定义协议头
    public static final String providerName = "HelloService#hello";
    
    public static void main(String[] args) throws InterruptedException {
        NettyClient client = new NettyClient();
        HelloService serviceProxy = (HelloService) client.getBean(HelloService.class, providerName);//拿到代理对象
        //        for (; ; ) {
        //调用客户端的方法
        //            Thread.sleep(2000);
        String result = serviceProxy.hello("阿昌来也");
        System.out.println("客户端调用服务端,结果为:" + result);
        //        }
    }
}

服务端启动器ServerBootstrap

public class ServerBootstrap {
    public static void main(String[] args) throws InterruptedException {
        NettyServer.startServer("127.0.0.1",7000);
    }
}

客户端初始化类NettyClient

public class NettyClient {
    //创建线程池
    private static ExecutorService executors = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    private static NettyClientHandler nettyClientHandler;

    /**
     * 编写方式使用代理模式,获取一个代理对象
     * @param serviceClass service类
     * @param providerName 协议头
     * @return 代理对象
     */
    public Object getBean(final Class serviceClass,final String providerName){
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                                      new Class[]{serviceClass},
                                      ((proxy, method, args) -> {
                                          //客户端每调用一次就会进入该代码块
                                          //第一次调用
                                          if (nettyClientHandler==null){
                                              startClient0("127.0.0.1",7000);
                                          }

                                          //设置要发送给服务器的信息
                                          //providerName协议头,args传入的参数
                                          nettyClientHandler.setParam(providerName+args[0]);
                                          return executors.submit(nettyClientHandler).get();
                                      }
                                      ));
    }

    //初始化客户端
    private static void startClient0(String ipaddr,Integer port){
        nettyClientHandler = new NettyClientHandler();
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            Bootstrap clientBootstrap = bootstrap.group(workerGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY,true)
                .handler(new ChannelInitializer() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        pipeline.addLast(new StringEncoder());
                        pipeline.addLast(new StringDecoder());
                        pipeline.addLast(nettyClientHandler);
                    }
                });
            clientBootstrap.connect(ipaddr,port).sync();
        }catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

客户端处理器NettyClientHandler

public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {
    private ChannelHandlerContext channelHandlerContext;//上下文
    private String result;//调用的返回结果
    private String param;//客户端调用方法时的参数

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    //收到服务器的数据后就会被调用
    @Override
    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("channelRead");
        result = msg.toString();
        notify();//唤醒等待的线程
    }

    //与服务器连接成功后就会被调用
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelActive");
        channelHandlerContext = ctx;
    }

    //被代理对象调用,异步发送数据给服务器,然后阻塞,会等待被唤醒
    @Override
    public synchronized Object call() throws Exception {
        System.out.println("call1");
        channelHandlerContext.writeAndFlush(param);
        //进行wait阻塞
        wait();
        System.out.println("call2");

        return result;
    }


    //设置发送的数据
    void setParam(String msg){
        System.out.println("setParam");
        this.param = msg;
    }
    
}
复制代码

服务端初始化类NettyServer

public class NettyServer {

    public static  void startServer(String hostname,Integer port) throws InterruptedException {
        startServer0(hostname,port);
    }


    private static void startServer0(String hostname,Integer port) throws InterruptedException {
        NioEventLoopGroup boosGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup(8);
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            ServerBootstrap serverBootstrap = bootstrap.group(boosGroup, workerGroup)
                //                    .handler(new LoggingHandler())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        pipeline.addLast(new StringDecoder());
                        pipeline.addLast(new StringEncoder());
                        pipeline.addLast(new NettyServerHandler());
                    }
                });
            System.out.println("服务端启动成功....端口:"+port);
            ChannelFuture cf = serverBootstrap.bind(hostname, port).sync();
            cf.channel().closeFuture().sync();
        }finally {
            boosGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
    
}
复制代码

服务端处理器NettyServerHandler

public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //获取客户端发送来的消息,并调用服务
        System.out.println("msg="+msg);

        //客户端想要调用服务器的api时,想要满足一定协议的要求才能调用
        //比如,我们这里要求,每次发送消息时,都必须要求以"HelloService#hello开头"
        if (msg.toString().startsWith("HelloService#hello")){
            String result = new HelloServiceImpl().hello(msg.toString().split("HelloService#hello")[1]);
            ctx.writeAndFlush(result);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

客户端接口的真正实现Impl:HelloServiceImpl

public class HelloServiceImpl implements HelloService {
    private static int count = 0;
    @Override
    public String hello(String message) {
        System.out.println("客户端发来的消息为:【"+message+"】");
        if (message!=null){
            return "你好客户端,服务端已经收到了消息"+"调用次数为:【"+(++count)+"】";
        }else {
            return "消息不能为空";
        }
    }
}

服务提供方 和 服务消费放 公共部分,约定的接口规范 HelloService

public interface HelloService {
    String hello(String message);
}




作者:hsfxuebao
链接:
https://juejin.cn/post/7102020346861060109


相关推荐

迈向群体智能 | 智源发布首个跨本体具身大小脑协作框架

允中发自凹非寺量子位|公众号QbitAI3月29日,智源研究院在2025中关村论坛“未来人工智能先锋论坛”上发布首个跨本体具身大小脑协作框架RoboOS与开源具身大脑RoboBrain,可实...

大模型对接微信个人号,极空间部署AstrBot机器人,万事不求百度

「亲爱的粉丝朋友们好啊!今天熊猫又来介绍好玩有趣的Docker项目了,喜欢的记得点个关注哦!」引言前两天熊猫发过一篇关于如何在极空间部署AstrBot并对接QQ消息平台的文章,不过其实QQ现在已经很少...

Seata,让分布式事务不再是难题!实战分享带你领略Seata的魅力!

终身学习、乐于分享、共同成长!前言Seata是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata将为用户提供了AT、TCC、SAGA和XA事务模式,为用户打造一站式的...

常见分布式事务解决方案(分布式事务解决的问题)

1.两阶段提交(2PC)原理:分为准备阶段(协调者询问参与者是否可提交)和提交阶段(协调者根据参与者反馈决定提交或回滚)。优点:强一致性,适用于数据库层(如XA协议)。缺点:同步阻塞:所有参与者阻塞...

分布式事务:从崩溃到高可用,程序员必须掌握的实战方案!

“支付成功,但订单状态未更新!”、“库存扣减后,交易却回滚了!”——如果你在分布式系统中踩过这些“天坑”,这篇文章就是你的救命稻草!本文将手把手拆解分布式事务的核心痛点和6大主流解决方案,用代码实战+...

谈谈对分布式事务的一点理解和解决方案

分布式事务首先,做系统拆分的时候几乎都会遇到分布式事务的问题,一个仿真的案例如下:项目初期,由于用户体量不大,订单模块和钱包模块共库共应用(大war包时代),模块调用可以简化为本地事务操作,这样做只要...

一篇教你通过Seata解决分布式事务问题

1 Seata介绍Seata是由阿里中间件团队发起的开源分布式事务框架项目,依赖支持本地ACID事务的关系型数据库,可以高效并且对业务0侵入的方式解决微服务场景下面临的分布式事务问题,目前提供AT...

Seata分布式事务详解(原理流程及4种模式)

Seata分布式事务是SpringCloudAlibaba的核心组件,也是构建分布式的基石,下面我就全面来详解Seata@mikechen本篇已收于mikechen原创超30万字《阿里架构师进阶专题合...

分布式事务最终一致性解决方案有哪些?MQ、TCC、saga如何实现?

JTA方案适用于单体架构多数据源时实现分布式事务,但对于微服务间的分布式事务就无能为力了,我们需要使用其他的方案实现分布式事务。1、本地消息表本地消息表的核心思想是将分布式事务拆分成本地事务进行处理...

彻底掌握分布式事务2PC、3PC模型(分布式事务视频教程)

原文:https://mp.weixin.qq.com/s/_zhntxv07GEz9ktAKuj70Q作者:马龙台工作中使用最多的是本地事务,但是在对单一项目拆分为SOA、微服务之后,就会牵扯出分...

Seata分布式事务框架关于Annotation的SAGA模式分析

SAGAAnnotation是ApacheSeata版本2.3.0中引入的功能,它提供了一种使用Java注解而不是传统的JSON配置或编程API来实现SAGA事务模式的声明...

分布式事务,原理简单,写起来全是坑

今天我们就一起来看下另一种模式,XA模式!其实我觉得seata中的四种不同的分布式事务模式,学完AT、TCC以及XA就够了,Saga不好玩,而且长事务本身就有很多问题,也不推荐使用。S...

内存空间节约利器redis的bitmap(位图)应用场景有哪些你知道吗

在前面我们分享过一次Redis常用数据结构和使用场景,文章对Redis基本使用做了一个简单的API说明,但是对于其中String类型中的bitmap(位图)我们需要重点说明一下,因为他的作用真的不容忽...

分布式事务原理详解(图文全面总结)

分布式事务是非常核心的分布式系统,也是大厂经常考察对象,下面我就重点详解分布式事务及原理实现@mikechen本文作者:陈睿|mikechen文章来源:mikechen.cc分布式事务分布式事务指的是...

大家平时天天说的分布式系统到底是什么东西?

目录从单块系统说起团队越来越大,业务越来越复杂分布式出现:庞大系统分而治之分布式系统所带来的技术问题一句话总结:什么是分布式系统设计和开发经验补充说明:中间件系统及大数据系统前言现在有很多Java技术...

取消回复欢迎 发表评论: