class DFTMiner(FoldingMiner):
    """Miner that serves DFT (density functional theory) jobs.

    Inherits all MD-folding behaviour from ``FoldingMiner``; only the
    DFT-specific forward pass is defined here.
    """

    def __init__(self, config=None):
        # No DFT-specific state yet; defer entirely to FoldingMiner.
        super().__init__(config)

    def dft_forward(self, synapse: "JobSubmissionSynapse") -> "JobSubmissionSynapse":
        """Handle a DFT job submission and return the (updated) synapse.

        NOTE(review): ``JobSubmissionSynapse`` is never imported in this
        module (only ``FoldingMiner`` is), so the original eager annotations
        raised ``NameError`` when the class body was evaluated. String
        annotations defer evaluation as a stopgap — add
        ``from folding.protocol import JobSubmissionSynapse`` (or import the
        DFT synapse) when wiring this up.

        TODO: implement the actual psi4 energy computation; currently this
        echoes the synapse back unchanged.
        """
        return synapse
def parse_custom_xyz(file_path: str) -> Dict[str, Any]:
    """Parse an extended-xyz file in the GDB-9/QM9 layout.

    Expected layout (after blank lines are removed):
        line 0           : number of atoms ``n``
        line 1           : tab-separated scalar properties (metadata)
        lines 2 .. 2+n   : ``element x y z charge`` per atom
        line 2+n         : whitespace-separated float properties (frequencies,
                           per the original author's note — TODO confirm)
        line 3+n         : SMILES string(s)
        line 4+n         : InChI string(s)

    Args:
        file_path (str): The path to the xyz file.

    Returns:
        Dict[str, Any]: parsed fields; ``atoms`` is a pandas DataFrame with
        columns element/x/y/z/charge.
    """
    with open(file_path, "r", encoding="utf-8") as f:
        lines = [line.strip() for line in f if line.strip()]

    # First line: number of atoms.
    num_atoms = int(lines[0])

    # Second line: metadata. Convert every token that parses as a float and
    # keep the rest as strings. float() accepts scientific notation
    # ("1.2e-3") and signs, which the previous isdigit()-based check
    # rejected — and that check could also crash on tokens like "1-2"
    # (passed the test, then float() raised).
    metadata: List[Any] = []
    for token in lines[1].split("\t"):
        try:
            metadata.append(float(token))
        except ValueError:
            metadata.append(token)

    # Atom block: one "element x y z charge" record per atom.
    atom_data = []
    for line in lines[2 : 2 + num_atoms]:
        parts = line.split()
        atom_data.append(
            {
                "element": parts[0],
                "x": float(parts[1]),
                "y": float(parts[2]),
                "z": float(parts[3]),
                "charge": float(parts[4]),
            }
        )

    atoms_df = pd.DataFrame(atom_data)

    # Line after the atom block: float properties (frequencies?).
    properties = list(map(float, lines[2 + num_atoms].split()))

    # SMILES line, then InChI line.
    smiles = lines[3 + num_atoms].split()
    inchis = lines[4 + num_atoms].split()

    return {
        "num_atoms": num_atoms,
        "metadata": metadata,
        "atoms": atoms_df,
        "properties": properties,
        "smiles": smiles,
        "inchis": inchis,
    }
class DFTJobSubmissionSynapse(bt.Synapse):
    """A synapse for submission of DFT jobs.

    Fields:
        job_id: unique identifier of the DFT job being distributed.
        geometry: psi4-formatted geometry string for the molecule.
    """

    job_id: str
    geometry: str

    def deserialize(self) -> "DFTJobSubmissionSynapse":
        """Return the synapse itself; its fields need no decoding.

        The original ``-> int`` annotation was copied from
        ``JobSubmissionSynapse`` and was wrong — the method returns ``self``.
        """
        return self
async def run_dft_step(self, job_event: Dict):
    """
    Send a DFT job to the miners and return the response.

    Args:
        job_event (Dict): must contain "job_id" and "geometry" keys — both
            are read below to build the synapse.

    Returns:
        The per-miner response info produced by get_response_info.
    """
    # Query every serving-eligible miner (serving check deliberately relaxed).
    uids = get_all_miner_uids(
        self.metagraph,
        self.config.neuron.vpermit_tao_limit,
        include_serving_in_check=False,
    )

    axons = [self.metagraph.axons[uid] for uid in uids]
    synapse = DFTJobSubmissionSynapse(
        job_id=job_event["job_id"],
        geometry=job_event["geometry"],
    )

    responses: List[DFTJobSubmissionSynapse] = await self.dendrite.forward(
        axons=axons,
        synapse=synapse,
        # NOTE(review): hard-coded 10 s timeout — confirm it is adequate for
        # DFT responses. deserialize=True just runs
        # DFTJobSubmissionSynapse.deserialize() on each response (returns the
        # synapse as-is; there are no md_outputs in the DFT path).
        timeout=10,
        deserialize=True,  # decodes the bytestream response inside of md_outputs.
    )

    response_info = get_response_info(responses=responses)
    return response_info
