diff --git a/.github/tests/mempool_bridge.yaml b/.github/tests/mempool_bridge.yaml new file mode 100644 index 000000000..278a873dd --- /dev/null +++ b/.github/tests/mempool_bridge.yaml @@ -0,0 +1,24 @@ +participants: + - el_type: geth + cl_type: teku + count: 2 + +additional_services: + - mempool_bridge + - dora + +network_params: + network: sepolia + shadowfork_block_height: 340000 + withdrawal_type: "0x01" + validator_balance: 1000000 + withdrawal_address: "0x4d1CB4eB7969f8806E2CaAc0cbbB71f88C8ec413" + +persistent: true + +mempool_bridge_params: + log_level: "debug" + mode: "rpc" + source_enodes: + - http://127.0.0.1:8545 + - http://127.0.0.1:8546 diff --git a/README.md b/README.md index 0d4c61bf3..9a4f033a1 100644 --- a/README.md +++ b/README.md @@ -820,6 +820,7 @@ additional_services: - forky - full_beaconchain_explorer - grafana + - mempool_bridge - prometheus - spamoor - tempo @@ -1028,6 +1029,46 @@ docker_cache_params: github_prefix: "/gh/" google_prefix: "/gcr/" + +# Configuration place for mempool bridge (https://github.com/ethpandaops/mempool-bridge) +mempool_bridge_params: + # The image to use for mempool bridge + image: ethpandaops/mempool-bridge:latest + # The mode for mempool bridge operation + # Valid values are "p2p" or "rpc" + # Default: "p2p" + mode: "p2p" + # The source enodes to use for mempool bridge + # Example: + # P2P mode: + # source_enodes: + # - enode://1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef@127.0.0.1:30303 + # - enode://1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef@127.0.0.1:30304 + # RPC mode: + # source_enodes: + # - http://127.0.0.1:8545 + # - http://127.0.0.1:8546 + # Default: [] + source_enodes: [] + + # The log level for mempool bridge + # Valid values are "error", "warn", "info", "debug", and "trace" + # If empty, will use the global_log_level value + # Default: "" (uses global_log_level) + log_level: "" + + # The number of concurrent goroutines to use when sending transactions to targets + # Default: 10 + send_concurrency: 10 + + # The interval in seconds for polling the source for new transactions + # Default: "10s" + polling_interval: "10s" + + # The retry interval duration for retrying failed operations + # Default: "30s" + retry_interval: "30s" + # Supports four values # Default: "null" - no mev boost, mev builder, mev flood or relays are spun up # "mock" - mock-builder & mev-boost are spun up diff --git a/main.star b/main.star index f95258591..956ba567a 100644 --- a/main.star +++ b/main.star @@ -53,6 +53,7 @@ mev_custom_flood = import_module( "./src/mev/flashbots/mev_custom_flood/mev_custom_flood_launcher.star" ) broadcaster = import_module("./src/broadcaster/broadcaster.star") +mempool_bridge = import_module("./src/mempool_bridge/mempool_bridge_launcher.star") assertoor = import_module("./src/assertoor/assertoor_launcher.star") get_prefunded_accounts = import_module( "./src/prefunded_accounts/get_prefunded_accounts.star" @@ -136,6 +137,9 @@ def run(plan, args={}): static_files.GRAFANA_DASHBOARD_PROVIDERS_CONFIG_TEMPLATE_FILEPATH ) tempo_config_template = read_file(static_files.TEMPO_CONFIG_TEMPLATE_FILEPATH) + mempool_bridge_config_template = read_file( + static_files.MEMPOOL_BRIDGE_CONFIG_TEMPLATE_FILEPATH + ) prometheus_additional_metrics_jobs = [] raw_jwt_secret = read_file(static_files.JWT_PATH_FILEPATH) jwt_file = plan.upload_files( @@ -801,6 +805,22 @@ def run(plan, args={}): global_tolerations, args_with_right_defaults.docker_cache_params, ) + elif additional_service == "mempool_bridge": + plan.print("Launching mempool-bridge") + mempool_bridge.launch_mempool_bridge( + plan, + mempool_bridge_config_template, + all_el_contexts, + args_with_right_defaults.mempool_bridge_params, + args_with_right_defaults.network_params, + global_node_selectors, + global_tolerations, + args_with_right_defaults.port_publisher, + index, + args_with_right_defaults.docker_cache_params, + args_with_right_defaults.global_log_level, + ) + plan.print("Successfully launched mempool-bridge") elif additional_service == "spamoor": plan.print("Launching spamoor") spamoor_config_template = read_file( diff --git a/src/mempool_bridge/mempool_bridge_launcher.star b/src/mempool_bridge/mempool_bridge_launcher.star new file mode 100644 index 000000000..17cebb479 --- /dev/null +++ b/src/mempool_bridge/mempool_bridge_launcher.star @@ -0,0 +1,208 @@ +shared_utils = import_module("../shared_utils/shared_utils.star") +constants = import_module("../package_io/constants.star") +input_parser = import_module("../package_io/input_parser.star") + +SERVICE_NAME = "mempool-bridge" +MAX_ENODES_TO_FETCH = 5 + +VERBOSITY_LEVELS = { + constants.GLOBAL_LOG_LEVEL.error: "error", + constants.GLOBAL_LOG_LEVEL.warn: "warn", + constants.GLOBAL_LOG_LEVEL.info: "info", + constants.GLOBAL_LOG_LEVEL.debug: "debug", + constants.GLOBAL_LOG_LEVEL.trace: "trace", +} + +ENODE_URLS = { + "mainnet": "https://raw.githubusercontent.com/eth-clients/mainnet/refs/heads/main/metadata/enodes.yaml", + "sepolia": "https://raw.githubusercontent.com/eth-clients/sepolia/refs/heads/main/metadata/enodes.yaml", + "hoodi": "https://raw.githubusercontent.com/eth-clients/hoodi/refs/heads/main/metadata/enodes.yaml", + "holesky": "https://raw.githubusercontent.com/eth-clients/holesky/refs/heads/main/metadata/enodes.yaml", +} + +HTTP_PORT_NUMBER = 9090 + +MEMPOOL_BRIDGE_CONFIG_FILENAME = "config.yaml" +MEMPOOL_BRIDGE_CONFIG_MOUNT_DIRPATH_ON_SERVICE = "/config" + +MIN_CPU = 100 +MAX_CPU = 1000 +MIN_MEMORY = 128 +MAX_MEMORY = 2048 + +USED_PORTS = { + constants.HTTP_PORT_ID: shared_utils.new_port_spec( + HTTP_PORT_NUMBER, + shared_utils.TCP_PROTOCOL, + shared_utils.HTTP_APPLICATION_PROTOCOL, + ) +} + + +def launch_mempool_bridge( + plan, + config_template, + all_el_contexts, + mempool_bridge_params, + network_params, + global_node_selectors, + global_tolerations, + port_publisher, + additional_service_index, + docker_cache_params, + global_log_level, +): + tolerations = shared_utils.get_tolerations(global_tolerations=global_tolerations) + + network = network_params.network + mode = mempool_bridge_params.mode + + if mode == "rpc" and not mempool_bridge_params.source_enodes: + fail( + "RPC mode requires source_enodes to be explicitly defined. Please provide dedicated RPC endpoints as the source when using RPC mode." + ) + + # Build source endpoints based on mode + source_endpoints = [] + if mempool_bridge_params.source_enodes: + source_endpoints = mempool_bridge_params.source_enodes + else: + # Only fetch enodes for p2p mode when using public networks + if mode == "p2p": + if "shadowfork" in network_params.network: + network = network_params.network.split("-shadowfork")[0] + if network in constants.PUBLIC_NETWORKS: + plan.print( + "Fetching enodes for {0} from eth-clients repo".format(network) + ) + for i in range(1, MAX_ENODES_TO_FETCH + 1): + enode = plan.run_sh( + name="fetch-enode-{0}".format(i), + description="Fetching enode #{0}".format(i), + run="curl -s {0} | grep -E '^[[:space:]]*-[[:space:]]*enode' | sed -n '{1}p' | sed 's/^[[:space:]]*-[[:space:]]*//; s/[[:space:]]*#.*//' | tr -d '\\n'".format( + ENODE_URLS[network], i + ), + node_selectors=global_node_selectors, + tolerations=tolerations, + wait=None, + ) + source_endpoints.append(enode.output) + + # Build target endpoints from all EL contexts based on mode + target_endpoints = [] + for context in all_el_contexts: + if mode == "rpc": + # For RPC mode, use HTTP RPC endpoint + rpc_url = "http://{0}:{1}".format(context.ip_addr, context.rpc_port_num) + target_endpoints.append(rpc_url) + else: + # For P2P mode, prefer enode if available, fallback to enr + if context.enode: + target_endpoints.append(context.enode) + elif context.enr: + target_endpoints.append(context.enr) + + # Determine log level: use mempool_bridge_params.log_level if set, otherwise use global_log_level + log_level = input_parser.get_client_log_level_or_default( + mempool_bridge_params.log_level, global_log_level, VERBOSITY_LEVELS + ) + + template_data = new_config_template_data( + HTTP_PORT_NUMBER, + source_endpoints, + target_endpoints, + mempool_bridge_params.mode, + log_level, + mempool_bridge_params.send_concurrency, + mempool_bridge_params.polling_interval, + mempool_bridge_params.retry_interval, + ) + + template_and_data = shared_utils.new_template_and_data( + config_template, template_data + ) + template_and_data_by_rel_dest_filepath = {} + template_and_data_by_rel_dest_filepath[ + MEMPOOL_BRIDGE_CONFIG_FILENAME + ] = template_and_data + + config_files_artifact_name = plan.render_templates( + template_and_data_by_rel_dest_filepath, "mempool-bridge-config" + ) + + config = get_config( + config_files_artifact_name, + global_node_selectors, + tolerations, + port_publisher, + additional_service_index, + docker_cache_params, + mempool_bridge_params, + ) + + plan.add_service(SERVICE_NAME, config) + + +def get_config( + config_files_artifact_name, + node_selectors, + tolerations, + port_publisher, + additional_service_index, + docker_cache_params, + mempool_bridge_params, +): + config_file_path = shared_utils.path_join( + MEMPOOL_BRIDGE_CONFIG_MOUNT_DIRPATH_ON_SERVICE, + MEMPOOL_BRIDGE_CONFIG_FILENAME, + ) + + public_ports = shared_utils.get_additional_service_standard_public_port( + port_publisher, + constants.HTTP_PORT_ID, + additional_service_index, + 0, + ) + + return ServiceConfig( + image=shared_utils.docker_cache_image_calc( + docker_cache_params, + mempool_bridge_params.image, + ), + ports=USED_PORTS, + public_ports=public_ports, + files={ + MEMPOOL_BRIDGE_CONFIG_MOUNT_DIRPATH_ON_SERVICE: config_files_artifact_name, + }, + cmd=[ + "--config={0}".format(config_file_path), + ], + min_cpu=MIN_CPU, + max_cpu=MAX_CPU, + min_memory=MIN_MEMORY, + max_memory=MAX_MEMORY, + node_selectors=node_selectors, + tolerations=tolerations, + ) + + +def new_config_template_data( + listen_port_num, + source_endpoints, + target_endpoints, + mode, + log_level, + send_concurrency, + polling_interval, + retry_interval, +): + return { + "ListenPortNum": listen_port_num, + "SourceEndpoints": source_endpoints, + "TargetEndpoints": target_endpoints, + "Mode": mode, + "LogLevel": log_level, + "SendConcurrency": send_concurrency, + "PollingInterval": polling_interval, + "RetryInterval": retry_interval, + } diff --git a/src/package_io/input_parser.star b/src/package_io/input_parser.star index 0008e3d5d..189f2a4cd 100644 --- a/src/package_io/input_parser.star +++ b/src/package_io/input_parser.star @@ -83,6 +83,7 @@ ATTR_TO_BE_SKIPPED_AT_ROOT = ( "xatu_sentry_params", "port_publisher", "spamoor_params", + "mempool_bridge_params", ) @@ -117,6 +118,7 @@ def input_parser(plan, input_args): result["global_node_selectors"] = {} result["port_publisher"] = get_port_publisher_params("default") result["spamoor_params"] = get_default_spamoor_params() + result["mempool_bridge_params"] = get_default_mempool_bridge_params() if constants.NETWORK_NAME.shadowfork in result["network_params"]["network"]: shadow_base = result["network_params"]["network"].split("-shadowfork")[0] @@ -186,6 +188,10 @@ def input_parser(plan, input_args): for sub_attr in input_args["spamoor_params"]: sub_value = input_args["spamoor_params"][sub_attr] result["spamoor_params"][sub_attr] = sub_value + elif attr == "mempool_bridge_params": + for sub_attr in input_args["mempool_bridge_params"]: + sub_value = input_args["mempool_bridge_params"][sub_attr] + result["mempool_bridge_params"][sub_attr] = sub_value elif attr == "ethereum_genesis_generator_params": for sub_attr in input_args["ethereum_genesis_generator_params"]: sub_value = input_args["ethereum_genesis_generator_params"][sub_attr] @@ -333,6 +339,15 @@ def input_parser(plan, input_args): ) ) + if ( + "mempool_bridge" in result["additional_services"] + and result["network_params"]["network"] not in constants.PUBLIC_NETWORKS + and constants.NETWORK_NAME.shadowfork not in result["network_params"]["network"] + ): + fail( + "Mempool bridge is enabled but network is not mainnet, sepolia, hoodi or shadowfork, please set network to mainnet, sepolia, hoodi or shadowfork" + ) + return struct( participants=[ struct( @@ -660,6 +675,15 @@ def input_parser(plan, input_args): spammers=result["spamoor_params"]["spammers"], extra_args=result["spamoor_params"]["extra_args"], ), + mempool_bridge_params=struct( + image=result["mempool_bridge_params"]["image"], + source_enodes=result["mempool_bridge_params"]["source_enodes"], + mode=result["mempool_bridge_params"]["mode"], + log_level=result["mempool_bridge_params"]["log_level"], + send_concurrency=result["mempool_bridge_params"]["send_concurrency"], + polling_interval=result["mempool_bridge_params"]["polling_interval"], + retry_interval=result["mempool_bridge_params"]["retry_interval"], + ), additional_services=result["additional_services"], wait_for_finalization=result["wait_for_finalization"], global_log_level=result["global_log_level"], @@ -1619,6 +1643,18 @@ def get_default_custom_flood_params(): return {"interval_between_transactions": 1} +def get_default_mempool_bridge_params(): + return { + "image": "ethpandaops/mempool-bridge:latest", + "source_enodes": [], + "mode": "p2p", + "log_level": "", + "send_concurrency": 10, + "polling_interval": "10s", + "retry_interval": "30s", + } + + def get_port_publisher_params(parameter_type, input_args=None): port_publisher_parameters = { "nat_exit_ip": "KURTOSIS_IP_ADDR_PLACEHOLDER", diff --git a/src/package_io/sanity_check.star b/src/package_io/sanity_check.star index 7c7dce6aa..0ce8812a0 100644 --- a/src/package_io/sanity_check.star +++ b/src/package_io/sanity_check.star @@ -361,6 +361,15 @@ SUBCATEGORY_PARAMS = { "extra_args", "spammers", ], + "mempool_bridge_params": [ + "image", + "source_enodes", + "mode", + "log_level", + "send_concurrency", + "polling_interval", + "retry_interval", + ], "ethereum_genesis_generator_params": [ "image", "extra_env", @@ -388,6 +397,7 @@ ADDITIONAL_SERVICES_PARAMS = [ "apache", "nginx", "tracoor", + "mempool_bridge", "spamoor", ] diff --git a/src/static_files/static_files.star b/src/static_files/static_files.star index caae93c72..94646a556 100644 --- a/src/static_files/static_files.star +++ b/src/static_files/static_files.star @@ -127,3 +127,7 @@ FLASHBOTS_RBUILDER_CONFIG_FILEPATH = ( COMMIT_BOOST_CONFIG_FILEPATH = ( STATIC_FILES_DIRPATH + "/mev/commit-boost/cb-config.toml.tmpl" ) + +MEMPOOL_BRIDGE_CONFIG_TEMPLATE_FILEPATH = ( + STATIC_FILES_DIRPATH + "/mempool-bridge-config/config.yaml.tmpl" +) diff --git a/static_files/mempool-bridge-config/config.yaml.tmpl b/static_files/mempool-bridge-config/config.yaml.tmpl new file mode 100644 index 000000000..d544ba520 --- /dev/null +++ b/static_files/mempool-bridge-config/config.yaml.tmpl @@ -0,0 +1,33 @@ +logging: "{{ .LogLevel }}" + +metricsAddr: ":{{ .ListenPortNum }}" + +mode: "{{ .Mode }}" + +source: + retryInterval: {{ .RetryInterval }} + pollingInterval: {{ .PollingInterval }} +{{- if eq .Mode "p2p" }} + nodeRecords: +{{- else }} + rpcEndpoints: +{{- end }} +{{- range $endpoint := .SourceEndpoints }} +{{- if $endpoint }} + - {{ $endpoint }} +{{- end }} +{{- end }} + +target: + retryInterval: {{ .RetryInterval }} + sendConcurrency: {{ .SendConcurrency }} +{{- if eq .Mode "p2p" }} + nodeRecords: +{{- else }} + rpcEndpoints: +{{- end }} +{{- range $endpoint := .TargetEndpoints }} +{{- if $endpoint }} + - {{ $endpoint }} +{{- end }} +{{- end }}