diff --git a/.travis.yml b/.travis.yml index 83fc945..16b264e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -15,7 +15,7 @@ install: - tar xf protobuf-$PROTOBUF_VERSION.tar.gz - ( cd protobuf-$PROTOBUF_VERSION && ./configure && make -j4 && sudo make install && sudo ldconfig ) - git clone https://github.com/google/snappy - - ( cd snappy && sh ./autogen.sh && ./configure && make -j4 && sudo make install && sudo ldconfig ) + - ( cd snappy && git checkout 1.1.4 && sh ./autogen.sh && ./configure && make -j4 && sudo make install && sudo ldconfig ) - sudo apt-get install zlib1g-dev - git clone https://github.com/google/googletest.git script: diff --git a/go/README.md b/go/README.md new file mode 100644 index 0000000..1a6a637 --- /dev/null +++ b/go/README.md @@ -0,0 +1,36 @@ +# sofa-pbrpc-go + +Go版本sofa-pbrpc的客户端和服务端实现。 + + +# 安装 + +`$ go get github.com/baidu/sofa-pbrpc/go` + +# 依赖 + +go1.7+ + +# Quick Start + +``` +$ go get -u github.com/baidu/sofa-pbrpc/go/examples/echo/echo_server +$ go get -u github.com/baidu/sofa-pbrpc/go/examples/echo/echo_client +$ echo_server & +$ echo_client +``` + +# 生成示例代码 + +1. 安装 protoc +2. 安装protoc插件 + +``` +$ go get -u github.com/baidu/sofa-pbrpc/go/protoc-gen-go +$ cd $GOPATH/github.com/baidu/sofa-pbrpc/go/examples/echo +$ ./compile_proto.sh +``` + +# 功能说明 + +- 对压缩的支持是受限的,目前客户端和服务端可以解析来自于c++的压缩请求或返回,但go没有明确的地方可以主动发起压缩请求 diff --git a/go/client.go b/go/client.go new file mode 100644 index 0000000..1b68318 --- /dev/null +++ b/go/client.go @@ -0,0 +1,46 @@ +package pbrpc + +import ( + "context" + "net" + "net/rpc" +) + +// ClientConn represents a client connection to an RPC server. +type ClientConn struct { + c *rpc.Client +} + +// Close tears down the ClientConn and all underlying connections. +func (c *ClientConn) Close() error { + return c.c.Close() +} + +// Invoke sends the RPC request on the wire and returns after response is received. Invoke is called by generated code. Also users can call Invoke directly when it is really needed in their use cases. +func Invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn) error { + call := cc.c.Go(method, args, reply, make(chan *rpc.Call, 1)) + select { + case <-call.Done: + return call.Error + case <-ctx.Done(): + return ctx.Err() + } +} + +// Dial creates a client connection to the given target. +// The provided Context must be non-nil. If the context expires before the connection is complete, an error is returned. Once successfully connected, any expiration of the context will not affect the connection. +func Dial(ctx context.Context, addr string) (*ClientConn, error) { + dialer := new(net.Dialer) + conn, err := dialer.DialContext(ctx, "tcp", addr) + if err != nil { + return nil, err + } + + return NewClientConn(conn), nil +} + +// NewClientConn creates a ClientConn on a given connection +func NewClientConn(conn net.Conn) *ClientConn { + c := rpc.NewClientWithCodec(newClientCodec(conn)) + return &ClientConn{c: c} +} diff --git a/go/codec.go b/go/codec.go new file mode 100644 index 0000000..5de0eb3 --- /dev/null +++ b/go/codec.go @@ -0,0 +1,300 @@ +package pbrpc + +import ( + "bufio" + "bytes" + "encoding/binary" + "errors" + "fmt" + "io" + "io/ioutil" + "net/rpc" + "sync" + + meta "github.com/baidu/sofa-pbrpc/go/rpc_meta" + "github.com/golang/protobuf/proto" +) + +var ( + MagicStr = [4]byte{'S', 'O', 'F', 'A'} + ErrBadMagic = errors.New("bad magic number") + ErrBadMessageSize = errors.New("message size exceed") +) + +const ( + maxMessageSize = 64<<20 + 24 // 64M+sizeof(rpcHeader) +) + +type rpcHeader struct { + Magic [4]byte + X struct { + MetaSize int32 + DataSize int64 + TotalSize int64 + } +} + +// codec is a common rpc codec for implementing rpc.ClientCodec and rpc.ServerCodec +type codec struct { + conn io.ReadWriteCloser + w *bufio.Writer + r *bufio.Reader + + // temporary work space + h rpcHeader + m *meta.RpcMeta +} + +func newCodec(conn io.ReadWriteCloser) *codec { + return &codec{ + conn: conn, + w: bufio.NewWriter(conn), + r: bufio.NewReader(conn), + } +} + +// Write send rpc header and body to peer +func (c *codec) Write(meta *meta.RpcMeta, x interface{}, cw compressWriter) error { + buffer := new(bytes.Buffer) + metasize := proto.Size(meta) + h := rpcHeader{ + Magic: MagicStr, + } + h.X.MetaSize = int32(metasize) + h.X.DataSize = 0 + h.X.TotalSize = int64(metasize) + + // write header, for placeholder, we will change DataSize and TotalSize later if no error in meta + binary.Write(buffer, binary.LittleEndian, &h) + + // write meta + buf, err := proto.Marshal(meta) + if err != nil { + return err + } + buffer.Write(buf) + + // skip write body if error + if meta.GetFailed() { + buffer.WriteTo(c.w) + return c.w.Flush() + } + + // write body + msg := x.(proto.Message) + buf, err = proto.Marshal(msg) + if err != nil { + return err + } + // record the offset before we write data + len1 := buffer.Len() + if cw != nil { + wc, err := cw(buffer) + if err != nil { + return err + } + wc.Write(buf) + wc.Close() + } else { + buffer.Write(buf) + } + + h.X.DataSize = int64(buffer.Len() - len1) + h.X.TotalSize = int64(metasize) + h.X.DataSize + // write new header + w := bytes.NewBuffer(buffer.Bytes()[:0]) + binary.Write(w, binary.LittleEndian, &h) + + buffer.WriteTo(c.w) + return c.w.Flush() +} + +func mustDecode(r io.Reader, x interface{}) { + err := binary.Read(r, binary.LittleEndian, x) + if err != nil { + panic(err) + } +} + +func mustReadFull(r io.Reader, buf []byte) { + _, err := io.ReadFull(r, buf) + if err != nil { + panic(err) + } +} + +// ReadHeader read rpc header from peer +func (c *codec) ReadHeader(meta *meta.RpcMeta) (err error) { + defer func() { + catch := recover() + if catch != nil { + if e, ok := catch.(error); ok { + err = e + } else { + err = fmt.Errorf("%v", catch) + } + } + }() + + // decode rpc header + mustDecode(c.r, &c.h.Magic) + if c.h.Magic != MagicStr { + return ErrBadMagic + } + mustDecode(c.r, &c.h.X) + + if c.h.X.TotalSize > maxMessageSize { + return ErrBadMessageSize + } + + // decode rpc meta + buf := make([]byte, c.h.X.MetaSize) + mustReadFull(c.r, buf) + err = proto.Unmarshal(buf, meta) + return + +} + +// ReadBody read rpc body from peer which corresponding to last rpc header +func (c *codec) ReadBody(x interface{}, cr compressReader) error { + if x == nil { + _, err := c.r.Discard(int(c.h.X.DataSize)) + return err + } + + msg := x.(proto.Message) + + var buf []byte + if cr != nil { + // construct a compress reader + rc, err := cr(io.LimitReader(c.r, c.h.X.DataSize)) + if err != nil { + return err + } + // read all uncompressed data to buf + buf, err = ioutil.ReadAll(rc) + if err != nil { + rc.Close() + return err + } + rc.Close() + } else { + buf = make([]byte, c.h.X.DataSize) + _, err := io.ReadFull(c.r, buf) + if err != nil { + return err + } + } + + return proto.Unmarshal(buf, msg) +} + +func (c *codec) Close() error { + return c.conn.Close() +} + +type clientCodec struct { + c *codec + + // temporary work space + m meta.RpcMeta +} + +// newClientCodec returns a new rpc.ClientCodec using sofa-pbrpc on conn. +func newClientCodec(conn io.ReadWriteCloser) rpc.ClientCodec { + return &clientCodec{c: newCodec(conn)} +} + +func (c *clientCodec) WriteRequest(req *rpc.Request, x interface{}) error { + m := &meta.RpcMeta{ + Type: meta.RpcMeta_REQUEST.Enum(), + SequenceId: proto.Uint64(req.Seq), + Method: proto.String(req.ServiceMethod), + } + // TODO support request compress type + return c.c.Write(m, x, nil) +} + +func (c *clientCodec) ReadResponseHeader(resp *rpc.Response) error { + err := c.c.ReadHeader(&c.m) + if err != nil { + return err + } + resp.Seq = c.m.GetSequenceId() + if c.m.Failed != nil && c.m.GetFailed() { + resp.Error = fmt.Sprintf("code:%d, reason:%s", c.m.GetErrorCode(), c.m.GetReason()) + } + + return nil +} + +func (c *clientCodec) ReadResponseBody(x interface{}) error { + return c.c.ReadBody(x, newCompressReader(c.m.GetCompressType())) +} + +func (c *clientCodec) Close() error { + return c.c.Close() +} + +type serverCodec struct { + c *codec + + // temporary work space + m meta.RpcMeta + + mutex sync.Mutex + pending map[uint64]meta.CompressType +} + +// newServerCodec returns a new rpc.ServerCodec using sofa-pbrpc on conn. +func newServerCodec(conn io.ReadWriteCloser) rpc.ServerCodec { + return &serverCodec{ + c: newCodec(conn), + pending: make(map[uint64]meta.CompressType), + } +} + +func (s *serverCodec) ReadRequestHeader(req *rpc.Request) error { + err := s.c.ReadHeader(&s.m) + if err != nil { + return err + } + req.Seq = s.m.GetSequenceId() + req.ServiceMethod = s.m.GetMethod() + + s.mutex.Lock() + s.pending[req.Seq] = s.m.GetCompressType() + s.mutex.Unlock() + return nil +} + +func (s *serverCodec) ReadRequestBody(x interface{}) error { + cr := newCompressReader(s.m.GetCompressType()) + return s.c.ReadBody(x, cr) +} + +func (s *serverCodec) WriteResponse(resp *rpc.Response, x interface{}) error { + s.mutex.Lock() + compressType := s.pending[resp.Seq] + delete(s.pending, resp.Seq) + s.mutex.Unlock() + + meta := &meta.RpcMeta{ + Type: meta.RpcMeta_RESPONSE.Enum(), + SequenceId: proto.Uint64(resp.Seq), + Failed: proto.Bool(false), + CompressType: compressType.Enum(), + } + + if resp.Error != "" { + meta.Failed = proto.Bool(true) + meta.ErrorCode = proto.Int32(400) + meta.Reason = proto.String(resp.Error) + } + + return s.c.Write(meta, x, newCompressWriter(compressType)) +} + +func (s *serverCodec) Close() error { + return s.c.Close() +} diff --git a/go/codec_test.go b/go/codec_test.go new file mode 100644 index 0000000..ee3ecec --- /dev/null +++ b/go/codec_test.go @@ -0,0 +1,102 @@ +package pbrpc + +import ( + "bytes" + "io" + "os" + "path/filepath" + "reflect" + "testing" + + meta "github.com/baidu/sofa-pbrpc/go/rpc_meta" + "github.com/baidu/sofa-pbrpc/go/testdata/echo" + "github.com/golang/protobuf/proto" +) + +const ( + methodName = "sofa.pbrpc.test.EchoServer.Echo" + serviceName = "sofa.pbrpc.test.EchoServer" + message = "Hello from qinzuoyan01" +) + +type nopCloser struct { + io.ReadWriter +} + +func (nopCloser) Close() error { return nil } + +func diffStream(t *testing.T, b io.ReadWriteCloser, dump string, bodyType proto.Message, ctype meta.CompressType) { + t.Logf("test for %s", dump) + f, err := os.Open(filepath.Join("testdata", dump)) + if err != nil { + t.Fatal(err) + } + defer f.Close() + c1 := newCodec(f) + c2 := newCodec(b) + + var m1, m2 meta.RpcMeta + if err := c1.ReadHeader(&m1); err != nil { + t.Fatal(err) + } + if err := c2.ReadHeader(&m2); err != nil { + t.Fatal(err) + } + + if m1.GetType() != m2.GetType() || + m1.GetMethod() != m2.GetMethod() || + m1.GetServerTimeout() != m2.GetServerTimeout() || + m1.GetFailed() != m2.GetFailed() || + m1.GetErrorCode() != m2.GetErrorCode() || + m1.GetReason() != m2.GetReason() || + m1.GetCompressType() != m2.GetCompressType() || + m1.GetExpectedResponseCompressType() != m2.GetExpectedResponseCompressType() { + t.Errorf("rpc meta not equal.\nfile:\n%s \nbuffer:\n%s", + proto.MarshalTextString(&m1), proto.MarshalTextString(&m2)) + } + + b1 := proto.Clone(bodyType) + b2 := proto.Clone(bodyType) + if err := c1.ReadBody(b1, newCompressReader(ctype)); err != nil { + t.Fatal(err) + } + + if err := c2.ReadBody(b2, newCompressReader(ctype)); err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(b1, b2) { + t.Errorf("body not equal.\nfile:\n%s\nbuffer:\n%s", + proto.MarshalTextString(b1), proto.MarshalTextString(b2)) + } +} + +func TestCodecCompress(t *testing.T) { + type testcase struct { + ctype meta.CompressType + dump string + } + var cases = []testcase{ + {meta.CompressType_CompressTypeNone, "client-none-none.dump"}, + {meta.CompressType_CompressTypeGzip, "client-gzip-gzip.dump"}, + {meta.CompressType_CompressTypeZlib, "client-zlib-zlib.dump"}, + // {meta.CompressType_CompressTypeSnappy, "client-snappy-snappy.dump"}, + // {meta.CompressType_CompressTypeLZ4, "client-lz4-lz4.dump"}, + } + for _, cs := range cases { + b := nopCloser{new(bytes.Buffer)} + codec := newCodec(b) + m := &meta.RpcMeta{ + Type: meta.RpcMeta_REQUEST.Enum(), + SequenceId: proto.Uint64(1), + Method: proto.String(methodName), + CompressType: cs.ctype.Enum(), + ExpectedResponseCompressType: cs.ctype.Enum(), + } + msg := &echo.EchoRequest{Message: proto.String(message)} + err := codec.Write(m, msg, newCompressWriter(cs.ctype)) + if err != nil { + t.Error(err) + } + diffStream(t, b, cs.dump, new(echo.EchoRequest), cs.ctype) + } +} diff --git a/go/compile_proto.sh b/go/compile_proto.sh new file mode 100755 index 0000000..cffd874 --- /dev/null +++ b/go/compile_proto.sh @@ -0,0 +1,9 @@ +#!/bin/bash + +SRCROOT=$GOPATH/src/github.com/baidu/sofa-pbrpc/src + +protoc -I $SRCROOT --go_out=import_path=meta:. $SRCROOT/sofa/pbrpc/rpc_meta.proto $SRCROOT/sofa/pbrpc/rpc_option.proto + +mkdir -p rpc_meta +mv sofa/pbrpc/*.go rpc_meta +rm -r sofa diff --git a/go/compress.go b/go/compress.go new file mode 100644 index 0000000..12a1142 --- /dev/null +++ b/go/compress.go @@ -0,0 +1,85 @@ +package pbrpc + +import ( + "compress/gzip" + "compress/zlib" + "io" + "io/ioutil" + + meta "github.com/baidu/sofa-pbrpc/go/rpc_meta" + "github.com/golang/snappy" + "github.com/pierrec/lz4" +) + +type ( + compressReader func(r io.Reader) (io.ReadCloser, error) + compressWriter func(w io.Writer) (io.WriteCloser, error) +) + +// gzip compress +func gzipCompressReader(r io.Reader) (io.ReadCloser, error) { + return gzip.NewReader(r) +} + +func gzipCompressWriter(w io.Writer) (io.WriteCloser, error) { + return gzip.NewWriter(w), nil +} + +// zlib compress +func zlibCompressReader(r io.Reader) (io.ReadCloser, error) { + return zlib.NewReader(r) +} + +func zlibCompressWriter(w io.Writer) (io.WriteCloser, error) { + return zlib.NewWriter(w), nil +} + +// snappy compress +func snappyCompressReader(r io.Reader) (io.ReadCloser, error) { + return ioutil.NopCloser(snappy.NewReader(r)), nil +} + +func snappyCompressWriter(w io.Writer) (io.WriteCloser, error) { + return snappy.NewWriter(w), nil +} + +// lz4 compress +func lz4CompressReader(r io.Reader) (io.ReadCloser, error) { + return ioutil.NopCloser(lz4.NewReader(r)), nil +} + +func lz4CompressWriter(w io.Writer) (io.WriteCloser, error) { + return lz4.NewWriter(w), nil +} + +func newCompressReader(tp meta.CompressType) compressReader { + switch tp { + case meta.CompressType_CompressTypeGzip: + return compressReader(gzipCompressReader) + case meta.CompressType_CompressTypeZlib: + return compressReader(zlibCompressReader) + case meta.CompressType_CompressTypeSnappy: + return compressReader(snappyCompressReader) + case meta.CompressType_CompressTypeLZ4: + return compressReader(lz4CompressReader) + default: + return nil + + } +} + +func newCompressWriter(tp meta.CompressType) compressWriter { + switch tp { + case meta.CompressType_CompressTypeGzip: + return compressWriter(gzipCompressWriter) + case meta.CompressType_CompressTypeZlib: + return compressWriter(zlibCompressWriter) + case meta.CompressType_CompressTypeSnappy: + return compressWriter(snappyCompressWriter) + case meta.CompressType_CompressTypeLZ4: + return compressWriter(lz4CompressWriter) + default: + return nil + + } +} diff --git a/go/examples/echo/compile_proto.sh b/go/examples/echo/compile_proto.sh new file mode 100755 index 0000000..4c53a49 --- /dev/null +++ b/go/examples/echo/compile_proto.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +SRCROOT=$GOPATH/src/github.com/baidu/sofa-pbrpc + +protoc -I $SRCROOT/sample --go_out=import_path=echo,plugins=pbrpc:. $SRCROOT/sample/echo/echo_service.proto diff --git a/go/examples/echo/echo/echo_service.pb.go b/go/examples/echo/echo/echo_service.pb.go new file mode 100644 index 0000000..83b06d3 --- /dev/null +++ b/go/examples/echo/echo/echo_service.pb.go @@ -0,0 +1,127 @@ +// Code generated by protoc-gen-go. +// source: echo/echo_service.proto +// DO NOT EDIT! + +/* +Package echo is a generated protocol buffer package. + +It is generated from these files: + echo/echo_service.proto + +It has these top-level messages: + EchoRequest + EchoResponse +*/ +package echo + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" + +import ( + context "context" + pbrpc "github.com/baidu/sofa-pbrpc/go" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +type EchoRequest struct { + Message *string `protobuf:"bytes,1,req,name=message" json:"message,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *EchoRequest) Reset() { *m = EchoRequest{} } +func (m *EchoRequest) String() string { return proto.CompactTextString(m) } +func (*EchoRequest) ProtoMessage() {} +func (*EchoRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } + +func (m *EchoRequest) GetMessage() string { + if m != nil && m.Message != nil { + return *m.Message + } + return "" +} + +type EchoResponse struct { + Message *string `protobuf:"bytes,1,req,name=message" json:"message,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *EchoResponse) Reset() { *m = EchoResponse{} } +func (m *EchoResponse) String() string { return proto.CompactTextString(m) } +func (*EchoResponse) ProtoMessage() {} +func (*EchoResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } + +func (m *EchoResponse) GetMessage() string { + if m != nil && m.Message != nil { + return *m.Message + } + return "" +} + +func init() { + proto.RegisterType((*EchoRequest)(nil), "sofa.pbrpc.test.EchoRequest") + proto.RegisterType((*EchoResponse)(nil), "sofa.pbrpc.test.EchoResponse") +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ pbrpc.ClientConn + +// Client API for EchoServer service + +type EchoServerClient interface { + Echo(ctx context.Context, in *EchoRequest) (*EchoResponse, error) +} + +type echoServerClient struct { + cc *pbrpc.ClientConn +} + +func NewEchoServerClient(cc *pbrpc.ClientConn) EchoServerClient { + return &echoServerClient{cc} +} + +func (c *echoServerClient) Echo(ctx context.Context, in *EchoRequest) (*EchoResponse, error) { + out := new(EchoResponse) + err := pbrpc.Invoke(ctx, "sofa.pbrpc.test.EchoServer.Echo", in, out, c.cc) + if err != nil { + return nil, err + } + return out, nil +} + +// Server API for EchoServer service + +type EchoServerServer interface { + Echo(in *EchoRequest, out *EchoResponse) error +} + +func RegisterEchoServer(s *pbrpc.Server, srv EchoServerServer) { + s.RegisterService("sofa.pbrpc.test.EchoServer", srv) +} + +func init() { proto.RegisterFile("echo/echo_service.proto", fileDescriptor0) } + +var fileDescriptor0 = []byte{ + // 153 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x4f, 0x4d, 0xce, 0xc8, + 0xd7, 0x07, 0x11, 0xf1, 0xc5, 0xa9, 0x45, 0x65, 0x99, 0xc9, 0xa9, 0x7a, 0x05, 0x45, 0xf9, 0x25, + 0xf9, 0x42, 0xfc, 0xc5, 0xf9, 0x69, 0x89, 0x7a, 0x05, 0x49, 0x45, 0x05, 0xc9, 0x7a, 0x25, 0xa9, + 0xc5, 0x25, 0x4a, 0xea, 0x5c, 0xdc, 0xae, 0xc9, 0x19, 0xf9, 0x41, 0xa9, 0x85, 0xa5, 0xa9, 0xc5, + 0x25, 0x42, 0x12, 0x5c, 0xec, 0xb9, 0xa9, 0xc5, 0xc5, 0x89, 0xe9, 0xa9, 0x12, 0x8c, 0x0a, 0x4c, + 0x1a, 0x9c, 0x41, 0x30, 0xae, 0x92, 0x06, 0x17, 0x0f, 0x44, 0x61, 0x71, 0x41, 0x7e, 0x5e, 0x71, + 0x2a, 0x6e, 0x95, 0x46, 0x81, 0x5c, 0x5c, 0x20, 0x95, 0xc1, 0xa9, 0x45, 0x65, 0xa9, 0x45, 0x42, + 0xce, 0x5c, 0x2c, 0x20, 0x9e, 0x90, 0x8c, 0x1e, 0x9a, 0xd5, 0x7a, 0x48, 0xf6, 0x4a, 0xc9, 0xe2, + 0x90, 0x85, 0x58, 0xe6, 0xc4, 0xd6, 0xc0, 0xc8, 0xd8, 0xc1, 0xc8, 0x08, 0x08, 0x00, 0x00, 0xff, + 0xff, 0x08, 0xdc, 0xde, 0xc4, 0xd8, 0x00, 0x00, 0x00, +} diff --git a/go/examples/echo/echo_client/main.go b/go/examples/echo/echo_client/main.go new file mode 100644 index 0000000..7924b4e --- /dev/null +++ b/go/examples/echo/echo_client/main.go @@ -0,0 +1,36 @@ +package main + +import ( + "context" + "flag" + "log" + "time" + + pbrpc "github.com/baidu/sofa-pbrpc/go" + "github.com/baidu/sofa-pbrpc/go/examples/echo/echo" + "github.com/golang/protobuf/proto" +) + +var ( + addr = flag.String("addr", "127.0.0.1:12321", "server address") +) + +func main() { + flag.Parse() + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + conn, err := pbrpc.Dial(ctx, *addr) + if err != nil { + log.Fatal(err) + } + defer conn.Close() + + c := echo.NewEchoServerClient(conn) + reply, err := c.Echo(ctx, &echo.EchoRequest{Message: proto.String("hello")}) + if err != nil { + log.Fatal(err) + } + log.Print(reply.GetMessage()) +} diff --git a/go/examples/echo/echo_server/main.go b/go/examples/echo/echo_server/main.go new file mode 100644 index 0000000..fb1b1a4 --- /dev/null +++ b/go/examples/echo/echo_server/main.go @@ -0,0 +1,37 @@ +package main + +import ( + "flag" + "log" + "net" + + pbrpc "github.com/baidu/sofa-pbrpc/go" + "github.com/baidu/sofa-pbrpc/go/examples/echo/echo" + "github.com/golang/protobuf/proto" +) + +var ( + addr = flag.String("addr", ":12321", "listen address") +) + +type echoService struct { +} + +func (s *echoService) Echo(req *echo.EchoRequest, reply *echo.EchoResponse) error { + reply.Message = proto.String(req.GetMessage()) + return nil +} + +func main() { + flag.Parse() + + server := pbrpc.NewServer() + echo.RegisterEchoServer(server, new(echoService)) + + l, err := net.Listen("tcp", *addr) + if err != nil { + log.Fatal(err) + } + log.Fatal(server.Serve(l)) + +} diff --git a/go/pbrpc_test.go b/go/pbrpc_test.go new file mode 100644 index 0000000..6dd6a10 --- /dev/null +++ b/go/pbrpc_test.go @@ -0,0 +1,90 @@ +package pbrpc + +import ( + "bytes" + "context" + "log" + "net" + "testing" + + meta "github.com/baidu/sofa-pbrpc/go/rpc_meta" + "github.com/baidu/sofa-pbrpc/go/testdata/echo" + "github.com/golang/protobuf/proto" +) + +// bufferConn extends net.Conn by recording read stream and write stream +type bufferConn struct { + net.Conn + wbuf bytes.Buffer + rbuf bytes.Buffer +} + +func newBufferConn(conn net.Conn) *bufferConn { + return &bufferConn{ + Conn: conn, + } +} + +func (b *bufferConn) Write(buf []byte) (int, error) { + n, err := b.Conn.Write(buf) + if err != nil { + return 0, err + } + b.wbuf.Write(buf) + return n, nil +} + +func (b *bufferConn) Read(buf []byte) (int, error) { + n, err := b.Conn.Read(buf) + if err != nil { + return 0, err + } + b.rbuf.Write(buf[:n]) + return n, nil +} + +func (b *bufferConn) RBuffer() *bytes.Buffer { + return &b.rbuf +} + +func (b *bufferConn) WBuffer() *bytes.Buffer { + return &b.wbuf +} + +// pipe behaves like net.Pipe but returns bufferConn +func pipe() (*bufferConn, *bufferConn) { + conn1, conn2 := net.Pipe() + return newBufferConn(conn1), newBufferConn(conn2) +} + +// echoService is an EchoServer implementation +type echoService struct{} + +func (s *echoService) Echo(req *echo.EchoRequest, rep *echo.EchoResponse) error { + rep.Message = proto.String("echo message: " + req.GetMessage()) + return nil +} + +func TestRoundTrip(t *testing.T) { + conn1, conn2 := pipe() + defer conn1.Close() + defer conn2.Close() + + server := NewServer() + server.RegisterService(serviceName, new(echoService)) + go server.ServeConn(conn2) + + client := NewClientConn(conn1) + request := &echo.EchoRequest{Message: proto.String(message)} + reply := new(echo.EchoResponse) + err := Invoke(context.Background(), methodName, request, reply, client) + if err != nil { + log.Fatal(err) + } + + diffStream(t, nopCloser{conn1.WBuffer()}, "client-none-none.dump", + new(echo.EchoRequest), meta.CompressType_CompressTypeNone) + + diffStream(t, nopCloser{conn1.RBuffer()}, "server-none.dump", + new(echo.EchoResponse), meta.CompressType_CompressTypeNone) +} diff --git a/go/protoc-gen-go/link_grpc.go b/go/protoc-gen-go/link_grpc.go new file mode 100644 index 0000000..532a550 --- /dev/null +++ b/go/protoc-gen-go/link_grpc.go @@ -0,0 +1,34 @@ +// Go support for Protocol Buffers - Google's data interchange format +// +// Copyright 2015 The Go Authors. All rights reserved. +// https://github.com/golang/protobuf +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +package main + +import _ "github.com/golang/protobuf/protoc-gen-go/grpc" diff --git a/go/protoc-gen-go/link_pbrpc.go b/go/protoc-gen-go/link_pbrpc.go new file mode 100644 index 0000000..04ad9f8 --- /dev/null +++ b/go/protoc-gen-go/link_pbrpc.go @@ -0,0 +1,34 @@ +// Go support for Protocol Buffers - Google's data interchange format +// +// Copyright 2015 The Go Authors. All rights reserved. +// https://github.com/golang/protobuf +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +package main + +import _ "github.com/baidu/sofa-pbrpc/go/protoc-gen-go/pbrpc" diff --git a/go/protoc-gen-go/main.go b/go/protoc-gen-go/main.go new file mode 100644 index 0000000..8e2486d --- /dev/null +++ b/go/protoc-gen-go/main.go @@ -0,0 +1,98 @@ +// Go support for Protocol Buffers - Google's data interchange format +// +// Copyright 2010 The Go Authors. All rights reserved. +// https://github.com/golang/protobuf +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +// protoc-gen-go is a plugin for the Google protocol buffer compiler to generate +// Go code. Run it by building this program and putting it in your path with +// the name +// protoc-gen-go +// That word 'go' at the end becomes part of the option string set for the +// protocol compiler, so once the protocol compiler (protoc) is installed +// you can run +// protoc --go_out=output_directory input_directory/file.proto +// to generate Go bindings for the protocol defined by file.proto. +// With that input, the output will be written to +// output_directory/file.pb.go +// +// The generated code is documented in the package comment for +// the library. +// +// See the README and documentation for protocol buffers to learn more: +// https://developers.google.com/protocol-buffers/ +package main + +import ( + "io/ioutil" + "os" + + "github.com/golang/protobuf/proto" + "github.com/golang/protobuf/protoc-gen-go/generator" +) + +func main() { + // Begin by allocating a generator. The request and response structures are stored there + // so we can do error handling easily - the response structure contains the field to + // report failure. + g := generator.New() + + data, err := ioutil.ReadAll(os.Stdin) + if err != nil { + g.Error(err, "reading input") + } + + if err := proto.Unmarshal(data, g.Request); err != nil { + g.Error(err, "parsing input proto") + } + + if len(g.Request.FileToGenerate) == 0 { + g.Fail("no files to generate") + } + + g.CommandLineParameters(g.Request.GetParameter()) + + // Create a wrapped version of the Descriptors and EnumDescriptors that + // point to the file that defines them. + g.WrapTypes() + + g.SetPackageNames() + g.BuildTypeNameMap() + + g.GenerateAllFiles() + + // Send back the results. + data, err = proto.Marshal(g.Response) + if err != nil { + g.Error(err, "failed to marshal output proto") + } + _, err = os.Stdout.Write(data) + if err != nil { + g.Error(err, "failed to write output proto") + } +} diff --git a/go/protoc-gen-go/pbrpc/pbrpc.go b/go/protoc-gen-go/pbrpc/pbrpc.go new file mode 100644 index 0000000..4bfff77 --- /dev/null +++ b/go/protoc-gen-go/pbrpc/pbrpc.go @@ -0,0 +1,244 @@ +// Go support for Protocol Buffers - Google's data interchange format +// +// Copyright 2015 The Go Authors. All rights reserved. +// https://github.com/golang/protobuf +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +// Package pbrpc outputs sofa-pbrpc service descriptions in Go code. +// It runs as a plugin for the Go protocol buffer compiler plugin. +// It is linked in to protoc-gen-go. +package pbrpc + +import ( + "fmt" + "path" + "strconv" + "strings" + + pb "github.com/golang/protobuf/protoc-gen-go/descriptor" + "github.com/golang/protobuf/protoc-gen-go/generator" +) + +// Paths for packages used by code generated in this file, +// relative to the import_prefix of the generator.Generator. +const ( + contextPkgPath = "context" + pbrpcPkgPath = "github.com/baidu/sofa-pbrpc/go" +) + +func init() { + generator.RegisterPlugin(new(pbrpc)) +} + +// pbrpc is an implementation of the Go protocol buffer compiler's +// plugin architecture. It generates bindings for sofa-pbrpc support. +type pbrpc struct { + gen *generator.Generator +} + +// Name returns the name of this plugin, "sofa-pbrpc". +func (g *pbrpc) Name() string { + return "pbrpc" +} + +// The names for packages imported in the generated code. +// They may vary from the final path component of the import path +// if the name is used by other packages. +var ( + contextPkg string + pbrpcPkg string +) + +// Init initializes the plugin. +func (g *pbrpc) Init(gen *generator.Generator) { + g.gen = gen + contextPkg = generator.RegisterUniquePackageName("context", nil) + pbrpcPkg = generator.RegisterUniquePackageName("pbrpc", nil) +} + +// Given a type name defined in a .proto, return its object. +// Also record that we're using it, to guarantee the associated import. +func (g *pbrpc) objectNamed(name string) generator.Object { + g.gen.RecordTypeUse(name) + return g.gen.ObjectNamed(name) +} + +// Given a type name defined in a .proto, return its name as we will print it. +func (g *pbrpc) typeName(str string) string { + return g.gen.TypeName(g.objectNamed(str)) +} + +// P forwards to g.gen.P. +func (g *pbrpc) P(args ...interface{}) { g.gen.P(args...) } + +// Generate generates code for the services in the given file. +func (g *pbrpc) Generate(file *generator.FileDescriptor) { + if len(file.FileDescriptorProto.Service) == 0 { + return + } + + g.P("// Reference imports to suppress errors if they are not otherwise used.") + g.P("var _ ", contextPkg, ".Context") + g.P("var _ ", pbrpcPkg, ".ClientConn") + g.P() + + for i, service := range file.FileDescriptorProto.Service { + g.generateService(file, service, i) + } +} + +// GenerateImports generates the import declaration for this file. +func (g *pbrpc) GenerateImports(file *generator.FileDescriptor) { + if len(file.FileDescriptorProto.Service) == 0 { + return + } + g.P("import (") + g.P(contextPkg, " ", strconv.Quote(path.Join(g.gen.ImportPrefix, contextPkgPath))) + g.P(pbrpcPkg, " ", strconv.Quote(path.Join(g.gen.ImportPrefix, pbrpcPkgPath))) + g.P(")") + g.P() +} + +// reservedClientName records whether a client name is reserved on the client side. +var reservedClientName = map[string]bool{ +// TODO: do we need any in gRPC? +} + +func unexport(s string) string { return strings.ToLower(s[:1]) + s[1:] } + +// generateService generates all the code for the named service. +func (g *pbrpc) generateService(file *generator.FileDescriptor, service *pb.ServiceDescriptorProto, index int) { + path := fmt.Sprintf("6,%d", index) // 6 means service. + + origServName := service.GetName() + fullServName := origServName + if pkg := file.GetPackage(); pkg != "" { + fullServName = pkg + "." + fullServName + } + servName := generator.CamelCase(origServName) + + g.P() + g.P("// Client API for ", servName, " service") + g.P() + + // Client interface. + g.P("type ", servName, "Client interface {") + for i, method := range service.Method { + g.gen.PrintComments(fmt.Sprintf("%s,2,%d", path, i)) // 2 means method in a service. + g.P(g.generateClientSignature(servName, method)) + } + g.P("}") + g.P() + + // Client structure. + g.P("type ", unexport(servName), "Client struct {") + g.P("cc *", pbrpcPkg, ".ClientConn") + g.P("}") + g.P() + + // NewClient factory. + g.P("func New", servName, "Client (cc *", pbrpcPkg, ".ClientConn) ", servName, "Client {") + g.P("return &", unexport(servName), "Client{cc}") + g.P("}") + g.P() + + var methodIndex, streamIndex int + serviceDescVar := "_" + servName + "_serviceDesc" + // Client method implementations. + for _, method := range service.Method { + var descExpr string + if !method.GetServerStreaming() && !method.GetClientStreaming() { + // Unary RPC method + descExpr = fmt.Sprintf("&%s.Methods[%d]", serviceDescVar, methodIndex) + methodIndex++ + } else { + // Streaming RPC method + descExpr = fmt.Sprintf("&%s.Streams[%d]", serviceDescVar, streamIndex) + streamIndex++ + } + g.generateClientMethod(servName, fullServName, serviceDescVar, method, descExpr) + } + + g.P("// Server API for ", servName, " service") + g.P() + + // Server interface. + serverType := servName + "Server" + g.P("type ", serverType, " interface {") + for i, method := range service.Method { + g.gen.PrintComments(fmt.Sprintf("%s,2,%d", path, i)) // 2 means method in a service. + g.P(g.generateServerSignature(servName, method)) + } + g.P("}") + g.P() + + // Server registration. + g.P("func Register", servName, "(s *", pbrpcPkg, ".Server, srv ", serverType, ") {") + g.P(fmt.Sprintf(`s.RegisterService("%s", srv)`, fullServName)) + g.P("}") + g.P() +} + +// generateClientSignature returns the client-side signature for a method. +func (g *pbrpc) generateClientSignature(servName string, method *pb.MethodDescriptorProto) string { + origMethName := method.GetName() + methName := generator.CamelCase(origMethName) + if reservedClientName[methName] { + methName += "_" + } + reqArg := ", in *" + g.typeName(method.GetInputType()) + respName := "*" + g.typeName(method.GetOutputType()) + return fmt.Sprintf("%s(ctx %s.Context%s) (%s, error)", methName, contextPkg, reqArg, respName) +} + +func (g *pbrpc) generateClientMethod(servName, fullServName, serviceDescVar string, method *pb.MethodDescriptorProto, descExpr string) { + sname := fmt.Sprintf("%s.%s", fullServName, method.GetName()) + outType := g.typeName(method.GetOutputType()) + + g.P("func (c *", unexport(servName), "Client) ", g.generateClientSignature(servName, method), "{") + g.P("out := new(", outType, ")") + // TODO: Pass descExpr to Invoke. + g.P("err := ", pbrpcPkg, `.Invoke(ctx, "`, sname, `", in, out, c.cc)`) + g.P("if err != nil { return nil, err }") + g.P("return out, nil") + g.P("}") + g.P() + return +} + +// generateServerSignature returns the server-side signature for a method. +func (g *pbrpc) generateServerSignature(servName string, method *pb.MethodDescriptorProto) string { + origMethName := method.GetName() + methName := generator.CamelCase(origMethName) + if reservedClientName[methName] { + methName += "_" + } + reqArg := "in *" + g.typeName(method.GetInputType()) + respName := "out *" + g.typeName(method.GetOutputType()) + return fmt.Sprintf("%s(%s, %s) error", methName, reqArg, respName) +} diff --git a/go/rpc_meta/rpc_meta.pb.go b/go/rpc_meta/rpc_meta.pb.go new file mode 100644 index 0000000..7e7c21f --- /dev/null +++ b/go/rpc_meta/rpc_meta.pb.go @@ -0,0 +1,187 @@ +// Code generated by protoc-gen-go. +// source: sofa/pbrpc/rpc_meta.proto +// DO NOT EDIT! + +/* +Package meta is a generated protocol buffer package. + +It is generated from these files: + sofa/pbrpc/rpc_meta.proto + sofa/pbrpc/rpc_option.proto + +It has these top-level messages: + RpcMeta +*/ +package meta + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +// Message type. +type RpcMeta_Type int32 + +const ( + RpcMeta_REQUEST RpcMeta_Type = 0 + RpcMeta_RESPONSE RpcMeta_Type = 1 +) + +var RpcMeta_Type_name = map[int32]string{ + 0: "REQUEST", + 1: "RESPONSE", +} +var RpcMeta_Type_value = map[string]int32{ + "REQUEST": 0, + "RESPONSE": 1, +} + +func (x RpcMeta_Type) Enum() *RpcMeta_Type { + p := new(RpcMeta_Type) + *p = x + return p +} +func (x RpcMeta_Type) String() string { + return proto.EnumName(RpcMeta_Type_name, int32(x)) +} +func (x *RpcMeta_Type) UnmarshalJSON(data []byte) error { + value, err := proto.UnmarshalJSONEnum(RpcMeta_Type_value, data, "RpcMeta_Type") + if err != nil { + return err + } + *x = RpcMeta_Type(value) + return nil +} +func (RpcMeta_Type) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{0, 0} } + +type RpcMeta struct { + Type *RpcMeta_Type `protobuf:"varint,1,req,name=type,enum=sofa.pbrpc.RpcMeta_Type" json:"type,omitempty"` + // Message sequence id. + SequenceId *uint64 `protobuf:"varint,2,req,name=sequence_id,json=sequenceId" json:"sequence_id,omitempty"` + // Method full name. + // For example: "test.HelloService.GreetMethod" + Method *string `protobuf:"bytes,100,opt,name=method" json:"method,omitempty"` + // Server timeout in milli-seconds. + ServerTimeout *int64 `protobuf:"varint,101,opt,name=server_timeout,json=serverTimeout" json:"server_timeout,omitempty"` + // Set as true if the call is failed. + Failed *bool `protobuf:"varint,200,opt,name=failed" json:"failed,omitempty"` + // The error code if the call is failed. + ErrorCode *int32 `protobuf:"varint,201,opt,name=error_code,json=errorCode" json:"error_code,omitempty"` + // The error reason if the call is failed. + Reason *string `protobuf:"bytes,202,opt,name=reason" json:"reason,omitempty"` + // Set the request/response compress type. + CompressType *CompressType `protobuf:"varint,300,opt,name=compress_type,json=compressType,enum=sofa.pbrpc.CompressType" json:"compress_type,omitempty"` + // Set the response compress type of user expected. + ExpectedResponseCompressType *CompressType `protobuf:"varint,301,opt,name=expected_response_compress_type,json=expectedResponseCompressType,enum=sofa.pbrpc.CompressType" json:"expected_response_compress_type,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *RpcMeta) Reset() { *m = RpcMeta{} } +func (m *RpcMeta) String() string { return proto.CompactTextString(m) } +func (*RpcMeta) ProtoMessage() {} +func (*RpcMeta) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } + +func (m *RpcMeta) GetType() RpcMeta_Type { + if m != nil && m.Type != nil { + return *m.Type + } + return RpcMeta_REQUEST +} + +func (m *RpcMeta) GetSequenceId() uint64 { + if m != nil && m.SequenceId != nil { + return *m.SequenceId + } + return 0 +} + +func (m *RpcMeta) GetMethod() string { + if m != nil && m.Method != nil { + return *m.Method + } + return "" +} + +func (m *RpcMeta) GetServerTimeout() int64 { + if m != nil && m.ServerTimeout != nil { + return *m.ServerTimeout + } + return 0 +} + +func (m *RpcMeta) GetFailed() bool { + if m != nil && m.Failed != nil { + return *m.Failed + } + return false +} + +func (m *RpcMeta) GetErrorCode() int32 { + if m != nil && m.ErrorCode != nil { + return *m.ErrorCode + } + return 0 +} + +func (m *RpcMeta) GetReason() string { + if m != nil && m.Reason != nil { + return *m.Reason + } + return "" +} + +func (m *RpcMeta) GetCompressType() CompressType { + if m != nil && m.CompressType != nil { + return *m.CompressType + } + return CompressType_CompressTypeNone +} + +func (m *RpcMeta) GetExpectedResponseCompressType() CompressType { + if m != nil && m.ExpectedResponseCompressType != nil { + return *m.ExpectedResponseCompressType + } + return CompressType_CompressTypeNone +} + +func init() { + proto.RegisterType((*RpcMeta)(nil), "sofa.pbrpc.RpcMeta") + proto.RegisterEnum("sofa.pbrpc.RpcMeta_Type", RpcMeta_Type_name, RpcMeta_Type_value) +} + +func init() { proto.RegisterFile("sofa/pbrpc/rpc_meta.proto", fileDescriptor0) } + +var fileDescriptor0 = []byte{ + // 325 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x8f, 0xc9, 0x4e, 0xeb, 0x30, + 0x14, 0x86, 0xaf, 0xd3, 0xde, 0x0e, 0xa7, 0x83, 0x2a, 0x2f, 0xee, 0x35, 0x83, 0x68, 0xa8, 0x84, + 0x94, 0x05, 0x4a, 0x25, 0x1e, 0x80, 0x4d, 0x95, 0x05, 0x0b, 0x26, 0xb7, 0xac, 0x43, 0xb0, 0x4f, + 0x45, 0x24, 0x12, 0x1b, 0xdb, 0x45, 0xf4, 0x9d, 0x80, 0xe7, 0x00, 0x9e, 0x0a, 0x35, 0x49, 0x45, + 0xe9, 0x86, 0xe5, 0x3f, 0x9c, 0xef, 0xe8, 0x87, 0x1d, 0xab, 0xe6, 0xc9, 0x58, 0xdf, 0x19, 0x2d, + 0xc6, 0x46, 0x8b, 0x38, 0x43, 0x97, 0x84, 0xda, 0x28, 0xa7, 0x28, 0xac, 0xa2, 0xb0, 0x88, 0x76, + 0xf7, 0xb6, 0x6a, 0x4a, 0xbb, 0x54, 0xe5, 0x65, 0x71, 0xf4, 0x56, 0x83, 0x26, 0xd7, 0xe2, 0x1c, + 0x5d, 0x42, 0x8f, 0xa1, 0xee, 0x96, 0x1a, 0x19, 0xf1, 0xbd, 0xa0, 0x7f, 0xc2, 0xc2, 0x6f, 0x46, + 0x58, 0x55, 0xc2, 0xd9, 0x52, 0x23, 0x2f, 0x5a, 0x74, 0x08, 0x1d, 0x8b, 0x8f, 0x0b, 0xcc, 0x05, + 0xc6, 0xa9, 0x64, 0x9e, 0xef, 0x05, 0x75, 0x0e, 0x6b, 0xeb, 0x4c, 0xd2, 0x7f, 0xd0, 0xc8, 0xd0, + 0xdd, 0x2b, 0xc9, 0xa4, 0x4f, 0x82, 0x36, 0xaf, 0x14, 0x3d, 0x82, 0xbe, 0x45, 0xf3, 0x84, 0x26, + 0x76, 0x69, 0x86, 0x6a, 0xe1, 0x18, 0xfa, 0x24, 0xa8, 0xf1, 0x5e, 0xe9, 0xce, 0x4a, 0x93, 0xfe, + 0x87, 0xc6, 0x3c, 0x49, 0x1f, 0x50, 0xb2, 0x77, 0xe2, 0x93, 0xa0, 0xc5, 0x2b, 0x49, 0x0f, 0x00, + 0xd0, 0x18, 0x65, 0x62, 0xa1, 0x24, 0xb2, 0x8f, 0x55, 0xf8, 0x97, 0xb7, 0x0b, 0x6b, 0xa2, 0x24, + 0xae, 0x0e, 0x0d, 0x26, 0x56, 0xe5, 0xec, 0x93, 0x94, 0x8f, 0x4b, 0x49, 0x4f, 0xa1, 0x27, 0x54, + 0xa6, 0x0d, 0x5a, 0x1b, 0x17, 0x43, 0x5f, 0x3c, 0x9f, 0x6c, 0x2f, 0x9d, 0x54, 0x8d, 0x62, 0x69, + 0x57, 0x6c, 0x28, 0x7a, 0x0b, 0x43, 0x7c, 0xd6, 0x28, 0x1c, 0xca, 0xd8, 0xa0, 0xd5, 0x2a, 0xb7, + 0x18, 0xff, 0x24, 0xbe, 0xfe, 0x46, 0xdc, 0x5f, 0x13, 0x78, 0x05, 0xd8, 0x4c, 0x47, 0x87, 0x50, + 0x2f, 0x3e, 0x75, 0xa0, 0xc9, 0xa3, 0xeb, 0x9b, 0x68, 0x3a, 0x1b, 0xfc, 0xa1, 0x5d, 0x68, 0xf1, + 0x68, 0x7a, 0x75, 0x79, 0x31, 0x8d, 0x06, 0xe4, 0x2b, 0x00, 0x00, 0xff, 0xff, 0x40, 0x24, 0xc6, + 0x7c, 0xf5, 0x01, 0x00, 0x00, +} diff --git a/go/rpc_meta/rpc_option.pb.go b/go/rpc_meta/rpc_option.pb.go new file mode 100644 index 0000000..62c7a6f --- /dev/null +++ b/go/rpc_meta/rpc_option.pb.go @@ -0,0 +1,128 @@ +// Code generated by protoc-gen-go. +// source: sofa/pbrpc/rpc_option.proto +// DO NOT EDIT! + +package meta + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" +import google_protobuf "github.com/golang/protobuf/protoc-gen-go/descriptor" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +type CompressType int32 + +const ( + CompressType_CompressTypeNone CompressType = 0 + CompressType_CompressTypeGzip CompressType = 1 + CompressType_CompressTypeZlib CompressType = 2 + CompressType_CompressTypeSnappy CompressType = 3 + CompressType_CompressTypeLZ4 CompressType = 4 +) + +var CompressType_name = map[int32]string{ + 0: "CompressTypeNone", + 1: "CompressTypeGzip", + 2: "CompressTypeZlib", + 3: "CompressTypeSnappy", + 4: "CompressTypeLZ4", +} +var CompressType_value = map[string]int32{ + "CompressTypeNone": 0, + "CompressTypeGzip": 1, + "CompressTypeZlib": 2, + "CompressTypeSnappy": 3, + "CompressTypeLZ4": 4, +} + +func (x CompressType) Enum() *CompressType { + p := new(CompressType) + *p = x + return p +} +func (x CompressType) String() string { + return proto.EnumName(CompressType_name, int32(x)) +} +func (x *CompressType) UnmarshalJSON(data []byte) error { + value, err := proto.UnmarshalJSONEnum(CompressType_value, data, "CompressType") + if err != nil { + return err + } + *x = CompressType(value) + return nil +} +func (CompressType) EnumDescriptor() ([]byte, []int) { return fileDescriptor1, []int{0} } + +var E_ServiceTimeout = &proto.ExtensionDesc{ + ExtendedType: (*google_protobuf.ServiceOptions)(nil), + ExtensionType: (*int64)(nil), + Field: 20000, + Name: "sofa.pbrpc.service_timeout", + Tag: "varint,20000,opt,name=service_timeout,json=serviceTimeout,def=10000", + Filename: "sofa/pbrpc/rpc_option.proto", +} + +var E_MethodTimeout = &proto.ExtensionDesc{ + ExtendedType: (*google_protobuf.MethodOptions)(nil), + ExtensionType: (*int64)(nil), + Field: 20000, + Name: "sofa.pbrpc.method_timeout", + Tag: "varint,20000,opt,name=method_timeout,json=methodTimeout", + Filename: "sofa/pbrpc/rpc_option.proto", +} + +var E_RequestCompressType = &proto.ExtensionDesc{ + ExtendedType: (*google_protobuf.MethodOptions)(nil), + ExtensionType: (*CompressType)(nil), + Field: 20001, + Name: "sofa.pbrpc.request_compress_type", + Tag: "varint,20001,opt,name=request_compress_type,json=requestCompressType,enum=sofa.pbrpc.CompressType,def=0", + Filename: "sofa/pbrpc/rpc_option.proto", +} + +var E_ResponseCompressType = &proto.ExtensionDesc{ + ExtendedType: (*google_protobuf.MethodOptions)(nil), + ExtensionType: (*CompressType)(nil), + Field: 20002, + Name: "sofa.pbrpc.response_compress_type", + Tag: "varint,20002,opt,name=response_compress_type,json=responseCompressType,enum=sofa.pbrpc.CompressType,def=0", + Filename: "sofa/pbrpc/rpc_option.proto", +} + +func init() { + proto.RegisterEnum("sofa.pbrpc.CompressType", CompressType_name, CompressType_value) + proto.RegisterExtension(E_ServiceTimeout) + proto.RegisterExtension(E_MethodTimeout) + proto.RegisterExtension(E_RequestCompressType) + proto.RegisterExtension(E_ResponseCompressType) +} + +func init() { proto.RegisterFile("sofa/pbrpc/rpc_option.proto", fileDescriptor1) } + +var fileDescriptor1 = []byte{ + // 305 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x91, 0x4f, 0x4f, 0xc2, 0x40, + 0x10, 0xc5, 0xad, 0xe8, 0x65, 0xa3, 0xb0, 0x59, 0x90, 0x10, 0x4d, 0x94, 0xa3, 0xf1, 0xb0, 0xad, + 0xc6, 0x53, 0xaf, 0x1e, 0xb8, 0xf8, 0x27, 0x02, 0x27, 0x2e, 0x0d, 0x94, 0x01, 0x37, 0xa1, 0x9d, + 0x71, 0x77, 0x6b, 0x82, 0x89, 0x89, 0xf1, 0x73, 0x78, 0x50, 0x3f, 0xa9, 0xa1, 0x7f, 0xb4, 0x96, + 0x83, 0xd1, 0x63, 0xdf, 0x7b, 0xfb, 0x9b, 0x37, 0x53, 0x76, 0x60, 0x70, 0x36, 0x76, 0x69, 0xa2, + 0x29, 0x74, 0x35, 0x85, 0x01, 0x92, 0x55, 0x18, 0x4b, 0xd2, 0x68, 0x51, 0xb0, 0x95, 0x29, 0x53, + 0x73, 0xbf, 0x3b, 0x47, 0x9c, 0x2f, 0xc0, 0x4d, 0x9d, 0x49, 0x32, 0x73, 0xa7, 0x60, 0x42, 0xad, + 0xc8, 0xa2, 0xce, 0xd2, 0x27, 0x4f, 0x6c, 0xe7, 0x02, 0x23, 0xd2, 0x60, 0xcc, 0x70, 0x49, 0x20, + 0x5a, 0x8c, 0x97, 0xbf, 0xaf, 0x31, 0x06, 0xbe, 0x51, 0x55, 0x7b, 0x8f, 0x8a, 0xb8, 0x53, 0x55, + 0x47, 0x0b, 0x35, 0xe1, 0x9b, 0xa2, 0xcd, 0x44, 0x59, 0x1d, 0xc4, 0x63, 0xa2, 0x25, 0xaf, 0x89, + 0x26, 0x6b, 0x94, 0xf5, 0xcb, 0xd1, 0x39, 0xdf, 0xf2, 0x6f, 0x59, 0xc3, 0x80, 0x7e, 0x50, 0x21, + 0x04, 0x56, 0x45, 0x80, 0x89, 0x15, 0x47, 0x32, 0x2b, 0x2d, 0x8b, 0xd2, 0x72, 0x90, 0x25, 0x6e, + 0xd2, 0x2d, 0x4d, 0xe7, 0xed, 0xd5, 0xe9, 0x3a, 0xc7, 0x35, 0x7f, 0xfb, 0xd4, 0xf3, 0x3c, 0xaf, + 0x5f, 0xcf, 0x01, 0xc3, 0xec, 0xbd, 0xdf, 0x63, 0xf5, 0x08, 0xec, 0x1d, 0x4e, 0xbf, 0x88, 0x87, + 0x6b, 0xc4, 0xab, 0x34, 0x50, 0x01, 0xf6, 0x77, 0xb3, 0x77, 0x05, 0xe8, 0xd9, 0x61, 0x7b, 0x1a, + 0xee, 0x13, 0x30, 0x36, 0x08, 0xf3, 0xe6, 0x81, 0x5d, 0x1d, 0xe9, 0x37, 0xe0, 0x7b, 0x0a, 0xac, + 0x9f, 0x75, 0xe4, 0xf7, 0xbf, 0x90, 0xe5, 0xe5, 0xfd, 0xb5, 0x23, 0xf7, 0x9b, 0xf9, 0xa8, 0x1f, + 0xb1, 0x17, 0x87, 0xb5, 0x35, 0x18, 0xc2, 0xd8, 0xc0, 0x1f, 0x3b, 0x7c, 0xfc, 0xa3, 0x43, 0xab, + 0x98, 0x55, 0x76, 0x3e, 0x03, 0x00, 0x00, 0xff, 0xff, 0xd0, 0x46, 0x70, 0x1e, 0x6e, 0x02, 0x00, + 0x00, +} diff --git a/go/server.go b/go/server.go new file mode 100644 index 0000000..3681250 --- /dev/null +++ b/go/server.go @@ -0,0 +1,39 @@ +package pbrpc + +import ( + "net" + "net/rpc" +) + +// Server is a pbrpc server to serve RPC requests. +type Server struct { + s *rpc.Server +} + +// NewServer creates a pbrpc server which has no service registered and has not started to accept requests yet. +func NewServer() *Server { + return &Server{rpc.NewServer()} +} + +// RegisterService register a service and its implementation to the pbrpc server. Called from the IDL generated code. This must be called before invoking Serve. +func (s *Server) RegisterService(name string, srv interface{}) { + s.s.RegisterName(name, srv) +} + +// ServeConn runs the server on a single connection. ServeConn blocks, serving the connection until the client hangs up. The caller typically invokes ServeConn in a go statement. +func (s *Server) ServeConn(conn net.Conn) { + codec := newServerCodec(conn) + s.s.ServeCodec(codec) +} + +// Serve accepts incoming connections on the listener l, creating a new ServerConn and service goroutine for each. The service goroutines read pbrpc requests and then call the registered handlers to reply to them. Serve returns when l.Accept fails with errors. +// TODO Handle non fatal errors +func (s *Server) Serve(l net.Listener) error { + for { + conn, err := l.Accept() + if err != nil { + return err + } + go s.ServeConn(conn) + } +} diff --git a/go/testdata/client-gzip-gzip.dump b/go/testdata/client-gzip-gzip.dump new file mode 100755 index 0000000..538384c Binary files /dev/null and b/go/testdata/client-gzip-gzip.dump differ diff --git a/go/testdata/client-lz4-lz4.dump b/go/testdata/client-lz4-lz4.dump new file mode 100755 index 0000000..35d0434 Binary files /dev/null and b/go/testdata/client-lz4-lz4.dump differ diff --git a/go/testdata/client-none-none.dump b/go/testdata/client-none-none.dump new file mode 100755 index 0000000..971196b Binary files /dev/null and b/go/testdata/client-none-none.dump differ diff --git a/go/testdata/client-snappy-snappy.dump b/go/testdata/client-snappy-snappy.dump new file mode 100755 index 0000000..deedd65 Binary files /dev/null and b/go/testdata/client-snappy-snappy.dump differ diff --git a/go/testdata/client-zlib-zlib.dump b/go/testdata/client-zlib-zlib.dump new file mode 100755 index 0000000..0d454d8 Binary files /dev/null and b/go/testdata/client-zlib-zlib.dump differ diff --git a/go/testdata/echo/echo_service.pb.go b/go/testdata/echo/echo_service.pb.go new file mode 100644 index 0000000..ba6a1b2 --- /dev/null +++ b/go/testdata/echo/echo_service.pb.go @@ -0,0 +1,85 @@ +// Code generated by protoc-gen-go. +// source: echo/echo_service.proto +// DO NOT EDIT! + +/* +Package echo is a generated protocol buffer package. + +It is generated from these files: + echo/echo_service.proto + +It has these top-level messages: + EchoRequest + EchoResponse +*/ +package echo + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +type EchoRequest struct { + Message *string `protobuf:"bytes,1,req,name=message" json:"message,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *EchoRequest) Reset() { *m = EchoRequest{} } +func (m *EchoRequest) String() string { return proto.CompactTextString(m) } +func (*EchoRequest) ProtoMessage() {} +func (*EchoRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } + +func (m *EchoRequest) GetMessage() string { + if m != nil && m.Message != nil { + return *m.Message + } + return "" +} + +type EchoResponse struct { + Message *string `protobuf:"bytes,1,req,name=message" json:"message,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *EchoResponse) Reset() { *m = EchoResponse{} } +func (m *EchoResponse) String() string { return proto.CompactTextString(m) } +func (*EchoResponse) ProtoMessage() {} +func (*EchoResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } + +func (m *EchoResponse) GetMessage() string { + if m != nil && m.Message != nil { + return *m.Message + } + return "" +} + +func init() { + proto.RegisterType((*EchoRequest)(nil), "sofa.pbrpc.test.EchoRequest") + proto.RegisterType((*EchoResponse)(nil), "sofa.pbrpc.test.EchoResponse") +} + +func init() { proto.RegisterFile("echo/echo_service.proto", fileDescriptor0) } + +var fileDescriptor0 = []byte{ + // 153 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x4f, 0x4d, 0xce, 0xc8, + 0xd7, 0x07, 0x11, 0xf1, 0xc5, 0xa9, 0x45, 0x65, 0x99, 0xc9, 0xa9, 0x7a, 0x05, 0x45, 0xf9, 0x25, + 0xf9, 0x42, 0xfc, 0xc5, 0xf9, 0x69, 0x89, 0x7a, 0x05, 0x49, 0x45, 0x05, 0xc9, 0x7a, 0x25, 0xa9, + 0xc5, 0x25, 0x4a, 0xea, 0x5c, 0xdc, 0xae, 0xc9, 0x19, 0xf9, 0x41, 0xa9, 0x85, 0xa5, 0xa9, 0xc5, + 0x25, 0x42, 0x12, 0x5c, 0xec, 0xb9, 0xa9, 0xc5, 0xc5, 0x89, 0xe9, 0xa9, 0x12, 0x8c, 0x0a, 0x4c, + 0x1a, 0x9c, 0x41, 0x30, 0xae, 0x92, 0x06, 0x17, 0x0f, 0x44, 0x61, 0x71, 0x41, 0x7e, 0x5e, 0x71, + 0x2a, 0x6e, 0x95, 0x46, 0x81, 0x5c, 0x5c, 0x20, 0x95, 0xc1, 0xa9, 0x45, 0x65, 0xa9, 0x45, 0x42, + 0xce, 0x5c, 0x2c, 0x20, 0x9e, 0x90, 0x8c, 0x1e, 0x9a, 0xd5, 0x7a, 0x48, 0xf6, 0x4a, 0xc9, 0xe2, + 0x90, 0x85, 0x58, 0xe6, 0xc4, 0xd6, 0xc0, 0xc8, 0xd8, 0xc1, 0xc8, 0x08, 0x08, 0x00, 0x00, 0xff, + 0xff, 0x08, 0xdc, 0xde, 0xc4, 0xd8, 0x00, 0x00, 0x00, +} diff --git a/go/testdata/server-none.dump b/go/testdata/server-none.dump new file mode 100755 index 0000000..62cfff8 Binary files /dev/null and b/go/testdata/server-none.dump differ