Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/added_support_for_slab_versioning.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
default: major
---

# Added support for slab versioning

Slab versioning lets us change the encoding scheme of slabs to add new features or change functionality
10 changes: 10 additions & 0 deletions openapi/app.yml
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,12 @@ paths:
schema:
type: object
properties:
version:
type: integer
format: uint8
maximum: 0
default: 0
description: The slab encoding version.
encryptionKey:
allOf:
- $ref: "#/components/schemas/EncryptionKey"
Expand Down Expand Up @@ -348,6 +354,10 @@ paths:
allOf:
- $ref: "#/components/schemas/SlabID"
- description: The ID of the slab
version:
type: integer
format: uint8
description: The slab encoding version, folded into the slab ID for versions greater than 0
encryptionKey:
allOf:
- $ref: "#/components/schemas/EncryptionKey"
Expand Down
4 changes: 4 additions & 0 deletions openapi/components.yml
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,10 @@ components:
properties:
id:
$ref: "#/components/schemas/SlabID"
version:
type: integer
format: uint8
description: The slab encoding version
encryptionKey:
$ref: "#/components/schemas/EncryptionKey"
minShards:
Expand Down
1 change: 1 addition & 0 deletions persist/postgres/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ CREATE TABLE slabs (

encryption_key BYTEA NOT NULL,
min_shards SMALLINT NOT NULL CHECK(min_shards > 0),
version SMALLINT NOT NULL DEFAULT 0 CHECK(version >= 0), -- slab encoding version, folded into digest for version > 0

consecutive_failed_repairs SMALLINT NOT NULL DEFAULT 0 CHECK (consecutive_failed_repairs >= 0),
next_repair_attempt TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
Expand Down
4 changes: 4 additions & 0 deletions persist/postgres/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,4 +173,8 @@ CREATE INDEX pool_attachments_account_id_host_id_idx ON pool_attachments (accoun
}
return nil
},
func(ctx context.Context, tx *txn, log *zap.Logger) error {
_, err := tx.Exec(ctx, `ALTER TABLE slabs ADD COLUMN version SMALLINT NOT NULL DEFAULT 0 CHECK (version >= 0)`)
return err
},
}
12 changes: 6 additions & 6 deletions persist/postgres/objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func (s *Store) SharedObject(key types.Hash256) (obj slabs.SharedObject, _ error
return fmt.Errorf("failed to query shared object: %w", err)
}

