diff --git a/core/chains/evm/config/mocks/chain_scoped_config.go b/core/chains/evm/config/mocks/chain_scoped_config.go index b49ce82ea11..7bf324c3560 100644 --- a/core/chains/evm/config/mocks/chain_scoped_config.go +++ b/core/chains/evm/config/mocks/chain_scoped_config.go @@ -2880,6 +2880,20 @@ func (_m *ChainScopedConfig) KeeperRegistryCheckGasOverhead() uint32 { return r0 } +// KeeperRegistryMaxPerformDataSize provides a mock function with given fields: +func (_m *ChainScopedConfig) KeeperRegistryMaxPerformDataSize() uint32 { + ret := _m.Called() + + var r0 uint32 + if rf, ok := ret.Get(0).(func() uint32); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(uint32) + } + + return r0 +} + // KeeperRegistryPerformGasOverhead provides a mock function with given fields: func (_m *ChainScopedConfig) KeeperRegistryPerformGasOverhead() uint32 { ret := _m.Called() diff --git a/core/cmd/local_client_test.go b/core/cmd/local_client_test.go index 69ee74a09b7..f180b1cdce6 100644 --- a/core/cmd/local_client_test.go +++ b/core/cmd/local_client_test.go @@ -150,6 +150,7 @@ KEEPER_BASE_FEE_BUFFER_PERCENT: 20 KEEPER_MAXIMUM_GRACE_PERIOD: 100 KEEPER_REGISTRY_CHECK_GAS_OVERHEAD: 200000 KEEPER_REGISTRY_PERFORM_GAS_OVERHEAD: 150000 +KEEPER_REGISTRY_MAX_PERFORM_DATA_SIZE: 5000 KEEPER_REGISTRY_SYNC_INTERVAL: 30m0s KEEPER_REGISTRY_SYNC_UPKEEP_QUEUE_SIZE: 10 KEEPER_CHECK_UPKEEP_GAS_PRICE_FEATURE_ENABLED: false diff --git a/core/config/envvar/envvar.go b/core/config/envvar/envvar.go index 48e333557a8..6d5566903d3 100644 --- a/core/config/envvar/envvar.go +++ b/core/config/envvar/envvar.go @@ -28,6 +28,7 @@ var ( JobPipelineReaperThreshold = NewDuration("JobPipelineReaperThreshold") KeeperRegistryCheckGasOverhead = NewUint32("KeeperRegistryCheckGasOverhead") KeeperRegistryPerformGasOverhead = NewUint32("KeeperRegistryPerformGasOverhead") + KeeperRegistryMaxPerformDataSize = NewUint32("KeeperRegistryMaxPerformDataSize") KeeperRegistrySyncInterval = NewDuration("KeeperRegistrySyncInterval") KeeperRegistrySyncUpkeepQueueSize = NewUint32("KeeperRegistrySyncUpkeepQueueSize") LogLevel = New[zapcore.Level]("LogLevel", parse.LogLevel) diff --git a/core/config/envvar/schema.go b/core/config/envvar/schema.go index c23e76fbe0b..4a567b259fd 100644 --- a/core/config/envvar/schema.go +++ b/core/config/envvar/schema.go @@ -281,6 +281,7 @@ type ConfigSchema struct { KeeperMaximumGracePeriod int64 `env:"KEEPER_MAXIMUM_GRACE_PERIOD" default:"100"` KeeperRegistryCheckGasOverhead uint64 `env:"KEEPER_REGISTRY_CHECK_GAS_OVERHEAD" default:"200000"` KeeperRegistryPerformGasOverhead uint64 `env:"KEEPER_REGISTRY_PERFORM_GAS_OVERHEAD" default:"150000"` + KeeperRegistryMaxPerformDataSize uint64 `env:"KEEPER_REGISTRY_MAX_PERFORM_DATA_SIZE" default:"5000"` KeeperRegistrySyncInterval time.Duration `env:"KEEPER_REGISTRY_SYNC_INTERVAL" default:"30m"` KeeperRegistrySyncUpkeepQueueSize uint32 `env:"KEEPER_REGISTRY_SYNC_UPKEEP_QUEUE_SIZE" default:"10"` KeeperTurnLookBack int64 `env:"KEEPER_TURN_LOOK_BACK" default:"1000"` diff --git a/core/config/envvar/schema_test.go b/core/config/envvar/schema_test.go index e9e525bad8f..1d64c63d0b7 100644 --- a/core/config/envvar/schema_test.go +++ b/core/config/envvar/schema_test.go @@ -126,6 +126,7 @@ func TestConfigSchema(t *testing.T) { "KeeperMaximumGracePeriod": "KEEPER_MAXIMUM_GRACE_PERIOD", "KeeperRegistryCheckGasOverhead": "KEEPER_REGISTRY_CHECK_GAS_OVERHEAD", "KeeperRegistryPerformGasOverhead": "KEEPER_REGISTRY_PERFORM_GAS_OVERHEAD", + "KeeperRegistryMaxPerformDataSize": "KEEPER_REGISTRY_MAX_PERFORM_DATA_SIZE", "KeeperRegistrySyncInterval": "KEEPER_REGISTRY_SYNC_INTERVAL", "KeeperRegistrySyncUpkeepQueueSize": "KEEPER_REGISTRY_SYNC_UPKEEP_QUEUE_SIZE", "KeeperTurnLookBack": "KEEPER_TURN_LOOK_BACK", diff --git a/core/config/general_config.go b/core/config/general_config.go index e3d9f24d4e0..e454e6a672d 100644 --- a/core/config/general_config.go +++ b/core/config/general_config.go @@ -131,6 +131,7 @@ type GeneralOnlyConfig interface { KeeperMaximumGracePeriod() int64 KeeperRegistryCheckGasOverhead() uint32 KeeperRegistryPerformGasOverhead() uint32 + KeeperRegistryMaxPerformDataSize() uint32 KeeperRegistrySyncInterval() time.Duration KeeperRegistrySyncUpkeepQueueSize() uint32 KeeperTurnLookBack() int64 @@ -847,6 +848,12 @@ func (c *generalConfig) KeeperRegistryPerformGasOverhead() uint32 { return getEnvWithFallback(c, envvar.KeeperRegistryPerformGasOverhead) } +// KeeperRegistryMaxPerformDataSize is the max perform data size we allow in our pipeline for an +// upkeep to be performed with +func (c *generalConfig) KeeperRegistryMaxPerformDataSize() uint32 { + return getEnvWithFallback(c, envvar.KeeperRegistryMaxPerformDataSize) +} + // KeeperDefaultTransactionQueueDepth controls the queue size for DropOldestStrategy in Keeper // Set to 0 to use SendEvery strategy instead func (c *generalConfig) KeeperDefaultTransactionQueueDepth() uint32 { diff --git a/core/config/mocks/general_config.go b/core/config/mocks/general_config.go index e169ae0dae3..62bc069ffe4 100644 --- a/core/config/mocks/general_config.go +++ b/core/config/mocks/general_config.go @@ -2257,6 +2257,20 @@ func (_m *GeneralConfig) KeeperRegistrySyncUpkeepQueueSize() uint32 { return r0 } +// KeeperRegistryMaxPerformDataSize provides a mock function with given fields: +func (_m *GeneralConfig) KeeperRegistryMaxPerformDataSize() uint32 { + ret := _m.Called() + + var r0 uint32 + if rf, ok := ret.Get(0).(func() uint32); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(uint32) + } + + return r0 +} + // KeeperTurnFlagEnabled provides a mock function with given fields: func (_m *GeneralConfig) KeeperTurnFlagEnabled() bool { ret := _m.Called() diff --git a/core/config/presenters.go b/core/config/presenters.go index 70ee6227397..909c50f855d 100644 --- a/core/config/presenters.go +++ b/core/config/presenters.go @@ -66,6 +66,7 @@ type EnvPrinter struct { KeeperMaximumGracePeriod int64 `json:"KEEPER_MAXIMUM_GRACE_PERIOD"` KeeperRegistryCheckGasOverhead uint32 `json:"KEEPER_REGISTRY_CHECK_GAS_OVERHEAD"` KeeperRegistryPerformGasOverhead uint32 `json:"KEEPER_REGISTRY_PERFORM_GAS_OVERHEAD"` + KeeperRegistryMaxPerformDataSize uint32 `json:"KEEPER_REGISTRY_MAX_PERFORM_DATA_SIZE"` KeeperRegistrySyncInterval time.Duration `json:"KEEPER_REGISTRY_SYNC_INTERVAL"` KeeperRegistrySyncUpkeepQueueSize uint32 `json:"KEEPER_REGISTRY_SYNC_UPKEEP_QUEUE_SIZE"` KeeperCheckUpkeepGasPriceFeatureEnabled bool `json:"KEEPER_CHECK_UPKEEP_GAS_PRICE_FEATURE_ENABLED"` @@ -181,6 +182,7 @@ func NewConfigPrinter(cfg GeneralConfig) ConfigPrinter { KeeperMaximumGracePeriod: cfg.KeeperMaximumGracePeriod(), KeeperRegistryCheckGasOverhead: cfg.KeeperRegistryCheckGasOverhead(), KeeperRegistryPerformGasOverhead: cfg.KeeperRegistryPerformGasOverhead(), + KeeperRegistryMaxPerformDataSize: cfg.KeeperRegistryMaxPerformDataSize(), KeeperRegistrySyncInterval: cfg.KeeperRegistrySyncInterval(), KeeperRegistrySyncUpkeepQueueSize: cfg.KeeperRegistrySyncUpkeepQueueSize(), KeeperCheckUpkeepGasPriceFeatureEnabled: cfg.KeeperCheckUpkeepGasPriceFeatureEnabled(), diff --git a/core/config/v2/types.go b/core/config/v2/types.go index f14ba1b9d0c..3e40aa8f6ec 100644 --- a/core/config/v2/types.go +++ b/core/config/v2/types.go @@ -307,6 +307,7 @@ type Keeper struct { MaximumGracePeriod *int64 RegistryCheckGasOverhead *uint32 RegistryPerformGasOverhead *uint32 + RegistryMaxPerformDataSize *uint32 RegistrySyncInterval *models.Duration RegistrySyncUpkeepQueueSize *uint32 TurnLookBack *int64 diff --git a/core/internal/testutils/configtest/general_config.go b/core/internal/testutils/configtest/general_config.go index 0cc6072d9ca..eadec21f738 100644 --- a/core/internal/testutils/configtest/general_config.go +++ b/core/internal/testutils/configtest/general_config.go @@ -89,6 +89,7 @@ type GeneralConfigOverrides struct { GlobalMinimumContractPayment *assets.Link GlobalOCRObservationGracePeriod time.Duration KeeperCheckUpkeepGasPriceFeatureEnabled null.Bool + KeeperRegistryMaxPerformDataSize null.Int KeeperMaximumGracePeriod null.Int KeeperRegistrySyncInterval *time.Duration KeeperRegistrySyncUpkeepQueueSize null.Int @@ -443,6 +444,13 @@ func (c *TestGeneralConfig) KeeperRegistrySyncUpkeepQueueSize() uint32 { return c.GeneralConfig.KeeperRegistrySyncUpkeepQueueSize() } +func (c *TestGeneralConfig) KeeperRegistryMaxPerformDataSize() uint32 { + if c.Overrides.KeeperRegistryMaxPerformDataSize.Valid { + return uint32(c.Overrides.KeeperRegistryMaxPerformDataSize.Int64) + } + return c.GeneralConfig.KeeperRegistryMaxPerformDataSize() +} + // KeeperCheckUpkeepGasPriceFeatureEnabled overrides func (c *TestGeneralConfig) KeeperCheckUpkeepGasPriceFeatureEnabled() bool { if c.Overrides.KeeperCheckUpkeepGasPriceFeatureEnabled.Valid { diff --git a/core/services/chainlink/config_dump.go b/core/services/chainlink/config_dump.go index 9ff42be7503..55075e18bd3 100644 --- a/core/services/chainlink/config_dump.go +++ b/core/services/chainlink/config_dump.go @@ -883,6 +883,7 @@ func (c *Config) loadLegacyCoreEnv() { MaximumGracePeriod: envvar.NewInt64("KeeperMaximumGracePeriod").ParsePtr(), RegistryCheckGasOverhead: envvar.NewUint32("KeeperRegistryCheckGasOverhead").ParsePtr(), RegistryPerformGasOverhead: envvar.NewUint32("KeeperRegistryPerformGasOverhead").ParsePtr(), + RegistryMaxPerformDataSize: envvar.NewUint32("KeeperRegistryMaxPerformDataSize").ParsePtr(), RegistrySyncInterval: envDuration("KeeperRegistrySyncInterval"), RegistrySyncUpkeepQueueSize: envvar.KeeperRegistrySyncUpkeepQueueSize.ParsePtr(), TurnLookBack: envvar.NewInt64("KeeperTurnLookBack").ParsePtr(), diff --git a/core/services/chainlink/config_general.go b/core/services/chainlink/config_general.go index 5464be13b45..6fe1abde884 100644 --- a/core/services/chainlink/config_general.go +++ b/core/services/chainlink/config_general.go @@ -357,6 +357,10 @@ func (g *generalConfig) KeeperRegistryPerformGasOverhead() uint32 { return *g.c.Keeper.RegistryPerformGasOverhead } +func (g *generalConfig) KeeperRegistryMaxPerformDataSize() uint32 { + return *g.c.Keeper.RegistryMaxPerformDataSize +} + func (g *generalConfig) KeeperRegistrySyncInterval() time.Duration { return g.c.Keeper.RegistrySyncInterval.Duration() } diff --git a/core/services/chainlink/config_test.go b/core/services/chainlink/config_test.go index c762adeddc1..5a9a501f88b 100644 --- a/core/services/chainlink/config_test.go +++ b/core/services/chainlink/config_test.go @@ -351,6 +351,7 @@ func TestConfig_Marshal(t *testing.T) { RegistryPerformGasOverhead: ptr[uint32](math.MaxUint32), RegistrySyncInterval: models.MustNewDuration(time.Hour), RegistrySyncUpkeepQueueSize: ptr[uint32](31), + RegistryMaxPerformDataSize: ptr[uint32](5000), TurnLookBack: ptr[int64](91), TurnFlagEnabled: ptr(true), UpkeepCheckGasPriceEnabled: ptr(true), @@ -710,6 +711,7 @@ BaseFeeBufferPercent = 89 MaximumGracePeriod = 31 RegistryCheckGasOverhead = 90 RegistryPerformGasOverhead = 4294967295 +RegistryMaxPerformDataSize = 5000 RegistrySyncInterval = '1h0m0s' RegistrySyncUpkeepQueueSize = 31 TurnLookBack = 91 diff --git a/core/services/chainlink/testdata/config-full.toml b/core/services/chainlink/testdata/config-full.toml index 01a568a8c2e..b2ef3582fe4 100644 --- a/core/services/chainlink/testdata/config-full.toml +++ b/core/services/chainlink/testdata/config-full.toml @@ -144,6 +144,7 @@ BaseFeeBufferPercent = 89 MaximumGracePeriod = 31 RegistryCheckGasOverhead = 90 RegistryPerformGasOverhead = 4294967295 +RegistryMaxPerformDataSize = 5000 RegistrySyncInterval = '1h0m0s' RegistrySyncUpkeepQueueSize = 31 TurnLookBack = 91 diff --git a/core/services/chainlink/testdata/dump/empty-strings.env b/core/services/chainlink/testdata/dump/empty-strings.env index d9bafa5ebb5..bf7c29e144d 100644 --- a/core/services/chainlink/testdata/dump/empty-strings.env +++ b/core/services/chainlink/testdata/dump/empty-strings.env @@ -215,6 +215,7 @@ KEEPER_BASE_FEE_BUFFER_PERCENT= KEEPER_MAXIMUM_GRACE_PERIOD= KEEPER_REGISTRY_CHECK_GAS_OVERHEAD= KEEPER_REGISTRY_PERFORM_GAS_OVERHEAD= +KEEPER_REGISTRY_MAX_PERFORM_DATA_SIZE= KEEPER_REGISTRY_SYNC_INTERVAL= KEEPER_REGISTRY_SYNC_UPKEEP_QUEUE_SIZE= KEEPER_TURN_LOOK_BACK= diff --git a/core/services/chainlink/testdata/dump/full-custom.env b/core/services/chainlink/testdata/dump/full-custom.env index d4ead63c3ef..5b7413b9943 100644 --- a/core/services/chainlink/testdata/dump/full-custom.env +++ b/core/services/chainlink/testdata/dump/full-custom.env @@ -211,6 +211,7 @@ KEEPER_BASE_FEE_BUFFER_PERCENT=7 KEEPER_MAXIMUM_GRACE_PERIOD=42 KEEPER_REGISTRY_CHECK_GAS_OVERHEAD=6 KEEPER_REGISTRY_PERFORM_GAS_OVERHEAD=900 +KEEPER_REGISTRY_MAX_PERFORM_DATA_SIZE=5000 KEEPER_REGISTRY_SYNC_INTERVAL=1m KEEPER_REGISTRY_SYNC_UPKEEP_QUEUE_SIZE=99 KEEPER_TURN_LOOK_BACK=67 diff --git a/core/services/chainlink/testdata/dump/full-custom.toml b/core/services/chainlink/testdata/dump/full-custom.toml index 37dd83f4027..1b2f396d2e3 100644 --- a/core/services/chainlink/testdata/dump/full-custom.toml +++ b/core/services/chainlink/testdata/dump/full-custom.toml @@ -140,6 +140,7 @@ BaseFeeBufferPercent = 7 MaximumGracePeriod = 42 RegistryCheckGasOverhead = 6 RegistryPerformGasOverhead = 900 +RegistryMaxPerformDataSize = 5000 RegistrySyncInterval = '1m0s' RegistrySyncUpkeepQueueSize = 99 TurnLookBack = 67 diff --git a/core/services/chainlink/testdata/dump/none-valid.env b/core/services/chainlink/testdata/dump/none-valid.env index 810776c3b36..a488ebf5cc5 100644 --- a/core/services/chainlink/testdata/dump/none-valid.env +++ b/core/services/chainlink/testdata/dump/none-valid.env @@ -111,6 +111,7 @@ KEEPER_BASE_FEE_BUFFER_PERCENT=invalid-test-value-KEEPER_BASE_FEE_BUFFER_PERCENT KEEPER_MAXIMUM_GRACE_PERIOD=invalid-test-value-KEEPER_MAXIMUM_GRACE_PERIOD KEEPER_REGISTRY_CHECK_GAS_OVERHEAD=invalid-test-value-KEEPER_REGISTRY_CHECK_GAS_OVERHEAD KEEPER_REGISTRY_PERFORM_GAS_OVERHEAD=invalid-test-value-KEEPER_REGISTRY_PERFORM_GAS_OVERHEAD +KEEPER_REGISTRY_MAX_PERFORM_DATA_SIZE=invalid-test-value-KEEPER_REGISTRY_MAX_PERFORM_DATA_SIZE KEEPER_REGISTRY_SYNC_INTERVAL=invalid-test-value-KEEPER_REGISTRY_SYNC_INTERVAL KEEPER_REGISTRY_SYNC_UPKEEP_QUEUE_SIZE=invalid-test-value-KEEPER_REGISTRY_SYNC_UPKEEP_QUEUE_SIZE KEEPER_TURN_LOOK_BACK=invalid-test-value-KEEPER_TURN_LOOK_BACK diff --git a/core/services/keeper/common.go b/core/services/keeper/common.go index 4f37f23f1ea..abe63d98ee9 100644 --- a/core/services/keeper/common.go +++ b/core/services/keeper/common.go @@ -26,6 +26,7 @@ type Config interface { KeeperMaximumGracePeriod() int64 KeeperRegistryCheckGasOverhead() uint32 KeeperRegistryPerformGasOverhead() uint32 + KeeperRegistryMaxPerformDataSize() uint32 KeeperRegistrySyncInterval() time.Duration KeeperRegistrySyncUpkeepQueueSize() uint32 KeeperCheckUpkeepGasPriceFeatureEnabled() bool @@ -37,4 +38,5 @@ type Config interface { type RegistryGasChecker interface { KeeperRegistryCheckGasOverhead() uint32 KeeperRegistryPerformGasOverhead() uint32 + KeeperRegistryMaxPerformDataSize() uint32 } diff --git a/core/services/keeper/integration_test.go b/core/services/keeper/integration_test.go index 100e3fa42e2..70b372b5905 100644 --- a/core/services/keeper/integration_test.go +++ b/core/services/keeper/integration_test.go @@ -320,3 +320,116 @@ func TestKeeperEthIntegration(t *testing.T) { }) } } + +func TestMaxPerformDataSize(t *testing.T) { + t.Run("max_perform_data_size_test", func(t *testing.T) { + maxPerformDataSize := 1000 // Will be set as config override + g := gomega.NewWithT(t) + + // setup node key + nodeKey := cltest.MustGenerateRandomKey(t) + nodeAddress := nodeKey.Address + nodeAddressEIP55 := ethkey.EIP55AddressFromAddress(nodeAddress) + + // setup blockchain + sergey := testutils.MustNewSimTransactor(t) // owns all the link + steve := testutils.MustNewSimTransactor(t) // registry owner + carrol := testutils.MustNewSimTransactor(t) // client + nelly := testutils.MustNewSimTransactor(t) // other keeper operator 1 + nick := testutils.MustNewSimTransactor(t) // other keeper operator 2 + genesisData := core.GenesisAlloc{ + sergey.From: {Balance: assets.Ether(1000)}, + steve.From: {Balance: assets.Ether(1000)}, + carrol.From: {Balance: assets.Ether(1000)}, + nelly.From: {Balance: assets.Ether(1000)}, + nick.From: {Balance: assets.Ether(1000)}, + nodeAddress: {Balance: assets.Ether(1000)}, + } + + gasLimit := uint32(ethconfig.Defaults.Miner.GasCeil * 2) + backend := cltest.NewSimulatedBackend(t, genesisData, gasLimit) + + stopMining := cltest.Mine(backend, 1*time.Second) // >> 2 seconds and the test gets slow, << 1 second and the app may miss heads + defer stopMining() + + linkAddr, _, linkToken, err := link_token_interface.DeployLinkToken(sergey, backend) + require.NoError(t, err) + gasFeedAddr, _, _, err := mock_v3_aggregator_contract.DeployMockV3AggregatorContract(steve, backend, 18, big.NewInt(60000000000)) + require.NoError(t, err) + linkFeedAddr, _, _, err := mock_v3_aggregator_contract.DeployMockV3AggregatorContract(steve, backend, 18, big.NewInt(20000000000000000)) + require.NoError(t, err) + + regAddr, registryWrapper := deployKeeperRegistry(t, keeper.RegistryVersion_1_3, steve, backend, linkAddr, linkFeedAddr, gasFeedAddr) + + upkeepAddr, _, upkeepContract, err := basic_upkeep_contract.DeployBasicUpkeepContract(carrol, backend) + require.NoError(t, err) + _, err = linkToken.Transfer(sergey, carrol.From, oneHunEth) + require.NoError(t, err) + _, err = linkToken.Approve(carrol, regAddr, oneHunEth) + require.NoError(t, err) + _, err = registryWrapper.SetKeepers(steve, []common.Address{nodeAddress, nelly.From}, []common.Address{nodeAddress, nelly.From}) + require.NoError(t, err) + registrationTx, err := registryWrapper.RegisterUpkeep(steve, upkeepAddr, 2_500_000, carrol.From, []byte{}) + require.NoError(t, err) + backend.Commit() + upkeepID := getUpkeepIdFromTx(t, registryWrapper, registrationTx, backend) + + _, err = registryWrapper.AddFunds(carrol, upkeepID, tenEth) + require.NoError(t, err) + backend.Commit() + + // setup app + config, db := heavyweight.FullTestDB(t, fmt.Sprintf("keeper_max_perform_data_test")) + korm := keeper.NewORM(db, logger.TestLogger(t), nil, nil) + d := 24 * time.Hour + // disable full sync ticker for test + config.Overrides.KeeperRegistrySyncInterval = &d + // backfill will trigger sync on startup + config.Overrides.BlockBackfillDepth = null.IntFrom(0) + // disable reorg protection for this test + config.Overrides.GlobalMinIncomingConfirmations = null.IntFrom(1) + // avoid waiting to re-submit for upkeeps + config.Overrides.KeeperMaximumGracePeriod = null.IntFrom(0) + // test with gas price feature enabled + config.Overrides.KeeperCheckUpkeepGasPriceFeatureEnabled = null.BoolFrom(true) + // testing doesn't need to do far look back + config.Overrides.KeeperTurnLookBack = null.IntFrom(0) + // testing new turn taking + config.Overrides.KeeperTurnFlagEnabled = null.BoolFrom(true) + // helps prevent missed heads + config.Overrides.GlobalEvmHeadTrackerMaxBufferSize = null.IntFrom(100) + // set the max perform data size + config.Overrides.KeeperRegistryMaxPerformDataSize = null.IntFrom(int64(maxPerformDataSize)) + + app := cltest.NewApplicationWithConfigAndKeyOnSimulatedBlockchain(t, config, backend, nodeKey) + require.NoError(t, app.Start(testutils.Context(t))) + + // create job + regAddrEIP55 := ethkey.EIP55AddressFromAddress(regAddr) + job := cltest.MustInsertKeeperJob(t, db, korm, nodeAddressEIP55, regAddrEIP55) + err = app.JobSpawner().StartService(testutils.Context(t), job) + require.NoError(t, err) + + // keeper job is triggered + receivedBytes := func() []byte { + received, err2 := upkeepContract.ReceivedBytes(nil) + require.NoError(t, err2) + return received + } + + hugePayload := make([]byte, maxPerformDataSize) + _, err = upkeepContract.SetBytesToSend(carrol, hugePayload) + require.NoError(t, err) + _, err = upkeepContract.SetShouldPerformUpkeep(carrol, true) + require.NoError(t, err) + + // Huge payload should not result in a perform + g.Consistently(receivedBytes, 20*time.Second, cltest.DBPollingInterval).Should(gomega.Equal([]byte{})) + + // Set payload to be small and it should get received + smallPayload := make([]byte, maxPerformDataSize-1) + _, err = upkeepContract.SetBytesToSend(carrol, smallPayload) + require.NoError(t, err) + g.Eventually(receivedBytes, 20*time.Second, cltest.DBPollingInterval).Should(gomega.Equal(smallPayload)) + }) +} diff --git a/core/services/keeper/upkeep_executer.go b/core/services/keeper/upkeep_executer.go index 959e875ab2b..20fe0aca1e9 100644 --- a/core/services/keeper/upkeep_executer.go +++ b/core/services/keeper/upkeep_executer.go @@ -235,7 +235,7 @@ func (ex *UpkeepExecuter) execute(upkeep UpkeepRegistration, head *evmtypes.Head } } - vars := pipeline.NewVarsFrom(buildJobSpec(ex.job, upkeep, ex.orm.config, ex.config, gasPrice, gasTipCap, gasFeeCap, evmChainID)) + vars := pipeline.NewVarsFrom(buildJobSpec(ex.job, upkeep, ex.orm.config, gasPrice, gasTipCap, gasFeeCap, evmChainID)) // DotDagSource in database is empty because all the Keeper pipeline runs make use of the same observation source ex.job.PipelineSpec.DotDagSource = pipeline.KeepersObservationSource @@ -309,7 +309,6 @@ func buildJobSpec( jb job.Job, upkeep UpkeepRegistration, ormConfig RegistryGasChecker, - exConfig RegistryGasChecker, gasPrice *big.Int, gasTipCap *big.Int, gasFeeCap *big.Int, @@ -323,12 +322,11 @@ func buildJobSpec( "upkeepID": upkeep.UpkeepID.String(), "prettyID": upkeep.PrettyID(), "performUpkeepGasLimit": upkeep.ExecuteGas + ormConfig.KeeperRegistryPerformGasOverhead(), - "checkUpkeepGasLimit": exConfig.KeeperRegistryCheckGasOverhead() + upkeep.Registry.CheckGas + - exConfig.KeeperRegistryPerformGasOverhead() + upkeep.ExecuteGas, - "gasPrice": gasPrice, - "gasTipCap": gasTipCap, - "gasFeeCap": gasFeeCap, - "evmChainID": chainID, + "maxPerformDataSize": ormConfig.KeeperRegistryMaxPerformDataSize(), + "gasPrice": gasPrice, + "gasTipCap": gasTipCap, + "gasFeeCap": gasFeeCap, + "evmChainID": chainID, }, } } diff --git a/core/services/keeper/upkeep_executer_test.go b/core/services/keeper/upkeep_executer_test.go index 0d0f33e208a..ed2331b3fd1 100644 --- a/core/services/keeper/upkeep_executer_test.go +++ b/core/services/keeper/upkeep_executer_test.go @@ -119,6 +119,8 @@ func Test_UpkeepExecuter_ErrorsIfStartedTwice(t *testing.T) { } func Test_UpkeepExecuter_PerformsUpkeep_Happy(t *testing.T) { + taskRuns := 11 + t.Parallel() t.Run("runs upkeep on triggering block number", func(t *testing.T) { @@ -142,7 +144,7 @@ func Test_UpkeepExecuter_PerformsUpkeep_Happy(t *testing.T) { "checkUpkeep", func(callArgs ethereum.CallMsg) bool { return bigmath.Equal(callArgs.GasPrice, gasPrice) && - callArgs.Gas == 650_000 + callArgs.Gas == 0 }, checkUpkeepResponse, ) @@ -155,7 +157,7 @@ func Test_UpkeepExecuter_PerformsUpkeep_Happy(t *testing.T) { head := newHead() executer.OnNewLongestChain(testutils.Context(t), &head) ethTxCreated.AwaitOrFail(t) - runs := cltest.WaitForPipelineComplete(t, 0, job.ID, 1, 8, jpv2.Jrm, time.Second, 100*time.Millisecond) + runs := cltest.WaitForPipelineComplete(t, 0, job.ID, 1, taskRuns, jpv2.Jrm, time.Second, 100*time.Millisecond) require.Len(t, runs, 1) assert.False(t, runs[0].HasErrors()) assert.False(t, runs[0].HasFatalErrors()) @@ -192,7 +194,7 @@ func Test_UpkeepExecuter_PerformsUpkeep_Happy(t *testing.T) { ) return bigmath.Equal(callArgs.GasPrice, expectedGasPrice) && - 650_000 == callArgs.Gas + callArgs.Gas == 0 }, checkUpkeepResponse, ) @@ -207,7 +209,7 @@ func Test_UpkeepExecuter_PerformsUpkeep_Happy(t *testing.T) { executer.OnNewLongestChain(testutils.Context(t), &head) ethTxCreated.AwaitOrFail(t) - runs := cltest.WaitForPipelineComplete(t, 0, job.ID, 1, 8, jpv2.Jrm, time.Second, 100*time.Millisecond) + runs := cltest.WaitForPipelineComplete(t, 0, job.ID, 1, taskRuns, jpv2.Jrm, time.Second, 100*time.Millisecond) require.Len(t, runs, 1) assert.False(t, runs[0].HasErrors()) assert.False(t, runs[0].HasFatalErrors()) @@ -239,7 +241,7 @@ func Test_UpkeepExecuter_PerformsUpkeep_Happy(t *testing.T) { "checkUpkeep", func(callArgs ethereum.CallMsg) bool { return bigmath.Equal(callArgs.GasPrice, gasPrice) && - callArgs.Gas == 650_000 + callArgs.Gas == 0 }, checkUpkeepResponse, ) @@ -251,7 +253,7 @@ func Test_UpkeepExecuter_PerformsUpkeep_Happy(t *testing.T) { head := newHead() executer.OnNewLongestChain(testutils.Context(t), &head) - runs := cltest.WaitForPipelineError(t, 0, job.ID, 1, 8, jpv2.Jrm, time.Second, 100*time.Millisecond) + runs := cltest.WaitForPipelineError(t, 0, job.ID, 1, taskRuns, jpv2.Jrm, time.Second, 100*time.Millisecond) require.Len(t, runs, 1) assert.True(t, runs[0].HasErrors()) assert.True(t, runs[0].HasFatalErrors()) @@ -302,7 +304,7 @@ func Test_UpkeepExecuter_PerformsUpkeep_Happy(t *testing.T) { head := cltest.Head(36) executer.OnNewLongestChain(testutils.Context(t), head) - runs := cltest.WaitForPipelineComplete(t, 0, job.ID, 1, 8, jpv2.Jrm, time.Second, 100*time.Millisecond) + runs := cltest.WaitForPipelineComplete(t, 0, job.ID, 1, taskRuns, jpv2.Jrm, time.Second, 100*time.Millisecond) require.Len(t, runs, 1) assert.False(t, runs[0].HasErrors()) etxs[0].AwaitOrFail(t) @@ -338,7 +340,7 @@ func Test_UpkeepExecuter_PerformsUpkeep_Happy(t *testing.T) { ) return bigmath.Equal(callArgs.GasPrice, expectedGasPrice) && - 650_000 == callArgs.Gas + callArgs.Gas == 0 }, checkUpkeepResponse, ) @@ -353,7 +355,7 @@ func Test_UpkeepExecuter_PerformsUpkeep_Happy(t *testing.T) { executer.OnNewLongestChain(testutils.Context(t), &head) ethTxCreated.AwaitOrFail(t) - runs := cltest.WaitForPipelineComplete(t, 0, job.ID, 1, 8, jpv2.Jrm, time.Second, 100*time.Millisecond) + runs := cltest.WaitForPipelineComplete(t, 0, job.ID, 1, taskRuns, jpv2.Jrm, time.Second, 100*time.Millisecond) require.Len(t, runs, 1) assert.False(t, runs[0].HasErrors()) assert.False(t, runs[0].HasFatalErrors()) diff --git a/core/services/keeper/upkeep_executer_unit_test.go b/core/services/keeper/upkeep_executer_unit_test.go index 8ebc71e36ca..45f6b0797c1 100644 --- a/core/services/keeper/upkeep_executer_unit_test.go +++ b/core/services/keeper/upkeep_executer_unit_test.go @@ -44,6 +44,19 @@ func (_m *registryGasCheckMock) KeeperRegistryPerformGasOverhead() uint32 { return r0 } +func (_m *registryGasCheckMock) KeeperRegistryMaxPerformDataSize() uint32 { + ret := _m.Called() + + var r0 uint32 + if rf, ok := ret.Get(0).(func() uint32); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(uint32) + } + + return r0 +} + func TestBuildJobSpec(t *testing.T) { jb := job.Job{ID: 10} from := ethkey.EIP55Address(testutils.NewAddress().Hex()) @@ -66,10 +79,10 @@ func TestBuildJobSpec(t *testing.T) { m := ®istryGasCheckMock{} m.Mock.Test(t) - m.On("KeeperRegistryPerformGasOverhead").Return(uint32(9)).Times(2) - m.On("KeeperRegistryCheckGasOverhead").Return(uint32(6)).Times(1) + m.On("KeeperRegistryPerformGasOverhead").Return(uint32(9)).Times(1) + m.On("KeeperRegistryMaxPerformDataSize").Return(uint32(1000)).Times(1) - spec := buildJobSpec(jb, upkeep, m, m, gasPrice, gasTipCap, gasFeeCap, chainID) + spec := buildJobSpec(jb, upkeep, m, gasPrice, gasTipCap, gasFeeCap, chainID) expected := map[string]interface{}{ "jobSpec": map[string]interface{}{ @@ -79,7 +92,7 @@ func TestBuildJobSpec(t *testing.T) { "upkeepID": "4", "prettyID": fmt.Sprintf("UPx%064d", 4), "performUpkeepGasLimit": uint32(21), - "checkUpkeepGasLimit": uint32(38), + "maxPerformDataSize": uint32(1000), "gasPrice": gasPrice, "gasTipCap": gasTipCap, "gasFeeCap": gasFeeCap, diff --git a/core/services/pipeline/common.go b/core/services/pipeline/common.go index d44cc4d3d36..18d82d406b1 100644 --- a/core/services/pipeline/common.go +++ b/core/services/pipeline/common.go @@ -357,6 +357,8 @@ const ( TaskTypeETHABIDecode TaskType = "ethabidecode" TaskTypeETHABIDecodeLog TaskType = "ethabidecodelog" TaskTypeMerge TaskType = "merge" + TaskTypeLength TaskType = "length" + TaskTypeLessThan TaskType = "lessthan" TaskTypeLowercase TaskType = "lowercase" TaskTypeUppercase TaskType = "uppercase" TaskTypeConditional TaskType = "conditional" @@ -440,6 +442,10 @@ func UnmarshalTaskFromMap(taskType TaskType, taskMap interface{}, ID int, dotID task = &FailTask{BaseTask: BaseTask{id: ID, dotID: dotID}} case TaskTypeMerge: task = &MergeTask{BaseTask: BaseTask{id: ID, dotID: dotID}} + case TaskTypeLength: + task = &LengthTask{BaseTask: BaseTask{id: ID, dotID: dotID}} + case TaskTypeLessThan: + task = &LessThanTask{BaseTask: BaseTask{id: ID, dotID: dotID}} case TaskTypeLowercase: task = &LowercaseTask{BaseTask: BaseTask{id: ID, dotID: dotID}} case TaskTypeUppercase: diff --git a/core/services/pipeline/orm.go b/core/services/pipeline/orm.go index 55dfe174efb..9566abc2999 100644 --- a/core/services/pipeline/orm.go +++ b/core/services/pipeline/orm.go @@ -15,47 +15,55 @@ import ( "github.com/smartcontractkit/sqlx" ) -// KeepersObservationSource is the same for all keeper jobs and it is not perisisted in DB +// KeepersObservationSource is the same for all keeper jobs and it is not persisted in DB const KeepersObservationSource = ` - encode_check_upkeep_tx [type=ethabiencode - abi="checkUpkeep(uint256 id, address from)" - data="{\"id\":$(jobSpec.upkeepID),\"from\":$(jobSpec.fromAddress)}"] - check_upkeep_tx [type=ethcall - failEarly=true - extractRevertReason=true - evmChainID="$(jobSpec.evmChainID)" - contract="$(jobSpec.contractAddress)" - gas="$(jobSpec.checkUpkeepGasLimit)" - gasPrice="$(jobSpec.gasPrice)" - gasTipCap="$(jobSpec.gasTipCap)" - gasFeeCap="$(jobSpec.gasFeeCap)" - data="$(encode_check_upkeep_tx)"] - decode_check_upkeep_tx [type=ethabidecode - abi="bytes memory performData, uint256 maxLinkPayment, uint256 gasLimit, uint256 adjustedGasWei, uint256 linkEth"] - encode_perform_upkeep_tx [type=ethabiencode - abi="performUpkeep(uint256 id, bytes calldata performData)" - data="{\"id\": $(jobSpec.upkeepID),\"performData\":$(decode_check_upkeep_tx.performData)}"] + encode_check_upkeep_tx [type=ethabiencode + abi="checkUpkeep(uint256 id, address from)" + data="{\"id\":$(jobSpec.upkeepID),\"from\":$(jobSpec.fromAddress)}"] + check_upkeep_tx [type=ethcall + failEarly=true + extractRevertReason=true + evmChainID="$(jobSpec.evmChainID)" + contract="$(jobSpec.contractAddress)" + gasUnlimited=true + gasPrice="$(jobSpec.gasPrice)" + gasTipCap="$(jobSpec.gasTipCap)" + gasFeeCap="$(jobSpec.gasFeeCap)" + data="$(encode_check_upkeep_tx)"] + decode_check_upkeep_tx [type=ethabidecode + abi="bytes memory performData, uint256 maxLinkPayment, uint256 gasLimit, uint256 adjustedGasWei, uint256 linkEth"] + calculate_perform_data_len [type=length + input="$(decode_check_upkeep_tx.performData)"] + perform_data_lessthan_limit [type=lessthan + left="$(calculate_perform_data_len)" + right="$(jobSpec.maxPerformDataSize)"] + check_perform_data_limit [type=conditional + failEarly=true + data="$(perform_data_lessthan_limit)"] + encode_perform_upkeep_tx [type=ethabiencode + abi="performUpkeep(uint256 id, bytes calldata performData)" + data="{\"id\": $(jobSpec.upkeepID),\"performData\":$(decode_check_upkeep_tx.performData)}"] simulate_perform_upkeep_tx [type=ethcall - extractRevertReason=true - evmChainID="$(jobSpec.evmChainID)" - contract="$(jobSpec.contractAddress)" - from="$(jobSpec.fromAddress)" - gas="$(jobSpec.performUpkeepGasLimit)" - data="$(encode_perform_upkeep_tx)"] - decode_check_perform_tx [type=ethabidecode - abi="bool success"] - check_success [type=conditional - failEarly=true - data="$(decode_check_perform_tx.success)"] - perform_upkeep_tx [type=ethtx - minConfirmations=0 - to="$(jobSpec.contractAddress)" - from="[$(jobSpec.fromAddress)]" - evmChainID="$(jobSpec.evmChainID)" - data="$(encode_perform_upkeep_tx)" - gasLimit="$(jobSpec.performUpkeepGasLimit)" - txMeta="{\"jobID\":$(jobSpec.jobID),\"upkeepID\":$(jobSpec.prettyID)}"] - encode_check_upkeep_tx -> check_upkeep_tx -> decode_check_upkeep_tx -> encode_perform_upkeep_tx -> simulate_perform_upkeep_tx -> decode_check_perform_tx -> check_success -> perform_upkeep_tx + extractRevertReason=true + evmChainID="$(jobSpec.evmChainID)" + contract="$(jobSpec.contractAddress)" + from="$(jobSpec.fromAddress)" + gasUnlimited=true + data="$(encode_perform_upkeep_tx)"] + decode_check_perform_tx [type=ethabidecode + abi="bool success"] + check_success [type=conditional + failEarly=true + data="$(decode_check_perform_tx.success)"] + perform_upkeep_tx [type=ethtx + minConfirmations=0 + to="$(jobSpec.contractAddress)" + from="[$(jobSpec.fromAddress)]" + evmChainID="$(jobSpec.evmChainID)" + data="$(encode_perform_upkeep_tx)" + gasLimit="$(jobSpec.performUpkeepGasLimit)" + txMeta="{\"jobID\":$(jobSpec.jobID),\"upkeepID\":$(jobSpec.prettyID)}"] + encode_check_upkeep_tx -> check_upkeep_tx -> decode_check_upkeep_tx -> calculate_perform_data_len -> perform_data_lessthan_limit -> check_perform_data_limit -> encode_perform_upkeep_tx -> simulate_perform_upkeep_tx -> decode_check_perform_tx -> check_success -> perform_upkeep_tx ` //go:generate mockery --name ORM --output ./mocks/ --case=underscore diff --git a/core/services/pipeline/task.eth_call.go b/core/services/pipeline/task.eth_call.go index 28830ad411a..7a1aa51cbec 100644 --- a/core/services/pipeline/task.eth_call.go +++ b/core/services/pipeline/task.eth_call.go @@ -17,10 +17,9 @@ import ( "github.com/smartcontractkit/chainlink/core/utils" ) -// // Return types: -// []byte // +// []byte type ETHCallTask struct { BaseTask `mapstructure:",squash"` Contract string `json:"contract"` @@ -30,6 +29,7 @@ type ETHCallTask struct { GasPrice string `json:"gasPrice"` GasTipCap string `json:"gasTipCap"` GasFeeCap string `json:"gasFeeCap"` + GasUnlimited string `json:"unlimitedGas"` ExtractRevertReason bool `json:"extractRevertReason"` EVMChainID string `json:"evmChainID" mapstructure:"evmChainID"` @@ -68,6 +68,7 @@ func (t *ETHCallTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, in gasPrice MaybeBigIntParam gasTipCap MaybeBigIntParam gasFeeCap MaybeBigIntParam + gasUnlimited BoolParam chainID StringParam ) err = multierr.Combine( @@ -79,6 +80,7 @@ func (t *ETHCallTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, in errors.Wrap(ResolveParam(&gasTipCap, From(VarExpr(t.GasTipCap, vars), t.GasTipCap)), "gasTipCap"), errors.Wrap(ResolveParam(&gasFeeCap, From(VarExpr(t.GasFeeCap, vars), t.GasFeeCap)), "gasFeeCap"), errors.Wrap(ResolveParam(&chainID, From(VarExpr(t.EVMChainID, vars), NonemptyString(t.EVMChainID), "")), "evmChainID"), + errors.Wrap(ResolveParam(&gasUnlimited, From(VarExpr(t.GasUnlimited, vars), NonemptyString(t.GasUnlimited), false)), "gasUnlimited"), ) if err != nil { return Result{Error: err}, runInfo @@ -91,10 +93,16 @@ func (t *ETHCallTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, in return Result{Error: err}, runInfo } var selectedGas uint32 - if gas > 0 { - selectedGas = uint32(gas) + if gasUnlimited { + if gas > 0 { + return Result{Error: errors.Wrapf(ErrBadInput, "gas must be zero when gasUnlimited is true")}, runInfo + } } else { - selectedGas = SelectGasLimit(chain.Config(), t.jobType, t.specGasLimit) + if gas > 0 { + selectedGas = uint32(gas) + } else { + selectedGas = SelectGasLimit(chain.Config(), t.jobType, t.specGasLimit) + } } call := ethereum.CallMsg{ diff --git a/core/services/pipeline/task.length.go b/core/services/pipeline/task.length.go new file mode 100644 index 00000000000..00373ac7c95 --- /dev/null +++ b/core/services/pipeline/task.length.go @@ -0,0 +1,43 @@ +package pipeline + +import ( + "context" + + "github.com/pkg/errors" + "github.com/shopspring/decimal" + "go.uber.org/multierr" + + "github.com/smartcontractkit/chainlink/core/logger" +) + +// Return types: +// +// *decimal.Decimal +type LengthTask struct { + BaseTask `mapstructure:",squash"` + Input string `json:"input"` +} + +var _ Task = (*LengthTask)(nil) + +func (t *LengthTask) Type() TaskType { + return TaskTypeLength +} + +func (t *LengthTask) Run(_ context.Context, _ logger.Logger, vars Vars, inputs []Result) (result Result, runInfo RunInfo) { + _, err := CheckInputs(inputs, 0, 1, 0) + if err != nil { + return Result{Error: errors.Wrap(err, "task inputs")}, runInfo + } + + var input BytesParam + + err = multierr.Combine( + errors.Wrap(ResolveParam(&input, From(VarExpr(t.Input, vars), NonemptyString(t.Input), Input(inputs, 0))), "input"), + ) + if err != nil { + return Result{Error: err}, runInfo + } + + return Result{Value: decimal.NewFromInt(int64(len(input)))}, runInfo +} diff --git a/core/services/pipeline/task.length_test.go b/core/services/pipeline/task.length_test.go new file mode 100644 index 00000000000..9434b7e4e14 --- /dev/null +++ b/core/services/pipeline/task.length_test.go @@ -0,0 +1,74 @@ +package pipeline_test + +import ( + "testing" + + "github.com/shopspring/decimal" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink/core/internal/testutils" + "github.com/smartcontractkit/chainlink/core/logger" + "github.com/smartcontractkit/chainlink/core/services/pipeline" +) + +func TestLengthTask(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + input interface{} + want decimal.Decimal + }{ + {"normal bytes", []byte{0xaa, 0xbb, 0xcc, 0xdd}, decimal.NewFromInt(4)}, + {"empty bytes", []byte{}, decimal.NewFromInt(0)}, + {"string as bytes", []byte("stevetoshi sergeymoto"), decimal.NewFromInt(21)}, + {"string input gets converted to bytes", "stevetoshi sergeymoto", decimal.NewFromInt(21)}, + {"empty string", "", decimal.NewFromInt(0)}, + } + + for _, test := range tests { + assertOK := func(result pipeline.Result, runInfo pipeline.RunInfo) { + assert.False(t, runInfo.IsPending) + assert.False(t, runInfo.IsRetryable) + require.NoError(t, result.Error) + require.Equal(t, test.want.String(), result.Value.(decimal.Decimal).String()) + } + t.Run(test.name, func(t *testing.T) { + t.Run("without vars through job DAG", func(t *testing.T) { + vars := pipeline.NewVarsFrom(nil) + task := pipeline.LengthTask{BaseTask: pipeline.NewBaseTask(0, "task", nil, nil, 0)} + assertOK(task.Run(testutils.Context(t), logger.TestLogger(t), vars, []pipeline.Result{{Value: test.input}})) + }) + t.Run("without vars through input param", func(t *testing.T) { + var inputStr string + if _, ok := test.input.([]byte); ok { + inputStr = string(test.input.([]byte)) + } else { + inputStr = test.input.(string) + } + if inputStr == "" { + // empty input parameter is indistinguishable from not providing it at all + // in that case the task will use an input defined by the job DAG + return + } + vars := pipeline.NewVarsFrom(nil) + task := pipeline.LengthTask{ + BaseTask: pipeline.NewBaseTask(0, "task", nil, nil, 0), + Input: inputStr, + } + assertOK(task.Run(testutils.Context(t), logger.TestLogger(t), vars, []pipeline.Result{})) + }) + t.Run("with vars", func(t *testing.T) { + vars := pipeline.NewVarsFrom(map[string]interface{}{ + "foo": map[string]interface{}{"bar": test.input}, + }) + task := pipeline.LengthTask{ + BaseTask: pipeline.NewBaseTask(0, "task", nil, nil, 0), + Input: "$(foo.bar)", + } + assertOK(task.Run(testutils.Context(t), logger.TestLogger(t), vars, []pipeline.Result{})) + }) + }) + } +} diff --git a/core/services/pipeline/task.lessthan.go b/core/services/pipeline/task.lessthan.go new file mode 100644 index 00000000000..96276db928f --- /dev/null +++ b/core/services/pipeline/task.lessthan.go @@ -0,0 +1,50 @@ +package pipeline + +import ( + "context" + + "github.com/pkg/errors" + "go.uber.org/multierr" + + "github.com/smartcontractkit/chainlink/core/logger" +) + +// Return types: +// +// bool +type LessThanTask struct { + BaseTask `mapstructure:",squash"` + Left string `json:"input"` + Right string `json:"limit"` +} + +var ( + _ Task = (*LessThanTask)(nil) +) + +func (t *LessThanTask) Type() TaskType { + return TaskTypeLessThan +} + +func (t *LessThanTask) Run(_ context.Context, _ logger.Logger, vars Vars, inputs []Result) (result Result, runInfo RunInfo) { + _, err := CheckInputs(inputs, 0, 1, 0) + if err != nil { + return Result{Error: errors.Wrap(err, "task inputs")}, runInfo + } + + var ( + a DecimalParam + b DecimalParam + ) + + err = multierr.Combine( + errors.Wrap(ResolveParam(&a, From(VarExpr(t.Left, vars), NonemptyString(t.Left), Input(inputs, 0))), "left"), + errors.Wrap(ResolveParam(&b, From(VarExpr(t.Right, vars), NonemptyString(t.Right))), "right"), + ) + if err != nil { + return Result{Error: err}, runInfo + } + + value := a.Decimal().LessThan(b.Decimal()) + return Result{Value: value}, runInfo +} diff --git a/core/services/pipeline/task.lessthan_test.go b/core/services/pipeline/task.lessthan_test.go new file mode 100644 index 00000000000..a2685874e76 --- /dev/null +++ b/core/services/pipeline/task.lessthan_test.go @@ -0,0 +1,142 @@ +package pipeline_test + +import ( + "fmt" + "testing" + + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink/core/internal/testutils" + "github.com/smartcontractkit/chainlink/core/logger" + "github.com/smartcontractkit/chainlink/core/services/pipeline" +) + +func TestLessThanTask_Happy(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + left interface{} + right string + want bool + }{ + {"string, lt 100", "1.23", "100", true}, + {"string, lt negative", "1.23", "-5", false}, + {"string, lt zero", "1.23", "0", false}, + {"string, lt large value", "1.23", "1000000000000000000", true}, + {"large string, lt large value", "10000000000000000001", "1000000000000000000", false}, + + {"int, true", int(2), "100", true}, + {"int, false", int(2), "-5", false}, + + {"int8, true", int8(2), "100", true}, + {"int8, false", int8(2), "-5", false}, + + {"int16, true", int16(2), "100", true}, + {"int16, false", int16(2), "-5", false}, + + {"int32,true", int32(2), "100", true}, + {"int32, false", int32(2), "-5", false}, + + {"int64, true", int64(2), "100", true}, + {"int64, false", int64(2), "-5", false}, + + {"uint, true", uint(2), "100", true}, + {"uint, false", uint(2), "-5", false}, + + {"uint8, true", uint8(2), "100", true}, + {"uint8, false", uint8(2), "-5", false}, + + {"uint16, true", uint16(2), "100", true}, + {"uint16, false", uint16(2), "-5", false}, + + {"uint32, true", uint32(2), "100", true}, + {"uint32, false", uint32(2), "-5", false}, + + {"uint64, true", uint64(2), "100", true}, + {"uint64, false", uint64(2), "-5", false}, + + {"float32, true", float32(1.23), "10", true}, + {"float32, false", float32(1.23), "-5", false}, + + {"float64, true", float64(1.23), "10", true}, + {"float64, false", float64(1.23), "-5", false}, + } + + for _, test := range tests { + assertOK := func(result pipeline.Result, runInfo pipeline.RunInfo) { + assert.False(t, runInfo.IsPending) + assert.False(t, runInfo.IsRetryable) + require.NoError(t, result.Error) + require.Equal(t, test.want, result.Value.(bool)) + } + t.Run(test.name, func(t *testing.T) { + t.Run("without vars through job DAG", func(t *testing.T) { + vars := pipeline.NewVarsFrom(nil) + task := pipeline.LessThanTask{BaseTask: pipeline.NewBaseTask(0, "task", nil, nil, 0), Right: test.right} + assertOK(task.Run(testutils.Context(t), logger.TestLogger(t), vars, []pipeline.Result{{Value: test.left}})) + }) + t.Run("without vars through input param", func(t *testing.T) { + vars := pipeline.NewVarsFrom(nil) + task := pipeline.LessThanTask{ + BaseTask: pipeline.NewBaseTask(0, "task", nil, nil, 0), + Left: fmt.Sprintf("%v", test.left), + Right: test.right, + } + assertOK(task.Run(testutils.Context(t), logger.TestLogger(t), vars, []pipeline.Result{})) + }) + t.Run("with vars", func(t *testing.T) { + vars := pipeline.NewVarsFrom(map[string]interface{}{ + "foo": map[string]interface{}{"bar": test.left}, + "chain": map[string]interface{}{"link": test.right}, + }) + task := pipeline.LessThanTask{ + BaseTask: pipeline.NewBaseTask(0, "task", nil, nil, 0), + Left: "$(foo.bar)", + Right: "$(chain.link)", + } + assertOK(task.Run(testutils.Context(t), logger.TestLogger(t), vars, []pipeline.Result{})) + }) + }) + } +} + +func TestLessThanTask_Unhappy(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + left string + right string + inputs []pipeline.Result + vars pipeline.Vars + wantErrorCause error + wantErrorContains string + }{ + {"map as input from inputs", "", "100", []pipeline.Result{{Value: map[string]interface{}{"chain": "link"}}}, pipeline.NewVarsFrom(nil), pipeline.ErrBadInput, "left"}, + {"map as input from var", "$(foo)", "100", nil, pipeline.NewVarsFrom(map[string]interface{}{"foo": map[string]interface{}{"chain": "link"}}), pipeline.ErrBadInput, "left"}, + {"slice as input from inputs", "", "100", []pipeline.Result{{Value: []interface{}{"chain", "link"}}}, pipeline.NewVarsFrom(nil), pipeline.ErrBadInput, "left"}, + {"slice as input from var", "$(foo)", "100", nil, pipeline.NewVarsFrom(map[string]interface{}{"foo": []interface{}{"chain", "link"}}), pipeline.ErrBadInput, "left"}, + {"input as missing var", "$(foo)", "100", nil, pipeline.NewVarsFrom(nil), pipeline.ErrKeypathNotFound, "left"}, + {"limit as missing var", "", "$(foo)", []pipeline.Result{{Value: "123"}}, pipeline.NewVarsFrom(nil), pipeline.ErrKeypathNotFound, "right"}, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + task := pipeline.LessThanTask{ + BaseTask: pipeline.NewBaseTask(0, "task", nil, nil, 0), + Left: test.left, + Right: test.right, + } + result, runInfo := task.Run(testutils.Context(t), logger.TestLogger(t), test.vars, test.inputs) + assert.False(t, runInfo.IsPending) + assert.False(t, runInfo.IsRetryable) + require.Equal(t, test.wantErrorCause, errors.Cause(result.Error)) + if test.wantErrorContains != "" { + require.Contains(t, result.Error.Error(), test.wantErrorContains) + } + }) + } +} diff --git a/core/web/resolver/config_test.go b/core/web/resolver/config_test.go index 93ec2db120a..42278da2dee 100644 --- a/core/web/resolver/config_test.go +++ b/core/web/resolver/config_test.go @@ -260,6 +260,10 @@ func TestResolver_Config(t *testing.T) { { "key": "KEEPER_REGISTRY_PERFORM_GAS_OVERHEAD", "value": "150000" + }, + { + "key": "KEEPER_REGISTRY_MAX_PERFORM_DATA_SIZE", + "value": "5000" }, { "key":"KEEPER_REGISTRY_SYNC_INTERVAL", diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index cfb93e6cb35..4069933e934 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Added `length` and `lessthan` tasks (pipeline). +- Added `gasUnlimited` parameter to `ethcall` task. - `GAS_ESTIMATOR_MODE` `Arbitrum` to support Nitro's multi-dimensional gas model, with dynamic gas pricing and limits. This new, default estimator for Arbitrum networks uses the suggested gas price (up to `ETH_MAX_GAS_PRICE_WEI`, with `1000 gwei` default) as well as an estimated gas limit (up to `ETH_GAS_LIMIT_MAX`, with `1,000,000,000` default). diff --git a/docs/CONFIG.md b/docs/CONFIG.md index d77193f7b8e..b297c91f3ae 100644 --- a/docs/CONFIG.md +++ b/docs/CONFIG.md @@ -996,6 +996,7 @@ MaximumGracePeriod = 100 # Default RegistryCheckGasOverhead = 200_000 # Default RegistryPerformGasOverhead = 150_000 # Default RegistrySyncInterval = '30m' # Default +RegistryMaxPerformDataSize = 5_000 # Default RegistrySyncUpkeepQueueSize = 10 # Default TurnLookBack = 1000 # Default TurnFlagEnabled = false # Default @@ -1056,6 +1057,13 @@ RegistrySyncInterval = '30m' # Default ``` RegistrySyncInterval is the interval in which the RegistrySynchronizer performs a full sync of the keeper registry contract it is tracking. +### RegistryMaxPerformDataSize +:warning: **_ADVANCED_**: _Do not change this setting unless you know what you are doing._ +```toml +RegistryMaxPerformDataSize = 5_000 # Default +``` +RegistryMaxPerformDataSize is the max size of perform data. + ### RegistrySyncUpkeepQueueSize :warning: **_ADVANCED_**: _Do not change this setting unless you know what you are doing._ ```toml diff --git a/internal/config/docs.toml b/internal/config/docs.toml index 71e9608150a..870e9497855 100644 --- a/internal/config/docs.toml +++ b/internal/config/docs.toml @@ -391,6 +391,9 @@ RegistryPerformGasOverhead = 150_000 # Default # RegistrySyncInterval is the interval in which the RegistrySynchronizer performs a full sync of the keeper registry contract it is tracking. RegistrySyncInterval = '30m' # Default # **ADVANCED** +# RegistryMaxPerformDataSize is the max size of perform data. +RegistryMaxPerformDataSize = 5_000 # Default +# **ADVANCED** # RegistrySyncUpkeepQueueSize represents the maximum number of upkeeps that can be synced in parallel. RegistrySyncUpkeepQueueSize = 10 # Default # TurnLookBack is the number of blocks in the past to look back when getting a block for a turn.