Skip to content

集成gRPC

Go-kit 是一套强大的微服务开发工具集,用于指导开发人员解决分布式系统开发过程中所遇到的问题,帮助开发人员更专注于业务开发。
Go-kit 推荐使用 Transport、Endpoint 和 Service 这 3 层结构来组织项目

这里主要涉及 Transport 层和 Endpoint 层

  • Transport 层,主要负责网络传输,例如处理 HTTP、gRPC、Thrift 等相关的逻辑。
  • Endpoint 层,主要负责 request/response 格式的转换,以及公用拦截器相关的逻辑。作为 Go-kit 的核心,Endpoint 层采用类似洋葱的模型,提供了对日志、限流、熔断、链路追踪和服务监控等方面的扩展能力。

Go-kit 和 gRPC 结合的关键在于需要将 gRPC 集成到 Go-kit 的 Transport 层。
Go-kit 的 Transport 层用于接收用户网络请求并将其转为 Endpoint 可以处理的对象,然后交由 Endpoint 层执行,最后再将处理结果转为响应对象返回给客户端。
为了完成这项工作,Transport 层需要具备两个工具方法:

  • 解码器,把用户的请求内容转换为请求对象;
  • 编码器,把处理结果转换为响应对象。

gRPC 请求的处理过程如下图所示,服务端接收到一个客户端请求后,交由 grpc_transport.Handler 处理,它会调用 DecodeRequestFunc 进行解码,
然后交给其 Endpoint 层转换为 Service 层能处理的对象,将返回值通过 EncodeResponseFunc 编码,最后返回给客户端。

Gokit过程调用示意图 Go-kit 过程调用示意图

接下来,我们就按照上述的流程,实现通过 Go-kit 进行 gRPC 调用。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
$ mkdir user
$ cd user
$ go mod init example.com/user
go: creating new go.mod: module example.com/user
$ mkdir user
$ go get github.com/go-kit/kit
go get: added github.com/go-kit/kit v0.12.0
$ go get golang.org/x/time/rate
go: downloading golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac
$ go get github.com/go-kit/kit/transport/grpc@v0.12.0
$ go get google.golang.org/grpc@v1.40.0
$ go get google.golang.org/protobuf/reflect/protoreflect@v1.27.1

定义 proto 生成文件

新建: ./pb/user.proto

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
syntax = "proto3";
package pb;

option go_package = "example.com/user/pb";

service UserService {
    rpc CheckPassword(LoginRequest) returns (LoginResponse) {}
}

message LoginRequest {
    string Username = 1;
    string Password = 2;
}

message LoginResponse {
    string Ret = 1;
    string err = 2;
}

执行如下命令:

1
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative pb/user.proto 

此时 pb 的目录下有三个文件:

1
2
3
4
pb
├── user.pb.go
├── user.proto
└── user_grpc.pb.go

升级

升级 protoc:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
brew upgrade protobuf
``


brew install protobuf@3.17.3


### 定义 Service,提供业务实现

首先定义了 `UserService` 结构,它有一个名为 `CheckPassword` 的 `grpc_transport.Handler` 的方法。         
这个方法会调用 `grpc_transport.Handler` 的 `ServeGRPC` 方法来将请求交由 Go-kit 处理。



`./user/service.go`

