Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion docs/adr/0003-materialized-views.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ type MaterializedViewDescriptor struct {
BaseTable string
KeySchema KeySchema // own PK + optional SK
ProjectedAttributes []string // empty → all base attributes
GroupBy []string // aggregate views: must match key
Aggregations []MaterializedViewAggregation
RefreshPolicy RefreshPolicy
Status string // building | active | paused | failed
LastRefreshAtUnix int64
Expand Down Expand Up @@ -89,6 +91,17 @@ EAGER hook (Phase 2 / #491):
succeeded. On failure: log, increment error metric, return
error to the caller.

Aggregating EAGER MVs support `COUNT(*)` and `SUM(col)` only. The
`GROUP BY` list must match the MV primary key, and aggregate output
columns are stored as counter columns. The write hook captures the
old image when needed, combines old/new contributions per MV key,
then applies the resulting deltas through the internal
`Replica.AtomicUpdateMV` path. Updates that keep the same group become
a net SUM delta with no COUNT change; group moves decrement the old
group and increment the new group. Deleting the final row in a group
leaves a zero-valued aggregate row; compaction/removal of zero rows is
left to a future maintenance policy.

SCHEDULED / ON_DEMAND writes do not touch the hot path. They go
through a shared **refresh-complete engine** (Phase 4 / #493)
invoked by either:
Expand Down Expand Up @@ -198,7 +211,10 @@ the same base) handles most cases.
- **MV must carry the base PK in its row schema**: yes (otherwise
same base row maps to multiple MV rows with no deterministic
delete).
- **Filter / aggregate / join in view**: out of scope for v1.
- **Filter / join in view**: out of scope for v1.
- **Aggregates in view**: `COUNT(*)` and `SUM(col)` for EAGER views
only; MIN / MAX / AVG and general query-time GROUP BY stay out of
scope.
- **Multiple MVs per base table**: yes, each independently
maintained per its own policy.
- **Schema evolution**: ALTER on a base column the MV depends on
Expand Down
12 changes: 11 additions & 1 deletion internal/catalog/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,10 +645,20 @@ func removeViewName(xs []string, x string) []string {
}

func mvToTableDescriptor(mv types.MaterializedViewDescriptor) types.TableDescriptor {
return types.TableDescriptor{
td := types.TableDescriptor{
Name: mv.Name,
KeySchema: mv.KeySchema,
}
if len(mv.Aggregations) > 0 {
td.AttributeDefinitions = make([]types.AttributeDefinition, 0, len(mv.Aggregations))
for _, agg := range mv.Aggregations {
td.AttributeDefinitions = append(td.AttributeDefinitions, types.AttributeDefinition{
Name: agg.TargetAttribute,
Type: types.AttributeTypeCounter,
})
}
}
return td
}

// loadAllServiceLevels hydrates the in-memory map from pebble on open.
Expand Down
72 changes: 72 additions & 0 deletions internal/catalog/domain/descriptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,72 @@ func NormalizeMVDescriptor(mv *types.MaterializedViewDescriptor) error {
}
mv.ProjectedAttributes = out
}
if mv.GroupBy != nil {
seen := map[string]struct{}{}
out := make([]string, 0, len(mv.GroupBy))
for _, a := range mv.GroupBy {
a = strings.TrimSpace(a)
if a == "" {
continue
}
if _, dup := seen[a]; dup {
continue
}
seen[a] = struct{}{}
out = append(out, a)
}
mv.GroupBy = out
}
if len(mv.Aggregations) > 0 {
if mv.RefreshPolicy.Mode != types.RefreshModeEager {
return fmt.Errorf("materialized view %q: aggregate views require REFRESH EAGER", mv.Name)
}
if len(mv.GroupBy) == 0 {
mv.GroupBy = []string{mv.KeySchema.PK}
if mv.KeySchema.SK != "" {
mv.GroupBy = append(mv.GroupBy, mv.KeySchema.SK)
}
}
if len(mv.GroupBy) != 1 && len(mv.GroupBy) != 2 {
return fmt.Errorf("materialized view %q: aggregate GROUP BY must match the primary key", mv.Name)
}
if mv.GroupBy[0] != mv.KeySchema.PK {
return fmt.Errorf("materialized view %q: GROUP BY %q must match primary key %q", mv.Name, mv.GroupBy[0], mv.KeySchema.PK)
}
if mv.KeySchema.SK == "" && len(mv.GroupBy) > 1 {
return fmt.Errorf("materialized view %q: GROUP BY has sort key %q but view key has no SK", mv.Name, mv.GroupBy[1])
}
if mv.KeySchema.SK != "" && (len(mv.GroupBy) != 2 || mv.GroupBy[1] != mv.KeySchema.SK) {
return fmt.Errorf("materialized view %q: GROUP BY must include sort key %q", mv.Name, mv.KeySchema.SK)
}
seenTargets := map[string]struct{}{mv.KeySchema.PK: {}}
if mv.KeySchema.SK != "" {
seenTargets[mv.KeySchema.SK] = struct{}{}
}
for i := range mv.Aggregations {
agg := &mv.Aggregations[i]
agg.Function = strings.ToUpper(strings.TrimSpace(agg.Function))
agg.SourceAttribute = strings.TrimSpace(agg.SourceAttribute)
agg.TargetAttribute = strings.TrimSpace(agg.TargetAttribute)
if agg.TargetAttribute == "" {
return fmt.Errorf("materialized view %q: aggregation %d target attribute required", mv.Name, i)
}
if _, dup := seenTargets[agg.TargetAttribute]; dup {
return fmt.Errorf("materialized view %q: duplicate aggregation target %q", mv.Name, agg.TargetAttribute)
}
seenTargets[agg.TargetAttribute] = struct{}{}
switch agg.Function {
case types.MVAggregationCount:
agg.SourceAttribute = ""
case types.MVAggregationSum:
if agg.SourceAttribute == "" {
return fmt.Errorf("materialized view %q: SUM aggregation requires source attribute", mv.Name)
}
default:
return fmt.Errorf("materialized view %q: unsupported aggregation %q", mv.Name, agg.Function)
}
}
}
return nil
}

Expand All @@ -327,5 +393,11 @@ func CloneMVDescriptor(in types.MaterializedViewDescriptor) types.MaterializedVi
if in.ProjectedAttributes != nil {
out.ProjectedAttributes = append([]string(nil), in.ProjectedAttributes...)
}
if in.GroupBy != nil {
out.GroupBy = append([]string(nil), in.GroupBy...)
}
if in.Aggregations != nil {
out.Aggregations = append([]types.MaterializedViewAggregation(nil), in.Aggregations...)
}
return out
}
11 changes: 11 additions & 0 deletions internal/cluster/cross_shard_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,17 @@ func (m *Manager) BatchWriteMVToPeer(ctx context.Context, peerID, addr string, r
return err
}

