From 4966df4d765d9f51a31953c522120acced518cc0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mien=20Kocher?= Date: Mon, 31 Jan 2022 12:30:15 +0100 Subject: [PATCH 1/3] Adds packet watcher for Polypus - Updates timeouts for cosipbft - Updates protobuf - Refactors the packets watcher - Adds artificial delays in minogrpc - Adds PacketID in mino.Router interface --- core/ordering/cosipbft/mod.go | 6 +- go.mod | 10 +- go.sum | 73 +- internal/testing/mod.go | 6 +- internal/traffic/mod.go | 42 -- internal/traffic/mod_test.go | 18 +- internal/traffic/watcher.go | 80 ++ mino/minogrpc/mod.go | 8 +- mino/minogrpc/ptypes/mod.go | 2 +- mino/minogrpc/ptypes/overlay.pb.go | 962 +++++++++++------------- mino/minogrpc/ptypes/overlay.proto | 5 +- mino/minogrpc/ptypes/overlay_grpc.pb.go | 298 ++++++++ mino/minogrpc/rpc.go | 3 + mino/minogrpc/rpc_test.go | 2 + mino/minogrpc/server.go | 4 + mino/minogrpc/server_test.go | 12 +- mino/minogrpc/session/mod.go | 34 +- mino/minogrpc/session/mod_test.go | 11 +- mino/router/mod.go | 2 + mino/router/tree/json/mod.go | 16 +- mino/router/tree/json/mod_test.go | 10 +- mino/router/tree/mod.go | 7 +- mino/router/tree/mod_test.go | 2 +- mino/router/tree/types/packet.go | 21 +- mino/router/tree/types/packet_test.go | 18 +- test/cosidela_test.go | 1 - test/integration_test.go | 2 +- 27 files changed, 1020 insertions(+), 635 deletions(-) create mode 100644 internal/traffic/watcher.go create mode 100644 mino/minogrpc/ptypes/overlay_grpc.pb.go diff --git a/core/ordering/cosipbft/mod.go b/core/ordering/cosipbft/mod.go index 29a38ed97..875ffd1d5 100644 --- a/core/ordering/cosipbft/mod.go +++ b/core/ordering/cosipbft/mod.go @@ -62,14 +62,14 @@ import ( const ( // RoundTimeout is the maximum of time the service waits for an event to // happen. - RoundTimeout = 10 * time.Second + RoundTimeout = 20 * time.Second // RoundWait is the constant value of the exponential backoff use between // round failures. - RoundWait = 5 * time.Millisecond + RoundWait = 15 * time.Millisecond // RoundMaxWait is the maximum amount for the backoff. - RoundMaxWait = 5 * time.Minute + RoundMaxWait = 15 * time.Minute rpcName = "cosipbft" ) diff --git a/go.mod b/go.mod index d8c01246c..0023aff99 100644 --- a/go.mod +++ b/go.mod @@ -5,12 +5,11 @@ go 1.14 require ( github.com/HdrHistogram/hdrhistogram-go v1.0.1 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect - github.com/golang/protobuf v1.3.5 - github.com/opentracing-contrib/go-grpc v0.0.0-20200813121455-4a6760c71486 + github.com/opentracing-contrib/go-grpc v0.0.0-20210225150812-73cb765af46e github.com/opentracing/opentracing-go v1.2.0 github.com/rs/xid v1.2.1 github.com/rs/zerolog v1.19.0 - github.com/stretchr/testify v1.6.1 + github.com/stretchr/testify v1.7.0 github.com/uber/jaeger-client-go v2.25.0+incompatible github.com/uber/jaeger-lib v2.4.0+incompatible // indirect github.com/urfave/cli/v2 v2.2.0 @@ -21,7 +20,8 @@ require ( golang.org/x/net v0.0.0-20201021035429-f5854403a974 golang.org/x/tools v0.1.0 golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 - google.golang.org/grpc v1.31.1 + google.golang.org/grpc v1.44.0 + google.golang.org/protobuf v1.27.1 gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b // indirect - gopkg.in/yaml.v2 v2.2.2 + gopkg.in/yaml.v2 v2.2.3 ) diff --git a/go.sum b/go.sum index dcce70d50..90a30429b 100644 --- a/go.sum +++ b/go.sum @@ -1,10 +1,18 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/HdrHistogram/hdrhistogram-go v1.0.1 h1:GX8GAYDuhlFQnI2fRDHQhTlkHMz8bEn0jTI6LJU0mpw= github.com/HdrHistogram/hdrhistogram-go v1.0.1/go.mod h1:BWJ+nMSHY3L41Zj7CA3uXnloDp7xxV0YvstAE7nKTaM= +github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= +github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= +github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XPnfI9Z40ddWsdw2W/uZgQLFXToKeRcDiI= +github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/cpuguy83/go-md2man/v2 v2.0.0 h1:EoUDS0afbrsXAZ9YQ9jdu/mZ2sXgT1/2yyNng4PGlyM= @@ -14,19 +22,37 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= +github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= +github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= -github.com/golang/protobuf v1.3.5 h1:F768QJ1E9tib+q5Sc8MkdJi1RxLTbRcTf8LJV56aRls= -github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= +github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.5.0 h1:LUVKkCeviFUMKqHa4tXIIij/lbhnMbP7Fn5wKdKkRh4= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.2 h1:X2ev0eStA3AbceY54o37/0PQ/UWqKEiiO2dKL5OPaFM= +github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645/go.mod h1:6iZfnjpejD4L/4DwD7NryNaJyCQdzwWwH2MWhCA90Kw= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= @@ -34,8 +60,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= -github.com/opentracing-contrib/go-grpc v0.0.0-20200813121455-4a6760c71486 h1:K35HCWaOTJIPW6cDHK4yj3QfRY/NhE0pBbfoc0M2NMQ= -github.com/opentracing-contrib/go-grpc v0.0.0-20200813121455-4a6760c71486/go.mod h1:DYR5Eij8rJl8h7gblRrOZ8g0kW1umSpKqYIBTgeDtLo= +github.com/opentracing-contrib/go-grpc v0.0.0-20210225150812-73cb765af46e h1:4cPxUYdgaGzZIT5/j0IfqOrrXmq6bG8AwvwisMXpdrg= +github.com/opentracing-contrib/go-grpc v0.0.0-20210225150812-73cb765af46e/go.mod h1:DYR5Eij8rJl8h7gblRrOZ8g0kW1umSpKqYIBTgeDtLo= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs= github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= @@ -44,6 +70,7 @@ github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rs/xid v1.2.1 h1:mhH9Nq+C1fY2l1XIpgxIiUOfNpRBYH1kKcr+qfKgjRc= github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= github.com/rs/zerolog v1.19.0 h1:hYz4ZVdUgjXTBUmrkrw55j1nHx68LfOKIQk5IYtyScg= @@ -56,8 +83,10 @@ github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= -github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/uber/jaeger-client-go v2.25.0+incompatible h1:IxcNZ7WRY1Y3G4poYlx24szfsn/3LvK9QHCq9oQw8+U= github.com/uber/jaeger-client-go v2.25.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk= github.com/uber/jaeger-lib v2.4.0+incompatible h1:fY7QsGQWiCt8pajv4r7JEvmATdCVaWxXbjwyYwsNaLQ= @@ -77,6 +106,7 @@ go.dedis.ch/protobuf v1.0.11 h1:FTYVIEzY/bfl37lu3pR4lIj+F9Vp1jE8oh91VmxKgLo= go.dedis.ch/protobuf v1.0.11/go.mod h1:97QR256dnkimeNdfmURz0wAMNVbd1VmLXhG1CrTYrJ4= go.etcd.io/bbolt v1.3.5 h1:XAzx9gjCb0Rxj7EoqcClPD1d5ZBxZJk0jbuoPHenBt0= go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ= +go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= golang.org/x/crypto v0.0.0-20190123085648-057139ce5d2b/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= @@ -93,16 +123,20 @@ golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190921015927-1a5e07d1ff72/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974 h1:IX6qOQeG5uLjB/hjjwjedwfjND0hgjPMMyO1RoIXQNI= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -110,6 +144,7 @@ golang.org/x/sys v0.0.0-20190124100055-b90733256f2e/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4 h1:myAQVi0cGEoqQVR5POX+8RR2mrocKqNN1hmeMqhX27k= golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -133,20 +168,38 @@ golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8T google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= -google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55 h1:gSJIx1SDwno+2ElGhA4+qG2zF97qiUzTM+rQ0klBOcE= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= +google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 h1:+kGHl1aib/qcwaRi1CbqBZ1rk19r85MNUf8HaBghugY= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.23.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= -google.golang.org/grpc v1.31.1 h1:SfXqXS5hkufcdZ/mHtYCh53P2b+92WQq/DZcKLgsFRs= -google.golang.org/grpc v1.31.1/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= +google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0= +google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= +google.golang.org/grpc v1.44.0 h1:weqSxi/TMs1SqFRMHCtBgXRs8k3X39QIDEZ0pRcttUg= +google.golang.org/grpc v1.44.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ= +google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b h1:QRR6H1YWRnHb4Y/HeNFCTJLFVxaq6wH4YuVdsUOr75U= gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.3 h1:fvjTMHxHEw/mxHbtzPi3JCcKXQRAnQTBRo6YCJSVHKI= +gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/internal/testing/mod.go b/internal/testing/mod.go index 4ca9d701c..752d0d81a 100644 --- a/internal/testing/mod.go +++ b/internal/testing/mod.go @@ -5,9 +5,8 @@ import ( "strings" "testing" - "github.com/golang/protobuf/descriptor" - "github.com/golang/protobuf/proto" "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" ) // CoverProtoMessage triggers a test on the message definition to force the @@ -18,9 +17,6 @@ func CoverProtoMessage(t *testing.T, message proto.Message) error { err = proto.Unmarshal(buffer, message) require.NoError(t, err) proto.Merge(message, message) - proto.DiscardUnknown(message) - require.NotNil(t, message.String()) - message.(descriptor.Message).Descriptor() // Run the getters tt := reflect.TypeOf(message) diff --git a/internal/traffic/mod.go b/internal/traffic/mod.go index a437d8b7e..a30f4147c 100644 --- a/internal/traffic/mod.go +++ b/internal/traffic/mod.go @@ -13,7 +13,6 @@ import ( "time" "go.dedis.ch/dela" - "go.dedis.ch/dela/core" "go.dedis.ch/dela/mino" "go.dedis.ch/dela/mino/router" "golang.org/x/xerrors" @@ -65,43 +64,6 @@ var ( headerURIKey = "apiuri" ) -// GlobalWatcher can be used to watch for sent and received messages. -var GlobalWatcher = Watcher{ - outWatcher: core.NewWatcher(), - inWatcher: core.NewWatcher(), -} - -// Watcher defines an element to watch for sent and received messages. -type Watcher struct { - outWatcher core.Observable - inWatcher core.Observable -} - -// WatchOuts returns a channel populated with sent messages. -func (w *Watcher) WatchOuts(ctx context.Context) <-chan Event { - return watch(ctx, w.outWatcher) -} - -// WatchIns returns a channel populated with received messages. -func (w *Watcher) WatchIns(ctx context.Context) <-chan Event { - return watch(ctx, w.inWatcher) -} - -// watch is a generic function to watch for events -func watch(ctx context.Context, watcher core.Observable) <-chan Event { - obs := observer{ch: make(chan Event, watcherSize)} - - watcher.Add(obs) - - go func() { - <-ctx.Done() - watcher.Remove(obs) - close(obs.ch) - }() - - return obs.ch -} - // SaveItems saves all the items as a graph func SaveItems(path string, withSend, withRcv bool) error { f, err := os.Create(path) @@ -161,16 +123,12 @@ func (t *Traffic) Save(path string, withSend, withRcv bool) error { // sender and the gateway as the receiver, while also recording the packet // itself. func (t *Traffic) LogSend(ctx context.Context, gateway mino.Address, pkt router.Packet) { - GlobalWatcher.outWatcher.Notify(Event{Address: gateway, Pkt: pkt}) - t.addItem(ctx, "send", gateway, pkt) } // LogRecv records a packet received by the node. The sender is the gateway and // the receiver the node. func (t *Traffic) LogRecv(ctx context.Context, gateway mino.Address, pkt router.Packet) { - GlobalWatcher.inWatcher.Notify(Event{Address: gateway, Pkt: pkt}) - t.addItem(ctx, "received", gateway, pkt) } diff --git a/internal/traffic/mod_test.go b/internal/traffic/mod_test.go index 41dc9fc54..6f08fd261 100644 --- a/internal/traffic/mod_test.go +++ b/internal/traffic/mod_test.go @@ -161,14 +161,13 @@ func TestGenerateEventGraphViz(t *testing.T) { } func TestWatcherIns(t *testing.T) { - watcher := GlobalWatcher - events := watcher.WatchIns(context.Background()) - - traffic := NewTraffic(fake.NewAddress(0), ioutil.Discard) + eh := NewEventHandler() + events := eh.WatchIns(context.Background()) addr := fake.NewAddress(0) pkt := newFakePacket(fake.NewAddress(1), fake.NewAddress(2)) - traffic.LogRecv(context.Background(), addr, pkt) + + eh.NotifyIn(addr, pkt) select { case event := <-events: @@ -180,14 +179,13 @@ func TestWatcherIns(t *testing.T) { } func TestWatcherOuts(t *testing.T) { - watcher := GlobalWatcher - events := watcher.WatchOuts(context.Background()) - - traffic := NewTraffic(fake.NewAddress(0), ioutil.Discard) + eh := NewEventHandler() + events := eh.WatchOuts(context.Background()) addr := fake.NewAddress(0) pkt := newFakePacket(fake.NewAddress(1), fake.NewAddress(2)) - traffic.LogSend(context.Background(), addr, pkt) + + eh.NotifyOut(addr, pkt) select { case event := <-events: diff --git a/internal/traffic/watcher.go b/internal/traffic/watcher.go new file mode 100644 index 000000000..e7640593a --- /dev/null +++ b/internal/traffic/watcher.go @@ -0,0 +1,80 @@ +package traffic + +import ( + "context" + + "go.dedis.ch/dela/core" + "go.dedis.ch/dela/mino" + "go.dedis.ch/dela/mino/router" +) + +// Watcher describes a Watcher of outgoing and incoming events. +type Watcher interface { + WatchOuts(ctx context.Context) <-chan Event + WatchIns(ctx context.Context) <-chan Event +} + +// Notifier describes the function to push events to a Watcher. +type Notifier interface { + NotifyIn(mino.Address, router.Packet) + NotifyOut(mino.Address, router.Packet) +} + +// EventsHandler describes the functions to handle events +type EventsHandler interface { + Watcher + Notifier +} + +// NewEventHandler returns a new initialized event handler. +func NewEventHandler() EventsHandler { + return &DefaultEventHandlers{ + outWatcher: core.NewWatcher(), + inWatcher: core.NewWatcher(), + } +} + +// DefaultEventHandlers provides a default implementation of an EventHandler. +// +// - implements EventsHandler +type DefaultEventHandlers struct { + outWatcher core.Observable + inWatcher core.Observable +} + +// WatchOuts implements Watcher. It returns a channel populated with sent +// messages. +func (d *DefaultEventHandlers) WatchOuts(ctx context.Context) <-chan Event { + return watch(ctx, d.outWatcher) +} + +// WatchIns implements Watcher. It returns a channel populated with received +// messages. +func (d *DefaultEventHandlers) WatchIns(ctx context.Context) <-chan Event { + return watch(ctx, d.inWatcher) +} + +// NotifyIn implements Notifier. +func (d *DefaultEventHandlers) NotifyIn(a mino.Address, p router.Packet) { + d.inWatcher.Notify(Event{Address: a, Pkt: p}) +} + +// NotifyOut implements Notifier. +func (d *DefaultEventHandlers) NotifyOut(a mino.Address, p router.Packet) { + d.outWatcher.Notify(Event{Address: a, Pkt: p}) +} + +// watch is a generic function to watch for events +func watch(ctx context.Context, watcher core.Observable) <-chan Event { + obs := observer{ch: make(chan Event, watcherSize)} + + watcher.Add(obs) + + go func() { + <-ctx.Done() + watcher.Remove(obs) + close(obs.ch) + }() + + return obs.ch +} diff --git a/mino/minogrpc/mod.go b/mino/minogrpc/mod.go index 69e8fa8ca..d83aa5e8d 100644 --- a/mino/minogrpc/mod.go +++ b/mino/minogrpc/mod.go @@ -104,6 +104,8 @@ type Minogrpc struct { endpoints map[string]*Endpoint started chan struct{} closing chan error + + eh traffic.EventsHandler } type minoTemplate struct { @@ -191,6 +193,7 @@ func NewMinogrpc(addr net.Addr, router router.Router, opts ...Option) (*Minogrpc endpoints: make(map[string]*Endpoint), started: make(chan struct{}), closing: make(chan error, 1), + eh: traffic.NewEventHandler(), } // Counter needs to be >=1 for asynchronous call to Add. @@ -199,6 +202,7 @@ func NewMinogrpc(addr net.Addr, router router.Router, opts ...Option) (*Minogrpc ptypes.RegisterOverlayServer(server, &overlayServer{ overlay: o, endpoints: m.endpoints, + notifier: m.eh, }) m.listen(socket) @@ -273,6 +277,7 @@ func (m *Minogrpc) WithSegment(segment string) mino.Mino { overlay: m.overlay, segments: append(m.segments, segment), endpoints: m.endpoints, + eh: m.eh, } return newM @@ -288,6 +293,7 @@ func (m *Minogrpc) CreateRPC(name string, h mino.Handler, f serde.Factory) (mino uri: strings.Join(uri, "/"), overlay: m.overlay, factory: f, + eh: m.eh, } for _, segment := range uri { @@ -319,7 +325,7 @@ func (m *Minogrpc) String() string { // GetTrafficWatcher returns the traffic watcher. func (m *Minogrpc) GetTrafficWatcher() traffic.Watcher { - return traffic.GlobalWatcher + return m.eh } // Listen starts the server. It waits for the go routine to start before diff --git a/mino/minogrpc/ptypes/mod.go b/mino/minogrpc/ptypes/mod.go index f515b6fff..a733657f3 100644 --- a/mino/minogrpc/ptypes/mod.go +++ b/mino/minogrpc/ptypes/mod.go @@ -2,4 +2,4 @@ // minogrpc. package ptypes -//go:generate protoc -I ./ --go_out=plugins=grpc:./ ./overlay.proto +//go:generate protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative overlay.proto diff --git a/mino/minogrpc/ptypes/overlay.pb.go b/mino/minogrpc/ptypes/overlay.pb.go index 153b02bbe..6eeda981c 100644 --- a/mino/minogrpc/ptypes/overlay.pb.go +++ b/mino/minogrpc/ptypes/overlay.pb.go @@ -1,153 +1,172 @@ // Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.27.1 +// protoc v3.6.1 // source: overlay.proto package ptypes import ( - context "context" - fmt "fmt" - proto "github.com/golang/protobuf/proto" - grpc "google.golang.org/grpc" - codes "google.golang.org/grpc/codes" - status "google.golang.org/grpc/status" - math "math" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" ) -// Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = fmt.Errorf -var _ = math.Inf - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the proto package it is being compiled against. -// A compilation error at this line likely means your copy of the -// proto package needs to be updated. -const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) // Certificate is a wrapper around a x509 raw certificate and its address. type Certificate struct { - Address []byte `protobuf:"bytes,1,opt,name=address,proto3" json:"address,omitempty"` - Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields -func (m *Certificate) Reset() { *m = Certificate{} } -func (m *Certificate) String() string { return proto.CompactTextString(m) } -func (*Certificate) ProtoMessage() {} -func (*Certificate) Descriptor() ([]byte, []int) { - return fileDescriptor_61fc82527fbe24ad, []int{0} + Address []byte `protobuf:"bytes,1,opt,name=address,proto3" json:"address,omitempty"` + Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` } -func (m *Certificate) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_Certificate.Unmarshal(m, b) -} -func (m *Certificate) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_Certificate.Marshal(b, m, deterministic) -} -func (m *Certificate) XXX_Merge(src proto.Message) { - xxx_messageInfo_Certificate.Merge(m, src) +func (x *Certificate) Reset() { + *x = Certificate{} + if protoimpl.UnsafeEnabled { + mi := &file_overlay_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } } -func (m *Certificate) XXX_Size() int { - return xxx_messageInfo_Certificate.Size(m) + +func (x *Certificate) String() string { + return protoimpl.X.MessageStringOf(x) } -func (m *Certificate) XXX_DiscardUnknown() { - xxx_messageInfo_Certificate.DiscardUnknown(m) + +func (*Certificate) ProtoMessage() {} + +func (x *Certificate) ProtoReflect() protoreflect.Message { + mi := &file_overlay_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) } -var xxx_messageInfo_Certificate proto.InternalMessageInfo +// Deprecated: Use Certificate.ProtoReflect.Descriptor instead. +func (*Certificate) Descriptor() ([]byte, []int) { + return file_overlay_proto_rawDescGZIP(), []int{0} +} -func (m *Certificate) GetAddress() []byte { - if m != nil { - return m.Address +func (x *Certificate) GetAddress() []byte { + if x != nil { + return x.Address } return nil } -func (m *Certificate) GetValue() []byte { - if m != nil { - return m.Value +func (x *Certificate) GetValue() []byte { + if x != nil { + return x.Value } return nil } type CertificateAck struct { - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields } -func (m *CertificateAck) Reset() { *m = CertificateAck{} } -func (m *CertificateAck) String() string { return proto.CompactTextString(m) } -func (*CertificateAck) ProtoMessage() {} -func (*CertificateAck) Descriptor() ([]byte, []int) { - return fileDescriptor_61fc82527fbe24ad, []int{1} +func (x *CertificateAck) Reset() { + *x = CertificateAck{} + if protoimpl.UnsafeEnabled { + mi := &file_overlay_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } } -func (m *CertificateAck) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_CertificateAck.Unmarshal(m, b) -} -func (m *CertificateAck) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_CertificateAck.Marshal(b, m, deterministic) -} -func (m *CertificateAck) XXX_Merge(src proto.Message) { - xxx_messageInfo_CertificateAck.Merge(m, src) +func (x *CertificateAck) String() string { + return protoimpl.X.MessageStringOf(x) } -func (m *CertificateAck) XXX_Size() int { - return xxx_messageInfo_CertificateAck.Size(m) -} -func (m *CertificateAck) XXX_DiscardUnknown() { - xxx_messageInfo_CertificateAck.DiscardUnknown(m) + +func (*CertificateAck) ProtoMessage() {} + +func (x *CertificateAck) ProtoReflect() protoreflect.Message { + mi := &file_overlay_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) } -var xxx_messageInfo_CertificateAck proto.InternalMessageInfo +// Deprecated: Use CertificateAck.ProtoReflect.Descriptor instead. +func (*CertificateAck) Descriptor() ([]byte, []int) { + return file_overlay_proto_rawDescGZIP(), []int{1} +} // JoinRequest sends a request to join a network to a distant node. It must // contain a valid token and its own certificate. type JoinRequest struct { - Token string `protobuf:"bytes,1,opt,name=token,proto3" json:"token,omitempty"` - Certificate *Certificate `protobuf:"bytes,2,opt,name=certificate,proto3" json:"certificate,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields -func (m *JoinRequest) Reset() { *m = JoinRequest{} } -func (m *JoinRequest) String() string { return proto.CompactTextString(m) } -func (*JoinRequest) ProtoMessage() {} -func (*JoinRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_61fc82527fbe24ad, []int{2} + Token string `protobuf:"bytes,1,opt,name=token,proto3" json:"token,omitempty"` + Certificate *Certificate `protobuf:"bytes,2,opt,name=certificate,proto3" json:"certificate,omitempty"` } -func (m *JoinRequest) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_JoinRequest.Unmarshal(m, b) -} -func (m *JoinRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_JoinRequest.Marshal(b, m, deterministic) -} -func (m *JoinRequest) XXX_Merge(src proto.Message) { - xxx_messageInfo_JoinRequest.Merge(m, src) +func (x *JoinRequest) Reset() { + *x = JoinRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_overlay_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } } -func (m *JoinRequest) XXX_Size() int { - return xxx_messageInfo_JoinRequest.Size(m) + +func (x *JoinRequest) String() string { + return protoimpl.X.MessageStringOf(x) } -func (m *JoinRequest) XXX_DiscardUnknown() { - xxx_messageInfo_JoinRequest.DiscardUnknown(m) + +func (*JoinRequest) ProtoMessage() {} + +func (x *JoinRequest) ProtoReflect() protoreflect.Message { + mi := &file_overlay_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) } -var xxx_messageInfo_JoinRequest proto.InternalMessageInfo +// Deprecated: Use JoinRequest.ProtoReflect.Descriptor instead. +func (*JoinRequest) Descriptor() ([]byte, []int) { + return file_overlay_proto_rawDescGZIP(), []int{2} +} -func (m *JoinRequest) GetToken() string { - if m != nil { - return m.Token +func (x *JoinRequest) GetToken() string { + if x != nil { + return x.Token } return "" } -func (m *JoinRequest) GetCertificate() *Certificate { - if m != nil { - return m.Certificate +func (x *JoinRequest) GetCertificate() *Certificate { + if x != nil { + return x.Certificate } return nil } @@ -155,40 +174,48 @@ func (m *JoinRequest) GetCertificate() *Certificate { // JoinResponse is a response of a join request that contains the list of // certificates known by the distant node. type JoinResponse struct { - Peers []*Certificate `protobuf:"bytes,1,rep,name=peers,proto3" json:"peers,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields -func (m *JoinResponse) Reset() { *m = JoinResponse{} } -func (m *JoinResponse) String() string { return proto.CompactTextString(m) } -func (*JoinResponse) ProtoMessage() {} -func (*JoinResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_61fc82527fbe24ad, []int{3} + Peers []*Certificate `protobuf:"bytes,1,rep,name=peers,proto3" json:"peers,omitempty"` } -func (m *JoinResponse) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_JoinResponse.Unmarshal(m, b) -} -func (m *JoinResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_JoinResponse.Marshal(b, m, deterministic) -} -func (m *JoinResponse) XXX_Merge(src proto.Message) { - xxx_messageInfo_JoinResponse.Merge(m, src) +func (x *JoinResponse) Reset() { + *x = JoinResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_overlay_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } } -func (m *JoinResponse) XXX_Size() int { - return xxx_messageInfo_JoinResponse.Size(m) + +func (x *JoinResponse) String() string { + return protoimpl.X.MessageStringOf(x) } -func (m *JoinResponse) XXX_DiscardUnknown() { - xxx_messageInfo_JoinResponse.DiscardUnknown(m) + +func (*JoinResponse) ProtoMessage() {} + +func (x *JoinResponse) ProtoReflect() protoreflect.Message { + mi := &file_overlay_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) } -var xxx_messageInfo_JoinResponse proto.InternalMessageInfo +// Deprecated: Use JoinResponse.ProtoReflect.Descriptor instead. +func (*JoinResponse) Descriptor() ([]byte, []int) { + return file_overlay_proto_rawDescGZIP(), []int{3} +} -func (m *JoinResponse) GetPeers() []*Certificate { - if m != nil { - return m.Peers +func (x *JoinResponse) GetPeers() []*Certificate { + if x != nil { + return x.Peers } return nil } @@ -196,446 +223,363 @@ func (m *JoinResponse) GetPeers() []*Certificate { // Message is a network message that contains the address of the sender and the // payload. type Message struct { - From []byte `protobuf:"bytes,1,opt,name=from,proto3" json:"from,omitempty"` - Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields -func (m *Message) Reset() { *m = Message{} } -func (m *Message) String() string { return proto.CompactTextString(m) } -func (*Message) ProtoMessage() {} -func (*Message) Descriptor() ([]byte, []int) { - return fileDescriptor_61fc82527fbe24ad, []int{4} + From []byte `protobuf:"bytes,1,opt,name=from,proto3" json:"from,omitempty"` + Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"` } -func (m *Message) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_Message.Unmarshal(m, b) -} -func (m *Message) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_Message.Marshal(b, m, deterministic) -} -func (m *Message) XXX_Merge(src proto.Message) { - xxx_messageInfo_Message.Merge(m, src) -} -func (m *Message) XXX_Size() int { - return xxx_messageInfo_Message.Size(m) -} -func (m *Message) XXX_DiscardUnknown() { - xxx_messageInfo_Message.DiscardUnknown(m) -} - -var xxx_messageInfo_Message proto.InternalMessageInfo - -func (m *Message) GetFrom() []byte { - if m != nil { - return m.From +func (x *Message) Reset() { + *x = Message{} + if protoimpl.UnsafeEnabled { + mi := &file_overlay_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) } - return nil } -func (m *Message) GetPayload() []byte { - if m != nil { - return m.Payload - } - return nil +func (x *Message) String() string { + return protoimpl.X.MessageStringOf(x) } -// Packet is a wrapper around a packet. -type Packet struct { - Serialized []byte `protobuf:"bytes,1,opt,name=serialized,proto3" json:"serialized,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} +func (*Message) ProtoMessage() {} -func (m *Packet) Reset() { *m = Packet{} } -func (m *Packet) String() string { return proto.CompactTextString(m) } -func (*Packet) ProtoMessage() {} -func (*Packet) Descriptor() ([]byte, []int) { - return fileDescriptor_61fc82527fbe24ad, []int{5} +func (x *Message) ProtoReflect() protoreflect.Message { + mi := &file_overlay_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) } -func (m *Packet) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_Packet.Unmarshal(m, b) -} -func (m *Packet) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_Packet.Marshal(b, m, deterministic) -} -func (m *Packet) XXX_Merge(src proto.Message) { - xxx_messageInfo_Packet.Merge(m, src) -} -func (m *Packet) XXX_Size() int { - return xxx_messageInfo_Packet.Size(m) -} -func (m *Packet) XXX_DiscardUnknown() { - xxx_messageInfo_Packet.DiscardUnknown(m) +// Deprecated: Use Message.ProtoReflect.Descriptor instead. +func (*Message) Descriptor() ([]byte, []int) { + return file_overlay_proto_rawDescGZIP(), []int{4} } -var xxx_messageInfo_Packet proto.InternalMessageInfo - -func (m *Packet) GetSerialized() []byte { - if m != nil { - return m.Serialized +func (x *Message) GetFrom() []byte { + if x != nil { + return x.From } return nil } -// Ack is the return of a unicast request to forward a message. -type Ack struct { - Errors []string `protobuf:"bytes,1,rep,name=errors,proto3" json:"errors,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *Ack) Reset() { *m = Ack{} } -func (m *Ack) String() string { return proto.CompactTextString(m) } -func (*Ack) ProtoMessage() {} -func (*Ack) Descriptor() ([]byte, []int) { - return fileDescriptor_61fc82527fbe24ad, []int{6} -} - -func (m *Ack) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_Ack.Unmarshal(m, b) -} -func (m *Ack) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_Ack.Marshal(b, m, deterministic) -} -func (m *Ack) XXX_Merge(src proto.Message) { - xxx_messageInfo_Ack.Merge(m, src) -} -func (m *Ack) XXX_Size() int { - return xxx_messageInfo_Ack.Size(m) -} -func (m *Ack) XXX_DiscardUnknown() { - xxx_messageInfo_Ack.DiscardUnknown(m) -} - -var xxx_messageInfo_Ack proto.InternalMessageInfo - -func (m *Ack) GetErrors() []string { - if m != nil { - return m.Errors +func (x *Message) GetPayload() []byte { + if x != nil { + return x.Payload } return nil } -func init() { - proto.RegisterType((*Certificate)(nil), "ptypes.Certificate") - proto.RegisterType((*CertificateAck)(nil), "ptypes.CertificateAck") - proto.RegisterType((*JoinRequest)(nil), "ptypes.JoinRequest") - proto.RegisterType((*JoinResponse)(nil), "ptypes.JoinResponse") - proto.RegisterType((*Message)(nil), "ptypes.Message") - proto.RegisterType((*Packet)(nil), "ptypes.Packet") - proto.RegisterType((*Ack)(nil), "ptypes.Ack") -} - -func init() { - proto.RegisterFile("overlay.proto", fileDescriptor_61fc82527fbe24ad) -} - -var fileDescriptor_61fc82527fbe24ad = []byte{ - // 359 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x52, 0xc1, 0x6a, 0xdb, 0x40, - 0x10, 0x95, 0x6a, 0x5b, 0xc2, 0x23, 0xd7, 0x2d, 0x53, 0x63, 0x8c, 0xa0, 0xa5, 0xec, 0x49, 0xed, - 0x41, 0x14, 0x9b, 0xb6, 0xe4, 0x90, 0x83, 0x31, 0xe4, 0x10, 0x08, 0x09, 0xf2, 0x2d, 0xb7, 0x8d, - 0x34, 0x4e, 0x84, 0x64, 0xaf, 0xb2, 0xbb, 0x76, 0x70, 0xbe, 0x3d, 0x87, 0x20, 0xad, 0x94, 0x28, - 0x8e, 0x73, 0xd3, 0x7b, 0x9a, 0xf7, 0x78, 0x6f, 0x66, 0xe1, 0xb3, 0xd8, 0x91, 0xcc, 0xf9, 0x3e, - 0x2c, 0xa4, 0xd0, 0x02, 0x9d, 0x42, 0xef, 0x0b, 0x52, 0xec, 0x14, 0xbc, 0x05, 0x49, 0x9d, 0xae, - 0xd2, 0x98, 0x6b, 0xc2, 0x09, 0xb8, 0x3c, 0x49, 0x24, 0x29, 0x35, 0xb1, 0x7f, 0xda, 0xc1, 0x20, - 0x6a, 0x20, 0x8e, 0xa0, 0xb7, 0xe3, 0xf9, 0x96, 0x26, 0x9f, 0x2a, 0xde, 0x00, 0xf6, 0x15, 0x86, - 0x2d, 0xf9, 0x3c, 0xce, 0xd8, 0x35, 0x78, 0xe7, 0x22, 0xdd, 0x44, 0x74, 0xbf, 0x25, 0xa5, 0x4b, - 0x99, 0x16, 0x19, 0x6d, 0x2a, 0xbb, 0x7e, 0x64, 0x00, 0xfe, 0x05, 0x2f, 0x7e, 0x95, 0x55, 0x96, - 0xde, 0xf4, 0x5b, 0x68, 0x32, 0x85, 0x2d, 0xc7, 0xa8, 0x3d, 0xc7, 0x4e, 0x60, 0x60, 0xbc, 0x55, - 0x21, 0x36, 0x8a, 0xf0, 0x17, 0xf4, 0x0a, 0x22, 0x59, 0x66, 0xed, 0x7c, 0x64, 0x60, 0x26, 0xd8, - 0x7f, 0x70, 0x2f, 0x48, 0x29, 0x7e, 0x4b, 0x88, 0xd0, 0x5d, 0x49, 0xb1, 0xae, 0x0b, 0x56, 0xdf, - 0x65, 0xef, 0x82, 0xef, 0x73, 0xc1, 0x93, 0xba, 0x5f, 0x03, 0x59, 0x00, 0xce, 0x15, 0x8f, 0x33, - 0xd2, 0xf8, 0x03, 0x40, 0x91, 0x4c, 0x79, 0x9e, 0x3e, 0x52, 0x52, 0xab, 0x5b, 0x0c, 0xfb, 0x0e, - 0x9d, 0x79, 0x9c, 0xe1, 0x18, 0x1c, 0x92, 0x52, 0xd4, 0xa9, 0xfa, 0x51, 0x8d, 0xa6, 0x4f, 0x36, - 0xb8, 0x97, 0xe6, 0x06, 0x38, 0x83, 0x6e, 0x59, 0x04, 0x5f, 0x12, 0xb7, 0x56, 0xe6, 0x8f, 0xde, - 0x92, 0xa6, 0x2b, 0xb3, 0xf0, 0x1f, 0xf4, 0x96, 0x77, 0x5c, 0x12, 0x1e, 0xeb, 0xe9, 0x8f, 0x8f, - 0x90, 0xe5, 0x3d, 0x2c, 0xfc, 0x0d, 0xdd, 0x05, 0xcf, 0x73, 0xfc, 0xd2, 0x4c, 0xd4, 0x8b, 0xf0, - 0x0f, 0x09, 0x66, 0x61, 0x08, 0xce, 0x52, 0x4b, 0xe2, 0x6b, 0x1c, 0x36, 0x3f, 0x4d, 0x7b, 0xff, - 0x00, 0x33, 0x2b, 0xb0, 0xff, 0xd8, 0x18, 0x80, 0x7b, 0x26, 0xe4, 0x03, 0x97, 0xc9, 0x3b, 0x81, - 0xd7, 0xe0, 0x2a, 0xc5, 0x8d, 0x53, 0xbd, 0xbb, 0xd9, 0x73, 0x00, 0x00, 0x00, 0xff, 0xff, 0x73, - 0xfb, 0x8b, 0x12, 0x88, 0x02, 0x00, 0x00, -} - -// Reference imports to suppress errors if they are not otherwise used. -var _ context.Context -var _ grpc.ClientConnInterface - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the grpc package it is being compiled against. -const _ = grpc.SupportPackageIsVersion6 - -// OverlayClient is the client API for Overlay service. -// -// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. -type OverlayClient interface { - // Join handles join request from an unknown node. It accepts to share the - // certificates if the token is valid. - Join(ctx context.Context, in *JoinRequest, opts ...grpc.CallOption) (*JoinResponse, error) - // Share handles a certificate share from another participant of the - // network. - Share(ctx context.Context, in *Certificate, opts ...grpc.CallOption) (*CertificateAck, error) - // Call is a unicast rpc to send a message to a participant and expect a - // reply from it. - Call(ctx context.Context, in *Message, opts ...grpc.CallOption) (*Message, error) - // Stream is a stream rpc that will build a network of nodes which will - // relay the messages between each others. - Stream(ctx context.Context, opts ...grpc.CallOption) (Overlay_StreamClient, error) - // Forward is used in association with Stream to send a message through - // relays and get a feedback that the message has been received. - Forward(ctx context.Context, in *Packet, opts ...grpc.CallOption) (*Ack, error) -} - -type overlayClient struct { - cc grpc.ClientConnInterface -} - -func NewOverlayClient(cc grpc.ClientConnInterface) OverlayClient { - return &overlayClient{cc} -} - -func (c *overlayClient) Join(ctx context.Context, in *JoinRequest, opts ...grpc.CallOption) (*JoinResponse, error) { - out := new(JoinResponse) - err := c.cc.Invoke(ctx, "/ptypes.Overlay/Join", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} +// Packet is a wrapper around a packet. +type Packet struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields -func (c *overlayClient) Share(ctx context.Context, in *Certificate, opts ...grpc.CallOption) (*CertificateAck, error) { - out := new(CertificateAck) - err := c.cc.Invoke(ctx, "/ptypes.Overlay/Share", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil + Requestid string `protobuf:"bytes,1,opt,name=requestid,proto3" json:"requestid,omitempty"` + Serialized []byte `protobuf:"bytes,2,opt,name=serialized,proto3" json:"serialized,omitempty"` } -func (c *overlayClient) Call(ctx context.Context, in *Message, opts ...grpc.CallOption) (*Message, error) { - out := new(Message) - err := c.cc.Invoke(ctx, "/ptypes.Overlay/Call", in, out, opts...) - if err != nil { - return nil, err +func (x *Packet) Reset() { + *x = Packet{} + if protoimpl.UnsafeEnabled { + mi := &file_overlay_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) } - return out, nil } -func (c *overlayClient) Stream(ctx context.Context, opts ...grpc.CallOption) (Overlay_StreamClient, error) { - stream, err := c.cc.NewStream(ctx, &_Overlay_serviceDesc.Streams[0], "/ptypes.Overlay/Stream", opts...) - if err != nil { - return nil, err - } - x := &overlayStreamClient{stream} - return x, nil +func (x *Packet) String() string { + return protoimpl.X.MessageStringOf(x) } -type Overlay_StreamClient interface { - Send(*Packet) error - Recv() (*Packet, error) - grpc.ClientStream -} +func (*Packet) ProtoMessage() {} -type overlayStreamClient struct { - grpc.ClientStream +func (x *Packet) ProtoReflect() protoreflect.Message { + mi := &file_overlay_proto_msgTypes[5] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) } -func (x *overlayStreamClient) Send(m *Packet) error { - return x.ClientStream.SendMsg(m) +// Deprecated: Use Packet.ProtoReflect.Descriptor instead. +func (*Packet) Descriptor() ([]byte, []int) { + return file_overlay_proto_rawDescGZIP(), []int{5} } -func (x *overlayStreamClient) Recv() (*Packet, error) { - m := new(Packet) - if err := x.ClientStream.RecvMsg(m); err != nil { - return nil, err +func (x *Packet) GetRequestid() string { + if x != nil { + return x.Requestid } - return m, nil + return "" } -func (c *overlayClient) Forward(ctx context.Context, in *Packet, opts ...grpc.CallOption) (*Ack, error) { - out := new(Ack) - err := c.cc.Invoke(ctx, "/ptypes.Overlay/Forward", in, out, opts...) - if err != nil { - return nil, err +func (x *Packet) GetSerialized() []byte { + if x != nil { + return x.Serialized } - return out, nil -} - -// OverlayServer is the server API for Overlay service. -type OverlayServer interface { - // Join handles join request from an unknown node. It accepts to share the - // certificates if the token is valid. - Join(context.Context, *JoinRequest) (*JoinResponse, error) - // Share handles a certificate share from another participant of the - // network. - Share(context.Context, *Certificate) (*CertificateAck, error) - // Call is a unicast rpc to send a message to a participant and expect a - // reply from it. - Call(context.Context, *Message) (*Message, error) - // Stream is a stream rpc that will build a network of nodes which will - // relay the messages between each others. - Stream(Overlay_StreamServer) error - // Forward is used in association with Stream to send a message through - // relays and get a feedback that the message has been received. - Forward(context.Context, *Packet) (*Ack, error) -} - -// UnimplementedOverlayServer can be embedded to have forward compatible implementations. -type UnimplementedOverlayServer struct { -} - -func (*UnimplementedOverlayServer) Join(ctx context.Context, req *JoinRequest) (*JoinResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method Join not implemented") -} -func (*UnimplementedOverlayServer) Share(ctx context.Context, req *Certificate) (*CertificateAck, error) { - return nil, status.Errorf(codes.Unimplemented, "method Share not implemented") -} -func (*UnimplementedOverlayServer) Call(ctx context.Context, req *Message) (*Message, error) { - return nil, status.Errorf(codes.Unimplemented, "method Call not implemented") -} -func (*UnimplementedOverlayServer) Stream(srv Overlay_StreamServer) error { - return status.Errorf(codes.Unimplemented, "method Stream not implemented") -} -func (*UnimplementedOverlayServer) Forward(ctx context.Context, req *Packet) (*Ack, error) { - return nil, status.Errorf(codes.Unimplemented, "method Forward not implemented") + return nil } -func RegisterOverlayServer(s *grpc.Server, srv OverlayServer) { - s.RegisterService(&_Overlay_serviceDesc, srv) -} +// Ack is the return of a unicast request to forward a message. +type Ack struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields -func _Overlay_Join_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(JoinRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(OverlayServer).Join(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/ptypes.Overlay/Join", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(OverlayServer).Join(ctx, req.(*JoinRequest)) - } - return interceptor(ctx, in, info, handler) + Errors []string `protobuf:"bytes,1,rep,name=errors,proto3" json:"errors,omitempty"` } -func _Overlay_Share_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(Certificate) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(OverlayServer).Share(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/ptypes.Overlay/Share", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(OverlayServer).Share(ctx, req.(*Certificate)) +func (x *Ack) Reset() { + *x = Ack{} + if protoimpl.UnsafeEnabled { + mi := &file_overlay_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) } - return interceptor(ctx, in, info, handler) } -func _Overlay_Call_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(Message) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(OverlayServer).Call(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/ptypes.Overlay/Call", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(OverlayServer).Call(ctx, req.(*Message)) - } - return interceptor(ctx, in, info, handler) +func (x *Ack) String() string { + return protoimpl.X.MessageStringOf(x) } -func _Overlay_Stream_Handler(srv interface{}, stream grpc.ServerStream) error { - return srv.(OverlayServer).Stream(&overlayStreamServer{stream}) -} - -type Overlay_StreamServer interface { - Send(*Packet) error - Recv() (*Packet, error) - grpc.ServerStream -} +func (*Ack) ProtoMessage() {} -type overlayStreamServer struct { - grpc.ServerStream +func (x *Ack) ProtoReflect() protoreflect.Message { + mi := &file_overlay_proto_msgTypes[6] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) } -func (x *overlayStreamServer) Send(m *Packet) error { - return x.ServerStream.SendMsg(m) +// Deprecated: Use Ack.ProtoReflect.Descriptor instead. +func (*Ack) Descriptor() ([]byte, []int) { + return file_overlay_proto_rawDescGZIP(), []int{6} } -func (x *overlayStreamServer) Recv() (*Packet, error) { - m := new(Packet) - if err := x.ServerStream.RecvMsg(m); err != nil { - return nil, err +func (x *Ack) GetErrors() []string { + if x != nil { + return x.Errors } - return m, nil + return nil } -func _Overlay_Forward_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(Packet) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(OverlayServer).Forward(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/ptypes.Overlay/Forward", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(OverlayServer).Forward(ctx, req.(*Packet)) - } - return interceptor(ctx, in, info, handler) -} +var File_overlay_proto protoreflect.FileDescriptor + +var file_overlay_proto_rawDesc = []byte{ + 0x0a, 0x0d, 0x6f, 0x76, 0x65, 0x72, 0x6c, 0x61, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, + 0x06, 0x70, 0x74, 0x79, 0x70, 0x65, 0x73, 0x22, 0x3d, 0x0a, 0x0b, 0x43, 0x65, 0x72, 0x74, 0x69, + 0x66, 0x69, 0x63, 0x61, 0x74, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, + 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, + 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, + 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x10, 0x0a, 0x0e, 0x43, 0x65, 0x72, 0x74, 0x69, 0x66, + 0x69, 0x63, 0x61, 0x74, 0x65, 0x41, 0x63, 0x6b, 0x22, 0x5a, 0x0a, 0x0b, 0x4a, 0x6f, 0x69, 0x6e, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x6b, 0x65, 0x6e, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x35, 0x0a, + 0x0b, 0x63, 0x65, 0x72, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x65, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x70, 0x74, 0x79, 0x70, 0x65, 0x73, 0x2e, 0x43, 0x65, 0x72, 0x74, + 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x65, 0x52, 0x0b, 0x63, 0x65, 0x72, 0x74, 0x69, 0x66, 0x69, + 0x63, 0x61, 0x74, 0x65, 0x22, 0x39, 0x0a, 0x0c, 0x4a, 0x6f, 0x69, 0x6e, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x29, 0x0a, 0x05, 0x70, 0x65, 0x65, 0x72, 0x73, 0x18, 0x01, 0x20, + 0x03, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x70, 0x74, 0x79, 0x70, 0x65, 0x73, 0x2e, 0x43, 0x65, 0x72, + 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x65, 0x52, 0x05, 0x70, 0x65, 0x65, 0x72, 0x73, 0x22, + 0x37, 0x0a, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x66, 0x72, + 0x6f, 0x6d, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x66, 0x72, 0x6f, 0x6d, 0x12, 0x18, + 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, + 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x46, 0x0a, 0x06, 0x50, 0x61, 0x63, 0x6b, + 0x65, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x69, 0x64, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x69, 0x64, + 0x12, 0x1e, 0x0a, 0x0a, 0x73, 0x65, 0x72, 0x69, 0x61, 0x6c, 0x69, 0x7a, 0x65, 0x64, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x73, 0x65, 0x72, 0x69, 0x61, 0x6c, 0x69, 0x7a, 0x65, 0x64, + 0x22, 0x1d, 0x0a, 0x03, 0x41, 0x63, 0x6b, 0x12, 0x16, 0x0a, 0x06, 0x65, 0x72, 0x72, 0x6f, 0x72, + 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x06, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x73, 0x32, + 0xfc, 0x01, 0x0a, 0x07, 0x4f, 0x76, 0x65, 0x72, 0x6c, 0x61, 0x79, 0x12, 0x33, 0x0a, 0x04, 0x4a, + 0x6f, 0x69, 0x6e, 0x12, 0x13, 0x2e, 0x70, 0x74, 0x79, 0x70, 0x65, 0x73, 0x2e, 0x4a, 0x6f, 0x69, + 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x70, 0x74, 0x79, 0x70, 0x65, + 0x73, 0x2e, 0x4a, 0x6f, 0x69, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, + 0x12, 0x36, 0x0a, 0x05, 0x53, 0x68, 0x61, 0x72, 0x65, 0x12, 0x13, 0x2e, 0x70, 0x74, 0x79, 0x70, + 0x65, 0x73, 0x2e, 0x43, 0x65, 0x72, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x65, 0x1a, 0x16, + 0x2e, 0x70, 0x74, 0x79, 0x70, 0x65, 0x73, 0x2e, 0x43, 0x65, 0x72, 0x74, 0x69, 0x66, 0x69, 0x63, + 0x61, 0x74, 0x65, 0x41, 0x63, 0x6b, 0x22, 0x00, 0x12, 0x2a, 0x0a, 0x04, 0x43, 0x61, 0x6c, 0x6c, + 0x12, 0x0f, 0x2e, 0x70, 0x74, 0x79, 0x70, 0x65, 0x73, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x65, 0x1a, 0x0f, 0x2e, 0x70, 0x74, 0x79, 0x70, 0x65, 0x73, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, + 0x67, 0x65, 0x22, 0x00, 0x12, 0x2e, 0x0a, 0x06, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x0e, + 0x2e, 0x70, 0x74, 0x79, 0x70, 0x65, 0x73, 0x2e, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x1a, 0x0e, + 0x2e, 0x70, 0x74, 0x79, 0x70, 0x65, 0x73, 0x2e, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x22, 0x00, + 0x28, 0x01, 0x30, 0x01, 0x12, 0x28, 0x0a, 0x07, 0x46, 0x6f, 0x72, 0x77, 0x61, 0x72, 0x64, 0x12, + 0x0e, 0x2e, 0x70, 0x74, 0x79, 0x70, 0x65, 0x73, 0x2e, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x1a, + 0x0b, 0x2e, 0x70, 0x74, 0x79, 0x70, 0x65, 0x73, 0x2e, 0x41, 0x63, 0x6b, 0x22, 0x00, 0x42, 0x27, + 0x5a, 0x25, 0x67, 0x6f, 0x2e, 0x64, 0x65, 0x64, 0x69, 0x73, 0x2e, 0x63, 0x68, 0x2f, 0x64, 0x65, + 0x6c, 0x61, 0x2f, 0x6d, 0x69, 0x6e, 0x6f, 0x2f, 0x6d, 0x69, 0x6e, 0x6f, 0x67, 0x72, 0x70, 0x63, + 0x2f, 0x70, 0x74, 0x79, 0x70, 0x65, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_overlay_proto_rawDescOnce sync.Once + file_overlay_proto_rawDescData = file_overlay_proto_rawDesc +) -var _Overlay_serviceDesc = grpc.ServiceDesc{ - ServiceName: "ptypes.Overlay", - HandlerType: (*OverlayServer)(nil), - Methods: []grpc.MethodDesc{ - { - MethodName: "Join", - Handler: _Overlay_Join_Handler, - }, - { - MethodName: "Share", - Handler: _Overlay_Share_Handler, - }, - { - MethodName: "Call", - Handler: _Overlay_Call_Handler, - }, - { - MethodName: "Forward", - Handler: _Overlay_Forward_Handler, - }, - }, - Streams: []grpc.StreamDesc{ - { - StreamName: "Stream", - Handler: _Overlay_Stream_Handler, - ServerStreams: true, - ClientStreams: true, +func file_overlay_proto_rawDescGZIP() []byte { + file_overlay_proto_rawDescOnce.Do(func() { + file_overlay_proto_rawDescData = protoimpl.X.CompressGZIP(file_overlay_proto_rawDescData) + }) + return file_overlay_proto_rawDescData +} + +var file_overlay_proto_msgTypes = make([]protoimpl.MessageInfo, 7) +var file_overlay_proto_goTypes = []interface{}{ + (*Certificate)(nil), // 0: ptypes.Certificate + (*CertificateAck)(nil), // 1: ptypes.CertificateAck + (*JoinRequest)(nil), // 2: ptypes.JoinRequest + (*JoinResponse)(nil), // 3: ptypes.JoinResponse + (*Message)(nil), // 4: ptypes.Message + (*Packet)(nil), // 5: ptypes.Packet + (*Ack)(nil), // 6: ptypes.Ack +} +var file_overlay_proto_depIdxs = []int32{ + 0, // 0: ptypes.JoinRequest.certificate:type_name -> ptypes.Certificate + 0, // 1: ptypes.JoinResponse.peers:type_name -> ptypes.Certificate + 2, // 2: ptypes.Overlay.Join:input_type -> ptypes.JoinRequest + 0, // 3: ptypes.Overlay.Share:input_type -> ptypes.Certificate + 4, // 4: ptypes.Overlay.Call:input_type -> ptypes.Message + 5, // 5: ptypes.Overlay.Stream:input_type -> ptypes.Packet + 5, // 6: ptypes.Overlay.Forward:input_type -> ptypes.Packet + 3, // 7: ptypes.Overlay.Join:output_type -> ptypes.JoinResponse + 1, // 8: ptypes.Overlay.Share:output_type -> ptypes.CertificateAck + 4, // 9: ptypes.Overlay.Call:output_type -> ptypes.Message + 5, // 10: ptypes.Overlay.Stream:output_type -> ptypes.Packet + 6, // 11: ptypes.Overlay.Forward:output_type -> ptypes.Ack + 7, // [7:12] is the sub-list for method output_type + 2, // [2:7] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name +} + +func init() { file_overlay_proto_init() } +func file_overlay_proto_init() { + if File_overlay_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_overlay_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Certificate); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_overlay_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CertificateAck); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_overlay_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*JoinRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_overlay_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*JoinResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_overlay_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Message); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_overlay_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Packet); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_overlay_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Ack); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_overlay_proto_rawDesc, + NumEnums: 0, + NumMessages: 7, + NumExtensions: 0, + NumServices: 1, }, - }, - Metadata: "overlay.proto", + GoTypes: file_overlay_proto_goTypes, + DependencyIndexes: file_overlay_proto_depIdxs, + MessageInfos: file_overlay_proto_msgTypes, + }.Build() + File_overlay_proto = out.File + file_overlay_proto_rawDesc = nil + file_overlay_proto_goTypes = nil + file_overlay_proto_depIdxs = nil } diff --git a/mino/minogrpc/ptypes/overlay.proto b/mino/minogrpc/ptypes/overlay.proto index 669ef5e49..4ef1de077 100644 --- a/mino/minogrpc/ptypes/overlay.proto +++ b/mino/minogrpc/ptypes/overlay.proto @@ -2,6 +2,8 @@ syntax = "proto3"; package ptypes; +option go_package = "go.dedis.ch/dela/mino/minogrpc/ptypes"; + // Certificate is a wrapper around a x509 raw certificate and its address. message Certificate { bytes address = 1; @@ -32,7 +34,8 @@ message Message { // Packet is a wrapper around a packet. message Packet { - bytes serialized = 1; + string requestid = 1; + bytes serialized = 2; } // Ack is the return of a unicast request to forward a message. diff --git a/mino/minogrpc/ptypes/overlay_grpc.pb.go b/mino/minogrpc/ptypes/overlay_grpc.pb.go new file mode 100644 index 000000000..fdb39c5c3 --- /dev/null +++ b/mino/minogrpc/ptypes/overlay_grpc.pb.go @@ -0,0 +1,298 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. + +package ptypes + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// OverlayClient is the client API for Overlay service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type OverlayClient interface { + // Join handles join request from an unknown node. It accepts to share the + // certificates if the token is valid. + Join(ctx context.Context, in *JoinRequest, opts ...grpc.CallOption) (*JoinResponse, error) + // Share handles a certificate share from another participant of the + // network. + Share(ctx context.Context, in *Certificate, opts ...grpc.CallOption) (*CertificateAck, error) + // Call is a unicast rpc to send a message to a participant and expect a + // reply from it. + Call(ctx context.Context, in *Message, opts ...grpc.CallOption) (*Message, error) + // Stream is a stream rpc that will build a network of nodes which will + // relay the messages between each others. + Stream(ctx context.Context, opts ...grpc.CallOption) (Overlay_StreamClient, error) + // Forward is used in association with Stream to send a message through + // relays and get a feedback that the message has been received. + Forward(ctx context.Context, in *Packet, opts ...grpc.CallOption) (*Ack, error) +} + +type overlayClient struct { + cc grpc.ClientConnInterface +} + +func NewOverlayClient(cc grpc.ClientConnInterface) OverlayClient { + return &overlayClient{cc} +} + +func (c *overlayClient) Join(ctx context.Context, in *JoinRequest, opts ...grpc.CallOption) (*JoinResponse, error) { + out := new(JoinResponse) + err := c.cc.Invoke(ctx, "/ptypes.Overlay/Join", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *overlayClient) Share(ctx context.Context, in *Certificate, opts ...grpc.CallOption) (*CertificateAck, error) { + out := new(CertificateAck) + err := c.cc.Invoke(ctx, "/ptypes.Overlay/Share", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *overlayClient) Call(ctx context.Context, in *Message, opts ...grpc.CallOption) (*Message, error) { + out := new(Message) + err := c.cc.Invoke(ctx, "/ptypes.Overlay/Call", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *overlayClient) Stream(ctx context.Context, opts ...grpc.CallOption) (Overlay_StreamClient, error) { + stream, err := c.cc.NewStream(ctx, &Overlay_ServiceDesc.Streams[0], "/ptypes.Overlay/Stream", opts...) + if err != nil { + return nil, err + } + x := &overlayStreamClient{stream} + return x, nil +} + +type Overlay_StreamClient interface { + Send(*Packet) error + Recv() (*Packet, error) + grpc.ClientStream +} + +type overlayStreamClient struct { + grpc.ClientStream +} + +func (x *overlayStreamClient) Send(m *Packet) error { + return x.ClientStream.SendMsg(m) +} + +func (x *overlayStreamClient) Recv() (*Packet, error) { + m := new(Packet) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *overlayClient) Forward(ctx context.Context, in *Packet, opts ...grpc.CallOption) (*Ack, error) { + out := new(Ack) + err := c.cc.Invoke(ctx, "/ptypes.Overlay/Forward", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// OverlayServer is the server API for Overlay service. +// All implementations must embed UnimplementedOverlayServer +// for forward compatibility +type OverlayServer interface { + // Join handles join request from an unknown node. It accepts to share the + // certificates if the token is valid. + Join(context.Context, *JoinRequest) (*JoinResponse, error) + // Share handles a certificate share from another participant of the + // network. + Share(context.Context, *Certificate) (*CertificateAck, error) + // Call is a unicast rpc to send a message to a participant and expect a + // reply from it. + Call(context.Context, *Message) (*Message, error) + // Stream is a stream rpc that will build a network of nodes which will + // relay the messages between each others. + Stream(Overlay_StreamServer) error + // Forward is used in association with Stream to send a message through + // relays and get a feedback that the message has been received. + Forward(context.Context, *Packet) (*Ack, error) + mustEmbedUnimplementedOverlayServer() +} + +// UnimplementedOverlayServer must be embedded to have forward compatible implementations. +type UnimplementedOverlayServer struct { +} + +func (UnimplementedOverlayServer) Join(context.Context, *JoinRequest) (*JoinResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method Join not implemented") +} +func (UnimplementedOverlayServer) Share(context.Context, *Certificate) (*CertificateAck, error) { + return nil, status.Errorf(codes.Unimplemented, "method Share not implemented") +} +func (UnimplementedOverlayServer) Call(context.Context, *Message) (*Message, error) { + return nil, status.Errorf(codes.Unimplemented, "method Call not implemented") +} +func (UnimplementedOverlayServer) Stream(Overlay_StreamServer) error { + return status.Errorf(codes.Unimplemented, "method Stream not implemented") +} +func (UnimplementedOverlayServer) Forward(context.Context, *Packet) (*Ack, error) { + return nil, status.Errorf(codes.Unimplemented, "method Forward not implemented") +} +func (UnimplementedOverlayServer) mustEmbedUnimplementedOverlayServer() {} + +// UnsafeOverlayServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to OverlayServer will +// result in compilation errors. +type UnsafeOverlayServer interface { + mustEmbedUnimplementedOverlayServer() +} + +func RegisterOverlayServer(s grpc.ServiceRegistrar, srv OverlayServer) { + s.RegisterService(&Overlay_ServiceDesc, srv) +} + +func _Overlay_Join_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(JoinRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(OverlayServer).Join(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/ptypes.Overlay/Join", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(OverlayServer).Join(ctx, req.(*JoinRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Overlay_Share_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(Certificate) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(OverlayServer).Share(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/ptypes.Overlay/Share", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(OverlayServer).Share(ctx, req.(*Certificate)) + } + return interceptor(ctx, in, info, handler) +} + +func _Overlay_Call_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(Message) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(OverlayServer).Call(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/ptypes.Overlay/Call", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(OverlayServer).Call(ctx, req.(*Message)) + } + return interceptor(ctx, in, info, handler) +} + +func _Overlay_Stream_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(OverlayServer).Stream(&overlayStreamServer{stream}) +} + +type Overlay_StreamServer interface { + Send(*Packet) error + Recv() (*Packet, error) + grpc.ServerStream +} + +type overlayStreamServer struct { + grpc.ServerStream +} + +func (x *overlayStreamServer) Send(m *Packet) error { + return x.ServerStream.SendMsg(m) +} + +func (x *overlayStreamServer) Recv() (*Packet, error) { + m := new(Packet) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func _Overlay_Forward_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(Packet) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(OverlayServer).Forward(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/ptypes.Overlay/Forward", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(OverlayServer).Forward(ctx, req.(*Packet)) + } + return interceptor(ctx, in, info, handler) +} + +// Overlay_ServiceDesc is the grpc.ServiceDesc for Overlay service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var Overlay_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "ptypes.Overlay", + HandlerType: (*OverlayServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Join", + Handler: _Overlay_Join_Handler, + }, + { + MethodName: "Share", + Handler: _Overlay_Share_Handler, + }, + { + MethodName: "Call", + Handler: _Overlay_Call_Handler, + }, + { + MethodName: "Forward", + Handler: _Overlay_Forward_Handler, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: "Stream", + Handler: _Overlay_Stream_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, + Metadata: "overlay.proto", +} diff --git a/mino/minogrpc/rpc.go b/mino/minogrpc/rpc.go index 7b4862cd1..52aa497f9 100644 --- a/mino/minogrpc/rpc.go +++ b/mino/minogrpc/rpc.go @@ -12,6 +12,7 @@ import ( "github.com/rs/xid" "go.dedis.ch/dela" "go.dedis.ch/dela/internal/tracing" + "go.dedis.ch/dela/internal/traffic" "go.dedis.ch/dela/mino" "go.dedis.ch/dela/mino/minogrpc/ptypes" "go.dedis.ch/dela/mino/minogrpc/session" @@ -30,6 +31,7 @@ type RPC struct { overlay *overlay uri string factory serde.Factory + eh traffic.EventsHandler } // Call implements mino.RPC. It calls the RPC on each provided address. @@ -206,6 +208,7 @@ func (rpc RPC) Stream(ctx context.Context, players mino.Players) (mino.Sender, m rpc.overlay.router.GetPacketFactory(), rpc.overlay.context, rpc.overlay.connMgr, + rpc.eh, ) // There is no listen for the orchestrator as we need to forward the diff --git a/mino/minogrpc/rpc_test.go b/mino/minogrpc/rpc_test.go index 5616fe42a..0c8cd55e1 100644 --- a/mino/minogrpc/rpc_test.go +++ b/mino/minogrpc/rpc_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/require" "go.dedis.ch/dela/internal/testing/fake" + "go.dedis.ch/dela/internal/traffic" "go.dedis.ch/dela/mino" "go.dedis.ch/dela/mino/minogrpc/ptypes" "go.dedis.ch/dela/mino/minogrpc/session" @@ -166,6 +167,7 @@ func TestRPC_Stream(t *testing.T) { context: json.NewContext(), }, factory: fake.MessageFactory{}, + eh: traffic.NewEventHandler(), } ctx, cancel := context.WithCancel(context.Background()) diff --git a/mino/minogrpc/server.go b/mino/minogrpc/server.go index 18e24ccb6..c11b67b19 100644 --- a/mino/minogrpc/server.go +++ b/mino/minogrpc/server.go @@ -19,6 +19,7 @@ import ( "go.dedis.ch/dela" "go.dedis.ch/dela/internal/tracing" + "go.dedis.ch/dela/internal/traffic" "go.dedis.ch/dela/mino" "go.dedis.ch/dela/mino/minogrpc/certs" "go.dedis.ch/dela/mino/minogrpc/ptypes" @@ -57,8 +58,10 @@ var getTracerForAddr = tracing.GetTracerForAddr type overlayServer struct { *overlay + ptypes.UnimplementedOverlayServer endpoints map[string]*Endpoint + notifier traffic.Notifier } // Join implements ptypes.OverlayServer. It processes the request by checking @@ -263,6 +266,7 @@ func (o *overlayServer) Stream(stream ptypes.Overlay_StreamServer) error { o.router.GetPacketFactory(), o.context, o.connMgr, + o.notifier, ) endpoint.streams[streamID] = sess diff --git a/mino/minogrpc/server_test.go b/mino/minogrpc/server_test.go index 6d2e3e751..c26eadba5 100644 --- a/mino/minogrpc/server_test.go +++ b/mino/minogrpc/server_test.go @@ -122,7 +122,7 @@ func TestMinogrpc_Scenario_Failures(t *testing.T) { authority := fake.NewAuthorityFromMino(fake.NewSigner, srvs...) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Second) defer cancel() sender, recvr, err := rpcs[1].Stream(ctx, authority) @@ -142,7 +142,10 @@ func TestMinogrpc_Scenario_Failures(t *testing.T) { checkError(t, <-errs, srvs[0]) require.NoError(t, <-errs) - from, _, err := recvr.Recv(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + from, _, err := recvr.Recv(ctx) require.NoError(t, err) require.True(t, to.Equal(from)) } @@ -169,7 +172,10 @@ func TestMinogrpc_Scenario_Failures(t *testing.T) { checkError(t, <-errs, srvs[0], srvs[2], srvs[4]) require.NoError(t, <-errs) - from, _, err := recvr.Recv(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + from, _, err := recvr.Recv(ctx) require.NoError(t, err) require.True(t, to.Equal(from)) } diff --git a/mino/minogrpc/session/mod.go b/mino/minogrpc/session/mod.go index e1d9b73ff..00d8a2029 100644 --- a/mino/minogrpc/session/mod.go +++ b/mino/minogrpc/session/mod.go @@ -21,8 +21,10 @@ import ( "context" "io" "io/ioutil" + "math/rand" "os" "sync" + "time" "github.com/rs/zerolog" "go.dedis.ch/dela" @@ -123,6 +125,8 @@ type session struct { // A read-write lock is used there as there are much more read requests than // write ones, and the read should be parallelized. parentsLock sync.RWMutex + + notifier traffic.Notifier } // NewSession creates a new session for the provided parent relay. @@ -133,19 +137,21 @@ func NewSession( pktFac router.PacketFactory, ctx serde.Context, connMgr ConnectionManager, + notifier traffic.Notifier, ) Session { sess := &session{ - logger: dela.Logger.With().Str("addr", me.String()).Logger(), - md: md, - me: me, - errs: make(chan error, 1), - msgFac: msgFac, - pktFac: pktFac, - context: ctx, - queue: newNonBlockingQueue(), - relays: make(map[mino.Address]Relay), - connMgr: connMgr, - parents: make(map[mino.Address]parent), + logger: dela.Logger.With().Str("addr", me.String()).Logger(), + md: md, + me: me, + errs: make(chan error, 1), + msgFac: msgFac, + pktFac: pktFac, + context: ctx, + queue: newNonBlockingQueue(), + relays: make(map[mino.Address]Relay), + connMgr: connMgr, + parents: make(map[mino.Address]parent), + notifier: notifier, } switch os.Getenv(traffic.EnvVariable) { @@ -234,7 +240,10 @@ func (s *session) RecvPacket(from mino.Address, p *ptypes.Packet) (*ptypes.Ack, // Try to send the packet to each parent until one works. for _, parent := range s.parents { + time.Sleep(time.Millisecond * time.Duration(rand.Intn(500))) + s.traffic.LogRecv(parent.relay.Stream().Context(), from, pkt) + s.notifier.NotifyIn(from, pkt) errs := make(chan error, len(pkt.GetDestination())) sent := s.sendPacket(parent, pkt, errs) @@ -374,7 +383,10 @@ func (s *session) sendTo(p parent, to mino.Address, pkt router.Packet, errs chan ctx := p.relay.Stream().Context() + time.Sleep(time.Millisecond * time.Duration(rand.Intn(500))) + s.traffic.LogSend(ctx, relay.GetDistantAddress(), pkt) + s.notifier.NotifyOut(relay.GetDistantAddress(), pkt) ack, err := relay.Send(ctx, pkt) if to == nil && err != nil { diff --git a/mino/minogrpc/session/mod_test.go b/mino/minogrpc/session/mod_test.go index 9a8d6ed7f..b69f831b6 100644 --- a/mino/minogrpc/session/mod_test.go +++ b/mino/minogrpc/session/mod_test.go @@ -26,15 +26,15 @@ func TestSession_New(t *testing.T) { defer os.Setenv(traffic.EnvVariable, curr) os.Setenv(traffic.EnvVariable, "log") - sess := NewSession(nil, fake.NewAddress(999), nil, nil, fake.NewContext(), nil) + sess := NewSession(nil, fake.NewAddress(999), nil, nil, fake.NewContext(), nil, nil) require.NotNil(t, sess.(*session).traffic) os.Setenv(traffic.EnvVariable, "print") - sess = NewSession(nil, fake.NewAddress(999), nil, nil, fake.NewContext(), nil) + sess = NewSession(nil, fake.NewAddress(999), nil, nil, fake.NewContext(), nil, nil) require.NotNil(t, sess.(*session).traffic) os.Unsetenv(traffic.EnvVariable) - sess = NewSession(nil, fake.NewAddress(999), nil, nil, fake.NewContext(), nil) + sess = NewSession(nil, fake.NewAddress(999), nil, nil, fake.NewContext(), nil, nil) require.Nil(t, sess.(*session).traffic) } @@ -118,6 +118,7 @@ func TestSession_RecvPacket(t *testing.T) { table: fakeTable{err: xerrors.New("bad route")}, }, }, + notifier: traffic.NewEventHandler(), } ack, err := sess.RecvPacket(fake.NewAddress(0), &ptypes.Packet{}) @@ -150,6 +151,7 @@ func TestSession_Send(t *testing.T) { table: fakeTable{}, }, }, + notifier: traffic.NewEventHandler(), } errs := sess.Send(fake.Message{}, newWrapAddress(fake.NewAddress(0))) @@ -317,7 +319,8 @@ func TestSession_Recv(t *testing.T) { func TestSession_OnFailure(t *testing.T) { sess := &session{ - queue: newNonBlockingQueue(), + queue: newNonBlockingQueue(), + notifier: traffic.NewEventHandler(), } p := parent{ diff --git a/mino/router/mod.go b/mino/router/mod.go index b86bdfe10..34ffad056 100644 --- a/mino/router/mod.go +++ b/mino/router/mod.go @@ -29,6 +29,8 @@ type Packet interface { // Slice removes the address from the destination if found and return a new // packet with the addr as the destination. If not found return nil. Slice(addr mino.Address) Packet + + GetPacketID() string } // PacketFactory describes the primitives to deserialize a packet diff --git a/mino/router/tree/json/mod.go b/mino/router/tree/json/mod.go index 822d42924..a35cf1e5f 100644 --- a/mino/router/tree/json/mod.go +++ b/mino/router/tree/json/mod.go @@ -14,9 +14,10 @@ func init() { // PacketJSON describes a JSON formatted packet type PacketJSON struct { - Source []byte - Dest [][]byte - Message []byte + PacketID string + Source []byte + Dest [][]byte + Message []byte } // HandshakeJSON is the JSON message for the handshake. @@ -55,9 +56,10 @@ func (f packetFormat) Encode(ctx serde.Context, msg serde.Message) ([]byte, erro } p := PacketJSON{ - Source: source, - Dest: dest, - Message: packet.GetMessage(), + PacketID: packet.GetPacketID(), + Source: source, + Dest: dest, + Message: packet.GetMessage(), } data, err := ctx.Marshal(p) @@ -92,7 +94,7 @@ func (f packetFormat) Decode(ctx serde.Context, data []byte) (serde.Message, err dest[i] = fac.FromText(buf) } - packet := types.NewPacket(source, p.Message, dest...) + packet := types.NewPacket(p.PacketID, source, p.Message, dest...) return packet, nil } diff --git a/mino/router/tree/json/mod_test.go b/mino/router/tree/json/mod_test.go index d5a5583f4..a726264de 100644 --- a/mino/router/tree/json/mod_test.go +++ b/mino/router/tree/json/mod_test.go @@ -13,19 +13,19 @@ func TestPacketFormat_Encode(t *testing.T) { fmt := packetFormat{} ctx := fake.NewContext() - pkt := types.NewPacket(fake.NewAddress(0), []byte("data"), fake.NewAddress(1)) + pkt := types.NewPacket("id", fake.NewAddress(0), []byte("data"), fake.NewAddress(1)) data, err := fmt.Encode(ctx, pkt) require.NoError(t, err) - require.Equal(t, `{"Source":"AAAAAA==","Dest":["AQAAAA=="],"Message":"ZGF0YQ=="}`, string(data)) + require.Equal(t, `{"PacketID":"id","Source":"AAAAAA==","Dest":["AQAAAA=="],"Message":"ZGF0YQ=="}`, string(data)) _, err = fmt.Encode(ctx, fake.Message{}) require.EqualError(t, err, "unsupported message 'fake.Message'") - _, err = fmt.Encode(ctx, types.NewPacket(fake.NewBadAddress(), nil)) + _, err = fmt.Encode(ctx, types.NewPacket("", fake.NewBadAddress(), nil)) require.EqualError(t, err, fake.Err("failed to marshal source addr")) - _, err = fmt.Encode(ctx, types.NewPacket(fake.NewAddress(0), nil, fake.NewBadAddress())) + _, err = fmt.Encode(ctx, types.NewPacket("", fake.NewAddress(0), nil, fake.NewBadAddress())) require.EqualError(t, err, fake.Err("failed to marshal dest addr")) _, err = fmt.Encode(fake.NewBadContext(), pkt) @@ -35,7 +35,7 @@ func TestPacketFormat_Encode(t *testing.T) { func TestPacketFormat_Decode(t *testing.T) { fmt := packetFormat{} - pkt := types.NewPacket(fake.NewAddress(0), []byte{}, fake.NewAddress(1)) + pkt := types.NewPacket("", fake.NewAddress(0), []byte{}, fake.NewAddress(1)) ctx := fake.NewContext() ctx = serde.WithFactory(ctx, types.AddrKey{}, fake.AddressFactory{}) diff --git a/mino/router/tree/mod.go b/mino/router/tree/mod.go index b34eaaf3d..1c8ecac16 100644 --- a/mino/router/tree/mod.go +++ b/mino/router/tree/mod.go @@ -14,6 +14,7 @@ package tree import ( + "github.com/rs/xid" "go.dedis.ch/dela/mino" "go.dedis.ch/dela/mino/router" "go.dedis.ch/dela/mino/router/tree/types" @@ -95,7 +96,8 @@ func NewTable(height int, expected []mino.Address) Table { // Make implements router.RoutingTable. It creates a packet with the source // address, the destination addresses and the payload. func (t Table) Make(src mino.Address, to []mino.Address, msg []byte) router.Packet { - return types.NewPacket(src, msg, to...) + id := xid.New().String() + return types.NewPacket(id, src, msg, to...) } // PrepareHandshakeFor implements router.RoutingTable. It creates a handshake @@ -122,7 +124,8 @@ func (t Table) Forward(packet router.Packet) (router.Routes, router.Voids) { p, ok := routes[gateway] if !ok { - p = types.NewPacket(packet.GetSource(), packet.GetMessage()) + id := xid.New().String() + p = types.NewPacket(id, packet.GetSource(), packet.GetMessage()) routes[gateway] = p } diff --git a/mino/router/tree/mod_test.go b/mino/router/tree/mod_test.go index 4b04fa119..b20919881 100644 --- a/mino/router/tree/mod_test.go +++ b/mino/router/tree/mod_test.go @@ -76,7 +76,7 @@ func TestTable_PrepareHandshakeFor(t *testing.T) { func TestTable_Forward(t *testing.T) { table := NewTable(3, makeAddrs(20)) - pkt := types.NewPacket(fake.NewAddress(0), []byte{1, 2, 3}, makeAddrs(20)...) + pkt := types.NewPacket("", fake.NewAddress(0), []byte{1, 2, 3}, makeAddrs(20)...) routes, voids := table.Forward(pkt) require.Empty(t, voids) diff --git a/mino/router/tree/types/packet.go b/mino/router/tree/types/packet.go index d006f92b2..1a0e5a63f 100644 --- a/mino/router/tree/types/packet.go +++ b/mino/router/tree/types/packet.go @@ -19,17 +19,19 @@ var packetFormat = registry.NewSimpleRegistry() // // - implements router.Packet type Packet struct { - src mino.Address - dest []mino.Address - msg []byte + pktid string + src mino.Address + dest []mino.Address + msg []byte } // NewPacket creates a new packet. -func NewPacket(src mino.Address, msg []byte, dest ...mino.Address) *Packet { +func NewPacket(pkid string, src mino.Address, msg []byte, dest ...mino.Address) *Packet { return &Packet{ - src: src, - dest: dest, - msg: msg, + pktid: pkid, + src: src, + dest: dest, + msg: msg, } } @@ -63,6 +65,11 @@ func (p *Packet) Add(to mino.Address) { p.dest = append(p.dest, to) } +// GetPacketID implements router.Packet. It returns the uniq packet identifier. +func (p *Packet) GetPacketID() string { + return p.pktid +} + // Slice implements router.Packet. It removes the address from the destination // list and returns a packet with this single destination, if it exists. // Otherwise the packet stays unchanged. diff --git a/mino/router/tree/types/packet_test.go b/mino/router/tree/types/packet_test.go index 12bc19cbe..8d31a4d5e 100644 --- a/mino/router/tree/types/packet_test.go +++ b/mino/router/tree/types/packet_test.go @@ -15,25 +15,31 @@ func init() { } func TestPacket_GetSource(t *testing.T) { - pkt := NewPacket(fake.NewAddress(0), nil) + pkt := NewPacket("", fake.NewAddress(0), nil) require.Equal(t, fake.NewAddress(0), pkt.GetSource()) } func TestPacket_GetDestination(t *testing.T) { - pkt := NewPacket(fake.NewAddress(0), nil, makeAddrs(5)...) + pkt := NewPacket("", fake.NewAddress(0), nil, makeAddrs(5)...) require.Len(t, pkt.GetDestination(), 5) } func TestPacket_GetMessage(t *testing.T) { - pkt := NewPacket(fake.NewAddress(0), []byte{1, 2, 3}) + pkt := NewPacket("", fake.NewAddress(0), []byte{1, 2, 3}) require.Equal(t, []byte{1, 2, 3}, pkt.GetMessage()) } +func TestPacket_GetPacketID(t *testing.T) { + pkt := NewPacket("XX", fake.NewAddress(0), []byte{1, 2, 3}) + + require.Equal(t, "XX", pkt.GetPacketID()) +} + func TestPacket_Add(t *testing.T) { - pkt := NewPacket(fake.NewAddress(0), nil) + pkt := NewPacket("", fake.NewAddress(0), nil) require.Len(t, pkt.dest, 0) pkt.Add(fake.NewAddress(0)) @@ -47,7 +53,7 @@ func TestPacket_Add(t *testing.T) { } func TestPacket_Slice(t *testing.T) { - pkt := NewPacket(fake.NewAddress(0), []byte{0xaa}, makeAddrs(10)...) + pkt := NewPacket("", fake.NewAddress(0), []byte{0xaa}, makeAddrs(10)...) newPkt := pkt.Slice(fake.NewAddress(500)) require.Nil(t, newPkt) @@ -58,7 +64,7 @@ func TestPacket_Slice(t *testing.T) { } func TestPacket_Serialize(t *testing.T) { - pkt := NewPacket(fake.NewAddress(0), nil) + pkt := NewPacket("", fake.NewAddress(0), nil) data, err := pkt.Serialize(fake.NewContext()) require.NoError(t, err) diff --git a/test/cosidela_test.go b/test/cosidela_test.go index eb6a85abf..8ac13acef 100644 --- a/test/cosidela_test.go +++ b/test/cosidela_test.go @@ -106,7 +106,6 @@ func newDelaNode(t *testing.T, path string, port int) dela { onet, err := minogrpc.NewMinogrpc(addr, router, opts...) require.NoError(t, err) - onet.GetAddress() // ordering + validation + execution fload = loader.NewFileLoader(filepath.Join(path, privateKeyFile)) diff --git a/test/integration_test.go b/test/integration_test.go index 7453c428a..915a7e60c 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -116,7 +116,7 @@ func addAndWait(t *testing.T, manager txn.Manager, node cosiDelaNode, args ...tx err = node.GetPool().Add(tx) require.NoError(t, err) - ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*20) defer cancel() events := node.GetOrdering().Watch(ctx) From 333d9ac1f3f1b384610e7b40dd490990ce6588db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mien=20Kocher?= Date: Mon, 11 Jul 2022 12:14:27 +0200 Subject: [PATCH 2/3] Applies go mod tidy --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 0bc92e80d..56c6af3ab 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,6 @@ go 1.14 require ( github.com/HdrHistogram/hdrhistogram-go v1.0.1 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect - github.com/golang/protobuf v1.5.2 github.com/opentracing-contrib/go-grpc v0.0.0-20200813121455-4a6760c71486 github.com/opentracing/opentracing-go v1.2.0 github.com/prometheus/client_golang v1.12.1 @@ -22,6 +21,7 @@ require ( golang.org/x/tools v0.1.11-0.20220316014157-77aa08bb151a golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 google.golang.org/grpc v1.45.0 + google.golang.org/protobuf v1.26.0 gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b // indirect gopkg.in/yaml.v2 v2.4.0 ) diff --git a/go.sum b/go.sum index 33142a7e7..dce57c440 100644 --- a/go.sum +++ b/go.sum @@ -174,8 +174,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= -github.com/opentracing-contrib/go-grpc v0.0.0-20210225150812-73cb765af46e h1:4cPxUYdgaGzZIT5/j0IfqOrrXmq6bG8AwvwisMXpdrg= -github.com/opentracing-contrib/go-grpc v0.0.0-20210225150812-73cb765af46e/go.mod h1:DYR5Eij8rJl8h7gblRrOZ8g0kW1umSpKqYIBTgeDtLo= +github.com/opentracing-contrib/go-grpc v0.0.0-20200813121455-4a6760c71486 h1:K35HCWaOTJIPW6cDHK4yj3QfRY/NhE0pBbfoc0M2NMQ= +github.com/opentracing-contrib/go-grpc v0.0.0-20200813121455-4a6760c71486/go.mod h1:DYR5Eij8rJl8h7gblRrOZ8g0kW1umSpKqYIBTgeDtLo= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs= github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= From ef5e9dc6933e415ec93d6ef41e1708d45a286a8d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mien=20Kocher?= Date: Mon, 11 Jul 2022 12:24:56 +0200 Subject: [PATCH 3/3] Adds packet ID on flat routing --- mino/router/flat/json/mod.go | 16 +++++++++------- mino/router/flat/json/mod_test.go | 10 +++++----- mino/router/flat/mod.go | 7 +++++-- mino/router/flat/mod_test.go | 2 +- mino/router/flat/types/packet.go | 21 ++++++++++++++------- mino/router/flat/types/packet_test.go | 18 ++++++++++++------ 6 files changed, 46 insertions(+), 28 deletions(-) diff --git a/mino/router/flat/json/mod.go b/mino/router/flat/json/mod.go index b8c73b4ea..3254796f8 100644 --- a/mino/router/flat/json/mod.go +++ b/mino/router/flat/json/mod.go @@ -14,9 +14,10 @@ func init() { // PacketJSON describes a JSON formatted packet type PacketJSON struct { - Source []byte - Dest [][]byte - Message []byte + PacketID string + Source []byte + Dest [][]byte + Message []byte } // HandshakeJSON is the JSON message for the handshake. @@ -54,9 +55,10 @@ func (f packetFormat) Encode(ctx serde.Context, msg serde.Message) ([]byte, erro } p := PacketJSON{ - Source: source, - Dest: dest, - Message: packet.GetMessage(), + Source: source, + Dest: dest, + Message: packet.GetMessage(), + PacketID: packet.GetPacketID(), } data, err := ctx.Marshal(p) @@ -91,7 +93,7 @@ func (f packetFormat) Decode(ctx serde.Context, data []byte) (serde.Message, err dest[i] = fac.FromText(buf) } - packet := types.NewPacket(source, p.Message, dest...) + packet := types.NewPacket(p.PacketID, source, p.Message, dest...) return packet, nil } diff --git a/mino/router/flat/json/mod_test.go b/mino/router/flat/json/mod_test.go index 1ebcf4a51..a2ecc73c0 100644 --- a/mino/router/flat/json/mod_test.go +++ b/mino/router/flat/json/mod_test.go @@ -13,19 +13,19 @@ func TestPacketFormat_Encode(t *testing.T) { fmt := packetFormat{} ctx := fake.NewContext() - pkt := types.NewPacket(fake.NewAddress(0), []byte("data"), fake.NewAddress(1)) + pkt := types.NewPacket("id", fake.NewAddress(0), []byte("data"), fake.NewAddress(1)) data, err := fmt.Encode(ctx, pkt) require.NoError(t, err) - require.Equal(t, `{"Source":"AAAAAA==","Dest":["AQAAAA=="],"Message":"ZGF0YQ=="}`, string(data)) + require.Equal(t, `{"PacketID":"id","Source":"AAAAAA==","Dest":["AQAAAA=="],"Message":"ZGF0YQ=="}`, string(data)) _, err = fmt.Encode(ctx, fake.Message{}) require.EqualError(t, err, "unsupported message 'fake.Message'") - _, err = fmt.Encode(ctx, types.NewPacket(fake.NewBadAddress(), nil)) + _, err = fmt.Encode(ctx, types.NewPacket("", fake.NewBadAddress(), nil)) require.EqualError(t, err, fake.Err("failed to marshal source addr")) - _, err = fmt.Encode(ctx, types.NewPacket(fake.NewAddress(0), nil, fake.NewBadAddress())) + _, err = fmt.Encode(ctx, types.NewPacket("", fake.NewAddress(0), nil, fake.NewBadAddress())) require.EqualError(t, err, fake.Err("failed to marshal dest addr")) _, err = fmt.Encode(fake.NewBadContext(), pkt) @@ -35,7 +35,7 @@ func TestPacketFormat_Encode(t *testing.T) { func TestPacketFormat_Decode(t *testing.T) { fmt := packetFormat{} - pkt := types.NewPacket(fake.NewAddress(0), []byte{}, fake.NewAddress(1)) + pkt := types.NewPacket("", fake.NewAddress(0), []byte{}, fake.NewAddress(1)) ctx := fake.NewContext() ctx = serde.WithFactory(ctx, types.AddrKey{}, fake.AddressFactory{}) diff --git a/mino/router/flat/mod.go b/mino/router/flat/mod.go index ea0752c24..625903421 100644 --- a/mino/router/flat/mod.go +++ b/mino/router/flat/mod.go @@ -4,6 +4,7 @@ package flat import ( + "github.com/rs/xid" "go.dedis.ch/dela/mino" "go.dedis.ch/dela/mino/router" "go.dedis.ch/dela/mino/router/flat/types" @@ -78,7 +79,8 @@ func NewTable(addrs []mino.Address) Table { // Make implements router.RoutingTable. It creates a packet with the source // address, the destination addresses and the payload. func (t Table) Make(src mino.Address, to []mino.Address, msg []byte) router.Packet { - return types.NewPacket(src, msg, to...) + id := xid.New().String() + return types.NewPacket(id, src, msg, to...) } // PrepareHandshakeFor implements router.RoutingTable. It creates a handshake @@ -109,7 +111,8 @@ func (t Table) Forward(packet router.Packet) (router.Routes, router.Voids) { p, ok := routes[gateway] if !ok { - p = types.NewPacket(packet.GetSource(), packet.GetMessage()) + id := xid.New().String() + p = types.NewPacket(id, packet.GetSource(), packet.GetMessage()) routes[gateway] = p } diff --git a/mino/router/flat/mod_test.go b/mino/router/flat/mod_test.go index d51470096..7e83932f8 100644 --- a/mino/router/flat/mod_test.go +++ b/mino/router/flat/mod_test.go @@ -59,7 +59,7 @@ func TestTable_PrepareHandshakeFor(t *testing.T) { func TestTable_Forward(t *testing.T) { table := NewTable(makeAddrs(20)) - pkt := types.NewPacket(fake.NewAddress(0), []byte{1, 2, 3}, makeAddrs(20)...) + pkt := types.NewPacket("", fake.NewAddress(0), []byte{1, 2, 3}, makeAddrs(20)...) routes, voids := table.Forward(pkt) require.Len(t, voids, 0) diff --git a/mino/router/flat/types/packet.go b/mino/router/flat/types/packet.go index 37028d33d..d386feb2e 100644 --- a/mino/router/flat/types/packet.go +++ b/mino/router/flat/types/packet.go @@ -16,17 +16,19 @@ var packetFormat = registry.NewSimpleRegistry() // // - implements router.Packet type Packet struct { - src mino.Address - dest []mino.Address - msg []byte + src mino.Address + dest []mino.Address + msg []byte + pktid string } // NewPacket creates a new packet. -func NewPacket(src mino.Address, msg []byte, dest ...mino.Address) *Packet { +func NewPacket(pkid string, src mino.Address, msg []byte, dest ...mino.Address) *Packet { return &Packet{ - src: src, - dest: dest, - msg: msg, + src: src, + dest: dest, + msg: msg, + pktid: pkid, } } @@ -60,6 +62,11 @@ func (p *Packet) Add(to mino.Address) { p.dest = append(p.dest, to) } +// GetPacketID implements router.Packet. It returns the uniq packet identifier. +func (p *Packet) GetPacketID() string { + return p.pktid +} + // Slice implements router.Packet. It removes the address from the destination // list and returns a packet with this single destination, if it exists. // Otherwise the packet stays unchanged. diff --git a/mino/router/flat/types/packet_test.go b/mino/router/flat/types/packet_test.go index 12bc19cbe..8d31a4d5e 100644 --- a/mino/router/flat/types/packet_test.go +++ b/mino/router/flat/types/packet_test.go @@ -15,25 +15,31 @@ func init() { } func TestPacket_GetSource(t *testing.T) { - pkt := NewPacket(fake.NewAddress(0), nil) + pkt := NewPacket("", fake.NewAddress(0), nil) require.Equal(t, fake.NewAddress(0), pkt.GetSource()) } func TestPacket_GetDestination(t *testing.T) { - pkt := NewPacket(fake.NewAddress(0), nil, makeAddrs(5)...) + pkt := NewPacket("", fake.NewAddress(0), nil, makeAddrs(5)...) require.Len(t, pkt.GetDestination(), 5) } func TestPacket_GetMessage(t *testing.T) { - pkt := NewPacket(fake.NewAddress(0), []byte{1, 2, 3}) + pkt := NewPacket("", fake.NewAddress(0), []byte{1, 2, 3}) require.Equal(t, []byte{1, 2, 3}, pkt.GetMessage()) } +func TestPacket_GetPacketID(t *testing.T) { + pkt := NewPacket("XX", fake.NewAddress(0), []byte{1, 2, 3}) + + require.Equal(t, "XX", pkt.GetPacketID()) +} + func TestPacket_Add(t *testing.T) { - pkt := NewPacket(fake.NewAddress(0), nil) + pkt := NewPacket("", fake.NewAddress(0), nil) require.Len(t, pkt.dest, 0) pkt.Add(fake.NewAddress(0)) @@ -47,7 +53,7 @@ func TestPacket_Add(t *testing.T) { } func TestPacket_Slice(t *testing.T) { - pkt := NewPacket(fake.NewAddress(0), []byte{0xaa}, makeAddrs(10)...) + pkt := NewPacket("", fake.NewAddress(0), []byte{0xaa}, makeAddrs(10)...) newPkt := pkt.Slice(fake.NewAddress(500)) require.Nil(t, newPkt) @@ -58,7 +64,7 @@ func TestPacket_Slice(t *testing.T) { } func TestPacket_Serialize(t *testing.T) { - pkt := NewPacket(fake.NewAddress(0), nil) + pkt := NewPacket("", fake.NewAddress(0), nil) data, err := pkt.Serialize(fake.NewContext()) require.NoError(t, err)