go-micro client 客户端

  go-micro 支持很多通信协议:http、tcp、grpc等,支持的编码方式也很多有json、protobuf、bytes、jsonrpc等。也可以根据自己的需要实现通信协议和编码方式。go-micro 默认的通信协议是http,默认的编码方式是protobuf。


// Client is the interface used to make requests to services.
// It supports Request/Response via Transport and Publishing via the Broker.
// It also supports bidiectional streaming of requests.
type Client interface {
        Init(...Option) error
        Options() Options
        NewMessage(topic string, msg interface{}, opts ...MessageOption) Message
        NewRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request
        Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error
        Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error)
        Publish(ctx context.Context, msg Message, opts ...PublishOption) error
        String() string

// Message is the interface for publishing asynchronously
type Message interface {
        Topic() string
        Payload() interface{}
        ContentType() string

// Request is the interface for a synchronous request used by Call or Stream
type Request interface {
        Service() string
        Method() string
        ContentType() string
        Request() interface{}
        // indicates whether the request will be a streaming one rather than unary
        Stream() bool

// Stream is the inteface for a bidirectional synchronous stream
type Stream interface {
        Context() context.Context
        Request() Request
        Send(interface{}) error
        Recv(interface{}) error
        Error() error
        Close() error

// Option used by the Client
type Option func(*Options)

// CallOption used by Call or Stream
type CallOption func(*CallOptions)

// PublishOption used by Publish
type PublishOption func(*PublishOptions)

// MessageOption used by NewMessage
type MessageOption func(*MessageOptions)

// RequestOption used by NewRequest
type RequestOption func(*RequestOptions)



func newPool(size int, ttl time.Duration) *pool {
        return &pool{
                size:  size,
                ttl:   int64(ttl.Seconds()),
                conns: make(map[string][]*poolConn),

// NoOp the Close since we manage it
func (p *poolConn) Close() error {
        return nil

func (p *pool) getConn(addr string, tr transport.Transport, opts ...transport.DialOption) (*poolConn, error) {
        conns := p.conns[addr]
        now := time.Now().Unix()

        // while we have conns check age and then return one
        // otherwise we'll create a new conn
        for len(conns) > 0 {
                conn := conns[len(conns)-1]
                conns = conns[:len(conns)-1]
                p.conns[addr] = conns

                // if conn is old kill it and move on
                if d := now - conn.created; d > p.ttl {

                // we got a good conn, lets unlock and return it

                return conn, nil


        // create new conn
        c, err := tr.Dial(addr, opts...)
        if err != nil {
                return nil, err
        return &poolConn{c, time.Now().Unix()}, nil

func (p *pool) release(addr string, conn *poolConn, err error) {
        // don't store the conn if it has errored
        if err != nil {

        // otherwise put it back for reuse
        conns := p.conns[addr]
        if len(conns) >= p.size {
        p.conns[addr] = append(conns, conn)


type rpcRequest struct {
        service     string
        method      string
        contentType string
        request     interface{}
        opts        RequestOptions

func newRequest(service, method string, request interface{}, contentType string, reqOpts ...RequestOption) Request {
        var opts RequestOptions

        for _, o := range reqOpts {

        // set the content-type specified
        if len(opts.ContentType) > 0 {
                contentType = opts.ContentType

        return &rpcRequest{
                service:     service,
                method:      method,
                request:     request,
                contentType: contentType,
                opts:        opts,



func (r *rpcClient) Call(ctx context.Context, request Request, response interface{}, opts ...CallOption) error {
        // make a copy of call opts
        callOpts := r.opts.CallOptions
        for _, opt := range opts {

        next, err := r.next(request, callOpts)
        if err != nil {
                return err

        // check if we already have a deadline
        d, ok := ctx.Deadline()
        if !ok {
                // no deadline so we create a new one
                ctx, _ = context.WithTimeout(ctx, callOpts.RequestTimeout)
        } else {
                // got a deadline so no need to setup context
                // but we need to set the timeout we pass along
                opt := WithRequestTimeout(d.Sub(time.Now()))

        // should we noop right here?
        select {
        case <-ctx.Done():
                return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408)

        // make copy of call method
        rcall := r.call

        // wrap the call in reverse
        for i := len(callOpts.CallWrappers); i > 0; i-- {
                rcall = callOpts.CallWrappers[i-1](rcall)

        // return errors.New("go.micro.client", "request timeout", 408)
        call := func(i int) error {
                // call backoff first. Someone may want an initial start delay
                t, err := callOpts.Backoff(ctx, request, i)
                if err != nil {
                        return errors.InternalServerError("go.micro.client", "backoff error: %v", err.Error())

                // only sleep if greater than 0
                if t.Seconds() > 0 {

                // select next node
                node, err := next()
                if err != nil && err == selector.ErrNotFound {
                        return errors.NotFound("go.micro.client", "service %s: %v", request.Service(), err.Error())
                } else if err != nil {
                        return errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error())

                // set the address
                address := node.Address
                if node.Port > 0 {
                        address = fmt.Sprintf("%s:%d", address, node.Port)

                // make the call
                err = rcall(ctx, address, request, response, callOpts)
                r.opts.Selector.Mark(request.Service(), node, err)
                return err

        ch := make(chan error, callOpts.Retries)
        var gerr error

        for i := 0; i <= callOpts.Retries; i++ {
                go func(i int) {
                        ch <- call(i)

                select {
                case <-ctx.Done():
                        return errors.New("go.micro.client", fmt.Sprintf("call timeout: %v", ctx.Err()), 408)
                case err := <-ch:
                        // if the call succeeded lets bail early
                        if err == nil {
                                return nil

                        retry, rerr := callOpts.Retry(ctx, request, i, err)
                        if rerr != nil {
                                return rerr

                        if !retry {
                                return err

                        gerr = err

        return gerr