rows, err := tx.Query(ctx, `SELECT s.id, s.encryption_key, s.min_shards, os.slab_offset, os.slab_length
rows, err := tx.Query(ctx, `SELECT s.id, s.encryption_key, s.min_shards, os.slab_offset, os.slab_length, s.version
FROM object_slabs os
INNER JOIN slabs s ON (os.slab_digest = s.digest)
WHERE os.object_id = $1
Expand All @@ -39,7 +39,7 @@ func (s *Store) SharedObject(key types.Hash256) (obj slabs.SharedObject, _ error
for rows.Next() {
var slab slabs.SlabSlice
var slabDBID int64
err := rows.Scan(&slabDBID, (*sqlHash256)(&slab.EncryptionKey), &slab.MinShards, &slab.Offset, &slab.Length)
err := rows.Scan(&slabDBID, (*sqlHash256)(&slab.EncryptionKey), &slab.MinShards, &slab.Offset, &slab.Length, &slab.Version)
if err != nil {
return fmt.Errorf("failed to scan slab: %w", err)
}
Expand Down Expand Up @@ -106,7 +106,7 @@ func (s *Store) Object(account proto.Account, key types.Hash256) (obj slabs.Seal
}

rows, err := tx.Query(ctx, `
SELECT slabs.id, slab_offset, slab_length, slabs.encryption_key, slabs.min_shards
SELECT slabs.id, slab_offset, slab_length, slabs.encryption_key, slabs.min_shards, slabs.version
FROM object_slabs
JOIN slabs ON slabs.digest = object_slabs.slab_digest
WHERE object_id = $1
Expand All @@ -121,7 +121,7 @@ func (s *Store) Object(account proto.Account, key types.Hash256) (obj slabs.Seal
for rows.Next() {
var slab slabs.SlabSlice
var slabID int64
err := rows.Scan(&slabID, &slab.Offset, &slab.Length, (*sqlHash256)(&slab.EncryptionKey), &slab.MinShards)
err := rows.Scan(&slabID, &slab.Offset, &slab.Length, (*sqlHash256)(&slab.EncryptionKey), &slab.MinShards, &slab.Version)
if err != nil {
return fmt.Errorf("failed to scan slab: %w", err)
}
Expand Down Expand Up @@ -209,7 +209,7 @@ func (s *Store) ListObjects(account proto.Account, cursor slabs.Cursor, limit in
}

rows, err = tx.Query(ctx, `
SELECT slabs.id, slab_offset, slab_length, slabs.encryption_key, slabs.min_shards
SELECT slabs.id, slab_offset, slab_length, slabs.encryption_key, slabs.min_shards, slabs.version
FROM object_slabs
JOIN slabs ON slabs.digest = object_slabs.slab_digest
WHERE object_id = $1
Expand All @@ -223,7 +223,7 @@ func (s *Store) ListObjects(account proto.Account, cursor slabs.Cursor, limit in
for rows.Next() {
var slab slabs.SlabSlice
var slabID int64
err := rows.Scan(&slabID, &slab.Offset, &slab.Length, (*sqlHash256)(&slab.EncryptionKey), &slab.MinShards)
err := rows.Scan(&slabID, &slab.Offset, &slab.Length, (*sqlHash256)(&slab.EncryptionKey), &slab.MinShards, &slab.Version)
if err != nil {
rows.Close()
return fmt.Errorf("failed to scan slab: %w", err)
Expand Down
49 changes: 49 additions & 0 deletions persist/postgres/objects_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,55 @@ func TestObject(t *testing.T) {
}
}

func TestObjectSlabVersion(t *testing.T) {
store := initPostgres(t, zap.NewNop())
acc := proto.Account{1}
store.addTestAccount(t, types.PublicKey(acc))
hk := store.addTestHost(t)
fcid := store.addTestContract(t, hk)

params := slabs.SlabPinParams{
Version: 1,
EncryptionKey: frand.Entropy256(),
MinShards: 1,
Sectors: []slabs.PinnedSector{
{Root: frand.Entropy256(), HostKey: hk},
},
}
if _, err := store.PinSlabs(acc, time.Time{}, params); err != nil {
t.Fatal(err)
} else if err := store.PinSectors(fcid, []types.Hash256{params.Sectors[0].Root}); err != nil {
t.Fatal(err)
}

obj := slabs.SealedObject{
EncryptedDataKey: frand.Bytes(72),
DataSignature: types.Signature(frand.Bytes(64)),
MetadataSignature: types.Signature(frand.Bytes(64)),
Slabs: []slabs.SlabSlice{params.Slice(0, 100)},
}
if obj.Slabs[0].Version != 1 {
t.Fatalf("expected slice version 1 before save, got %d", obj.Slabs[0].Version)
}
if err := store.PinObject(acc, obj.PinRequest()); err != nil {
t.Fatal(err)
}

got, err := store.Object(acc, obj.ID())
if err != nil {
t.Fatal(err)
} else if got.Slabs[0].Version != 1 {
t.Fatalf("Object: expected reloaded slice version 1, got %d", got.Slabs[0].Version)
}

shared, err := store.SharedObject(obj.ID())
if err != nil {
t.Fatal(err)
} else if shared.Slabs[0].Version != 1 {
t.Fatalf("SharedObject: expected reloaded slice version 1, got %d", shared.Slabs[0].Version)
}
}

func TestObjects(t *testing.T) {
store := initPostgres(t, zap.NewNop())

Expand Down
10 changes: 5 additions & 5 deletions persist/postgres/sectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,11 +280,11 @@ func (s *Store) PinSlabs(account proto.Account, nextIntegrityCheck time.Time, to
var slabID int64
var existingSlab bool
err = tx.QueryRow(ctx, `
INSERT INTO slabs (digest, encryption_key, min_shards)
VALUES ($1, $2, $3)
INSERT INTO slabs (digest, encryption_key, min_shards, version)
VALUES ($1, $2, $3, $4)
ON CONFLICT (digest) DO UPDATE SET pinned_at = NOW()
RETURNING id, (xmax <> 0)
`, sqlHash256(digest), sqlHash256(slab.EncryptionKey), slab.MinShards).Scan(&slabID, &existingSlab)
`, sqlHash256(digest), sqlHash256(slab.EncryptionKey), slab.MinShards, slab.Version).Scan(&slabID, &existingSlab)
if err != nil {
return err
}
Expand Down Expand Up @@ -633,14 +633,14 @@ func (s *Store) Slabs(account proto.Account, slabIDs []slabs.SlabID) ([]slabs.Sl
var dbIDs []int64
slabBatch := &pgx.Batch{}
for i, slabID := range slabIDs {
slabBatch.Queue(`SELECT s.id, s.encryption_key, s.min_shards, s.pinned_at
slabBatch.Queue(`SELECT s.id, s.encryption_key, s.min_shards, s.version, s.pinned_at
FROM slabs s
INNER JOIN account_slabs ac ON s.id = ac.slab_id
INNER JOIN accounts a ON a.id = ac.account_id
WHERE digest = $1 AND a.public_key = $2`, sqlHash256(slabID), sqlPublicKey(account)).QueryRow(func(row pgx.Row) error {
results[i].ID = slabID
var dbID int64
if err := row.Scan(&dbID, (*sqlHash256)(&results[i].EncryptionKey), &results[i].MinShards, &results[i].PinnedAt); err != nil {
if err := row.Scan(&dbID, (*sqlHash256)(&results[i].EncryptionKey), &results[i].MinShards, &results[i].Version, &results[i].PinnedAt); err != nil {
if errors.Is(err, sql.ErrNoRows) {
err = slabs.ErrSlabNotFound
}
Expand Down
8 changes: 4 additions & 4 deletions persist/postgres/slabs.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ func (s *Store) Slab(slabID slabs.SlabID) (slab slabs.Slab, err error) {
slab.Sectors = slab.Sectors[:0] // reuse same slice if transaction retries

var dbID int64
err = tx.QueryRow(ctx, `SELECT s.id, s.encryption_key, s.min_shards, s.pinned_at FROM slabs s WHERE digest = $1`, sqlHash256(slabID)).Scan(
&dbID, (*sqlHash256)(&slab.EncryptionKey), &slab.MinShards, &slab.PinnedAt)
err = tx.QueryRow(ctx, `SELECT s.id, s.encryption_key, s.min_shards, s.version, s.pinned_at FROM slabs s WHERE digest = $1`, sqlHash256(slabID)).Scan(
&dbID, (*sqlHash256)(&slab.EncryptionKey), &slab.MinShards, &slab.Version, &slab.PinnedAt)
if errors.Is(err, sql.ErrNoRows) {
return slabs.ErrSlabNotFound
} else if err != nil {
Expand Down Expand Up @@ -110,8 +110,8 @@ func (s *Store) PinnedSlab(account proto.Account, slabID slabs.SlabID) (slab sla
slab.Sectors = slab.Sectors[:0] // reuse same slice if transaction retries

var dbID int64
err = tx.QueryRow(ctx, `SELECT s.id, s.encryption_key, s.min_shards FROM slabs s WHERE digest = $1`, sqlHash256(slabID)).Scan(
&dbID, (*sqlHash256)(&slab.EncryptionKey), &slab.MinShards)
err = tx.QueryRow(ctx, `SELECT s.id, s.encryption_key, s.min_shards, s.version FROM slabs s WHERE digest = $1`, sqlHash256(slabID)).Scan(
&dbID, (*sqlHash256)(&slab.EncryptionKey), &slab.MinShards, &slab.Version)
if errors.Is(err, sql.ErrNoRows) {
return slabs.ErrSlabNotFound
} else if err != nil {
Expand Down
58 changes: 58 additions & 0 deletions persist/postgres/slabs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,64 @@ func TestSlab(t *testing.T) {
}
}

func TestSlabVersionRoundTrip(t *testing.T) {
store := initPostgres(t, zaptest.NewLogger(t).Named("postgres"))
account := proto.Account{1}
store.addTestAccount(t, types.PublicKey(account))

hosts := make([]types.PublicKey, 3)
for i := range hosts {
hosts[i] = store.addTestHost(t)
store.addTestContract(t, hosts[i])
}

params := slabs.SlabPinParams{
Version: 1,
EncryptionKey: frand.Entropy256(),
MinShards: 1,
Sectors: make([]slabs.PinnedSector, 0, len(hosts)),
}
for _, host := range hosts {
params.Sectors = append(params.Sectors, slabs.PinnedSector{
Root: frand.Entropy256(),
HostKey: host,
})
}

slabIDs, err := store.PinSlabs(account, time.Time{}, params)
if err != nil {
t.Fatal(err)
}

// the version is folded into the slab ID
if slabIDs[0] != params.Digest() {
t.Fatalf("expected slab ID %v, got %v", params.Digest(), slabIDs[0])
}

slab, err := store.Slab(slabIDs[0])
if err != nil {
t.Fatal(err)
} else if slab.Version != 1 {
t.Fatalf("Slab: expected version 1, got %d", slab.Version)
}

pinned, err := store.PinnedSlab(account, slabIDs[0])
if err != nil {
t.Fatal(err)
} else if pinned.Version != 1 {
t.Fatalf("PinnedSlab: expected version 1, got %d", pinned.Version)
}

bulk, err := store.Slabs(account, slabIDs)
if err != nil {
t.Fatal(err)
} else if len(bulk) != 1 {
t.Fatalf("expected 1 slab, got %d", len(bulk))
} else if bulk[0].Version != 1 {
t.Fatalf("Slabs: expected version 1, got %d", bulk[0].Version)
}
}

func TestMarkSlabRepaired(t *testing.T) {
store := initPostgres(t, zap.NewNop())

Expand Down
2 changes: 2 additions & 0 deletions slabs/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func (ps *PinnedSlab) DecodeFrom(d *types.Decoder) {

// EncodeTo implements types.EncoderTo.
func (s SlabSlice) EncodeTo(e *types.Encoder) {
e.WriteUint8(s.Version)
e.Write(s.EncryptionKey[:])
e.WriteUint8(uint8(s.MinShards))
types.EncodeSlice(e, s.Sectors)
Expand All @@ -54,6 +55,7 @@ func (s SlabSlice) EncodeTo(e *types.Encoder) {

// DecodeFrom implements types.DecoderFrom.
func (s *SlabSlice) DecodeFrom(d *types.Decoder) {
s.Version = d.ReadUint8()
d.Read(s.EncryptionKey[:])
s.MinShards = uint(d.ReadUint8())
types.DecodeSlice(d, &s.Sectors)
Expand Down
4 changes: 4 additions & 0 deletions slabs/objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type (

// SlabSlice represents a slice of a slab that is part of an object.
SlabSlice struct {
Version uint8 `json:"version"`
EncryptionKey EncryptionKey `json:"encryptionKey"`
MinShards uint `json:"minShards"`
Sectors []PinnedSector `json:"sectors"`
Expand Down Expand Up @@ -337,6 +338,7 @@ func (k *EncryptionKey) UnmarshalJSON(b []byte) error {
// Pin converts the SlabSlice to SlabPinParams.
func (s SlabSlice) Pin() SlabPinParams {
return SlabPinParams{
Version: s.Version,
EncryptionKey: s.EncryptionKey,
MinShards: s.MinShards,
Sectors: slices.Clone(s.Sectors),
Expand All @@ -346,6 +348,7 @@ func (s SlabSlice) Pin() SlabPinParams {
// Slice creates a SlabSlice from the SlabPinParams.
func (s SlabPinParams) Slice(offset, length uint32) SlabSlice {
return SlabSlice{
Version: s.Version,
EncryptionKey: s.EncryptionKey,
MinShards: s.MinShards,
Sectors: slices.Clone(s.Sectors),
Expand All @@ -357,6 +360,7 @@ func (s SlabPinParams) Slice(offset, length uint32) SlabSlice {
// Slice creates a SlabSlice from the PinnedSlab.
func (s PinnedSlab) Slice(offset, length uint32) SlabSlice {
return SlabSlice{
Version: s.Version,
EncryptionKey: s.EncryptionKey,
MinShards: s.MinShards,
Sectors: slices.Clone(s.Sectors),
Expand Down
Loading
Loading