Skip to content

Commit 52a188e

Browse files
committed
coap-gateway: update device status online in shortime than exp token
1 parent 6fbfb87 commit 52a188e

5 files changed

Lines changed: 126 additions & 10 deletions

File tree

coap-gateway/service/client.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,7 @@ func (client *Client) CleanUp() (oldDeviceID *authCtx) {
344344
aCtx := client.loadAuthorizationContext()
345345
log.Debugf("clenaup client %v for device %v", client.coapConn.RemoteAddr(), aCtx.DeviceId)
346346

347+
client.server.devicesStatusUpdater.Remove(client)
347348
client.server.oicPingCache.Delete(client.remoteAddrString())
348349
client.cleanObservedResources()
349350
client.cancelResourceSubscriptions(false)

coap-gateway/service/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,5 @@ type Config struct {
2727
HeartBeat time.Duration `envconfig:"HEARTBEAT" default:"4s"`
2828
MaxMessageSize int `envconfig:"MAX_MESSAGE_SIZE" default:"262144"`
2929
LogMessages bool `envconfig:"LOG_MESSAGES" default:"false"`
30+
DeviceStatusValidity time.Duration `envconfig:"DEVICE_STATUS_VALIDITY" default:"20m"`
3031
}
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
package service
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync"
7+
"time"
8+
9+
deviceStatus "github.com/plgd-dev/cloud/coap-gateway/schema/device/status"
10+
pbCQRS "github.com/plgd-dev/cloud/resource-aggregate/pb"
11+
"github.com/plgd-dev/kit/log"
12+
kitNetGrpc "github.com/plgd-dev/kit/net/grpc"
13+
)
14+
15+
type deviceExpires struct {
16+
expires time.Time
17+
client *Client
18+
}
19+
20+
type devicesStatusUpdater struct {
21+
ctx context.Context
22+
deviceStatusValidity time.Duration
23+
24+
mutex sync.Mutex
25+
devices map[string]*deviceExpires
26+
}
27+
28+
func NewDevicesStatusUpdater(ctx context.Context, deviceStatusValidity time.Duration) *devicesStatusUpdater {
29+
u := devicesStatusUpdater{
30+
ctx: ctx,
31+
deviceStatusValidity: deviceStatusValidity,
32+
devices: make(map[string]*deviceExpires),
33+
}
34+
go u.run()
35+
return &u
36+
}
37+
38+
func (u *devicesStatusUpdater) Add(c *Client) error {
39+
expires, err := u.updateOnlineStatus(c, time.Now().Add(u.deviceStatusValidity))
40+
if err != nil {
41+
return err
42+
}
43+
d := deviceExpires{
44+
client: c,
45+
expires: expires,
46+
}
47+
u.mutex.Lock()
48+
defer u.mutex.Unlock()
49+
u.devices[c.remoteAddrString()] = &d
50+
return nil
51+
}
52+
53+
func (u *devicesStatusUpdater) Remove(c *Client) {
54+
u.mutex.Lock()
55+
defer u.mutex.Unlock()
56+
delete(u.devices, c.remoteAddrString())
57+
}
58+
59+
func (u *devicesStatusUpdater) updateOnlineStatus(client *Client, validUntil time.Time) (time.Time, error) {
60+
authCtx := client.loadAuthorizationContext()
61+
if isExpired(authCtx.Expire) {
62+
return time.Time{}, fmt.Errorf("token is expired")
63+
}
64+
serviceToken, err := client.server.oauthMgr.GetToken(client.Context())
65+
if err != nil {
66+
return time.Time{}, fmt.Errorf("cannot get service token: %w", err)
67+
}
68+
ctx := kitNetGrpc.CtxWithUserID(kitNetGrpc.CtxWithToken(client.Context(), serviceToken.AccessToken), authCtx.GetUserID())
69+
if authCtx.Expire.Before(validUntil) {
70+
validUntil = authCtx.Expire
71+
}
72+
73+
return validUntil, deviceStatus.SetOnline(ctx, client.server.raClient, authCtx.GetDeviceId(), validUntil, &pbCQRS.CommandMetadata{
74+
Sequence: client.coapConn.Sequence(),
75+
ConnectionId: client.remoteAddrString(),
76+
}, authCtx.AuthorizationContext)
77+
}
78+
79+
func (u *devicesStatusUpdater) getDevicesToUpdate(now time.Time) []*deviceExpires {
80+
u.mutex.Lock()
81+
defer u.mutex.Unlock()
82+
res := make([]*deviceExpires, 0, len(u.devices))
83+
for key, d := range u.devices {
84+
select {
85+
case <-d.client.Context().Done():
86+
delete(u.devices, key)
87+
default:
88+
if now.Add(u.deviceStatusValidity / 2).After(d.expires) {
89+
res = append(res, d)
90+
}
91+
}
92+
}
93+
return res
94+
}
95+
96+
func (u *devicesStatusUpdater) run() {
97+
t := time.NewTicker(u.deviceStatusValidity / 10)
98+
for {
99+
select {
100+
case <-u.ctx.Done():
101+
return
102+
case now := <-t.C:
103+
for _, d := range u.getDevicesToUpdate(now) {
104+
expires, err := u.updateOnlineStatus(d.client, time.Now().Add(u.deviceStatusValidity))
105+
if err != nil {
106+
log.Errorf("cannot update device(%v) status to online: %v", getDeviceID(d.client), err)
107+
} else {
108+
d.expires = expires
109+
}
110+
}
111+
log.Debugf("update devices statuses to online takes: %v", time.Now().Sub(now))
112+
}
113+
}
114+
}

