diff --git a/.changeset/added_support_for_slab_versioning.md b/.changeset/added_support_for_slab_versioning.md new file mode 100644 index 000000000..588911c1b --- /dev/null +++ b/.changeset/added_support_for_slab_versioning.md @@ -0,0 +1,7 @@ +--- +default: major +--- + +# Added support for slab versioning + +Slab versioning lets us change the encoding scheme of slabs to add new features or change functionality \ No newline at end of file diff --git a/openapi/app.yml b/openapi/app.yml index 3af4e496a..b8750d0ed 100644 --- a/openapi/app.yml +++ b/openapi/app.yml @@ -268,6 +268,12 @@ paths: schema: type: object properties: + version: + type: integer + format: uint8 + maximum: 0 + default: 0 + description: The slab encoding version. encryptionKey: allOf: - $ref: "#/components/schemas/EncryptionKey" @@ -348,6 +354,10 @@ paths: allOf: - $ref: "#/components/schemas/SlabID" - description: The ID of the slab + version: + type: integer + format: uint8 + description: The slab encoding version, folded into the slab ID for versions greater than 0 encryptionKey: allOf: - $ref: "#/components/schemas/EncryptionKey" diff --git a/openapi/components.yml b/openapi/components.yml index 897cf3afb..2152b2277 100644 --- a/openapi/components.yml +++ b/openapi/components.yml @@ -362,6 +362,10 @@ components: properties: id: $ref: "#/components/schemas/SlabID" + version: + type: integer + format: uint8 + description: The slab encoding version encryptionKey: $ref: "#/components/schemas/EncryptionKey" minShards: diff --git a/persist/postgres/init.sql b/persist/postgres/init.sql index 858b82f34..6f40bb364 100644 --- a/persist/postgres/init.sql +++ b/persist/postgres/init.sql @@ -331,6 +331,7 @@ CREATE TABLE slabs ( encryption_key BYTEA NOT NULL, min_shards SMALLINT NOT NULL CHECK(min_shards > 0), + version SMALLINT NOT NULL DEFAULT 0 CHECK(version >= 0), -- slab encoding version, folded into digest for version > 0 consecutive_failed_repairs SMALLINT NOT NULL DEFAULT 0 CHECK (consecutive_failed_repairs >= 0), next_repair_attempt TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW() diff --git a/persist/postgres/migrations.go b/persist/postgres/migrations.go index 5d9e5d676..8ccf1c8f5 100644 --- a/persist/postgres/migrations.go +++ b/persist/postgres/migrations.go @@ -173,4 +173,8 @@ CREATE INDEX pool_attachments_account_id_host_id_idx ON pool_attachments (accoun } return nil }, + func(ctx context.Context, tx *txn, log *zap.Logger) error { + _, err := tx.Exec(ctx, `ALTER TABLE slabs ADD COLUMN version SMALLINT NOT NULL DEFAULT 0 CHECK (version >= 0)`) + return err + }, } diff --git a/persist/postgres/objects.go b/persist/postgres/objects.go index c7b2791f6..822f3e189 100644 --- a/persist/postgres/objects.go +++ b/persist/postgres/objects.go @@ -25,7 +25,7 @@ func (s *Store) SharedObject(key types.Hash256) (obj slabs.SharedObject, _ error return fmt.Errorf("failed to query shared object: %w", err) } - rows, err := tx.Query(ctx, `SELECT s.id, s.encryption_key, s.min_shards, os.slab_offset, os.slab_length + rows, err := tx.Query(ctx, `SELECT s.id, s.encryption_key, s.min_shards, os.slab_offset, os.slab_length, s.version FROM object_slabs os INNER JOIN slabs s ON (os.slab_digest = s.digest) WHERE os.object_id = $1 @@ -39,7 +39,7 @@ func (s *Store) SharedObject(key types.Hash256) (obj slabs.SharedObject, _ error for rows.Next() { var slab slabs.SlabSlice var slabDBID int64 - err := rows.Scan(&slabDBID, (*sqlHash256)(&slab.EncryptionKey), &slab.MinShards, &slab.Offset, &slab.Length) + err := rows.Scan(&slabDBID, (*sqlHash256)(&slab.EncryptionKey), &slab.MinShards, &slab.Offset, &slab.Length, &slab.Version) if err != nil { return fmt.Errorf("failed to scan slab: %w", err) } @@ -106,7 +106,7 @@ func (s *Store) Object(account proto.Account, key types.Hash256) (obj slabs.Seal } rows, err := tx.Query(ctx, ` - SELECT slabs.id, slab_offset, slab_length, slabs.encryption_key, slabs.min_shards + SELECT slabs.id, slab_offset, slab_length, slabs.encryption_key, slabs.min_shards, slabs.version FROM object_slabs JOIN slabs ON slabs.digest = object_slabs.slab_digest WHERE object_id = $1 @@ -121,7 +121,7 @@ func (s *Store) Object(account proto.Account, key types.Hash256) (obj slabs.Seal for rows.Next() { var slab slabs.SlabSlice var slabID int64 - err := rows.Scan(&slabID, &slab.Offset, &slab.Length, (*sqlHash256)(&slab.EncryptionKey), &slab.MinShards) + err := rows.Scan(&slabID, &slab.Offset, &slab.Length, (*sqlHash256)(&slab.EncryptionKey), &slab.MinShards, &slab.Version) if err != nil { return fmt.Errorf("failed to scan slab: %w", err) } @@ -209,7 +209,7 @@ func (s *Store) ListObjects(account proto.Account, cursor slabs.Cursor, limit in } rows, err = tx.Query(ctx, ` - SELECT slabs.id, slab_offset, slab_length, slabs.encryption_key, slabs.min_shards + SELECT slabs.id, slab_offset, slab_length, slabs.encryption_key, slabs.min_shards, slabs.version FROM object_slabs JOIN slabs ON slabs.digest = object_slabs.slab_digest WHERE object_id = $1 @@ -223,7 +223,7 @@ func (s *Store) ListObjects(account proto.Account, cursor slabs.Cursor, limit in for rows.Next() { var slab slabs.SlabSlice var slabID int64 - err := rows.Scan(&slabID, &slab.Offset, &slab.Length, (*sqlHash256)(&slab.EncryptionKey), &slab.MinShards) + err := rows.Scan(&slabID, &slab.Offset, &slab.Length, (*sqlHash256)(&slab.EncryptionKey), &slab.MinShards, &slab.Version) if err != nil { rows.Close() return fmt.Errorf("failed to scan slab: %w", err) diff --git a/persist/postgres/objects_test.go b/persist/postgres/objects_test.go index 51d4b6e00..3276b9448 100644 --- a/persist/postgres/objects_test.go +++ b/persist/postgres/objects_test.go @@ -112,6 +112,55 @@ func TestObject(t *testing.T) { } } +func TestObjectSlabVersion(t *testing.T) { + store := initPostgres(t, zap.NewNop()) + acc := proto.Account{1} + store.addTestAccount(t, types.PublicKey(acc)) + hk := store.addTestHost(t) + fcid := store.addTestContract(t, hk) + + params := slabs.SlabPinParams{ + Version: 1, + EncryptionKey: frand.Entropy256(), + MinShards: 1, + Sectors: []slabs.PinnedSector{ + {Root: frand.Entropy256(), HostKey: hk}, + }, + } + if _, err := store.PinSlabs(acc, time.Time{}, params); err != nil { + t.Fatal(err) + } else if err := store.PinSectors(fcid, []types.Hash256{params.Sectors[0].Root}); err != nil { + t.Fatal(err) + } + + obj := slabs.SealedObject{ + EncryptedDataKey: frand.Bytes(72), + DataSignature: types.Signature(frand.Bytes(64)), + MetadataSignature: types.Signature(frand.Bytes(64)), + Slabs: []slabs.SlabSlice{params.Slice(0, 100)}, + } + if obj.Slabs[0].Version != 1 { + t.Fatalf("expected slice version 1 before save, got %d", obj.Slabs[0].Version) + } + if err := store.PinObject(acc, obj.PinRequest()); err != nil { + t.Fatal(err) + } + + got, err := store.Object(acc, obj.ID()) + if err != nil { + t.Fatal(err) + } else if got.Slabs[0].Version != 1 { + t.Fatalf("Object: expected reloaded slice version 1, got %d", got.Slabs[0].Version) + } + + shared, err := store.SharedObject(obj.ID()) + if err != nil { + t.Fatal(err) + } else if shared.Slabs[0].Version != 1 { + t.Fatalf("SharedObject: expected reloaded slice version 1, got %d", shared.Slabs[0].Version) + } +} + func TestObjects(t *testing.T) { store := initPostgres(t, zap.NewNop()) diff --git a/persist/postgres/sectors.go b/persist/postgres/sectors.go index 307d8411b..5e732e3c5 100644 --- a/persist/postgres/sectors.go +++ b/persist/postgres/sectors.go @@ -280,11 +280,11 @@ func (s *Store) PinSlabs(account proto.Account, nextIntegrityCheck time.Time, to var slabID int64 var existingSlab bool err = tx.QueryRow(ctx, ` - INSERT INTO slabs (digest, encryption_key, min_shards) - VALUES ($1, $2, $3) + INSERT INTO slabs (digest, encryption_key, min_shards, version) + VALUES ($1, $2, $3, $4) ON CONFLICT (digest) DO UPDATE SET pinned_at = NOW() RETURNING id, (xmax <> 0) - `, sqlHash256(digest), sqlHash256(slab.EncryptionKey), slab.MinShards).Scan(&slabID, &existingSlab) + `, sqlHash256(digest), sqlHash256(slab.EncryptionKey), slab.MinShards, slab.Version).Scan(&slabID, &existingSlab) if err != nil { return err } @@ -633,14 +633,14 @@ func (s *Store) Slabs(account proto.Account, slabIDs []slabs.SlabID) ([]slabs.Sl var dbIDs []int64 slabBatch := &pgx.Batch{} for i, slabID := range slabIDs { - slabBatch.Queue(`SELECT s.id, s.encryption_key, s.min_shards, s.pinned_at + slabBatch.Queue(`SELECT s.id, s.encryption_key, s.min_shards, s.version, s.pinned_at FROM slabs s INNER JOIN account_slabs ac ON s.id = ac.slab_id INNER JOIN accounts a ON a.id = ac.account_id WHERE digest = $1 AND a.public_key = $2`, sqlHash256(slabID), sqlPublicKey(account)).QueryRow(func(row pgx.Row) error { results[i].ID = slabID var dbID int64 - if err := row.Scan(&dbID, (*sqlHash256)(&results[i].EncryptionKey), &results[i].MinShards, &results[i].PinnedAt); err != nil { + if err := row.Scan(&dbID, (*sqlHash256)(&results[i].EncryptionKey), &results[i].MinShards, &results[i].Version, &results[i].PinnedAt); err != nil { if errors.Is(err, sql.ErrNoRows) { err = slabs.ErrSlabNotFound } diff --git a/persist/postgres/slabs.go b/persist/postgres/slabs.go index 6f3e54bf2..8bd65fb20 100644 --- a/persist/postgres/slabs.go +++ b/persist/postgres/slabs.go @@ -59,8 +59,8 @@ func (s *Store) Slab(slabID slabs.SlabID) (slab slabs.Slab, err error) { slab.Sectors = slab.Sectors[:0] // reuse same slice if transaction retries var dbID int64 - err = tx.QueryRow(ctx, `SELECT s.id, s.encryption_key, s.min_shards, s.pinned_at FROM slabs s WHERE digest = $1`, sqlHash256(slabID)).Scan( - &dbID, (*sqlHash256)(&slab.EncryptionKey), &slab.MinShards, &slab.PinnedAt) + err = tx.QueryRow(ctx, `SELECT s.id, s.encryption_key, s.min_shards, s.version, s.pinned_at FROM slabs s WHERE digest = $1`, sqlHash256(slabID)).Scan( + &dbID, (*sqlHash256)(&slab.EncryptionKey), &slab.MinShards, &slab.Version, &slab.PinnedAt) if errors.Is(err, sql.ErrNoRows) { return slabs.ErrSlabNotFound } else if err != nil { @@ -110,8 +110,8 @@ func (s *Store) PinnedSlab(account proto.Account, slabID slabs.SlabID) (slab sla slab.Sectors = slab.Sectors[:0] // reuse same slice if transaction retries var dbID int64 - err = tx.QueryRow(ctx, `SELECT s.id, s.encryption_key, s.min_shards FROM slabs s WHERE digest = $1`, sqlHash256(slabID)).Scan( - &dbID, (*sqlHash256)(&slab.EncryptionKey), &slab.MinShards) + err = tx.QueryRow(ctx, `SELECT s.id, s.encryption_key, s.min_shards, s.version FROM slabs s WHERE digest = $1`, sqlHash256(slabID)).Scan( + &dbID, (*sqlHash256)(&slab.EncryptionKey), &slab.MinShards, &slab.Version) if errors.Is(err, sql.ErrNoRows) { return slabs.ErrSlabNotFound } else if err != nil { diff --git a/persist/postgres/slabs_test.go b/persist/postgres/slabs_test.go index dbb847a36..b7b37bf00 100644 --- a/persist/postgres/slabs_test.go +++ b/persist/postgres/slabs_test.go @@ -99,6 +99,64 @@ func TestSlab(t *testing.T) { } } +func TestSlabVersionRoundTrip(t *testing.T) { + store := initPostgres(t, zaptest.NewLogger(t).Named("postgres")) + account := proto.Account{1} + store.addTestAccount(t, types.PublicKey(account)) + + hosts := make([]types.PublicKey, 3) + for i := range hosts { + hosts[i] = store.addTestHost(t) + store.addTestContract(t, hosts[i]) + } + + params := slabs.SlabPinParams{ + Version: 1, + EncryptionKey: frand.Entropy256(), + MinShards: 1, + Sectors: make([]slabs.PinnedSector, 0, len(hosts)), + } + for _, host := range hosts { + params.Sectors = append(params.Sectors, slabs.PinnedSector{ + Root: frand.Entropy256(), + HostKey: host, + }) + } + + slabIDs, err := store.PinSlabs(account, time.Time{}, params) + if err != nil { + t.Fatal(err) + } + + // the version is folded into the slab ID + if slabIDs[0] != params.Digest() { + t.Fatalf("expected slab ID %v, got %v", params.Digest(), slabIDs[0]) + } + + slab, err := store.Slab(slabIDs[0]) + if err != nil { + t.Fatal(err) + } else if slab.Version != 1 { + t.Fatalf("Slab: expected version 1, got %d", slab.Version) + } + + pinned, err := store.PinnedSlab(account, slabIDs[0]) + if err != nil { + t.Fatal(err) + } else if pinned.Version != 1 { + t.Fatalf("PinnedSlab: expected version 1, got %d", pinned.Version) + } + + bulk, err := store.Slabs(account, slabIDs) + if err != nil { + t.Fatal(err) + } else if len(bulk) != 1 { + t.Fatalf("expected 1 slab, got %d", len(bulk)) + } else if bulk[0].Version != 1 { + t.Fatalf("Slabs: expected version 1, got %d", bulk[0].Version) + } +} + func TestMarkSlabRepaired(t *testing.T) { store := initPostgres(t, zap.NewNop()) diff --git a/slabs/encoding.go b/slabs/encoding.go index 9830ad45d..4e53af77d 100644 --- a/slabs/encoding.go +++ b/slabs/encoding.go @@ -46,6 +46,7 @@ func (ps *PinnedSlab) DecodeFrom(d *types.Decoder) { // EncodeTo implements types.EncoderTo. func (s SlabSlice) EncodeTo(e *types.Encoder) { + e.WriteUint8(s.Version) e.Write(s.EncryptionKey[:]) e.WriteUint8(uint8(s.MinShards)) types.EncodeSlice(e, s.Sectors) @@ -54,6 +55,7 @@ func (s SlabSlice) EncodeTo(e *types.Encoder) { // DecodeFrom implements types.DecoderFrom. func (s *SlabSlice) DecodeFrom(d *types.Decoder) { + s.Version = d.ReadUint8() d.Read(s.EncryptionKey[:]) s.MinShards = uint(d.ReadUint8()) types.DecodeSlice(d, &s.Sectors) diff --git a/slabs/objects.go b/slabs/objects.go index f1fb10266..78f68177c 100644 --- a/slabs/objects.go +++ b/slabs/objects.go @@ -40,6 +40,7 @@ type ( // SlabSlice represents a slice of a slab that is part of an object. SlabSlice struct { + Version uint8 `json:"version"` EncryptionKey EncryptionKey `json:"encryptionKey"` MinShards uint `json:"minShards"` Sectors []PinnedSector `json:"sectors"` @@ -337,6 +338,7 @@ func (k *EncryptionKey) UnmarshalJSON(b []byte) error { // Pin converts the SlabSlice to SlabPinParams. func (s SlabSlice) Pin() SlabPinParams { return SlabPinParams{ + Version: s.Version, EncryptionKey: s.EncryptionKey, MinShards: s.MinShards, Sectors: slices.Clone(s.Sectors), @@ -346,6 +348,7 @@ func (s SlabSlice) Pin() SlabPinParams { // Slice creates a SlabSlice from the SlabPinParams. func (s SlabPinParams) Slice(offset, length uint32) SlabSlice { return SlabSlice{ + Version: s.Version, EncryptionKey: s.EncryptionKey, MinShards: s.MinShards, Sectors: slices.Clone(s.Sectors), @@ -357,6 +360,7 @@ func (s SlabPinParams) Slice(offset, length uint32) SlabSlice { // Slice creates a SlabSlice from the PinnedSlab. func (s PinnedSlab) Slice(offset, length uint32) SlabSlice { return SlabSlice{ + Version: s.Version, EncryptionKey: s.EncryptionKey, MinShards: s.MinShards, Sectors: slices.Clone(s.Sectors), diff --git a/slabs/slabs.go b/slabs/slabs.go index 33606dd11..d02d5322b 100644 --- a/slabs/slabs.go +++ b/slabs/slabs.go @@ -19,6 +19,9 @@ const ( // maxTotalShards is the maximum number of total shards (data + parity) allowed in a slab. maxTotalShards = 256 + + // maxSlabVersion is the maximum slab version supported by the indexer + maxSlabVersion = 1 ) var ( @@ -36,6 +39,10 @@ var ( // number of minimum shards, for example if `MinShards` exceeds the number // of sectors. ErrMinShards = errors.New("slab has invalid number of minimum shards") + + // ErrUnsupportedSlabVersion is returned when attempting to pin a slab with + // a version that is not yet supported. + ErrUnsupportedSlabVersion = errors.New("unsupported slab version") ) type ( @@ -60,6 +67,7 @@ type ( // to hosts. Slab struct { ID SlabID `json:"id"` + Version uint8 `json:"version"` EncryptionKey EncryptionKey `json:"encryptionKey"` MinShards uint `json:"minShards"` Sectors []Sector `json:"sectors"` @@ -74,6 +82,7 @@ type ( // SlabPinParams is the input to PinSlabs SlabPinParams struct { + Version uint8 `json:"version"` EncryptionKey EncryptionKey `json:"encryptionKey"` MinShards uint `json:"minShards"` Sectors []PinnedSector `json:"sectors"` @@ -82,6 +91,7 @@ type ( // A PinnedSlab is a slab that has been pinned to hosts. PinnedSlab struct { ID SlabID `json:"id"` + Version uint8 `json:"version"` EncryptionKey EncryptionKey `json:"encryptionKey"` MinShards uint `json:"minShards"` Sectors []PinnedSector `json:"sectors"` @@ -111,12 +121,12 @@ func (s *SlabID) UnmarshalText(b []byte) error { // Digest computes the digest for the slab pin params. func (s SlabPinParams) Digest() SlabID { - return slabDigest(s.MinShards, s.EncryptionKey, s.Sectors) + return slabDigest(s.Version, s.MinShards, s.EncryptionKey, s.Sectors) } // Digest computes the digest for the slab slice. func (s SlabSlice) Digest() SlabID { - return slabDigest(s.MinShards, s.EncryptionKey, s.Sectors) + return slabDigest(s.Version, s.MinShards, s.EncryptionKey, s.Sectors) } // slabDigest creates a unique digest for a slab. It is important, that the same @@ -124,8 +134,14 @@ func (s SlabSlice) Digest() SlabID { // if one user makes the mistake of pinning a slab with a different encryption // key, this shouldn't prevent other users from pinning the same slab with the // correct key. -func slabDigest(minShards uint, ec [32]byte, sectors []PinnedSector) SlabID { +// +// The version is only included in the digest for version 1 and above so that +// version 0 slabs retain the same ID as before versioning was introduced. +func slabDigest(version uint8, minShards uint, ec [32]byte, sectors []PinnedSector) SlabID { hasher := types.NewHasher() + if version > 0 { + hasher.E.WriteUint8(version) + } hasher.E.WriteUint64(uint64(minShards)) hasher.E.Write(ec[:]) for _, sector := range sectors { @@ -149,7 +165,9 @@ func (s SlabPinParams) DataSize() uint64 { // encryption key is set, the minimum number of shards is met, and that there // are no duplicate host keys or empty roots in the sectors. func (s SlabPinParams) Validate() error { - if s.EncryptionKey == ([32]byte{}) { + if s.Version > maxSlabVersion { + return fmt.Errorf("%w: %d", ErrUnsupportedSlabVersion, s.Version) + } else if s.EncryptionKey == ([32]byte{}) { return errors.New("encryption key is empty") } else if err := ValidateECParams(int(s.MinShards), len(s.Sectors)); err != nil { return err diff --git a/slabs/slabs_test.go b/slabs/slabs_test.go index 2222ca3fb..02d8a26e6 100644 --- a/slabs/slabs_test.go +++ b/slabs/slabs_test.go @@ -87,6 +87,25 @@ func TestSlabPinParamsDigest(t *testing.T) { } } +func TestSlabVersionDigest(t *testing.T) { + v0 := slabs.SlabPinParams{ + Version: 0, + EncryptionKey: frand.Entropy256(), + MinShards: 10, + Sectors: []slabs.PinnedSector{ + {Root: frand.Entropy256(), HostKey: frand.Entropy256()}, + {Root: frand.Entropy256(), HostKey: frand.Entropy256()}, + }, + } + + v1 := v0 + v1.Version = 1 + + if v0.Digest() == v1.Digest() { + t.Fatal("expected version 1 slab to have a different ID than version 0") + } +} + func TestSlabPinParamsSize(t *testing.T) { params := slabs.SlabPinParams{ MinShards: 10,