百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

基于 grpc 手撸一个 RPC 框架(rpc框架选型)

ccwgpt 2024-10-13 01:31 42 浏览 0 评论

1 手撸RPC框架的原因

常见的 RPC 框架有: 比较知名的如阿里的Dubbo、google的gRPC、Go语言的rpcx。 但只有gRPC支持streaming模式。

gRPC 是在任何环境中运行的现代开源高性能 RPC 框架,基于 HTTP/2 标准设计,带来诸如双向流、流控、头部压缩、单 TCP 连接上的多复用请求等特。这些特性使得其在移动设备上表现更好,更省电和节省空间占用。

为了理解如何基于gRPC框架,开发server/client程序,所以基于grpc手撸一个RPC框架

源代码存放在gitee上,欢迎大家围观: https://gitee.com/cloudcoder/grpc-examples

1.1 gRPC有什么好处

gRPC和restful API都提供了一套通信机制,用于server/client模型通信,而且它们都使用http作为底层的传输协议(严格地说, gRPC使用的http2.0,而restful api则不一定)。

gRPC还是有些特有的优势,如下:

  1. 通过protobuf定义协议接口,有更加严格的接口约束条件。
  2. protobuf定义的协议在传输时数据被序列化为二进制编码,大幅减少需要传输的数据量,节省带宽并性能。
  3. gRPC支持流式通信(理论上通过http2.0就可以使用streaming模式, 但是通常web服务的restful api很少这么用【WEB服务基于HTTP1.1】)
  4. 支持多种语言

2. RPC框架

2.1 RPC介绍

RPC就是要像调用本地的函数一样去调远程函数,RPC过程如下:

2.1.1 RPC远程调用过程

  1. 客户机调用一个称为客户机存根【client stub】的本地过程。对于客户机进程来说,这似乎是实际的过程,因为它是一个常规的本地过程。client stub将参数打包到远程过程(这可能涉及将它们转换为标准格式),并构建一个或多个网络消息。将参数打包到网络消息中称为封送处理【marshaling 】,并要求将所有数据序列化【serializing】为一种扁平的字节数组格式。
  2. 网络消息由client stub发送到远程系统(通过使用sockets interfaces对本地内核【local kernel】的系统调用)。
  3. 网络消息由内核通过某种协议(无连接或面向连接)传输到远程系统
  4. 服务器存根【server stub】(有时称为骨架 skeleton)接收服务器上的消息。它从网络消息中unmarshals出参数,并在必要时将它们从标准网络格式转换为特定于机器的形式。
  5. server stub调用服务器函数(对客户机来说,服务器函数是远程过程),并将从客户机接收到的参数传递给它
  6. 当服务器函数完成时,它将带着返回值返回到server stub
  7. 如果有必要,server stub将返回值转换为一个或多个网络消息发送给client stub
  8. 消息通过网络发送回client stub
  9. client stub从本地内核读取消息
  10. 然后,client stub将结果返回给客户机函数,如果需要,将它们从网络消息转换为本地的对象

2.1.2 RPC的好处

RPC的主要好处有两方面。

  • 程序员现在可以使用过程调用语义来调用远程函数并获取响应。
  • 编写分布式应用程序简化了,因为RPC将所有网络代码隐藏在存根函数中。应用程序不需要担心诸如套接字、端口号以及数据转换和解析等细节。

在OSI参考模型上,RPC跨越会话层和表示层(第5层和第6层)。

2.2 手撸RPC框架实现的功能

  • 基于NettyServerBuilder、NettyChannelBuilder,针对RPC的服务端、客户端进行了封装
  • 服务端支持Authentication Interceptor
  • 服务端目前支持心跳服务、简单的hello handler、client管理handler(提供client实例属性上报、keepalive接口)、消息上报handler, 通过ServiceLoader发现这些handler
  • 客户端使用ServiceManager管理所有BootService, 并在启动时,ServiceLoader发现这些Service,依次调用所有BootService实例的prepare、startup、onComplete方法,在BootService实例(除GRPCChannelManager)外,将自己作为GRPCChannelListener注册到GRPCChannelManager中
  • 客户端使用GRPCChannelManager管理GRPCChannel【针对grpc的ManagedChannel进行封装】,并启动一个线程,根据channel状态检查间隔时间定期检查是否需要重连
  • 客户端支持配置多个服务端,如果配置多个,则client随机绑定一个
  • 客户端连接成功后,会通知所有注册的服务,标识channel连接成功
  • 服务使用channel,调用rpc服务

