Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions protocol/chaintracker/chain_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,88 @@ func TestChainTrackerCallbacks(t *testing.T) {
})
}

// TestChainTracker_PollingMultiplier4_NoDivideByZero is the safety-net test for the
// polling-relief patch. At PollingTimeMultiplier=4 the adaptive branches in
// chain_tracker.go updateTimer divide by M/4=1 and M/2=2. If the clamp at the
// config layer ever fails and lets M<4 through, those integer divisions go to zero
// and the tracker panics. This test asserts the M=4 case runs without panic and
// without absurd polling intervals.
//
// The tracker is created and started with a single cancellable context that is
// cancelled when the test returns, instead of context.Background(), so the test
// does not hand the tracker a context that can never be cancelled.
func TestChainTracker_PollingMultiplier4_NoDivideByZero(t *testing.T) {
	mockBlocks := int64(50)
	fetcherBlocks := 1
	localTimeForPollingMock := 16 * time.Millisecond

	// Registered before the mutex defer below, so it runs after the mutex is released.
	// NOTE(review): assumes the tracker stops its background work on ctx cancellation —
	// confirm in chain_tracker.go.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var (
		mu     sync.Mutex
		called int
		last   time.Time
		diffs  []time.Duration
	)
	// callback is handed to the mock fetcher; the assertions below treat every
	// invocation as one poll and record the wall-clock gap between consecutive ones.
	callback := func() {
		mu.Lock()
		defer mu.Unlock()
		now := time.Now()
		if !last.IsZero() {
			diffs = append(diffs, now.Sub(last))
		}
		last = now
		called++
	}

	mockChainFetcher := NewMockChainFetcher(1000, mockBlocks, callback)
	mockChainFetcher.AdvanceBlock()

	chainTrackerConfig := chaintracker.ChainTrackerConfig{
		BlocksToSave:          uint64(fetcherBlocks),
		AverageBlockTime:      localTimeForPollingMock,
		ServerBlockMemory:     uint64(mockBlocks),
		PollingTimeMultiplier: 4,
		ParseDirectiveEnabled: true,
	}

	// At M=4 the adaptive divisors are M/4=1 and M/2=2, both non-zero.
	// If a future refactor lets M<4 through, this call panics in updateTimer.
	require.NotPanics(t, func() {
		tracker, err := chaintracker.NewChainTracker(ctx, mockChainFetcher, chainTrackerConfig)
		require.NoError(t, err)
		err = tracker.StartAndServe(ctx)
		require.NoError(t, err)

		// Pre-seed block gaps so self-tuning thinks blocks arrive every localTimeForPollingMock
		// and doesn't rescale our base pollingTime out from under us.
		for i := 0; i < 50; i++ {
			tracker.AddBlockGap(localTimeForPollingMock, 1)
		}

		// Run long enough to exercise the adaptive branches.
		// Advance a block mid-run so timeSinceLastUpdate crosses both halves of the cycle
		// and we hit all three branches (fast / medium / slow) at least once.
		time.Sleep(4 * localTimeForPollingMock)
		mockChainFetcher.AdvanceBlock()
		time.Sleep(6 * localTimeForPollingMock)
	})

	mu.Lock()
	defer mu.Unlock()

	require.Greater(t, called, 10, "tracker should have polled many times during the test window")
	require.NotEmpty(t, diffs, "should have recorded inter-poll intervals")

	// NotEmpty above guarantees len(diffs) > 0, so the division below is safe.
	var total time.Duration
	for _, d := range diffs {
		total += d
	}
	avg := total / time.Duration(len(diffs))

	// At M=4: fast=blockTime, medium=blockTime/2, slow=blockTime/4.
	// Observed average should be between blockTime/8 (very conservative lower bound
	// accounting for test-clock jitter on the fast-polling branch) and blockTime*2
	// (likewise upper bound). The tight "within 30% of blockTime" from the plan is
	// informative but too flaky as a strict assertion given goroutine scheduling noise.
	require.Greater(t, avg, localTimeForPollingMock/8, "average poll interval suspiciously small")
	require.Less(t, avg, localTimeForPollingMock*2, "average poll interval suspiciously large")
}

func TestChainTrackerFetchSpreadAcrossPollingTime(t *testing.T) {
t.Run("one long test", func(t *testing.T) {
mockBlocks := int64(50)
Expand Down
137 changes: 137 additions & 0 deletions protocol/rpcprovider/consistency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,143 @@ func TestHandleConsistency_TooNewBailsFast(t *testing.T) {
require.Equal(t, time.Duration(0), slept)
}

// TestHandleConsistency_ReliefGate_WidensRejection verifies the polling-relief gate.
// With blockLagForQosSync=1 and blockGap=3, the default gate (blockLagForQosSync*2 = 2)
// bails because 3 > 2. The relief-configured gate (*4 = 4) does NOT bail because 3 <= 4.
// The changeTime is intentionally very stale so eventRate is large and probabilityBlockError
// is ~0, keeping the second (Poisson) clause inert for both configs so the factor clause
// is what actually differentiates them.
func TestHandleConsistency_ReliefGate_WidensRejection(t *testing.T) {
newTracker := func() *sequenceChainTracker {
return &sequenceChainTracker{
DummyChainTracker: &chaintracker.DummyChainTracker{},
// pre-gate read sees 1000; subsequent wait-loop reads see 1003 so the
// relief path exits cleanly without hitting the post-wait "too new" branch.
seq: []int64{1000, 1003, 1003, 1003, 1003, 1003},
changeTime: time.Now().Add(-10 * time.Second),
}
}

// averageBlockTime is small and changeTime is very stale => eventRate ~= 40,
// Poisson CDF(blockGap-1=2, lambda=40) is essentially 0, so probabilityBlockError ~= 0.
// That makes the second clause of the gate inert and isolates the factor-clause difference.
const (
baseRelayTimeout = time.Second
seenBlock = int64(1003)
requestBlock = int64(1003)
averageBlockTime = 250 * time.Millisecond
blockLagForQosSync = int64(1)
blockDistanceToFinalization = uint32(0)
blocksInFinalizationData = uint32(0)
)

// Default server: no relief. gapFactor=2 -> bails because blockGap(3) > 1*2.
defaultServer := &RPCProviderServer{
chainTracker: newTracker(),
rpcProviderEndpoint: &lavasession.RPCProviderEndpoint{ChainID: "TEST"},
}
ctxDefault, cancelDefault := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancelDefault()
_, _, errDefault := defaultServer.handleConsistency(
ctxDefault,
baseRelayTimeout,
seenBlock,
requestBlock,
averageBlockTime,
blockLagForQosSync,
blockDistanceToFinalization,
blocksInFinalizationData,
)
require.Error(t, errDefault, "default gate (gapFactor=2) should bail on blockGap=3")

// Relief server: gapFactor=4, probGate=0.7. Does NOT bail at the gate, enters wait loop,
// sees latest catch up to 1003 on the next tracker read, returns without error.
reliefServer := &RPCProviderServer{
chainTracker: newTracker(),
rpcProviderEndpoint: &lavasession.RPCProviderEndpoint{ChainID: "TEST"},
consistencyProbGate: 0.7,
consistencyBlockGapFactor: 4,
}
ctxRelief, cancelRelief := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancelRelief()
latest, slept, errRelief := reliefServer.handleConsistency(
ctxRelief,
baseRelayTimeout,
seenBlock,
requestBlock,
averageBlockTime,
blockLagForQosSync,
blockDistanceToFinalization,
blocksInFinalizationData,
)
require.NoError(t, errRelief, "relief gate (gapFactor=4) should NOT bail on blockGap=3")
require.Equal(t, int64(1003), latest, "relief path should observe the caught-up latest block")
require.Greater(t, slept, time.Duration(0), "relief path should have slept at least once before condition was met")
}

// TestHandleConsistency_ReliefGate_DefaultsPreserved guards the behaviour of an
// RPCProviderServer that has no relief settings configured: it must behave as if
// the historical gate constants (gapFactor=2, probGate=0.4) were in effect.
//
// The first subtest drives the probability clause of the gate on a server with both
// relief fields unset. The second subtest runs one scenario twice, once with the
// fields at zero and once with the historical values spelled out, and requires the
// same bail/no-bail outcome from both.
func TestHandleConsistency_ReliefGate_DefaultsPreserved(t *testing.T) {
	t.Run("second clause bails when probabilityBlockError > 0.4", func(t *testing.T) {
		// Scenario: the tracker changed "just now" and the block time is a full second,
		// so the expected event rate is tiny and the block-error probability is far
		// above 0.4. The lag of 100 puts 2*100 well beyond the block gap of 2, leaving
		// the gap clause dormant so only the probability gate can reject.
		const (
			relayTimeout = time.Second
			seen         = 1002
			requested    = 1002
			blockTime    = time.Second
			qosSyncLag   = 100
		)

		mockTracker := NewMockChainTracker()
		mockTracker.SetLatestBlock(1000, time.Now())

		server := &RPCProviderServer{
			chainTracker:        mockTracker,
			rpcProviderEndpoint: &lavasession.RPCProviderEndpoint{ChainID: "TEST"},
		}

		callCtx, stop := context.WithTimeout(context.Background(), 50*time.Millisecond)
		defer stop()

		_, _, gateErr := server.handleConsistency(callCtx, relayTimeout, seen, requested, blockTime, qosSyncLag, 0, 0)
		require.Error(t, gateErr, "zero-value server must still use probGate=0.4 and bail here")
	})

	t.Run("zero-value matches explicit-defaults (algebraic identity)", func(t *testing.T) {
		// bails reports whether handleConsistency returned an error for a server built
		// with the given relief settings. The scenario mirrors
		// TestHandleConsistency_TooNewBailsFast: latest block 1, changed one second ago,
		// request for block 1000.
		bails := func(probGate float64, gapFactor int64) bool {
			mockTracker := NewMockChainTracker()
			mockTracker.SetLatestBlock(1, time.Now().Add(-time.Second))
			server := &RPCProviderServer{
				chainTracker:              mockTracker,
				rpcProviderEndpoint:       &lavasession.RPCProviderEndpoint{ChainID: "TEST"},
				consistencyProbGate:       probGate,
				consistencyBlockGapFactor: gapFactor,
			}
			callCtx, stop := context.WithTimeout(context.Background(), 50*time.Millisecond)
			defer stop()
			_, _, gateErr := server.handleConsistency(callCtx, time.Second, 1000, 1000, time.Second, 1, 0, 0)
			return gateErr != nil
		}

		unsetBails := bails(0, 0)
		explicitBails := bails(0.4, 2)
		require.Equal(t,
			unsetBails, explicitBails,
			"zero-value RPCProviderServer must reach the same bail decision as one with consistencyProbGate=0.4, consistencyBlockGapFactor=2",
		)
	})
}

func TestHandleConsistency_SleepsAndCatchesUp(t *testing.T) {
// First call returns 1, subsequent calls return 2.
seqTracker := &sequenceChainTracker{
Expand Down
75 changes: 74 additions & 1 deletion protocol/rpcprovider/rpcprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,19 @@ var (

RelaysHealthEnableFlagDefault = true
RelayHealthIntervalFlagDefault = 5 * time.Minute

// polling-relief: process-wide flags that slow chain-tracker polling and widen the
// handleConsistency rejection gate. Zero value (flag unset) = no relief, current defaults.
// See agent_docs/chaintracker-polling-frequency.
PollingReliefTimeMultiplier int
PollingReliefProbGate float64
PollingReliefBlockGapFactor int64
)

// Command-line flag names for the polling-relief overrides.
const (
	// PollingReliefTimeMultiplierFlagName names the flag bound to PollingReliefTimeMultiplier.
	PollingReliefTimeMultiplierFlagName = "polling-time-multiplier"

	// PollingReliefProbGateFlagName names the flag bound to PollingReliefProbGate.
	PollingReliefProbGateFlagName = "consistency-probability-gate"

	// PollingReliefBlockGapFactorFlagName names the flag bound to PollingReliefBlockGapFactor.
	PollingReliefBlockGapFactorFlagName = "consistency-block-gap-factor"
)

// used to call SetPolicy in base chain parser so we are allowed to run verifications on the addons and extensions
Expand Down Expand Up @@ -620,6 +633,62 @@ func (rpcp *RPCProvider) SetupEndpoint(ctx context.Context, rpcProviderEndpoint
var chainRouter chainlib.ChainRouter
var loadManager *ProviderLoadManager

// polling-relief: process-wide flags that slow chain-tracker polling and widen the
// handleConsistency rejection gate. Zero value on any local preserves current defaults.
// PollingTimeMultiplier must stay >= 4: updateTimer in chain_tracker.go divides by M/4 and M/2,
// and those integer divisors become zero (divide-by-zero panic) for any smaller multiplier.
// See agent_docs/chaintracker-polling-frequency.
var (
pollingTimeMultiplier int
consistencyProbGate float64
consistencyBlockGapFactor int64
)
if PollingReliefTimeMultiplier != 0 {
if PollingReliefTimeMultiplier < 4 || PollingReliefTimeMultiplier > 16 {
utils.LavaFormatWarning("--"+PollingReliefTimeMultiplierFlagName+" out of allowed range [4,16]; reverting to default", nil,
utils.LogAttr("provided", PollingReliefTimeMultiplier),
utils.LogAttr("chainID", rpcProviderEndpoint.ChainID),
)
} else {
pollingTimeMultiplier = PollingReliefTimeMultiplier
}
}
if PollingReliefProbGate != 0 {
if PollingReliefProbGate <= 0 || PollingReliefProbGate > 1.0 {
utils.LavaFormatWarning("--"+PollingReliefProbGateFlagName+" out of allowed range (0,1]; reverting to default", nil,
utils.LogAttr("provided", PollingReliefProbGate),
utils.LogAttr("chainID", rpcProviderEndpoint.ChainID),
)
} else {
consistencyProbGate = PollingReliefProbGate
}
}
if PollingReliefBlockGapFactor != 0 {
if PollingReliefBlockGapFactor < 2 || PollingReliefBlockGapFactor > 8 {
utils.LavaFormatWarning("--"+PollingReliefBlockGapFactorFlagName+" out of allowed range [2,8]; reverting to default", nil,
utils.LogAttr("provided", PollingReliefBlockGapFactor),
utils.LogAttr("chainID", rpcProviderEndpoint.ChainID),
)
} else {
consistencyBlockGapFactor = PollingReliefBlockGapFactor
}
}
if pollingTimeMultiplier != 0 || consistencyProbGate > 0 || consistencyBlockGapFactor != 0 {
effectiveMultiplier := pollingTimeMultiplier
if effectiveMultiplier == 0 {
effectiveMultiplier = chaintracker.MostFrequentPollingMultiplier
}
utils.LavaFormatInfo("polling-relief active",
utils.LogAttr("chainID", rpcProviderEndpoint.ChainID),
utils.LogAttr("apiInterface", apiInterface),
utils.LogAttr("pollingTimeMultiplier", effectiveMultiplier),
utils.LogAttr("consistencyProbGate", consistencyProbGate),
utils.LogAttr("consistencyBlockGapFactor", consistencyBlockGapFactor),
utils.LogAttr("expectedPollIntervalMs_fast", (averageBlockTime / time.Duration(effectiveMultiplier/4)).Milliseconds()),
utils.LogAttr("expectedPollIntervalMs_medium", (averageBlockTime / time.Duration(effectiveMultiplier/2)).Milliseconds()),
utils.LogAttr("expectedPollIntervalMs_slow", (averageBlockTime / time.Duration(effectiveMultiplier)).Milliseconds()),
)
}

// Provider test-mode should not depend on external node URLs.
// In test-mode, relays are served from predefined responses; routing to the node is unexpected.
if rpcp.testMode {
Expand Down Expand Up @@ -712,6 +781,7 @@ func (rpcp *RPCProvider) SetupEndpoint(ctx context.Context, rpcProviderEndpoint
Pmetrics: rpcp.providerMetricsManager,
ChainId: chainID,
ParseDirectiveEnabled: chainParser.ParseDirectiveEnabled(),
PollingTimeMultiplier: pollingTimeMultiplier,
}

chainTracker, err = chaintracker.NewChainTracker(ctx, chainFetcher, chainTrackerConfig)
Expand Down Expand Up @@ -837,7 +907,7 @@ func (rpcp *RPCProvider) SetupEndpoint(ctx context.Context, rpcProviderEndpoint
)
}

rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rpcp.rewardServer, providerSessionManager, chainTracker, rpcp.privKey, rpcp.cache, rpcp.cacheLatestBlockEnabled, chainRouter, rpcp.providerStateTracker, rpcp.addr, rpcp.lavaChainID, DEFAULT_ALLOWED_MISSING_CU, providerMetrics, relaysMonitor, providerNodeSubscriptionManager, rpcp.staticProvider, loadManager, rpcp, numberOfRetriesAllowedOnNodeErrors, testModeConfig, resourceLimiter, rpcp.enableConsistencyChecks)
rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rpcp.rewardServer, providerSessionManager, chainTracker, rpcp.privKey, rpcp.cache, rpcp.cacheLatestBlockEnabled, chainRouter, rpcp.providerStateTracker, rpcp.addr, rpcp.lavaChainID, DEFAULT_ALLOWED_MISSING_CU, providerMetrics, relaysMonitor, providerNodeSubscriptionManager, rpcp.staticProvider, loadManager, rpcp, numberOfRetriesAllowedOnNodeErrors, testModeConfig, resourceLimiter, rpcp.enableConsistencyChecks, consistencyProbGate, consistencyBlockGapFactor)
// set up grpc listener
var listener *ProviderListener
func() {
Expand Down Expand Up @@ -1296,6 +1366,9 @@ rpcprovider 127.0.0.1:3333 OSMOSIS tendermintrpc "wss://www.node-path.com:80,htt
cmdRPCProvider.Flags().Bool("test_mode", false, "enable test mode - provider returns predefined responses instead of querying real nodes")
cmdRPCProvider.Flags().String("test_responses", "", "path to JSON file containing test responses for different API methods (required when test_mode is enabled)")
cmdRPCProvider.Flags().Uint64Var(&chaintracker.PollingMultiplier, chaintracker.PollingMultiplierFlagName, 1, "when set, forces the chain tracker to poll more often, improving the sync at the cost of more queries")
cmdRPCProvider.Flags().IntVar(&PollingReliefTimeMultiplier, PollingReliefTimeMultiplierFlagName, 0, "polling-relief: override MostFrequentPollingMultiplier (default 16). Allowed [4,16]; values outside revert to default. Smaller value = slower polling.")
cmdRPCProvider.Flags().Float64Var(&PollingReliefProbGate, PollingReliefProbGateFlagName, 0, "polling-relief: override handleConsistency probabilityBlockError gate (default 0.4). Allowed (0,1]; values outside revert to default.")
cmdRPCProvider.Flags().Int64Var(&PollingReliefBlockGapFactor, PollingReliefBlockGapFactorFlagName, 0, "polling-relief: override handleConsistency blockLagForQosSync multiplier (default 2). Allowed [2,8]; values outside revert to default.")
cmdRPCProvider.Flags().DurationVar(&SpecValidationInterval, SpecValidationIntervalFlagName, SpecValidationInterval, "determines the interval of which to run validation on the spec for all connected chains")
cmdRPCProvider.Flags().DurationVar(&SpecValidationIntervalDisabledChains, SpecValidationIntervalDisabledChainsFlagName, SpecValidationIntervalDisabledChains, "determines the interval of which to run validation on the spec for all disabled chains, determines recovery time")
cmdRPCProvider.Flags().Bool(common.RelaysHealthEnableFlag, true, "enables relays health check")
Expand Down
Loading
Loading