diff --git a/folding/base/neuron.py b/folding/base/neuron.py
index c9ee8e48..5a10245b 100644
--- a/folding/base/neuron.py
+++ b/folding/base/neuron.py
@@ -142,7 +142,12 @@ def remove_wandb_id(self, pdb_id: str):
         write_pkl(self.wandb_ids, f"{self.config.neuron.full_path}/wandb_ids.pkl", "wb")
 
     @abstractmethod
-    async def forward(self, synapse: bt.Synapse) -> bt.Synapse:
+    async def forward_md(self, synapse: bt.Synapse) -> bt.Synapse:
+        ...
+
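+    # The single forward() entry point is split per job modality: MD requests
+    # route through forward_md, DFT requests through forward_dft.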
+    async def forward_dft(self, synapse: bt.Synapse) -> bt.Synapse:
         ...
 
     def sync(self):
diff --git a/folding/experiments/sensitivity_analysis.py b/folding/experiments/sensitivity_analysis.py
deleted file mode 100644
index 18c301e3..00000000
--- a/folding/experiments/sensitivity_analysis.py
+++ /dev/null
@@ -1,263 +0,0 @@
-### This script is used to run a sensitivity search for a protein folding simulation.
-### Used for benchmarking.
-
-import os
-import time
-import wandb
-from tqdm import tqdm
-from folding.utils.logger import logger
-from pathlib import Path
-from typing import Dict, List
-
-import pandas as pd
-from box import Box  # install using pip install box
-import copy
-
-import plotly.express as px
-import openmm as mm
-import openmm.app as app
-import openmm.unit as unit
-
-from folding.base.simulation import OpenMMSimulation
-from folding.utils.ops import select_random_pdb_id, load_pdb_ids
-from folding.validators.hyperparameters import HyperParameters
-from folding.utils.opemm_simulation_config import SimulationConfig
-from folding.validators.protein import Protein
-from folding.utils.reporters import ExitFileReporter, LastTwoCheckpointsReporter
-
-ROOT_DIR = Path(__file__).resolve().parents[2]
-PDB_IDS = load_pdb_ids(
-    root_dir=ROOT_DIR, filename="pdb_ids.pkl"
-)  # TODO: Currently this is a small list of PDBs without MISSING flags.
-
-ROOT_PATH = Path(__file__).parent
-SEED = 42
-SIMULATION_STEPS = {"nvt": 50000, "npt": 75000, "md_0_1": 100000}
-
-
-def log_event(event: Dict):
-    """Log the event to the console and to the wandb logger."""
-    # logger.info(f"Event: {event}")
-    wandb.log(event)
-
-
-def create_wandb_run(project: str = "folding-openmm", entity: str = "macrocosmos"):
-    wandb.init(project=project, entity=entity)
-
-
-def configure_commands(
-    state: str,
-    seed: int,
-    system_config: SimulationConfig,
-    pdb_obj: app.PDBFile,
-    output_dir: str,
-    CHECKPOINT_INTERVAL: int = 10000,
-    STATE_DATA_REPORTER_INTERVAL: int = 10,
-    EXIT_REPORTER_INTERVAL: int = 10,
-) -> Dict[str, List[str]]:
-    simulation, _ = OpenMMSimulation().create_simulation(
-        pdb=pdb_obj,
-        system_config=system_config.get_config(),
-        seed=seed,
-        state=state,
-    )
-    simulation.reporters.append(
-        LastTwoCheckpointsReporter(
-            file_prefix=f"{output_dir}/{state}",
-            reportInterval=CHECKPOINT_INTERVAL,
-        )
-    )
-    simulation.reporters.append(
-        app.StateDataReporter(
-            file=f"{output_dir}/{state}.log",
-            reportInterval=STATE_DATA_REPORTER_INTERVAL,
-            step=True,
-            potentialEnergy=True,
-        )
-    )
-    simulation.reporters.append(
-        ExitFileReporter(
-            filename=f"{output_dir}/{state}",
-            reportInterval=EXIT_REPORTER_INTERVAL,
-            file_prefix=state,
-        )
-    )
-
-    return simulation
-
-
-def create_new_challenge(pdb_id: str) -> Dict:
-    """Create a new challenge by sampling a random pdb_id and running a hyperparameter search
-    using the try_prepare_challenge function.
-
-    Args:
-        exclude (List): list of pdb_ids to exclude from the search
-
-    Returns:
-        Dict: event dictionary containing the results of the hyperparameter search
-    """
-
-    forward_start_time = time.time()
-
-    # Perform a hyperparameter search until we find a valid configuration for the pdb
-    # logger.warning(f"Attempting to prepare challenge for pdb {pdb_id}")
-    protein, event = try_prepare_challenge(pdb_id=pdb_id)
-
-    if event.get("validator_search_status"):
-        logger.success(f"✅✅ Successfully created challenge for pdb_id {pdb_id} ✅✅")
-    else:
-        # forward time if validator step fails
-        event["hp_search_time"] = time.time() - forward_start_time
-        logger.error(
-            f"❌❌ All hyperparameter combinations failed for pdb_id {pdb_id}.. Skipping! ❌❌"
-        )
-
-    return protein, event
-
-
-def try_prepare_challenge(pdb_id: str) -> Dict:
-    """Attempts to setup a simulation environment for the specific pdb & config
-    Uses a stochastic sampler to find hyperparameters that are compatible with the protein
-    """
-
-    # exclude_in_hp_search = parse_config(config)
-    hp_sampler = HyperParameters()
-
-    logger.info(f"Searching parameter space for pdb {pdb_id}")
-
-    for tries in tqdm(
-        range(hp_sampler.TOTAL_COMBINATIONS), total=hp_sampler.TOTAL_COMBINATIONS
-    ):
-        hp_sampler_time = time.time()
-
-        event = {"hp_tries": tries}
-        sampled_combination: Dict = hp_sampler.sample_hyperparameters()
-
-        hps = {
-            "ff": sampled_combination["FF"],
-            "water": sampled_combination["WATER"],
-            "box": sampled_combination["BOX"],
-        }
-
-        config = Box({"force_use_pdb": False})
-        protein = Protein(pdb_id=pdb_id, config=config, **hps)
-
-        try:
-            protein.setup_simulation()
-
-        except Exception as e:
-            logger.error(f"Error occurred for pdb_id {pdb_id}: {e}")
-            event["validator_search_status"] = False
-
-        finally:
-            event["pdb_id"] = pdb_id
-            event.update(hps)  # add the dictionary of hyperparameters to the event
-            event["hp_sample_time"] = time.time() - hp_sampler_time
-            event["pdb_complexity"] = [dict(protein.pdb_complexity)]
-            event["init_energy"] = protein.init_energy
-            # event["epsilon"] = protein.epsilon
-
-            if "validator_search_status" not in event:
-                logger.warning("✅✅ Simulation ran successfully! ✅✅")
-                event["validator_search_status"] = True  # simulation passed!
-                # break out of the loop if the simulation was successful
-                break
-            if tries == 3:
-                logger.error(f"Max tries reached for pdb_id {pdb_id} :x::x:")
-                break
-
-    return protein, event
-
-
-def sample_pdb(exclude: List = [], pdb_id: str = None):
-    return pdb_id or select_random_pdb_id(PDB_IDS, exclude=exclude)
-
-
-def extact_energies(state: str, data_directory: str):
-    check_log_file = pd.read_csv(os.path.join(data_directory, f"{state}.log"))
-
-    return check_log_file["Potential Energy (kJ/mole)"].values
-
-
-def cpt_file_mapper(output_dir: str, state: str):
-    if state == "nvt":
-        return f"{output_dir}/em.cpt"
-
-    if "npt" in state:
-        state = "nvt" + state.split("npt")[-1]
-
-    if "md" in state:
-        state = "npt" + state.split("md_0_1")[-1]
-
-    return f"{output_dir}/{state}.cpt"
-
-
-if __name__ == "__main__":
-    create_wandb_run(project="folding-openmm", entity="macrocosmos")
-
-    num_experiments = 10
-    temperatures = [50, 100, 200, 300, 400, 500]
-
-    pdbs_to_exclude = []
-
-    while num_experiments > 0:
-        pdb_id = sample_pdb(exclude=pdbs_to_exclude)
-
-        try:
-            protein, event = create_new_challenge(pdb_id=pdb_id)
-        except Exception as e:
-            logger.error(f"Error occurred for pdb_id {pdb_id}: {e}")
-            pdbs_to_exclude.append(pdb_id)
-            continue
-
-        pdb_obj: app.PDBFile = protein.load_pdb_file(protein.pdb_location)
-        output_dir = protein.validator_directory
-
-        for temperature in temperatures:
-            system_config = copy.deepcopy(protein.system_config)
-            system_config.temperature = float(temperature)
-
-            for state, steps_to_run in SIMULATION_STEPS.items():
-                # Creates the simulation object needed for the stage.
-
-                temp_state = state + f"_temp_{temperature}"
-
-                simulation = configure_commands(
-                    state=temp_state,
-                    seed=protein.system_config.seed,
-                    system_config=system_config,
-                    pdb_obj=pdb_obj,
-                    output_dir=protein.validator_directory,
-                )
-
-                logger.info(
-                    f"Running {state} for {steps_to_run} steps for pdb {pdb_id}"
-                )
-
-                if state == "nvt":
-                    mapper_state = state
-                else:
-                    mapper_state = temp_state
-
-                simulation.loadCheckpoint(cpt_file_mapper(output_dir, mapper_state))
-
-                start_time = time.time()
-                simulation.step(steps_to_run)
-                simulation_time = time.time() - start_time
-                event[f"{state}_time"] = simulation_time
-
-                energy_array = extact_energies(
-                    state=temp_state, data_directory=protein.validator_directory
-                )
-                event[f"{state}_energies_temp_{temperature}"] = energy_array.tolist()
-
-                fig = px.scatter(
-                    energy_array,
-                    title=f"Energy array for {pdb_id} for state {state} for temperature {temperature}",
-                    labels={"index": "Step", "value": "energy"},
-                    height=600,
-                    width=1400,
-                )
-                fig.write_image(os.path.join(output_dir, f"{mapper_state}_energy.png"))
-
-        log_event(event)
-        num_experiments = num_experiments - 1
-        pdbs_to_exclude.append(pdb_id)
diff --git a/folding/store.py b/folding/store.py
index b5690f58..5b7b3f7b 100644
--- a/folding/store.py
+++ b/folding/store.py
@@ -5,12 +5,12 @@
 import sqlite3
 import requests
 
 from queue import Queue
-from typing import Dict, List
+from typing import List, Annotated, Union
+from pydantic import constr
 from dotenv import load_dotenv
 from datetime import datetime
 
-import numpy as np
 import pandas as pd
 
 from atom.epistula.epistula import Epistula
@@ -44,6 +44,7 @@ def _row_to_job(self, row) -> "Job":
             return None
 
         data = dict(row)
 
+        # Convert stored JSON strings back to Python objects
         data["hotkeys"] = json.loads(data["hotkeys"])
         data["system_config"] = (
@@ -68,7 +69,8 @@ def _row_to_job(self, row) -> "Job":
         # Convert boolean
         data["active"] = bool(data["active"])
 
-        return Job(**data)
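+        # job_type decides which concrete Job model the stored row hydrates into.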
bool(data["active"]) - return Job(**data) + return MDJob(**data) if data["job_type"] == "md" else DFTJob(**data) def get_queue(self, validator_hotkey: str, ready=True) -> Queue: """ @@ -90,6 +91,7 @@ def get_queue(self, validator_hotkey: str, ready=True) -> Queue: response = requests.get( f"http://{local_db_addr}/db/query", params={"q": query, "level": "strong"}, + timeout=10, ) else: response = requests.get( @@ -98,6 +100,7 @@ def get_queue(self, validator_hotkey: str, ready=True) -> Queue: "q": f"SELECT * FROM {self.table_name} WHERE active = 1 AND validator_hotkey = '{validator_hotkey}'", "level": "strong", }, + timeout=10, ) if response.status_code != 200: @@ -238,6 +241,49 @@ def __repr__(self): df = pd.read_sql_query(f"SELECT * FROM {self.table_name}", conn) return f"{self.__class__.__name__}\n{df.__repr__()}" + def _setup_and_get_job(self, event: dict, kwargs: dict) -> Union["MDJob", "DFTJob"]: + if event["job_type"] == "md": + job = MDJob( + pdb_id=event["pdb_id"], + system_config=SystemConfig( + ff=event["ff"], + box=event["box"], + water=event["water"], + system_kwargs=SystemKwargs(**event["system_kwargs"]), + ), + hotkeys=[""], + job_type=event["job_type"], + created_at=pd.Timestamp.now().floor("s"), + updated_at=pd.Timestamp.now().floor("s"), + epsilon=event["epsilon"], + s3_links=event["s3_links"], + priority=event.get("priority", 1), + update_interval=event.get( + "time_to_live", random.randint(7200, 14400) + ), # between 2 hours and 4 hours in seconds + max_time_no_improvement=event.get("max_time_no_improvement", 1), + is_organic=event.get("is_organic", False), + job_id=event.get("job_id", None), + active=event.get("active", True), + event=event, + **kwargs, + ) + + elif event["job_type"] == "dft": + job = DFTJob( + system=event["system"], + geometry=event["geometry"], + created_at=pd.Timestamp.now().floor("s"), + updated_at=pd.Timestamp.now().floor("s"), + **event, + **kwargs, + ) + + else: + raise ValueError(f"Invalid job type: {event['job_type']}") + + return job + def upload_job( self, event: dict, @@ -260,32 +306,8 @@ def upload_job( Raises: ValueError: If the job upload fails. """ - job = Job( - pdb_id=event["pdb_id"], - system_config=SystemConfig( - ff=event["ff"], - box=event["box"], - water=event["water"], - system_kwargs=SystemKwargs(**event["system_kwargs"]), - ), - hotkeys=[""], - job_type=event["job_type"], - created_at=pd.Timestamp.now().floor("s"), - updated_at=pd.Timestamp.now().floor("s"), - epsilon=event["epsilon"], - s3_links=event["s3_links"], - priority=event.get("priority", 1), - update_interval=event.get( - "time_to_live", random.randint(7200, 14400) - ), # between 2 hours and 4 hours in seconds - max_time_no_improvement=event.get("max_time_no_improvement", 1), - is_organic=event.get("is_organic", False), - job_id=event.get("job_id", None), - active=event.get("active", True), - event=event, - **kwargs, - ) + job: Union["MDJob", "DFTJob"] = self._setup_and_get_job(event=event, kwargs=kwargs) body = job.model_dump() body_bytes = self.epistula.create_message_body(body) @@ -350,9 +372,37 @@ async def monitor_db(self): return (last_log_leader - last_log_read) != 0 -class Job(JobBase): +class MDJob(JobBase): + """Job class for storing molecular dynamics job information.""" + + # MD-specific parameters + pdb_id: Annotated[str, constr(min_length=4, max_length=10)] + + # MD-specific methods + async def update(self, loss: float, hotkey: str): + """Updates the status of a job in the database. 
+        if event["job_type"] == "md":
+            job = MDJob(
+                pdb_id=event["pdb_id"],
+                system_config=SystemConfig(
+                    ff=event["ff"],
+                    box=event["box"],
+                    water=event["water"],
+                    system_kwargs=SystemKwargs(**event["system_kwargs"]),
+                ),
+                hotkeys=[""],
+                job_type=event["job_type"],
+                created_at=pd.Timestamp.now().floor("s"),
+                updated_at=pd.Timestamp.now().floor("s"),
+                epsilon=event["epsilon"],
+                s3_links=event["s3_links"],
+                priority=event.get("priority", 1),
+                update_interval=event.get(
+                    "time_to_live", random.randint(7200, 14400)
+                ),  # between 2 hours and 4 hours in seconds
+                max_time_no_improvement=event.get("max_time_no_improvement", 1),
+                is_organic=event.get("is_organic", False),
+                job_id=event.get("job_id", None),
+                active=event.get("active", True),
+                event=event,
+                **kwargs,
+            )
+
+        elif event["job_type"] == "dft":
+            # NOTE: system and geometry already arrive inside the event dict, so
+            # they must not also be passed explicitly alongside **event.
+            job = DFTJob(
+                created_at=pd.Timestamp.now().floor("s"),
+                updated_at=pd.Timestamp.now().floor("s"),
+                **event,
+                **kwargs,
+            )
+
+        else:
+            raise ValueError(f"Invalid job type: {event['job_type']}")
+
+        return job
+
     def upload_job(
         self,
         event: dict,
@@ -260,32 +309,8 @@ def upload_job(
         Raises:
             ValueError: If the job upload fails.
         """
-        job = Job(
-            pdb_id=event["pdb_id"],
-            system_config=SystemConfig(
-                ff=event["ff"],
-                box=event["box"],
-                water=event["water"],
-                system_kwargs=SystemKwargs(**event["system_kwargs"]),
-            ),
-            hotkeys=[""],
-            job_type=event["job_type"],
-            created_at=pd.Timestamp.now().floor("s"),
-            updated_at=pd.Timestamp.now().floor("s"),
-            epsilon=event["epsilon"],
-            s3_links=event["s3_links"],
-            priority=event.get("priority", 1),
-            update_interval=event.get(
-                "time_to_live", random.randint(7200, 14400)
-            ),  # between 2 hours and 4 hours in seconds
-            max_time_no_improvement=event.get("max_time_no_improvement", 1),
-            is_organic=event.get("is_organic", False),
-            job_id=event.get("job_id", None),
-            active=event.get("active", True),
-            event=event,
-            **kwargs,
-        )
+        job: Union["MDJob", "DFTJob"] = self._setup_and_get_job(event=event, kwargs=kwargs)
 
         body = job.model_dump()
         body_bytes = self.epistula.create_message_body(body)
@@ -350,9 +375,40 @@ async def monitor_db(self):
 
         return (last_log_leader - last_log_read) != 0
 
 
-class Job(JobBase):
+class MDJob(JobBase):
+    """Job class for storing molecular dynamics job information."""
+
+    # MD-specific parameters
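+    # Standard PDB accession codes are 4 characters; the upper bound leaves
+    # headroom for longer identifiers.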
- logger.info("⏰ Waiting for miner responses ⏰") - responses = await asyncio.gather( - *[ - self.dendrite.call( - target_axon=axon, synapse=synapse, timeout=timeout, deserialize=True - ) - for axon, synapse in zip(axons, synapses) - ] - ) - - response_info = get_response_info(responses=responses) - - event = { - "block": self.block, - "step_length": time.time() - start_time, - "uids": uids, - "energies": [], - **response_info, - } - - energies, energy_event = await get_energies( - validator=self, - protein=protein, - responses=responses, - axons=axons, - job_id=job_id, - uids=uids, - miner_registry=self.miner_registry, - job_type=job_type, - ) - - # Log the step event. - event.update({"energies": energies.tolist(), **energy_event}) - - if len(protein.md_inputs) > 0: - event["md_inputs"] = list(protein.md_inputs.keys()) - event["md_inputs_sizes"] = list(map(len, protein.md_inputs.values())) - - return event - - def parse_config(config) -> Dict[str, str]: """ Parse config to check if key hyperparameters are set. diff --git a/folding/validators/hyperparameters.py b/folding/validators/md/hyperparameters.py similarity index 100% rename from folding/validators/hyperparameters.py rename to folding/validators/md/hyperparameters.py diff --git a/folding/validators/protein.py b/folding/validators/md/protein.py similarity index 100% rename from folding/validators/protein.py rename to folding/validators/md/protein.py diff --git a/folding/validators/reward.py b/folding/validators/md/reward.py similarity index 99% rename from folding/validators/reward.py rename to folding/validators/md/reward.py index 95164fa0..c02d0a6a 100644 --- a/folding/validators/reward.py +++ b/folding/validators/md/reward.py @@ -5,7 +5,7 @@ from collections import defaultdict from folding.utils.logger import logger from folding.utils import constants as c -from folding.validators.protein import Protein +from folding.validators.md.protein import Protein from folding.base.evaluation import BaseEvaluator from folding.protocol import JobSubmissionSynapse from folding.registries.miner_registry import MinerRegistry diff --git a/neurons/validator.py b/neurons/validator.py index 28403101..6de49e37 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -1,23 +1,26 @@ # The MIT License (MIT) # Copyright © 2024 Macrocosmos -from collections import defaultdict import os import time +import pickle import asyncio import traceback from datetime import datetime from typing import Any, Dict, List +from collections import defaultdict import netaddr import requests import torch import numpy as np import pandas as pd -from async_timeout import timeout import tenacity +import bittensor as bt +from async_timeout import timeout + from folding import __spec_version__ as spec_version import folding.utils.constants as c @@ -38,14 +41,13 @@ from folding.protocol import JobSubmissionSynapse from folding.utils.ops import get_response_info -from folding.validators.reward import run_evaluation_validation_pipeline -from folding.validators.forward import create_new_challenge -from folding.validators.protein import Protein +from folding.validators.md.reward import run_evaluation_validation_pipeline +from folding.validators.md.forward import create_new_challenge +from folding.validators.md.protein import Protein from folding.registries.miner_registry import MinerRegistry from folding.organic.api import start_organic_api_in_process from dotenv import load_dotenv -import pickle load_dotenv() @@ -121,7 +123,7 @@ def __init__(self, config=None): 
+    async def forward(self, synapse: bt.Synapse) -> bt.Synapse:
+        pass
+
+    async def forward_md(self, job: Job) -> dict:
         """Carries out a query to the miners to check their progress on a given job (pdb)
         and updates the job status based on the results.
 
         Validator forward pass. Consists of:
@@ -239,98 +246,115 @@
             event = {"energies": []}
             return event
 
-        logger.info(f"Running run_step for {protein.pdb_id}...⏳")
-        return await self.run_step(
+        logger.info(f"Running run_md_step for {protein.pdb_id}...⏳")
+        return await self.run_md_step(
             protein=protein,
             timeout=self.config.neuron.timeout,
             job_id=job.job_id,
             job_type=job.job_type,
         )
 
-    async def add_job(self, job_event: dict[str, Any], protein: Protein = None) -> bool:
-        """Add a job to the job store while also checking to see what uids can be assigned to the job.
-        If uids are not provided, then the function will sample random uids from the network.
-
-        Args:
-            job_event (dict[str, Any]): parameters that are needed to make the job.
-        """
-        start_time = time.time()
-
-        job_event["uid_search_time"] = time.time() - start_time
-
-        # If the job is organic, we still need to run the setup simulation to create the files needed for the job.
-        if job_event.get("is_organic"):
-            self.config.protein.input_source = job_event["source"]
-            protein = Protein(**job_event, config=self.config.protein)
-
-            try:
-                job_event["pdb_id"] = job_event["pdb_id"]
-                job_event["job_type"] = "OrganicMD"
-                job_event["pdb_complexity"] = [dict(protein.pdb_complexity)]
-                job_event["init_energy"] = protein.init_energy
-                job_event["epsilon"] = protein.epsilon
-                job_event["s3_links"] = {
-                    "testing": "testing"
-                }  # overwritten below if s3 logging is on.
-                async with timeout(300):
-                    logger.info(
-                        f"setup_simulation for organic query: {job_event['pdb_id']}"
-                    )
-                    await protein.setup_simulation()
-                    logger.success(
-                        f"✅✅ organic {job_event['pdb_id']} simulation ran successfully! ✅✅"
-                    )
-
-                if protein.init_energy > 0:
-                    logger.error(
-                        f"Initial energy is positive: {protein.init_energy}. Simulation failed."
-                    )
-                    job_event["active"] = False
-                    job_event["failed"] = True
-
-                if not self.config.s3.off:
-                    try:
-                        logger.info(f"Uploading to {self.handler.config.bucket_name}")
-                        files_to_upload = {
-                            "pdb": protein.pdb_location,
-                            "cpt": os.path.join(
-                                protein.validator_directory, protein.simulation_cpt
-                            ),
-                        }
-
-                        location = os.path.join(
-                            "inputs",
-                            str(spec_version),
-                            job_event["pdb_id"],
-                            self.validator_hotkey_reference,
-                            datetime.now().strftime("%Y-%m-%d_%H-%M-%S"),
-                        )
-                        s3_links = {}
-                        for file_type, file_path in files_to_upload.items():
-                            key = self.handler.put(
-                                file_path=file_path,
-                                location=location,
-                                public=True,
-                            )
-                            s3_links[file_type] = os.path.join(
-                                self.handler.output_url,
-                                key,
-                            )
-
-                        job_event["s3_links"] = s3_links
-                        logger.success("✅✅ Simulation ran successfully! ✅✅")
-                    except Exception as e:
-                        logger.error(f"Error in uploading to S3: {e}")
-                        logger.error("❌❌ Simulation failed! ❌❌")
-                        job_event["active"] = False
-                        job_event["failed"] = True
-
-            except Exception as e:
-                job_event["active"] = False
-                job_event["failed"] = True
-                logger.error(f"Error in setting up organic query: {e}")
-
-        logger.info(f"Inserting job: {job_event['pdb_id']}")
+    async def forward_dft(self, job: Job) -> dict:
+        pass
+
+    async def setup_organic_md_job(self, job_event: dict[str, Any]) -> dict[str, Any]:
+        """
+        Setup an organic MD job.
+
+        Args:
+            job_event (dict[str, Any]): parameters that are needed to make the job.
+        """
+        self.config.protein.input_source = job_event["source"]
+        protein = Protein(**job_event, config=self.config.protein)
+
+        try:
+            job_event["job_type"] = "OrganicMD"
+            job_event["pdb_complexity"] = [dict(protein.pdb_complexity)]
+            job_event["init_energy"] = protein.init_energy
+            job_event["epsilon"] = protein.epsilon
+            job_event["s3_links"] = {
+                "testing": "testing"
+            }  # overwritten below if s3 logging is on.
+            async with timeout(300):
+                logger.info(
+                    f"setup_simulation for organic query: {job_event['pdb_id']}"
+                )
+                await protein.setup_simulation()
+                logger.success(
+                    f"✅✅ organic {job_event['pdb_id']} simulation ran successfully! ✅✅"
+                )
+
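+            # A positive potential energy after setup means minimization failed,
+            # so the job is flagged as failed rather than sent to miners.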
✅✅" + if not self.config.s3.off: + try: + logger.info(f"Uploading to {self.handler.config.bucket_name}") + files_to_upload = { + "pdb": protein.pdb_location, + "cpt": os.path.join( + protein.validator_directory, protein.simulation_cpt + ), + } + + location = os.path.join( + "inputs", + str(spec_version), + job_event["pdb_id"], + self.validator_hotkey_reference, + datetime.now().strftime("%Y-%m-%d_%H-%M-%S"), ) + s3_links = {} + for file_type, file_path in files_to_upload.items(): + key = self.handler.put( + file_path=file_path, + location=location, + public=True, + ) + s3_links[file_type] = os.path.join( + self.handler.output_url, + key, + ) - if protein.init_energy > 0: - logger.error( - f"Initial energy is positive: {protein.init_energy}. Simulation failed." - ) + job_event["s3_links"] = s3_links + logger.success("✅✅ Simulation ran successfully! ✅✅") + except Exception as e: + logger.error(f"Error in uploading to S3: {e}") + logger.error("❌❌ Simulation failed! ❌❌") job_event["active"] = False job_event["failed"] = True - if not self.config.s3.off: - try: - logger.info(f"Uploading to {self.handler.config.bucket_name}") - files_to_upload = { - "pdb": protein.pdb_location, - "cpt": os.path.join( - protein.validator_directory, protein.simulation_cpt - ), - } - - location = os.path.join( - "inputs", - str(spec_version), - job_event["pdb_id"], - self.validator_hotkey_reference, - datetime.now().strftime("%Y-%m-%d_%H-%M-%S"), - ) - s3_links = {} - for file_type, file_path in files_to_upload.items(): - key = self.handler.put( - file_path=file_path, - location=location, - public=True, - ) - s3_links[file_type] = os.path.join( - self.handler.output_url, - key, - ) - - job_event["s3_links"] = s3_links - logger.success("✅✅ Simulation ran successfully! ✅✅") - except Exception as e: - logger.error(f"Error in uploading to S3: {e}") - logger.error("❌❌ Simulation failed! ❌❌") - job_event["active"] = False - job_event["failed"] = True + except Exception as e: + job_event["active"] = False + job_event["failed"] = True + logger.error(f"Error in setting up organic query: {e}") - except Exception as e: - job_event["active"] = False - job_event["failed"] = True - logger.error(f"Error in setting up organic query: {e}") + async def add_job(self, job_event: dict[str, Any]) -> bool: + """Add a job to the job store while also checking to see what uids can be assigned to the job. + If uids are not provided, then the function will sample random uids from the network. + + Args: + job_event (dict[str, Any]): parameters that are needed to make the job. + """ + start_time = time.time() + + job_event["uid_search_time"] = time.time() - start_time + + # If the job is organic, we still need to run the setup simulation to create the files needed for the job. + if job_event.get("is_organic"): + + #TODO: this is quite an ugly implementation and we need a cleaner solution, but this is fine for now. + if job_event["job_type"] == "md": + job_event = await self.setup_organic_md_job(job_event=job_event) + logger.info(f"Inserting job: {job_event['pdb_id']}") - logger.info(f"Inserting job: {job_event['pdb_id']}") try: job = self.store.upload_job( event=job_event, @@ -355,7 +375,7 @@ async def add_job(self, job_event: dict[str, Any], protein: Protein = None) -> b return False - async def add_k_synthetic_jobs(self, k: int): + async def add_k_synthetic_md_jobs(self, k: int): """Creates new synthetic jobs and assigns them to available workers. Updates DB with new records. 
Each "job" is an individual protein folding challenge that is distributed to the miners. @@ -602,6 +622,23 @@ async def create_synthetic_jobs(self): await asyncio.sleep(self.config.neuron.synthetic_job_interval) + async def md_pipeline(self, job: Job): + # Here we straightforwardly query the workers associated with each job and update the jobs accordingly + job_event = await self.forward_md(job=job) + + # If we don't have any miners reply to the query, we will make it inactive. + if len(job_event["energies"]) == 0: + job.active = False + self.store.update_gjp_job( + job=job, + gjp_address=self.config.neuron.gjp_address, + keypair=self.wallet.hotkey, + job_id=job.job_id, + ) + return + + await self.update_job(job=job) + async def update_jobs(self): """ Updates the jobs in the queue. @@ -616,23 +653,20 @@ async def update_jobs(self): for job in self.store.get_queue( ready=True, validator_hotkey=self.wallet.hotkey.ss58_address ).queue: - # Here we straightforwardly query the workers associated with each job and update the jobs accordingly - job_event = await self.forward(job=job) - - # If we don't have any miners reply to the query, we will make it inactive. - if len(job_event["energies"]) == 0: - job.active = False - self.store.update_gjp_job( - job=job, - gjp_address=self.config.neuron.gjp_address, - keypair=self.wallet.hotkey, - job_id=job.job_id, - ) - continue + + #TODO: This will break for now + if job.job_type == "md": + job_event = await self.md_pipeline(job=job) + + if isinstance(job.event, str): + job.event = eval(job.event) # if str, convert to dict. - if isinstance(job.event, str): - job.event = eval(job.event) # if str, convert to dict. + elif job.job_type == "dft": + job_event = await self.forward_dft(job=job) + else: + raise ValueError(f"Invalid job type in update_jobs: {job.job_type}") + job.event.update(job_event) job.hotkeys = [ self.metagraph.hotkeys[uid] for uid in job.event["uids"] @@ -640,6 +674,7 @@ async def update_jobs(self): # Determine the status of the job based on the current energy and the previous values (early stopping) # Update the DB with the current status await self.update_job(job=job) + logger.info(f"step({self.step}) block({self.block})") except Exception as e: diff --git a/tests/test_hyperparameter_sampling.py b/tests/test_hyperparameter_sampling.py index 2d3051a2..824a11a3 100644 --- a/tests/test_hyperparameter_sampling.py +++ b/tests/test_hyperparameter_sampling.py @@ -1,7 +1,7 @@ import pytest from folding.utils.openmm_forcefields import FORCEFIELD_REGISTRY -from folding.validators.hyperparameters import HyperParameters +from folding.validators.md.hyperparameters import HyperParameters BOX = ["cubic", "dodecahedron", "octahedron"] diff --git a/tests/test_protein.py b/tests/test_protein.py deleted file mode 100644 index c94306d4..00000000 --- a/tests/test_protein.py +++ /dev/null @@ -1,29 +0,0 @@ -import pytest - -import os -from collections import defaultdict -from pathlib import Path -from folding.utils.ops import gro_hash -from folding.validators.protein import Protein - -ROOT_PATH = Path(__file__).parent - - -results = { - "1fjs": defaultdict( - int, {"REMARK": 356, "ATOM": 2236, "HETATM": 239, "CONECT": 98} - ), - "1ubq": defaultdict(int, {"REMARK": 260, "ATOM": 602, "HETATM": 58}), -} - - -@pytest.mark.parametrize( - "pdb_file, expected", - [ - (os.path.join(ROOT_PATH, "fixtures/pdb_files", f"{pdb}.pdb"), result) - for pdb, result in results.items() - ], -) -def test_pdb_complexity(pdb_file, expected): - record_counts = 
diff --git a/tests/test_protein.py b/tests/test_protein.py
deleted file mode 100644
index c94306d4..00000000
--- a/tests/test_protein.py
+++ /dev/null
@@ -1,29 +0,0 @@
-import pytest
-
-import os
-from collections import defaultdict
-from pathlib import Path
-from folding.utils.ops import gro_hash
-from folding.validators.protein import Protein
-
-ROOT_PATH = Path(__file__).parent
-
-
-results = {
-    "1fjs": defaultdict(
-        int, {"REMARK": 356, "ATOM": 2236, "HETATM": 239, "CONECT": 98}
-    ),
-    "1ubq": defaultdict(int, {"REMARK": 260, "ATOM": 602, "HETATM": 58}),
-}
-
-
-@pytest.mark.parametrize(
-    "pdb_file, expected",
-    [
-        (os.path.join(ROOT_PATH, "fixtures/pdb_files", f"{pdb}.pdb"), result)
-        for pdb, result in results.items()
-    ],
-)
-def test_pdb_complexity(pdb_file, expected):
-    record_counts = Protein._get_pdb_complexity(pdb_file)
-    assert record_counts == expected, "record counts do not match expected values"