gRPC

proto

一文带你玩转ProtoBuf_Go_王中阳Go_InfoQ写作社区

ProtoBuf 全称:protocol buffers,直译过来是:“协议缓冲区”,是一种与语言无关、与平台无关的可扩展机制,用于序列化结构化数据

和 json\xml 最大的区别是:json\xml 都是基于文本格式,ProtoBuf 是二进制格式

ProtoBuf 相比于 json\XML,更小(3 ~ 10 倍)、更快(20 ~ 100 倍)、更为简单。我们只需要定义一次数据结构,就可以使用 ProtoBuf 生成源代码,轻松搞定在各种数据流和各种语言中写入、读取结构化数据

==hello proto==

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// hello.proto
syntax = "proto3";					// 使用的proto版本号 主流的是proto3 不写默认proto2

package hello;						// proto包名 用于其他proto文件 import时解析

option go_package = "./;hello";		// 分号前是proto引用时的路径 生成的go的包名

message Student{					// 类似于struct 可以定义数据的集合 也可以嵌套message
  string            name  = 1;
  bool          	male  = 2;
  repeated string 	word  = 3;		// repeated标记是数组
}
  • package proto包名 在其他proto文件 import时解析用 假如有一个包import "pfoo/foo.proto"中声明了package a那么对于引用foo的其他proto文件 应当使用a.xxx

  • option路径 若a.proto中 option go_package = "github.com/mhqdz/a;a"; 而b.proto中引用了a.protoimport "a.proto"; 则b.proto生成的go文件中会包含import a "github.com/mhqdz/a",如果需要同时编译为多语言版本也可以多写几个

  • message 所有数据的基础 写法类似于 struct

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    
    message ListThreadRequest {
        // session info
        string sessionID = 1;
    
        // pagination
        uint32 pageNumber = 2;
        uint32 pageSize = 3;
    }
    ---对应的go文件---
    type ListThreadRequest struct {
        // session info
        SessionID string `protobuf:"bytes,1,opt,name=sessionID,proto3" json:"sessionID,omitempty"`
        // pagination
        PageNumber           uint32   `protobuf:"varint,2,opt,name=pageNumber,proto3" json:"pageNumber,omitempty"`
        PageSize             uint32   `protobuf:"varint,3,opt,name=pageSize,proto3" json:"pageSize,omitempty"`
        XXX_NoUnkeyedLiteral struct{} `json:"-"`
        XXX_unrecognized     []byte   `json:"-"`
        XXX_sizecache        int32    `json:"-"`
    }
    

其他关键字

  • repeated 数组类型 定义一个int32类型的数组 arrays

    1
    2
    3
    
    message Msg {
      repeated int32 arrays = 1;
    }
    
  • //,/* */ 单行注释,多行注释

  • enum 枚举类型(Enumerations) 枚举类型适用于提供一组预定义的值,选择其中一个

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    
    // 以性别为例
    message Student {
      string name = 1;
      enum Gender {					// enum 的一个标识符必须是0
      	option allow_alias = true;	// 开启允许别名,在这里male和man映射的值都是1 如果不写这个就不能这样写
        FEMALE = 0;					
        MALE = 1;
        MAN = 1;
      }
      Gender gender = 2;
      repeated int32 scores = 3;
    }
    
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    
    // 编译成的go文件
    const (
    	Student_FEMALE Student_Gender = 0
    	Student_MALE   Student_Gender = 1
    	Student_MAN    Student_Gender = 1
    )
    
    // Enum value maps for Student_Gender.
    var (
    	Student_Gender_name = map[int32]string{
    		0: "FEMALE",
    		1: "MALE",
    		// Duplicate value: 1: "MAN",
    	}
    	Student_Gender_value = map[string]int32{
    		"FEMALE": 0,
    		"MALE":   1,
    		"MAN":    1,
    	}
    )
    type Student struct {
    	state         protoimpl.MessageState
    	sizeCache     protoimpl.SizeCache
    	unknownFields protoimpl.UnknownFields
    
    	Name   string         `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
    	Gender Student_Gender `protobuf:"varint,2,opt,name=gender,proto3,enum=hello.Student_Gender" json:"gender,omitempty"`
    	Scores []int32        `protobuf:"varint,3,rep,packed,name=scores,proto3" json:"scores,omitempty"`
    }
    
  • oneof 类似c语言中的联合体C语言|联合体详解 在一个message中的多个数据类型里选择一个

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    
    syntax = "proto3";
    
    package oneofTest;
    
    option go_package = "./;oneofTest";
    
    message a{
      oneof b{
        int32 i=1;
        uint32 ui=2;
      }
    }
    ---------------------------------------------
    // go 中的使用
    package main
    
    import (
    	"fmt"
    	oneofTest "main/hello/oneofTest"
    )
    
    func main() {
    	fmt.Println(&oneofTest.A{B: &oneofTest.A_I{1}})			// i:1
    	fmt.Println(&oneofTest.A{B: &oneofTest.A_I{-1}})		// i:-1
    
    	fmt.Println(&oneofTest.A{B: &oneofTest.A_Ui{1}})		// ui:1
    	fmt.Println(&oneofTest.A{B: &oneofTest.A_Ui{-1}})		// 报错 它的底层还是结构体里的uint32数据
    }
    
  • service 接口

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    
    service ProdService{
      // 定义方法
      rpc GetProductStock(ProductRequest) returns(ProductResponse);
    }
    --------------------------------------
    // 在go中编译为
    type ProdServiceClient interface {
    	// 定义方法
    	GetProductStock(ctx context.Context, in *ProductRequest, opts ...grpc.CallOption) (*ProductResponse, error)
    }
    

编译语句

1
protoc --proto_path=IMPORT_PATH --<lang>_out=DST_DIR path/to/file.proto
  • --proto_path=IMPORT_PATH:可以在 .proto 文件中 import 其他的 .proto 文件,proto_path 即用来指定其他 .proto 文件的查找目录。如果没有引入其他的 .proto 文件,该参数可以省略。
  • --<lang>_out=DST_DIR:指定生成代码的目标文件夹,例如 –go_out=. 即生成 GO 代码在当前文件夹,另外支持 cpp/java/python/ruby/objc/csharp/php 等语言
1
2
protoc --go_out=. *.proto		# 将当前目录下的所有.proto编译为.pb.go 输出到当前目录
protoc --go_out=. hello.proto	# 编译当前目录下的hello.proto 输出到当前目录,默认输出为hello.pd.go

==在go中使用protoc编译后的数据==

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
tree /f 项目结构
-------------------------------
    D:.
      go.mod
      go.sum
      main.go
    
    └─hello
            hello.pb.go
            hello.proto
-------------------------------
package main

import (
	"fmt"
	hello "main/hello"
)

func main() {
	fmt.Println(&hello.Student{
		Name: "mhqdz",
		Male: false,
		Word: []string{"1", "2", "3"},
	})
}

搭建golang-poto环境的dockerfile

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
FROM golang:latest

RUN  sed -i s@/archive.ubuntu.com/@/mirrors.aliyun.com/@g /etc/apt/sources.list
RUN  apt-get clean

# install protobuf
# ENV PB_VER 3.20.3
# ENV PB_URL https://github.com/protocolbuffers/protobuf/releases/download/v${PB_VER}/protoc-${PB_VER}-linux-x86_64.zip

RUN apt-get -qq update && apt-get -qqy install curl git make unzip gettext rsync

 ENV PB_URL https://github.com/protocolbuffers/protobuf/releases/download/v3.20.3/protoc-3.20.3-linux-x86_64.zip
 
 RUN mkdir -p /tmp/protoc && \
    curl -L ${PB_URL} > /tmp/protoc/protoc.zip && \
    cd /tmp/protoc && \
    unzip protoc.zip && \
    cp /tmp/protoc/bin/protoc /usr/local/bin && \
    cp -R /tmp/protoc/include/* /usr/local/include && \
    chmod go+rx /usr/local/bin/protoc && \
    cd /tmp && \
    rm -r /tmp/protoc

# Get the source from GitHub
RUN Go get -u google.golang.org/grpc

# Install protoc-gen-go
RUN Go get -u github.com/golang/protobuf/protoc-gen-go

注:这个dockerfile没有运行成功 在curl下载时出错 github.com 443

==Makefile==

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
.PHONY: all docker push test

IMAGE := mhqdz/testproto

all: docker

docker:
	docker build --rm -t $(IMAGE) .

push:
	docker push $(IMAGE)

test:
	docker run -it --rm $(IMAGE) sh

protobuf标准数据类型

proto类型 go类型 备注 proto类型 go类型 备注
double float64 float float32
int32 int32 int64 int64
uint32 uint32 uint64 uint64
sint32 int32 适合负数 sint64 int64 适合负数
fixed32 uint32 固长编码,适合大于2^28的值 fixed64 uint64 固长编码,适合大于2^56的值
sfixed32 int32 固长编码 sfixed64 int64 固长编码
bool bool string string UTF8 编码,长度不超过 2^32
bytes []byte 任意字节序列,长度不超过 2^32 map<string,int32> map[string]int32

标量类型如果没有被赋值,则不会被序列化,解析时,会赋予默认值。

  • strings:空字符串
  • bytes:空序列
  • bools:false
  • 数值类型:0

使用其他消息类型

Result是另一个消息类型,在 SearchReponse 作为一个消息字段类型使用。

1
2
3
4
5
6
7
8
9
message SearchResponse {
  repeated Result results = 1; 
}

message Result {
  string url = 1;
  string title = 2;
  repeated string snippets = 3;
}

嵌套写也是支持的:

1
2
3
4
5
6
7
8
message SearchResponse {
  message Result {
    string url = 1;
    string title = 2;
    repeated string snippets = 3;
  }
  repeated Result results = 1;
}

如果定义在其他文件中,可以导入其他消息类型来使用:

1
import "myproject/other_protos.proto";

任意类型(Any)

1
2
3
4
5
6
7
// 需要导入这个包
import "google/protobuf/any.proto";

message ErrorStatus {
  string message = 1;
  repeated google.protobuf.Any details = 2;
}
1
2
3
// 在go中使用时使用anypb包的New函数将自己的类型转换为any类型
// 下面的 gRPC/stream/客户端流/service.go 中有使用该转化方法
any, _ := anypb.New(&content)

关于找不到包(Import “google/protobuf/any.proto” was not found or had errors)

方法一:直接把goolgle/protobuf/...这个包直接复制到项目路径下,使其名称与import的路径对齐

方法二:使用绝对路径指定当前项目目录和google包的根目录

1
2
3
# 执行该命令的路径为项目根目录 D:/go/src/gRPC/proto03_any,D:/proto/include是google包的上级目录 ../是为了找到项目文件夹;protobuf文件互相导的时候会用到 go mod的相对地址 而go mod的地址会以包名(项目文件夹名)开头 
# 相同的命令用vscode的powershell终端执行失败了 在外面的powershell里却成功了 可能是vscode里的终端更新环境比外面慢点?
protoc -I=D:/proto/include -I=D:/go/src/gRPC/proto03_any --go_out=../ --go-grpc_out=../ protos/service/service.proto
1
2
3
4
5
6
7
8
// vscode-proto3 设置google包的路径让它不飘红(不会影响编译)
"protoc": {
        "path": "protoc",
        "options": [
            "--proto_path=${workspaceRoot},",
            "--proto_path= D:/proto/include,",
    ]
},

==any的使用==

详细项目看下面的gRPC流的部分

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
syntax = "proto3";
import "google/protobuf/any.proto";

option go_package="proto03_any/protoOut/service;service";

package service;

// 定义request model
message HelloRequest{
  int32 id = 1; // 1代表顺序
}

message Content {
  string msg = 1;
  int32 value= 2;
}

// 定义response model
message HelloResponse{
  string hello = 1;
  google.protobuf.Any data = 2;
}

service ProdService{
  rpc SayHello(HelloRequest) returns (HelloResponse);
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20

package service

import (
	context "context"
	"google.golang.org/protobuf/types/known/anypb"
)

type prodService struct {
}

func (p *prodService) SayHello(cnt context.Context, req *HelloRequest) (*HelloResponse, error) {
	// 使用anypb包 把proto文件中定义的Content转换为any
	a, err := anypb.New(&Content{Value: req.Id, Msg: "hello"})

	return &HelloResponse{
		Hello: "hello is me",
		Data:  a,
	}, err
}

go mod-import

import导入路径踩的坑,b.proto使用a.proto

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
# 最终的项目结构
proto02_import
      go.mod
      go.sum
      main.exe
      main.go
    
    ├─protoOut				# proto编译生成go文件的目录
      ├─A
            a.pb.go
      
      └─B
              b.pb.go
    
    └─protos				# proto文件的目录
        ├─a
              a.proto
        
        └─b
                b.proto
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// a.proto
syntax = "proto3";

package A;

// 生成A.pb.go文件的位置
// go module项目中的路径 go mod读取包是 module名/路径 不要使用 ./这类相对路径
// 此处路径的意思是 在proto02_import(module名称)下的protoOut/A文件夹下生成a.pd.go属于package A
option go_package = "proto02_import/protoOut/A;A";

message User{
    string name = 1;
    int32  age  = 2;
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
//b.proto
syntax = "proto3";

package B;

// 引入a.proto文件 这个路径的.路径默认为执行编译时的路径
// vscode插件会默认你在项目根目录下编译 并以此来判断路径是否正确 如果不在根目录下编译 正确的路径会报错 但不影响编译
// 这里执行编译的路径为项目根目录 proto02_import
import "protos/a/a.proto";

option go_package = "proto02_import/protoOut/B;B";

message userAuth{
    string  pwd = 1;
    A.User    u = 2;
}

执行的编译命令命令

1
2
3
# 读取./protos/a/a.proto文件 输出路径的根目录为项目上级目录(否则会找不到文件夹proto02_import)
protoc protos/a/a.proto --go_out=../
protoc protos/b/b.proto --go_out=../
1
2
3
4
5
6
// go.mod文件
module proto02_import

go 1.18

require google.golang.org/protobuf v1.28.1

gRPC

以一个简单的server和client为例

最终项目结构

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
gRPC01-helloGRPC
    go.mod
    go.sum
    
  ├─main_client
        grpc_client.go			# 客户端代码 连接服务端发送请求并获取数据
        
  ├─main_server
        grpc_server.go			# 服务端代码 建立连接 传入request请求 并将response答复放到端口上
  
  ├─Protos
        Product.proto
  
  └─service
          product.go			# 实现ProdService接口 完成request请求和response答复之间的行为
          product.pb.go			# protobuf文件编译生成
          product_grpc.pb.go	# protobuf文件编译生成

==Product.proto==

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
syntax = "proto3";

package service;

option go_package = "../service;service";

// 定义request model
message ProductRequest{
  int32 prod_id = 1; // 1代表顺序
}

// 定义response model
message ProductResponse{
  int32 prod_stock = 1; // 1代表顺序
}

// 定义服务主体
service ProdService{
  // 定义方法
  rpc GetProductStock(ProductRequest) returns(ProductResponse);
}
1
protoc --go_out=./ --go-grpc_out=. product.proto

==product.go==

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package service

import (
	context "context"
)

// 如果下面定义productService为int 则使用new(productService)来声明
var ProductService = &productService{}

// 因为没有任何成员变量所以可以使用 type productService int更好看一些
// 实现 type ProdServiceServer interface {...} 接口 总是 type xxxServer interface{} 在xx_grpc.pb.go
type productService struct {
}

func (p *productService) GetProductStock(ctx context.Context, req *ProductRequest) (*ProductResponse, error) {
	// 实现GetProductStock接口
	value := doSomething(req.ProdId, 1)

	// 这里的ProductResponse也是在poroto中定义的类型
	return &ProductResponse{ProdStock: value}, nil
}

func (p *productService) mustEmbedUnimplementedProdServiceServer() {}

// 处理了requst的信息的函数
func doSomething(x int32, y int32) int32 {
	return x + y
}

==grpc_server.go==

关于server和client这两个文件下面的 客户端流 的注释写的比较详细

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package main

import (
	"fmt"
	"log"
	"net"
	service "testGRPC/service"

	"google.golang.org/grpc"
)

func main() {
	// 新建server
    rpcServer := grpc.NewServer()
    // 注册ProdService服务器
	service.RegisterProdServiceServer(rpcServer, service.ProductService)

	l, err := net.Listen("tcp",":8080")
	if err != nil {
		log.Fatal("启动监听失败", err)
	}
	if err = rpcServer.Serve(l); err != nil {
		log.Fatal("启动服务失败", err)
	}
	fmt.Println("启动服务成功")
}

==grpc_client.go==

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package main

import (
	"context"
	"fmt"
	"log"
	"testGRPC/service"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	// 建立端口连接
    conn, err := grpc.Dial(":8080", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal("服务端连接失败")
	}
	defer conn.Close()
    // 新建prodService服务端
	psc := service.NewProdServiceClient(conn)
	// 发送请求 得到数据
    fmt.Println(psc.GetProductStock(context.Background(), &service.ProductRequest{ProdId: 123}))
}

安全认证

认证 | 码神之路知识体系 (mszlu.com)

单向认证

客户端访问服务端时必须使用服务端的公钥 以验证客户端的身份

img

借用第一个gRPC的文件结构 在与main_server的同级目录下创建cert文件夹存放 证书\公钥文件

==创建根证书== 服务器证书,客户端证书的基础

1
2
3
openssl genrsa -des3 -out ca.key 2048		# 生成私钥文件
openssl req -new -key ca.key -out ca.csr	# 创建证书请求
openssl x509 -req -days 365 -in ca.csr -signkey ca.key -out ca.crt	#生成 ca.crt证书

==cng文件==

创建openssl.cnf文件 cmd->openssl version -a 查看版本信息中的OPENSSLDIR,找到openssl.conf文件的原始路径,将其拷贝到cert文件夹下(也可以不拷贝当前目录而是在编译时指定使用的配置文件,下以当前目录为例)

  1. 打开(取消注释) copy_extensions = copy
  2. 打开 req_extensions = v3_req
  3. 找到[ v3_req ],在下面添加 subjectAltName = @alt_names
  4. 添加新的标签 [ alt_names ] 实现第3步的映射
1
2
3
# 使用正则表达式描述 客户端允许访问的域名 可以是1个也可以用指定多个
[ alt_names ]
DNS.1 = *.mhqdz.com

==逐步生成SAN证书==

(go1.7+版本后要求SAN证书 相比正常的公钥/证书 它的主要差别就是允许使用正则表达式来指定域名)

1
2
3
4
5
6
7
8
9
# 生成证书私钥server.key
# genrsa生成私钥 -algorithm指定公钥算法为RSA 输出为server.key
openssl genpkey -algorithm RSA -out server.key

# 通过私钥server.key生成证书请求文件server.csr
openssl req -new -nodes -key server.key -out server.csr -days 3650 -config ./openssl.cnf -extensions v3_req

# 生成SAN证书 server.pem
openssl x509 -req -days 365 -in server.csr -out server.pem -CA ca.crt -CAkey ca.key -CAcreateserial -extfile ./openssl.cnf -extensions v3_req
  • key: 服务器上的私钥文件,用于对发送给客户端数据的加密,以及对从客户端接收到数据的解密
  • csr\crt: CA认证后的证书文件,用于提交给证书颁发机构(CA)对证书签名,(windows下面的csr,其实是crt),签署人用自己的key给你签署的凭证。
  • pem: 证书\公钥 由证书颁发机构(CA)签名后的证书,或者是开发者自签名的证书,包含证书持有人的信息,持有人的公钥,证书文件扩展名包括.pem(用ASCLL\BASE64编码的证书文本文件).der\.cer (二进制文件),这些后缀都是最终编成的证书,只是不同系统用户的编译习惯所致的不同名称

==server端和cenlit端使用证书==

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// main_server.go
package main

import (...)

func main() {
    // 传入证书\公钥文件和私钥文件 创建证书
	creds, err := credentials.NewServerTLSFromFile("../cert/server.pem", "../cert/server.key")
	if err != nil {...}
    
    // 创建服务时传入证书creds
	rpcServer := grpc.NewServer(grpc.Creds(creds))
    ......
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// main_client.go
package main

import (...)

func main() {
	// 传入一个pem公钥以访问服务端 第二个参数必须符合openssl.cnf中定义的域名 *.mhqdz.com
    // 如果pem证书错误、未使用证书或不符合命名规则(如*.mhzsz.com不能匹配*.mhqdz.com) client端会报错 得到数据 
	creds, err := credentials.NewClientTLSFromFile("../cert/server.pem", "org.mhqdz.com")
	if err != nil {...}
    // 在建立连接时使用证书
	conn, err := grpc.Dial(":8080", grpc.WithTransportCredentials(creds))
	......
}

双向认证

客户端连接服务端 需要带着服务端的公钥

服务端向客户端发送信息 需要带着客户端的公钥

img

创建client端的SAN证书

1
2
3
openssl genpkey -algorithm RSA -out client.key	# 私钥
openssl req -new -nodes -key client.key -out client.csr -days 3650 -config ./openssl.cnf -extensions v3_req	# 公钥\证书
openssl x509 -req -days 365 -in client.csr -out client.pem -CA ca.crt -CAkey ca.key -CAcreateserial -extfile ./openssl.cnf -extensions v3_req	# SAN证书

==改进后的双端证书使用==

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// grpc_server.go
package main

import (...)

func main() {
	// 双向认证
	// 从证书相关文件中读取和解析信息,得到证书公钥、密钥对
	cert, err := tls.LoadX509KeyPair("../cert/server.pem", "../cert/server.key")
	if err != nil {
		log.Fatal("证书读取错误", err)
	}
	// 创建一个新的、空的 CertPool
	certPool := x509.NewCertPool()
	ca, err := ioutil.ReadFile("../cert/ca.crt")
	if err != nil {
		log.Fatal("ca证书读取错误", err)
	}
	// 尝试解析所传入的 PEM 编码的证书。如果解析成功会将其加到 CertPool 中,便于后面的使用
	certPool.AppendCertsFromPEM(ca)
	// 构建基于 TLS 的 TransportCredentials 选项
	creds := credentials.NewTLS(&tls.Config{
		// 设置证书链,允许包含一个或多个
		Certificates: []tls.Certificate{cert},
		// 要求必须校验客户端的证书。可以根据实际情况选用以下参数
		ClientAuth: tls.RequireAndVerifyClientCert,
		// 设置根证书的集合,校验方式使用 ClientAuth 中设定的模式
		ClientCAs: certPool,
	})

	rpcServer := grpc.NewServer(grpc.Creds(creds))
	......
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// grpc_client.go
package main

import (...)

func main() {
    // 双向认证
	// 从证书相关文件中读取和解析信息,得到证书公钥、密钥对
	cert, err := tls.LoadX509KeyPair("../cert/client.pem", "../cert/client.key")
	if err != nil {
		log.Fatal("获取证书文件失败", err)
	}

	// 创建一个新的、空的 CertPool
	certPool := x509.NewCertPool()
	ca, _ := ioutil.ReadFile("../cert/ca.crt")

	// 尝试解析所传入的 PEM 编码的证书。如果解析成功会将其加到 CertPool 中,便于后面的使用
	certPool.AppendCertsFromPEM(ca)
	// 构建基于 TLS 的 TransportCredentials 选项
	creds := credentials.NewTLS(&tls.Config{
		// 设置证书链,允许包含一个或多个
		Certificates: []tls.Certificate{cert},
		// 要求必须校验客户端的证书。可以根据实际情况选用以下参数
		ServerName: "*.mhqdz.com",
		RootCAs:    certPool,
	})

	conn, err := grpc.Dial(":8080", grpc.WithTransportCredentials(creds))
    ......
}

==token认证==

以用户名认证为例子

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// main_server.go
package main

import (...)

func main() {
    // 安全认证实现传入NewServer的creds,或者干脆向最初的例子一样不验证了
	......
	// token认证 在newServer时传入一个拦截器来完成用户名密码的认证 其余部分不变
	var usi grpc.UnaryServerInterceptor = func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
        // auth()为下面定义的实现验证的处理的函数
        if err = auth(ctx); err != nil {
			return
		}
		return handler(ctx, req)
	}

	rpcServer := grpc.NewServer(grpc.Creds(creds), grpc.UnaryInterceptor(usi))
	......
}

func auth(ctx context.Context) error {
    // FromIncomingContext 用于取出Context里的数据
    // 该函数的返回值类型为 MD : type MD map[string][]string,所以后续的操作也和go map相同
	md, ok := metadata.FromIncomingContext(ctx)
	if !ok {
		return fmt.Errorf("从ctx获取数据失败")
	}
	var user string
	if val, ok := md["name"]; ok {
		user = val[0]
	}
	if user != "mhqdz" {
		return fmt.Errorf("用户名错误")
	}
	return nil
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// main_client.go
package main

import (
	"context"
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"io/ioutil"
	"log"
	"testGRPC/service"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
)

func main() {
    // 同上server端 省略安全认证
	......
    // 使用grpc.WithPerRPCCredentials()传入一个PerRPCCredentials(接口)类型的数据 这里是实现了接口的user
    /* // PerRPCCredentials定义
    type PerRPCCredentials interface {
	GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error)
	RequireTransportSecurity() bool
	}
	*/

	conn, err := grpc.Dial(":8080", grpc.WithTransportCredentials(creds), grpc.WithPerRPCCredentials(&user{name: "mhqdz"}))
    ......
}

type user struct {
	name string
}

// 把值以 map[string]string的形式返回
func (a *user) GetRequestMetadata(context.Context, ...string) (map[string]string, error) {
	return map[string]string{"name": a.name}, nil
}

// 是否需要基于 TLS 认证进行安全传输 是否开启安全连接 这里true也试了下没看出区别...
func (a *user) RequireTransportSecurity() bool {
	return false
}

stream(流)

详细请参考:gRPC的stream使用_沐-羽晨

流的4种类型

1
2
3
4
5
// stream提示流
rpc SayHello(HelloRequest) returns (HelloResponse);					// 普通 RPC 一个请求一个回复
rpc LotsOfReplies(stream HelloRequest) returns (HelloResponse);		// 客户端流 RPC 一堆输入一个回复
rpc LotsOfGreetings(HelloRequest) returns (stream HelloResponse);	// 服务端流 RPC 一个输入一堆回复
rpc BidiHello(stream HelloRequest) returns (stream HelloResponse);	// 双向流 RPC 一堆输入一堆回复

==项目结构==

关于流的3个项目(上面的普通RPC不再演示)都是这个结构

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
clientStream
      go.mod
      go.sum
      
    ├─main
      ├─client
            client.go
            
      └─server
             server.go
    
    ├─protoOut
      └─service
              service.go
              service.pb.go
              service_grpc.pb.go
    
    └─protos
        └─service
                service.proto

客户端流

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// service.proto 文件
syntax = "proto3";
import "google/protobuf/any.proto";

option go_package="clientStream/protoOut/service;service";

package service;

// 定义request消息类型
message HelloRequest{
  int32 id = 1; // 1代表顺序
}

message Content {
  string msg = 1;
  int32 value= 2;
}

// 定义response消息类型
message HelloResponse{
  string hello = 1;
  google.protobuf.Any data = 2;
}

// 定义SayHello服务 输入为HelloRequest流 返回一个HelloResponse
service ProdService{
  rpc SayHello(stream HelloRequest) returns (HelloResponse);
}
1
protoc -I=D:/proto/include -I=D:/go/src/gRPC/proto04_stream/clientStream --go_out=../ --go-grpc_out=../ protos/service/service.proto
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// service.go
package service

import (
	"fmt"
	"io"
	"log"

	"google.golang.org/protobuf/types/known/anypb"
)

var ProdService = new(prodService)

type prodService int

func (p *prodService) SayHello(stream ProdService_SayHelloServer) error {
	count := 0
	for {
		// Recv 接收一个客户端发来的信息
		recv, err := stream.Recv()
		if err != nil {
			if err == io.EOF {
				log.Fatal(err)
				return nil
			}
			return err
		}
		fmt.Println("服务端接收到的流", recv.Id, count)
		count++

		if count > 1000 {
			// 创建any类型的消息 赋值为response的Data
			a, err := anypb.New(&Content{Value: recv.Id, Msg: "hello"})
			if err != nil {
				return err
			}

			rsp := &HelloResponse{Hello: "hello", Data: a}

            // Send 返回一个响应 但不关闭
			// SendAndClose 表示服务器已经接收消息结束,发送一个响应并关闭
			err = stream.SendAndClose(rsp)
			if err != nil {
				return err
			}
			return nil
		}
	}
}
func (p *prodService) mustEmbedUnimplementedProdServiceServer() {}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// server.go
package main

import (
	"clientStream/protoOut/service"
	"log"
	"net"

	"google.golang.org/grpc"
)

func main() {
    // NewServer 创建一个gRPC server,它没有注册服务,也还没有开始接受请求
	// 默认单次接收最大消息长度为`1024*1024*4`bytes(4M),单次发送消息最大长度为`math.MaxInt32`bytes
	// grpcServer := grpc.NewServer(grpc.MaxRecvMsgSize(1024*1024*4), grpc.MaxSendMsgSize(math.MaxInt32))
	// 新建gRPC服务器实例
    rpcServer := grpc.NewServer()

	// 它只是调用了 rpcServer.RegisterService(&ProdService_ServiceDesc, srv)方法 向gRPC服务器注册 一个服务及其实现
	// Registerservice方法依赖 service.ProdService.HandlerType使用反射来确认要注册的服务
	service.RegisterProdServiceServer(rpcServer, service.ProdService)

	l, err := net.Listen("tcp", ":8080")
	if err != nil {
		log.Fatal("启动监听失败", err)
	}

	// 持续等待 requests 并以前文注册的服务来处理它们
	if err = rpcServer.Serve(l); err != nil {
		log.Fatal("启动服务失败", err)
	}
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
// client.go
package main

import (
	"clientStream/protoOut/service"
	"context"
	"fmt"
	"log"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// prodRequest 写入1000个request给server
func prodRequest(stream service.ProdService_SayHelloClient, rsp chan int) {
	count := 0
	for {
		request := &service.HelloRequest{
			Id: 123,
		}
		err := stream.Send(request)
		if err != nil {
			log.Fatal("输入流错误", err)
		}
		//time.Sleep(time.Second)
		count++
		if count > 1000 {
			rsp <- 0
			break
		}
	}
}

func main() {
	// 建立端口连接
	conn, err := grpc.Dial(":8080", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal("服务端连接失败", err)
		return
	}
	defer conn.Close()

	// 该方法返回一个_grpc.pb.go中定义的struct prodServiceClient{}
	prodService := service.NewProdServiceClient(conn)

	// context.Background() 官方提供的一个测试用的空的Context不包含任何值 也没有停止时间
	// Context 运行的上下文 一般由server传出 client传入 包含截止日期、取消信号和其他值
	// ProdService_SayHelloClient 是一个仅包含Context接口的结构体
	// SayHello(Context)ProdService_SayHelloClient 它做的仅仅是把Context赋值进空的prodService里 并返回prodService
	stream, err := prodService.SayHello(context.Background())
	if err != nil {
		log.Fatal("获取流失败", err)
	}

	// rsp的信号由下面的prodRequest(stream, rsp) 函数发出 写入一个0代表接收请求成功
	rsp := make(chan int, 1)
	defer close(rsp)
	go prodRequest(stream, rsp)
    // select {	// select{case:}用于循环等待接收/发送(此处为接收)多个消息
	// case <-rsp:
	<-rsp // 只有一种信号直接写一个这个等待就好了 不必使用select...case
	recv, err := stream.CloseAndRecv()
	if err != nil {
		log.Fatal(err)
	}
	Data := recv.Data
	hello := recv.Hello
	fmt.Println("客户端收到响应:", hello, Data)
	// }
}

服务端流

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
syntax = "proto3";
import "google/protobuf/any.proto";

option go_package="serverStream/protoOut/service;service";

...(同上 故省略)

service ProdService{
  rpc SayHello(HelloRequest) returns (stream HelloResponse);
}
1
protoc -I=d:/go/src/gRPC/gRPC02_stream/serverStream -I=d:/proto/include  --go_out=../ --go-grpc_out=../ protos/service/service.proto
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// service.go
package service

import "google.golang.org/protobuf/types/known/anypb"

var ProdService = new(prodServiceServer)

type prodServiceServer int

func (p *prodServiceServer) SayHello(req *HelloRequest, stream ProdService_SayHelloServer) error {
    // 在接收请求后发送5条消息
    // 传入的req包含传入接受到的请求的信息 这里没有使用
	for i := 0; i < 5; i++ {
		date, err := anypb.New(&Content{Msg: "哼哼啊啊啊", Value: 114514})
		if err != nil {
			return err
		}
        // Send 返回一个响应 但不关闭
		// SendAndClose 表示服务器已经接收消息结束,发送一个响应并关闭
		if err = stream.Send(&HelloResponse{Hello: "hello it's me", Data: date}); err != nil {
			return err
		}
	}
	return nil
}

func (p *prodServiceServer) mustEmbedUnimplementedProdServiceServer() {}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// server.go 同客户端流只有引入service的包名改变了
package main

import (...)

func main() {
	// NewServer 创建一个gRPC server,它没有注册服务,也还没有开始接受请求。
	rpcServer := grpc.NewServer()

	// 它只是调用了 rpcServer.RegisterService(&ProdService_ServiceDesc, srv)方法 向gRPC服务器注册 一个服务及其实现
	// Registerservice方法依赖 service.ProdService.HandlerType使用反射来确认要注册的服务
	service.RegisterProdServiceServer(rpcServer, service.ProdService)

	l, err := net.Listen("tcp", ":8080")
	if err != nil {
		log.Fatal("启动监听失败", err)
	}

	// 持续等待 requests 并以前文注册的服务来处理它们
	if err = rpcServer.Serve(l); err != nil {
		log.Fatal("启动服务失败", err)
	}
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// client.go
package main

import (...)

func main() {
	// 建立端口连接
	conn, err := grpc.Dial(":8080", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal("服务端连接失败", err)
		return
	}
	defer conn.Close()

	// 该方法返回一个_grpc.pb.go中定义的struct prodServiceClient{}
	prodService := service.NewProdServiceClient(conn)

	stream, err := prodService.SayHello(context.Background(), &service.HelloRequest{Id: 1919})
	if err != nil {
		log.Fatal("创建请求接收器ProdService_SayHelloClient失败,err:", err)
	}
    
	for {
		//Recv() 方法接收服务端消息,默认每次Recv()最大消息长度为`1024*1024*4`bytes(4M)
		res, err := stream.Recv()
		if err != nil {
			if err == io.EOF {
				break
			}
			log.Fatal("接收出错,err:", err)
		}
		log.Println(res.Hello, res.Data)
	}
}
运行结果

双端流(双向流)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// service.proto
syntax = "proto3";
import "google/protobuf/any.proto";

option go_package="bidirectionalStream/protoOut/service;service";

package service;

// 定义request model
message HelloRequest{
  int32 id = 1; // 1代表顺序
}

message Content {
  string msg = 1;
  int32 value= 2;
}

// 定义response model
message HelloResponse{
  string hello = 1;
  google.protobuf.Any data = 2;
}

service ProdService{
  rpc SayHello(stream HelloRequest) returns (stream HelloResponse);
}
1
protoc -I D:\go\src\gRPC\gRPC02_stream\bidirectionalStream -I D:\proto\include --go_out=../ --go-grpc_out=../ protos/service/service.proto
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// server.go 依旧不变
package main

import (
	"bidirectionalStream/protoOut/service"
	"log"
	"net"

	"google.golang.org/grpc"
)

func main() {
	// NewServer 创建一个gRPC server,它没有注册服务,也还没有开始接受请求。
	rpcServer := grpc.NewServer()

	// 它只是调用了 rpcServer.RegisterService(&ProdService_ServiceDesc, srv)方法 向gRPC服务器注册 一个服务及其实现
	// Registerservice方法依赖 service.ProdService.HandlerType使用反射来确认要注册的服务
	service.RegisterProdServiceServer(rpcServer, service.ProdService)

	l, err := net.Listen("tcp", ":8080")
	if err != nil {
		log.Fatal("启动监听失败", err)
	}

	// 持续等待 requests 并以前文注册的服务来处理它们
	if err = rpcServer.Serve(l); err != nil {
		log.Fatal("启动服务失败", err)
	}
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// client.go
package main

import (
	"bidirectionalStream/protoOut/service"
	"context"
	"fmt"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/types/known/anypb"
)

func prodRequest(stream service.ProdService_SayHelloClient) {
	for {
		request := &service.HelloRequest{
			Id: 123,
		}
		err := stream.Send(request)
		if err != nil {
			log.Fatal("输入流错误", err)
		}
		time.Sleep(time.Second)
		resp, err := stream.Recv()
		if err != nil {
			log.Fatal("接收信息出错,err:", err)
		}
		data, err := anypb.UnmarshalNew(resp.Data, proto.UnmarshalOptions{})
		if err != nil {
			log.Fatal("any类解析出错,err:", err)
		}

		fmt.Println(resp.Hello, data)
	}
}

func main() {
	// 建立端口连接
	conn, err := grpc.Dial(":8080", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal("服务端连接失败", err)
		return
	}
	defer conn.Close()

	// 该方法返回一个_grpc.pb.go中定义的struct prodServiceClient{}
	prodService := service.NewProdServiceClient(conn)

	// context.Background() 官方提供的一个测试用的空的Context不包含任何值 也没有停止时间
	// Context 运行的上下文 一般由server传出 client传入 包含截止日期、取消信号和其他值
	// ProdService_SayHelloClient 是一个仅包含Context接口的结构体
	// SayHello(Context)ProdService_SayHelloClient 它做的仅仅是把Context赋值进空的prodService里 并返回prodService
	stream, err := prodService.SayHello(context.Background())
	if err != nil {
		log.Fatal("获取流失败", err)
	}
	prodRequest(stream)
}

dubbo-go

官方文档

updatedupdated2023-02-252023-02-25