百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

Golang实现gRPC的Proxy的原理(golang grpc原理)

ccwgpt 2024-10-13 01:32 75 浏览 0 评论

背景

gRPC是Google开始的一个RPC服务框架, 是英文全名为Google Remote Procedure Call的简称。

广泛的应用在有RPC场景的业务系统中,一些架构中将gRPC请求都经过一个gRPC服务代理节点或网关,进行服务的权限限制,限流,服务调用监控,增加请求统计等等诸多功能。

如下以Golang和gRPC为例,简要分析gRPC的转发原理。

gRPC Proxy原理

基本原理如下

  • 基于TCP启动一个gRPC代理服务端
  • 拦截gRPC框架的服务,能将gRPC请求的服务拦截到转发代理的一个函数中执行。
  • 接收客户端的请求,处理业务指标后转发给服务端。
  • 接收服务端的响应,处理业务指标后转发给客户端。

基于如上原理描述,如下图所示,gRPC的客户端将所有的请求都发给gRPC Server Proxy,这个代理网关实现请求转发。

将gRPC Client的请求流转发到gRPC 服务实现的节点上。并将服务处理结果响应返回给客户端。

在这个图中的转发需要回答如下几个问题

  • Proxy怎么知道哪些请求转发到哪些服务节点上,转发的依据是什么?
  • Proxy是否需要解析gRPC协议?
  • Proxy上没有服务的实现,该如何转发?

简化的gRPC服务处理流程

在回答如下问题之前,我们先简单的分析一下gRPC服务器的实现原理和流程。

  • 编写自己的服务实现,例子中以HelloWorld为例。
  • 把自己的服务实现HelloWorldServer注册到gRPC框架中
  • 创建一个TCP的服务端监听
  • 基于TCP监听启动一个gRPC服务
  • gRPC服务接收gRPC客户端的TCP请求
  • 解析gRPC的头部信息,找出服务名
  • 根据服务名找到第一步注册的服务和方法实现处理器handler
  • 处理函数执行
  • 返回处理结果

简化的注册服务处理器函数,启动gRPC服务,调用请求和执行数据流图如下所示:

详细的gRPC服务运行原理

第一步,定义和编写HelloWorld的IDL文件

syntax = "proto3";

package demoapi;


// HelloWorld Service
service HelloWorldService {
   rpc HelloWorld(HelloWorldRequest) returns (HelloWorldResponse){};
}

// Request message
message HelloWorldRequest {
   string  request = 1;
}

// Response message
message HelloWorldResponse {
   string respose = 1;
}

在这个简单的IDL中,定义了一个HelloWorldService的gRPC服务Service,这个服务中有一个HelloWorld方法Method。

第二步,编译IDL文件

将IDL的proto文件编译成helloworld.pb.go的gRPC代码文件。

生成的代码文件中,我们可以看到如下信息

// Hello World的客户端接口
type HelloWorldServiceClient interface {
    HelloWorld(ctx context.Context, in *HelloWorldRequest, opts ...grpc.CallOption) (*HelloWorldResponse, error)
}

// Hello World的服务端接口
type HelloWorldServiceServer interface {
    HelloWorld(context.Context, *HelloWorldRequest) (*HelloWorldResponse, error)
}

// HelloWorld的服务注册处理器函数Handler
func _HelloWorldService_HelloWorld_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
    in := new(HelloWorldRequest)
    if err := dec(in); err != nil {
        return nil, err
    }
    if interceptor == nil {
        return srv.(HelloWorldServiceServer).HelloWorld(ctx, in)
    }
    info := &grpc.UnaryServerInfo{
        Server:     srv,
        FullMethod: "/demoapi.HelloWorldService/HelloWorld",
    }
    handler := func(ctx context.Context, req interface{}) (interface{}, error) {
        return srv.(HelloWorldServiceServer).HelloWorld(ctx, req.(*HelloWorldRequest))
    }
    return interceptor(ctx, in, info, handler)
}

// gRPC服务注册的服务描述信息
// gRPC服务注册时,会建立以ServiceName为Key,Methods为Value的一个Map映射
// Methods中的Handler就是如上的服务处理Handler
var _HelloWorldService_serviceDesc = grpc.ServiceDesc{
    ServiceName: "demoapi.HelloWorldService",
    HandlerType: (*HelloWorldServiceServer)(nil),
    Methods: []grpc.MethodDesc{
        {
            MethodName: "HelloWorld",
            Handler:    _HelloWorldService_HelloWorld_Handler,
        },
    },
    Streams:  []grpc.StreamDesc{},
    Metadata: "demoapi/HelloWorld.proto",
}

如上代码中有如下几个关键信息需要解释

  • 服务Service名称 demoapi.HelloWorldService,对应IDL文件的package包名.service服务名称
  • 方法Method名称 HelloWorld,对应IDL文件的rpc方法