// AtomicUpdateMVToPeer forwards aggregate-MV counter deltas to the
// peer that owns the routed view row.
func (m *Manager) AtomicUpdateMVToPeer(ctx context.Context, peerID, addr string, req *cefaspb.AtomicUpdateMVRequest) error {
conn, err := m.peerWriteConn(ctx, peerID, addr)
if err != nil {
return fmt.Errorf("dial peer %s: %w", peerID, err)
}
_, err = cefaspb.NewReplicaClient(conn).AtomicUpdateMV(ctx, req)
return err
}

// BatchWriteGIToPeer forwards a global-index cascade bucket. Same
// RF=1 contract as BatchWriteMVToPeer; see Replica.BatchWriteGI.
func (m *Manager) BatchWriteGIToPeer(ctx context.Context, peerID, addr string, req *cefaspb.BatchWriteGIRequest) error {
Expand Down
56 changes: 56 additions & 0 deletions internal/server/grpc_materialized_view.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ func mvDescriptorToPB(mv types.MaterializedViewDescriptor) *cefaspb.Materialized
BaseTable: mv.BaseTable,
KeySchema: &cefaspb.KeySchema{Pk: mv.KeySchema.PK, Sk: mv.KeySchema.SK},
ProjectedAttributes: append([]string(nil), mv.ProjectedAttributes...),
GroupBy: append([]string(nil), mv.GroupBy...),
Aggregations: mvAggregationsToPB(mv.Aggregations),
RefreshPolicy: refreshPolicyToPB(mv.RefreshPolicy),
Status: mv.Status,
LastRefreshAtUnix: mv.LastRefreshAtUnix,
Expand All @@ -148,6 +150,8 @@ func pbToMVDescriptor(pb *cefaspb.MaterializedViewDescriptor) types.Materialized
Name: pb.GetName(),
BaseTable: pb.GetBaseTable(),
ProjectedAttributes: append([]string(nil), pb.GetProjectedAttributes()...),
GroupBy: append([]string(nil), pb.GetGroupBy()...),
Aggregations: pbAggregationsToTypes(pb.GetAggregations()),
Status: pb.GetStatus(),
LastRefreshAtUnix: pb.GetLastRefreshAtUnix(),
}
Expand All @@ -163,6 +167,58 @@ func pbToMVDescriptor(pb *cefaspb.MaterializedViewDescriptor) types.Materialized
return out
}

func mvAggregationsToPB(in []types.MaterializedViewAggregation) []*cefaspb.MaterializedViewAggregation {
if len(in) == 0 {
return nil
}
out := make([]*cefaspb.MaterializedViewAggregation, 0, len(in))
for _, agg := range in {
out = append(out, &cefaspb.MaterializedViewAggregation{
Function: mvAggregationFunctionToPB(agg.Function),
SourceAttribute: agg.SourceAttribute,
TargetAttribute: agg.TargetAttribute,
})
}
return out
}

func pbAggregationsToTypes(in []*cefaspb.MaterializedViewAggregation) []types.MaterializedViewAggregation {
if len(in) == 0 {
return nil
}
out := make([]types.MaterializedViewAggregation, 0, len(in))
for _, agg := range in {
out = append(out, types.MaterializedViewAggregation{
Function: pbAggregationFunctionToTypes(agg.GetFunction()),
SourceAttribute: agg.GetSourceAttribute(),
TargetAttribute: agg.GetTargetAttribute(),
})
}
return out
}

func mvAggregationFunctionToPB(fn string) cefaspb.MaterializedViewAggregation_Function {
switch fn {
case types.MVAggregationCount:
return cefaspb.MaterializedViewAggregation_COUNT
case types.MVAggregationSum:
return cefaspb.MaterializedViewAggregation_SUM
default:
return cefaspb.MaterializedViewAggregation_FUNCTION_UNSPECIFIED
}
}

func pbAggregationFunctionToTypes(fn cefaspb.MaterializedViewAggregation_Function) string {
switch fn {
case cefaspb.MaterializedViewAggregation_COUNT:
return types.MVAggregationCount
case cefaspb.MaterializedViewAggregation_SUM:
return types.MVAggregationSum
default:
return ""
}
}

func refreshPolicyToPB(rp types.RefreshPolicy) *cefaspb.RefreshPolicy {
out := &cefaspb.RefreshPolicy{
IntervalSeconds: rp.IntervalSeconds,
Expand Down
Loading
Loading