diff --git a/protocol/chaintracker/chain_tracker_test.go b/protocol/chaintracker/chain_tracker_test.go
index fcae7f7fc9..366c54080d 100644
--- a/protocol/chaintracker/chain_tracker_test.go
+++ b/protocol/chaintracker/chain_tracker_test.go
@@ -367,6 +367,96 @@ func TestChainTrackerCallbacks(t *testing.T) {
 	})
 }
 
+// TestChainTracker_PollingMultiplier4_NoDivideByZero is the safety-net test for the
+// polling-relief patch. At PollingTimeMultiplier=4 the adaptive branches in
+// chain_tracker.go updateTimer divide by M/4=1 and M/2=2. If the clamp at the
+// config layer ever fails and lets M<4 through, those integer divisors truncate to zero
+// and the tracker panics. This test asserts the M=4 case runs without panic and
+// without absurd polling intervals.
+func TestChainTracker_PollingMultiplier4_NoDivideByZero(t *testing.T) {
+	mockBlocks := int64(50)
+	fetcherBlocks := 1
+	localTimeForPollingMock := 16 * time.Millisecond
+
+	var (
+		mu     sync.Mutex
+		called int
+		last   time.Time
+		diffs  []time.Duration
+	)
+	callback := func() {
+		mu.Lock()
+		defer mu.Unlock()
+		now := time.Now()
+		if !last.IsZero() {
+			diffs = append(diffs, now.Sub(last))
+		}
+		last = now
+		called++
+	}
+
+	mockChainFetcher := NewMockChainFetcher(1000, mockBlocks, callback)
+	mockChainFetcher.AdvanceBlock()
+
+	chainTrackerConfig := chaintracker.ChainTrackerConfig{
+		BlocksToSave:          uint64(fetcherBlocks),
+		AverageBlockTime:      localTimeForPollingMock,
+		ServerBlockMemory:     uint64(mockBlocks),
+		PollingTimeMultiplier: 4,
+		ParseDirectiveEnabled: true,
+	}
+
+	// Tie the tracker to a cancellable context so this test does not leave it polling
+	// (and invoking callback) after returning.
+	// NOTE(review): assumes the tracker honors context cancellation - confirm in chain_tracker.go.
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	// At M=4 the adaptive divisors are M/4=1 and M/2=2, both non-zero.
+	// NOTE(review): NotPanics only observes panics raised on this goroutine; a
+	// divide-by-zero on a goroutine started by the tracker would abort the whole
+	// test binary instead, which still fails the run.
+	require.NotPanics(t, func() {
+		tracker, err := chaintracker.NewChainTracker(ctx, mockChainFetcher, chainTrackerConfig)
+		require.NoError(t, err)
+		err = tracker.StartAndServe(ctx)
+		require.NoError(t, err)
+
+		// Pre-seed block gaps so self-tuning thinks blocks arrive every localTimeForPollingMock
+		// and doesn't rescale our base pollingTime out from under us.
+		for i := 0; i < 50; i++ {
+			tracker.AddBlockGap(localTimeForPollingMock, 1)
+		}
+
+		// Run long enough to exercise the adaptive branches.
+		// Advance a block mid-run so timeSinceLastUpdate crosses both halves of the cycle
+		// and we hit all three branches (fast / medium / slow) at least once.
+		time.Sleep(4 * localTimeForPollingMock)
+		mockChainFetcher.AdvanceBlock()
+		time.Sleep(6 * localTimeForPollingMock)
+	})
+
+	mu.Lock()
+	defer mu.Unlock()
+
+	require.Greater(t, called, 10, "tracker should have polled many times during the test window")
+	require.NotEmpty(t, diffs, "should have recorded inter-poll intervals")
+
+	var total time.Duration
+	for _, d := range diffs {
+		total += d
+	}
+	avg := total / time.Duration(len(diffs))
+
+	// At M=4: fast=blockTime, medium=blockTime/2, slow=blockTime/4.
+	// Observed average should be between blockTime/8 (very conservative lower bound
+	// accounting for test-clock jitter on the fast-polling branch) and blockTime*2
+	// (likewise upper bound). The tight "within 30% of blockTime" from the plan is
+	// informative but too flaky as a strict assertion given goroutine scheduling noise.
+	require.Greater(t, avg, localTimeForPollingMock/8, "average poll interval suspiciously small")
+	require.Less(t, avg, localTimeForPollingMock*2, "average poll interval suspiciously large")
+}
+
 func TestChainTrackerFetchSpreadAcrossPollingTime(t *testing.T) {
 	t.Run("one long test", func(t *testing.T) {
 		mockBlocks := int64(50)
diff --git a/protocol/rpcprovider/consistency_test.go b/protocol/rpcprovider/consistency_test.go
index d0ec82c3fe..43b9f1375c 100644
--- a/protocol/rpcprovider/consistency_test.go
+++ b/protocol/rpcprovider/consistency_test.go
@@ -114,6 +114,143 @@ func TestHandleConsistency_TooNewBailsFast(t *testing.T) {
 	require.Equal(t, time.Duration(0), slept)
 }
 
+// TestHandleConsistency_ReliefGate_WidensRejection verifies the polling-relief gate.
+// With blockLagForQosSync=1 and blockGap=3, the default gate (blockLagForQosSync*2 = 2)
+// bails because 3 > 2. The relief-configured gate (*4 = 4) does NOT bail because 3 <= 4.
+// The changeTime is intentionally very stale so eventRate is large and probabilityBlockError
+// is ~0, keeping the second (Poisson) clause inert for both configs so the factor clause
+// is what actually differentiates them.
+func TestHandleConsistency_ReliefGate_WidensRejection(t *testing.T) {
+	newTracker := func() *sequenceChainTracker {
+		return &sequenceChainTracker{
+			DummyChainTracker: &chaintracker.DummyChainTracker{},
+			// pre-gate read sees 1000; subsequent wait-loop reads see 1003 so the
+			// relief path exits cleanly without hitting the post-wait "too new" branch.
+			seq:        []int64{1000, 1003, 1003, 1003, 1003, 1003},
+			changeTime: time.Now().Add(-10 * time.Second),
+		}
+	}
+
+	// averageBlockTime is small and changeTime is very stale => eventRate ~= 40,
+	// Poisson CDF(blockGap-1=2, lambda=40) is essentially 0, so probabilityBlockError ~= 0.
+	// That makes the second clause of the gate inert and isolates the factor-clause difference.
+	const (
+		baseRelayTimeout            = time.Second
+		seenBlock                   = int64(1003)
+		requestBlock                = int64(1003)
+		averageBlockTime            = 250 * time.Millisecond
+		blockLagForQosSync          = int64(1)
+		blockDistanceToFinalization = uint32(0)
+		blocksInFinalizationData    = uint32(0)
+	)
+
+	// Default server: no relief. gapFactor=2 -> bails because blockGap(3) > 1*2.
+	defaultServer := &RPCProviderServer{
+		chainTracker:        newTracker(),
+		rpcProviderEndpoint: &lavasession.RPCProviderEndpoint{ChainID: "TEST"},
+	}
+	ctxDefault, cancelDefault := context.WithTimeout(context.Background(), 50*time.Millisecond)
+	defer cancelDefault()
+	_, _, errDefault := defaultServer.handleConsistency(
+		ctxDefault,
+		baseRelayTimeout,
+		seenBlock,
+		requestBlock,
+		averageBlockTime,
+		blockLagForQosSync,
+		blockDistanceToFinalization,
+		blocksInFinalizationData,
+	)
+	require.Error(t, errDefault, "default gate (gapFactor=2) should bail on blockGap=3")
+
+	// Relief server: gapFactor=4, probGate=0.7. Does NOT bail at the gate, enters wait loop,
+	// sees latest catch up to 1003 on the next tracker read, returns without error.
+	reliefServer := &RPCProviderServer{
+		chainTracker:              newTracker(),
+		rpcProviderEndpoint:       &lavasession.RPCProviderEndpoint{ChainID: "TEST"},
+		consistencyProbGate:       0.7,
+		consistencyBlockGapFactor: 4,
+	}
+	ctxRelief, cancelRelief := context.WithTimeout(context.Background(), 50*time.Millisecond)
+	defer cancelRelief()
+	latest, slept, errRelief := reliefServer.handleConsistency(
+		ctxRelief,
+		baseRelayTimeout,
+		seenBlock,
+		requestBlock,
+		averageBlockTime,
+		blockLagForQosSync,
+		blockDistanceToFinalization,
+		blocksInFinalizationData,
+	)
+	require.NoError(t, errRelief, "relief gate (gapFactor=4) should NOT bail on blockGap=3")
+	require.Equal(t, int64(1003), latest, "relief path should observe the caught-up latest block")
+	require.Greater(t, slept, time.Duration(0), "relief path should have slept at least once before condition was met")
+}
+
+// TestHandleConsistency_ReliefGate_DefaultsPreserved verifies that a zero-value
+// RPCProviderServer (no relief configured) keeps the pre-patch gate constants
+// (gapFactor=2, probGate=0.4) via the zero-value guards in handleConsistency.
+// The first-clause bail path is already covered by TestHandleConsistency_TooNewBailsFast
+// on a zero-value server; this test adds the second-clause bail and an algebraic-identity
+// check against an explicit-default server.
+func TestHandleConsistency_ReliefGate_DefaultsPreserved(t *testing.T) {
+	t.Run("second clause bails when probabilityBlockError > 0.4", func(t *testing.T) {
+		// Fresh changeTime + small halfTimeLeft => eventRate ~0.025 =>
+		// Poisson CDF(k=1, lambda=0.025) ~ 0.9997, well above 0.4.
+		// Large blockLagForQosSync keeps the first clause dormant so the second clause
+		// (governed by probGate) is what decides.
+		tracker := NewMockChainTracker()
+		tracker.SetLatestBlock(1000, time.Now())
+
+		rpcps := &RPCProviderServer{
+			chainTracker:        tracker,
+			rpcProviderEndpoint: &lavasession.RPCProviderEndpoint{ChainID: "TEST"},
+		}
+
+		ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
+		defer cancel()
+
+		_, _, err := rpcps.handleConsistency(
+			ctx,
+			time.Second, // baseRelayTimeout
+			1002,        // seenBlock
+			1002,        // requestBlock; blockGap = 2 > 1
+			time.Second, // averageBlockTime => eventRate ~ halfTimeLeft/1s
+			100,         // blockLagForQosSync: 2*100=200 >> blockGap, first clause dormant
+			0, 0,
+		)
+		require.Error(t, err, "zero-value server must still use probGate=0.4 and bail here")
+	})
+
+	t.Run("zero-value matches explicit-defaults (algebraic identity)", func(t *testing.T) {
+		// Same scenario as TestHandleConsistency_TooNewBailsFast but run twice:
+		// once with both relief fields at zero, once with explicit historical defaults.
+		// The zero-value guards must produce the same bail/no-bail decision in both.
+		runCase := func(probGate float64, gapFactor int64) error {
+			tracker := NewMockChainTracker()
+			tracker.SetLatestBlock(1, time.Now().Add(-time.Second))
+			rpcps := &RPCProviderServer{
+				chainTracker:              tracker,
+				rpcProviderEndpoint:       &lavasession.RPCProviderEndpoint{ChainID: "TEST"},
+				consistencyProbGate:       probGate,
+				consistencyBlockGapFactor: gapFactor,
+			}
+			ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
+			defer cancel()
+			_, _, err := rpcps.handleConsistency(ctx, time.Second, 1000, 1000, time.Second, 1, 0, 0)
+			return err
+		}
+
+		errZero := runCase(0, 0)
+		errExplicit := runCase(0.4, 2)
+		require.Equal(t,
+			errZero != nil, errExplicit != nil,
+			"zero-value RPCProviderServer must reach the same bail decision as one with consistencyProbGate=0.4, consistencyBlockGapFactor=2",
+		)
+	})
+}
+
 func TestHandleConsistency_SleepsAndCatchesUp(t *testing.T) {
 	// First call returns 1, subsequent calls return 2.
seqTracker := &sequenceChainTracker{ diff --git a/protocol/rpcprovider/rpcprovider.go b/protocol/rpcprovider/rpcprovider.go index b323f339bf..c3ab572243 100644 --- a/protocol/rpcprovider/rpcprovider.go +++ b/protocol/rpcprovider/rpcprovider.go @@ -64,6 +64,19 @@ var ( RelaysHealthEnableFlagDefault = true RelayHealthIntervalFlagDefault = 5 * time.Minute + + // polling-relief: process-wide flags that slow chain-tracker polling and widen the + // handleConsistency rejection gate. Zero value (flag unset) = no relief, current defaults. + // See agent_docs/chaintracker-polling-frequency. + PollingReliefTimeMultiplier int + PollingReliefProbGate float64 + PollingReliefBlockGapFactor int64 +) + +const ( + PollingReliefTimeMultiplierFlagName = "polling-time-multiplier" + PollingReliefProbGateFlagName = "consistency-probability-gate" + PollingReliefBlockGapFactorFlagName = "consistency-block-gap-factor" ) // used to call SetPolicy in base chain parser so we are allowed to run verifications on the addons and extensions @@ -620,6 +633,62 @@ func (rpcp *RPCProvider) SetupEndpoint(ctx context.Context, rpcProviderEndpoint var chainRouter chainlib.ChainRouter var loadManager *ProviderLoadManager + // polling-relief: process-wide flags that slow chain-tracker polling and widen the + // handleConsistency rejection gate. Zero value on any local preserves current defaults. + // PollingTimeMultiplier must stay >= 4 to avoid divide-by-zero in chain_tracker.go:463,465. + // See agent_docs/chaintracker-polling-frequency. 
+ var ( + pollingTimeMultiplier int + consistencyProbGate float64 + consistencyBlockGapFactor int64 + ) + if PollingReliefTimeMultiplier != 0 { + if PollingReliefTimeMultiplier < 4 || PollingReliefTimeMultiplier > 16 { + utils.LavaFormatWarning("--"+PollingReliefTimeMultiplierFlagName+" out of allowed range [4,16]; reverting to default", nil, + utils.LogAttr("provided", PollingReliefTimeMultiplier), + utils.LogAttr("chainID", rpcProviderEndpoint.ChainID), + ) + } else { + pollingTimeMultiplier = PollingReliefTimeMultiplier + } + } + if PollingReliefProbGate != 0 { + if PollingReliefProbGate <= 0 || PollingReliefProbGate > 1.0 { + utils.LavaFormatWarning("--"+PollingReliefProbGateFlagName+" out of allowed range (0,1]; reverting to default", nil, + utils.LogAttr("provided", PollingReliefProbGate), + utils.LogAttr("chainID", rpcProviderEndpoint.ChainID), + ) + } else { + consistencyProbGate = PollingReliefProbGate + } + } + if PollingReliefBlockGapFactor != 0 { + if PollingReliefBlockGapFactor < 2 || PollingReliefBlockGapFactor > 8 { + utils.LavaFormatWarning("--"+PollingReliefBlockGapFactorFlagName+" out of allowed range [2,8]; reverting to default", nil, + utils.LogAttr("provided", PollingReliefBlockGapFactor), + utils.LogAttr("chainID", rpcProviderEndpoint.ChainID), + ) + } else { + consistencyBlockGapFactor = PollingReliefBlockGapFactor + } + } + if pollingTimeMultiplier != 0 || consistencyProbGate > 0 || consistencyBlockGapFactor != 0 { + effectiveMultiplier := pollingTimeMultiplier + if effectiveMultiplier == 0 { + effectiveMultiplier = chaintracker.MostFrequentPollingMultiplier + } + utils.LavaFormatInfo("polling-relief active", + utils.LogAttr("chainID", rpcProviderEndpoint.ChainID), + utils.LogAttr("apiInterface", apiInterface), + utils.LogAttr("pollingTimeMultiplier", effectiveMultiplier), + utils.LogAttr("consistencyProbGate", consistencyProbGate), + utils.LogAttr("consistencyBlockGapFactor", consistencyBlockGapFactor), + 
utils.LogAttr("expectedPollIntervalMs_fast", (averageBlockTime / time.Duration(effectiveMultiplier/4)).Milliseconds()), + utils.LogAttr("expectedPollIntervalMs_medium", (averageBlockTime / time.Duration(effectiveMultiplier/2)).Milliseconds()), + utils.LogAttr("expectedPollIntervalMs_slow", (averageBlockTime / time.Duration(effectiveMultiplier)).Milliseconds()), + ) + } + // Provider test-mode should not depend on external node URLs. // In test-mode, relays are served from predefined responses; routing to the node is unexpected. if rpcp.testMode { @@ -712,6 +781,7 @@ func (rpcp *RPCProvider) SetupEndpoint(ctx context.Context, rpcProviderEndpoint Pmetrics: rpcp.providerMetricsManager, ChainId: chainID, ParseDirectiveEnabled: chainParser.ParseDirectiveEnabled(), + PollingTimeMultiplier: pollingTimeMultiplier, } chainTracker, err = chaintracker.NewChainTracker(ctx, chainFetcher, chainTrackerConfig) @@ -837,7 +907,7 @@ func (rpcp *RPCProvider) SetupEndpoint(ctx context.Context, rpcProviderEndpoint ) } - rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rpcp.rewardServer, providerSessionManager, chainTracker, rpcp.privKey, rpcp.cache, rpcp.cacheLatestBlockEnabled, chainRouter, rpcp.providerStateTracker, rpcp.addr, rpcp.lavaChainID, DEFAULT_ALLOWED_MISSING_CU, providerMetrics, relaysMonitor, providerNodeSubscriptionManager, rpcp.staticProvider, loadManager, rpcp, numberOfRetriesAllowedOnNodeErrors, testModeConfig, resourceLimiter, rpcp.enableConsistencyChecks) + rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rpcp.rewardServer, providerSessionManager, chainTracker, rpcp.privKey, rpcp.cache, rpcp.cacheLatestBlockEnabled, chainRouter, rpcp.providerStateTracker, rpcp.addr, rpcp.lavaChainID, DEFAULT_ALLOWED_MISSING_CU, providerMetrics, relaysMonitor, providerNodeSubscriptionManager, rpcp.staticProvider, loadManager, rpcp, numberOfRetriesAllowedOnNodeErrors, testModeConfig, resourceLimiter, rpcp.enableConsistencyChecks, 
consistencyProbGate, consistencyBlockGapFactor) // set up grpc listener var listener *ProviderListener func() { @@ -1296,6 +1366,9 @@ rpcprovider 127.0.0.1:3333 OSMOSIS tendermintrpc "wss://www.node-path.com:80,htt cmdRPCProvider.Flags().Bool("test_mode", false, "enable test mode - provider returns predefined responses instead of querying real nodes") cmdRPCProvider.Flags().String("test_responses", "", "path to JSON file containing test responses for different API methods (required when test_mode is enabled)") cmdRPCProvider.Flags().Uint64Var(&chaintracker.PollingMultiplier, chaintracker.PollingMultiplierFlagName, 1, "when set, forces the chain tracker to poll more often, improving the sync at the cost of more queries") + cmdRPCProvider.Flags().IntVar(&PollingReliefTimeMultiplier, PollingReliefTimeMultiplierFlagName, 0, "polling-relief: override MostFrequentPollingMultiplier (default 16). Allowed [4,16]; values outside revert to default. Smaller value = slower polling.") + cmdRPCProvider.Flags().Float64Var(&PollingReliefProbGate, PollingReliefProbGateFlagName, 0, "polling-relief: override handleConsistency probabilityBlockError gate (default 0.4). Allowed (0,1]; values outside revert to default.") + cmdRPCProvider.Flags().Int64Var(&PollingReliefBlockGapFactor, PollingReliefBlockGapFactorFlagName, 0, "polling-relief: override handleConsistency blockLagForQosSync multiplier (default 2). 
Allowed [2,8]; values outside revert to default.") cmdRPCProvider.Flags().DurationVar(&SpecValidationInterval, SpecValidationIntervalFlagName, SpecValidationInterval, "determines the interval of which to run validation on the spec for all connected chains") cmdRPCProvider.Flags().DurationVar(&SpecValidationIntervalDisabledChains, SpecValidationIntervalDisabledChainsFlagName, SpecValidationIntervalDisabledChains, "determines the interval of which to run validation on the spec for all disabled chains, determines recovery time") cmdRPCProvider.Flags().Bool(common.RelaysHealthEnableFlag, true, "enables relays health check") diff --git a/protocol/rpcprovider/rpcprovider_server.go b/protocol/rpcprovider/rpcprovider_server.go index 054f23b45f..72332bb03c 100644 --- a/protocol/rpcprovider/rpcprovider_server.go +++ b/protocol/rpcprovider/rpcprovider_server.go @@ -126,6 +126,8 @@ type RPCProviderServer struct { testModeConfig *TestModeConfig resourceLimiter *ResourceLimiter enableConsistency bool + consistencyProbGate float64 // polling-relief: 0 = use default 0.4 in handleConsistency + consistencyBlockGapFactor int64 // polling-relief: 0 = use default 2 in handleConsistency } type RewardServerInf interface { @@ -173,6 +175,8 @@ func (rpcps *RPCProviderServer) ServeRPCRequests( testModeConfig *TestModeConfig, resourceLimiter *ResourceLimiter, enableConsistency bool, + consistencyProbGate float64, + consistencyBlockGapFactor int64, ) { rpcps.cache = cache rpcps.cacheLatestBlockEnabled = cacheLatestBlockEnabled @@ -200,6 +204,16 @@ func (rpcps *RPCProviderServer) ServeRPCRequests( rpcps.verificationsStatusGetter = verificationsStatusGetter rpcps.resourceLimiter = resourceLimiter rpcps.enableConsistency = enableConsistency + rpcps.consistencyProbGate = consistencyProbGate + rpcps.consistencyBlockGapFactor = consistencyBlockGapFactor + if consistencyProbGate > 0 || consistencyBlockGapFactor > 0 { + utils.LavaFormatInfo("polling-relief consistency-gate active on provider server", 
+ utils.LogAttr("chainID", rpcProviderEndpoint.ChainID), + utils.LogAttr("apiInterface", rpcProviderEndpoint.ApiInterface), + utils.LogAttr("consistencyProbGate", consistencyProbGate), + utils.LogAttr("consistencyBlockGapFactor", consistencyBlockGapFactor), + ) + } rpcps.initRelaysMonitor(ctx) } @@ -1269,14 +1283,24 @@ func (rpcps *RPCProviderServer) handleConsistency(ctx context.Context, baseRelay utils.LavaFormatError("invalid rate params", nil, utils.Attribute{Key: "changeTime", Value: changeTime}, utils.Attribute{Key: "averageBlockTime", Value: averageBlockTime}, utils.Attribute{Key: "eventRate", Value: eventRate}, utils.Attribute{Key: "time", Value: time.Until(deadline)}, utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: utils.KEY_REQUEST_ID, Value: ctx}, utils.Attribute{Key: utils.KEY_TASK_ID, Value: ctx}, utils.Attribute{Key: utils.KEY_TRANSACTION_ID, Value: ctx}, utils.Attribute{Key: "requestedBlock", Value: requestBlock}, utils.Attribute{Key: "latestBlock", Value: latestBlock}, utils.Attribute{Key: "blockGap", Value: blockGap}) } else { probabilityBlockError = provideroptimizer.CumulativeProbabilityFunctionForPoissonDist(uint64(blockGap-1), eventRate) // this calculates the probability we received insufficient blocks. too few when we don't wait - if debugConsistency { + // polling-relief: when the consistency gate is widened we want field visibility without flipping debugConsistency globally. 
+ if debugConsistency || rpcps.consistencyProbGate > 0 { utils.LavaFormatDebug("consistency calculations breakdown", utils.Attribute{Key: "averageBlockTime", Value: averageBlockTime}, utils.Attribute{Key: "eventRate", Value: eventRate}, utils.Attribute{Key: "probabilityBlockError", Value: probabilityBlockError}, utils.Attribute{Key: "time", Value: time.Until(deadline)}, utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: utils.KEY_REQUEST_ID, Value: ctx}, utils.Attribute{Key: utils.KEY_TASK_ID, Value: ctx}, utils.Attribute{Key: utils.KEY_TRANSACTION_ID, Value: ctx}, utils.Attribute{Key: "requestedBlock", Value: requestBlock}, utils.Attribute{Key: "latestBlock", Value: latestBlock}, utils.Attribute{Key: "blockGap", Value: blockGap}) } } } // we only bail if there is no chance for the provider to get to the requested block and the consumer has already got a response from a different provider with that block - if (blockGap > blockLagForQosSync*2 || (blockGap > 1 && probabilityBlockError > 0.4)) && (seenBlock >= latestBlock) { - return latestBlock, 0, utils.LavaFormatWarning("Requested a block that is too new", protocolerrors.ConsistencyError, utils.Attribute{Key: "blockGap", Value: blockGap}, utils.Attribute{Key: "probabilityBlockError", Value: probabilityBlockError}, utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: utils.KEY_REQUEST_ID, Value: ctx}, utils.Attribute{Key: utils.KEY_TASK_ID, Value: ctx}, utils.Attribute{Key: utils.KEY_TRANSACTION_ID, Value: ctx}, utils.Attribute{Key: "seenBlock", Value: seenBlock}, utils.Attribute{Key: "requestedBlock", Value: requestBlock}, utils.Attribute{Key: "latestBlock", Value: latestBlock}, utils.Attribute{Key: "chainID", Value: rpcps.rpcProviderEndpoint.ChainID}) + // polling-relief: zero-value on either field preserves the historical defaults (0.4 and 2). 
+ probGate := 0.4 + if rpcps.consistencyProbGate > 0 { + probGate = rpcps.consistencyProbGate + } + gapFactor := int64(2) + if rpcps.consistencyBlockGapFactor > 0 { + gapFactor = rpcps.consistencyBlockGapFactor + } + if (blockGap > blockLagForQosSync*gapFactor || (blockGap > 1 && probabilityBlockError > probGate)) && (seenBlock >= latestBlock) { + return latestBlock, 0, utils.LavaFormatWarning("Requested a block that is too new", protocolerrors.ConsistencyError, utils.Attribute{Key: "blockGap", Value: blockGap}, utils.Attribute{Key: "probabilityBlockError", Value: probabilityBlockError}, utils.Attribute{Key: "probGate", Value: probGate}, utils.Attribute{Key: "gapFactor", Value: gapFactor}, utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: utils.KEY_REQUEST_ID, Value: ctx}, utils.Attribute{Key: utils.KEY_TASK_ID, Value: ctx}, utils.Attribute{Key: utils.KEY_TRANSACTION_ID, Value: ctx}, utils.Attribute{Key: "seenBlock", Value: seenBlock}, utils.Attribute{Key: "requestedBlock", Value: requestBlock}, utils.Attribute{Key: "latestBlock", Value: latestBlock}, utils.Attribute{Key: "chainID", Value: rpcps.rpcProviderEndpoint.ChainID}) } if !ok { @@ -1299,7 +1323,8 @@ func (rpcps *RPCProviderServer) handleConsistency(ctx context.Context, baseRelay // meaning we can't guarantee it will work since chainTracker didn't see this requested block yet return 0, sleptTime, utils.LavaFormatWarning("requested block is too new", nil, utils.Attribute{Key: "sleptTime", Value: sleptTime}, utils.Attribute{Key: "requested", Value: requestBlock}, utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: utils.KEY_REQUEST_ID, Value: ctx}, utils.Attribute{Key: utils.KEY_TASK_ID, Value: ctx}, utils.Attribute{Key: utils.KEY_TRANSACTION_ID, Value: ctx}, utils.Attribute{Key: "latestBlock", Value: latestBlock}, utils.Attribute{Key: "chainID", Value: rpcps.rpcProviderEndpoint.ChainID}, utils.Attribute{Key: "seenBlock", Value: seenBlock}) } - if debugConsistency { + if 
debugConsistency || rpcps.consistencyProbGate > 0 { + // polling-relief: sleptTime is the canary for consistency-wait regressions while relief is active. utils.LavaFormatDebug("consistency sleep done", utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: utils.KEY_REQUEST_ID, Value: ctx}, utils.Attribute{Key: utils.KEY_TASK_ID, Value: ctx}, utils.Attribute{Key: utils.KEY_TRANSACTION_ID, Value: ctx}, utils.Attribute{Key: "sleptTime", Value: sleptTime}) } return latestBlock, sleptTime, nil diff --git a/scripts/pre_setups/init_lava_only_with_node_three_providers.sh b/scripts/pre_setups/init_lava_only_with_node_three_providers.sh index c5781e88dd..08f6cadd62 100755 --- a/scripts/pre_setups/init_lava_only_with_node_three_providers.sh +++ b/scripts/pre_setups/init_lava_only_with_node_three_providers.sh @@ -21,7 +21,7 @@ sleep 5 wait_for_lava_node_to_start GASPRICE="0.00002ulava" -specs=$(get_all_specs) +specs="./specs/mainnet-1/specs/tendermint.json,./specs/mainnet-1/specs/ibc.json,./specs/mainnet-1/specs/cosmossdk.json,./specs/testnet-2/specs/lava.json" lavad tx gov submit-legacy-proposal spec-add $specs --lava-dev-test -y --from alice --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE & wait_next_block wait_next_block @@ -64,11 +64,14 @@ screen -d -m -S cache_consumer bash -c "source ~/.bashrc; lavap cache \ 127.0.0.1:20100 --metrics_address 0.0.0.0:20200 --log_level debug 2>&1 | tee $LOGS_DIR/CACHE_CONSUMER.log" && sleep 0.25 sleep 2; +# Provider 1 runs with polling-relief flags enabled for A/B comparison. +# Providers 2 and 3 below stay on positional args with no relief flags as baseline controls. 
screen -d -m -S provider1 bash -c "source ~/.bashrc; lavap rpcprovider \ $PROVIDER1_LISTENER LAV1 rest '$LAVA_REST' \ $PROVIDER1_LISTENER LAV1 tendermintrpc '$LAVA_RPC,$LAVA_RPC_WS' \ $PROVIDER1_LISTENER LAV1 grpc '$LAVA_GRPC' \ -$EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level debug --from servicer1 --chain-id lava --metrics-listen-address ":7766" 2>&1 | tee $LOGS_DIR/PROVIDER1.log" && sleep 0.25 +$EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level debug --from servicer1 --chain-id lava --metrics-listen-address \":7766\" \ +--polling-time-multiplier 4 --consistency-probability-gate 0.7 --consistency-block-gap-factor 4 2>&1 | tee $LOGS_DIR/PROVIDER1.log" && sleep 0.25 screen -d -m -S provider2 bash -c "source ~/.bashrc; lavap rpcprovider \ $PROVIDER2_LISTENER LAV1 rest '$LAVA_REST' \