第三步,注册HelloWorld服务到gRPC的服务映射中

  • grpc.ServiceDesc是 gRPC服务注册的服务描述信息。
  • gRPC服务注册时,会建立以ServiceName为Key,包装Methods为Value的一个Map映射m。
  • Methods中的Handler就是如上的服务处理Handler。

对应的注册代码如下

// 注册gRPC服务
func RegisterHelloWorldServiceServer(s *grpc.Server, srv HelloWorldServiceServer) {
    s.RegisterService(&_HelloWorldService_serviceDesc, srv)
}

// Server is a gRPC server to serve RPC requests.
type Server struct {
       // ...
    m      map[string]*service // service name -> service info
}

// gRPC service.go的服务注册
func (s *Server) register(sd *ServiceDesc, ss interface{}) {
    srv := &service{
        server: ss,
        md:     make(map[string]*MethodDesc),
        sd:     make(map[string]*StreamDesc),
        mdata:  sd.Metadata,
    }
    for i := range sd.Methods {
        d := &sd.Methods[i]
        srv.md[d.MethodName] = d
    }
    for i := range sd.Streams {
        d := &sd.Streams[i]
        srv.sd[d.StreamName] = d
    }
    s.m[sd.ServiceName] = srv
}

第四步,接收客户端gRPC请求并处理

在这一步中,会进行如下几个步骤和函数的调用,也会回答前面的第一个问题。

  • gRPC客户端通过TCP链接,连接到gRPC服务端
  • gRPC的Serve函数触发TCP的Accept函数调用,生成一个和客户端的网络连接
  • grpc框架代码执行handleRawConn方法,将这个网络连接设置打破gRPC的传输层,做为网络的读和写实现
  • 依次调用grpc流的handlerStream方法,用于处理gRPC数据流
  • 这个函数中会接收gRPC请求的头信息,并解析得到服务名 如第二步中的服务名 demoapi.HelloWorldService
  • 通过如下的服务名中的方法名HelloWorld,并在Method的map中找到这个方法的处理器函数Handler,并执行这个Handler函数,实现gRPC服务的调用
  • 最后将处理结果返回

整体的数据流整理如下:

我们发现在gRPC框架代码中的handleStream存在两类服务,一类是已知服务 knownService, 第二类是unknownService

这两个有什么区别呢?

已知服务 knownService就是gRPC服务端代码注册到gRPC框架中的服务,叫做已知服务,其他没有注册的服务叫做未知服务。

为什么我们要提到这个未知服务unknownService呢?着就是我们实现gRPC服务代码的关键所在,是前面问题三的答案,

要实现gRPC服务代理,我们在创建grpc服务grpc.NewServer时,传递一个未知服务的handler,将未知服务的处理进行接管,然后通过注册的这个Handler实现gRPC代理转发的逻辑。

基于如下描述,gRPC代理的原理如下图所示:

  • 创建grpc服务时,注册一个未知服务处理器Handler和一个自定义的编码Codec编码和解码,此处使用proto标准的Codec(回答前面第二个问题)
  • 这个handle给业务方预留一个director的接口,用于代理重定向转发的grpc连接获取,这样proxy就可以通过redirector得到gRPCServer的grpc连接。
  • proxy接收gRPC客户端的连接,并使用gRPC的RecvMsg方法,接收客户端的消息请求
  • proxy将接收到的gRPC客户端消息请求,通过SendHeader和SendMsg方法发送给gRPC服务端。
  • 同样的方法,RecvMsg接收gRPC服务端的响应消息,使用SendMsg发送给gRPC客户端。
  • 至此gRPC代码服务就完成了消息的转发功能,企业的限流,权限等功能可以通过转发的功能进行拦截处理。

gRPC Proxy的实现逻辑如下图所示:

gRPC 代理服务的关键代码如下所示:

服务端到客户端的转发

// 转发服务端的数据流到客户端
func (s *handler) forwardServerToClient(src grpc.ServerStream, dst grpc.ClientStream) chan error {
    ret := make(chan error, 1)
    go func() {
        f := &frame{}
        for i := 0; ; i++ {
            if err := src.RecvMsg(f); err != nil {
                ret <- err // this can be io.EOF which is happy case
                break
            }
            if err := dst.SendMsg(f); err != nil {
                ret <- err
                break
            }
        }
    }()
    return ret
}

客户端到服务端的转发

// 转发客户端的数据流到服务端
func (s *handler) forwardClientToServer(src grpc.ClientStream, dst grpc.ServerStream) chan error {
    ret := make(chan error, 1)
    go func() {
        f := &frame{}
        for i := 0; ; i++ {
            if err := src.RecvMsg(f); err != nil {
                ret <- err // this can be io.EOF which is happy case
                break
            }
            if i == 0 {
                // This is a bit of a hack, but client to server headers are only readable after first client msg is
                // received but must be written to server stream before the first msg is flushed.
                // This is the only place to do it nicely.
                md, err := src.Header()
                if err != nil {
                    ret <- err
                    break
                }
                if err := dst.SendHeader(md); err != nil {
                    ret <- err
                    break
                }
            }
            if err := dst.SendMsg(f); err != nil {
                ret <- err
                break
            }
        }
    }()
    return ret
}

