From 05d87722cbc277e6cb66ac61f823be9024b9ad2a Mon Sep 17 00:00:00 2001
From: mccrindlebrian
Date: Mon, 12 May 2025 16:16:58 +0200
Subject: [PATCH 1/4] refactor md location

---
 folding/experiments/sensitivity_analysis.py    | 263 ------------------
 folding/validators/md/__init__.py              |   0
 folding/validators/{ => md}/forward.py         |  96 +------
 .../validators/{ => md}/hyperparameters.py     |   0
 folding/validators/{ => md}/protein.py         |   0
 folding/validators/{ => md}/reward.py          |   2 +-
 tests/test_hyperparameter_sampling.py          |   2 +-
 tests/test_protein.py                          |  29 --
 8 files changed, 4 insertions(+), 388 deletions(-)
 delete mode 100644 folding/experiments/sensitivity_analysis.py
 create mode 100644 folding/validators/md/__init__.py
 rename folding/validators/{ => md}/forward.py (77%)
 rename folding/validators/{ => md}/hyperparameters.py (100%)
 rename folding/validators/{ => md}/protein.py (100%)
 rename folding/validators/{ => md}/reward.py (99%)
 delete mode 100644 tests/test_protein.py

diff --git a/folding/experiments/sensitivity_analysis.py b/folding/experiments/sensitivity_analysis.py
deleted file mode 100644
index 18c301e3..00000000
--- a/folding/experiments/sensitivity_analysis.py
+++ /dev/null
@@ -1,263 +0,0 @@
-### This script is used to run a sensitivity search for a protein folding simulation.
-### Used for benchmarking.
-
-import os
-import time
-import wandb
-from tqdm import tqdm
-from folding.utils.logger import logger
-from pathlib import Path
-from typing import Dict, List
-
-import pandas as pd
-from box import Box  # install using pip install box
-import copy
-
-import plotly.express as px
-import openmm as mm
-import openmm.app as app
-import openmm.unit as unit
-
-from folding.base.simulation import OpenMMSimulation
-from folding.utils.ops import select_random_pdb_id, load_pdb_ids
-from folding.validators.hyperparameters import HyperParameters
-from folding.utils.opemm_simulation_config import SimulationConfig
-from folding.validators.protein import Protein
-from folding.utils.reporters import ExitFileReporter, LastTwoCheckpointsReporter
-
-ROOT_DIR = Path(__file__).resolve().parents[2]
-PDB_IDS = load_pdb_ids(
-    root_dir=ROOT_DIR, filename="pdb_ids.pkl"
-)  # TODO: Currently this is a small list of PDBs without MISSING flags.
-
-ROOT_PATH = Path(__file__).parent
-SEED = 42
-SIMULATION_STEPS = {"nvt": 50000, "npt": 75000, "md_0_1": 100000}
-
-
-def log_event(event: Dict):
-    """Log the event to the console and to the wandb logger."""
-    # logger.info(f"Event: {event}")
-    wandb.log(event)
-
-
-def create_wandb_run(project: str = "folding-openmm", entity: str = "macrocosmos"):
-    wandb.init(project=project, entity=entity)
-
-
-def configure_commands(
-    state: str,
-    seed: int,
-    system_config: SimulationConfig,
-    pdb_obj: app.PDBFile,
-    output_dir: str,
-    CHECKPOINT_INTERVAL: int = 10000,
-    STATE_DATA_REPORTER_INTERVAL: int = 10,
-    EXIT_REPORTER_INTERVAL: int = 10,
-) -> Dict[str, List[str]]:
-    simulation, _ = OpenMMSimulation().create_simulation(
-        pdb=pdb_obj,
-        system_config=system_config.get_config(),
-        seed=seed,
-        state=state,
-    )
-    simulation.reporters.append(
-        LastTwoCheckpointsReporter(
-            file_prefix=f"{output_dir}/{state}",
-            reportInterval=CHECKPOINT_INTERVAL,
-        )
-    )
-    simulation.reporters.append(
-        app.StateDataReporter(
-            file=f"{output_dir}/{state}.log",
-            reportInterval=STATE_DATA_REPORTER_INTERVAL,
-            step=True,
-            potentialEnergy=True,
-        )
-    )
-    simulation.reporters.append(
-        ExitFileReporter(
-            filename=f"{output_dir}/{state}",
-            reportInterval=EXIT_REPORTER_INTERVAL,
-            file_prefix=state,
-        )
-    )
-
-    return simulation
-
-
-def create_new_challenge(pdb_id: str) -> Dict:
-    """Create a new challenge by sampling a random pdb_id and running a hyperparameter search
-    using the try_prepare_challenge function.
-    Args:
-        exclude (List): list of pdb_ids to exclude from the search
-    Returns:
-        Dict: event dictionary containing the results of the hyperparameter search
-    """
-
-    forward_start_time = time.time()
-
-    # Perform a hyperparameter search until we find a valid configuration for the pdb
-    # logger.warning(f"Attempting to prepare challenge for pdb {pdb_id}")
-    protein, event = try_prepare_challenge(pdb_id=pdb_id)
-
-    if event.get("validator_search_status"):
-        logger.success(f"✅✅ Successfully created challenge for pdb_id {pdb_id} ✅✅")
-    else:
-        # forward time if validator step fails
-        event["hp_search_time"] = time.time() - forward_start_time
-        logger.error(
-            f"❌❌ All hyperparameter combinations failed for pdb_id {pdb_id}.. Skipping! ❌❌"
-        )
-
-    return protein, event
-
-
-def try_prepare_challenge(pdb_id: str) -> Dict:
-    """Attempts to setup a simulation environment for the specific pdb & config
-    Uses a stochastic sampler to find hyperparameters that are compatible with the protein
-    """
-
-    # exclude_in_hp_search = parse_config(config)
-    hp_sampler = HyperParameters()
-
-    logger.info(f"Searching parameter space for pdb {pdb_id}")
-
-    for tries in tqdm(
-        range(hp_sampler.TOTAL_COMBINATIONS), total=hp_sampler.TOTAL_COMBINATIONS
-    ):
-        hp_sampler_time = time.time()
-
-        event = {"hp_tries": tries}
-        sampled_combination: Dict = hp_sampler.sample_hyperparameters()
-
-        hps = {
-            "ff": sampled_combination["FF"],
-            "water": sampled_combination["WATER"],
-            "box": sampled_combination["BOX"],
-        }
-
-        config = Box({"force_use_pdb": False})
-        protein = Protein(pdb_id=pdb_id, config=config, **hps)
-
-        try:
-            protein.setup_simulation()
-
-        except Exception as e:
-            logger.error(f"Error occurred for pdb_id {pdb_id}: {e}")
-            event["validator_search_status"] = False
-
-        finally:
-            event["pdb_id"] = pdb_id
-            event.update(hps)  # add the dictionary of hyperparameters to the event
-            event["hp_sample_time"] = time.time() - hp_sampler_time
-            event["pdb_complexity"] = [dict(protein.pdb_complexity)]
-            event["init_energy"] = protein.init_energy
-            # event["epsilon"] = protein.epsilon
-
-            if "validator_search_status" not in event:
-                logger.warning("✅✅ Simulation ran successfully! ✅✅")
-                event["validator_search_status"] = True  # simulation passed!
-                # break out of the loop if the simulation was successful
-                break
-            if tries == 3:
-                logger.error(f"Max tries reached for pdb_id {pdb_id} :x::x:")
-                break
-
-    return protein, event
-
-
-def sample_pdb(exclude: List = [], pdb_id: str = None):
-    return pdb_id or select_random_pdb_id(PDB_IDS, exclude=exclude)
-
-
-def extact_energies(state: str, data_directory: str):
-    check_log_file = pd.read_csv(os.path.join(data_directory, f"{state}.log"))
-
-    return check_log_file["Potential Energy (kJ/mole)"].values
-
-
-def cpt_file_mapper(output_dir: str, state: str):
-    if state == "nvt":
-        return f"{output_dir}/em.cpt"
-
-    if "npt" in state:
-        state = "nvt" + state.split("npt")[-1]
-
-    if "md" in state:
-        state = "npt" + state.split("md_0_1")[-1]
-
-    return f"{output_dir}/{state}.cpt"
-
-
-if __name__ == "__main__":
-    create_wandb_run(project="folding-openmm", entity="macrocosmos")
-
-    num_experiments = 10
-    temperatures = [50, 100, 200, 300, 400, 500]
-
-    pdbs_to_exclude = []
-
-    while num_experiments > 0:
-        pdb_id = sample_pdb(exclude=pdbs_to_exclude)
-
-        try:
-            protein, event = create_new_challenge(pdb_id=pdb_id)
-        except Exception as e:
-            logger.error(f"Error occurred for pdb_id {pdb_id}: {e}")
-            pdbs_to_exclude.append(pdb_id)
-            continue
-
-        pdb_obj: app.PDBFile = protein.load_pdb_file(protein.pdb_location)
-        output_dir = protein.validator_directory
-
-        for temperature in temperatures:
-            system_config = copy.deepcopy(protein.system_config)
-            system_config.temperature = float(temperature)
-
-            for state, steps_to_run in SIMULATION_STEPS.items():
-                # Creates the simulation object needed for the stage.
-
-                temp_state = state + f"_temp_{temperature}"
-
-                simulation = configure_commands(
-                    state=temp_state,
-                    seed=protein.system_config.seed,
-                    system_config=system_config,
-                    pdb_obj=pdb_obj,
-                    output_dir=protein.validator_directory,
-                )
-
-                logger.info(
-                    f"Running {state} for {steps_to_run} steps for pdb {pdb_id}"
-                )
-
-                if state == "nvt":
-                    mapper_state = state
-                else:
-                    mapper_state = temp_state
-
-                simulation.loadCheckpoint(cpt_file_mapper(output_dir, mapper_state))
-
-                start_time = time.time()
-                simulation.step(steps_to_run)
-                simulation_time = time.time() - start_time
-                event[f"{state}_time"] = simulation_time
-
-                energy_array = extact_energies(
-                    state=temp_state, data_directory=protein.validator_directory
-                )
-                event[f"{state}_energies_temp_{temperature}"] = energy_array.tolist()
-
-                fig = px.scatter(
-                    energy_array,
-                    title=f"Energy array for {pdb_id} for state {state} for temperature {temperature}",
-                    labels={"index": "Step", "value": "energy"},
-                    height=600,
-                    width=1400,
-                )
-                fig.write_image(os.path.join(output_dir, f"{mapper_state}_energy.png"))
-
-        log_event(event)
-        num_experiments = num_experiments - 1
-        pdbs_to_exclude.append(pdb_id)
diff --git a/folding/validators/md/__init__.py b/folding/validators/md/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/folding/validators/forward.py b/folding/validators/md/forward.py
similarity index 77%
rename from folding/validators/forward.py
rename to folding/validators/md/forward.py
index 5afdf5e0..fa369bd9 100644
--- a/folding/validators/forward.py
+++ b/folding/validators/md/forward.py
@@ -7,12 +7,12 @@
 from typing import List, Dict
 from async_timeout import timeout
 
-from folding.validators.protein import Protein
+from folding.validators.md.protein import Protein
 from folding.utils.logging import log_event
 
 import asyncio
 from folding.utils.openmm_forcefields import FORCEFIELD_REGISTRY
-from folding.validators.hyperparameters import HyperParameters
+from folding.validators.md.hyperparameters import HyperParameters
 from folding.utils.ops import (
     load_and_sample_random_pdb_ids,
     OpenMMException,
@@ -23,98 +23,6 @@
 ROOT_DIR = Path(__file__).resolve().parents[2]
 
 
-async def run_step(
-    self,
-    protein: Protein,
-    timeout: float,
-    job_type: str,
-    job_id: str,
-) -> Dict:
-    start_time = time.time()
-
-    if protein is None:
-        event = {
-            "block": self.block,
-            "step_length": time.time() - start_time,
-            "energies": [],
-            "active": False,
-        }
-        return event
-
-    # Get all uids on the network that are NOT validators.
-    # the .is_serving flag means that the uid does not have an axon address.
-    uids = get_all_miner_uids(
-        self.metagraph,
-        self.config.neuron.vpermit_tao_limit,
-        include_serving_in_check=False,
-    )
-
-    # Get axons and hotkeys
-    axons_and_hotkeys = [
-        (self.metagraph.axons[uid], self.metagraph.hotkeys[uid]) for uid in uids
-    ]
-    axons, hotkeys = zip(*axons_and_hotkeys)
-
-    system_config = protein.system_config.to_dict()
-    system_config["seed"] = None  # We don't want to pass the seed to miners.
-
-    synapses = [
-        JobSubmissionSynapse(
-            pdb_id=protein.pdb_id,
-            job_id=job_id,
-            presigned_url=self.handler.generate_presigned_url(
-                miner_hotkey=hotkey,
-                pdb_id=protein.pdb_id,
-                file_name="trajectory.dcd",
-                method="put_object",
-                expires_in=300,
-            ),
-        )
-        for hotkey in hotkeys
-    ]
-
-    # Make calls to the network with the prompt - this is synchronous.
-    logger.info("⏰ Waiting for miner responses ⏰")
-    responses = await asyncio.gather(
-        *[
-            self.dendrite.call(
-                target_axon=axon, synapse=synapse, timeout=timeout, deserialize=True
-            )
-            for axon, synapse in zip(axons, synapses)
-        ]
-    )
-
-    response_info = get_response_info(responses=responses)
-
-    event = {
-        "block": self.block,
-        "step_length": time.time() - start_time,
-        "uids": uids,
-        "energies": [],
-        **response_info,
-    }
-
-    energies, energy_event = await get_energies(
-        validator=self,
-        protein=protein,
-        responses=responses,
-        axons=axons,
-        job_id=job_id,
-        uids=uids,
-        miner_registry=self.miner_registry,
-        job_type=job_type,
-    )
-
-    # Log the step event.
-    event.update({"energies": energies.tolist(), **energy_event})
-
-    if len(protein.md_inputs) > 0:
-        event["md_inputs"] = list(protein.md_inputs.keys())
-        event["md_inputs_sizes"] = list(map(len, protein.md_inputs.values()))
-
-    return event
-
-
 def parse_config(config) -> Dict[str, str]:
     """
     Parse config to check if key hyperparameters are set.
diff --git a/folding/validators/hyperparameters.py b/folding/validators/md/hyperparameters.py
similarity index 100%
rename from folding/validators/hyperparameters.py
rename to folding/validators/md/hyperparameters.py
diff --git a/folding/validators/protein.py b/folding/validators/md/protein.py
similarity index 100%
rename from folding/validators/protein.py
rename to folding/validators/md/protein.py
diff --git a/folding/validators/reward.py b/folding/validators/md/reward.py
similarity index 99%
rename from folding/validators/reward.py
rename to folding/validators/md/reward.py
index 95164fa0..c02d0a6a 100644
--- a/folding/validators/reward.py
+++ b/folding/validators/md/reward.py
@@ -5,7 +5,7 @@
 from collections import defaultdict
 from folding.utils.logger import logger
 from folding.utils import constants as c
-from folding.validators.protein import Protein
+from folding.validators.md.protein import Protein
 from folding.base.evaluation import BaseEvaluator
 from folding.protocol import JobSubmissionSynapse
 from folding.registries.miner_registry import MinerRegistry
diff --git a/tests/test_hyperparameter_sampling.py b/tests/test_hyperparameter_sampling.py
index 2d3051a2..824a11a3 100644
--- a/tests/test_hyperparameter_sampling.py
+++ b/tests/test_hyperparameter_sampling.py
@@ -1,7 +1,7 @@
 import pytest
 
 from folding.utils.openmm_forcefields import FORCEFIELD_REGISTRY
-from folding.validators.hyperparameters import HyperParameters
+from folding.validators.md.hyperparameters import HyperParameters
 
 BOX = ["cubic", "dodecahedron", "octahedron"]
 
diff --git a/tests/test_protein.py b/tests/test_protein.py
deleted file mode 100644
index c94306d4..00000000
--- a/tests/test_protein.py
+++ /dev/null
@@ -1,29 +0,0 @@
-import pytest
-
-import os
-from collections import defaultdict
-from pathlib import Path
-from folding.utils.ops import gro_hash
-from folding.validators.protein import Protein
-
-ROOT_PATH = Path(__file__).parent
-
-
-results = {
-    "1fjs": defaultdict(
-        int, {"REMARK": 356, "ATOM": 2236, "HETATM": 239, "CONECT": 98}
-    ),
-    "1ubq": defaultdict(int, {"REMARK": 260, "ATOM": 602, "HETATM": 58}),
-}
-
-
-@pytest.mark.parametrize(
-    "pdb_file, expected",
-    [
-        (os.path.join(ROOT_PATH, "fixtures/pdb_files", f"{pdb}.pdb"), result)
-        for pdb, result in results.items()
-    ],
-)
-def test_pdb_complexity(pdb_file, expected):
-    record_counts = Protein._get_pdb_complexity(pdb_file)
-    assert record_counts == expected, "record counts do not match expected values"
From 14fa9d7d107c7734d591b4c09a397928a98bcd71 Mon Sep 17 00:00:00 2001
From: mccrindlebrian
Date: Mon, 12 May 2025 16:20:01 +0200
Subject: [PATCH 2/4] run md step

---
 neurons/validator.py | 21 +++++++++++----------
 1 file changed, 11 insertions(+), 10 deletions(-)

diff --git a/neurons/validator.py b/neurons/validator.py
index 28403101..3ff4c71e 100644
--- a/neurons/validator.py
+++ b/neurons/validator.py
@@ -1,23 +1,25 @@
 # The MIT License (MIT)
 # Copyright © 2024 Macrocosmos
 
-from collections import defaultdict
 import os
 import time
+import pickle
 import asyncio
 import traceback
 from datetime import datetime
 from typing import Any, Dict, List
+from collections import defaultdict
 
 import netaddr
 import requests
 import torch
 import numpy as np
 import pandas as pd
-from async_timeout import timeout
 import tenacity
 
+from async_timeout import timeout
+
 from folding import __spec_version__ as spec_version
 import folding.utils.constants as c
@@ -38,14 +40,13 @@
 from folding.protocol import JobSubmissionSynapse
 from folding.utils.ops import get_response_info
 
-from folding.validators.reward import run_evaluation_validation_pipeline
-from folding.validators.forward import create_new_challenge
-from folding.validators.protein import Protein
+from folding.validators.md.reward import run_evaluation_validation_pipeline
+from folding.validators.md.forward import create_new_challenge
+from folding.validators.md.protein import Protein
 from folding.registries.miner_registry import MinerRegistry
 from folding.organic.api import start_organic_api_in_process
 
 from dotenv import load_dotenv
-import pickle
 
 load_dotenv()
 
@@ -121,7 +122,7 @@ def __init__(self, config=None):
         self._organic_api_pipe = None
         self._organic_api_process = None
 
-    async def run_step(
+    async def run_md_step(
         self,
         protein: Protein,
         timeout: float,
@@ -218,7 +219,7 @@ async def run_step(
 
         return event
 
-    async def forward(self, job: Job) -> dict:
+    async def forward_md(self, job: Job) -> dict:
         """Carries out a query to the miners to check their progress on a given job (pdb) and updates the job status based on the results.
 
         Validator forward pass. Consists of:
@@ -239,8 +240,8 @@ async def forward(self, job: Job) -> dict:
             event = {"energies": []}
             return event
 
-        logger.info(f"Running run_step for {protein.pdb_id}...⏳")
-        return await self.run_step(
+        logger.info(f"Running run_md_step for {protein.pdb_id}...⏳")
+        return await self.run_md_step(
             protein=protein,
             timeout=self.config.neuron.timeout,
             job_id=job.job_id,

From 8e203cf42ad3b529d89a75c3097f9ffd6a7527a4 Mon Sep 17 00:00:00 2001
From: mccrindlebrian
Date: Mon, 12 May 2025 16:32:43 +0200
Subject: [PATCH 3/4] add md dft pipelines

---
 folding/base/neuron.py |  5 ++++-
 neurons/validator.py   | 49 +++++++++++++++++++++++++++++-------------
 2 files changed, 38 insertions(+), 16 deletions(-)

diff --git a/folding/base/neuron.py b/folding/base/neuron.py
index c9ee8e48..5a10245b 100644
--- a/folding/base/neuron.py
+++ b/folding/base/neuron.py
@@ -142,7 +142,10 @@ def remove_wandb_id(self, pdb_id: str):
         write_pkl(self.wandb_ids, f"{self.config.neuron.full_path}/wandb_ids.pkl", "wb")
 
     @abstractmethod
-    async def forward(self, synapse: bt.Synapse) -> bt.Synapse:
+    async def forward_md(self, synapse: bt.Synapse) -> bt.Synapse:
+        ...
+
+    async def forward_dft(self, synapse: bt.Synapse) -> bt.Synapse:
         ...
 
     def sync(self):
diff --git a/neurons/validator.py b/neurons/validator.py
index 3ff4c71e..e1cda9b9 100644
--- a/neurons/validator.py
+++ b/neurons/validator.py
@@ -18,6 +18,7 @@
 import pandas as pd
 import tenacity
 
+import bittensor as bt
 from async_timeout import timeout
 
 from folding import __spec_version__ as spec_version
@@ -219,6 +220,9 @@ async def run_md_step(
 
         return event
 
+    async def forward(self, synapse: bt.Synapse) -> bt.Synapse:
+        pass
+
     async def forward_md(self, job: Job) -> dict:
         """Carries out a query to the miners to check their progress on a given job (pdb) and updates the job status based on the results.
 
@@ -248,6 +252,9 @@ async def forward_md(self, job: Job) -> dict:
             job_type=job.job_type,
         )
 
+    async def forward_dft(self, job: Job) -> dict:
+        pass
+
     async def add_job(self, job_event: dict[str, Any], protein: Protein = None) -> bool:
         """Add a job to the job store while also checking to see what uids can be assigned to the job.
         If uids are not provided, then the function will sample random uids from the network.
@@ -603,6 +610,23 @@ async def create_synthetic_jobs(self):
 
                 await asyncio.sleep(self.config.neuron.synthetic_job_interval)
 
+    async def md_pipeline(self, job: Job):
+        # Here we straightforwardly query the workers associated with each job and update the jobs accordingly
+        job_event = await self.forward_md(job=job)
+
+        # If we don't have any miners reply to the query, we will make it inactive.
+        if len(job_event["energies"]) == 0:
+            job.active = False
+            self.store.update_gjp_job(
+                job=job,
+                gjp_address=self.config.neuron.gjp_address,
+                keypair=self.wallet.hotkey,
+                job_id=job.job_id,
+            )
+            return
+
+        await self.update_job(job=job)
+
     async def update_jobs(self):
         """
         Updates the jobs in the queue.
@@ -617,22 +641,16 @@ async def update_jobs(self):
             for job in self.store.get_queue(
                 ready=True, validator_hotkey=self.wallet.hotkey.ss58_address
             ).queue:
-                # Here we straightforwardly query the workers associated with each job and update the jobs accordingly
-                job_event = await self.forward(job=job)
-
-                # If we don't have any miners reply to the query, we will make it inactive.
-                if len(job_event["energies"]) == 0:
-                    job.active = False
-                    self.store.update_gjp_job(
-                        job=job,
-                        gjp_address=self.config.neuron.gjp_address,
-                        keypair=self.wallet.hotkey,
-                        job_id=job.job_id,
-                    )
-                    continue
+
+                #TODO: This will break for now
+                if job.job_type == "md":
+                    job_event = await self.md_pipeline(job=job)
+
+                    if isinstance(job.event, str):
+                        job.event = eval(job.event)  # if str, convert to dict.
 
-                if isinstance(job.event, str):
-                    job.event = eval(job.event)  # if str, convert to dict.
+
+                elif job.job_type == "dft":
+                    job_event = await self.forward_dft(job=job)
 
                 job.event.update(job_event)
                 job.hotkeys = [
                     self.metagraph.hotkeys[uid] for uid in job.event["uids"]
@@ -641,6 +659,7 @@ async def update_jobs(self):
                 # Determine the status of the job based on the current energy and the previous values (early stopping)
                 # Update the DB with the current status
                 await self.update_job(job=job)
+
                 logger.info(f"step({self.step}) block({self.block})")
 
             except Exception as e:

From 419779ccdba380e3ae75bd1e169f481dd5c0f1a8 Mon Sep 17 00:00:00 2001
From: mccrindlebrian
Date: Tue, 13 May 2025 10:05:58 +0200
Subject: [PATCH 4/4] alter store job creation to support dft pipeline

---
 folding/store.py     | 108 +++++++++++++++++++++--------
 neurons/validator.py | 159 +++++++++++++++++++++++--------------------
 2 files changed, 166 insertions(+), 101 deletions(-)

diff --git a/folding/store.py b/folding/store.py
index b5690f58..5b7b3f7b 100644
--- a/folding/store.py
+++ b/folding/store.py
@@ -5,12 +5,12 @@
 import sqlite3
 import requests
 from queue import Queue
-from typing import Dict, List
+from typing import List, Annotated, Union
+from pydantic import constr
 
 from dotenv import load_dotenv
 from datetime import datetime
 
-import numpy as np
 import pandas as pd
 from atom.epistula.epistula import Epistula
 
@@ -44,6 +44,7 @@ def _row_to_job(self, row) -> "Job":
             return None
 
         data = dict(row)
 
+        # Convert stored JSON strings back to Python objects
         data["hotkeys"] = json.loads(data["hotkeys"])
         data["system_config"] = (
@@ -68,7 +69,7 @@ def _row_to_job(self, row) -> "Job":
         # Convert boolean
         data["active"] = bool(data["active"])
 
-        return Job(**data)
+        return MDJob(**data) if data["job_type"] == "md" else DFTJob(**data)
 
     def get_queue(self, validator_hotkey: str, ready=True) -> Queue:
         """
@@ -90,6 +91,7 @@ def get_queue(self, validator_hotkey: str, ready=True) -> Queue:
             response = requests.get(
                 f"http://{local_db_addr}/db/query",
                 params={"q": query, "level": "strong"},
+                timeout=10,
             )
         else:
             response = requests.get(
@@ -98,6 +100,7 @@ def get_queue(self, validator_hotkey: str, ready=True) -> Queue:
                     "q": f"SELECT * FROM {self.table_name} WHERE active = 1 AND validator_hotkey = '{validator_hotkey}'",
                     "level": "strong",
                 },
+                timeout=10,
             )
 
         if response.status_code != 200:
@@ -238,6 +241,49 @@ def __repr__(self):
         df = pd.read_sql_query(f"SELECT * FROM {self.table_name}", conn)
         return f"{self.__class__.__name__}\n{df.__repr__()}"
 
+    def _setup_and_get_job(self, event: dict, kwargs: dict) -> Union["MDJob", "DFTJob"]:
+        if event["job_type"] == "md":
+            job = MDJob(
+                pdb_id=event["pdb_id"],
+                system_config=SystemConfig(
+                    ff=event["ff"],
+                    box=event["box"],
+                    water=event["water"],
+                    system_kwargs=SystemKwargs(**event["system_kwargs"]),
+                ),
+                hotkeys=[""],
+                job_type=event["job_type"],
+                created_at=pd.Timestamp.now().floor("s"),
+                updated_at=pd.Timestamp.now().floor("s"),
+                epsilon=event["epsilon"],
+                s3_links=event["s3_links"],
+                priority=event.get("priority", 1),
+                update_interval=event.get(
+                    "time_to_live", random.randint(7200, 14400)
+                ),  # between 2 hours and 4 hours in seconds
+                max_time_no_improvement=event.get("max_time_no_improvement", 1),
+                is_organic=event.get("is_organic", False),
+                job_id=event.get("job_id", None),
+                active=event.get("active", True),
+                event=event,
+                **kwargs,
+            )
+
+        elif event["job_type"] == "dft":
+            job = DFTJob(
+                system=event["system"],
+                geometry=event["geometry"],
+                created_at=pd.Timestamp.now().floor("s"),
+                updated_at=pd.Timestamp.now().floor("s"),
+                **event,
+                **kwargs,
+            )
+
+        else:
+            raise ValueError(f"Invalid job type: {event['job_type']}")
+
+        return job
+
     def upload_job(
         self,
         event: dict,
@@ -260,32 +306,8 @@ def upload_job(
 
         Raises:
             ValueError: If the job upload fails.
         """
-        job = Job(
-            pdb_id=event["pdb_id"],
-            system_config=SystemConfig(
-                ff=event["ff"],
-                box=event["box"],
-                water=event["water"],
-                system_kwargs=SystemKwargs(**event["system_kwargs"]),
-            ),
-            hotkeys=[""],
-            job_type=event["job_type"],
-            created_at=pd.Timestamp.now().floor("s"),
-            updated_at=pd.Timestamp.now().floor("s"),
-            epsilon=event["epsilon"],
-            s3_links=event["s3_links"],
-            priority=event.get("priority", 1),
-            update_interval=event.get(
-                "time_to_live", random.randint(7200, 14400)
-            ),  # between 2 hours and 4 hours in seconds
-            max_time_no_improvement=event.get("max_time_no_improvement", 1),
-            is_organic=event.get("is_organic", False),
-            job_id=event.get("job_id", None),
-            active=event.get("active", True),
-            event=event,
-            **kwargs,
-        )
+        job: Union["MDJob", "DFTJob"] = self._setup_and_get_job(event=event, kwargs=kwargs)
 
         body = job.model_dump()
         body_bytes = self.epistula.create_message_body(body)
@@ -350,9 +372,37 @@ async def monitor_db(self):
 
         return (last_log_leader - last_log_read) != 0
 
 
-class Job(JobBase):
+class MDJob(JobBase):
+    """Job class for storing molecular dynamics job information."""
+
+    # MD-specific parameters
+    pdb_id: Annotated[str, constr(min_length=4, max_length=10)]
+
+    # MD-specific methods
+    async def update(self, loss: float, hotkey: str):
+        """Updates the status of a job in the database. If the loss improves, the best loss, hotkey and hashes are updated."""
+        self.active = False
+        self.best_loss = loss
+        self.best_loss_at = pd.Timestamp.now().floor("s")
+        self.best_hotkey = hotkey
+        self.updated_at = datetime.now()
+
+    def get_simulation_config(self) -> dict:
+        """Returns the simulation configuration for this MD job."""
+        return {
+            "ff": self.system_config.ff,
+            "box": self.system_config.box,
+            "water": self.system_config.water,
+            "system_kwargs": self.system_config.system_kwargs.model_dump(),
+            "epsilon": self.epsilon
+        }
+
+
+class DFTJob(JobBase):
     """Job class for storing job information."""
 
+    system: str
+    geometry: str  # in the form of f"{charge} {multiplicity}\n{body}"
+
     async def update(self, loss: float, hotkey: str):
         """Updates the status of a job in the database. If the loss improves, the best loss, hotkey and hashes are updated."""
 
diff --git a/neurons/validator.py b/neurons/validator.py
index e1cda9b9..6de49e37 100644
--- a/neurons/validator.py
+++ b/neurons/validator.py
@@ -255,90 +255,102 @@ async def forward_md(self, job: Job) -> dict:
     async def forward_dft(self, job: Job) -> dict:
         pass
 
-    async def add_job(self, job_event: dict[str, Any], protein: Protein = None) -> bool:
-        """Add a job to the job store while also checking to see what uids can be assigned to the job.
-        If uids are not provided, then the function will sample random uids from the network.
+    async def setup_organic_md_job(self, job_event: dict[str, Any]) -> bool:
+        """
+        Setup an organic MD job.
 
         Args:
            job_event (dict[str, Any]): parameters that are needed to make the job.
""" - start_time = time.time() + self.config.protein.input_source = job_event["source"] + protein = Protein(**job_event, config=self.config.protein) - job_event["uid_search_time"] = time.time() - start_time + try: + job_event["pdb_id"] = job_event["pdb_id"] + job_event["job_type"] = "OrganicMD" + job_event["pdb_complexity"] = [dict(protein.pdb_complexity)] + job_event["init_energy"] = protein.init_energy + job_event["epsilon"] = protein.epsilon + job_event["s3_links"] = { + "testing": "testing" + } # overwritten below if s3 logging is on. + async with timeout(300): + logger.info( + f"setup_simulation for organic query: {job_event['pdb_id']}" + ) + await protein.setup_simulation() + logger.success( + f"✅✅ organic {job_event['pdb_id']} simulation ran successfully! ✅✅" + ) - # If the job is organic, we still need to run the setup simulation to create the files needed for the job. - if job_event.get("is_organic"): - self.config.protein.input_source = job_event["source"] - protein = Protein(**job_event, config=self.config.protein) + if protein.init_energy > 0: + logger.error( + f"Initial energy is positive: {protein.init_energy}. Simulation failed." + ) + job_event["active"] = False + job_event["failed"] = True - try: - job_event["pdb_id"] = job_event["pdb_id"] - job_event["job_type"] = "OrganicMD" - job_event["pdb_complexity"] = [dict(protein.pdb_complexity)] - job_event["init_energy"] = protein.init_energy - job_event["epsilon"] = protein.epsilon - job_event["s3_links"] = { - "testing": "testing" - } # overwritten below if s3 logging is on. - async with timeout(300): - logger.info( - f"setup_simulation for organic query: {job_event['pdb_id']}" - ) - await protein.setup_simulation() - logger.success( - f"✅✅ organic {job_event['pdb_id']} simulation ran successfully! ✅✅" + if not self.config.s3.off: + try: + logger.info(f"Uploading to {self.handler.config.bucket_name}") + files_to_upload = { + "pdb": protein.pdb_location, + "cpt": os.path.join( + protein.validator_directory, protein.simulation_cpt + ), + } + + location = os.path.join( + "inputs", + str(spec_version), + job_event["pdb_id"], + self.validator_hotkey_reference, + datetime.now().strftime("%Y-%m-%d_%H-%M-%S"), ) + s3_links = {} + for file_type, file_path in files_to_upload.items(): + key = self.handler.put( + file_path=file_path, + location=location, + public=True, + ) + s3_links[file_type] = os.path.join( + self.handler.output_url, + key, + ) - if protein.init_energy > 0: - logger.error( - f"Initial energy is positive: {protein.init_energy}. Simulation failed." - ) + job_event["s3_links"] = s3_links + logger.success("✅✅ Simulation ran successfully! ✅✅") + except Exception as e: + logger.error(f"Error in uploading to S3: {e}") + logger.error("❌❌ Simulation failed! 
❌❌") job_event["active"] = False job_event["failed"] = True - if not self.config.s3.off: - try: - logger.info(f"Uploading to {self.handler.config.bucket_name}") - files_to_upload = { - "pdb": protein.pdb_location, - "cpt": os.path.join( - protein.validator_directory, protein.simulation_cpt - ), - } - - location = os.path.join( - "inputs", - str(spec_version), - job_event["pdb_id"], - self.validator_hotkey_reference, - datetime.now().strftime("%Y-%m-%d_%H-%M-%S"), - ) - s3_links = {} - for file_type, file_path in files_to_upload.items(): - key = self.handler.put( - file_path=file_path, - location=location, - public=True, - ) - s3_links[file_type] = os.path.join( - self.handler.output_url, - key, - ) - - job_event["s3_links"] = s3_links - logger.success("✅✅ Simulation ran successfully! ✅✅") - except Exception as e: - logger.error(f"Error in uploading to S3: {e}") - logger.error("❌❌ Simulation failed! ❌❌") - job_event["active"] = False - job_event["failed"] = True + except Exception as e: + job_event["active"] = False + job_event["failed"] = True + logger.error(f"Error in setting up organic query: {e}") - except Exception as e: - job_event["active"] = False - job_event["failed"] = True - logger.error(f"Error in setting up organic query: {e}") + async def add_job(self, job_event: dict[str, Any]) -> bool: + """Add a job to the job store while also checking to see what uids can be assigned to the job. + If uids are not provided, then the function will sample random uids from the network. + + Args: + job_event (dict[str, Any]): parameters that are needed to make the job. + """ + start_time = time.time() + + job_event["uid_search_time"] = time.time() - start_time + + # If the job is organic, we still need to run the setup simulation to create the files needed for the job. + if job_event.get("is_organic"): + + #TODO: this is quite an ugly implementation and we need a cleaner solution, but this is fine for now. + if job_event["job_type"] == "md": + job_event = await self.setup_organic_md_job(job_event=job_event) + logger.info(f"Inserting job: {job_event['pdb_id']}") - logger.info(f"Inserting job: {job_event['pdb_id']}") try: job = self.store.upload_job( event=job_event, @@ -363,7 +375,7 @@ async def add_job(self, job_event: dict[str, Any], protein: Protein = None) -> b return False - async def add_k_synthetic_jobs(self, k: int): + async def add_k_synthetic_md_jobs(self, k: int): """Creates new synthetic jobs and assigns them to available workers. Updates DB with new records. Each "job" is an individual protein folding challenge that is distributed to the miners. @@ -645,13 +657,16 @@ async def update_jobs(self): #TODO: This will break for now if job.job_type == "md": job_event = await self.md_pipeline(job=job) - + if isinstance(job.event, str): job.event = eval(job.event) # if str, convert to dict. elif job.job_type == "dft": job_event = await self.forward_dft(job=job) + else: + raise ValueError(f"Invalid job type in update_jobs: {job.job_type}") + job.event.update(job_event) job.hotkeys = [ self.metagraph.hotkeys[uid] for uid in job.event["uids"]