go grpc流式和非流式的例子

参考grpc官方: https://grpc.io/docs/quickstart/go.html

或官方中文翻译: http://doc.oschina.net/grpc?t=60133

安装proto buf 3、protoc编译器

1-安装grpc(需要红杏出墙):

# 因为本朝网络的安全原因,需要提前export http_proxy https_proxy 来红杏出墙到golang.org。 比如执行linux命令 export http{,s}_proxy=红杏出墙IP:红杏出墙端口号
执行:go get -u google.golang.org/grpc

2-安装

下载protoc:

https://github.com/google/protobuf/releases下载预编译的“protoc编译器”,用于生成gRPC服务代码。(文件名为:protoc-<版本>-<平台>.zip)

下载后,解压zip文件,并将protoc二进制文件所在bin目录添加到PATH环境变量中

安装protoc的go插件:

go get -u github.com/golang/protobuf/protoc-gen-go

go get -u github.com/golang/protobuf //解决go build报错:” undefined: proto.ProtoPackageIsVersion3 “ 。高版本protoc搭配低版本protobuf就会出现这样的问题

$GOPATH/bin加入到PATH环境变量中

非流式RPC:

1-使用proto buf 3的IDL(接口定义语言)语法编写 .proto 文件. 关于.proto文件的语法规则参考本博protocol buffer的整理

我的.proto文件路径是:$GOPATH/src/nonstream/non_stream.proto

 1 syntax = "proto3"; //必须指定,否则就认为是proto2版本
 2 
 3 package nonstream;  //包名,生成go代码的时候,protoc会用到这个包名,生成的.pb.go文件中有package nonstream,后面的服务端、客户端go源文件时需要导入该包名
 4 
 5 //定义服务 ( 风格:服务名和rpc方法名是大驼峰风格 )
 6 service EchoIface {
 7      //自己的GO源码的类需要实现接口Echo函数 
 8      rpc Echo (Ping) returns (Pong) {} 
 9 }
10 
11 // 定义消息 (风格: 消息名用大驼峰风格)
12 message Ping {
13   string request = 1;   //(风格: 字段名用小写下划线风格,如果是枚举字段名则是大写下划线风格)
14 }
15 
16 message Pong {
17   string reply = 1;
18 } 

2- 使用protoc生成go代码:

protoc命令的语法: protoc -I /PATH/TO/.proto_DIR/   /PATH/TO/TARGET.proto --go_out=plugins=grpc:/PATH/TO/OUTPUT_DIR  #不指定-I表示从当前工作目录寻找.proto文件。-I一般是指向项目根目录。
#本例编译.proto文件使用下面的命令(需要cd到.proto文件所在目录,再执行)
protoc non_stream.proto --go_out=plugins=grpc:. #执行完这条命令后,就可以在当前目录下看到non_stream.pb.go源码文件了。 命令中:后面的.表示当前目录,加入要生成代码到/tmp目录,则使用选项--go-out=plugins=grpc:/tmp
# 题外话: --go-out 选项中有无plugins=grpc的区别:
1 protoc --go_out=. ./helloworld.proto // -–go_out=后直接跟路径(点号表示当前路径)。只生成序列化/反序列化代码文件,不需要rpc通讯。 2 protoc --go_out=plugins=grpc:. ./helloworld.proto // --go_out=后跟了grpc插件和目录。生成序列化反序列化代码 和 客户端/服务端通讯代码。

3-编写非流式服务端go代码:

 1 package main
 2 
 3 import (
 4     "context"
 5     "log"
 6     "net"
 7     "strings"
 8 
 9     pb "nonstream"  //刚才.proto文件定义的包名
10     "google.golang.org/grpc"
11 )
12 
13 type server struct{}
14 
15 /*server结构体需要实现EchoIface接口的Echo方法, 注意这里的参数多了context,返回值多了error。这可以查看protoc生成的.pb.go文件就发现有下面的定义:
16 type EchoIfaceServer interface {
17     Echo(context.Context, *Ping) (*Pong, error)
18 }
19 */
20 func (s *server) Echo(ctx context.Context, in *pb.Ping) (*pb.Pong, error) {
21     log.Printf("Received from client: %s", in.Request)
22     u := strings.ToUpper(in.Request)  //注意.proto文件中字段是小写下划线,这里变成了大驼峰了
23     return &pb.Pong{Reply: u + "..." + in.Request + "..."}, nil
24 }
25 
26 func main() {
27     lis, err := net.Listen("tcp", ":5555") //1.指定监听地址:端口号
28     if err != nil {
29         log.Fatalf("failed to listen: %v", err)
30     }
31 
32     s := grpc.NewServer() //2.新建gRPC实例
33     pb.RegisterEchoIfaceServer(s, &server{})  //3.在gRPC服务器注册我们的服务实现。参数2是接口(满足服务定义的方法)。在.pb.go文件中搜索Register关键字即可找到这个函数签名
34     if err := s.Serve(lis); err != nil {   //4.Serve()阻塞等待
35         log.Fatalf("failed to serve: %v", err)
36     }
37 }

4-编写非流式客户端go代码:

 1 package main
 2 
 3 import (
 4     "context"
 5     "log"
 6     "time"
 7     "fmt"
 8 
 9     pb "nonstream"
10     "google.golang.org/grpc"
11 )
12 
13 
14 func main() {
15     //1.建立连接
16     conn, err := grpc.Dial("127.0.0.1:5555", grpc.WithInsecure())  //如果需要授权认证或tls加密,则可以使用DialOptions来设置grpc.Dial
17     if err != nil {
18         log.Fatalf("grpc.Dial: %v", err)
19     }
20     defer conn.Close()
21 
22     c := pb.NewEchoIfaceClient(conn) //2.新建一个客户端stub来执行rpc方法
23 
24     ctx, cancel := context.WithTimeout(context.Background(), time.Second)  //超时1秒执行cancel
25     defer cancel()
26 
27     r, err := c.Echo(ctx, &pb.Ping{Request: "yahoo"})  //3.调用rpc方法
28     if err != nil {
29         log.Fatalf("c.Echo: %v", err)
30     }
31 
32     fmt.Printf("echo from server: %s\n", r.Reply)
33 }

---------------------------------- 分割线 ------------------------------

单向流式RPC -- 服务流式响应

流式的好处之一是可以实现异步

1- 编写.proto文件:

 1 // 这个文件的路径: $GOPATH/src/stream/stream.proto 。包名是stream,后面的服务端/客户端代码会导入这个包
 2 syntax = "proto3";
 3 
 4 package stream;
 5 
 6 service EchoIface {
 7       rpc Echo(Ping) returns (stream Pong) {}  //server-side-stream
 8 }
 9 
10 message Ping {
11       string request = 1;
12 }
13 
14 message Pong {
15       string reply = 1;
16 }

2-使用protoc生成go代码(此处省略,具体参考上面protoc的例子)

3-服务端代码:

package main

import (
    "log"
    "net"
    "strconv"

    "google.golang.org/grpc"
    pb "stream"
)

type server struct{}

/* 从.pb.go文件中,我们发现了下面的定义:
      type EchoIfaceServer interface {
           Echo(*Ping, EchoIface_EchoServer) error
      }
   而参数EchoIface_EchoServer的定义为(有Send函数):
      type EchoIface_EchoServer interface {
          Send(*Pong) error
          grpc.ServerStream
   }
*/
func (s *server) Echo(in *pb.Ping, es pb.EchoIface_EchoServer) error {
    n := in.Request
    for i := 0; i < 10; i++ {  //发10次Hello
        es.Send(&pb.Pong{Reply: "Hello " + n + ":" + strconv.Itoa(i)})
    }
    return nil
}

func main() {
    conn, err := net.Listen("tcp", ":6666")
    if err != nil {
        log.Fatalf("net.Listen: %v", err)
    }
    svr := grpc.NewServer()

    pb.RegisterEchoIfaceServer(svr, &server{})
    if err := svr.Serve(conn); err != nil {
        log.Fatalf("s.Serve(): %v", err)
    }
}

4-客户端代码:

 1 package main
 2 
 3 import (
 4     "context"
 5     "io"
 6     "log"
 7 
 8     "google.golang.org/grpc"
 9     pb "stream"
10 )
11 
12 func main() {
13     conn, err := grpc.Dial("127.0.0.1:6666", grpc.WithInsecure())
14     if err != nil {
15         log.Fatalf("grpc.Dial: %v", err)
16     }
17     defer conn.Close()
18 
19     c := pb.NewEchoIfaceClient(conn)
20 
21     /* 从protoc生成的.pb.go文件中,我们发现如下定义:
22                type EchoIfaceClient interface {
23                       Echo(ctx context.Context, in *Ping, opts ...grpc.CallOption) (EchoIface_EchoClient, error)
24                }
25                而上面接口函数的第一个返回值在.pb.go文件中的的定义是:
26                        type EchoIface_EchoClient interface {
27                          Recv() (*Pong, error)
28                         grpc.ClientStream
29                    }
30     */
31     stream, err := c.Echo(context.Background(), &pb.Ping{Request: "google"})
32     if err != nil {
33         log.Fatalf("c.Echo: %v", err)
34     }
35 
36     for {
37         pong, err := stream.Recv()
38         if err == io.EOF {
39             break
40         }
41 
42         if err != nil {
43             log.Printf("stream.Recv: %v", err)
44         }
45 
46         log.Printf("echo from server: %s", pong.Reply)
47     }
48 
49 }

双向流式RPC,客户端流式RPC

无非就是:

1-编写.proto文件的服务定义时,在需要变成流式的参数或返回值前加stream修饰。

2-对.pb.go文件执行正则搜索,找到函数/方法的定义。再根据定义写go代码

但是要注意一些差异(细节代码参考):

1-客户端流式RPC (客户端一次或多次发数据到服务端,服务端响应一次):

客户端: 使用 流 的Send()方法 发送请求,发送完后使用 流 的CloseAndRecv方法让gRPC服务端知道客户端已经完成请求并且期望获得一个响应。如果CloseAndRecv()返回的err不为nil,那么返回的第一个值就是一个有效的服务端响应。

服务端: 使用 流 的Recv() 方法接收客户端消息,并且用 流 的SendAndClose() 方法返回它的单个响应。

2-双向流式RPC中(客户端:等待接收,并同时发送一次或多次数据到服务端,最后一次发送结束标识--CloseSend()方法。 服务端:来一个,处理一个,响应一个,不用等待客户端发送完成才处理。):

客户端:一个goroutine用来执行 流的 Recv()方法 等待服务端发来的流式响应(阻塞等待状态)。另一个goroutine用来将一次或多次请求通过流的Send()方法发送到服务端,发完后,使用CloseSend()结束发送。

服务端: 服务端用 流 的Recv()方法接收客户端的请求流,接收一个、处理一个、发送一个响应,直到出错,或因为客户端CloseSend()导致的服务端接收完请求。

更多细节,结合源代码https://github.com/grpc/grpc-go/blob/master/examples/route_guide和官方文档中译版看

不需要入参结构体或返回结构体的情况:

方法1: 可以在.proto文件中定义空的消息(也就是空结构体,这样做的好处是以后可以给结构体添加字段, 接口函数签名不变).

message MyRespone {
}

务必注意,在源代码编写的时候,不能直接直接返回空指针, 否则会报"rpc error: code = Internal desc = grpc: error while marshaling: proto: Marshal called with nil". 意思大概就是不能序列化nil指针

if err != nil {
       return nil, err  //错误! 正确写法 return &pb.MyRespone{},err
}

方法2: 使用protobuf的内置空类型.

.proto文件需要类似这样定义(以上面非流式的例子做修改成不需要返回结构体为例子)

 syntax = "proto3"; 
 import "google/protobuf/empty.proto";  //这个就是protobuf内置的空类型
 package nonstream; 
 
 service EchoIface {
      rpc Echo (Ping) returns (google.protobuf.Empty) {} 
 }
 
 message Ping {
   string request = 1;
 }

源码文件的编写需要这样(以上面非流式的例子做修改):

import "github.com/golang/protobuf/ptypes/empty"
 ...
 func (s *server) Echo(ctx context.Context, in *pb.Ping) (*empty.Empty, error)