相关推荐

十分钟让你学会LNMP架构负载均衡(impala负载均衡)

业务架构、应用架构、数据架构和技术架构一、几个基本概念1、pv值pv值(pageviews):页面的浏览量概念:一个网站的所有页面,在一天内,被浏览的总次数。(大型网站通常是上千万的级别)2、u...

AGV仓储机器人调度系统架构(agv物流机器人)

系统架构层次划分采用分层模块化设计,分为以下五层:1.1用户接口层功能:提供人机交互界面(Web/桌面端),支持任务下发、实时监控、数据可视化和报警管理。模块:任务管理面板:接收订单(如拣货、...

远程热部署在美团的落地实践(远程热点是什么意思)

Sonic是美团内部研发设计的一款用于热部署的IDEA插件,本文其实现原理及落地的一些技术细节。在阅读本文之前,建议大家先熟悉一下Spring源码、SpringMVC源码、SpringBoot...

springboot搭建xxl-job(分布式任务调度系统)

一、部署xxl-job服务端下载xxl-job源码:https://gitee.com/xuxueli0323/xxl-job二、导入项目、创建xxl_job数据库、修改配置文件为自己的数据库三、启动...

大模型:使用vLLM和Ray分布式部署推理应用

一、vLLM:面向大模型的高效推理框架1.核心特点专为推理优化:专注于大模型(如GPT-3、LLaMA)的高吞吐量、低延迟推理。关键技术:PagedAttention:类似操作系统内存分页管理,将K...

国产开源之光【分布式工作流调度系统】:DolphinScheduler

DolphinScheduler是一个开源的分布式工作流调度系统,旨在帮助用户以可靠、高效和可扩展的方式管理和调度大规模的数据处理工作流。它支持以图形化方式定义和管理工作流,提供了丰富的调度功能和监控...

简单可靠高效的分布式任务队列系统

#记录我的2024#大家好,又见面了,我是GitHub精选君!背景介绍在系统访问量逐渐增大,高并发、分布式系统成为了企业技术架构升级的必由之路。在这样的背景下,异步任务队列扮演着至关重要的角色,...

虚拟服务器之间如何分布式运行?(虚拟服务器部署)

  在云计算和虚拟化技术快速发展的今天,传统“单机单任务”的服务器架构早已难以满足现代业务对高并发、高可用、弹性伸缩和容错容灾的严苛要求。分布式系统应运而生,并成为支撑各类互联网平台、企业信息系统和A...

一文掌握 XXL-Job 的 6 大核心组件

XXL-Job是一个分布式任务调度平台,其核心组件主要包括以下部分,各组件相互协作实现高效的任务调度与管理:1.调度注册中心(RegistryCenter)作用:负责管理调度器(Schedule...

京东大佬问我,SpringBoot中如何做延迟队列?单机与分布式如何做?

京东大佬问我,SpringBoot中如何做延迟队列?单机如何做?分布式如何做呢?并给出案例与代码分析。嗯,用户问的是在SpringBoot中如何实现延迟队列,单机和分布式环境下分别怎么做。这个问题其实...

企业级项目组件选型(一)分布式任务调度平台

官网地址:https://www.xuxueli.com/xxl-job/能力介绍架构图安全性为提升系统安全性,调度中心和执行器进行安全性校验,双方AccessToken匹配才允许通讯;调度中心和执...

python多进程的分布式任务调度应用场景及示例

多进程的分布式任务调度可以应用于以下场景:分布式爬虫:importmultiprocessingimportrequestsdefcrawl(url):response=re...

SpringBoot整合ElasticJob实现分布式任务调度

介绍ElasticJob是面向互联网生态和海量任务的分布式调度解决方案,由两个相互独立的子项目ElasticJob-Lite和ElasticJob-Cloud组成。它通过弹性调度、资源管控、...

分布式可视化 DAG 任务调度系统 Taier 的整体流程分析

Taier作为袋鼠云的开源项目之一,是一个分布式可视化的DAG任务调度系统。旨在降低ETL开发成本,提高大数据平台稳定性,让大数据开发人员可以在Taier直接进行业务逻辑的开发,而不用关...

SpringBoot任务调度:@Scheduled与TaskExecutor全面解析

一、任务调度基础概念1.1什么是任务调度任务调度是指按照预定的时间计划或特定条件自动执行任务的过程。在现代应用开发中,任务调度扮演着至关重要的角色,它使得开发者能够自动化处理周期性任务、定时任务和异...

取消回复欢迎 发表评论: