Skip to content

Commit 116f3eb

Browse files
committed
cqrs: try to rewind to snapshot when first events is not a snapshot
1 parent 459bd62 commit 116f3eb

4 files changed

Lines changed: 182 additions & 11 deletions

File tree

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ test: env
9898
-e TEST_ROOT_CA_KEY=/certs/root_ca.key \
9999
-e ACME_DB_DIR=/home/certificate-authority \
100100
cloud-test \
101-
go test -race -p 1 -v ./... -covermode=atomic -coverprofile=/home/coverage.txt
101+
go test -timeout=45m -race -p 1 -v ./... -covermode=atomic -coverprofile=/home/coverage.txt
102102

103103
build: cloud-build $(SUBDIRS)
104104

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
package aggregate_test
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync"
7+
"testing"
8+
9+
"github.com/google/uuid"
10+
"github.com/kelseyhightower/envconfig"
11+
"github.com/plgd-dev/cloud/pkg/net/grpc"
12+
"github.com/plgd-dev/cloud/resource-aggregate/cqrs/aggregate"
13+
"github.com/plgd-dev/cloud/resource-aggregate/cqrs/events"
14+
"github.com/plgd-dev/cloud/resource-aggregate/cqrs/eventstore"
15+
"github.com/plgd-dev/cloud/resource-aggregate/cqrs/eventstore/mongodb"
16+
"github.com/plgd-dev/cloud/resource-aggregate/cqrs/utils"
17+
"github.com/plgd-dev/cloud/resource-aggregate/pb"
18+
"github.com/plgd-dev/kit/security/certManager"
19+
"github.com/stretchr/testify/assert"
20+
"github.com/stretchr/testify/require"
21+
"go.uber.org/atomic"
22+
)
23+
24+
func testNewEventstore(t *testing.T) *mongodb.EventStore {
25+
var config certManager.Config
26+
err := envconfig.Process("DIAL", &config)
27+
assert.NoError(t, err)
28+
29+
dialCertManager, err := certManager.NewCertManager(config)
30+
require.NoError(t, err)
31+
32+
tlsConfig := dialCertManager.GetClientTLSConfig()
33+
34+
store, err := mongodb.NewEventStore(
35+
mongodb.Config{
36+
URI: "mongodb://localhost:27017",
37+
},
38+
func(f func()) error { go f(); return nil },
39+
mongodb.WithTLS(tlsConfig),
40+
)
41+
require.NoError(t, err)
42+
require.NotNil(t, store)
43+
44+
return store
45+
}
46+
47+
func cleanUpToSnapshot(ctx context.Context, t *testing.T, store *mongodb.EventStore, evs []eventstore.Event) {
48+
for _, event := range evs {
49+
if ru, ok := event.(*events.ResourceStateSnapshotTaken); ok {
50+
if err := store.RemoveUpToVersion(ctx, []eventstore.VersionQuery{{GroupID: ru.GroupId(), AggregateID: ru.AggregateId(), Version: ru.Version()}}); err != nil {
51+
require.NoError(t, err)
52+
}
53+
fmt.Printf("snapshot at version %v\n", event.Version())
54+
break
55+
}
56+
}
57+
}
58+
59+
func Test_parallelRequest(t *testing.T) {
60+
store := testNewEventstore(t)
61+
ctx := context.Background()
62+
ctx = grpc.CtxWithIncomingUserID(ctx, "test")
63+
defer store.Close(ctx)
64+
defer func() {
65+
err := store.Clear(ctx)
66+
require.NoError(t, err)
67+
}()
68+
69+
deviceID := "7397398d-3ae8-4d9a-62d6-511f7b736a60"
70+
href := "/test/resource/1"
71+
commandPub := pb.PublishResourceRequest{
72+
ResourceId: &pb.ResourceId{
73+
DeviceId: deviceID,
74+
Href: href,
75+
},
76+
Resource: &pb.Resource{
77+
Id: utils.MakeResourceId(deviceID, href),
78+
DeviceId: deviceID,
79+
},
80+
CommandMetadata: &pb.CommandMetadata{},
81+
AuthorizationContext: &pb.AuthorizationContext{},
82+
}
83+
84+
newAggragate := func(deviceID, href string) *aggregate.Aggregate {
85+
a, err := aggregate.NewAggregate(deviceID, utils.MakeResourceId(deviceID, href), aggregate.NewDefaultRetryFunc(32), 16, store, func(context.Context) (aggregate.AggregateModel, error) {
86+
ev := events.NewResourceStateSnapshotTaken()
87+
ev.Id = utils.MakeResourceId(deviceID, href)
88+
return ev, nil
89+
}, nil)
90+
require.NoError(t, err)
91+
return a
92+
}
93+
94+
concurrencyExcepTestA := newAggragate(commandPub.GetResourceId().GetDeviceId(), commandPub.GetResourceId().GetHref())
95+
_, err := concurrencyExcepTestA.HandleCommand(ctx, &commandPub)
96+
require.NoError(t, err)
97+
98+
numParallel := 3
99+
var wg sync.WaitGroup
100+
var anyError atomic.Bool
101+
for i := 0; i < numParallel; i++ {
102+
wg.Add(1)
103+
go func() {
104+
defer wg.Done()
105+
for j := 0; j < 100000; j++ {
106+
if anyError.Load() {
107+
return
108+
}
109+
commandContentChanged := pb.NotifyResourceChangedRequest{
110+
ResourceId: &pb.ResourceId{
111+
DeviceId: deviceID,
112+
Href: href,
113+
},
114+
Content: &pb.Content{
115+
Data: []byte("hello world"),
116+
ContentType: "text",
117+
},
118+
CommandMetadata: &pb.CommandMetadata{
119+
ConnectionId: uuid.New().String(),
120+
},
121+
Status: pb.Status_OK,
122+
AuthorizationContext: &pb.AuthorizationContext{},
123+
}
124+
aggr := newAggragate(commandPub.GetResourceId().GetDeviceId(), commandPub.GetResourceId().GetHref())
125+
events, err := aggr.HandleCommand(ctx, &commandContentChanged)
126+
if err != nil {
127+
anyError.Store(true)
128+
require.NoError(t, err)
129+
return
130+
}
131+
cleanUpToSnapshot(ctx, t, store, events)
132+
}
133+
}()
134+
}
135+
wg.Wait()
136+
}