coap-gateway/service/server.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,8 @@ type Server struct {
7272
ctx context.Context
7373
cancel context.CancelFunc
7474

75-
sigs chan os.Signal
75+
sigs chan os.Signal
76+
devicesStatusUpdater *devicesStatusUpdater
7677
}
7778

7879
type DialCertManager = interface {
@@ -101,6 +102,10 @@ func New(config Config, dialCertManager DialCertManager, listenCertManager Liste
101102
}
102103
})
103104

105+
if config.DeviceStatusValidity <= 0 {
106+
log.Fatalf("invalid value of config.DeviceStatusValidity(%v)", config.DeviceStatusValidity)
107+
}
108+
104109
dialTLSConfig := dialCertManager.GetClientTLSConfig()
105110
oauthMgr, err := manager.NewManagerFromConfiguration(config.OAuth, dialTLSConfig)
106111
if err != nil {
@@ -212,6 +217,7 @@ func New(config Config, dialCertManager DialCertManager, listenCertManager Liste
212217
oicPingCache: oicPingCache,
213218
listener: listener,
214219
authInterceptor: NewAuthInterceptor(),
220+
devicesStatusUpdater: NewDevicesStatusUpdater(ctx, config.DeviceStatusValidity),
215221

216222
sigs: make(chan os.Signal, 1),
217223

coap-gateway/service/signIn.go

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -121,18 +121,15 @@ func signInPostHandler(s mux.ResponseWriter, req *mux.Message, client *Client, s
121121
return
122122
}
123123

124-
err = deviceStatus.SetOnline(req.Context, client.server.raClient, signIn.DeviceID, expired, &pbCQRS.CommandMetadata{
125-
Sequence: client.coapConn.Sequence(),
126-
ConnectionId: client.remoteAddrString(),
127-
}, authCtx.AuthorizationContext)
124+
oldAuthCtx := client.replaceAuthorizationContext(&authCtx)
125+
err = client.server.devicesStatusUpdater.Add(client)
128126
if err != nil {
129127
// Events from resources of device will be comes but device is offline. To recover cloud state, client need to reconnect to cloud.
130128
client.logAndWriteErrorResponse(fmt.Errorf("cannot handle sign in: cannot update cloud device status: %w", err), coapCodes.InternalServerError, req.Token)
131129
client.Close()
132130
return
133131
}
134132

135-
oldAuthCtx := client.replaceAuthorizationContext(&authCtx)
136133
newDevice := false
137134

138135
switch {
@@ -244,10 +241,7 @@ func signOutPostHandler(s mux.ResponseWriter, req *mux.Message, client *Client,
244241
client.Close()
245242
return
246243
}
247-
248-
client.cancelResourceSubscriptions(true)
249-
client.cancelDeviceSubscriptions(true)
250-
oldAuthCtx := client.replaceAuthorizationContext(nil)
244+
oldAuthCtx := client.CleanUp()
251245
if oldAuthCtx.DeviceId != "" {
252246
client.server.expirationClientCache.Set(oldAuthCtx.DeviceId, nil, time.Second)
253247
serviceToken, err := client.server.oauthMgr.GetToken(req.Context)

0 commit comments

Comments
 (0)