Skip to content

Commit 4ca76a9

Browse files
committed
coap-gw: check for offline event when coap-gw has been shutdown
1 parent 3bd930b commit 4ca76a9

2 files changed

Lines changed: 99 additions & 4 deletions

File tree

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
package service_test
2+
3+
import (
4+
"context"
5+
"crypto/tls"
6+
"testing"
7+
8+
coapgwTest "github.com/plgd-dev/hub/v2/coap-gateway/test"
9+
"github.com/plgd-dev/hub/v2/grpc-gateway/client"
10+
"github.com/plgd-dev/hub/v2/grpc-gateway/pb"
11+
kitNetGrpc "github.com/plgd-dev/hub/v2/pkg/net/grpc"
12+
test "github.com/plgd-dev/hub/v2/test"
13+
"github.com/plgd-dev/hub/v2/test/config"
14+
oauthTest "github.com/plgd-dev/hub/v2/test/oauth-server/test"
15+
"github.com/plgd-dev/hub/v2/test/service"
16+
"github.com/stretchr/testify/require"
17+
"go.uber.org/atomic"
18+
"google.golang.org/grpc"
19+
"google.golang.org/grpc/credentials"
20+
)
21+
22+
type testDevsObs struct {
23+
err atomic.Error
24+
ch chan client.DevicesObservationEvent
25+
}
26+
27+
func (t *testDevsObs) Error(err error) {
28+
t.err.Store(err)
29+
}
30+
31+
func (t *testDevsObs) Handle(ctx context.Context, event client.DevicesObservationEvent) error {
32+
t.ch <- event
33+
return nil
34+
}
35+
36+
func (t *testDevsObs) OnClose() {}
37+
38+
func TestShutdownServiceWithDeviceIssue627(t *testing.T) {
39+
deviceID := test.MustFindDeviceByName(test.TestDeviceName)
40+
ctx, cancel := context.WithTimeout(context.Background(), config.TEST_TIMEOUT)
41+
defer cancel()
42+
43+
const services = service.SetUpServicesOAuth | service.SetUpServicesId | service.SetUpServicesResourceDirectory |
44+
service.SetUpServicesGrpcGateway | service.SetUpServicesResourceAggregate
45+
tearDown := service.SetUpServices(ctx, t, services)
46+
defer tearDown()
47+
// log.Setup(log.Config{Debug: true})
48+
49+
ctx = kitNetGrpc.CtxWithToken(ctx, oauthTest.GetDefaultAccessToken(t))
50+
51+
coapShutdown := coapgwTest.SetUp(t)
52+
defer coapShutdown()
53+
54+
grpcConn, err := grpc.Dial(config.GRPC_HOST, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{
55+
RootCAs: test.GetRootCertificatePool(t),
56+
})))
57+
require.NoError(t, err)
58+
defer func() {
59+
_ = grpcConn.Close()
60+
}()
61+
grpcClient := client.New(pb.NewGrpcGatewayClient(grpcConn))
62+
63+
_, shutdownDevSim := test.OnboardDevSim(ctx, t, pb.NewGrpcGatewayClient(grpcConn), deviceID, config.GW_HOST, test.GetAllBackendResourceLinks())
64+
defer shutdownDevSim()
65+
66+
ch := make(chan client.DevicesObservationEvent, 1000)
67+
68+
v := testDevsObs{
69+
ch: ch,
70+
}
71+
72+
observationID, err := grpcClient.ObserveDevices(ctx, &v)
73+
require.NoError(t, err)
74+
defer func(observationID string) {
75+
err := grpcClient.StopObservingDevices(ctx, observationID)
76+
require.NoError(t, err)
77+
require.NoError(t, v.err.Load())
78+
}(observationID)
79+
80+
coapShutdown()
81+
82+
for {
83+
select {
84+
case e := <-ch:
85+
if e.Event != client.DevicesObservationEvent_OFFLINE {
86+
continue
87+
}
88+
require.Len(t, e.DeviceIDs, 1)
89+
require.Equal(t, deviceID, e.DeviceIDs[0])
90+
return
91+
case <-ctx.Done():
92+
require.NoError(t, ctx.Err())
93+
}
94+
}
95+
96+
}

grpc-gateway/client/observeDevices.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package client
22

33
import (
44
"context"
5-
"fmt"
65

76
"github.com/google/uuid"
87
"github.com/plgd-dev/hub/v2/grpc-gateway/pb"
@@ -77,9 +76,9 @@ func (c *Client) ObserveDevices(ctx context.Context, handler DevicesObservationH
7776
sub, err := c.NewDevicesSubscription(ctx, &devicesObservation{
7877
h: handler,
7978
removeSubscription: func() {
80-
if _, err := c.stopObservingDevices(ID.String()); err != nil {
81-
handler.Error(fmt.Errorf("failed to stop device('%v') observation: %w", ID.String(), err))
82-
}
79+
// we can ignore err during removeSubscription, if err != nil then some other
80+
// part of code already removed the subscription
81+
_, _ = c.stopObservingDevices(ID.String())
8382
},
8483
})
8584
if err != nil {

0 commit comments

Comments
 (0)