```go
package user

import "context"

type UserService interface {
    CheckPassword(ctx context.Context, username string, password string) (bool, error)
}

type UserServiceImpl struct{}

func (s UserServiceImpl) CheckPassword(ctx context.Context, username string, password string) (bool, error) {
    if username == "admin" && password == "admin" {
        return true, nil
    }
    return false, nil
}

定义 Endpoint,提供参数转换能力

接下来我们需要建立对应的 Endpoint。
它应该是将请求转发给上述的 UserService 处理,然后定义编解码函数 DecodeRequest 和 EncodeResponse,具体代码如下所示:

user/endpoints.go:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package user

import (
    "context"

    "github.com/go-kit/kit/endpoint"
)

type LoginForm struct {
    Username string `json:"username"`
    Password string `json:"password"`
}

type LoginResult struct {
    Ret bool  `json:"ret"`
    Err error `json:"err"`
}

type Endpoints struct {
    UserEndpoint endpoint.Endpoint
}

func MakeUserEndpoint(svc UserService) endpoint.Endpoint {
    return func(ctx context.Context, form interface{}) (result interface{}, err error) {
        req := form.(LoginForm)
        ret, err := svc.CheckPassword(ctx, req.Username, req.Password)
        return LoginResult{Ret: ret, Err: err}, nil
    }
}

同样的,Endpoint 层也未直接处理 LoginRequestLoginResponse,而是直接处理 LoginFormLoginResult,使用 DecodeLoginRequest 函数将 LoginRequest 转换成 LoginForm,然后使用 EncodeLoginResponseLoginResult 转换为 LoginResponse
转换函数的具体定义如下所示(这两个函数会放在在 Transport 层):

./user/transports.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package user

import "context"

func DecodeLoginRequest(ctx context.Context, r interface{}) (interface{}, error) {
    req := r.(*pb.LoginRequest)
    return LoginForm{
        Username: req.Username,
        Password: req.Password,
    }, nil
}

func EncoLoginResponse(_ context.Context, r interface{}) (interface{}, error) {
    resp := r.(LoginResult)
    retStr := "fail"
    if resp.Ret {
        retStr = "success"
    }
    errStr := ""
    if resp.Err != nil {
        errStr = resp.Err.Error()
    }
    return &pb.LoginResponse{
        Ret: retStr,
        Err: errStr,
    }, nil
}

这样做的好处有两点:一是 LoginRequest 和 LoginResponse 是通过 gRPC 生成的,属于 Transport 层,Endpoint 层不需要感知,后续如果技术选型变化了,需要将 gRPC 替换成 Thrift 时就可以只处理 Transport 层的变化,让变更最小化(如下图);
二是后端业务处理时的属性类型和返回给前端的数据属性类型不一定完全一样,比如上述代码中 LoginResult 中的 Ret 是 bool 类型,而返回给前端的 LoginResponse 中 Ret 是 string 类型,从而实现兼容性。

Gokit分层示意图 Go-kit 分层示意图

如上图所示,Service 在最内层,Endpoint 在中间,Transport 在最外侧,所以 Transport 是最容易进行变更的一层,越往内层逻辑应该越贴近领域逻辑。

定义 Middleware,提供限流和日志中间件

如上文所说,Endpoint 层可以添加诸如日志、限流、熔断、链路追踪和服务监控等能力,这里我们就以限流为例,讲述如何为 Endpoint 添加额外能力。

./user/limit.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package user

import (
    "context"
    "errors"

    "github.com/go-kit/kit/endpoint"
    "golang.org/x/time/rate"
)

var ErrLimitExceed = errors.New("Rate limit excedd!")

func NewTokenBucketLimitterWithBuildIn(bkt *rate.Limiter) endpoint.Middleware {
    return func(next endpoint.Endpoint) endpoint.Endpoint {
        return func(ctx context.Context, request interface{}) (response interface{}, err error) {
            if !bkt.Allow() {
                return nil, ErrLimitExceed
            }

            return next(ctx, request)
        }
    }
}

// 使用时的代码实例    
// ratebucket := rate.NewLimiter(rate.Every(time.Second * 100), 1000)
// endpoint = user.NewTokenBucketLimitterWithBuildIn(ratebucket)(endpoint)

上述代码使用了 x/time/rate 来进行限流,具体则使用了令牌桶限流策略,其中 NewLimiter 函数会生成 Limiter 限流器,有两个参数,一个表示每秒生成多少令牌,另一个表示允许缓存多少令牌。

当请求通过 Endpoint 时,就会被该 Middleware 拦截,然后调用 Limiter 的 Allow 函数,如果当前还存有令牌,则消耗一枚令牌,放行请求,返回 true;如果不存在,则阻拦请求,返回 false。
有关令牌桶的限流策略,如果你感兴趣的话,可以自行搜索学习。

除了限流外,Endpoint 的 Middleware 还可以和 Hystrix 结合提供熔断能力,和 ZipkinTracer 结合提供服务链路追踪能力、自定义接口调用统计指标或打印日志。

由此可以看出 Endpoint 的 Middleware 确实是 Go-kit 的核心,众多服务治理相关中间件的集成都使用该层进行封装,提供统一的类似于切面的能力供开发者使用,免去了开发者处理架构集成的烦恼。

定义 Transport,提供网络传输能力

下面,我们来具体看一下 Transport 层的实现。
我们要给出 proto 文件中 UserServiceServer 的具体实现,也就是下述代码中的 grpcServer 结构体,它实现了 CheckPassword 方法,其中调用了其成员变量 checkPassword 的 ServeGRPC 方法。

./user/transports.go:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package user

import (
    "context"

    "example.com/user/pb"
    "github.com/go-kit/kit/transport/grpc"
)

func DecodeLoginRequest(ctx context.Context, r interface{}) (interface{}, error) {
    req := r.(*pb.LoginRequest)
    return LoginForm{
        Username: req.Username,
        Password: req.Password,
    }, nil
}

func EncodeLoginResponse(_ context.Context, r interface{}) (interface{}, error) {
    resp := r.(LoginResult)
    retStr := "fail"
    if resp.Ret {
        retStr = "success"
    }
    errStr := ""
    if resp.Err != nil {
        errStr = resp.Err.Error()
    }
    return &pb.LoginResponse{
        Ret: retStr,
        Err: errStr,
    }, nil
}

type grpcServer struct {
    checkPassword grpc.Handler
    pb.UnimplementedUserServiceServer
}

func (s *grpcServer) CheckPassword(ctx context.Context, r *pb.LoginRequest) (*pb.LoginResponse, error) {
    _, resp, err := s.checkPassword.ServeGRPC(ctx, r)
    if err != nil {
        return nil, err
    }
    return resp.(*pb.LoginResponse), nil
}

func NewUserServer(ctx context.Context, endpoints Endpoints) pb.UserServiceServer {
    return &grpcServer{
        checkPassword: grpc.NewServer(
            endpoints.UserEndpoint,
            DecodeLoginRequest,
            EncodeLoginResponse,
        ),
    }
}

checkPassword 的类型是 grpc.Handler,在 NewUserServer 方法中我们可以看到调用 grpc.NewServer 将其创建出来,需要传入 Endpoint 和编解码函数。
这也正对应上文原理解析中 Go-kit 过程调用示意图的内容。

启动服务端,注册RPC服务

我们来将上述部分组合起来,建立真正的网络服务端,并注册对应的 RPC 服务。具体代码如下所示:

./server.go:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package main

import (
    "context"
    "flag"
    "net"
    "os"
    "time"

    "example.com/user/pb"
    "example.com/user/user"
    "github.com/go-kit/kit/log"
    "golang.org/x/time/rate"
    "google.golang.org/grpc"
)

func main() {
    flag.Parse()

    var logger log.Logger
    {
        logger = log.NewLogfmtLogger(os.Stderr)
        logger = log.With(logger, "ts", log.DefaultTimestampUTC)
        logger = log.With(logger, "caller", log.DefaultCaller)
    }

    ctx := context.Background()
    // 建立 service
    var svc user.UserService
    svc = user.UserServiceImpl{}

    // 建立 endpoint
    endpoint := user.MakeUserEndpoint(svc)
    // 构造限流中间件
    ratebucket := rate.NewLimiter(rate.Every(time.Second*1), 100)
    endpoint = user.NewTokenBucketLimitterWithBuildIn(ratebucket)(endpoint)

    endpts := user.Endpoints{
        UserEndpoint: endpoint,
    }
    // 使用 transport 构造 UserServiceServer
    handler := user.NewUserServer(ctx, endpts)
    // 监听端口,建立 gRPC 网络服务器,注册 RPC 服务
    ls, _ := net.Listen("tcp", "127.0.0.1:8080")
    gRPCServer := grpc.NewServer()
    pb.RegisterUserServiceServer(gRPCServer, handler)
    gRPCServer.Serve(ls)
}

在 main 函数中,首先创建 UserService
然后调用 MakeUserEndpoint 函数创建 Endpoint,并使用限流中间件封装;
接着调用 user 的 NewUserServer 方法,传入对应的 Endpoint,得到对应的 gRPC 处理器(Handler);
最后监听网络端口,创建 gRPC 服务端,并注册对应的处理器,即可启动 gRPC 服务端。

创建客户端与服务器通信

./client.go:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
package main

import (
    "context"
    "fmt"

    "example.com/user/pb"
    "google.golang.org/grpc"
)

func main() {
    serviceAddress := "127.0.0.1:8080"
    conn, err := grpc.Dial(serviceAddress, grpc.WithInsecure())
    if err != nil {
        panic("connect error")
    }
    defer conn.Close()
    userClient := pb.NewUserServiceClient(conn)
    stringReq := &pb.LoginRequest{Username: "admin", Password: "admin"}
    reply, _ := userClient.CheckPassword(context.Background(), stringReq)
    fmt.Printf("CheckPassword ret is %s\n", reply.Ret)
}

添加 logging Middleware

./user/logging.go:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package user

import (
    "context"
    "time"

    "github.com/go-kit/kit/log"
)

type ServiceMiddleware func(service UserService) UserService

type loggingMiddleware struct {
    UserService
    logger log.Logger
}

func LoggingMiddleware(logger log.Logger) ServiceMiddleware {
    return func(next UserService) UserService {
        return loggingMiddleware{next, logger}
    }
}

func (mw loggingMiddleware) CheckPassword(ctx context.Context, username, password string) (ret bool, err error) {

    defer func(begin time.Time) {
        mw.logger.Log(
            "function", "CheckPassword",
            "username", username,
            "result", ret,
            "took", time.Since(begin),
        )
    }(time.Now())

    ret, err = mw.UserService.CheckPassword(ctx, username, password)
    return ret, err
}

修改 ./server.go:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package main

import (
    "context"
    "flag"
    "net"
    "os"
    "time"

    "example.com/user/pb"
    "example.com/user/user"
    "github.com/go-kit/kit/log"
    "golang.org/x/time/rate"
    "google.golang.org/grpc"
)

func main() {
    flag.Parse()

    var logger log.Logger
    {
        logger = log.NewLogfmtLogger(os.Stderr)
        logger = log.With(logger, "ts", log.DefaultTimestampUTC)
        logger = log.With(logger, "caller", log.DefaultCaller)
    }

    ctx := context.Background()
    // 建立 service
    var svc user.UserService
    svc = user.UserServiceImpl{}

    svc = user.LoggingMiddleware(logger)(svc)

    // 建立 endpoint
    endpoint := user.MakeUserEndpoint(svc)
    // 构造限流中间件
    ratebucket := rate.NewLimiter(rate.Every(time.Second*1), 100)
    endpoint = user.NewTokenBucketLimitterWithBuildIn(ratebucket)(endpoint)

    endpts := user.Endpoints{
        UserEndpoint: endpoint,
    }
    // 使用 transport 构造 UserServiceServer
    handler := user.NewUserServer(ctx, endpts)
    // 监听端口,建立 gRPC 网络服务器,注册 RPC 服务
    ls, _ := net.Listen("tcp", "127.0.0.1:8080")
    gRPCServer := grpc.NewServer()
    pb.RegisterUserServiceServer(gRPCServer, handler)
    gRPCServer.Serve(ls)
}

重启 server,运行 go run client.go

server输出:

1
ts=2021-10-28T07:03:34.360416651Z caller=logging.go:26 function=CheckPassword username=admin result=true took=135ns