From cf7e9c3721d1523509dd735770d5fb86ff202fac Mon Sep 17 00:00:00 2001 From: Chris Schinnerl <3903476+ChrisSchinnerl@users.noreply.github.com> Date: Thu, 18 Jun 2026 15:19:52 +0200 Subject: [PATCH 1/4] chunk downloads --- slabs/downloads.go | 224 +++++++++++++++++++++++++++++++++++----- slabs/downloads_test.go | 197 ++++++++++++++++++++++------------- slabs/export_test.go | 8 +- slabs/manager.go | 21 ++++ slabs/migrations.go | 51 +++------ 5 files changed, 366 insertions(+), 135 deletions(-) diff --git a/slabs/downloads.go b/slabs/downloads.go index e3ea241f2..6ec042a68 100644 --- a/slabs/downloads.go +++ b/slabs/downloads.go @@ -10,33 +10,168 @@ import ( "sync/atomic" "time" + "github.com/klauspost/reedsolomon" proto "go.sia.tech/core/rhp/v4" "go.sia.tech/core/types" "go.sia.tech/mux/v3" "go.uber.org/zap" + "golang.org/x/crypto/chacha20" ) var errNotEnoughShards = errors.New("not enough shards") +// defaultRecoveryChunkSize is the default size of the segment-aligned byte +// range requested from each host during recovery. It must be a multiple of +// proto.LeafSize. +const defaultRecoveryChunkSize = 1 << 20 // 1 MiB + type slabDownload struct { root types.Hash256 index int } -// downloadShards downloads at least the minimum number of shards required to -// recover the slab. -func (m *SlabManager) downloadShards(ctx context.Context, slab Slab, log *zap.Logger) ([][]byte, error) { +// slabRecovery coordinates the chunked recovery of a single slab. Rather than +// downloading whole sectors from MinShards hosts, the sector is split into +// segment-aligned byte-range chunks that are recovered concurrently and spread +// across all available hosts, allowing a slab download that requires 10 shards +// out of 30 to actually leverage 30 hosts. 
+type slabRecovery struct { + m *SlabManager + slab Slab + required []bool + rs reedsolomon.Encoder + log *zap.Logger + + // out holds the decrypted, reconstructed plaintext for every required + // shard index; all other indices are nil. Each chunk writes into the + // [off, off+len) window of every required shard. + out [][]byte + + // excluded tracks hosts that reported a lost sector mid-recovery so that + // later chunks skip them. + mu sync.Mutex + excluded map[types.PublicKey]struct{} +} + +// recoverShards downloads enough segment-aligned chunks of the slab's sectors, +// spread across all available hosts, to reconstruct the shards marked in +// required. It returns a slice of length len(slab.Sectors) where each required +// index holds the decrypted, reconstructed plaintext shard and all other +// indices are nil. +func (m *SlabManager) recoverShards(ctx context.Context, slab Slab, required []bool, log *zap.Logger) ([][]byte, error) { + if len(required) != len(slab.Sectors) { + panic(fmt.Sprintf("slab %s has %d sectors but %d required flags", slab.ID, len(slab.Sectors), len(required))) // developer error + } + + rs, err := reedsolomon.New(int(slab.MinShards), len(slab.Sectors)-int(slab.MinShards)) + if err != nil { + // New only errors on invalid parameters, which originate from the + // database and should always be valid. 
+ log.Panic("failed to create reedsolomon encoder", zap.Error(err)) + } + + out := make([][]byte, len(slab.Sectors)) + for i, req := range required { + if req { + out[i] = make([]byte, proto.SectorSize) + } + } + + r := &slabRecovery{ + m: m, + slab: slab, + required: required, + rs: rs, + log: log, + out: out, + excluded: make(map[types.PublicKey]struct{}), + } + + // determine the segment-aligned chunk size + chunkSize := m.recoveryChunkSize + if chunkSize <= 0 || chunkSize > proto.SectorSize { + chunkSize = proto.SectorSize + } + chunkSize -= chunkSize % proto.LeafSize + if chunkSize == 0 { + chunkSize = proto.LeafSize + } + + numChunks := (proto.SectorSize + chunkSize - 1) / chunkSize + + // run multiple chunks concurrently so we engage more than MinShards hosts + // at once. spread is the number of disjoint MinShards host groups that fit + // across the available shards; with one chunk per group we light up close + // to every host without heavily oversubscribing any single one. + spread := max(len(slab.Sectors)/int(slab.MinShards), 1) + concurrency := min(spread, numChunks) + ctx, cancel := context.WithCancel(ctx) defer cancel() - shards := make([][]byte, len(slab.Sectors)) + sema := make(chan struct{}, concurrency) + var wg sync.WaitGroup + var firstErr error + var errOnce sync.Once + fail := func(err error) { + errOnce.Do(func() { + firstErr = err + cancel() + }) + } + +chunkLoop: + for off := 0; off < proto.SectorSize; off += chunkSize { + length := chunkSize + if off+length > proto.SectorSize { + length = proto.SectorSize - off + } + + select { + case <-ctx.Done(): + break chunkLoop + case sema <- struct{}{}: + } + + off, length := off, length + wg.Go(func() { + defer func() { <-sema }() + if err := r.recoverChunk(ctx, uint64(off), uint64(length)); err != nil { + fail(err) + } + }) + } + wg.Wait() + + if firstErr != nil { + return nil, firstErr + } else if ctx.Err() != nil { + return nil, ctx.Err() + } + return out, nil +} + +// recoverChunk downloads the 
[offset, offset+length) byte range of MinShards of +// the slab's sectors, spread across the available hosts, then decrypts and +// reconstructs that range for every required shard. +func (r *slabRecovery) recoverChunk(ctx context.Context, offset, length uint64) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + m := r.m + cols := make([][]byte, len(r.slab.Sectors)) var downloaded atomic.Uint32 + // build the candidate set: sectors that still have a host, deduplicated + // and minus any host excluded by a concurrent chunk. slabHosts := make(map[types.PublicKey]slabDownload) - candidates := make([]types.PublicKey, 0, len(slab.Sectors)) - for i, sector := range slab.Sectors { + candidates := make([]types.PublicKey, 0, len(r.slab.Sectors)) + r.mu.Lock() + for i, sector := range r.slab.Sectors { if sector.HostKey == nil { continue + } else if _, excluded := r.excluded[*sector.HostKey]; excluded { + continue } else if _, exists := slabHosts[*sector.HostKey]; exists { continue // prevent duplicates } @@ -46,8 +181,10 @@ func (m *SlabManager) downloadShards(ctx context.Context, slab Slab, log *zap.Lo index: i, } } - // helper to download a shard from a host - sema := make(chan struct{}, slab.MinShards) + r.mu.Unlock() + + // helper to download a chunk of a shard from a host + sema := make(chan struct{}, r.slab.MinShards) downloadShard := func(ctx context.Context, hostKey types.PublicKey, sector slabDownload, log *zap.Logger) error { defer func() { <-sema @@ -64,16 +201,18 @@ func (m *SlabManager) downloadShards(ctx context.Context, slab Slab, log *zap.Lo // debit the service account for the read since the host may charge for it // even if it is cancelled quickly. This is best effort, it's fine to // log the error and continue on failure. 
- cost := prices.RPCReadSectorCost(proto.SectorSize).RenterCost() + cost := prices.RPCReadSectorCost(length).RenterCost() if err = m.am.DebitServiceAccount(hostKey, m.migrationAccount, cost); err != nil { log.Warn("failed to debit service account for read sector", zap.Error(err)) } start := time.Now() - buf := bytes.NewBuffer(make([]byte, 0, proto.SectorSize)) - if _, err := m.hosts.ReadSector(ctx, m.migrationAccountKey, hostKey, sector.root, buf, 0, proto.SectorSize); err != nil { + buf := bytes.NewBuffer(make([]byte, 0, length)) + if _, err := m.hosts.ReadSector(ctx, m.migrationAccountKey, hostKey, sector.root, buf, offset, length); err != nil { if isErrLostSector(err) { log.Debug("host reports sector lost", zap.Duration("elapsed", time.Since(start))) + // exclude the host from subsequent chunks and mark the sector lost + r.exclude(hostKey) if err := m.store.MarkSectorsLost(hostKey, []types.Hash256{sector.root}); err != nil { log.Error("failed to mark sector as lost", zap.Error(err)) } @@ -83,22 +222,22 @@ func (m *SlabManager) downloadShards(ctx context.Context, slab Slab, log *zap.Lo return err } - shards[sector.index] = buf.Bytes() - if n := downloaded.Add(1); n >= uint32(slab.MinShards) { + cols[sector.index] = buf.Bytes() + if n := downloaded.Add(1); n >= uint32(r.slab.MinShards) { cancel() } return nil } var wg sync.WaitGroup - failedCh := make(chan struct{}, slab.MinShards) + failedCh := make(chan struct{}, r.slab.MinShards) spawnDownload := func(hostKey types.PublicKey, sector slabDownload, release func(), initial bool) { - log := log.With(zap.Stringer("hostKey", hostKey), zap.Stringer("sectorRoot", sector.root)) + log := r.log.With(zap.Stringer("hostKey", hostKey), zap.Stringer("sectorRoot", sector.root), zap.Uint64("offset", offset)) wg.Go(func() { defer release() timeoutCtx, timeoutCancel := context.WithTimeout(ctx, m.shardTimeout) defer timeoutCancel() - if err := downloadShard(timeoutCtx, hostKey, slabHosts[hostKey], log); err != nil { + if err := 
downloadShard(timeoutCtx, hostKey, sector, log); err != nil { log.Debug("shard download failed", zap.Error(err)) // non-blocking send to indicate a failure select { @@ -116,9 +255,9 @@ func (m *SlabManager) downloadShards(ctx context.Context, slab Slab, log *zap.Lo }) } - initialHosts, releases, remaining := m.hosts.PickReads(candidates, int(slab.MinShards)) + initialHosts, releases, remaining := m.hosts.PickReads(candidates, int(r.slab.MinShards)) if len(initialHosts) == 0 { - return nil, fmt.Errorf("only %d available sectors, minimum required: %d: %w", len(remaining), slab.MinShards, errNotEnoughShards) + return fmt.Errorf("only %d available sectors, minimum required: %d: %w", len(remaining), r.slab.MinShards, errNotEnoughShards) } initialLoop: @@ -137,7 +276,7 @@ initialLoop: t := time.NewTicker(m.shardTimeout / 4) defer t.Stop() raceLoop: - for downloaded.Load() < uint32(slab.MinShards) && len(remaining) > 0 { + for downloaded.Load() < uint32(r.slab.MinShards) && len(remaining) > 0 { select { case <-ctx.Done(): break raceLoop @@ -145,7 +284,7 @@ raceLoop: // a download has failed case <-t.C: // hedge against slow shards - log.Debug("racing slow shards", zap.Uint32("downloaded", downloaded.Load()), zap.Uint32("required", uint32(slab.MinShards))) + r.log.Debug("racing slow shards", zap.Uint32("downloaded", downloaded.Load()), zap.Uint32("required", uint32(r.slab.MinShards))) } select { @@ -167,10 +306,49 @@ raceLoop: wg.Wait() - if downloaded.Load() < uint32(slab.MinShards) { - return nil, fmt.Errorf("downloaded %d sectors, minimum required: %d: %w", downloaded.Load(), slab.MinShards, errNotEnoughShards) + if downloaded.Load() < uint32(r.slab.MinShards) { + return fmt.Errorf("downloaded %d sectors, minimum required: %d: %w", downloaded.Load(), r.slab.MinShards, errNotEnoughShards) } - return shards, nil + + return r.reconstructChunk(offset, length, cols) +} + +// reconstructChunk decrypts the downloaded columns in place, reconstructs the +// required shards for 
the [offset, offset+length) range and writes them into +// the recovery's output buffers. +func (r *slabRecovery) reconstructChunk(offset, length uint64, cols [][]byte) error { + // the chacha20 block counter is the leaf offset into the sector; offset is + // segment (leaf) aligned so this is exact. + counter := uint32(offset / proto.LeafSize) + nonce := make([]byte, 24) + for i, col := range cols { + if col == nil { + continue + } + nonce[0] = byte(i) + c, _ := chacha20.NewUnauthenticatedCipher(r.slab.EncryptionKey[:], nonce) + c.SetCounter(counter) + c.XORKeyStream(col, col) + } + + if err := r.rs.ReconstructSome(cols, r.required); err != nil { + return fmt.Errorf("failed to reconstruct chunk at offset %d: %w", offset, err) + } + + for i, req := range r.required { + if !req { + continue + } + copy(r.out[i][offset:offset+length], cols[i]) + } + return nil +} + +// exclude marks a host so that subsequent chunks skip it. +func (r *slabRecovery) exclude(hostKey types.PublicKey) { + r.mu.Lock() + r.excluded[hostKey] = struct{}{} + r.mu.Unlock() } func isErrLostSector(err error) bool { diff --git a/slabs/downloads_test.go b/slabs/downloads_test.go index 5668cb872..837c59309 100644 --- a/slabs/downloads_test.go +++ b/slabs/downloads_test.go @@ -1,7 +1,6 @@ package slabs_test import ( - "bytes" "context" "errors" "slices" @@ -16,9 +15,48 @@ import ( "go.sia.tech/indexd/slabs" "go.uber.org/zap" "go.uber.org/zap/zaptest" + "golang.org/x/crypto/chacha20" ) -func TestDownloadShards(t *testing.T) { +// allRequired returns a required mask with every index set. +func allRequired(n int) []bool { + required := make([]bool, n) + for i := range required { + required[i] = true + } + return required +} + +// assertRecovered re-encrypts every recovered plaintext shard and verifies its +// sector root matches the original, proving the chunked reconstruction is +// byte-exact. 
+func assertRecovered(t *testing.T, encryptionKey [32]byte, required []bool, roots []types.Hash256, recovered [][]byte) { + t.Helper() + if len(recovered) != len(required) { + t.Fatalf("expected %d recovered shards, got %d", len(required), len(recovered)) + } + nonce := make([]byte, 24) + for i, req := range required { + if !req { + if recovered[i] != nil { + t.Fatalf("expected shard %d to be nil, got %d bytes", i, len(recovered[i])) + } + continue + } + if len(recovered[i]) != proto.SectorSize { + t.Fatalf("expected recovered shard %d to be %d bytes, got %d", i, proto.SectorSize, len(recovered[i])) + } + encrypted := slices.Clone(recovered[i]) + nonce[0] = byte(i) + c, _ := chacha20.NewUnauthenticatedCipher(encryptionKey[:], nonce) + c.XORKeyStream(encrypted, encrypted) + if got := proto.SectorRoot((*[proto.SectorSize]byte)(encrypted)); got != roots[i] { + t.Fatalf("recovered shard %d root mismatch: expected %v, got %v", i, roots[i], got) + } + } +} + +func TestRecoverShards(t *testing.T) { log := zaptest.NewLogger(t) store := newMockStore(t) chain := newMockChainManager() @@ -26,41 +64,45 @@ func TestDownloadShards(t *testing.T) { hm := newMockHostManager() client := newMockHostClient() - // setup includes 3 hosts storing 1 sector each - hosts := make([]hosts.Host, 3) - for i := range hosts { + // build a 2-of-4 encoded slab (2 data + 2 parity shards) + encryptionKey, shards, roots := NewTestShards(t, 2, 2) + + // one host per sector + hostList := make([]hosts.Host, len(shards)) + slab := slabs.Slab{ + EncryptionKey: encryptionKey, + MinShards: 2, + } + for i := range hostList { sk := types.GeneratePrivateKey() h := client.addTestHost(sk) - client.slowHosts[sk.PublicKey()] = time.Duration(5*(i+1)) * time.Millisecond // add short sleep to stagger responses + client.slowHosts[sk.PublicKey()] = time.Duration(5*(i+1)) * time.Millisecond // stagger responses hm.hosts[sk.PublicKey()] = h - hosts[i] = h + hostList[i] = h store.AddTestHost(t, h) - } - slab := slabs.Slab{ 
- MinShards: 2, - } - for i := range hosts { - result, err := client.WriteSector(t.Context(), types.GeneratePrivateKey(), hosts[i].PublicKey, []byte{byte(i + 1)}) + result, err := client.WriteSector(t.Context(), types.GeneratePrivateKey(), h.PublicKey, shards[i]) if err != nil { t.Fatal(err) + } else if result.Root != roots[i] { + t.Fatalf("expected root %v, got %v", roots[i], result.Root) } slab.Sectors = append(slab.Sectors, slabs.Sector{ - Root: result.Root, - HostKey: &hosts[i].PublicKey, + Root: roots[i], + HostKey: &hostList[i].PublicKey, }) // insert sector into db so we can mark it as lost later _, err = store.Exec(context.Background(), `INSERT INTO sectors (sector_root, host_id, next_integrity_check, uploaded_at) SELECT $1, id, $3, NOW() - FROM hosts WHERE public_key = $2`, result.Root[:], hosts[i].PublicKey[:], time.Now().Add(time.Hour)) + FROM hosts WHERE public_key = $2`, roots[i][:], h.PublicKey[:], time.Now().Add(time.Hour)) if err != nil { t.Fatal(err) } if _, err := store.Exec(context.Background(), "INSERT INTO stats_deltas (stat_name, stat_delta) VALUES ('num_unpinned_sectors', 1)"); err != nil { t.Fatal(err) } - if _, err := store.Exec(context.Background(), "UPDATE hosts SET unpinned_sectors = unpinned_sectors + 1 WHERE public_key = $1", hosts[i].PublicKey[:]); err != nil { + if _, err := store.Exec(context.Background(), "UPDATE hosts SET unpinned_sectors = unpinned_sectors + 1 WHERE public_key = $1", h.PublicKey[:]); err != nil { t.Fatal(err) } } @@ -70,13 +112,15 @@ func TestDownloadShards(t *testing.T) { // assert that not enough usable hosts results in errNotEnoughShards t.Run("not enough usable hosts", func(t *testing.T) { - client.unusable[hosts[0].PublicKey] = struct{}{} - client.unusable[hosts[1].PublicKey] = struct{}{} + client.unusable[hostList[0].PublicKey] = struct{}{} + client.unusable[hostList[1].PublicKey] = struct{}{} + client.unusable[hostList[2].PublicKey] = struct{}{} t.Cleanup(func() { - delete(client.unusable, 
hosts[0].PublicKey) - delete(client.unusable, hosts[1].PublicKey) + delete(client.unusable, hostList[0].PublicKey) + delete(client.unusable, hostList[1].PublicKey) + delete(client.unusable, hostList[2].PublicKey) }) - _, err := sm.DownloadShards(context.Background(), slab, zap.NewNop()) + _, err := sm.RecoverShards(context.Background(), slab, allRequired(len(slab.Sectors)), zap.NewNop()) if !errors.Is(err, slabs.ErrNotEnoughShards) { t.Fatal(err) } @@ -89,84 +133,93 @@ func TestDownloadShards(t *testing.T) { unavailableSlab.Sectors = slices.Clone(unavailableSlab.Sectors) unavailableSlab.Sectors[0].HostKey = nil unavailableSlab.Sectors[1].HostKey = nil - _, err := sm.DownloadShards(context.Background(), unavailableSlab, zap.NewNop()) + unavailableSlab.Sectors[2].HostKey = nil + _, err := sm.RecoverShards(context.Background(), unavailableSlab, allRequired(len(unavailableSlab.Sectors)), zap.NewNop()) if !errors.Is(err, slabs.ErrNotEnoughShards) { t.Fatal(err) } }) - // assert that if all hosts are usable, we succeed and fetch exactly 'minShards' sectors + // assert that recovering every shard reconstructs each one byte-exactly, + // regardless of which MinShards were actually downloaded for each chunk t.Run("success", func(t *testing.T) { - sectors, err := sm.DownloadShards(context.Background(), slab, zap.NewNop()) + required := allRequired(len(slab.Sectors)) + recovered, err := sm.RecoverShards(context.Background(), slab, required, zap.NewNop()) if err != nil { t.Fatal(err) } + assertRecovered(t, encryptionKey, required, roots, recovered) + }) - var fetched int - for i, sector := range sectors { - if len(sector) == 0 { - continue - } - expected := [proto.SectorSize]byte{byte(i + 1)} - if !bytes.Equal(sector, expected[:]) { - t.Fatalf("downloaded sector %d does not match expected data", i+1) - } - fetched++ - } - - if fetched != int(slab.MinShards) { - t.Fatalf("expected %d fetched sectors, got %d", slab.MinShards, fetched) + // assert that recovering a subset only 
fills the required indices + t.Run("subset", func(t *testing.T) { + required := make([]bool, len(slab.Sectors)) + required[1] = true + required[3] = true + recovered, err := sm.RecoverShards(context.Background(), slab, required, zap.NewNop()) + if err != nil { + t.Fatal(err) } + assertRecovered(t, encryptionKey, required, roots, recovered) }) - // assert that if the first host times out, the download still succeeds + // assert that if the first host times out, recovery still succeeds via the + // race loop. A single chunk keeps the behaviour deterministic. t.Run("success with delay", func(t *testing.T) { synctest.Test(t, func(t *testing.T) { + sm.SetRecoveryChunkSize(proto.SectorSize) sm.SetShardTimeout(2 * time.Second) - client.slowHosts[hosts[0].PublicKey] = 30 * time.Minute + client.slowHosts[hostList[0].PublicKey] = 30 * time.Minute t.Cleanup(func() { + sm.SetRecoveryChunkSize(1 << 20) sm.SetShardTimeout(30 * time.Second) client.slowHosts = make(map[types.PublicKey]time.Duration) }) - sectors, err := sm.DownloadShards(context.Background(), slab, zap.NewNop()) + required := allRequired(len(slab.Sectors)) + recovered, err := sm.RecoverShards(context.Background(), slab, required, zap.NewNop()) if err != nil { t.Fatal(err) - } else if slab.Sectors[1].Root != proto.SectorRoot((*[proto.SectorSize]byte)(sectors[1])) || slab.Sectors[2].Root != proto.SectorRoot((*[proto.SectorSize]byte)(sectors[2])) { - t.Fatal("downloaded sectors do not match expected data") - } else if len(sectors[0]) != 0 { - t.Fatal("expected first sector to be missing due to timeout") } + assertRecovered(t, encryptionKey, required, roots, recovered) }) }) - // assert that a host losing a sector will mark the sector as lost + // assert that a host losing a sector marks the sector as lost and recovery + // still completes by reconstructing from the remaining hosts t.Run("mark sector lost", func(t *testing.T) { - client.hostSectors[hosts[0].PublicKey] = make(map[types.Hash256][proto.SectorSize]byte) 
// remove sector from host 1 - _, err := sm.DownloadShards(context.Background(), slab, log) + sm.SetRecoveryChunkSize(proto.SectorSize) + client.hostSectors[hostList[0].PublicKey] = make(map[types.Hash256][proto.SectorSize]byte) // remove sector from host 0 + t.Cleanup(func() { + sm.SetRecoveryChunkSize(1 << 20) + }) + required := make([]bool, len(slab.Sectors)) + required[3] = true + recovered, err := sm.RecoverShards(context.Background(), slab, required, log) if err != nil { - // download should still complete successfully t.Fatal(err) - } else if sectors := store.lostSectors(t); len(sectors) == 0 { - t.Fatalf("expected lost sector for host %v, got none", hosts[0].PublicKey) - } else if len(sectors) != 1 { - t.Fatalf("expected 1 lost sector for host %v, got %d %+v", hosts[0].PublicKey, len(sectors), sectors) - } else if _, ok := sectors[slab.Sectors[0].Root]; !ok { + } else if lost := store.lostSectors(t); len(lost) == 0 { + t.Fatalf("expected lost sector for host %v, got none", hostList[0].PublicKey) + } else if len(lost) != 1 { + t.Fatalf("expected 1 lost sector for host %v, got %d %+v", hostList[0].PublicKey, len(lost), lost) + } else if _, ok := lost[slab.Sectors[0].Root]; !ok { t.Fatalf("expected sector %v to be marked as lost, but it wasn't", slab.Sectors[0].Root) } + assertRecovered(t, encryptionKey, required, roots, recovered) }) } -// TestDownloadShardsDemotion exercises the demote logic in downloadShards. -// A host is demoted (AddFailedRPC) when: -// 1. it hits its per-shard timeout while the overall download is still in +// TestRecoverShardsDemotion exercises the demote logic in recoverShards. A host +// is demoted (AddFailedRPC) when: +// 1. it hits its per-shard timeout while the chunk download is still in // progress, or -// 2. it was part of the initial batch and was interrupted by the parent ctx +// 2. it was part of the initial batch and was interrupted by the chunk ctx // being cancelled (because enough other shards completed). 
// -// In particular, a hedge spawn that gets interrupted by parent-ctx cancellation -// should NOT be demoted, and a host that returns an immediate non-timeout error -// should NOT be demoted either. -func TestDownloadShardsDemotion(t *testing.T) { +// In particular, a hedge spawn that gets interrupted by ctx cancellation should +// NOT be demoted, and a host that returns an immediate non-timeout error should +// NOT be demoted either. A single recovery chunk keeps the assertions +// deterministic. +func TestRecoverShardsDemotion(t *testing.T) { setup := func(t *testing.T, numHosts int, minShards uint) (*slabs.SlabManager, *mockHostClient, []hosts.Host, slabs.Slab) { log := zaptest.NewLogger(t) store := newMockStore(t) @@ -197,6 +250,8 @@ func TestDownloadShardsDemotion(t *testing.T) { account := types.GeneratePrivateKey() sm := slabs.NewSlabManager(chain, am, nil, hm, store, client, alerts.NewManager(), account, types.GeneratePrivateKey(), slabs.WithLogger(log.Named("slabs"))) sm.SetShardTimeout(2 * time.Second) + // a single chunk keeps the demote assertions deterministic + sm.SetRecoveryChunkSize(proto.SectorSize) return sm, client, hs, slab } @@ -207,7 +262,7 @@ func TestDownloadShardsDemotion(t *testing.T) { } // MinShards equals total hosts, so there are no race-loop candidates. The - // slow initial host hits its per-shard timeout while the parent ctx is + // slow initial host hits its per-shard timeout while the chunk ctx is // still alive (downloaded < MinShards), exercising the first demote // clause. 
t.Run("initial host shard timeout", func(t *testing.T) { @@ -215,7 +270,7 @@ func TestDownloadShardsDemotion(t *testing.T) { client.slowHosts[hs[0].PublicKey] = 30 * time.Minute synctest.Test(t, func(t *testing.T) { - _, err := sm.DownloadShards(context.Background(), slab, zap.NewNop()) + _, err := sm.RecoverShards(context.Background(), slab, make([]bool, len(slab.Sectors)), zap.NewNop()) if !errors.Is(err, slabs.ErrNotEnoughShards) { t.Fatalf("expected ErrNotEnoughShards, got %v", err) } @@ -233,7 +288,7 @@ func TestDownloadShardsDemotion(t *testing.T) { // 3 hosts, MinShards=2. hs[0] is a very slow initial host. hs[1] is an // instant initial host. hs[2] is an instant hedge spawned via the race - // ticker; once it succeeds the parent ctx cancels and hs[0] is + // ticker; once it succeeds the chunk ctx cancels and hs[0] is // interrupted before its per-shard timeout fires - the second demote // clause should apply. t.Run("initial host interrupted by parent cancel", func(t *testing.T) { @@ -241,7 +296,7 @@ func TestDownloadShardsDemotion(t *testing.T) { client.slowHosts[hs[0].PublicKey] = 30 * time.Minute synctest.Test(t, func(t *testing.T) { - if _, err := sm.DownloadShards(context.Background(), slab, zap.NewNop()); err != nil { + if _, err := sm.RecoverShards(context.Background(), slab, make([]bool, len(slab.Sectors)), zap.NewNop()); err != nil { t.Fatal(err) } }) @@ -259,7 +314,7 @@ func TestDownloadShardsDemotion(t *testing.T) { // 4 hosts, MinShards=2. // hs[0] succeeds instantly (initial). // hs[1] returns an immediate non-timeout error (initial). - // hs[2] is a very slow hedge spawned via failedCh; the parent ctx + // hs[2] is a very slow hedge spawned via failedCh; the chunk ctx // cancels before its timeout, so it should NOT be demoted. // hs[3] succeeds instantly as a hedge spawned via the ticker. // Result: none of the hosts should be demoted. 
@@ -269,7 +324,7 @@ func TestDownloadShardsDemotion(t *testing.T) { client.slowHosts[hs[2].PublicKey] = 30 * time.Minute synctest.Test(t, func(t *testing.T) { - if _, err := sm.DownloadShards(context.Background(), slab, zap.NewNop()); err != nil { + if _, err := sm.RecoverShards(context.Background(), slab, make([]bool, len(slab.Sectors)), zap.NewNop()); err != nil { t.Fatal(err) } }) @@ -283,7 +338,7 @@ func TestDownloadShardsDemotion(t *testing.T) { // 4 hosts, MinShards=2. Same shape as above, but hs[3] is also slow, so // neither hedge completes and both hit their per-shard timeout while the - // overall download is still in progress - the first demote clause fires + // chunk download is still in progress - the first demote clause fires // for hedges too. t.Run("hedge host shard timeout", func(t *testing.T) { sm, client, hs, slab := setup(t, 4, 2) @@ -292,7 +347,7 @@ func TestDownloadShardsDemotion(t *testing.T) { client.slowHosts[hs[3].PublicKey] = 30 * time.Minute synctest.Test(t, func(t *testing.T) { - _, err := sm.DownloadShards(context.Background(), slab, zap.NewNop()) + _, err := sm.RecoverShards(context.Background(), slab, make([]bool, len(slab.Sectors)), zap.NewNop()) if !errors.Is(err, slabs.ErrNotEnoughShards) { t.Fatalf("expected ErrNotEnoughShards, got %v", err) } diff --git a/slabs/export_test.go b/slabs/export_test.go index 73aa9f2f8..c311d6b89 100644 --- a/slabs/export_test.go +++ b/slabs/export_test.go @@ -28,8 +28,12 @@ func (m *SlabManager) UploadShards(ctx context.Context, slab Slab, shards [][]by return m.uploadShards(ctx, slab, shards, available, log) } -func (m *SlabManager) DownloadShards(ctx context.Context, slab Slab, log *zap.Logger) ([][]byte, error) { - return m.downloadShards(ctx, slab, log) +func (m *SlabManager) RecoverShards(ctx context.Context, slab Slab, required []bool, log *zap.Logger) ([][]byte, error) { + return m.recoverShards(ctx, slab, required, log) +} + +func (m *SlabManager) SetRecoveryChunkSize(size int) { + 
m.recoveryChunkSize = size } func (m *SlabManager) MigrateSlabs(ctx context.Context, slabIDs []SlabID, log *zap.Logger) error { diff --git a/slabs/manager.go b/slabs/manager.go index fb9420acb..54b4f51fa 100644 --- a/slabs/manager.go +++ b/slabs/manager.go @@ -39,6 +39,12 @@ type ( shardTimeout time.Duration integrityCheckTimeout time.Duration + // recoveryChunkSize is the size of the segment-aligned byte range + // requested from each host during slab recovery. Smaller chunks + // spread a recovery across more hosts (more parallel pipes) at the + // cost of more RPCs. Must be a multiple of proto.LeafSize. + recoveryChunkSize int + alerter AlertsManager chain ChainManager am AccountManager @@ -201,6 +207,20 @@ func WithNumMigrationGoroutines(size int) Option { } } +// WithRecoveryChunkSize sets the size of the segment-aligned byte range +// requested from each host during slab recovery. Smaller chunks spread a +// recovery across more hosts at the cost of more RPCs. The value is rounded +// down to a multiple of proto.LeafSize and clamped to proto.SectorSize. The +// default is 1 MiB. +func WithRecoveryChunkSize(size int) Option { + return func(m *SlabManager) { + if size <= 0 { + panic("recovery chunk size must be positive") // developer error + } + m.recoveryChunkSize = size + } +} + // WithMinHostDistance sets the minimum distance between hosts used for storing // sectors of the same slab. The default is 10km, if set to 0, the distance // check is disabled. 
@@ -261,6 +281,7 @@ func newSlabManager(chain ChainManager, am AccountManager, cm ContractManager, h integrityCheckTimeout: 5 * time.Minute, numIntegrityCheckGoroutines: 50, numMigrationGoroutines: runtime.NumCPU(), + recoveryChunkSize: defaultRecoveryChunkSize, chain: chain, am: am, diff --git a/slabs/migrations.go b/slabs/migrations.go index 882de7db8..e6e488041 100644 --- a/slabs/migrations.go +++ b/slabs/migrations.go @@ -5,7 +5,6 @@ import ( "fmt" "time" - "github.com/klauspost/reedsolomon" "go.sia.tech/core/types" "go.sia.tech/indexd/contracts" "go.sia.tech/indexd/hosts" @@ -74,54 +73,28 @@ func (m *SlabManager) migrateSlab(ctx context.Context, slabID SlabID, state migr } log = log.With(zap.Int("toMigrate", len(indices)), zap.Int("uploadCandidates", len(uploadCandidates))) - // download enough shards to reconstruct the slab's shards - // note: timeouts are set within downloadShards to avoid timing + // indicate what shards are required + required := make([]bool, len(slab.Sectors)) + for _, i := range indices { + required[i] = true + } + + // recover the required shards by downloading segment-aligned chunks spread + // across all available hosts and reconstructing them in plaintext. 
+ // note: timeouts are set within recoverShards to avoid timing // out the database downloadStart := time.Now() - shards, err := m.downloadShards(ctx, slab, log.Named("recover")) + shards, err := m.recoverShards(ctx, slab, required, log.Named("recover")) if err != nil { if ctx.Err() == nil { - log.Error("failed to download slab", zap.Error(err)) + log.Error("failed to recover slab", zap.Error(err)) } return } log = log.With(zap.Duration("downloadElapsed", time.Since(downloadStart))) - // decrypt the shards + // re-encrypt the recovered shards for upload nonce := make([]byte, 24) - var recovered int - for i := range shards { - if len(shards[i]) == 0 { - continue - } - nonce[0] = byte(i) - c, _ := chacha20.NewUnauthenticatedCipher(slab.EncryptionKey[:], nonce) - c.XORKeyStream(shards[i], shards[i]) - recovered++ - } - log.Debug("recovered shards", zap.Int("recovered", recovered)) - - // indicate what shards are required - required := make([]bool, len(slab.Sectors)) - for _, i := range indices { - required[i] = true - } - - // reconstruct the missing shards - rs, err := reedsolomon.New(int(slab.MinShards), len(slab.Sectors)-int(slab.MinShards)) - if err != nil { - // both of these are developer errors. New will only return an error - // if the parameters are invalid, which they shouldn't be since they - // originate from the database. - log.Panic("failed to create reedsolomon encoder", zap.Error(err)) - } else if err := rs.ReconstructSome(shards, required); err != nil { - // reconstructing should only fail if there are not enough shards - // available, which should not happen since the download should have - // errored if not enough shards could be retrieved. 
- log.Panic("failed to reconstruct shards", zap.Error(err)) - } - - // re-encrypt the shards that are required for i, required := range required { if !required { shards[i] = nil From fbc3b258a8f71ac7a4613881df3deb87775e9247 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl <3903476+ChrisSchinnerl@users.noreply.github.com> Date: Thu, 18 Jun 2026 15:40:55 +0200 Subject: [PATCH 2/4] dynamic racing --- slabs/downloads.go | 50 +++++++++++++++++++++++++++++++++++++------ slabs/manager.go | 4 ++++ slabs/manager_test.go | 10 +++++++++ 3 files changed, 58 insertions(+), 6 deletions(-) diff --git a/slabs/downloads.go b/slabs/downloads.go index 6ec042a68..00a52d736 100644 --- a/slabs/downloads.go +++ b/slabs/downloads.go @@ -25,6 +25,17 @@ var errNotEnoughShards = errors.New("not enough shards") // proto.LeafSize. const defaultRecoveryChunkSize = 1 << 20 // 1 MiB +const ( + // raceFactor scales the chunk read estimate to decide when to start a + // parallel download against a slow host. It matches the Rust SDK. + raceFactor = 1.5 + + // minRaceInterval floors the adaptive race interval so a fast network + // (where the estimate dips below typical RPC latency) doesn't dogpile + // hosts with redundant reads. + minRaceInterval = 200 * time.Millisecond +) + type slabDownload struct { root types.Hash256 index int @@ -151,6 +162,21 @@ chunkLoop: return out, nil } +// raceInterval returns how long recoverChunk waits without progress before +// hedging a chunk read against an additional host. It is derived from the +// network-wide read-throughput estimate (scaled by raceFactor), floored by +// minRaceInterval and capped by the hard per-RPC shardTimeout. 
+func (m *SlabManager) raceInterval(length uint64) time.Duration { + d := time.Duration(float64(m.hosts.ReadEstimate(length)) * raceFactor) + if d < minRaceInterval { + d = minRaceInterval + } + if d > m.shardTimeout { + d = m.shardTimeout + } + return d +} + // recoverChunk downloads the [offset, offset+length) byte range of MinShards of // the slab's sectors, spread across the available hosts, then decrypts and // reconstructs that range for every required shard. @@ -273,19 +299,31 @@ initialLoop: spawnDownload(hostKey, slabHosts[hostKey], releases[i], true) } - t := time.NewTicker(m.shardTimeout / 4) - defer t.Stop() + // hedge against slow shards on an adaptive interval sized to the expected + // time to read this chunk, decoupled from the hard per-RPC shardTimeout. + raceInterval := m.raceInterval(length) + timer := time.NewTimer(raceInterval) + defer timer.Stop() raceLoop: for downloaded.Load() < uint32(r.slab.MinShards) && len(remaining) > 0 { select { case <-ctx.Done(): break raceLoop case <-failedCh: - // a download has failed - case <-t.C: - // hedge against slow shards - r.log.Debug("racing slow shards", zap.Uint32("downloaded", downloaded.Load()), zap.Uint32("required", uint32(r.slab.MinShards))) + // a download has failed - hedge immediately + case <-timer.C: + // no progress within the race interval - hedge against slow shards + r.log.Debug("racing slow shards", zap.Uint32("downloaded", downloaded.Load()), zap.Uint32("required", uint32(r.slab.MinShards)), zap.Duration("raceInterval", raceInterval)) + } + + // reset the race interval before attempting the next hedge + if !timer.Stop() { + select { + case <-timer.C: + default: + } } + timer.Reset(raceInterval) select { case sema <- struct{}{}: diff --git a/slabs/manager.go b/slabs/manager.go index 54b4f51fa..9c1c639ec 100644 --- a/slabs/manager.go +++ b/slabs/manager.go @@ -89,6 +89,10 @@ type ( ReadSector(ctx context.Context, accountKey types.PrivateKey, hostKey types.PublicKey, root types.Hash256, w 
io.Writer, offset, length uint64) (rhp.RPCReadSectorResult, error) Prioritize([]types.PublicKey) []types.PublicKey + // ReadEstimate returns the expected time to read the given number + // of bytes based on the network-wide observed read throughput. It + // is used to size adaptive download racing. + ReadEstimate(bytes uint64) time.Duration // PickWrite atomically selects the best write candidate and // reserves an inflight slot. The release function MUST be called // when the RPC completes; the picked host is removed from diff --git a/slabs/manager_test.go b/slabs/manager_test.go index cd4c9f564..f4bdbeea7 100644 --- a/slabs/manager_test.go +++ b/slabs/manager_test.go @@ -395,6 +395,7 @@ type mockHostClient struct { hostSettings map[types.PublicKey]proto.HostSettings unusable map[types.PublicKey]struct{} failedRPCs map[types.PublicKey]int + readEstimate time.Duration } func (m *mockHostClient) resetStorage() { @@ -541,6 +542,14 @@ func (m *mockHostClient) TrackInflightRead(hostKey types.PublicKey) func() { return func() {} } +// ReadEstimate returns a fixed, small estimate so that adaptive download +// racing fires quickly and deterministically under synctest. +func (m *mockHostClient) ReadEstimate(bytes uint64) time.Duration { + m.mu.Lock() + defer m.mu.Unlock() + return m.readEstimate +} + // PickReads filters unusable hosts and keeps the input order. Returns // picked=nil when fewer than n usable hosts remain. 
func (m *mockHostClient) PickReads(candidates []types.PublicKey, n int) (picked []types.PublicKey, releases []func(), remaining []types.PublicKey) { @@ -623,5 +632,6 @@ func newMockHostClient() *mockHostClient { hostSettings: make(map[types.PublicKey]proto.HostSettings), unusable: make(map[types.PublicKey]struct{}), failedRPCs: make(map[types.PublicKey]int), + readEstimate: 10 * time.Millisecond, } } From 7ad19e587590de853288232c4336a4a3dbedee3f Mon Sep 17 00:00:00 2001 From: Christopher Schinnerl <3903476+ChrisSchinnerl@users.noreply.github.com> Date: Thu, 25 Jun 2026 16:08:11 +0200 Subject: [PATCH 3/4] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- slabs/manager.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/slabs/manager.go b/slabs/manager.go index 9c1c639ec..b9ddc5652 100644 --- a/slabs/manager.go +++ b/slabs/manager.go @@ -213,9 +213,9 @@ func WithNumMigrationGoroutines(size int) Option { // WithRecoveryChunkSize sets the size of the segment-aligned byte range // requested from each host during slab recovery. Smaller chunks spread a -// recovery across more hosts at the cost of more RPCs. The value is rounded -// down to a multiple of proto.LeafSize and clamped to proto.SectorSize. The -// default is 1 MiB. +// recovery across more hosts at the cost of more RPCs. The value is clamped +// to [proto.LeafSize, proto.SectorSize] and rounded down to a multiple of +// proto.LeafSize when used. The default is 1 MiB. 
func WithRecoveryChunkSize(size int) Option { return func(m *SlabManager) { if size <= 0 { From 71a678dbc0af9cc626309d6177283b99a88f4cbf Mon Sep 17 00:00:00 2001 From: Chris Schinnerl <3903476+ChrisSchinnerl@users.noreply.github.com> Date: Fri, 26 Jun 2026 07:44:06 +0200 Subject: [PATCH 4/4] changeset --- ...g_full_sectors_to_spread_downloads_out_over_more_hosts.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/chunk_downloads_for_slab_migrations_rather_than_downloading_full_sectors_to_spread_downloads_out_over_more_hosts.md diff --git a/.changeset/chunk_downloads_for_slab_migrations_rather_than_downloading_full_sectors_to_spread_downloads_out_over_more_hosts.md b/.changeset/chunk_downloads_for_slab_migrations_rather_than_downloading_full_sectors_to_spread_downloads_out_over_more_hosts.md new file mode 100644 index 000000000..91f935223 --- /dev/null +++ b/.changeset/chunk_downloads_for_slab_migrations_rather_than_downloading_full_sectors_to_spread_downloads_out_over_more_hosts.md @@ -0,0 +1,5 @@ +--- +default: patch +--- + +# Chunk downloads for slab migrations rather than downloading full sectors to spread downloads out over more hosts.