2.2.1 服务端测试示例

public class ServerMain {
    public static void main(String[] args) throws Exception {
        ServiceServer server = new ServiceServer("localhost",7077);
        server.init();
        //通过SPI添加grpc handler
        HandlerManager.INSTANCE.registerHandlers(server);
        server.startServer();
        server.blockUntilShutdown();
    }
}

2.2.2 客户端测试示例


public class ClientMain {
    private static final Logger LOGGER = LoggerFactory.getLogger(ClientMain.class);

    public static void main(String[] args) throws Exception {
        try {
            //通过SPI绑定服务
            ServiceManager.INSTANCE.boot();
        } catch (Exception e) {
            LOGGER.error("boot failure.", e);
        }

        Runtime.getRuntime()
                .addShutdownHook(
                        new Thread(ServiceManager.INSTANCE::shutdown, "service shutdown thread"));

        HelloServiceClient helloServiceClient = ServiceManager.INSTANCE.findService(HelloServiceClient.class);
        for (int i = 0; i < 10; i++) {
            helloServiceClient.greet("张三" + i);
        }

        MsgReportServiceClient msgReportServiceClient = ServiceManager.INSTANCE.findService(MsgReportServiceClient.class);
        MsgCollection.Builder msgCollection = MsgCollection.newBuilder();
        for(int i=0;i<5;i++){
            MsgObject.Builder builder = MsgObject.newBuilder();
            builder.setId(i+"");
            builder.setMsg("report msg "+i);
            builder.setType(i+"");
            msgCollection.addMsgs(builder);
        }
        msgReportServiceClient.reportInSync(msgCollection.build());

        List<MsgObject> msgs = Lists.newArrayList();
        for(int i=10;i<15;i++){
            MsgObject.Builder builder = MsgObject.newBuilder();
            builder.setId(i+"");
            builder.setMsg("report msg "+i);
            builder.setType(i+"");
            msgs.add(builder.build());
        }
        msgReportServiceClient.reportBySteam(msgs);

        TimeUnit.SECONDS.sleep(1000);
    }
}

2.2.3 客户端启动时序图

客户端启动时序图如下:

2.2.4 服务端启动时序图

3. 定义rpc协议

3.1 定义rcp协议

在project的src/main/proto 和 src/test/proto,根据需要创建proto文件,如健康检查proto

syntax = "proto3";

package grpc.health.v1;

message HealthCheckRequest {
    string service = 1;
}

message HealthCheckResponse {
    enum ServingStatus {
        UNKNOWN = 0;
        SERVING = 1;
        NOT_SERVING = 2;
    }
    ServingStatus status = 1;
}

service Health {
    rpc Check (HealthCheckRequest) returns (HealthCheckResponse);
}

3.2 根据协议生成代码

使用的protoc.version版本是: 3.12.0

参考:https://www.grpc.io/docs/languages/java/generated-code/

在pom.xml文件中增加如下内容,则在compile 或 package时,protobuf-maven-plugin会根据定义的proto文件,生成代码


<build>
    <extensions>
      <extension>
        <groupId>kr.motd.maven</groupId>
        <artifactId>os-maven-plugin</artifactId>
        <version>1.6.2</version>
      </extension>
    </extensions>
    <plugins>
      <plugin>
        <groupId>org.xolstice.maven.plugins</groupId>
        <artifactId>protobuf-maven-plugin</artifactId>
        <version>0.6.1</version>
        <configuration>
          <protocArtifact>com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}</protocArtifact>
          <pluginId>grpc-java</pluginId>
          <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
        </configuration>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>compile-custom</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>

默认情况下,生成的代码在:

  • target/protobuf/grpc-java 存放生成的rpc服务,类以Grpc结尾
  • target/protobuf/grpc 存放生成的消息对象, 类名是消息的名称、消息的名称+OrBuilder

相关推荐

十分钟让你学会LNMP架构负载均衡(impala负载均衡)

业务架构、应用架构、数据架构和技术架构一、几个基本概念1、pv值pv值(pageviews):页面的浏览量概念:一个网站的所有页面,在一天内,被浏览的总次数。(大型网站通常是上千万的级别)2、u...

AGV仓储机器人调度系统架构(agv物流机器人)

系统架构层次划分采用分层模块化设计,分为以下五层:1.1用户接口层功能:提供人机交互界面(Web/桌面端),支持任务下发、实时监控、数据可视化和报警管理。模块:任务管理面板:接收订单(如拣货、...

远程热部署在美团的落地实践(远程热点是什么意思)

Sonic是美团内部研发设计的一款用于热部署的IDEA插件,本文其实现原理及落地的一些技术细节。在阅读本文之前,建议大家先熟悉一下Spring源码、SpringMVC源码、SpringBoot...

springboot搭建xxl-job(分布式任务调度系统)

一、部署xxl-job服务端下载xxl-job源码:https://gitee.com/xuxueli0323/xxl-job二、导入项目、创建xxl_job数据库、修改配置文件为自己的数据库三、启动...

大模型:使用vLLM和Ray分布式部署推理应用

一、vLLM:面向大模型的高效推理框架1.核心特点专为推理优化:专注于大模型(如GPT-3、LLaMA)的高吞吐量、低延迟推理。关键技术:PagedAttention:类似操作系统内存分页管理,将K...

国产开源之光【分布式工作流调度系统】:DolphinScheduler

DolphinScheduler是一个开源的分布式工作流调度系统,旨在帮助用户以可靠、高效和可扩展的方式管理和调度大规模的数据处理工作流。它支持以图形化方式定义和管理工作流,提供了丰富的调度功能和监控...

简单可靠高效的分布式任务队列系统

#记录我的2024#大家好,又见面了,我是GitHub精选君!背景介绍在系统访问量逐渐增大,高并发、分布式系统成为了企业技术架构升级的必由之路。在这样的背景下,异步任务队列扮演着至关重要的角色,...

虚拟服务器之间如何分布式运行?(虚拟服务器部署)

  在云计算和虚拟化技术快速发展的今天,传统“单机单任务”的服务器架构早已难以满足现代业务对高并发、高可用、弹性伸缩和容错容灾的严苛要求。分布式系统应运而生,并成为支撑各类互联网平台、企业信息系统和A...

一文掌握 XXL-Job 的 6 大核心组件

XXL-Job是一个分布式任务调度平台,其核心组件主要包括以下部分,各组件相互协作实现高效的任务调度与管理:1.调度注册中心(RegistryCenter)作用:负责管理调度器(Schedule...

京东大佬问我,SpringBoot中如何做延迟队列?单机与分布式如何做?

京东大佬问我,SpringBoot中如何做延迟队列?单机如何做?分布式如何做呢?并给出案例与代码分析。嗯,用户问的是在SpringBoot中如何实现延迟队列,单机和分布式环境下分别怎么做。这个问题其实...

企业级项目组件选型(一)分布式任务调度平台

官网地址:https://www.xuxueli.com/xxl-job/能力介绍架构图安全性为提升系统安全性,调度中心和执行器进行安全性校验,双方AccessToken匹配才允许通讯;调度中心和执...

python多进程的分布式任务调度应用场景及示例

多进程的分布式任务调度可以应用于以下场景:分布式爬虫:importmultiprocessingimportrequestsdefcrawl(url):response=re...

SpringBoot整合ElasticJob实现分布式任务调度

介绍ElasticJob是面向互联网生态和海量任务的分布式调度解决方案,由两个相互独立的子项目ElasticJob-Lite和ElasticJob-Cloud组成。它通过弹性调度、资源管控、...

分布式可视化 DAG 任务调度系统 Taier 的整体流程分析

Taier作为袋鼠云的开源项目之一,是一个分布式可视化的DAG任务调度系统。旨在降低ETL开发成本,提高大数据平台稳定性,让大数据开发人员可以在Taier直接进行业务逻辑的开发,而不用关...

SpringBoot任务调度:@Scheduled与TaskExecutor全面解析

一、任务调度基础概念1.1什么是任务调度任务调度是指按照预定的时间计划或特定条件自动执行任务的过程。在现代应用开发中,任务调度扮演着至关重要的角色,它使得开发者能够自动化处理周期性任务、定时任务和异...

取消回复欢迎 发表评论: