diff --git a/core/chains/evm/config/mocks/chain_scoped_config.go b/core/chains/evm/config/mocks/chain_scoped_config.go
index b49ce82ea11..7bf324c3560 100644
--- a/core/chains/evm/config/mocks/chain_scoped_config.go
+++ b/core/chains/evm/config/mocks/chain_scoped_config.go
@@ -2880,6 +2880,20 @@ func (_m *ChainScopedConfig) KeeperRegistryCheckGasOverhead() uint32 {
return r0
}
+// KeeperRegistryMaxPerformDataSize provides a mock function with given fields:
+func (_m *ChainScopedConfig) KeeperRegistryMaxPerformDataSize() uint32 {
+ ret := _m.Called()
+
+ var r0 uint32
+ if rf, ok := ret.Get(0).(func() uint32); ok {
+ r0 = rf()
+ } else {
+ r0 = ret.Get(0).(uint32)
+ }
+
+ return r0
+}
+
// KeeperRegistryPerformGasOverhead provides a mock function with given fields:
func (_m *ChainScopedConfig) KeeperRegistryPerformGasOverhead() uint32 {
ret := _m.Called()
diff --git a/core/cmd/local_client_test.go b/core/cmd/local_client_test.go
index 69ee74a09b7..f180b1cdce6 100644
--- a/core/cmd/local_client_test.go
+++ b/core/cmd/local_client_test.go
@@ -150,6 +150,7 @@ KEEPER_BASE_FEE_BUFFER_PERCENT: 20
KEEPER_MAXIMUM_GRACE_PERIOD: 100
KEEPER_REGISTRY_CHECK_GAS_OVERHEAD: 200000
KEEPER_REGISTRY_PERFORM_GAS_OVERHEAD: 150000
+KEEPER_REGISTRY_MAX_PERFORM_DATA_SIZE: 5000
KEEPER_REGISTRY_SYNC_INTERVAL: 30m0s
KEEPER_REGISTRY_SYNC_UPKEEP_QUEUE_SIZE: 10
KEEPER_CHECK_UPKEEP_GAS_PRICE_FEATURE_ENABLED: false
diff --git a/core/config/envvar/envvar.go b/core/config/envvar/envvar.go
index 48e333557a8..6d5566903d3 100644
--- a/core/config/envvar/envvar.go
+++ b/core/config/envvar/envvar.go
@@ -28,6 +28,7 @@ var (
JobPipelineReaperThreshold = NewDuration("JobPipelineReaperThreshold")
KeeperRegistryCheckGasOverhead = NewUint32("KeeperRegistryCheckGasOverhead")
KeeperRegistryPerformGasOverhead = NewUint32("KeeperRegistryPerformGasOverhead")
+ KeeperRegistryMaxPerformDataSize = NewUint32("KeeperRegistryMaxPerformDataSize")
KeeperRegistrySyncInterval = NewDuration("KeeperRegistrySyncInterval")
KeeperRegistrySyncUpkeepQueueSize = NewUint32("KeeperRegistrySyncUpkeepQueueSize")
LogLevel = New[zapcore.Level]("LogLevel", parse.LogLevel)
diff --git a/core/config/envvar/schema.go b/core/config/envvar/schema.go
index c23e76fbe0b..4a567b259fd 100644
--- a/core/config/envvar/schema.go
+++ b/core/config/envvar/schema.go
@@ -281,6 +281,7 @@ type ConfigSchema struct {
KeeperMaximumGracePeriod int64 `env:"KEEPER_MAXIMUM_GRACE_PERIOD" default:"100"`
KeeperRegistryCheckGasOverhead uint64 `env:"KEEPER_REGISTRY_CHECK_GAS_OVERHEAD" default:"200000"`
KeeperRegistryPerformGasOverhead uint64 `env:"KEEPER_REGISTRY_PERFORM_GAS_OVERHEAD" default:"150000"`
+ KeeperRegistryMaxPerformDataSize uint64 `env:"KEEPER_REGISTRY_MAX_PERFORM_DATA_SIZE" default:"5000"`
KeeperRegistrySyncInterval time.Duration `env:"KEEPER_REGISTRY_SYNC_INTERVAL" default:"30m"`
KeeperRegistrySyncUpkeepQueueSize uint32 `env:"KEEPER_REGISTRY_SYNC_UPKEEP_QUEUE_SIZE" default:"10"`
KeeperTurnLookBack int64 `env:"KEEPER_TURN_LOOK_BACK" default:"1000"`
diff --git a/core/config/envvar/schema_test.go b/core/config/envvar/schema_test.go
index e9e525bad8f..1d64c63d0b7 100644
--- a/core/config/envvar/schema_test.go
+++ b/core/config/envvar/schema_test.go
@@ -126,6 +126,7 @@ func TestConfigSchema(t *testing.T) {
"KeeperMaximumGracePeriod": "KEEPER_MAXIMUM_GRACE_PERIOD",
"KeeperRegistryCheckGasOverhead": "KEEPER_REGISTRY_CHECK_GAS_OVERHEAD",
"KeeperRegistryPerformGasOverhead": "KEEPER_REGISTRY_PERFORM_GAS_OVERHEAD",
+ "KeeperRegistryMaxPerformDataSize": "KEEPER_REGISTRY_MAX_PERFORM_DATA_SIZE",
"KeeperRegistrySyncInterval": "KEEPER_REGISTRY_SYNC_INTERVAL",
"KeeperRegistrySyncUpkeepQueueSize": "KEEPER_REGISTRY_SYNC_UPKEEP_QUEUE_SIZE",
"KeeperTurnLookBack": "KEEPER_TURN_LOOK_BACK",
diff --git a/core/config/general_config.go b/core/config/general_config.go
index e3d9f24d4e0..e454e6a672d 100644
--- a/core/config/general_config.go
+++ b/core/config/general_config.go
@@ -131,6 +131,7 @@ type GeneralOnlyConfig interface {
KeeperMaximumGracePeriod() int64
KeeperRegistryCheckGasOverhead() uint32
KeeperRegistryPerformGasOverhead() uint32
+ KeeperRegistryMaxPerformDataSize() uint32
KeeperRegistrySyncInterval() time.Duration
KeeperRegistrySyncUpkeepQueueSize() uint32
KeeperTurnLookBack() int64
@@ -847,6 +848,12 @@ func (c *generalConfig) KeeperRegistryPerformGasOverhead() uint32 {
return getEnvWithFallback(c, envvar.KeeperRegistryPerformGasOverhead)
}
+// KeeperRegistryMaxPerformDataSize is the max perform data size we allow in our pipeline for an
+// upkeep to be performed with
+func (c *generalConfig) KeeperRegistryMaxPerformDataSize() uint32 {
+ return getEnvWithFallback(c, envvar.KeeperRegistryMaxPerformDataSize)
+}
+
// KeeperDefaultTransactionQueueDepth controls the queue size for DropOldestStrategy in Keeper
// Set to 0 to use SendEvery strategy instead
func (c *generalConfig) KeeperDefaultTransactionQueueDepth() uint32 {
diff --git a/core/config/mocks/general_config.go b/core/config/mocks/general_config.go
index e169ae0dae3..62bc069ffe4 100644
--- a/core/config/mocks/general_config.go
+++ b/core/config/mocks/general_config.go
@@ -2257,6 +2257,20 @@ func (_m *GeneralConfig) KeeperRegistrySyncUpkeepQueueSize() uint32 {
return r0
}
+// KeeperRegistryMaxPerformDataSize provides a mock function with given fields:
+func (_m *GeneralConfig) KeeperRegistryMaxPerformDataSize() uint32 {
+ ret := _m.Called()
+
+ var r0 uint32
+ if rf, ok := ret.Get(0).(func() uint32); ok {
+ r0 = rf()
+ } else {
+ r0 = ret.Get(0).(uint32)
+ }
+
+ return r0
+}
+
// KeeperTurnFlagEnabled provides a mock function with given fields:
func (_m *GeneralConfig) KeeperTurnFlagEnabled() bool {
ret := _m.Called()
diff --git a/core/config/presenters.go b/core/config/presenters.go
index 70ee6227397..909c50f855d 100644
--- a/core/config/presenters.go
+++ b/core/config/presenters.go
@@ -66,6 +66,7 @@ type EnvPrinter struct {
KeeperMaximumGracePeriod int64 `json:"KEEPER_MAXIMUM_GRACE_PERIOD"`
KeeperRegistryCheckGasOverhead uint32 `json:"KEEPER_REGISTRY_CHECK_GAS_OVERHEAD"`
KeeperRegistryPerformGasOverhead uint32 `json:"KEEPER_REGISTRY_PERFORM_GAS_OVERHEAD"`
+ KeeperRegistryMaxPerformDataSize uint32 `json:"KEEPER_REGISTRY_MAX_PERFORM_DATA_SIZE"`
KeeperRegistrySyncInterval time.Duration `json:"KEEPER_REGISTRY_SYNC_INTERVAL"`
KeeperRegistrySyncUpkeepQueueSize uint32 `json:"KEEPER_REGISTRY_SYNC_UPKEEP_QUEUE_SIZE"`
KeeperCheckUpkeepGasPriceFeatureEnabled bool `json:"KEEPER_CHECK_UPKEEP_GAS_PRICE_FEATURE_ENABLED"`
@@ -181,6 +182,7 @@ func NewConfigPrinter(cfg GeneralConfig) ConfigPrinter {
KeeperMaximumGracePeriod: cfg.KeeperMaximumGracePeriod(),
KeeperRegistryCheckGasOverhead: cfg.KeeperRegistryCheckGasOverhead(),
KeeperRegistryPerformGasOverhead: cfg.KeeperRegistryPerformGasOverhead(),
+ KeeperRegistryMaxPerformDataSize: cfg.KeeperRegistryMaxPerformDataSize(),
KeeperRegistrySyncInterval: cfg.KeeperRegistrySyncInterval(),
KeeperRegistrySyncUpkeepQueueSize: cfg.KeeperRegistrySyncUpkeepQueueSize(),
KeeperCheckUpkeepGasPriceFeatureEnabled: cfg.KeeperCheckUpkeepGasPriceFeatureEnabled(),
diff --git a/core/config/v2/types.go b/core/config/v2/types.go
index f14ba1b9d0c..3e40aa8f6ec 100644
--- a/core/config/v2/types.go
+++ b/core/config/v2/types.go
@@ -307,6 +307,7 @@ type Keeper struct {
MaximumGracePeriod *int64
RegistryCheckGasOverhead *uint32
RegistryPerformGasOverhead *uint32
+ RegistryMaxPerformDataSize *uint32
RegistrySyncInterval *models.Duration
RegistrySyncUpkeepQueueSize *uint32
TurnLookBack *int64
diff --git a/core/internal/testutils/configtest/general_config.go b/core/internal/testutils/configtest/general_config.go
index 0cc6072d9ca..eadec21f738 100644
--- a/core/internal/testutils/configtest/general_config.go
+++ b/core/internal/testutils/configtest/general_config.go
@@ -89,6 +89,7 @@ type GeneralConfigOverrides struct {
GlobalMinimumContractPayment *assets.Link
GlobalOCRObservationGracePeriod time.Duration
KeeperCheckUpkeepGasPriceFeatureEnabled null.Bool
+ KeeperRegistryMaxPerformDataSize null.Int
KeeperMaximumGracePeriod null.Int
KeeperRegistrySyncInterval *time.Duration
KeeperRegistrySyncUpkeepQueueSize null.Int
@@ -443,6 +444,13 @@ func (c *TestGeneralConfig) KeeperRegistrySyncUpkeepQueueSize() uint32 {
return c.GeneralConfig.KeeperRegistrySyncUpkeepQueueSize()
}
+func (c *TestGeneralConfig) KeeperRegistryMaxPerformDataSize() uint32 {
+ if c.Overrides.KeeperRegistryMaxPerformDataSize.Valid {
+ return uint32(c.Overrides.KeeperRegistryMaxPerformDataSize.Int64)
+ }
+ return c.GeneralConfig.KeeperRegistryMaxPerformDataSize()
+}
+
// KeeperCheckUpkeepGasPriceFeatureEnabled overrides
func (c *TestGeneralConfig) KeeperCheckUpkeepGasPriceFeatureEnabled() bool {
if c.Overrides.KeeperCheckUpkeepGasPriceFeatureEnabled.Valid {
diff --git a/core/services/chainlink/config_dump.go b/core/services/chainlink/config_dump.go
index 9ff42be7503..55075e18bd3 100644
--- a/core/services/chainlink/config_dump.go
+++ b/core/services/chainlink/config_dump.go
@@ -883,6 +883,7 @@ func (c *Config) loadLegacyCoreEnv() {
MaximumGracePeriod: envvar.NewInt64("KeeperMaximumGracePeriod").ParsePtr(),
RegistryCheckGasOverhead: envvar.NewUint32("KeeperRegistryCheckGasOverhead").ParsePtr(),
RegistryPerformGasOverhead: envvar.NewUint32("KeeperRegistryPerformGasOverhead").ParsePtr(),
+ RegistryMaxPerformDataSize: envvar.NewUint32("KeeperRegistryMaxPerformDataSize").ParsePtr(),
RegistrySyncInterval: envDuration("KeeperRegistrySyncInterval"),
RegistrySyncUpkeepQueueSize: envvar.KeeperRegistrySyncUpkeepQueueSize.ParsePtr(),
TurnLookBack: envvar.NewInt64("KeeperTurnLookBack").ParsePtr(),
diff --git a/core/services/chainlink/config_general.go b/core/services/chainlink/config_general.go
index 5464be13b45..6fe1abde884 100644
--- a/core/services/chainlink/config_general.go
+++ b/core/services/chainlink/config_general.go
@@ -357,6 +357,10 @@ func (g *generalConfig) KeeperRegistryPerformGasOverhead() uint32 {
return *g.c.Keeper.RegistryPerformGasOverhead
}
+func (g *generalConfig) KeeperRegistryMaxPerformDataSize() uint32 {
+ return *g.c.Keeper.RegistryMaxPerformDataSize
+}
+
func (g *generalConfig) KeeperRegistrySyncInterval() time.Duration {
return g.c.Keeper.RegistrySyncInterval.Duration()
}
diff --git a/core/services/chainlink/config_test.go b/core/services/chainlink/config_test.go
index c762adeddc1..5a9a501f88b 100644
--- a/core/services/chainlink/config_test.go
+++ b/core/services/chainlink/config_test.go
@@ -351,6 +351,7 @@ func TestConfig_Marshal(t *testing.T) {
RegistryPerformGasOverhead: ptr[uint32](math.MaxUint32),
RegistrySyncInterval: models.MustNewDuration(time.Hour),
RegistrySyncUpkeepQueueSize: ptr[uint32](31),
+ RegistryMaxPerformDataSize: ptr[uint32](5000),
TurnLookBack: ptr[int64](91),
TurnFlagEnabled: ptr(true),
UpkeepCheckGasPriceEnabled: ptr(true),
@@ -710,6 +711,7 @@ BaseFeeBufferPercent = 89
MaximumGracePeriod = 31
RegistryCheckGasOverhead = 90
RegistryPerformGasOverhead = 4294967295
+RegistryMaxPerformDataSize = 5000
RegistrySyncInterval = '1h0m0s'
RegistrySyncUpkeepQueueSize = 31
TurnLookBack = 91
diff --git a/core/services/chainlink/testdata/config-full.toml b/core/services/chainlink/testdata/config-full.toml
index 01a568a8c2e..b2ef3582fe4 100644
--- a/core/services/chainlink/testdata/config-full.toml
+++ b/core/services/chainlink/testdata/config-full.toml
@@ -144,6 +144,7 @@ BaseFeeBufferPercent = 89
MaximumGracePeriod = 31
RegistryCheckGasOverhead = 90
RegistryPerformGasOverhead = 4294967295
+RegistryMaxPerformDataSize = 5000
RegistrySyncInterval = '1h0m0s'
RegistrySyncUpkeepQueueSize = 31
TurnLookBack = 91
diff --git a/core/services/chainlink/testdata/dump/empty-strings.env b/core/services/chainlink/testdata/dump/empty-strings.env
index d9bafa5ebb5..bf7c29e144d 100644
--- a/core/services/chainlink/testdata/dump/empty-strings.env
+++ b/core/services/chainlink/testdata/dump/empty-strings.env
@@ -215,6 +215,7 @@ KEEPER_BASE_FEE_BUFFER_PERCENT=
KEEPER_MAXIMUM_GRACE_PERIOD=
KEEPER_REGISTRY_CHECK_GAS_OVERHEAD=
KEEPER_REGISTRY_PERFORM_GAS_OVERHEAD=
+KEEPER_REGISTRY_MAX_PERFORM_DATA_SIZE=
KEEPER_REGISTRY_SYNC_INTERVAL=
KEEPER_REGISTRY_SYNC_UPKEEP_QUEUE_SIZE=
KEEPER_TURN_LOOK_BACK=
diff --git a/core/services/chainlink/testdata/dump/full-custom.env b/core/services/chainlink/testdata/dump/full-custom.env
index d4ead63c3ef..5b7413b9943 100644
--- a/core/services/chainlink/testdata/dump/full-custom.env
+++ b/core/services/chainlink/testdata/dump/full-custom.env
@@ -211,6 +211,7 @@ KEEPER_BASE_FEE_BUFFER_PERCENT=7
KEEPER_MAXIMUM_GRACE_PERIOD=42
KEEPER_REGISTRY_CHECK_GAS_OVERHEAD=6
KEEPER_REGISTRY_PERFORM_GAS_OVERHEAD=900
+KEEPER_REGISTRY_MAX_PERFORM_DATA_SIZE=5000
KEEPER_REGISTRY_SYNC_INTERVAL=1m
KEEPER_REGISTRY_SYNC_UPKEEP_QUEUE_SIZE=99
KEEPER_TURN_LOOK_BACK=67
diff --git a/core/services/chainlink/testdata/dump/full-custom.toml b/core/services/chainlink/testdata/dump/full-custom.toml
index 37dd83f4027..1b2f396d2e3 100644
--- a/core/services/chainlink/testdata/dump/full-custom.toml
+++ b/core/services/chainlink/testdata/dump/full-custom.toml
@@ -140,6 +140,7 @@ BaseFeeBufferPercent = 7
MaximumGracePeriod = 42
RegistryCheckGasOverhead = 6
RegistryPerformGasOverhead = 900
+RegistryMaxPerformDataSize = 5000
RegistrySyncInterval = '1m0s'
RegistrySyncUpkeepQueueSize = 99
TurnLookBack = 67
diff --git a/core/services/chainlink/testdata/dump/none-valid.env b/core/services/chainlink/testdata/dump/none-valid.env
index 810776c3b36..a488ebf5cc5 100644
--- a/core/services/chainlink/testdata/dump/none-valid.env
+++ b/core/services/chainlink/testdata/dump/none-valid.env
@@ -111,6 +111,7 @@ KEEPER_BASE_FEE_BUFFER_PERCENT=invalid-test-value-KEEPER_BASE_FEE_BUFFER_PERCENT
KEEPER_MAXIMUM_GRACE_PERIOD=invalid-test-value-KEEPER_MAXIMUM_GRACE_PERIOD
KEEPER_REGISTRY_CHECK_GAS_OVERHEAD=invalid-test-value-KEEPER_REGISTRY_CHECK_GAS_OVERHEAD
KEEPER_REGISTRY_PERFORM_GAS_OVERHEAD=invalid-test-value-KEEPER_REGISTRY_PERFORM_GAS_OVERHEAD
+KEEPER_REGISTRY_MAX_PERFORM_DATA_SIZE=invalid-test-value-KEEPER_REGISTRY_MAX_PERFORM_DATA_SIZE
KEEPER_REGISTRY_SYNC_INTERVAL=invalid-test-value-KEEPER_REGISTRY_SYNC_INTERVAL
KEEPER_REGISTRY_SYNC_UPKEEP_QUEUE_SIZE=invalid-test-value-KEEPER_REGISTRY_SYNC_UPKEEP_QUEUE_SIZE
KEEPER_TURN_LOOK_BACK=invalid-test-value-KEEPER_TURN_LOOK_BACK
diff --git a/core/services/keeper/common.go b/core/services/keeper/common.go
index 4f37f23f1ea..abe63d98ee9 100644
--- a/core/services/keeper/common.go
+++ b/core/services/keeper/common.go
@@ -26,6 +26,7 @@ type Config interface {
KeeperMaximumGracePeriod() int64
KeeperRegistryCheckGasOverhead() uint32
KeeperRegistryPerformGasOverhead() uint32
+ KeeperRegistryMaxPerformDataSize() uint32
KeeperRegistrySyncInterval() time.Duration
KeeperRegistrySyncUpkeepQueueSize() uint32
KeeperCheckUpkeepGasPriceFeatureEnabled() bool
@@ -37,4 +38,5 @@ type Config interface {
type RegistryGasChecker interface {
KeeperRegistryCheckGasOverhead() uint32
KeeperRegistryPerformGasOverhead() uint32
+ KeeperRegistryMaxPerformDataSize() uint32
}
diff --git a/core/services/keeper/integration_test.go b/core/services/keeper/integration_test.go
index 100e3fa42e2..70b372b5905 100644
--- a/core/services/keeper/integration_test.go
+++ b/core/services/keeper/integration_test.go
@@ -320,3 +320,116 @@ func TestKeeperEthIntegration(t *testing.T) {
})
}
}
+
+func TestMaxPerformDataSize(t *testing.T) {
+ t.Run("max_perform_data_size_test", func(t *testing.T) {
+ maxPerformDataSize := 1000 // Will be set as config override
+ g := gomega.NewWithT(t)
+
+ // setup node key
+ nodeKey := cltest.MustGenerateRandomKey(t)
+ nodeAddress := nodeKey.Address
+ nodeAddressEIP55 := ethkey.EIP55AddressFromAddress(nodeAddress)
+
+ // setup blockchain
+ sergey := testutils.MustNewSimTransactor(t) // owns all the link
+ steve := testutils.MustNewSimTransactor(t) // registry owner
+ carrol := testutils.MustNewSimTransactor(t) // client
+ nelly := testutils.MustNewSimTransactor(t) // other keeper operator 1
+ nick := testutils.MustNewSimTransactor(t) // other keeper operator 2
+ genesisData := core.GenesisAlloc{
+ sergey.From: {Balance: assets.Ether(1000)},
+ steve.From: {Balance: assets.Ether(1000)},
+ carrol.From: {Balance: assets.Ether(1000)},
+ nelly.From: {Balance: assets.Ether(1000)},
+ nick.From: {Balance: assets.Ether(1000)},
+ nodeAddress: {Balance: assets.Ether(1000)},
+ }
+
+ gasLimit := uint32(ethconfig.Defaults.Miner.GasCeil * 2)
+ backend := cltest.NewSimulatedBackend(t, genesisData, gasLimit)
+
+ stopMining := cltest.Mine(backend, 1*time.Second) // >> 2 seconds and the test gets slow, << 1 second and the app may miss heads
+ defer stopMining()
+
+ linkAddr, _, linkToken, err := link_token_interface.DeployLinkToken(sergey, backend)
+ require.NoError(t, err)
+ gasFeedAddr, _, _, err := mock_v3_aggregator_contract.DeployMockV3AggregatorContract(steve, backend, 18, big.NewInt(60000000000))
+ require.NoError(t, err)
+ linkFeedAddr, _, _, err := mock_v3_aggregator_contract.DeployMockV3AggregatorContract(steve, backend, 18, big.NewInt(20000000000000000))
+ require.NoError(t, err)
+
+ regAddr, registryWrapper := deployKeeperRegistry(t, keeper.RegistryVersion_1_3, steve, backend, linkAddr, linkFeedAddr, gasFeedAddr)
+
+ upkeepAddr, _, upkeepContract, err := basic_upkeep_contract.DeployBasicUpkeepContract(carrol, backend)
+ require.NoError(t, err)
+ _, err = linkToken.Transfer(sergey, carrol.From, oneHunEth)
+ require.NoError(t, err)
+ _, err = linkToken.Approve(carrol, regAddr, oneHunEth)
+ require.NoError(t, err)
+ _, err = registryWrapper.SetKeepers(steve, []common.Address{nodeAddress, nelly.From}, []common.Address{nodeAddress, nelly.From})
+ require.NoError(t, err)
+ registrationTx, err := registryWrapper.RegisterUpkeep(steve, upkeepAddr, 2_500_000, carrol.From, []byte{})
+ require.NoError(t, err)
+ backend.Commit()
+ upkeepID := getUpkeepIdFromTx(t, registryWrapper, registrationTx, backend)
+
+ _, err = registryWrapper.AddFunds(carrol, upkeepID, tenEth)
+ require.NoError(t, err)
+ backend.Commit()
+
+ // setup app
+ config, db := heavyweight.FullTestDB(t, fmt.Sprintf("keeper_max_perform_data_test"))
+ korm := keeper.NewORM(db, logger.TestLogger(t), nil, nil)
+ d := 24 * time.Hour
+ // disable full sync ticker for test
+ config.Overrides.KeeperRegistrySyncInterval = &d
+ // backfill will trigger sync on startup
+ config.Overrides.BlockBackfillDepth = null.IntFrom(0)
+ // disable reorg protection for this test
+ config.Overrides.GlobalMinIncomingConfirmations = null.IntFrom(1)
+ // avoid waiting to re-submit for upkeeps
+ config.Overrides.KeeperMaximumGracePeriod = null.IntFrom(0)
+ // test with gas price feature enabled
+ config.Overrides.KeeperCheckUpkeepGasPriceFeatureEnabled = null.BoolFrom(true)
+ // testing doesn't need to do far look back
+ config.Overrides.KeeperTurnLookBack = null.IntFrom(0)
+ // testing new turn taking
+ config.Overrides.KeeperTurnFlagEnabled = null.BoolFrom(true)
+ // helps prevent missed heads
+ config.Overrides.GlobalEvmHeadTrackerMaxBufferSize = null.IntFrom(100)
+ // set the max perform data size
+ config.Overrides.KeeperRegistryMaxPerformDataSize = null.IntFrom(int64(maxPerformDataSize))
+
+ app := cltest.NewApplicationWithConfigAndKeyOnSimulatedBlockchain(t, config, backend, nodeKey)
+ require.NoError(t, app.Start(testutils.Context(t)))
+
+ // create job
+ regAddrEIP55 := ethkey.EIP55AddressFromAddress(regAddr)
+ job := cltest.MustInsertKeeperJob(t, db, korm, nodeAddressEIP55, regAddrEIP55)
+ err = app.JobSpawner().StartService(testutils.Context(t), job)
+ require.NoError(t, err)
+
+ // keeper job is triggered
+ receivedBytes := func() []byte {
+ received, err2 := upkeepContract.ReceivedBytes(nil)
+ require.NoError(t, err2)
+ return received
+ }
+
+ hugePayload := make([]byte, maxPerformDataSize)
+ _, err = upkeepContract.SetBytesToSend(carrol, hugePayload)
+ require.NoError(t, err)
+ _, err = upkeepContract.SetShouldPerformUpkeep(carrol, true)
+ require.NoError(t, err)
+
+ // Huge payload should not result in a perform
+ g.Consistently(receivedBytes, 20*time.Second, cltest.DBPollingInterval).Should(gomega.Equal([]byte{}))
+
+ // Set payload to be small and it should get received
+ smallPayload := make([]byte, maxPerformDataSize-1)
+ _, err = upkeepContract.SetBytesToSend(carrol, smallPayload)
+ require.NoError(t, err)
+ g.Eventually(receivedBytes, 20*time.Second, cltest.DBPollingInterval).Should(gomega.Equal(smallPayload))
+ })
+}
diff --git a/core/services/keeper/upkeep_executer.go b/core/services/keeper/upkeep_executer.go
index e2504e56a6b..20fe0aca1e9 100644
--- a/core/services/keeper/upkeep_executer.go
+++ b/core/services/keeper/upkeep_executer.go
@@ -322,6 +322,7 @@ func buildJobSpec(
"upkeepID": upkeep.UpkeepID.String(),
"prettyID": upkeep.PrettyID(),
"performUpkeepGasLimit": upkeep.ExecuteGas + ormConfig.KeeperRegistryPerformGasOverhead(),
+ "maxPerformDataSize": ormConfig.KeeperRegistryMaxPerformDataSize(),
"gasPrice": gasPrice,
"gasTipCap": gasTipCap,
"gasFeeCap": gasFeeCap,
diff --git a/core/services/keeper/upkeep_executer_test.go b/core/services/keeper/upkeep_executer_test.go
index b4151285694..ed2331b3fd1 100644
--- a/core/services/keeper/upkeep_executer_test.go
+++ b/core/services/keeper/upkeep_executer_test.go
@@ -119,6 +119,8 @@ func Test_UpkeepExecuter_ErrorsIfStartedTwice(t *testing.T) {
}
func Test_UpkeepExecuter_PerformsUpkeep_Happy(t *testing.T) {
+ taskRuns := 11
+
t.Parallel()
t.Run("runs upkeep on triggering block number", func(t *testing.T) {
@@ -155,7 +157,7 @@ func Test_UpkeepExecuter_PerformsUpkeep_Happy(t *testing.T) {
head := newHead()
executer.OnNewLongestChain(testutils.Context(t), &head)
ethTxCreated.AwaitOrFail(t)
- runs := cltest.WaitForPipelineComplete(t, 0, job.ID, 1, 8, jpv2.Jrm, time.Second, 100*time.Millisecond)
+ runs := cltest.WaitForPipelineComplete(t, 0, job.ID, 1, taskRuns, jpv2.Jrm, time.Second, 100*time.Millisecond)
require.Len(t, runs, 1)
assert.False(t, runs[0].HasErrors())
assert.False(t, runs[0].HasFatalErrors())
@@ -207,7 +209,7 @@ func Test_UpkeepExecuter_PerformsUpkeep_Happy(t *testing.T) {
executer.OnNewLongestChain(testutils.Context(t), &head)
ethTxCreated.AwaitOrFail(t)
- runs := cltest.WaitForPipelineComplete(t, 0, job.ID, 1, 8, jpv2.Jrm, time.Second, 100*time.Millisecond)
+ runs := cltest.WaitForPipelineComplete(t, 0, job.ID, 1, taskRuns, jpv2.Jrm, time.Second, 100*time.Millisecond)
require.Len(t, runs, 1)
assert.False(t, runs[0].HasErrors())
assert.False(t, runs[0].HasFatalErrors())
@@ -251,7 +253,7 @@ func Test_UpkeepExecuter_PerformsUpkeep_Happy(t *testing.T) {
head := newHead()
executer.OnNewLongestChain(testutils.Context(t), &head)
- runs := cltest.WaitForPipelineError(t, 0, job.ID, 1, 8, jpv2.Jrm, time.Second, 100*time.Millisecond)
+ runs := cltest.WaitForPipelineError(t, 0, job.ID, 1, taskRuns, jpv2.Jrm, time.Second, 100*time.Millisecond)
require.Len(t, runs, 1)
assert.True(t, runs[0].HasErrors())
assert.True(t, runs[0].HasFatalErrors())
@@ -302,7 +304,7 @@ func Test_UpkeepExecuter_PerformsUpkeep_Happy(t *testing.T) {
head := cltest.Head(36)
executer.OnNewLongestChain(testutils.Context(t), head)
- runs := cltest.WaitForPipelineComplete(t, 0, job.ID, 1, 8, jpv2.Jrm, time.Second, 100*time.Millisecond)
+ runs := cltest.WaitForPipelineComplete(t, 0, job.ID, 1, taskRuns, jpv2.Jrm, time.Second, 100*time.Millisecond)
require.Len(t, runs, 1)
assert.False(t, runs[0].HasErrors())
etxs[0].AwaitOrFail(t)
@@ -353,7 +355,7 @@ func Test_UpkeepExecuter_PerformsUpkeep_Happy(t *testing.T) {
executer.OnNewLongestChain(testutils.Context(t), &head)
ethTxCreated.AwaitOrFail(t)
- runs := cltest.WaitForPipelineComplete(t, 0, job.ID, 1, 8, jpv2.Jrm, time.Second, 100*time.Millisecond)
+ runs := cltest.WaitForPipelineComplete(t, 0, job.ID, 1, taskRuns, jpv2.Jrm, time.Second, 100*time.Millisecond)
require.Len(t, runs, 1)
assert.False(t, runs[0].HasErrors())
assert.False(t, runs[0].HasFatalErrors())
diff --git a/core/services/keeper/upkeep_executer_unit_test.go b/core/services/keeper/upkeep_executer_unit_test.go
index 1e2bef970c7..45f6b0797c1 100644
--- a/core/services/keeper/upkeep_executer_unit_test.go
+++ b/core/services/keeper/upkeep_executer_unit_test.go
@@ -44,6 +44,19 @@ func (_m *registryGasCheckMock) KeeperRegistryPerformGasOverhead() uint32 {
return r0
}
+func (_m *registryGasCheckMock) KeeperRegistryMaxPerformDataSize() uint32 {
+ ret := _m.Called()
+
+ var r0 uint32
+ if rf, ok := ret.Get(0).(func() uint32); ok {
+ r0 = rf()
+ } else {
+ r0 = ret.Get(0).(uint32)
+ }
+
+ return r0
+}
+
func TestBuildJobSpec(t *testing.T) {
jb := job.Job{ID: 10}
from := ethkey.EIP55Address(testutils.NewAddress().Hex())
@@ -67,6 +80,7 @@ func TestBuildJobSpec(t *testing.T) {
m.Mock.Test(t)
m.On("KeeperRegistryPerformGasOverhead").Return(uint32(9)).Times(1)
+ m.On("KeeperRegistryMaxPerformDataSize").Return(uint32(1000)).Times(1)
spec := buildJobSpec(jb, upkeep, m, gasPrice, gasTipCap, gasFeeCap, chainID)
@@ -78,6 +92,7 @@ func TestBuildJobSpec(t *testing.T) {
"upkeepID": "4",
"prettyID": fmt.Sprintf("UPx%064d", 4),
"performUpkeepGasLimit": uint32(21),
+ "maxPerformDataSize": uint32(1000),
"gasPrice": gasPrice,
"gasTipCap": gasTipCap,
"gasFeeCap": gasFeeCap,
diff --git a/core/services/pipeline/common.go b/core/services/pipeline/common.go
index 40e60e667ad..cd12a27308f 100644
--- a/core/services/pipeline/common.go
+++ b/core/services/pipeline/common.go
@@ -310,6 +310,8 @@ const (
TaskTypeETHABIDecode TaskType = "ethabidecode"
TaskTypeETHABIDecodeLog TaskType = "ethabidecodelog"
TaskTypeMerge TaskType = "merge"
+ TaskTypeLength TaskType = "length"
+ TaskTypeLessThan TaskType = "lessthan"
TaskTypeLowercase TaskType = "lowercase"
TaskTypeUppercase TaskType = "uppercase"
TaskTypeConditional TaskType = "conditional"
@@ -393,6 +395,10 @@ func UnmarshalTaskFromMap(taskType TaskType, taskMap interface{}, ID int, dotID
task = &FailTask{BaseTask: BaseTask{id: ID, dotID: dotID}}
case TaskTypeMerge:
task = &MergeTask{BaseTask: BaseTask{id: ID, dotID: dotID}}
+ case TaskTypeLength:
+ task = &LengthTask{BaseTask: BaseTask{id: ID, dotID: dotID}}
+ case TaskTypeLessThan:
+ task = &LessThanTask{BaseTask: BaseTask{id: ID, dotID: dotID}}
case TaskTypeLowercase:
task = &LowercaseTask{BaseTask: BaseTask{id: ID, dotID: dotID}}
case TaskTypeUppercase:
diff --git a/core/services/pipeline/orm.go b/core/services/pipeline/orm.go
index 27e0d424c7d..0e3235f5a90 100644
--- a/core/services/pipeline/orm.go
+++ b/core/services/pipeline/orm.go
@@ -17,45 +17,53 @@ import (
// KeepersObservationSource is the same for all keeper jobs and it is not persisted in DB
const KeepersObservationSource = `
- encode_check_upkeep_tx [type=ethabiencode
- abi="checkUpkeep(uint256 id, address from)"
- data="{\"id\":$(jobSpec.upkeepID),\"from\":$(jobSpec.fromAddress)}"]
- check_upkeep_tx [type=ethcall
- failEarly=true
- extractRevertReason=true
- evmChainID="$(jobSpec.evmChainID)"
- contract="$(jobSpec.contractAddress)"
- gasUnlimited=true
- gasPrice="$(jobSpec.gasPrice)"
- gasTipCap="$(jobSpec.gasTipCap)"
- gasFeeCap="$(jobSpec.gasFeeCap)"
- data="$(encode_check_upkeep_tx)"]
- decode_check_upkeep_tx [type=ethabidecode
- abi="bytes memory performData, uint256 maxLinkPayment, uint256 gasLimit, uint256 adjustedGasWei, uint256 linkEth"]
- encode_perform_upkeep_tx [type=ethabiencode
- abi="performUpkeep(uint256 id, bytes calldata performData)"
- data="{\"id\": $(jobSpec.upkeepID),\"performData\":$(decode_check_upkeep_tx.performData)}"]
+ encode_check_upkeep_tx [type=ethabiencode
+ abi="checkUpkeep(uint256 id, address from)"
+ data="{\"id\":$(jobSpec.upkeepID),\"from\":$(jobSpec.fromAddress)}"]
+ check_upkeep_tx [type=ethcall
+ failEarly=true
+ extractRevertReason=true
+ evmChainID="$(jobSpec.evmChainID)"
+ contract="$(jobSpec.contractAddress)"
+ gasUnlimited=true
+ gasPrice="$(jobSpec.gasPrice)"
+ gasTipCap="$(jobSpec.gasTipCap)"
+ gasFeeCap="$(jobSpec.gasFeeCap)"
+ data="$(encode_check_upkeep_tx)"]
+ decode_check_upkeep_tx [type=ethabidecode
+ abi="bytes memory performData, uint256 maxLinkPayment, uint256 gasLimit, uint256 adjustedGasWei, uint256 linkEth"]
+ calculate_perform_data_len [type=length
+ input="$(decode_check_upkeep_tx.performData)"]
+ perform_data_lessthan_limit [type=lessthan
+ input="$(calculate_perform_data_len)"
+ limit="$(jobSpec.maxPerformDataSize)"]
+ check_perform_data_limit [type=conditional
+ failEarly=true
+ data="$(perform_data_lessthan_limit)"]
+ encode_perform_upkeep_tx [type=ethabiencode
+ abi="performUpkeep(uint256 id, bytes calldata performData)"
+ data="{\"id\": $(jobSpec.upkeepID),\"performData\":$(decode_check_upkeep_tx.performData)}"]
simulate_perform_upkeep_tx [type=ethcall
- extractRevertReason=true
- evmChainID="$(jobSpec.evmChainID)"
- contract="$(jobSpec.contractAddress)"
- from="$(jobSpec.fromAddress)"
- gasUnlimited=true
- data="$(encode_perform_upkeep_tx)"]
- decode_check_perform_tx [type=ethabidecode
- abi="bool success"]
- check_success [type=conditional
- failEarly=true
- data="$(decode_check_perform_tx.success)"]
- perform_upkeep_tx [type=ethtx
- minConfirmations=0
- to="$(jobSpec.contractAddress)"
- from="[$(jobSpec.fromAddress)]"
- evmChainID="$(jobSpec.evmChainID)"
- data="$(encode_perform_upkeep_tx)"
- gasLimit="$(jobSpec.performUpkeepGasLimit)"
- txMeta="{\"jobID\":$(jobSpec.jobID),\"upkeepID\":$(jobSpec.prettyID)}"]
- encode_check_upkeep_tx -> check_upkeep_tx -> decode_check_upkeep_tx -> encode_perform_upkeep_tx -> simulate_perform_upkeep_tx -> decode_check_perform_tx -> check_success -> perform_upkeep_tx
+ extractRevertReason=true
+ evmChainID="$(jobSpec.evmChainID)"
+ contract="$(jobSpec.contractAddress)"
+ from="$(jobSpec.fromAddress)"
+ gasUnlimited=true
+ data="$(encode_perform_upkeep_tx)"]
+ decode_check_perform_tx [type=ethabidecode
+ abi="bool success"]
+ check_success [type=conditional
+ failEarly=true
+ data="$(decode_check_perform_tx.success)"]
+ perform_upkeep_tx [type=ethtx
+ minConfirmations=0
+ to="$(jobSpec.contractAddress)"
+ from="[$(jobSpec.fromAddress)]"
+ evmChainID="$(jobSpec.evmChainID)"
+ data="$(encode_perform_upkeep_tx)"
+ gasLimit="$(jobSpec.performUpkeepGasLimit)"
+ txMeta="{\"jobID\":$(jobSpec.jobID),\"upkeepID\":$(jobSpec.prettyID)}"]
+ encode_check_upkeep_tx -> check_upkeep_tx -> decode_check_upkeep_tx -> calculate_perform_data_len -> perform_data_lessthan_limit -> check_perform_data_limit -> encode_perform_upkeep_tx -> simulate_perform_upkeep_tx -> decode_check_perform_tx -> check_success -> perform_upkeep_tx
`
//go:generate mockery --name ORM --output ./mocks/ --case=underscore
diff --git a/core/services/pipeline/task.length.go b/core/services/pipeline/task.length.go
new file mode 100644
index 00000000000..00373ac7c95
--- /dev/null
+++ b/core/services/pipeline/task.length.go
@@ -0,0 +1,43 @@
+package pipeline
+
+import (
+ "context"
+
+ "github.com/pkg/errors"
+ "github.com/shopspring/decimal"
+ "go.uber.org/multierr"
+
+ "github.com/smartcontractkit/chainlink/core/logger"
+)
+
+// Return types:
+//
+// *decimal.Decimal
+type LengthTask struct {
+ BaseTask `mapstructure:",squash"`
+ Input string `json:"input"`
+}
+
+var _ Task = (*LengthTask)(nil)
+
+func (t *LengthTask) Type() TaskType {
+ return TaskTypeLength
+}
+
+func (t *LengthTask) Run(_ context.Context, _ logger.Logger, vars Vars, inputs []Result) (result Result, runInfo RunInfo) {
+ _, err := CheckInputs(inputs, 0, 1, 0)
+ if err != nil {
+ return Result{Error: errors.Wrap(err, "task inputs")}, runInfo
+ }
+
+ var input BytesParam
+
+ err = multierr.Combine(
+ errors.Wrap(ResolveParam(&input, From(VarExpr(t.Input, vars), NonemptyString(t.Input), Input(inputs, 0))), "input"),
+ )
+ if err != nil {
+ return Result{Error: err}, runInfo
+ }
+
+ return Result{Value: decimal.NewFromInt(int64(len(input)))}, runInfo
+}
diff --git a/core/services/pipeline/task.length_test.go b/core/services/pipeline/task.length_test.go
new file mode 100644
index 00000000000..9434b7e4e14
--- /dev/null
+++ b/core/services/pipeline/task.length_test.go
@@ -0,0 +1,74 @@
+package pipeline_test
+
+import (
+ "testing"
+
+ "github.com/shopspring/decimal"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+
+ "github.com/smartcontractkit/chainlink/core/internal/testutils"
+ "github.com/smartcontractkit/chainlink/core/logger"
+ "github.com/smartcontractkit/chainlink/core/services/pipeline"
+)
+
+func TestLengthTask(t *testing.T) {
+ t.Parallel()
+
+ tests := []struct {
+ name string
+ input interface{}
+ want decimal.Decimal
+ }{
+ {"normal bytes", []byte{0xaa, 0xbb, 0xcc, 0xdd}, decimal.NewFromInt(4)},
+ {"empty bytes", []byte{}, decimal.NewFromInt(0)},
+ {"string as bytes", []byte("stevetoshi sergeymoto"), decimal.NewFromInt(21)},
+ {"string input gets converted to bytes", "stevetoshi sergeymoto", decimal.NewFromInt(21)},
+ {"empty string", "", decimal.NewFromInt(0)},
+ }
+
+ for _, test := range tests {
+ assertOK := func(result pipeline.Result, runInfo pipeline.RunInfo) {
+ assert.False(t, runInfo.IsPending)
+ assert.False(t, runInfo.IsRetryable)
+ require.NoError(t, result.Error)
+ require.Equal(t, test.want.String(), result.Value.(decimal.Decimal).String())
+ }
+ t.Run(test.name, func(t *testing.T) {
+ t.Run("without vars through job DAG", func(t *testing.T) {
+ vars := pipeline.NewVarsFrom(nil)
+ task := pipeline.LengthTask{BaseTask: pipeline.NewBaseTask(0, "task", nil, nil, 0)}
+ assertOK(task.Run(testutils.Context(t), logger.TestLogger(t), vars, []pipeline.Result{{Value: test.input}}))
+ })
+ t.Run("without vars through input param", func(t *testing.T) {
+ var inputStr string
+ if _, ok := test.input.([]byte); ok {
+ inputStr = string(test.input.([]byte))
+ } else {
+ inputStr = test.input.(string)
+ }
+ if inputStr == "" {
+ // empty input parameter is indistinguishable from not providing it at all
+ // in that case the task will use an input defined by the job DAG
+ return
+ }
+ vars := pipeline.NewVarsFrom(nil)
+ task := pipeline.LengthTask{
+ BaseTask: pipeline.NewBaseTask(0, "task", nil, nil, 0),
+ Input: inputStr,
+ }
+ assertOK(task.Run(testutils.Context(t), logger.TestLogger(t), vars, []pipeline.Result{}))
+ })
+ t.Run("with vars", func(t *testing.T) {
+ vars := pipeline.NewVarsFrom(map[string]interface{}{
+ "foo": map[string]interface{}{"bar": test.input},
+ })
+ task := pipeline.LengthTask{
+ BaseTask: pipeline.NewBaseTask(0, "task", nil, nil, 0),
+ Input: "$(foo.bar)",
+ }
+ assertOK(task.Run(testutils.Context(t), logger.TestLogger(t), vars, []pipeline.Result{}))
+ })
+ })
+ }
+}
diff --git a/core/services/pipeline/task.lessthan.go b/core/services/pipeline/task.lessthan.go
new file mode 100644
index 00000000000..3350edf6ac7
--- /dev/null
+++ b/core/services/pipeline/task.lessthan.go
@@ -0,0 +1,50 @@
+package pipeline
+
+import (
+ "context"
+
+ "github.com/pkg/errors"
+ "go.uber.org/multierr"
+
+ "github.com/smartcontractkit/chainlink/core/logger"
+)
+
+// Return types:
+//
+// bool
+type LessThanTask struct {
+ BaseTask `mapstructure:",squash"`
+ Input string `json:"input"`
+ Limit string `json:"limit"`
+}
+
+var (
+ _ Task = (*LessThanTask)(nil)
+)
+
+func (t *LessThanTask) Type() TaskType {
+ return TaskTypeLessThan
+}
+
+func (t *LessThanTask) Run(_ context.Context, _ logger.Logger, vars Vars, inputs []Result) (result Result, runInfo RunInfo) {
+ _, err := CheckInputs(inputs, 0, 1, 0)
+ if err != nil {
+ return Result{Error: errors.Wrap(err, "task inputs")}, runInfo
+ }
+
+ var (
+ a DecimalParam
+ b DecimalParam
+ )
+
+ err = multierr.Combine(
+ errors.Wrap(ResolveParam(&a, From(VarExpr(t.Input, vars), NonemptyString(t.Input), Input(inputs, 0))), "input"),
+ errors.Wrap(ResolveParam(&b, From(VarExpr(t.Limit, vars), NonemptyString(t.Limit))), "limit"),
+ )
+ if err != nil {
+ return Result{Error: err}, runInfo
+ }
+
+ value := a.Decimal().LessThan(b.Decimal())
+ return Result{Value: value}, runInfo
+}
diff --git a/core/services/pipeline/task.lessthan_test.go b/core/services/pipeline/task.lessthan_test.go
new file mode 100644
index 00000000000..7f55ea9afd2
--- /dev/null
+++ b/core/services/pipeline/task.lessthan_test.go
@@ -0,0 +1,142 @@
+package pipeline_test
+
+import (
+ "fmt"
+ "testing"
+
+ "github.com/pkg/errors"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+
+ "github.com/smartcontractkit/chainlink/core/internal/testutils"
+ "github.com/smartcontractkit/chainlink/core/logger"
+ "github.com/smartcontractkit/chainlink/core/services/pipeline"
+)
+
+func TestLessThanTask_Happy(t *testing.T) {
+ t.Parallel()
+
+ tests := []struct {
+ name string
+ input interface{}
+ limit string
+ want bool
+ }{
+ {"string, lt 100", "1.23", "100", true},
+ {"string, lt negative", "1.23", "-5", false},
+ {"string, lt zero", "1.23", "0", false},
+ {"string, lt large value", "1.23", "1000000000000000000", true},
+ {"large string, lt large value", "10000000000000000001", "1000000000000000000", false},
+
+ {"int, true", int(2), "100", true},
+ {"int, false", int(2), "-5", false},
+
+ {"int8, true", int8(2), "100", true},
+ {"int8, false", int8(2), "-5", false},
+
+ {"int16, true", int16(2), "100", true},
+ {"int16, false", int16(2), "-5", false},
+
+ {"int32,true", int32(2), "100", true},
+ {"int32, false", int32(2), "-5", false},
+
+ {"int64, true", int64(2), "100", true},
+ {"int64, false", int64(2), "-5", false},
+
+ {"uint, true", uint(2), "100", true},
+ {"uint, false", uint(2), "-5", false},
+
+ {"uint8, true", uint8(2), "100", true},
+ {"uint8, false", uint8(2), "-5", false},
+
+ {"uint16, true", uint16(2), "100", true},
+ {"uint16, false", uint16(2), "-5", false},
+
+ {"uint32, true", uint32(2), "100", true},
+ {"uint32, false", uint32(2), "-5", false},
+
+ {"uint64, true", uint64(2), "100", true},
+ {"uint64, false", uint64(2), "-5", false},
+
+ {"float32, true", float32(1.23), "10", true},
+ {"float32, false", float32(1.23), "-5", false},
+
+ {"float64, true", float64(1.23), "10", true},
+ {"float64, false", float64(1.23), "-5", false},
+ }
+
+ for _, test := range tests {
+ assertOK := func(result pipeline.Result, runInfo pipeline.RunInfo) {
+ assert.False(t, runInfo.IsPending)
+ assert.False(t, runInfo.IsRetryable)
+ require.NoError(t, result.Error)
+ require.Equal(t, test.want, result.Value.(bool))
+ }
+ t.Run(test.name, func(t *testing.T) {
+ t.Run("without vars through job DAG", func(t *testing.T) {
+ vars := pipeline.NewVarsFrom(nil)
+ task := pipeline.LessThanTask{BaseTask: pipeline.NewBaseTask(0, "task", nil, nil, 0), Limit: test.limit}
+ assertOK(task.Run(testutils.Context(t), logger.TestLogger(t), vars, []pipeline.Result{{Value: test.input}}))
+ })
+ t.Run("without vars through input param", func(t *testing.T) {
+ vars := pipeline.NewVarsFrom(nil)
+ task := pipeline.LessThanTask{
+ BaseTask: pipeline.NewBaseTask(0, "task", nil, nil, 0),
+ Input: fmt.Sprintf("%v", test.input),
+ Limit: test.limit,
+ }
+ assertOK(task.Run(testutils.Context(t), logger.TestLogger(t), vars, []pipeline.Result{}))
+ })
+ t.Run("with vars", func(t *testing.T) {
+ vars := pipeline.NewVarsFrom(map[string]interface{}{
+ "foo": map[string]interface{}{"bar": test.input},
+ "chain": map[string]interface{}{"link": test.limit},
+ })
+ task := pipeline.LessThanTask{
+ BaseTask: pipeline.NewBaseTask(0, "task", nil, nil, 0),
+ Input: "$(foo.bar)",
+ Limit: "$(chain.link)",
+ }
+ assertOK(task.Run(testutils.Context(t), logger.TestLogger(t), vars, []pipeline.Result{}))
+ })
+ })
+ }
+}
+
+func TestLessThanTask_Unhappy(t *testing.T) {
+ t.Parallel()
+
+ tests := []struct {
+ name string
+ limit string
+ input string
+ inputs []pipeline.Result
+ vars pipeline.Vars
+ wantErrorCause error
+ wantErrorContains string
+ }{
+ {"map as input from inputs", "100", "", []pipeline.Result{{Value: map[string]interface{}{"chain": "link"}}}, pipeline.NewVarsFrom(nil), pipeline.ErrBadInput, "input"},
+ {"map as input from var", "100", "$(foo)", nil, pipeline.NewVarsFrom(map[string]interface{}{"foo": map[string]interface{}{"chain": "link"}}), pipeline.ErrBadInput, "input"},
+ {"slice as input from inputs", "100", "", []pipeline.Result{{Value: []interface{}{"chain", "link"}}}, pipeline.NewVarsFrom(nil), pipeline.ErrBadInput, "input"},
+ {"slice as input from var", "100", "$(foo)", nil, pipeline.NewVarsFrom(map[string]interface{}{"foo": []interface{}{"chain", "link"}}), pipeline.ErrBadInput, "input"},
+ {"input as missing var", "100", "$(foo)", nil, pipeline.NewVarsFrom(nil), pipeline.ErrKeypathNotFound, "input"},
+ {"limit as missing var", "$(foo)", "", []pipeline.Result{{Value: "123"}}, pipeline.NewVarsFrom(nil), pipeline.ErrKeypathNotFound, "limit"},
+ }
+
+ for _, test := range tests {
+ t.Run(test.name, func(t *testing.T) {
+ task := pipeline.LessThanTask{
+ BaseTask: pipeline.NewBaseTask(0, "task", nil, nil, 0),
+ Input: test.input,
+ Limit: test.limit,
+ }
+ result, runInfo := task.Run(testutils.Context(t), logger.TestLogger(t), test.vars, test.inputs)
+ assert.False(t, runInfo.IsPending)
+ assert.False(t, runInfo.IsRetryable)
+ require.Equal(t, test.wantErrorCause, errors.Cause(result.Error))
+ if test.wantErrorContains != "" {
+ require.Contains(t, result.Error.Error(), test.wantErrorContains)
+ }
+ })
+ }
+}
diff --git a/core/web/resolver/config_test.go b/core/web/resolver/config_test.go
index 93ec2db120a..42278da2dee 100644
--- a/core/web/resolver/config_test.go
+++ b/core/web/resolver/config_test.go
@@ -260,6 +260,10 @@ func TestResolver_Config(t *testing.T) {
{
"key": "KEEPER_REGISTRY_PERFORM_GAS_OVERHEAD",
"value": "150000"
+ },
+ {
+ "key": "KEEPER_REGISTRY_MAX_PERFORM_DATA_SIZE",
+ "value": "5000"
},
{
"key":"KEEPER_REGISTRY_SYNC_INTERVAL",
diff --git a/docs/CONFIG.md b/docs/CONFIG.md
index d77193f7b8e..b297c91f3ae 100644
--- a/docs/CONFIG.md
+++ b/docs/CONFIG.md
@@ -996,6 +996,7 @@ MaximumGracePeriod = 100 # Default
RegistryCheckGasOverhead = 200_000 # Default
RegistryPerformGasOverhead = 150_000 # Default
RegistrySyncInterval = '30m' # Default
+RegistryMaxPerformDataSize = 5_000 # Default
RegistrySyncUpkeepQueueSize = 10 # Default
TurnLookBack = 1000 # Default
TurnFlagEnabled = false # Default
@@ -1056,6 +1057,13 @@ RegistrySyncInterval = '30m' # Default
```
RegistrySyncInterval is the interval in which the RegistrySynchronizer performs a full sync of the keeper registry contract it is tracking.
+### RegistryMaxPerformDataSize
+:warning: **_ADVANCED_**: _Do not change this setting unless you know what you are doing._
+```toml
+RegistryMaxPerformDataSize = 5_000 # Default
+```
+RegistryMaxPerformDataSize is the max size of perform data.
+
### RegistrySyncUpkeepQueueSize
:warning: **_ADVANCED_**: _Do not change this setting unless you know what you are doing._
```toml
diff --git a/internal/config/docs.toml b/internal/config/docs.toml
index 71e9608150a..870e9497855 100644
--- a/internal/config/docs.toml
+++ b/internal/config/docs.toml
@@ -391,6 +391,9 @@ RegistryPerformGasOverhead = 150_000 # Default
# RegistrySyncInterval is the interval in which the RegistrySynchronizer performs a full sync of the keeper registry contract it is tracking.
RegistrySyncInterval = '30m' # Default
# **ADVANCED**
+# RegistryMaxPerformDataSize is the max size of perform data.
+RegistryMaxPerformDataSize = 5_000 # Default
+# **ADVANCED**
# RegistrySyncUpkeepQueueSize represents the maximum number of upkeeps that can be synced in parallel.
RegistrySyncUpkeepQueueSize = 10 # Default
# TurnLookBack is the number of blocks in the past to look back when getting a block for a turn.