resource-aggregate/cqrs/eventstore/mongodb/eventstore.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,7 @@ type iterator struct {
316316
dataUnmarshaler UnmarshalerFunc
317317
LogDebugfFunc LogDebugfFunc
318318
groupID string
319+
loaded bool
319320
}
320321

321322
func (i *iterator) Next(ctx context.Context) (eventstore.EventUnmarshaler, bool) {
@@ -324,6 +325,7 @@ func (i *iterator) Next(ctx context.Context) (eventstore.EventUnmarshaler, bool)
324325
if !i.iter.Next(ctx) {
325326
return nil, false
326327
}
328+
i.loaded = true
327329

328330
err := i.iter.Decode(&event)
329331
if err != nil {
@@ -345,7 +347,11 @@ func (i *iterator) Next(ctx context.Context) (eventstore.EventUnmarshaler, bool)
345347
}
346348

347349
func (i *iterator) Err() error {
348-
return i.iter.Err()
350+
err := i.iter.Err()
351+
if err == nil && !i.loaded {
352+
err = fmt.Errorf("none event was loaded")
353+
}
354+
return err
349355
}
350356

351357
func versionQueriesToMgoQuery(queries []eventstore.VersionQuery, op signOperator) (bson.M, error) {
@@ -417,7 +423,9 @@ func (l *loader) QueryHandlePool(ctx context.Context, iter *queryIterator) error
417423
var errors []error
418424
var errorsLock sync.Mutex
419425

426+
var loaded bool
420427
for iter.Next(ctx, &query) {
428+
loaded = true
421429
queries = append(queries, query)
422430
if len(queries) >= l.store.batchSize {
423431
wg.Add(1)
@@ -449,6 +457,9 @@ func (l *loader) QueryHandlePool(ctx context.Context, iter *queryIterator) error
449457
if len(errors) > 0 {
450458
return fmt.Errorf("loader cannot load events: %v", errors)
451459
}
460+
if !loaded {
461+
return fmt.Errorf("none snapshot event was loaded")
462+
}
452463

453464
if iter.Err() != nil {
454465
return iter.Err()

resource-aggregate/cqrs/eventstore/projection.go

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -97,15 +97,29 @@ type iterator struct {
9797
reload *VersionQuery
9898
}
9999

100-
func (i *iterator) Rewind(ctx context.Context) {
100+
func (i *iterator) RewindToNextAggregateEvent(ctx context.Context) EventUnmarshaler {
101+
for {
102+
snapshot, nextAggregateEvent := i.RewindToSnapshot(ctx)
103+
if nextAggregateEvent != nil {
104+
return nextAggregateEvent
105+
}
106+
if snapshot == nil && nextAggregateEvent == nil {
107+
return nil
108+
}
109+
}
110+
}
111+
112+
func (i *iterator) RewindToSnapshot(ctx context.Context) (snapshot EventUnmarshaler, nextAggregateEvent EventUnmarshaler) {
101113
for {
102114
e, ok := i.iter.Next(ctx)
103115
if !ok {
104-
break
116+
return nil, nil
117+
}
118+
if e.EventType() == i.model.SnapshotEventType() && e.GroupID() == i.model.groupId && e.AggregateID() == i.model.aggregateId {
119+
return e, nil
105120
}
106121
if e.GroupID() != i.model.groupId || e.AggregateID() != i.model.aggregateId {
107-
i.nextEventToProcess = e
108-
return
122+
return nil, e
109123
}
110124
}
111125
}
@@ -135,9 +149,19 @@ func (i *iterator) Next(ctx context.Context) (EventUnmarshaler, bool) {
135149
ignore, reload := i.model.Update(tmp)
136150
i.model.LogDebugfFunc("projection.iterator.next: GroupId %v: AggregateId %v: Version %v, EvenType %v, ignore %v reload %v", tmp.GroupID, tmp.AggregateID, tmp.Version, tmp.EventType, ignore, reload)
137151
if reload {
138-
i.reload = &VersionQuery{GroupID: tmp.GroupID(), AggregateID: tmp.AggregateID(), Version: i.model.version}
139-
i.Rewind(ctx)
140-
return nil, false
152+
snapshot, nextAggregateEvent := i.RewindToSnapshot(ctx)
153+
if snapshot == nil {
154+
i.nextEventToProcess = nextAggregateEvent
155+
i.reload = &VersionQuery{GroupID: tmp.GroupID(), AggregateID: tmp.AggregateID(), Version: i.model.version}
156+
return nil, false
157+
}
158+
tmp = snapshot
159+
ignore, reload = i.model.Update(tmp)
160+
if reload {
161+
i.nextEventToProcess = i.RewindToNextAggregateEvent(ctx)
162+
i.reload = &VersionQuery{GroupID: tmp.GroupID(), AggregateID: tmp.AggregateID(), Version: i.model.version}
163+
return nil, false
164+
}
141165
}
142166
if ignore {
143167
return i.RewindIgnore(ctx)
@@ -208,8 +232,8 @@ func (p *Projection) handle(ctx context.Context, iter Iter) (reloadQueries []Ver
208232
if i.nextEventToProcess == nil {
209233
_, ok := i.Next(ctx)
210234
if ok {
211-
//iterator need to mode to next
212-
i.Rewind(ctx)
235+
//iterator need to move to the next event
236+
i.nextEventToProcess = i.RewindToNextAggregateEvent(ctx)
213237
}
214238
}
215239

0 commit comments

Comments
 (0)