Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions cmd/workflow/simulate/capabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
consensusserver "github.com/smartcontractkit/chainlink-common/pkg/capabilities/v2/consensus/server"
crontrigger "github.com/smartcontractkit/chainlink-common/pkg/capabilities/v2/triggers/cron/server"
httptrigger "github.com/smartcontractkit/chainlink-common/pkg/capabilities/v2/triggers/http/server"
"github.com/smartcontractkit/chainlink-common/pkg/config"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink/v2/core/capabilities"
Expand All @@ -26,7 +27,7 @@ type ManualTriggers struct {

// NewManualTriggerCapabilities creates and registers cron and HTTP trigger capabilities.
// These are chain-agnostic and shared across all chain types.
func NewManualTriggerCapabilities(ctx context.Context, lggr logger.Logger, registry *capabilities.Registry, httpTriggerPort int) (*ManualTriggers, error) {
func NewManualTriggerCapabilities(ctx context.Context, lggr logger.Logger, registry *capabilities.Registry, httpTriggerPort int, limits *SimulationLimits) (*ManualTriggers, error) {
manualCronTrigger, err := fakes.NewManualCronTriggerService(lggr)
if err != nil {
return nil, err
Expand All @@ -36,7 +37,12 @@ func NewManualTriggerCapabilities(ctx context.Context, lggr logger.Logger, regis
return nil, err
}

manualHTTPTrigger := NewManualHTTPTriggerService(lggr, httpTriggerPort)
var httpTriggerRateLimit *config.Rate
if limits != nil {
rate := limits.HTTPTriggerRateLimit()
httpTriggerRateLimit = &rate
}
manualHTTPTrigger := NewManualHTTPTriggerService(lggr, httpTriggerPort, httpTriggerRateLimit)
manualHTTPTriggerServer := httptrigger.NewHTTPServer(manualHTTPTrigger)
if err := registry.Add(ctx, manualHTTPTriggerServer); err != nil {
return nil, err
Expand Down
32 changes: 26 additions & 6 deletions cmd/workflow/simulate/chain/evm/chaintype.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

corekeys "github.com/smartcontractkit/chainlink-common/keystore/corekeys"
evmpb "github.com/smartcontractkit/chainlink-common/pkg/capabilities/v2/chain-capabilities/evm"
"github.com/smartcontractkit/chainlink-common/pkg/config"
"github.com/smartcontractkit/chainlink-common/pkg/services"

"github.com/smartcontractkit/cre-cli/cmd/workflow/simulate/chain"
Expand All @@ -28,6 +29,10 @@ const defaultSentinelPrivateKey = "000000000000000000000000000000000000000000000

var sentinelKeyBytes = common.FromHex(defaultSentinelPrivateKey)

type logTriggerRateLimits interface {
LogTriggerEventRateLimit() config.Rate
}

func init() {
chain.Register(string(corekeys.EVM), func(lggr *zerolog.Logger) chain.ChainType {
return &EVMChainType{log: lggr}
Expand Down Expand Up @@ -215,15 +220,17 @@ func (ct *EVMChainType) NewTriggerListener(ctx context.Context, selector uint64,
if strings.TrimSpace(params.ChainTypeInputs[TriggerInputTxHash]) != "" {
return nil, fmt.Errorf("--listen cannot be combined with --%s for EVM log triggers", TriggerInputTxHash)
}
eventRateLimit := eventRateLimitFromLimits(params.Limits)

cfg, err := decodeLogTriggerConfig(params.TriggerPayload)
if err != nil {
return nil, fmt.Errorf("failed to decode EVM log trigger config: %w", err)
}
return NewEVMLogTriggerListener(ctx, client, WaitForLogConfig{
Selector: selector,
Filter: cfg,
WorkflowName: params.WorkflowName,
Selector: selector,
Filter: cfg,
WorkflowName: params.WorkflowName,
EventRateLimit: eventRateLimit,
})
}

Expand Down Expand Up @@ -362,8 +369,21 @@ func (ct *EVMChainType) ResolveTriggerData(ctx context.Context, selector uint64,
return nil, fmt.Errorf("failed to decode EVM log trigger config: %w", err)
}
return WaitForEVMTriggerLog(ctx, client, WaitForLogConfig{
Selector: selector,
Filter: cfg,
WorkflowName: params.WorkflowName,
Selector: selector,
Filter: cfg,
WorkflowName: params.WorkflowName,
EventRateLimit: eventRateLimitFromLimits(params.Limits),
})
}

func eventRateLimitFromLimits(limits chain.Limits) *config.Rate {
if limits == nil {
return nil
}
triggerLimits, ok := limits.(logTriggerRateLimits)
if !ok {
return nil
}
rate := triggerLimits.LogTriggerEventRateLimit()
return &rate
}
23 changes: 23 additions & 0 deletions cmd/workflow/simulate/chain/evm/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"golang.org/x/time/rate"
"google.golang.org/protobuf/types/known/anypb"

evmpb "github.com/smartcontractkit/chainlink-common/pkg/capabilities/v2/chain-capabilities/evm"
"github.com/smartcontractkit/chainlink-common/pkg/config"
valuespb "github.com/smartcontractkit/chainlink-protos/cre/go/values/pb"

"github.com/smartcontractkit/cre-cli/internal/settings"
Expand All @@ -35,12 +37,16 @@ const defaultWaitForLogPollInterval = 3 * time.Second
// yet and never re-check it.
const rescanOverlapBlocks = 5

var errLogTriggerEventRateLimitExceeded = errors.New("simulation log trigger event rate limit exceeded")

// WaitForLogConfig describes a workflow's EVM log trigger subscription so the
// simulator can wait for the next matching on-chain event.
type WaitForLogConfig struct {
Selector uint64
Filter *evmpb.FilterLogTriggerRequest
WorkflowName string
// EventRateLimit enforces LogTrigger.EventRateLimit when set.
EventRateLimit *config.Rate
// PollInterval overrides the polling cadence for tests. Zero means default.
PollInterval time.Duration
// NowBlock overrides the initial "latest block" lookup for tests. When nil,
Expand All @@ -56,6 +62,8 @@ type EVMLogTriggerListener struct {
fromBlock *big.Int
seen map[string]struct{}
heartbeat int
limiter *rate.Limiter
limitText string
}

func NewEVMLogTriggerListener(ctx context.Context, ethClient *ethclient.Client, cfg WaitForLogConfig) (*EVMLogTriggerListener, error) {
Expand Down Expand Up @@ -88,13 +96,22 @@ func NewEVMLogTriggerListener(ctx context.Context, ethClient *ethclient.Client,
}
ui.Dim(fmt.Sprintf("Listening for logs starting at block %s...", fromBlock.String()))

var eventLimiter *rate.Limiter
limitText := ""
if cfg.EventRateLimit != nil && cfg.EventRateLimit.Limit > 0 && cfg.EventRateLimit.Burst > 0 {
eventLimiter = rate.NewLimiter(cfg.EventRateLimit.Limit, cfg.EventRateLimit.Burst)
limitText = cfg.EventRateLimit.String()
}

return &EVMLogTriggerListener{
ethClient: ethClient,
addresses: addresses,
topics: topics,
poll: poll,
fromBlock: fromBlock,
seen: make(map[string]struct{}),
limiter: eventLimiter,
limitText: limitText,
}, nil
}

Expand All @@ -108,6 +125,9 @@ func (l *EVMLogTriggerListener) Next(ctx context.Context) (interface{}, error) {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return nil, err
}
if errors.Is(err, errLogTriggerEventRateLimitExceeded) {
return nil, err
}
ui.Dim(fmt.Sprintf("RPC error while listening for logs: %v (retrying)", err))
} else if log != nil {
ui.Success(fmt.Sprintf("Matching EVM log event found at block %d (tx %s, index %d)",
Expand Down Expand Up @@ -148,6 +168,9 @@ func (l *EVMLogTriggerListener) scanOnce(ctx context.Context) (*types.Log, error
if _, ok := l.seen[key]; ok {
continue
}
if l.limiter != nil && !l.limiter.Allow() {
return nil, fmt.Errorf("%w: %s", errLogTriggerEventRateLimitExceeded, l.limitText)
}
l.seen[key] = struct{}{}
return &logs[i], nil
}
Expand Down
64 changes: 64 additions & 0 deletions cmd/workflow/simulate/chain/evm/trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package evm
import (
"context"
"encoding/json"
"errors"
"fmt"
"math/big"
"net/http"
Expand All @@ -16,9 +17,11 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/time/rate"
"google.golang.org/protobuf/types/known/anypb"

evmpb "github.com/smartcontractkit/chainlink-common/pkg/capabilities/v2/chain-capabilities/evm"
"github.com/smartcontractkit/chainlink-common/pkg/config"
valuespb "github.com/smartcontractkit/chainlink-protos/cre/go/values/pb"
)

Expand Down Expand Up @@ -628,6 +631,67 @@ func TestEVMLogTriggerListenerReturnsMultipleMatchingLogs(t *testing.T) {
assert.Equal(t, uint32(1), secondLog.Index)
}

func TestEVMLogTriggerListenerEnforcesEventRateLimit(t *testing.T) {
t.Parallel()

m := newMockRPC(t)
c := newEthClient(t, m.srv.URL)
defer c.Close()

addr := addrFromHex("0xabcd000000000000000000000000000000000006")
sig := hashFromHex("0x" + strings.Repeat("e", 64))

m.mu.Lock()
m.headNumber = 400
m.logs = []*types.Log{
{
Address: addr,
Topics: []common.Hash{sig},
Data: []byte{0x01},
BlockHash: hashFromHex("0xbe"),
TxHash: hashFromHex("0x" + strings.Repeat("3", 64)),
BlockNumber: 400,
TxIndex: 0,
Index: 0,
},
{
Address: addr,
Topics: []common.Hash{sig},
Data: []byte{0x02},
BlockHash: hashFromHex("0xbe"),
TxHash: hashFromHex("0x" + strings.Repeat("4", 64)),
BlockNumber: 400,
TxIndex: 1,
Index: 1,
},
}
m.mu.Unlock()

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

listener, err := NewEVMLogTriggerListener(ctx, c, WaitForLogConfig{
Selector: 16015286601757825753,
Filter: &evmpb.FilterLogTriggerRequest{
Addresses: [][]byte{addr.Bytes()},
Topics: []*evmpb.TopicValues{{Values: [][]byte{sig.Bytes()}}},
},
PollInterval: time.Second,
EventRateLimit: &config.Rate{
Limit: rate.Every(time.Hour),
Burst: 1,
},
})
require.NoError(t, err)

_, err = listener.Next(ctx)
require.NoError(t, err)

_, err = listener.Next(ctx)
require.Error(t, err)
assert.True(t, errors.Is(err, errLogTriggerEventRateLimitExceeded))
}

func TestManualEVMTriggerEventsHaveUniqueIDs(t *testing.T) {
t.Parallel()

Expand Down
1 change: 1 addition & 0 deletions cmd/workflow/simulate/chain/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type TriggerParams struct {
Clients map[uint64]ChainClient
Interactive bool
Listen bool
Limits Limits
ChainTypeInputs map[string]string
// TriggerPayload is the protobuf Any payload from the selected
// pb.TriggerSubscription. Chain types unmarshal it into their own
Expand Down
10 changes: 10 additions & 0 deletions cmd/workflow/simulate/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,16 @@ func (l *SimulationLimits) ChainWriteGasLimit() uint64 {
return l.Workflows.ChainWrite.EVM.GasLimit.Default.DefaultValue
}

// HTTPTriggerRateLimit returns the HTTP trigger event rate limit.
func (l *SimulationLimits) HTTPTriggerRateLimit() config.Rate {
return l.Workflows.HTTPTrigger.RateLimit.DefaultValue
}

// LogTriggerEventRateLimit returns the log trigger event rate limit.
func (l *SimulationLimits) LogTriggerEventRateLimit() config.Rate {
return l.Workflows.LogTrigger.EventRateLimit.DefaultValue
}

// WASMBinarySize returns the WASM binary size limit in bytes.
func (l *SimulationLimits) WASMBinarySize() int {
return int(l.Workflows.WASMBinarySizeLimit.DefaultValue)
Expand Down
4 changes: 4 additions & 0 deletions cmd/workflow/simulate/limits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ func TestDefaultLimitsAndExportDefaultLimitsJSON(t *testing.T) {
assert.Equal(t, uint64(10_000_000), limits.ChainWriteGasLimit())
assert.Equal(t, 100_000_000, limits.WASMBinarySize())
assert.Equal(t, 20_000_000, limits.WASMCompressedBinarySize())
assert.InDelta(t, 1.0/30.0, float64(limits.HTTPTriggerRateLimit().Limit), 0.000001)
assert.Equal(t, 1, limits.HTTPTriggerRateLimit().Burst)
assert.InDelta(t, 1.0/6.0, float64(limits.LogTriggerEventRateLimit().Limit), 0.000001)
assert.Equal(t, 10, limits.LogTriggerEventRateLimit().Burst)
assert.Equal(t, 50, limits.Workflows.ExecutionConcurrencyLimit.DefaultValue)
assert.InDelta(t, 1.0/30.0, float64(limits.Workflows.HTTPTrigger.RateLimit.DefaultValue.Limit), 0.000001)
assert.Equal(t, 1, limits.Workflows.HTTPTrigger.RateLimit.DefaultValue.Burst)
Expand Down
22 changes: 21 additions & 1 deletion cmd/workflow/simulate/manual_http_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ import (
"sync/atomic"
"time"

"golang.org/x/time/rate"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
caperrors "github.com/smartcontractkit/chainlink-common/pkg/capabilities/errors"
httptypedapi "github.com/smartcontractkit/chainlink-common/pkg/capabilities/v2/triggers/http"
httpserver "github.com/smartcontractkit/chainlink-common/pkg/capabilities/v2/triggers/http/server"
"github.com/smartcontractkit/chainlink-common/pkg/config"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/types/core"
Expand Down Expand Up @@ -38,18 +41,22 @@ type ManualHTTPTriggerService struct {
callbackCh map[string]chan capabilities.TriggerAndId[*httptypedapi.Payload]
workflowIDs map[string]string
inputs map[string]*httptypedapi.Config
limiters map[string]*rate.Limiter
eventSeq uint64
port int
rateLimit *config.Rate
}

func NewManualHTTPTriggerService(parentLggr logger.Logger, port int) *ManualHTTPTriggerService {
func NewManualHTTPTriggerService(parentLggr logger.Logger, port int, rateLimit *config.Rate) *ManualHTTPTriggerService {
return &ManualHTTPTriggerService{
CapabilityInfo: manualHTTPTriggerInfo,
lggr: logger.Named(parentLggr, "HTTPTriggerService"),
callbackCh: make(map[string]chan capabilities.TriggerAndId[*httptypedapi.Payload]),
workflowIDs: make(map[string]string),
inputs: make(map[string]*httptypedapi.Config),
limiters: make(map[string]*rate.Limiter),
port: port,
rateLimit: rateLimit,
}
}

Expand All @@ -60,10 +67,19 @@ func (f *ManualHTTPTriggerService) RegisterTrigger(ctx context.Context, triggerI
f.inputs[triggerID] = input
f.callbackCh[triggerID] = make(chan capabilities.TriggerAndId[*httptypedapi.Payload], 1)
f.workflowIDs[triggerID] = metadata.WorkflowID
if f.rateLimit != nil && f.rateLimit.Limit > 0 && f.rateLimit.Burst > 0 {
f.limiters[triggerID] = rate.NewLimiter(f.rateLimit.Limit, f.rateLimit.Burst)
}
return f.callbackCh[triggerID], nil
}

func (f *ManualHTTPTriggerService) UnregisterTrigger(ctx context.Context, triggerID string, metadata capabilities.RequestMetadata, input *httptypedapi.Config) caperrors.Error {
f.mu.Lock()
defer f.mu.Unlock()
delete(f.inputs, triggerID)
delete(f.callbackCh, triggerID)
delete(f.workflowIDs, triggerID)
delete(f.limiters, triggerID)
return nil
}

Expand All @@ -81,6 +97,7 @@ func (f *ManualHTTPTriggerService) ManualTrigger(ctx context.Context, triggerID
workflowID, workflowExists := f.workflowIDs[triggerID]
input := f.inputs[triggerID]
callbackCh := f.callbackCh[triggerID]
limiter := f.limiters[triggerID]
f.mu.RUnlock()

if !workflowExists {
Expand All @@ -94,6 +111,9 @@ func (f *ManualHTTPTriggerService) ManualTrigger(ctx context.Context, triggerID
if callbackCh == nil {
return fmt.Errorf("callback channel not found for triggerID")
}
if limiter != nil && !limiter.Allow() {
return fmt.Errorf("simulation limit exceeded: HTTP trigger rate limit %s", f.rateLimit.String())
}

if payload == nil {
var err error
Expand Down
Loading
Loading