async def try_prepare_dft_challenge(self) -> Dict:
    """Prepare a DFT challenge by sampling a random molecule from the
    local GDB-9 dataset directory.

    Returns:
        Dict: job event with "job_id", "element" (source filename) and
        "geometry" (psi4 geometry string) — the keys run_dft_step consumes.

    Raises:
        FileNotFoundError: if the dataset directory does not exist.
        ValueError: if the directory contains no .xyz files.
    """
    # Ideally, we are going to ping the endpoint
    # ('https://springernature.figshare.com/ndownloader/files/3195389');
    # for now the dataset is expected to be unpacked locally.
    path = os.path.join(ROOT_DIR, "dsgdb9nsd.xyz")

    # ~130,000 molecule files in the full dataset.
    elements = [file for file in os.listdir(path) if file.endswith(".xyz")]
    if not elements:
        # Fail loudly instead of random.choice's opaque IndexError.
        raise ValueError(f"No .xyz files found in {path}")

    # Randomly sample a single molecule and convert it to a psi4 geometry.
    element = random.choice(elements)
    parsed_data = parse_custom_xyz(os.path.join(path, element))
    geometry = to_psi4_geometry_string(parsed_data["atoms"].to_numpy())

    return {
        # run_dft_step reads job_event["job_id"]; the filename stem gives a
        # stable, traceable identifier for the sampled molecule.
        "job_id": os.path.splitext(element)[0],
        "element": element,
        "geometry": geometry,
    }
options are "md" or "dft" """ # Deploy K number of unique pdb jobs, where each job gets distributed to self.config.neuron.sample_size miners @@ -210,9 +211,15 @@ async def add_k_synthetic_jobs(self, k: int): # This will change on each loop since we are submitting a new pdb to the batch of miners exclude_pdbs = self.store.get_all_pdbs() - job_event: Dict = await create_new_challenge(self, exclude=exclude_pdbs) + job_event: Dict = await create_new_challenge( + self, exclude=exclude_pdbs, job_type=job_type + ) - await self.add_job(job_event=job_event) + if job_type == "md": + await self.add_job(job_event=job_event) + elif job_type == "dft": + # TODO: Need to figure this out..... How to add to queue? + return job_event await asyncio.sleep(0.01) async def update_job(self, job: Job): @@ -370,7 +377,7 @@ async def prepare_event_for_logging(event: Dict): if protein is not None and job.active is False: protein.remove_pdb_directory() - async def create_synthetic_jobs(self): + async def create_synthetic_md_jobs(self): """ Creates jobs and adds them to the queue. """ @@ -396,7 +403,7 @@ async def create_synthetic_jobs(self): # We also assign the pdb to a group of workers (miners), based on their workloads await self.add_k_synthetic_jobs( - k=self.config.neuron.queue_size - queue.qsize() + k=self.config.neuron.queue_size - queue.qsize(), job_type="md" ) logger.info( @@ -412,6 +419,15 @@ async def create_synthetic_jobs(self): await asyncio.sleep(self.config.neuron.synthetic_job_interval) + async def create_synthetic_dft_jobs(self): + """ + Creates DFT jobs and push them to miners. + """ + job_event: Dict = await self.add_k_synthetic_jobs(k=1, job_type="dft") + + job_event = await run_dft_step(self, job_event=job_event) + await asyncio.sleep(self.config.neuron.synthetic_job_interval) + async def update_jobs(self): """ Updates the jobs in the queue. 
@@ -587,7 +603,8 @@ async def __aenter__(self): self.loop.create_task(self.sync_loop()) self.loop.create_task(self.update_jobs()) - self.loop.create_task(self.create_synthetic_jobs()) + self.loop.create_task(self.create_synthetic_md_jobs()) + self.loop.create_task(self.create_synthetic_dft_jobs()) self.loop.create_task(self.reward_loop()) self.loop.create_task(self.monitor_db()) if self.config.neuron.organic_enabled: