diff --git a/.vscode/settings.json b/.vscode/settings.json index 13974492..f4cbc948 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -3,7 +3,7 @@ "editor.defaultFormatter": "ms-python.black-formatter", "editor.formatOnSave": true, "editor.codeActionsOnSave": { - "source.organizeImports": true + "source.organizeImports": "explicit" } }, "isort.args": [ diff --git a/integration_tests/cc_stack_test.py b/integration_tests/cc_stack_test.py index e714a46d..acd23eaa 100644 --- a/integration_tests/cc_stack_test.py +++ b/integration_tests/cc_stack_test.py @@ -8,16 +8,16 @@ cross_correlate, stack_cross_correlations, ) -from noisepy.seis.channelcatalog import ( +from noisepy.seis.io.channelcatalog import ( XMLStationChannelCatalog, # Required stationXML handling object ) -from noisepy.seis.datatypes import ( # Main configuration object +from noisepy.seis.io.datatypes import ( # Main configuration object CCMethod, ConfigParameters, StackMethod, ) -from noisepy.seis.numpystore import NumpyCCStore, NumpyStackStore -from noisepy.seis.scedc_s3store import ( # Object to query SCEDC data from on S3 +from noisepy.seis.io.numpystore import NumpyCCStore, NumpyStackStore +from noisepy.seis.io.scedc_s3store import ( # Object to query SCEDC data from on S3 SCEDCS3DataStore, channel_filter, ) diff --git a/pyproject.toml b/pyproject.toml index 01d3085f..a6ddd712 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -50,6 +50,7 @@ dependencies = [ "PyYAML==6.0", "pydantic-yaml==1.0", "psutil>=5.9.5,<6.0.0", + "noisepy-seis-io", ] diff --git a/script/test_whiten.py b/script/test_whiten.py index 49b08d35..acbb9407 100644 --- a/script/test_whiten.py +++ b/script/test_whiten.py @@ -3,7 +3,7 @@ import scipy from scipy.fftpack import next_fast_len -from noisepy.seis.datatypes import FreqNorm +from noisepy.seis.io.datatypes import FreqNorm from noisepy.seis.noise_module import moving_ave, whiten diff --git a/script/write_speed/write.py b/script/write_speed/write.py index 7f7f2d98..178601d8 100644 --- a/script/write_speed/write.py +++ b/script/write_speed/write.py @@ -13,8 +13,8 @@ import pyasdf import zarr +from noisepy.seis.io.utils import fs_join, get_filesystem from noisepy.seis.tiledb import _TileDBHelper -from noisepy.seis.utils import fs_join, get_filesystem logger = logging.getLogger(__name__) s3_path = "s3:///write_speed/" diff --git a/src/noisepy/seis/S0B_to_ASDF_2019.py b/src/noisepy/seis/S0B_to_ASDF_2019.py index 1925d472..1363ffd0 100644 --- a/src/noisepy/seis/S0B_to_ASDF_2019.py +++ b/src/noisepy/seis/S0B_to_ASDF_2019.py @@ -9,8 +9,9 @@ import pyasdf from mpi4py import MPI +from noisepy.seis.io.datatypes import RmResp + from . import noise_module -from .datatypes import RmResp if not sys.warnoptions: import warnings diff --git a/src/noisepy/seis/asdfstore.py b/src/noisepy/seis/asdfstore.py deleted file mode 100644 index 13021bcf..00000000 --- a/src/noisepy/seis/asdfstore.py +++ /dev/null @@ -1,264 +0,0 @@ -import glob -import logging -import os -from pathlib import Path -from typing import Callable, Dict, Generic, List, Optional, Set, Tuple, TypeVar - -import numpy as np -import obspy -import pyasdf -from datetimerange import DateTimeRange - -from . 
import noise_module -from .constants import PROGRESS_DATATYPE -from .datatypes import ( - Channel, - ChannelData, - ChannelType, - CrossCorrelation, - Stack, - Station, -) -from .stores import ( - CrossCorrelationDataStore, - RawDataStore, - StackStore, - parse_station_pair, - parse_timespan, - timespan_str, -) - -logger = logging.getLogger(__name__) - -T = TypeVar("T") - - -class ASDFDirectory(Generic[T]): - """ - Utility class used byt ASDFRawDataStore and ASDFCCStore to provide easy access - to a set of ASDF files in a directory that follow a specific naming convention. - The files are named after a generic type T (e.g. a timestamp or a pair of stations) - so the constructor takes two functions to map between the type T and the corresponding - file name. - """ - - def __init__( - self, directory: str, mode: str, get_filename: Callable[[T], str], parse_filename: Callable[[str], T] - ) -> None: - if mode not in ["a", "r"]: - raise ValueError(f"Invalid mode {mode}. Must be 'a' or 'r'") - - self.directory = directory - self.mode = mode - self.get_filename = get_filename - self.parse_filename = parse_filename - - def __getitem__(self, key: T) -> pyasdf.ASDFDataSet: - return self._get_dataset(key, self.mode) - - def _get_dataset(self, key: T, mode: str) -> pyasdf.ASDFDataSet: - file_name = self.get_filename(key) - file_path = os.path.join(self.directory, file_name) - return _get_dataset(file_path, mode) - - def get_keys(self) -> List[T]: - h5files = sorted(glob.glob(os.path.join(self.directory, "**/*.h5"), recursive=True)) - return list(map(self.parse_filename, h5files)) - - def contains(self, key: T, data_type: str, path: str = None): - # contains is always a read - ccf_ds = self._get_dataset(key, "r") - - if not ccf_ds: - return False - with ccf_ds: - # source-receiver pair - exists = data_type in ccf_ds.auxiliary_data - if path is not None and exists: - return path in ccf_ds.auxiliary_data[data_type] - return exists - - def add_aux_data(self, key: T, params: Dict, data_type: str, path: str, data: np.ndarray): - with self[key] as ccf_ds: - ccf_ds.add_auxiliary_data(data=data, data_type=data_type, path=path, parameters=params) - - -class ASDFRawDataStore(RawDataStore): - """ - A data store implementation to read from a directory of ASDF files. Each file is considered - a timespan with the naming convention: 2019_02_01_00_00_00T2019_02_02_00_00_00.h5 - """ - - def __init__(self, directory: str, mode: str = "r"): - super().__init__() - self.datasets = ASDFDirectory(directory, mode, _filename_from_timespan, parse_timespan) - - def get_channels(self, timespan: DateTimeRange) -> List[Channel]: - with self.datasets[timespan] as ds: - stations = [self._create_station(timespan, sta) for sta in ds.waveforms.list() if sta is not None] - channels = [ - Channel(ChannelType(tag), sta) for sta in stations for tag in ds.waveforms[str(sta)].get_waveform_tags() - ] - return channels - - def get_timespans(self) -> List[DateTimeRange]: - return self.datasets.get_keys() - - def read_data(self, timespan: DateTimeRange, chan: Channel) -> ChannelData: - with self.datasets[timespan] as ds: - stream = ds.waveforms[str(chan.station)][str(chan.type)] - return ChannelData(stream) - - def get_inventory(self, timespan: DateTimeRange, station: Station) -> obspy.Inventory: - with self.datasets[timespan] as ds: - return ds.waveforms[str(station)]["StationXML"] - - def _create_station(self, timespan: DateTimeRange, name: str) -> Optional[Station]: - # What should we do if there's no StationXML? 
- try: - with self.datasets[timespan] as ds: - inventory = ds.waveforms[name]["StationXML"] - sta, net, lon, lat, elv, loc = noise_module.sta_info_from_inv(inventory) - return Station(net, sta, lat, lon, elv, loc) - except Exception as e: - logger.warning(f"Missing StationXML for station {name}. {e}") - return None - - -class ASDFCCStore(CrossCorrelationDataStore): - def __init__(self, directory: str, mode: str = "a") -> None: - super().__init__() - Path(directory).mkdir(exist_ok=True) - self.datasets = ASDFDirectory(directory, mode, _filename_from_timespan, parse_timespan) - - # CrossCorrelationDataStore implementation - def contains(self, src: Station, rec: Station, timespan: DateTimeRange) -> bool: - station_pair = self._get_station_pair(src, rec) - contains = self.datasets.contains(timespan, station_pair) - if contains: - logger.info(f"Cross-correlation {station_pair} already exists") - return contains - - def append( - self, - timespan: DateTimeRange, - src: Station, - rec: Station, - ccs: List[CrossCorrelation], - ): - for cc in ccs: - station_pair = self._get_station_pair(src, rec) - # source-receiver pair: e.g. CI.ARV_CI.BAK - # channels, e.g. bhn_bhn - channels = self._get_channel_pair(cc.src, cc.rec) - self.datasets.add_aux_data(timespan, cc.parameters, station_pair, channels, cc.data) - - def get_timespans(self, src: Station, rec: Station) -> List[DateTimeRange]: - timespans = {} - pair_key = self._get_station_pair(src, rec) - - def visit(pairs, ts): - if pair_key in pairs: - timespans[str(ts)] = ts - - self._visit_pairs(visit) - return sorted(timespans.values(), key=lambda t: str(t)) - - def get_station_pairs(self) -> List[Tuple[Station, Station]]: - pairs_all = set() - self._visit_pairs(lambda pairs, _: pairs_all.update((parse_station_pair(p) for p in pairs))) - return list(pairs_all) - - def read(self, timespan: DateTimeRange, src_sta: Station, rec_sta: Station) -> List[CrossCorrelation]: - with self.datasets[timespan] as ccf_ds: - dtype = self._get_station_pair(src_sta, rec_sta) - if dtype not in ccf_ds.auxiliary_data: - logging.warning(f"No data available for {timespan}/{dtype}") - return [] - ccs = [] - ch_pair_paths = ccf_ds.auxiliary_data[dtype].list() - for ch_pair_path in ch_pair_paths: - src_ch, rec_ch = _parse_channel_path(ch_pair_path) - stream = ccf_ds.auxiliary_data[dtype][ch_pair_path] - ccs.append(CrossCorrelation(src_ch, rec_ch, stream.parameters, stream.data[:])) - return ccs - - def _visit_pairs(self, visitor: Callable[[Set[Tuple[str, str]], DateTimeRange], None]): - all_timespans = self.datasets.get_keys() - for timespan in all_timespans: - with self.datasets[timespan] as ccf_ds: - data = ccf_ds.auxiliary_data.list() - pairs = {p for p in data if p != PROGRESS_DATATYPE} - visitor(pairs, timespan) - - def _get_channel_pair(self, src_chan: ChannelType, rec_chan: ChannelType) -> str: - return f"{src_chan}_{rec_chan}" - - def _get_station_pair(self, src_sta: Station, rec_sta: Station) -> str: - return f"{src_sta}_{rec_sta}" - - -class ASDFStackStore(StackStore): - def __init__(self, directory: str, mode: str = "a"): - super().__init__() - self.datasets = ASDFDirectory(directory, mode, _filename_from_stations, _parse_station_pair_h5file) - - # TODO: Do we want to support storing stacks from different timespans in the same store? 
- def append(self, timespan: DateTimeRange, src: Station, rec: Station, stacks: List[Stack]): - for stack in stacks: - self.datasets.add_aux_data((src, rec), stack.parameters, stack.name, stack.component, stack.data) - - def get_station_pairs(self) -> List[Tuple[Station, Station]]: - return self.datasets.get_keys() - - def get_timespans(self, src: Station, rec: Station) -> List[DateTimeRange]: - # TODO: Do we want to support storing stacks from different timespans in the same store? - return [] - - def read(self, timespan: DateTimeRange, src: Station, rec: Station) -> List[Stack]: - stacks = [] - with self.datasets[(src, rec)] as ds: - for name in ds.auxiliary_data.list(): - for component in ds.auxiliary_data[name].list(): - stream = ds.auxiliary_data[name][component] - stacks.append(Stack(component, name, stream.parameters, stream.data[:])) - return stacks - - -def _get_dataset(filename: str, mode: str) -> pyasdf.ASDFDataSet: - logger.debug(f"Opening {filename}") - if os.path.exists(filename): - return pyasdf.ASDFDataSet(filename, mode=mode, mpi=False, compression=None) - elif mode == "r": - return None - else: # create new file - Path(filename).parent.mkdir(exist_ok=True, parents=True) - return pyasdf.ASDFDataSet(filename, mode=mode, mpi=False, compression=None) - - -def _filename_from_stations(pair: Tuple[Station, Station]) -> str: - return f"{pair[0]}/{pair[0]}_{pair[1]}.h5" - - -def _filename_from_timespan(timespan: DateTimeRange) -> str: - return f"{timespan_str(timespan)}.h5" - - -def _parse_station_pair_h5file(path: str) -> Tuple[Station, Station]: - pair = Path(path).stem - return parse_station_pair(pair) - - -def _parse_channel_path(path: str) -> Tuple[ChannelType, ChannelType]: - parts = path.split("_") - if len(parts) == 2: # e.g. bhn_bhn - return tuple(map(ChannelType, parts)) - elif len(parts) == 3: # when we have one location code - if parts[1].isdigit(): # e.g. bhn_00_bhn - return tuple(map(ChannelType, ["_".join(parts[0:2]), parts[2]])) - else: # e.g. bhn_bhn_00 - return tuple(map(ChannelType, [parts[0], "_".join(parts[1:3])])) - elif len(parts) == 4: # when we have two location codes: e.g. bhn_00_bhn_00 - return tuple(map(ChannelType, ["_".join(parts[0:2]), "_".join(parts[2:4])])) - else: - raise ValueError(f"Invalid channel path {path}") diff --git a/src/noisepy/seis/channel_filter_store.py b/src/noisepy/seis/channel_filter_store.py deleted file mode 100644 index 4997a5ff..00000000 --- a/src/noisepy/seis/channel_filter_store.py +++ /dev/null @@ -1,40 +0,0 @@ -from typing import List - -import obspy -from datetimerange import DateTimeRange - -from .datatypes import Channel, ChannelData, Station -from .stores import RawDataStore - - -class LocationChannelFilterStore(RawDataStore): - """ - This 'store' simply wraps another store and filters out duplicate channels that differ only - by location. It does this by keeping the channel with the 'lowest' (lexicographic) location code. 
- """ - - def __init__(self, store: RawDataStore): - super().__init__() - self.store = store - - def get_timespans(self) -> List[DateTimeRange]: - return self.store.get_timespans() - - def read_data(self, timespan: DateTimeRange, chan: Channel) -> ChannelData: - return self.store.read_data(timespan, chan) - - def get_inventory(self, timespan: DateTimeRange, station: Station) -> obspy.Inventory: - return self.store.get_inventory(timespan, station) - - def get_channels(self, timespan: DateTimeRange) -> List[Channel]: - channels = self.store.get_channels(timespan) - min_chans = {} - for ch in channels: - key = f"{ch.station.network}_{ch.station.name}_{ch.type.name}" - if key not in min_chans: - min_chans[key] = ch - # lexicographic comparison of location codes - # http://docs.python.org/reference/expressions.html - elif ch.type.location < min_chans[key].type.location: - min_chans[key] = ch - return list(min_chans.values()) diff --git a/src/noisepy/seis/channelcatalog.py b/src/noisepy/seis/channelcatalog.py deleted file mode 100644 index d688b611..00000000 --- a/src/noisepy/seis/channelcatalog.py +++ /dev/null @@ -1,179 +0,0 @@ -import logging -from abc import ABC, abstractmethod -from functools import lru_cache - -import diskcache as dc -import obspy -import pandas as pd -from datetimerange import DateTimeRange -from obspy import UTCDateTime, read_inventory -from obspy.clients.fdsn import Client - -from .datatypes import Channel, Station -from .utils import fs_join, get_filesystem - -logger = logging.getLogger(__name__) - - -class ChannelCatalog(ABC): - """ - An abstract catalog for getting full channel information (lat, lon, elev, resp) - """ - - def populate_from_inventory(self, inv: obspy.Inventory, ch: Channel) -> Channel: - filtered = inv.select(network=ch.station.network, station=ch.station.name, channel=ch.type.name) - if ( - len(filtered) == 0 - or len(filtered.networks[0].stations) == 0 - or len(filtered.networks[0].stations[0].channels) == 0 - ): - logger.warning(f"Could not find channel {ch} in the inventory") - return ch - - inv_chan = filtered.networks[0].stations[0].channels[0] - return Channel( - ch.type, - Station( - network=ch.station.network, - name=ch.station.name, - lat=inv_chan.latitude, - lon=inv_chan.longitude, - elevation=inv_chan.elevation, - location=ch.station.location, - ), - ) - - def get_full_channel(self, timespan: DateTimeRange, channel: Channel) -> Channel: - inv = self.get_inventory(timespan, channel.station) - return self.populate_from_inventory(inv, channel) - - @abstractmethod - def get_inventory(self, timespan: DateTimeRange, station: Station) -> obspy.Inventory: - pass - - -class XMLStationChannelCatalog(ChannelCatalog): - """ - A channel catalog that reads .XML files from a directory or an s3://... bucket url path. - """ - - def __init__(self, xmlpath: str, path_format: str = "{network}_{name}.xml", storage_options={}) -> None: - """ - Constructs a XMLStationChannelCatalog - - Args: - xmlpath (str): Base directory where to find the files - path_format (str): Format string to construct the file name from a station. - The argument names are 'network' and 'name'. 
- """ - super().__init__() - self.xmlpath = xmlpath - self.path_format = path_format - self.fs = get_filesystem(xmlpath, storage_options=storage_options) - if not self.fs.exists(self.xmlpath): - raise Exception(f"The XML Station file directory '{xmlpath}' doesn't exist") - - def get_inventory(self, timespan: DateTimeRange, station: Station) -> obspy.Inventory: - file_name = self.path_format.format(network=station.network, name=station.name) - xmlfile = fs_join(self.xmlpath, file_name) - return self._get_inventory_from_file(xmlfile) - - @lru_cache(maxsize=None) - def _get_inventory_from_file(self, xmlfile): - if not self.fs.exists(xmlfile): - logger.warning(f"Could not find StationXML file {xmlfile}. Returning empty Inventory()") - return obspy.Inventory() - with self.fs.open(xmlfile) as f: - logger.info(f"Reading StationXML file {xmlfile}") - return read_inventory(f) - - -class FDSNChannelCatalog(ChannelCatalog): - """ - A channel catalog that queries the FDSN service - FDSN ~ International Federation of Digital Seismograph Network - """ - - def __init__( - self, - url_key: str, - cache_dir: str, - ): - super().__init__() - self.url_key = url_key - - logger.info(f"Cache dir: ${cache_dir}") - self.cache = dc.Cache(cache_dir) - - def get_full_channel(self, timespan: DateTimeRange, channel: Channel) -> Channel: - inv = self._get_inventory(str(timespan)) - return self.populate_from_inventory(inv, channel) - - def get_inventory(self, timespan: DateTimeRange, station: Station) -> obspy.Inventory: - return self._get_inventory(str(timespan)) - - def _get_cache_key(self, ts_str: str) -> str: - return f"{self.url_key}_{ts_str}" - - @lru_cache - # pass the timestamp (DateTimeRange) as string so that the method is cacheable - # since DateTimeRange is not hasheable - def _get_inventory(self, ts_str: str) -> obspy.Inventory: - ts = DateTimeRange.from_range_text(ts_str) - key = self._get_cache_key(ts_str) - inventory = self.cache.get(key, None) - if inventory is None: - logging.info(f"Inventory not found in cache for key: '{key}'. 
Fetching from {self.url_key}.") - bulk_station_request = [("*", "*", "*", "*", UTCDateTime(ts.start_datetime), UTCDateTime(ts.end_datetime))] - client = Client(self.url_key) - inventory = client.get_stations_bulk(bulk_station_request, level="channel") - self.cache[key] = inventory - return inventory - - -class CSVChannelCatalog(ChannelCatalog): - """ - A channel catalog implentations that reads the station csv file - """ - - def __init__(self, file: str): - self.df = pd.read_csv(file) - - def get_full_channel(self, timespan: DateTimeRange, ch: Channel) -> Channel: - ista = self.df[self.df["station"] == ch.station.name].index.values.astype("int64")[0] - return Channel( - ch.type, - Station( - network=ch.station.network, - name=ch.station.name, - lat=self.df.iloc[ista]["latitude"], - lon=self.df.iloc[ista]["longitude"], - elevation=self.df.iloc[ista]["elevation"], - location=ch.station.location, - ), - ) - - def get_inventory(self, timespan: DateTimeRange, station: Station) -> obspy.Inventory: - # Build a obspy.Inventory from the dataframe - network_codes = list(self.df["network"].unique()) - df = self.df - nets = [] - for net in network_codes: - sta_names = list(df[df.network == net]["station"].unique()) - stations = [] - for sta in sta_names: - sta_row = df[df.network == net][df.station == sta].iloc[0] - lat = sta_row["latitude"] - lon = sta_row["longitude"] - elevation = sta_row["elevation"] - channels = [ - obspy.core.inventory.Channel(ch, "", lat, lon, elevation, 0) - for ch in df[df.network == net][df.station == sta]["channel"].values - ] - station = obspy.core.inventory.Station(sta, lat, lon, elevation, channels=channels) - stations.append(station) - nets.append(obspy.core.inventory.Network(net, stations)) - return obspy.Inventory(nets) - - -# TODO: A channel catalog that uses the files in the SCEDC S3 bucket: s3://scedc-pds/FDSNstationXML/ diff --git a/src/noisepy/seis/correlate.py b/src/noisepy/seis/correlate.py index 60cbce60..2c8e99f9 100644 --- a/src/noisepy/seis/correlate.py +++ b/src/noisepy/seis/correlate.py @@ -11,9 +11,7 @@ from datetimerange import DateTimeRange from scipy.fftpack.helper import next_fast_len -from . import noise_module -from .constants import NO_DATA_MSG -from .datatypes import ( +from noisepy.seis.io.datatypes import ( CCMethod, Channel, ChannelData, @@ -22,9 +20,12 @@ NoiseFFT, Station, ) +from noisepy.seis.io.stores import CrossCorrelationDataStore, RawDataStore +from noisepy.seis.io.utils import TimeLogger, get_results + +from . 
import noise_module +from .constants import NO_DATA_MSG from .scheduler import Scheduler, SingleNodeScheduler -from .stores import CrossCorrelationDataStore, RawDataStore -from .utils import TimeLogger, get_results logger = logging.getLogger(__name__) # ignore warnings diff --git a/src/noisepy/seis/datatypes.py b/src/noisepy/seis/datatypes.py deleted file mode 100644 index 05274a62..00000000 --- a/src/noisepy/seis/datatypes.py +++ /dev/null @@ -1,471 +0,0 @@ -from __future__ import annotations - -import sys -import typing -from abc import ABC, abstractmethod -from collections import defaultdict -from dataclasses import dataclass -from datetime import datetime, timezone -from enum import Enum -from typing import Any, DefaultDict, Dict, List, Optional, Tuple -from urllib.parse import urlparse - -import numpy as np -import obspy -from pydantic import BaseModel, ConfigDict, Field -from pydantic.functional_validators import model_validator -from pydantic_yaml import parse_yaml_raw_as, to_yaml_str - -from noisepy.seis.utils import get_filesystem, remove_nan_rows, remove_nans - -INVALID_COORD = -sys.float_info.max - - -@dataclass -class ChannelType: - """ - A type of channel, e.g. BHN, but not associated with a particular station - """ - - name: str - location: str = "" - - def __post_init__(self): - assert ( - len(self.name) == 3 or len(self.name) == 6 - ), "A channel type name should be length 3 (e.g. bhn) or 6 (e.g. bhn_00)" - if "_" in self.name: - parts = self.name.split("_") - self.name = parts[0] - self.location = parts[1] - - # Japanese channels use 'U' (up) for the vertical direction. Here we normalize to 'z' - if self.name[-1] == "U": - self.name = self.name.replace("U", "Z") - - def __repr__(self) -> str: - if len(self.location) > 0: - return f"{self.name}_{self.location}" - else: - return self.name - - def get_orientation(self) -> str: - if "_" in self.name: - return self.name.split("_")[0][-1] - else: - assert len(self.name) == 3 - return self.name[-1] - - -@dataclass -class Station: - network: str # 2 chars - name: str # 3-5 chars - lat: float - lon: float - elevation: float - location: str - - def __init__( - self, - network: str, - name: str, - lat: float = INVALID_COORD, - lon: float = INVALID_COORD, - elevation: float = INVALID_COORD, - location: str = "", - ): - self.network = network - self.name = name - self.lat = lat - self.lon = lon - self.elevation = elevation - self.location = location - - def parse(sta: str) -> Optional[Station]: - # Parse from: CI.ARV_CI.BAK - parts = sta.split(".") - if len(parts) != 2: - return None - return Station(parts[0], parts[1]) - - def valid(self) -> bool: - return min(self.lat, self.lon, self.elevation) > INVALID_COORD - - def __repr__(self) -> str: - return f"{self.network}.{self.name}" - - def __hash__(self) -> int: - return str(self).__hash__() - - def __eq__(self, __value: object) -> bool: - return str(self) == str(__value) - - -class CorrelationMethod(Enum): - XCORR = 1 - DECONV = 2 - - -class StackMethod(Enum): - LINEAR = "linear" - PWS = "pws" - ROBUST = "robust" - AUTO_COVARIANCE = "auto_covariance" - NROOT = "nroot" - SELECTIVE = "selective" - ALL = "all" - - -class FreqNorm(Enum): - RMA = "rma" - NO = "no" - PHASE_ONLY = "phase_only" - - -class CCMethod(str, Enum): - XCORR = "xcorr" - DECONV = "deconv" - COHERENCY = "coherency" - - -class TimeNorm(str, Enum): - NO = "no" - ONE_BIT = "one_bit" - RMA = "rma" - - -class RmResp(str, Enum): - NO = "no" - INV = "inv" - SPECTRUM = "spectrum" - POLES_ZEROS = "poleszeros" - RESP = "RESP" - - 
-class ConfigParameters(BaseModel): - model_config = ConfigDict(validate_default=True) - - client_url_key: str = "SCEDC" - start_date: datetime = Field(default=datetime(2019, 1, 1, tzinfo=timezone.utc)) - end_date: datetime = Field(default=datetime(2019, 1, 2, tzinfo=timezone.utc)) - samp_freq: float = Field(default=20.0) # TODO: change this samp_freq for the obspy "sampling_rate" - single_freq: bool = Field( - default=True, - description="Filter to only data sampled at ONE frequency (the closest >= to samp_freq). " - "If False, it will use all data sample at >=samp_freq", - ) - - cc_len: int = Field(default=1800, description="basic unit of data length for fft (sec)") - # download params. - # Targeted region/station information: only needed when down_list is False - lamin: float = Field(default=31.0, description="Download: minimum latitude") - lamax: float = Field(default=36.0, description="Download: maximum latitude") - lomin: float = Field(default=-122.0, description="Download: minimum longitude") - lomax: float = Field(default=-115.0, description="Download: maximum longitude") - down_list: bool = Field(default=False, description="download stations from a pre-compiled list or not") - net_list: List[str] = Field(default=["CI"], description="network list") - stations: List[str] = Field(default=["*"], description="station list") - channels: List[str] = Field(default=["BHE", "BHN", "BHZ"], description="channel list") - # pre-processing parameters - step: float = Field(default=450.0, description="overlapping between each cc_len (sec)") - freqmin: float = Field(default=0.05) - freqmax: float = Field(default=2.0) - freq_norm: FreqNorm = Field( - default=FreqNorm.RMA.value, description="choose between 'rma' for a soft whitenning or 'no' for no whitening" - ) - # TODO: change "no"for "None", and add "one_bit"as an option - # TODO: change time_norm option from "no"to "None" - time_norm: TimeNorm = Field( - default=TimeNorm.NO.value, - description="'no' for no normalization, or 'rma', 'one_bit' for normalization in time domain,", - ) - # FOR "COHERENCY"PLEASE set freq_norm to "rma", time_norm to "no"and cc_method to "xcorr" - cc_method: CCMethod = Field( - default=CCMethod.XCORR.value, description="'xcorr' for pure cross correlation, 'deconv' for deconvolution;" - ) - smooth_N: int = Field( - default=10, description="moving window length for time/freq domain normalization if selected (points)" - ) - smoothspect_N: int = Field(default=10, description="moving window length to smooth spectrum amplitude (points)") - # if substack=True, substack_len=2*cc_len, then you pre-stack every 2 correlation windows. - # For instance: substack=True, substack_len=cc_len means that you keep ALL of the correlations" - substack: bool = Field( - default=False, description="True: smaller stacks within the time chunk. 
False: it will stack over inc_hours" - ) - substack_len: int = Field( - default=1800, description="how long to stack over (for monitoring purpose): need to be multiples of cc_len" - ) - maxlag: int = Field(default=200, description="lags of cross-correlation to save (sec)") - inc_hours: int = Field(default=24, description="Time increment size in hours") - # criteria for data selection - max_over_std: int = Field( - default=10, - description="threahold to remove window of bad signals: set it to 10*9 if prefer not to remove them", - ) - ncomp: int = Field(default=3, description="1 or 3 component data (needed to decide whether to do rotation)") - # station/instrument info for input_fmt=='sac' or 'mseed' - stationxml: bool = Field( - default=False, description="station.XML file used to remove instrument response for SAC/miniseed data" - ) - rm_resp: RmResp = Field( - default=RmResp.INV.value, description="select 'no' to not remove response and use 'inv','spectrum'," - ) - rm_resp_out: str = Field(default="VEL", description="output location from response removal") - respdir: Optional[str] = Field(default=None, description="response directory") - # some control parameters - acorr_only: bool = Field(default=False, description="only perform auto-correlation") - xcorr_only: bool = Field(default=True, description="only perform cross-correlation or not") - # Stacking parameters: - stack_method: StackMethod = Field(default=StackMethod.LINEAR.value) - keep_substack: bool = Field(default=False, description="keep all sub-stacks in final ASDF file") - # new rotation para - rotation: bool = Field(default=True, description="rotation from E-N-Z to R-T-Z") - correction: bool = Field(default=False, description="angle correction due to mis-orientation") - correction_csv: Optional[str] = Field(default=None, description="Path to e.g. 
meso_angles.csv") - # 'RESP', or 'polozeros' to remove response - - storage_options: DefaultDict[str, typing.MutableMapping] = Field( - default=defaultdict(dict), - description="Storage options to pass to fsspec, keyed by protocol (local files are ''))", - ) - - stations_file: Optional[str] = Field(default=None) - - def get_storage_options(self, path: str) -> Dict[str, Any]: - """The storage options for the given path""" - url = urlparse(path) - return self.storage_options.get(url.scheme, {}) - - @property - def dt(self) -> float: - return 1.0 / self.samp_freq - - def load_stations(self, stations_list=None) -> Optional[List[str]]: - if stations_list is None and self.stations_file: - # Use get_filesystem to get the filesystem associated with the filename - fs = get_filesystem(self.stations_file, storage_options=self.storage_options) - - # Load the list from the file - with fs.open(self.stations_file, "r") as file: - self.stations = file.read().splitlines() - else: - self.stations = stations_list - - return self.stations if self.stations else None - - def save_stations(self, value: List[str]): - if self.stations_file: - fs = get_filesystem(self.stations_file, storage_options=self.storage_options) - # Save the list to the file - with fs.open(self.stations_file, "w") as file: - file.write("\n".join(value) + "\n") - # Set stations field to Empty List - # self.stations = [] - return None - - @model_validator(mode="after") - def validate(cls, m: ConfigParameters) -> ConfigParameters: - def validate_date(d: datetime, name: str): - if d.tzinfo is None: - raise ValueError(f"{name} must have a timezone") - if d.tzinfo.utcoffset(None) != timezone.utc.utcoffset(None): - raise ValueError(f"{name} must be in UTC, but is {d.tzinfo}") - - validate_date(m.start_date, "start_date") - validate_date(m.end_date, "end_date") - if m.substack_len % m.cc_len != 0: - raise ValueError(f"substack_len ({m.substack_len}) must be a multiple of cc_len ({m.cc_len})") - - if m.stations_file: - fs = get_filesystem(m.stations_file, storage_options=m.storage_options) - - # Check if the file exists - if not fs.exists(m.stations_file): - raise ValueError(f"{m.stations_file} is not a valid file path in stations_file.") - - return m - - # TODO: Remove once all uses of ConfigParameters have been converted to use strongly typed access - def __getitem__(self, key): - # Hack since pydantic model properties are nor part of the object's __dict__ - if key == "dt": - return self.dt - return self.__dict__[key] - - def save_yaml(self, filename: str): - yaml_str = to_yaml_str(self) - fs = get_filesystem(filename, storage_options=self.storage_options) - with fs.open(filename, "w") as f: - f.write(yaml_str) - - def load_yaml(filename: str, storage_options={}) -> ConfigParameters: - fs = get_filesystem(filename, storage_options=storage_options) - with fs.open(filename, "r") as f: - yaml_str = f.read() - config = parse_yaml_raw_as(ConfigParameters, yaml_str) - return config - - -@dataclass -class Channel: - """ - A channel instance belonging to a station. E.g. 
CI.ARV.BHN - """ - - type: ChannelType - station: Station - - def __repr__(self) -> str: - return f"{self.station}.{self.type}" - - -class ChannelData: - """ - A 1D time series of channel data - - Attributes: - data: series values - sampling_rate: In HZ - start_timestamp: Seconds since 01/01/1970 - """ - - stream: obspy.Stream - data: np.ndarray - sampling_rate: int - start_timestamp: float - - def empty() -> ChannelData: - return ChannelData(obspy.Stream([obspy.Trace(np.empty(0))])) - - def __init__(self, stream: obspy.Stream): - self.stream = stream - self.data = stream[0].data[:] - self.sampling_rate = stream[0].stats.sampling_rate - self.start_timestamp = stream[0].stats.starttime.timestamp - - -@dataclass -class NoiseFFT: - """ - Data class to hold FFT and associated data - """ - - fft: np.ndarray - std: np.ndarray - fft_time: np.ndarray - window_count: int - length: int - - -class AnnotatedData(ABC): - """ - A base class for grouping data+metdata - """ - - parameters: Dict[str, Any] - data: np.ndarray - - def __init__(self, params: Dict[str, Any], data: np.ndarray): - self.data = data - self.parameters = to_json_types(params) - - @abstractmethod - def get_metadata(self) -> Tuple: - pass - - def pack(datas: List[AnnotatedData]) -> Tuple[np.ndarray, List[Dict[str, Any]]]: - if len(datas) == 0: - raise ValueError("Cannot pack empty list of data") - # Some arrays may have different lengths, so pad them with NaNs for stacking - max_rows = max(d.data.shape[0] for d in datas) - if len(datas[0].data.shape) == 1: - padded = [ - np.pad( - d.data, - (0, max_rows - d.data.shape[0]), - mode="constant", - constant_values=np.nan, - ) - for d in datas - ] - elif len(datas[0].data.shape) == 2: - max_cols = max(d.data.shape[1] for d in datas) - padded = [ - np.pad( - d.data, - ((0, max_rows - d.data.shape[0]), (0, max_cols - d.data.shape[1])), - mode="constant", - constant_values=np.nan, - ) - for d in datas - ] - else: - raise ValueError(f"Cannot pack data with shape {datas[0].data.shape}") - - data_stack = np.stack(padded, axis=0) - json_params = [p.get_metadata() for p in datas] - return data_stack, json_params - - -class CrossCorrelation(AnnotatedData): - src: ChannelType - rec: ChannelType - - def __init__(self, src: ChannelType, rec: ChannelType, params: Dict[str, Any], data: np.ndarray): - super().__init__(params, data) - self.src = src - self.rec = rec - - def __repr__(self) -> str: - return f"{self.src}_{self.rec}/{self.data.shape}" - - def get_metadata(self) -> Tuple: - return (self.src.name, self.src.location, self.rec.name, self.rec.location, self.parameters) - - def load_instances(tuples: List[Tuple[np.ndarray, Dict[str, Any]]]) -> List[CrossCorrelation]: - return [ - CrossCorrelation(ChannelType(src, src_loc), ChannelType(rec, rec_loc), params, remove_nan_rows(data)) - for data, (src, src_loc, rec, rec_loc, params) in tuples - ] - - -class Stack(AnnotatedData): - component: str # e.g. "EE", "EN", ... - name: str # e.g. 
"Allstack_linear" - - def __init__(self, component: str, name: str, params: Dict[str, Any], data: np.ndarray): - super().__init__(params, data) - self.component = component - self.name = name - - def __repr__(self) -> str: - return f"{self.component}/{self.name}/{self.data.shape}" - - def get_metadata(self) -> Tuple: - return (self.component, self.name, self.parameters) - - def load_instances(tuples: List[Tuple[np.ndarray, Dict[str, Any]]]) -> List[Stack]: - return [Stack(comp, name, params, remove_nans(data)) for data, (comp, name, params) in tuples] - - -def to_json_types(params: Dict[str, Any]) -> Dict[str, Any]: - return {k: _to_json_type(v) for k, v in params.items()} - - -def _to_json_type(value: Any) -> Any: - # special case since numpy arrays are not json serializable - if isinstance(value, np.ndarray): - return list(map(_to_json_type, value)) - elif isinstance(value, np.float64) or isinstance(value, np.float32): - return float(value) - elif ( - isinstance(value, np.int64) - or isinstance(value, np.int32) - or isinstance(value, np.int16) - or isinstance(value, np.int8) - ): - return int(value) - elif isinstance(value, np.bool_): - return bool(value) - return value diff --git a/src/noisepy/seis/fdsn_download.py b/src/noisepy/seis/fdsn_download.py index 370f3b5f..53263981 100644 --- a/src/noisepy/seis/fdsn_download.py +++ b/src/noisepy/seis/fdsn_download.py @@ -15,9 +15,9 @@ import pandas as pd from concurrent.futures import ThreadPoolExecutor, as_completed -from .datatypes import ConfigParameters -from .channelcatalog import CSVChannelCatalog -from .utils import TimeLogger +from noisepy.seis.io.datatypes import ConfigParameters +from noisepy.seis.io.channelcatalog import CSVChannelCatalog +from noisepy.seis.io.utils import TimeLogger from . import noise_module from obspy.clients.fdsn import Client from obspy.clients.fdsn.header import FDSNNoDataException diff --git a/src/noisepy/seis/hierarchicalstores.py b/src/noisepy/seis/hierarchicalstores.py deleted file mode 100644 index c5bdfdfb..00000000 --- a/src/noisepy/seis/hierarchicalstores.py +++ /dev/null @@ -1,279 +0,0 @@ -import logging -import threading -from abc import ABC, abstractmethod -from bisect import bisect -from collections import defaultdict -from concurrent.futures import ThreadPoolExecutor -from datetime import datetime, timezone -from pathlib import Path -from typing import Any, Callable, Dict, Generic, List, Optional, Tuple, TypeVar - -import fsspec -import numpy as np -from datetimerange import DateTimeRange - -from noisepy.seis.utils import io_retry - -from .datatypes import AnnotatedData, Station -from .stores import timespan_str -from .utils import TimeLogger, fs_join, get_filesystem, unstack - -META_ATTR = "metadata" -VERSION_ATTR = "version" -FAKE_STA = "FAKE_STATION" - -logger = logging.getLogger(__name__) - - -class ArrayStore(ABC): - """ - An interface definition for reading and writing arrays with metadata - """ - - def __init__(self, root_dir: str, storage_options={}) -> None: - super().__init__() - self.root_dir = root_dir - self.fs = get_filesystem(root_dir, storage_options=storage_options) - - @abstractmethod - def append(self, path: str, params: Dict[str, Any], data: np.ndarray): - pass - - @abstractmethod - def read(self, path: str) -> Optional[Tuple[np.ndarray, Dict[str, Any]]]: - pass - - @abstractmethod - def parse_path(path: str) -> Optional[Tuple[str, DateTimeRange]]: - """ - Parse a full file path into a receiving station and timespan tuple - """ - - def get_fs(self) -> fsspec.AbstractFileSystem: 
- return self.fs - - def get_root_dir(self) -> str: - return self.root_dir - - -class PairDirectoryCache: - """ - Data structure to store the timespans for each station pair. The data is stored in a nested dictionary: - First, stations are mapped to an integer index to save memory. The first dictionary is indexed by the source station - and each entry holds a dictiory keyed by receiver station. The value of the second dictionary is a list of tuples. - Each tuple is a pair of the time delta in the timespans and a numpy array of the start times of the timespans. - E.g. - .. code-block:: python - stations_idx: {"CI.ACP": 0} - idx_stations: ["CI.ACP"] - items: - {0: {0: [(86400, np.array([1625097600, 1625184000]))]} - - We keep the timespans in a sorted list so that we can use binary search to check if a timespan is contained. A - python dictionary would be O(1) to check but use a lot more memory. Also the timestamp (np.uint32) is a lot - more compact to store than the string representation of the timespan - (e.g. "2023_07_01_00_00_00T2023_07_02_00_00_00"). - """ - - def __init__(self) -> None: - self.items: Dict[int, Dict[int, List[Tuple[int, np.ndarray]]]] = defaultdict(lambda: defaultdict(list)) - self.stations_idx: Dict[str, int] = {} - self.idx_stations: List[str] = [] - self._lock: threading.Lock = threading.Lock() - - # We need to be able to pickle this across processes when using multiprocessing - def __getstate__(self) -> object: - state = self.__dict__.copy() - del state["_lock"] - del state["items"] - return state - - def __setstate__(self, state: object) -> None: - self.__dict__.update(state) - self._lock = threading.Lock() - self.items = defaultdict(lambda: defaultdict(list)) - - def _sta_index(self, sta: str) -> int: - idx = self.stations_idx.get(sta, -1) - if idx == -1: - with self._lock: - # check again in case another thread added it before we got the lock - idx = self.stations_idx.get(sta, -1) - if idx == -1: - idx = len(self.stations_idx) - self.stations_idx[sta] = idx - self.idx_stations.append(sta) - return idx - - def get_pairs(self) -> List[Tuple[str, str]]: - return [ - (self.idx_stations[src], self.idx_stations[rec]) - for src in sorted(self.items.keys()) - for rec in sorted(self.items.get(src, {}).keys()) - ] - - def add(self, src: str, rec: str, timespans: List[DateTimeRange]): - src_idx = self._sta_index(src) - rec_idx = self._sta_index(rec) - if len(timespans) == 0: - if not self.items.get(src_idx, {}).get(rec_idx, None): - self.items[src_idx][rec_idx] = [] - return - grouped_timespans = defaultdict(list) - for t in timespans: - grouped_timespans[int(t.timedelta.total_seconds())].append(int(t.start_datetime.timestamp())) - for delta, starts in grouped_timespans.items(): - self._add(src_idx, rec_idx, delta, starts) - - def _add(self, src_idx: int, rec_idx: int, delta: int, start_ts: List[DateTimeRange]): - starts = np.array(sorted(start_ts), dtype=np.uint32) - with self._lock: - time_tuples = self.items[src_idx][rec_idx] - for t in time_tuples: - if t[0] == delta: - new_t = (delta, np.array(sorted(list(t[1]) + list(starts)), dtype=np.uint32)) - # not generally safe to modify a list while iterating over it, but we return after this - time_tuples.remove(t) - time_tuples.append(new_t) - self.items[src_idx][rec_idx] = time_tuples - return - # delta not found, so add it - self.items[src_idx][rec_idx].append((delta, starts)) - - def is_src_loaded(self, src: str) -> bool: - return self._sta_index(src) in self.items - - def contains(self, src: str, rec: str, timespan: 
DateTimeRange) -> bool: - time_tuples = self._get_tuples(src, rec) - if time_tuples is None: - return False - - delta = int(timespan.timedelta.total_seconds()) - start = int(timespan.start_datetime.timestamp()) - for t in time_tuples: - if t[0] == delta: - result = bisect(t[1], start) - return result != 0 and t[1][result - 1] == start - - return False - - def get_timespans(self, src: str, rec: str) -> List[DateTimeRange]: - time_tuples = self._get_tuples(src, rec) - if time_tuples is None: - return [] - - timespans = [] - for delta, timestamps in time_tuples: - for ts in timestamps: - timespans.append( - DateTimeRange( - datetime.fromtimestamp(ts, timezone.utc), - datetime.fromtimestamp(ts + delta, timezone.utc), - ) - ) - return timespans - - def _get_tuples(self, src: str, rec: str) -> List[Tuple[int, np.ndarray]]: - src_idx = self._sta_index(src) - rec_idx = self._sta_index(rec) - - with self._lock: - time_tuples = self.items.get(src_idx, {}).get(rec_idx, None) - return time_tuples - - -T = TypeVar("T", bound=AnnotatedData) - - -class HierarchicalStoreBase(Generic[T]): - """ - A CC and Stack store bases class that uses hierarchical files for storage. The directory organization is as follows: - / - src_sta/ - rec_sta/ - timespan - - The specific file format stored at the timespan level is delegated to the ArrayStore helper class. - """ - - def __init__( - self, - helper: ArrayStore, - loader_func: Callable[[List[Tuple[np.ndarray, Dict[str, Any]]]], List[T]], - ) -> None: - super().__init__() - self.helper = helper - self.dir_cache = PairDirectoryCache() - self.loader_func = loader_func - - def contains(self, src_sta: Station, rec_sta: Station, timespan: DateTimeRange) -> bool: - src = str(src_sta) - rec = str(rec_sta) - if self.dir_cache.contains(src, rec, timespan): - return True - self._load_src(src) - return self.dir_cache.contains(src, rec, timespan) - - def _load_src(self, src: str): - if self.dir_cache.is_src_loaded(src): - return - logger.info(f"Loading directory cache for {src} - ix: {self.dir_cache.stations_idx.get(src, -4)}") - paths = io_retry(self._fs_find, src) - - grouped_paths = defaultdict(list) - for rec_sta, timespan in [p for p in paths if p]: - grouped_paths[rec_sta].append(timespan) - for rec_sta, timespans in grouped_paths.items(): - self.dir_cache.add(src, rec_sta, sorted(timespans, key=lambda t: t.start_datetime.timestamp())) - # if we didn't find any paths, add a fake entry so we don't try again and is_src_loaded returns True - if len(grouped_paths) == 0: - self.dir_cache.add(src, FAKE_STA, []) - - def _fs_find(self, src): - paths = [self.helper.parse_path(p) for p in self.helper.get_fs().find(fs_join(self.helper.get_root_dir(), src))] - return paths - - def append(self, timespan: DateTimeRange, src: Station, rec: Station, data: List[T]): - path = self._get_path(src, rec, timespan) - packed_data, metadata = AnnotatedData.pack(data) - tlog = TimeLogger(logger, logging.DEBUG) - self.helper.append(path, {META_ATTR: metadata, VERSION_ATTR: 1.0}, packed_data) - tlog.log(f"writing {len(data)} arrays to {path}") - self.dir_cache.add(str(src), str(rec), [timespan]) - - def get_timespans(self, src: Station, rec: Station) -> List[DateTimeRange]: - self._load_src(str(src)) - return self.dir_cache.get_timespans(str(src), str(rec)) - - def get_station_pairs(self) -> List[Tuple[Station, Station]]: - if not self.helper.get_fs().exists(self.helper.get_root_dir()): - return [] # fsspec.ls throws if the directory doesn't exist - - def is_sta(path): - return 
self.helper.get_fs().isdir(path) and Station.parse(Path(path).parts[-1]) - - # ls gets us the first level - sta = self.helper.get_fs().ls(self.helper.get_root_dir()) - # make sure we have stations - sta = [s for s in sta if is_sta(s)] - # ls the second level in parallel - with ThreadPoolExecutor() as exec: - sub_ls = list(exec.map(self.helper.get_fs().ls, sta)) - - sub_ls = [item for sublist in sub_ls for item in sublist if is_sta(item)] - pairs = [(Station.parse(Path(p).parts[-2]), Station.parse(Path(p).parts[-1])) for p in sub_ls] - return [p for p in pairs if p[0] and p[1]] - - def read(self, timespan: DateTimeRange, src: Station, rec: Station) -> List[T]: - path = self._get_path(src, rec, timespan) - tuple = self.helper.read(path) - if not tuple: - return [] - array, metadata = tuple - arrays = unstack(array) - meta = metadata[META_ATTR] - tuples = list(zip(arrays, meta)) - return self.loader_func(tuples) - - def _get_path(self, src: Station, rec: Station, timespan: DateTimeRange) -> str: - return f"{src}/{rec}/{timespan_str(timespan)}" diff --git a/src/noisepy/seis/main.py b/src/noisepy/seis/main.py index 8327e351..f74ee59e 100644 --- a/src/noisepy/seis/main.py +++ b/src/noisepy/seis/main.py @@ -11,16 +11,19 @@ import dateutil.parser from datetimerange import DateTimeRange +from noisepy.seis.io.asdfstore import ASDFCCStore, ASDFRawDataStore, ASDFStackStore +from noisepy.seis.io.channel_filter_store import LocationChannelFilterStore +from noisepy.seis.io.channelcatalog import CSVChannelCatalog, XMLStationChannelCatalog +from noisepy.seis.io.datatypes import Channel, ConfigParameters +from noisepy.seis.io.numpystore import NumpyCCStore, NumpyStackStore +from noisepy.seis.io.scedc_s3store import SCEDCS3DataStore +from noisepy.seis.io.utils import fs_join, get_filesystem, io_retry +from noisepy.seis.io.zarrstore import ZarrCCStore, ZarrStackStore + from . import __version__ -from .asdfstore import ASDFCCStore, ASDFRawDataStore, ASDFStackStore -from .channel_filter_store import LocationChannelFilterStore -from .channelcatalog import CSVChannelCatalog, XMLStationChannelCatalog from .constants import CONFIG_FILE, STATION_FILE, WILD_CARD from .correlate import cross_correlate -from .datatypes import Channel, ConfigParameters from .fdsn_download import download -from .numpystore import NumpyCCStore, NumpyStackStore -from .scedc_s3store import SCEDCS3DataStore from .scheduler import ( AWSBatchArrayScheduler, MPIScheduler, @@ -28,8 +31,6 @@ SingleNodeScheduler, ) from .stack import stack_cross_correlations -from .utils import fs_join, get_filesystem, io_retry -from .zarrstore import ZarrCCStore, ZarrStackStore logger = logging.getLogger(__name__) # Utility running the different steps from the command line. 
Defines the arguments for each step diff --git a/src/noisepy/seis/noise_module.py b/src/noisepy/seis/noise_module.py index d18ce90c..eb7d172f 100644 --- a/src/noisepy/seis/noise_module.py +++ b/src/noisepy/seis/noise_module.py @@ -25,7 +25,7 @@ from scipy.fftpack import next_fast_len from scipy.signal import hilbert -from .datatypes import ( +from noisepy.seis.io.datatypes import ( CCMethod, ChannelData, ConfigParameters, diff --git a/src/noisepy/seis/numpystore.py b/src/noisepy/seis/numpystore.py deleted file mode 100644 index 2b719246..00000000 --- a/src/noisepy/seis/numpystore.py +++ /dev/null @@ -1,101 +0,0 @@ -import io -import json -import logging -import tarfile -from pathlib import Path -from typing import Any, Dict, Optional, Tuple - -import numpy as np -from datetimerange import DateTimeRange - -from .datatypes import CrossCorrelation, Stack, to_json_types -from .hierarchicalstores import ArrayStore, HierarchicalStoreBase -from .stores import CrossCorrelationDataStore, StackStore, parse_timespan -from .utils import fs_join - -logger = logging.getLogger(__name__) - -TAR_GZ_EXTENSION = ".tar.gz" -FILE_ARRAY_NPY = "array.npy" -FILE_PARAMS_JSON = "params.json" - - -class NumpyArrayStore(ArrayStore): - def __init__(self, root_dir: str, mode: str, storage_options={}) -> None: - super().__init__(root_dir, storage_options) - logger.info(f"store creating at {root_dir}, mode={mode}, storage_options={storage_options}") - # TODO: This needs to come in as part of the storage_options - storage_options["client_kwargs"] = {"region_name": "us-west-2"} - self.root_path = root_dir - self.storage_options = storage_options - logger.info(f"Numpy store created at {root_dir}") - - def append(self, path: str, params: Dict[str, Any], data: np.ndarray): - logger.debug(f"Appending to {path}: {data.shape}") - - params = to_json_types(params) - js = json.dumps(params) - - def add_file_bytes(tar, name, f): - f.seek(0) - ti = tarfile.TarInfo(name=name) - ti.size = f.getbuffer().nbytes - tar.addfile(ti, fileobj=f) - - dir = fs_join(self.root_path, str(Path(path).parent)) - self.get_fs().makedirs(dir, exist_ok=True) - with self.get_fs().open(fs_join(self.root_path, path + TAR_GZ_EXTENSION), "wb") as f: - with tarfile.open(fileobj=f, mode="w:gz") as tar: - with io.BytesIO() as npyf: - np.save(npyf, data, allow_pickle=False) - with io.BytesIO() as jsf: - jsf.write(js.encode("utf-8")) - - add_file_bytes(tar, FILE_ARRAY_NPY, npyf) - add_file_bytes(tar, FILE_PARAMS_JSON, jsf) - - def parse_path(self, path: str) -> Optional[Tuple[str, DateTimeRange]]: - if not path.endswith(TAR_GZ_EXTENSION): - return None - path = path.removesuffix(TAR_GZ_EXTENSION) - parts = Path(path).parts - if len(parts) < 2: - return None - ts = parse_timespan(parts[-1]) - if ts is None: - return None - return (parts[-2], ts) - - def read(self, path: str) -> Optional[Tuple[np.ndarray, Dict[str, Any]]]: - file = fs_join(self.root_path, path + TAR_GZ_EXTENSION) - if not self.get_fs().exists(file): - return None - - try: - with self.get_fs().open(file, "rb") as f: - with tarfile.open(fileobj=f, mode="r:gz") as tar: - npy_mem = tar.getmember(FILE_ARRAY_NPY) - with tar.extractfile(npy_mem) as f: - array_file = io.BytesIO() - array_file.write(f.read()) - array_file.seek(0) - array = np.load(array_file, allow_pickle=False) - params_mem = tar.getmember(FILE_PARAMS_JSON) - with tar.extractfile(params_mem) as f: - params = json.load(f) - return (array, params) - except Exception as e: - logger.error(f"Error reading {file}: {e}") - return None - - -class 
NumpyStackStore(HierarchicalStoreBase[Stack], StackStore): - def __init__(self, root_dir: str, mode: str = "a", storage_options={}): - super().__init__(NumpyArrayStore(root_dir, mode, storage_options=storage_options), Stack.load_instances) - - -class NumpyCCStore(HierarchicalStoreBase[CrossCorrelation], CrossCorrelationDataStore): - def __init__(self, root_dir: str, mode: str = "a", storage_options={}): - super().__init__( - NumpyArrayStore(root_dir, mode, storage_options=storage_options), CrossCorrelation.load_instances - ) diff --git a/src/noisepy/seis/plotting_modules.py b/src/noisepy/seis/plotting_modules.py index 36510a68..352594c3 100644 --- a/src/noisepy/seis/plotting_modules.py +++ b/src/noisepy/seis/plotting_modules.py @@ -11,8 +11,8 @@ from obspy.signal.filter import bandpass from scipy.fftpack import next_fast_len -from noisepy.seis.datatypes import Stack, Station -from noisepy.seis.stores import CrossCorrelationDataStore, RawDataStore +from noisepy.seis.io.datatypes import Stack, Station +from noisepy.seis.io.stores import CrossCorrelationDataStore, RawDataStore logging.getLogger("matplotlib.font_manager").disabled = True logger = logging.getLogger(__name__) diff --git a/src/noisepy/seis/pnwstore.py b/src/noisepy/seis/pnwstore.py index 10aa5453..cf42aafb 100644 --- a/src/noisepy/seis/pnwstore.py +++ b/src/noisepy/seis/pnwstore.py @@ -8,11 +8,10 @@ import obspy from datetimerange import DateTimeRange -from noisepy.seis.channelcatalog import ChannelCatalog -from noisepy.seis.stores import RawDataStore - -from .datatypes import Channel, ChannelData, ChannelType, Station -from .utils import fs_join, get_filesystem +from noisepy.seis.io.channelcatalog import ChannelCatalog +from noisepy.seis.io.datatypes import Channel, ChannelData, ChannelType, Station +from noisepy.seis.io.stores import RawDataStore +from noisepy.seis.io.utils import fs_join, get_filesystem logger = logging.getLogger(__name__) diff --git a/src/noisepy/seis/scedc_s3store.py b/src/noisepy/seis/scedc_s3store.py deleted file mode 100644 index 621e0efd..00000000 --- a/src/noisepy/seis/scedc_s3store.py +++ /dev/null @@ -1,169 +0,0 @@ -import logging -import os -import re -from collections import defaultdict -from concurrent.futures import ThreadPoolExecutor -from datetime import datetime, timedelta, timezone -from typing import Callable, List - -import obspy -from datetimerange import DateTimeRange - -from noisepy.seis.channelcatalog import ChannelCatalog -from noisepy.seis.stores import RawDataStore - -from .datatypes import Channel, ChannelData, ChannelType, Station -from .utils import TimeLogger, fs_join, get_filesystem - -logger = logging.getLogger(__name__) - - -def channel_filter(stations: List[str], ch_prefixes: str) -> Callable[[Channel], bool]: - """ - Helper function for creating a channel filter to be used in the constructor of the store. - This filter uses a list of allowed station name along with a channel filter prefix. - """ - sta_set = set(stations) - - def filter(ch: Channel) -> bool: - if sta_set == {"*"}: - return ch.type.name.lower().startswith(tuple(ch_prefixes.lower().split(","))) - else: - return ch.station.name in sta_set and ch.type.name.lower().startswith(tuple(ch_prefixes.lower().split(","))) - - return filter - - -class SCEDCS3DataStore(RawDataStore): - """ - A data store implementation to read from a directory of miniSEED (.ms) files from the SCEDC S3 bucket. - Every directory is a a day and each .ms file contains the data for a channel. 
- """ - - # TODO: Support reading directly from the S3 bucket - - # for checking the filename has the form: CIGMR__LHN___2022002.ms - file_re = re.compile(r".*[0-9]{7}\.ms$", re.IGNORECASE) - - def __init__( - self, - path: str, - chan_catalog: ChannelCatalog, - ch_filter: Callable[[Channel], bool] = lambda s: True, # noqa: E731 - date_range: DateTimeRange = None, - storage_options: dict = {}, - ): - """ - Parameters: - path: path to look for ms files. Can be a local file directory or an s3://... url path - chan_catalog: ChannelCatalog to retrieve inventory information for the channels - channel_filter: Function to decide whether a channel should be used or not, - if None, all channels are used - """ - super().__init__() - self.fs = get_filesystem(path, storage_options=storage_options) - self.channel_catalog = chan_catalog - self.path = path - self.paths = {} - # to store a dict of {timerange: list of channels} - self.channels = defaultdict(list) - self.ch_filter = ch_filter - if date_range is not None and date_range.start_datetime.tzinfo is None: - start_datetime = date_range.start_datetime.replace(tzinfo=timezone.utc) - end_datetime = date_range.end_datetime.replace(tzinfo=timezone.utc) - date_range = DateTimeRange(start_datetime, end_datetime) - - self.date_range = date_range - - if date_range is None: - self._load_channels(self.path, ch_filter) - - def _load_channels(self, full_path: str, ch_filter: Callable[[Channel], bool]): - tlog = TimeLogger(logger=logger, level=logging.INFO) - msfiles = [f for f in self.fs.glob(fs_join(full_path, "*.ms")) if self.file_re.match(f) is not None] - tlog.log(f"Loading {len(msfiles)} files from {full_path}") - for f in msfiles: - timespan = SCEDCS3DataStore._parse_timespan(f) - self.paths[timespan.start_datetime] = full_path - channel = SCEDCS3DataStore._parse_channel(os.path.basename(f)) - if not ch_filter(channel): - continue - key = str(timespan) # DataTimeFrame is not hashable - self.channels[key].append(channel) - tlog.log(f"Init: {len(self.channels)} timespans and {sum(len(ch) for ch in self.channels.values())} channels") - - def _ensure_channels_loaded(self, date_range: DateTimeRange): - key = str(date_range) - if key not in self.channels or date_range.start_datetime not in self.paths: - dt = date_range.end_datetime - date_range.start_datetime - for d in range(0, dt.days): - date = date_range.start_datetime + timedelta(days=d) - if self.date_range is None or date not in self.date_range: - continue - date_path = str(date.year) + "/" + str(date.year) + "_" + str(date.timetuple().tm_yday).zfill(3) + "/" - full_path = fs_join(self.path, date_path) - self._load_channels(full_path, self.ch_filter) - - def get_channels(self, date_range: DateTimeRange) -> List[Channel]: - self._ensure_channels_loaded(date_range) - tmp_channels = self.channels.get(str(date_range), []) - executor = ThreadPoolExecutor() - stations = set(map(lambda c: c.station, tmp_channels)) - _ = list(executor.map(lambda s: self.channel_catalog.get_inventory(date_range, s), stations)) - logger.info(f"Getting {len(tmp_channels)} channels for {date_range}: {tmp_channels}") - return list(executor.map(lambda c: self.channel_catalog.get_full_channel(date_range, c), tmp_channels)) - - def get_timespans(self) -> List[DateTimeRange]: - if self.date_range is not None: - days = (self.date_range.end_datetime - self.date_range.start_datetime).days - return [ - DateTimeRange( - self.date_range.start_datetime.replace(tzinfo=timezone.utc) + timedelta(days=d), - 
self.date_range.start_datetime.replace(tzinfo=timezone.utc) + timedelta(days=d + 1), - ) - for d in range(0, days) - ] - return list([DateTimeRange.from_range_text(d) for d in sorted(self.channels.keys())]) - - def read_data(self, timespan: DateTimeRange, chan: Channel) -> ChannelData: - self._ensure_channels_loaded(timespan) - # reconstruct the file name from the channel parameters - chan_str = ( - f"{chan.station.network}{chan.station.name.ljust(5, '_')}{chan.type.name}" - f"{chan.station.location.ljust(3, '_')}" - ) - filename = fs_join( - self.paths[timespan.start_datetime], f"{chan_str}{timespan.start_datetime.strftime('%Y%j')}.ms" - ) - if not self.fs.exists(filename): - logger.warning(f"Could not find file {filename}") - return ChannelData.empty() - - with self.fs.open(filename) as f: - stream = obspy.read(f) - data = ChannelData(stream) - return data - - def get_inventory(self, timespan: DateTimeRange, station: Station) -> obspy.Inventory: - return self.channel_catalog.get_inventory(timespan, station) - - def _parse_timespan(filename: str) -> DateTimeRange: - # The SCEDC S3 bucket stores files in the form: CIGMR__LHN___2022002.ms - year = int(filename[-10:-6]) - day = int(filename[-6:-3]) - jan1 = datetime(year, 1, 1, tzinfo=timezone.utc) - return DateTimeRange(jan1 + timedelta(days=day - 1), jan1 + timedelta(days=day)) - - def _parse_channel(filename: str) -> Channel: - # e.g. - # CIGMR__LHN___2022002 - # CE13884HNZ10_2022002 - network = filename[:2] - station = filename[2:7].rstrip("_") - channel = filename[7:10] - location = filename[10:12].strip("_") - return Channel( - ChannelType(channel, location), - # lat/lon/elev will be populated later - Station(network, station, location=location), - ) diff --git a/src/noisepy/seis/stack.py b/src/noisepy/seis/stack.py index 7df4640b..83f5e671 100644 --- a/src/noisepy/seis/stack.py +++ b/src/noisepy/seis/stack.py @@ -8,12 +8,13 @@ import pandas as pd from datetimerange import DateTimeRange +from noisepy.seis.io.datatypes import ConfigParameters, Stack, StackMethod, Station +from noisepy.seis.io.stores import CrossCorrelationDataStore, StackStore +from noisepy.seis.io.utils import TimeLogger, get_results + from . 
import noise_module from .constants import NO_CCF_DATA_MSG, WILD_CARD -from .datatypes import ConfigParameters, Stack, StackMethod, Station from .scheduler import Scheduler, SingleNodeScheduler -from .stores import CrossCorrelationDataStore, StackStore -from .utils import TimeLogger, get_results logger = logging.getLogger(__name__) if not sys.warnoptions: diff --git a/src/noisepy/seis/stores.py b/src/noisepy/seis/stores.py deleted file mode 100644 index d90a487f..00000000 --- a/src/noisepy/seis/stores.py +++ /dev/null @@ -1,132 +0,0 @@ -import datetime -import logging -import os -import re -from abc import ABC, abstractmethod -from concurrent.futures import Executor, ThreadPoolExecutor -from typing import Generic, List, Optional, Tuple, TypeVar - -import obspy -from datetimerange import DateTimeRange - -from noisepy.seis.constants import DATE_FORMAT -from noisepy.seis.utils import TimeLogger, get_results - -from .datatypes import ( - AnnotatedData, - Channel, - ChannelData, - CrossCorrelation, - Stack, - Station, -) - - -class DataStore(ABC): - """ - A base abstraction over a data source for seismic data - """ - - @abstractmethod - def get_channels(self, timespan: DateTimeRange) -> List[Channel]: - pass - - @abstractmethod - def get_timespans(self) -> List[DateTimeRange]: - pass - - -class RawDataStore(DataStore): - """ - A class for reading the raw data for a given channel. - TODO: write support? - """ - - @abstractmethod - def read_data(self, timespan: DateTimeRange, chan: Channel) -> ChannelData: - pass - - @abstractmethod - def get_inventory(self, timespan: DateTimeRange, station: Station) -> obspy.Inventory: - pass - - -T = TypeVar("T", bound=AnnotatedData) - - -class ComputedDataStore(Generic[T]): - """ - A class for reading and writing cross-correlation data - """ - - @abstractmethod - def contains(self, src: Station, rec: Station, timespan: DateTimeRange) -> bool: - pass - - @abstractmethod - def append( - self, - timespan: DateTimeRange, - src: Station, - rec: Station, - ccs: List[T], - ): - pass - - @abstractmethod - def get_timespans(self, src_sta: Station, rec_sta: Station) -> List[DateTimeRange]: - pass - - @abstractmethod - def get_station_pairs(self) -> List[Tuple[Station, Station]]: - pass - - @abstractmethod - def read(self, timespan: DateTimeRange, src_sta: Station, rec_sta: Station) -> List[T]: - pass - - def read_bulk( - self, timespan: DateTimeRange, pairs: List[Tuple[Station, Station]], executor: Executor = ThreadPoolExecutor() - ) -> List[Tuple[Tuple[Station, Station], List[T]]]: - """ - Reads the data for all the given station pairs (and timespan) in parallel. 
- """ - tlog = TimeLogger(level=logging.INFO) - futures = [executor.submit(self.read, timespan, p[0], p[1]) for p in pairs] - results = get_results(futures) - tlog.log(f"loading {len(pairs)} stacks") - return list(zip(pairs, results)) - - -class CrossCorrelationDataStore(ComputedDataStore[CrossCorrelation]): - pass - - -class StackStore(ComputedDataStore[Stack]): - """ - A class for reading and writing stack data - """ - - pass - - -def timespan_str(timespan: DateTimeRange) -> str: - return f"{timespan.start_datetime.strftime(DATE_FORMAT)}T{timespan.end_datetime.strftime(DATE_FORMAT)}" - - -def parse_station_pair(pair: str) -> Optional[Tuple[Station, Station]]: - if re.match(r"([A-Z0-9]+)\.([A-Z0-9]+)_([A-Z0-9]+)\.([A-Z0-9]+)", pair, re.IGNORECASE) is None: - return None - - tup = tuple(map(Station.parse, pair.split("_"))) - if None in tup: - return None - return tup - - -def parse_timespan(filename: str) -> Optional[DateTimeRange]: - parts = os.path.splitext(os.path.basename(filename))[0].split("T") - if parts is None or len(parts) != 2: - return None - dates = [obspy.UTCDateTime(p).datetime.replace(tzinfo=datetime.timezone.utc) for p in parts] - return DateTimeRange(dates[0], dates[1]) diff --git a/src/noisepy/seis/tiledb.py b/src/noisepy/seis/tiledb.py index b13944bc..f1669e67 100644 --- a/src/noisepy/seis/tiledb.py +++ b/src/noisepy/seis/tiledb.py @@ -4,8 +4,8 @@ import numpy as np import tiledb -from noisepy.seis.utils import fs_join, get_filesystem -from noisepy.seis.zarrstore import logger +from noisepy.seis.io.utils import fs_join, get_filesystem +from noisepy.seis.io.zarrstore import logger # Experimentation with TileDB diff --git a/src/noisepy/seis/utils.py b/src/noisepy/seis/utils.py deleted file mode 100644 index adb426cf..00000000 --- a/src/noisepy/seis/utils.py +++ /dev/null @@ -1,170 +0,0 @@ -import errno -import logging -import os -import posixpath -import time -from concurrent.futures import Future -from typing import Any, Callable, Iterable, List -from urllib.parse import urlparse - -import fsspec -import numpy as np -import psutil -from tqdm.autonotebook import tqdm -from tqdm.contrib.logging import logging_redirect_tqdm - -from noisepy.seis.constants import AWS_EXECUTION_ENV - -S3_SCHEME = "s3" -FIND_RETRIES = 5 -FIND_RETRY_SLEEP = 0.1 # (100ms) -utils_logger = logging.getLogger(__name__) - - -def get_filesystem(path: str, storage_options: dict = {}) -> fsspec.AbstractFileSystem: - """Construct an fsspec filesystem for the given path""" - url = urlparse(path) - # The storage_options coming from the ConfigParameters is keyed by protocol - storage_options = storage_options.get(url.scheme, storage_options) - if url.scheme == S3_SCHEME: - return fsspec.filesystem(url.scheme, **storage_options) - else: - return fsspec.filesystem("file", **storage_options) - - -def fs_join(path1: str, path2: str) -> str: - """Helper for joining two paths that can handle both S3 URLs and local file system paths""" - url = urlparse(path1) - if url.scheme == S3_SCHEME: - return posixpath.join(path1, path2) - else: - return os.path.join(path1, path2) - - -def get_fs_sep(path1: str) -> str: - """Helper for getting a path separator that can handle both S3 URLs and local file system paths""" - url = urlparse(path1) - if url.scheme == S3_SCHEME: - return posixpath.sep - else: - return os.sep - - -def io_retry(func: Callable, *args, **kwargs) -> Any: - """ - Retry the given function up to FIND_RETRIES times if an OSError(EBUSY) is raised, sleeping between retries. 
- If the function still fails, the exception is raised. - """ - for i in range(1, FIND_RETRIES + 1): - try: - return func(*args, **kwargs) - except OSError as e: - # when using S3 we sometimes get a SlowDown client error which is turned into - # an OSError(EBUSY) so back off if that happens - if e.errno != errno.EBUSY or i >= FIND_RETRIES: - raise - utils_logger.warning(f"Retrying {func.__name__} after error: {e}. Retry {i} of {FIND_RETRIES}") - time.sleep(FIND_RETRY_SLEEP * i) - - -class TimeLogger: - """ - A utility class to measure and log the time spent in code fragments. The basic usage is to call:: - - tlog.log(message) - - This will measure the time between the last time checkpoint and the call to ``log()``. The - checkpoint is updated when: - - ``TimeLogger`` instance is created - - ``log()`` is called - - ``reset()`` is called - - Alternatively, an explicit reference start time may be passed when logging: ``log(message, start_time)``. E.g.:: - - start_time = tlog.reset() - ... - tlog.log("step 1") - ... - tlog.log("step 2") - ... - tlog.log("overall", start_time) - """ - - enabled: bool = True - - def __init__(self, logger: logging.Logger = utils_logger, level: int = logging.DEBUG, prefix: str = None): - """ - Create an instance that will use the given logger and logging level to log the times - """ - self.logger = logger - self.level = level - self.prefix = "" if prefix is None else f" {prefix}" - self.reset() - - def reset(self) -> float: - self.time = time.time() - return self.time - - def log(self, message: str = None, start: float = -1.0) -> float: - stop = time.time() - dt = stop - self.time if start <= 0 else stop - start - self.reset() - self.log_raw(message, dt) - return self.time - - def log_raw(self, message: str, dt: float): - if self.enabled: - self.logger.log(self.level, f"TIMING{self.prefix}: {dt:6.4f} secs. for {message}") - return self.time - - -def error_if(condition: bool, msg: str, error_type: type = RuntimeError): - """ - Raise an error if the condition is True - - Args: - condition (bool): Condition to evaluate - msg (str): Error message - error_type (type): Type of error to raise, e.g. ValueError - """ - if condition: - raise error_type(msg) - - -def get_results(futures: Iterable[Future], task_name: str = "", logger: logging.Logger = None) -> Iterable[Any]: - # if running in AWS, use -1 for the position so it's logged properly - position = -1 if AWS_EXECUTION_ENV in os.environ else None - - def pbar_update(_: Future): - mem_mb = psutil.Process().memory_info().rss / (1024 * 1024) - pbar.update(1) - pbar.set_description(f"{task_name}. 
Memory: {mem_mb:5.0f} MB") - - # Show a progress bar when processing futures - with logging_redirect_tqdm(): - size = len(futures) - with tqdm(total=size, desc=task_name, position=position, miniters=int(size / 100)) as pbar: - for f in futures: - f.add_done_callback(pbar_update) - return [f.result() for f in futures] - - -def unstack(stack: np.ndarray, axis=0) -> List[np.ndarray]: - """ - Split a stack along the given axis into a list of arrays - """ - return [np.squeeze(a, axis=axis) for a in np.split(stack, stack.shape[axis], axis=axis)] - - -def remove_nans(a: np.ndarray) -> np.ndarray: - """ - Remove NaN values from a 1D array - """ - return a[~np.isnan(a)] - - -def remove_nan_rows(a: np.ndarray) -> np.ndarray: - """ - Remove rows from a 2D array that contain NaN values - """ - return a[~np.isnan(a).any(axis=1)] diff --git a/src/noisepy/seis/zarrstore.py b/src/noisepy/seis/zarrstore.py deleted file mode 100644 index 48f6cd4d..00000000 --- a/src/noisepy/seis/zarrstore.py +++ /dev/null @@ -1,74 +0,0 @@ -import logging -from pathlib import Path -from typing import Any, Dict, Optional, Tuple - -import numpy as np -import zarr -from datetimerange import DateTimeRange - -from .datatypes import CrossCorrelation, Stack -from .hierarchicalstores import ArrayStore, HierarchicalStoreBase -from .stores import CrossCorrelationDataStore, StackStore, parse_timespan - -logger = logging.getLogger(__name__) - - -class ZarrStoreHelper(ArrayStore): - """ - Helper object for storing data and parameters into Zarr and track a "done" bit for subsets of the data - 'done' is a dummy array to track completed sets in its attribute dictionary. - Args: - root_dir: Storage location, can be a local or S3 path - mode: "r" or "a" for read-only or writing mode - storage_options: options to pass to fsspec - """ - - def __init__(self, root_dir: str, mode: str, storage_options={}) -> None: - super().__init__(root_dir, storage_options) - logger.info(f"store creating at {root_dir}, mode={mode}, storage_options={storage_options}") - # TODO: - storage_options["client_kwargs"] = {"region_name": "us-west-2"} - store = zarr.storage.FSStore(root_dir, **storage_options) - self.root = zarr.open_group(store, mode=mode) - logger.info(f"store created at {root_dir}: {type(store)}. 
Zarr version {zarr.__version__}") - - def append(self, path: str, params: Dict[str, Any], data: np.ndarray): - logger.debug(f"Appending to {path}: {data.shape}") - array = self.root.create_dataset( - path, - data=data, - chunks=data.shape, - dtype=data.dtype, - ) - array.attrs.update(params) - - def read(self, path: str) -> Optional[Tuple[np.ndarray, Dict[str, Any]]]: - if path not in self.root: - return None - array = self.root[path] - metadata = {} - metadata.update(array.attrs) - return (array[:], metadata) - - def parse_path(self, path: str) -> Optional[Tuple[str, DateTimeRange]]: - if not path.endswith("0.0"): - return None - parts = Path(path).parts - if len(parts) < 3: - return None - ts = parse_timespan(parts[-2]) - if ts is None: - return None - return (parts[-3], ts) - - -class ZarrCCStore(HierarchicalStoreBase, CrossCorrelationDataStore): - def __init__(self, root_dir: str, mode: str = "a", storage_options={}) -> None: - helper = ZarrStoreHelper(root_dir, mode, storage_options=storage_options) - super().__init__(helper, CrossCorrelation.load_instances) - - -class ZarrStackStore(HierarchicalStoreBase, StackStore): - def __init__(self, root_dir: str, mode: str = "a", storage_options={}) -> None: - helper = ZarrStoreHelper(root_dir, mode, storage_options=storage_options) - super().__init__(helper, Stack.load_instances) diff --git a/tests/test_asdfstore.py b/tests/test_asdfstore.py deleted file mode 100644 index 3ce42221..00000000 --- a/tests/test_asdfstore.py +++ /dev/null @@ -1,50 +0,0 @@ -from datetime import datetime - -import pytest - -from noisepy.seis.asdfstore import ASDFRawDataStore, _parse_channel_path -from noisepy.seis.datatypes import ChannelType - - -@pytest.fixture -def store(): - import os - - return ASDFRawDataStore(os.path.join(os.path.dirname(__file__), "./data")) - - -def test_get_timespans(store: ASDFRawDataStore): - ts = store.get_timespans() - assert len(ts) == 1 - assert ts[0].start_datetime == datetime.fromisoformat("2019-02-01T00:00:00+00:00") - assert ts[0].end_datetime == datetime.fromisoformat("2019-02-01T01:00:00+00:00") - - -def test_get_channels(store: ASDFRawDataStore): - ts = store.get_timespans()[0] - chans = store.get_channels(ts) - assert len(chans) == 1 - assert str(chans[0].type) == "bhn_00" - assert chans[0].station.name == "BAK" - - -def test_get_data(store: ASDFRawDataStore): - ts = store.get_timespans()[0] - chan = store.get_channels(ts)[0] - chdata = store.read_data(ts, chan) - assert chdata.data.size == 72001 - assert chdata.sampling_rate == 20.0 - assert chdata.start_timestamp == ts.start_datetime.timestamp() - - -@pytest.mark.parametrize( - "path,ch1, ch2", - [ - ("bhn_bhn", ChannelType("bhn"), ChannelType("bhn")), - ("bhn_00_bhn_01", ChannelType("bhn", "00"), ChannelType("bhn", "01")), - ("bhn_bhn_01", ChannelType("bhn"), ChannelType("bhn", "01")), - ("bhn_00_bhn", ChannelType("bhn", "00"), ChannelType("bhn")), - ], -) -def test_parse_channel_path(path, ch1, ch2): - assert _parse_channel_path(path) == (ch1, ch2) diff --git a/tests/test_ccstores.py b/tests/test_ccstores.py deleted file mode 100644 index 6823086e..00000000 --- a/tests/test_ccstores.py +++ /dev/null @@ -1,105 +0,0 @@ -import json -from datetime import datetime, timedelta, timezone -from typing import Dict - -import numpy as np -from datetimerange import DateTimeRange - -from noisepy.seis.asdfstore import ASDFCCStore -from noisepy.seis.datatypes import ( - Channel, - ChannelType, - ConfigParameters, - CrossCorrelation, - Station, - to_json_types, -) -from 
noisepy.seis.noise_module import cc_parameters -from noisepy.seis.numpystore import NumpyCCStore -from noisepy.seis.stores import CrossCorrelationDataStore -from noisepy.seis.zarrstore import ZarrCCStore - - -def make_1dts(dt: datetime): - dt = dt.replace(tzinfo=timezone.utc, microsecond=0) - return DateTimeRange(dt, dt + timedelta(days=1)) - - -ts1 = make_1dts(datetime.now()) -ts2 = make_1dts(ts1.end_datetime) -src = Channel(ChannelType("foo"), Station("nw", "sta1")) -rec = Channel(ChannelType("bar"), Station("nw", "sta2")) - - -def _ccstore_test_helper(ccstore: CrossCorrelationDataStore): - data = np.random.random((10, 10)) - coor = { - "lonS": 0, - "latS": 0, - "lonR": 0, - "latR": 0, - } - conf = ConfigParameters() - # test param serialization with real types - params = cc_parameters(conf, coor, np.zeros(10, dtype=np.float32), np.zeros(10, dtype=np.int16), ["nn"]) - - # assert empty state - assert not ccstore.contains(src.station, rec.station, ts1) - assert not ccstore.contains(src.station, rec.station, ts2) - - # add CC (src->rec) for ts1 - ccstore.append(ts1, src.station, rec.station, [CrossCorrelation(src.type, rec.type, params, data)]) - # assert ts1 is there, but not ts2 - assert ccstore.contains(src.station, rec.station, ts1) - assert not ccstore.contains(src.station, rec.station, ts2) - # also rec->src should not be there for ts1 - assert not ccstore.contains(rec.station, src.station, ts1) - - # now add CC for ts2 - ccstore.append(ts2, src.station, rec.station, [CrossCorrelation(src.type, rec.type, {}, data)]) - sta_pairs = check_populated_store(ccstore) - ccs = ccstore.read(ts1, sta_pairs[0][0], sta_pairs[0][1]) - cha_pairs = [(c.src, c.rec) for c in ccs] - assert cha_pairs == [(src.type, rec.type)] - - assert_dict_equal(params, ccs[0].parameters) - assert np.all(data == ccs[0].data) - - wrong_ccs = ccstore.read(ts1, src.station, Station("nw", "wrong")) - assert len(wrong_ccs) == 0 - - -def assert_dict_equal(d1: Dict, d2: Dict): - # use json to compare dicts with nested values such as numpy arrays - d1_str = json.dumps(to_json_types(d1), sort_keys=True) - d2_str = json.dumps(to_json_types(d2), sort_keys=True) - assert d1_str == d2_str - - -def check_populated_store(ccstore): - assert ccstore.contains(src.station, rec.station, ts2) - - timespans = ccstore.get_timespans(src.station, rec.station) - assert timespans == [ts1, ts2] - sta_pairs = ccstore.get_station_pairs() - assert sta_pairs == [(src.station, rec.station)] - return sta_pairs - - -# Use the built in tmp_path fixture: https://docs.pytest.org/en/7.1.x/how-to/tmp_path.html -def test_asdfccstore(tmp_path): - path = str(tmp_path) - _ccstore_test_helper(ASDFCCStore(path)) - check_populated_store(ASDFCCStore(tmp_path)) - - -def test_zarrccstore(tmp_path): - path = str(tmp_path) - _ccstore_test_helper(ZarrCCStore(path)) - check_populated_store(ZarrCCStore(path)) - - -def test_numpyccstore(tmp_path): - path = str(tmp_path) - _ccstore_test_helper(NumpyCCStore(path)) - check_populated_store(NumpyCCStore(path)) diff --git a/tests/test_channel_filter_store.py b/tests/test_channel_filter_store.py deleted file mode 100644 index 952f17cb..00000000 --- a/tests/test_channel_filter_store.py +++ /dev/null @@ -1,21 +0,0 @@ -import os - -from test_channelcatalog import MockCatalog -from test_scedc_s3store import timespan1 - -from noisepy.seis.channel_filter_store import LocationChannelFilterStore -from noisepy.seis.scedc_s3store import SCEDCS3DataStore - - -def test_location_filtering(): - # This folder has 4 channel .ms files, 2 of which 
are the same channel, different location - path = os.path.join(os.path.dirname(__file__), "./data/s3scedc") - store = SCEDCS3DataStore(path, MockCatalog()) - channels = store.get_channels(timespan1) - assert len(channels) == 4 - filter_store = LocationChannelFilterStore(store) - # This should filter out the BKTHIS_LHZ10 channel and leave the BKTHIS_LHZ00 channel - channels = filter_store.get_channels(timespan1) - assert len(channels) == 3 - bkthis_chan = next(filter(lambda c: c.station.network == "BK", channels)) - assert bkthis_chan.type.location == "00" diff --git a/tests/test_channelcatalog.py b/tests/test_channelcatalog.py deleted file mode 100644 index 795d420e..00000000 --- a/tests/test_channelcatalog.py +++ /dev/null @@ -1,82 +0,0 @@ -import os - -import obspy -import pandas as pd -import pytest -from datetimerange import DateTimeRange -from obspy import UTCDateTime - -from noisepy.seis.channelcatalog import ( - ChannelCatalog, - CSVChannelCatalog, - XMLStationChannelCatalog, -) -from noisepy.seis.datatypes import Channel, ChannelType, Station -from noisepy.seis.noise_module import stats2inv_mseed - -chan_data = [("ARV", "BHE", 35.1269, -118.83009, 258.0), ("BAK", "BHZ", 35.34444, -119.10445, 116.0)] - -file = os.path.join(os.path.dirname(__file__), "./data/station.csv") - - -@pytest.mark.parametrize("stat,ch,lat,lon,elev", chan_data) -def test_csv(stat: str, ch: str, lat: float, lon: float, elev: float): - cat = CSVChannelCatalog(file) - chan = Channel(ChannelType(ch), Station("CI", stat)) - full_ch = cat.get_full_channel(DateTimeRange(), chan) - assert full_ch.station.lat == lat - assert full_ch.station.lon == lon - assert full_ch.station.elevation == elev - - -class MockCatalog(ChannelCatalog): - def get_full_channel(self, timespan: DateTimeRange, channel: Channel) -> Channel: - return channel - - def get_inventory(self, timespan: DateTimeRange, station: Station) -> obspy.Inventory: - return obspy.Inventory() - - -@pytest.mark.parametrize("station,ch,lat,lon,elev", chan_data) -def test_frominventory(station: str, ch: str, lat: float, lon: float, elev: float): - file = os.path.join(os.path.dirname(__file__), "./data/station.csv") - df = pd.read_csv(file) - - class MockStat: - station = "" - starttime = UTCDateTime() - channel = ch - sampling_rate = 1.0 - location = "00" - - stat = MockStat() - stat.station = station - - inv = stats2inv_mseed(stat, df) - cat = MockCatalog() - chan = Channel(ChannelType(ch), Station("CI", station)) - full_ch = cat.populate_from_inventory(inv, chan) - assert full_ch.station.lat == lat - assert full_ch.station.lon == lon - assert full_ch.station.elevation == elev - - -xmlpaths = [os.path.join(os.path.dirname(__file__), "./data/"), "s3://scedc-pds/FDSNstationXML/CI/"] - - -@pytest.mark.parametrize("path", xmlpaths) -def test_XMLStationChannelCatalog(path): - cat = XMLStationChannelCatalog(path, storage_options={"s3": {"anon": True}}) - empty_inv = cat.get_inventory(DateTimeRange(), Station("non-existent", "non-existent", "")) - assert len(empty_inv) == 0 - yaq_inv = cat.get_inventory(DateTimeRange(), Station("CI", "YAQ")) - assert len(yaq_inv) == 1 - assert len(yaq_inv.networks[0].stations) == 1 - - -def test_XMLStationChannelCatalogCustomPath(): - # Check that a custom file name is also found properly, e.g. 
CI/CI.YAQ.xml - cat = XMLStationChannelCatalog(xmlpaths[0], "{network}" + os.path.sep + "{network}.{name}.xml") - yaq_inv = cat.get_inventory(DateTimeRange(), Station("CI", "YAQ")) - assert len(yaq_inv) == 1 - assert len(yaq_inv.networks[0].stations) == 1 diff --git a/tests/test_cross_correlation.py b/tests/test_cross_correlation.py index 49ada629..6dc80cd6 100644 --- a/tests/test_cross_correlation.py +++ b/tests/test_cross_correlation.py @@ -1,8 +1,9 @@ import os from unittest.mock import Mock +import obspy import pytest -from test_channelcatalog import MockCatalog +from datetimerange import DateTimeRange from noisepy.seis.constants import NO_DATA_MSG from noisepy.seis.correlate import ( @@ -10,14 +11,16 @@ _safe_read_data, cross_correlate, ) -from noisepy.seis.datatypes import ( +from noisepy.seis.io.datatypes import ( Channel, ChannelData, ConfigParameters, RmResp, Station, ) -from noisepy.seis.scedc_s3store import SCEDCS3DataStore +from noisepy.seis.io.scedc_s3store import SCEDCS3DataStore + +# from noisepy.seis.io.channelcatalog import MockCatalog def test_read_channels(): @@ -60,13 +63,22 @@ def test_correlation_nodata(): assert NO_DATA_MSG in str(excinfo.value) +class MockCatalogMock: + def get_full_channel(self, timespan: DateTimeRange, channel: Channel) -> Channel: + return channel + + def get_inventory(self, timespan: DateTimeRange, station: Station) -> obspy.Inventory: + return obspy.Inventory() + + @pytest.mark.parametrize("rm_resp", [RmResp.NO, RmResp.POLES_ZEROS, RmResp.RESP]) def test_correlation(rm_resp: RmResp): config = ConfigParameters() config.samp_freq = 1.0 config.rm_resp = rm_resp path = os.path.join(os.path.dirname(__file__), "./data/cc") - raw_store = SCEDCS3DataStore(path, MockCatalog()) + + raw_store = SCEDCS3DataStore(path, MockCatalogMock()) ts = raw_store.get_timespans() assert len(ts) == 1 channels = raw_store.get_channels(ts[0]) diff --git a/tests/test_datatypes.py b/tests/test_datatypes.py deleted file mode 100644 index e1fc7adf..00000000 --- a/tests/test_datatypes.py +++ /dev/null @@ -1,68 +0,0 @@ -from pathlib import Path - -import dateutil -import pytest - -from noisepy.seis.datatypes import ChannelType, ConfigParameters, StackMethod, Station - - -def test_channeltype(): - with pytest.raises(Exception): - ChannelType("toolong") - - -@pytest.mark.parametrize("ch,orien", [("bhe", "e"), ("bhe_00", "e")]) -def test_orientation(ch, orien): - ch = ChannelType(ch) - assert ch.get_orientation() == orien - - -def test_config_yaml(tmp_path: Path): - file = str(tmp_path.joinpath("config.yaml")) - c1 = ConfigParameters() - ConfigParameters.validate(c1) - # change a couple of properties - c1.step = 800 - c1.stack_method = StackMethod.ROBUST - c1.save_yaml(file) - c2 = ConfigParameters.load_yaml(file) - assert c1 == c2 - - -def test_station_valid(): - s = Station("CI", "BAK") - assert not s.valid() - s = Station("CI", "BAK", 110.0, 120.1, 15.0) - assert s.valid() - s = Station("CI", "BAK", -110.0, 120.1, 15.0) - assert s.valid() - - -def test_storage_options(): - c = ConfigParameters() - assert c.storage_options == {} - # make sure missing keys default to {} - assert c.storage_options["some_key"] == {} - c.storage_options["some_key"]["some_other_key"] = 6 - assert c.storage_options["some_key"]["some_other_key"] == 6 - - c.storage_options["s3"] = {"profile": "my_profile"} - assert c.get_storage_options("s3://my_bucket/my_file") == {"profile": "my_profile"} - - # scheme is '' for local files - c.storage_options[""]["foo"] = "bar" - assert 
c.get_storage_options("/local/file") == {"foo": "bar"} - - -def test_config_dates(): - c = ConfigParameters() - # defaults should be valid - c = ConfigParameters.model_validate(dict(c), strict=True) - c.start_date = dateutil.parser.isoparse("2021-01-01") # no timezone - with pytest.raises(Exception): - c = ConfigParameters.model_validate(dict(c), strict=True) - c.start_date = dateutil.parser.isoparse("2021-01-01T09:00:00+09:00") # not utc - with pytest.raises(Exception): - c = ConfigParameters.model_validate(dict(c), strict=True) - c.start_date = dateutil.parser.isoparse("2021-01-01T09:00:00+00:00") # utc - c = ConfigParameters.model_validate(dict(c), strict=True) diff --git a/tests/test_download.py b/tests/test_download.py index 46d1bb0a..48d0e8dc 100644 --- a/tests/test_download.py +++ b/tests/test_download.py @@ -10,9 +10,9 @@ from obspy.clients.fdsn import Client from obspy.clients.fdsn.header import FDSNNoDataException -from noisepy.seis.channelcatalog import CSVChannelCatalog -from noisepy.seis.datatypes import ConfigParameters from noisepy.seis.fdsn_download import download, download_stream +from noisepy.seis.io.channelcatalog import CSVChannelCatalog +from noisepy.seis.io.datatypes import ConfigParameters @patch("noisepy.seis.fdsn_download.download_stream") diff --git a/tests/test_hierarchicalstores.py b/tests/test_hierarchicalstores.py deleted file mode 100644 index b3be9cd4..00000000 --- a/tests/test_hierarchicalstores.py +++ /dev/null @@ -1,132 +0,0 @@ -import errno -from concurrent.futures import ThreadPoolExecutor -from typing import Tuple -from unittest import mock - -import pytest -from datetimerange import DateTimeRange -from fsspec.implementations.local import LocalFileSystem # noqa F401 -from utils import date_range - -from noisepy.seis.hierarchicalstores import PairDirectoryCache -from noisepy.seis.numpystore import NumpyArrayStore, NumpyCCStore -from noisepy.seis.utils import FIND_RETRIES, io_retry -from noisepy.seis.zarrstore import ZarrStoreHelper - - -def test_dircache(): - cache = PairDirectoryCache() - - ts1 = date_range(4, 1, 2) - ts2 = date_range(4, 2, 3) - ts3 = date_range(2, 1, 2) - - assert not cache.contains("src", "rec", ts1) - assert cache.get_pairs() == [] - assert not cache.is_src_loaded("src") - cache.add("src", "rec", []) - assert cache.is_src_loaded("src") - cache.add("src", "rec", [ts1, ts2]) - assert cache.contains("src", "rec", ts1) - assert cache.contains("src", "rec", ts2) - - cache.add("src", "rec", [ts3]) - - def check_1day(): - assert cache.contains("src", "rec", ts1) - assert cache.contains("src", "rec", ts2) - assert cache.contains("src", "rec", ts3) - - assert not cache.contains("src", "rec2", ts1) - assert not cache.contains("src2", "rec", ts1) - - check_1day() - assert cache.get_timespans("src", "rec") == [ts3, ts1, ts2] - assert cache.get_timespans("src2", "rec2") == [] - - tsh1 = date_range(4, 1, 1, 0, 1) - assert not cache.contains("src", "rec", tsh1) - cache.add("src", "rec", [tsh1]) - assert cache.contains("src", "rec", tsh1) - check_1day() - assert cache.get_timespans("src", "rec") == [ts3, ts1, ts2, tsh1] - - # add timespans with different lentghs - cache.add("src", "rec", [ts1, tsh1]) - check_1day() - assert cache.contains("src", "rec", tsh1) - - -def test_concurrent(): - cache = PairDirectoryCache() - ts1 = date_range(4, 1, 2) - - sta = [f"s{i}" for i in range(100)] - exec = ThreadPoolExecutor() - res = list(exec.map(lambda s: cache.add(s, "rec", [ts1]), sta)) - assert len(res) == len(sta) - assert not any(res) - for s in sta: - 
assert cache.is_src_loaded(s) - assert cache.contains(s, "rec", ts1) - - -numpy_paths = [ - ( - "some/path/CI.BAK/CI.ARV/2021_07_01_00_00_00T2021_07_02_00_00_00.tar.gz", - ("CI.ARV", date_range(7, 1, 2)), - ), - ("some/path/CI.BAK/CI.BAK_CI.ARV/2021_07_01_00_00_00.tar.gz", None), - ("some/path/CI.BAK/CI.BAK_CI.ARV/2021_07_01_00_00_00.tar.gz", None), - ("path/2021_07_01_00_00_00.tar.gz", None), - ("2021_07_01_00_00_00.tar.gz", None), - ("some/path/CI.BAK/CI.BAK_CI.ARV/2021_07_01_00_00_00T2021_07_02_00_00_00.TXT", None), -] - - -@pytest.mark.parametrize("path,expected", numpy_paths) -def test_numpy_parse_path(path: str, expected: Tuple[str, DateTimeRange]): - store = NumpyArrayStore("some/path", "r") - assert store.parse_path(path) == expected - - -zarr_paths = [ - ( - "some/path/CI.BAK/CI.ARV/2021_07_01_00_00_00T2021_07_02_00_00_00/0.0.0", - ("CI.ARV", date_range(7, 1, 2)), - ), - ("some/path/CI.BAK/CI.BAK_CI.ARV/2021_07_01_00_00_00/0.0.0", None), - ("some/path/CI.BAK/CI.BAK/2021_07_01_00_00_00/.zgroup", None), - ("path/non_ts/0.0.0", None), - ("too_short/0.0.0", None), -] - - -@pytest.mark.parametrize("path,expected", zarr_paths) -def test_zarr_parse_path(tmp_path, path: str, expected: Tuple[str, DateTimeRange]): - store = ZarrStoreHelper(str(tmp_path), "a") - assert store.parse_path(path) == expected - - -@mock.patch("fsspec.implementations.local.LocalFileSystem.find") -def test_find(find_mock, tmp_path): - def retry_find(): - io_retry(store._fs_find, "foo") - - store = NumpyCCStore(str(tmp_path), "r") - find_mock.side_effect = OSError(errno.EBUSY, "busy") - with pytest.raises(OSError): - retry_find() - assert FIND_RETRIES == find_mock.call_count - - # if it's not a SlowDown error then we shouldn't retry - find_mock.side_effect = OSError(errno.EFAULT, "auth") - with pytest.raises(OSError): - retry_find() - assert FIND_RETRIES + 1 == find_mock.call_count - - # same with other type of exceptoins - find_mock.side_effect = Exception() - with pytest.raises(Exception): - retry_find() - assert FIND_RETRIES + 2 == find_mock.call_count diff --git a/tests/test_main.py b/tests/test_main.py index 8cd6aabd..34448ed6 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -7,7 +7,7 @@ import pytest from noisepy.seis.constants import NO_CCF_DATA_MSG -from noisepy.seis.datatypes import CCMethod +from noisepy.seis.io.datatypes import CCMethod from noisepy.seis.main import ( Command, ErrorStopHandler, diff --git a/tests/test_scedc_s3store.py b/tests/test_scedc_s3store.py deleted file mode 100644 index 02139940..00000000 --- a/tests/test_scedc_s3store.py +++ /dev/null @@ -1,80 +0,0 @@ -import os -from datetime import datetime, timezone - -import pytest -from datetimerange import DateTimeRange -from test_channelcatalog import MockCatalog - -from noisepy.seis.datatypes import Channel, ChannelType, Station -from noisepy.seis.scedc_s3store import SCEDCS3DataStore, channel_filter - -timespan1 = DateTimeRange(datetime(2022, 1, 2, tzinfo=timezone.utc), datetime(2022, 1, 3, tzinfo=timezone.utc)) -timespan2 = DateTimeRange(datetime(2021, 2, 3, tzinfo=timezone.utc), datetime(2021, 2, 4, tzinfo=timezone.utc)) -timespan3 = DateTimeRange(datetime(2023, 6, 1, tzinfo=timezone.utc), datetime(2023, 6, 2, tzinfo=timezone.utc)) -files_dates = [ - ("CIGMR__LHN___2022002.ms", timespan1), - ("CIGMR__LHN___2021034.ms", timespan2), - ("AZCRY__BHE___2023152.ms", timespan3), -] - - -@pytest.mark.parametrize("file,expected", files_dates) -def test_parsefilename(file: str, expected: DateTimeRange): - assert expected == 
SCEDCS3DataStore._parse_timespan(file) - - -data_paths = [ - (os.path.join(os.path.dirname(__file__), "./data/s3scedc"), None), - ("s3://scedc-pds/continuous_waveforms/2022/2022_002/", None), - ("s3://scedc-pds/continuous_waveforms/", timespan1), -] - - -read_channels = [ - (SCEDCS3DataStore._parse_channel("BKTHIS_LHZ00_2022002.ms")), - (SCEDCS3DataStore._parse_channel("CIFOX2_LHZ___2022002.ms")), - (SCEDCS3DataStore._parse_channel("CINCH__LHZ___2022002.ms")), -] - - -@pytest.fixture(params=data_paths) -def store(request): - storage_options = {"s3": {"anon": True}} - (path, timespan) = request.param - return SCEDCS3DataStore(path, MockCatalog(), lambda ch: ch in read_channels, timespan, storage_options) - - -@pytest.mark.parametrize("channel", read_channels) -def test_read(store: SCEDCS3DataStore, channel: str): - chdata = store.read_data(timespan1, channel) - assert chdata.sampling_rate == 1.0 - assert chdata.start_timestamp >= timespan1.start_datetime.timestamp() - assert chdata.start_timestamp < timespan1.end_datetime.timestamp() - assert chdata.data.size > 0 - - -def test_timespan_channels(store: SCEDCS3DataStore): - timespans = store.get_timespans() - assert len(timespans) == 1 - assert timespans[0] == timespan1 - channels = store.get_channels(timespan1) - assert len(channels) == len(read_channels) - channels = store.get_channels(timespan2) - assert len(channels) == 0 - - -def test_filter(): - # filter for station 'staX' or 'staY' and channel type starts with 'B' - f = channel_filter(["staX", "staY"], "B") - staX = Station("CI", "staX") - staZ = Station("CI", "staZ") - - def check(sta, ch_name): - ch = Channel(ChannelType((ch_name)), sta) - return f(ch) - - assert check(staX, "BHE") is True - assert check(staX, "BBB") is True - assert check(staX, "CHE") is False # invalid channel name - assert check(staZ, "BHE") is False # invalid station - assert check(staZ, "CHE") is False # invalid station and channel name diff --git a/tests/test_stack.py b/tests/test_stack.py index b355f9c2..1c17aa4e 100644 --- a/tests/test_stack.py +++ b/tests/test_stack.py @@ -3,10 +3,10 @@ import numpy as np import pytest +import utils from datetimerange import DateTimeRange -from utils import date_range -from noisepy.seis.datatypes import ( +from noisepy.seis.io.datatypes import ( ChannelType, ConfigParameters, CrossCorrelation, @@ -38,7 +38,7 @@ def __reduce__(self): def test_stack_error(caplog): - ts = date_range(1, 1, 2) + ts = utils.date_range(1, 1, 2) config = ConfigParameters(start_date=ts.start_datetime, end_date=ts.end_datetime) sta = Station("CI", "BAK") cc_store = SerializableMock() @@ -56,7 +56,7 @@ def test_stack_error(caplog): def test_stack_contains(): - ts = date_range(1, 1, 2) + ts = utils.date_range(1, 1, 2) config = ConfigParameters(start_date=ts.start_datetime, end_date=ts.end_datetime) sta = Station("CI", "BAK") cc_store = SerializableMock() @@ -69,7 +69,7 @@ def test_stack_contains(): def test_stack_pair(): - ts = date_range(1, 1, 2) + ts = utils.date_range(1, 1, 2) config = ConfigParameters(start_date=ts.start_datetime, end_date=ts.end_datetime) sta = Station("CI", "BAK") params = { @@ -87,6 +87,6 @@ def test_stack_pair(): cc_store.read.return_value = ccs stacks = stack_pair(sta, sta, [ts, ts], cc_store, config) assert len(stacks) == 6 - ts2 = date_range(1, 20, 22) + ts2 = utils.date_range(1, 20, 22) stacks = stack_pair(sta, sta, [ts2], cc_store, config) assert len(stacks) == 0 diff --git a/tests/test_stackstores.py b/tests/test_stackstores.py deleted file mode 100644 index 
03fd20f6..00000000 --- a/tests/test_stackstores.py +++ /dev/null @@ -1,70 +0,0 @@ -from pathlib import Path - -import numpy as np -import pytest -from utils import date_range - -from noisepy.seis.asdfstore import ASDFStackStore -from noisepy.seis.datatypes import Stack, Station -from noisepy.seis.numpystore import NumpyStackStore -from noisepy.seis.stores import StackStore -from noisepy.seis.zarrstore import ZarrStackStore - - -# Use the built in tmp_path fixture: https://docs.pytest.org/en/7.1.x/how-to/tmp_path.html -# to create CC stores -@pytest.fixture -def asdfstore(tmp_path: Path) -> ASDFStackStore: - return ASDFStackStore(str(tmp_path)) - - -@pytest.fixture -def zarrstore(tmp_path: Path) -> ZarrStackStore: - return ZarrStackStore(str(tmp_path)) - - -@pytest.fixture -def numpystore(tmp_path: Path) -> NumpyStackStore: - return NumpyStackStore(str(tmp_path)) - - -def _stackstore_test_helper(store: StackStore): - src = Station("nw", "sta1") - rec = Station("nw", "sta2") - ts = date_range(4, 1, 2) - - stack1 = Stack("EE", "Allstack_linear", {"key1": "value1"}, np.random.random(10)) - stack2 = Stack("NZ", "Allstack_robust", {"key2": "value2"}, np.random.random(7)) - stacks = [stack1, stack2] - store.append(ts, src, rec, stacks) - - sta_pairs = store.get_station_pairs() - assert sta_pairs == [(src, rec)] - read_stacks = store.read(ts, src, rec) - assert len(read_stacks) == len(stacks) - for s1, s2 in zip(read_stacks, stacks): - assert s1.name == s2.name - assert s1.component == s2.component - assert s1.parameters == s2.parameters - assert s1.data.shape == s2.data.shape - assert np.all(s1.data == s2.data) - - bad_read = store.read(ts, Station("nw", "sta3"), rec) - assert len(bad_read) == 0 - - sta_stacks = store.read_bulk(ts, [(src, rec)]) - assert len(sta_stacks) == 1 - assert sta_stacks[0][0] == (src, rec) - assert len(sta_stacks[0][1]) == len(stacks) - - -def test_asdfstore(asdfstore: ASDFStackStore): - _stackstore_test_helper(asdfstore) - - -def test_zarrstore(zarrstore: ZarrStackStore): - _stackstore_test_helper(zarrstore) - - -def test_numpystore(numpystore: NumpyStackStore): - _stackstore_test_helper(numpystore) diff --git a/tests/test_stations_file.py b/tests/test_stations_file.py index 27d2047a..1b549c3e 100644 --- a/tests/test_stations_file.py +++ b/tests/test_stations_file.py @@ -1,6 +1,6 @@ import os -from noisepy.seis.datatypes import ConfigParameters +from noisepy.seis.io.datatypes import ConfigParameters def test_stations_file_behavior(): diff --git a/tests/test_utils.py b/tests/test_utils.py deleted file mode 100644 index 9ab9ee59..00000000 --- a/tests/test_utils.py +++ /dev/null @@ -1,47 +0,0 @@ -import os - -import numpy as np -import pytest -from fsspec.implementations.local import LocalFileSystem -from s3fs import S3FileSystem - -from noisepy.seis.utils import fs_join, get_filesystem, remove_nan_rows, unstack - -SEP = os.path.sep -paths = [ - (f"{SEP}dir{SEP}", "file.csv", SEP.join(["", "dir", "file.csv"])), - (f"..{SEP}relative", "file.json", SEP.join(["..", "relative", "file.json"])), - ("s3://bucket/path", "file.xml", "s3://bucket/path/file.xml"), -] - - -@pytest.mark.parametrize("path1, path2, expected", paths) -def test_fs_join(path1: str, path2: str, expected: str): - assert expected == fs_join(path1, path2) - - -fs_types = [ - ("s3://bucket/path", S3FileSystem), - ("/some/file", LocalFileSystem), - ("s3/local/file", LocalFileSystem), -] - - -@pytest.mark.parametrize("path, fs_type", fs_types) -def test_get_filesystem(path, fs_type): - assert 
isinstance(get_filesystem(path), fs_type) - - -def test_unstack(): - array_list = [np.array([[1, 2, 3], [5, 6, 7]]), np.array([[7, 6, 5], [4, 3, 2]])] - stacked = np.stack(array_list, axis=0) - unstacked = unstack(stacked, axis=0) - assert len(unstacked) == len(array_list) - for i in range(len(array_list)): - assert np.all(unstacked[i] == array_list[i]) - - -def test_remove_nan_rows(): - a = np.array([[1, 2, 3], [4, 5, 6], [np.nan, np.nan, np.nan]]) - b = remove_nan_rows(a) - assert np.all(b == np.array([[1, 2, 3], [4, 5, 6]])) diff --git a/tests/test_whiten.py b/tests/test_whiten.py index 3bfbda2e..929ff9bf 100644 --- a/tests/test_whiten.py +++ b/tests/test_whiten.py @@ -5,7 +5,7 @@ from scipy.fftpack import next_fast_len from noisepy.seis.correlate import ConfigParameters -from noisepy.seis.datatypes import FreqNorm +from noisepy.seis.io.datatypes import FreqNorm from noisepy.seis.noise_module import moving_ave, whiten
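
The changes above move the shared store, datatype, and utility modules from noisepy.seis to the noisepy.seis.io package, and delete the old in-tree copies (stores.py, utils.py, zarrstore.py, scedc_s3store.py, asdfstore.py) along with their tests. A minimal sketch of what this means for downstream imports, assuming the relocated modules keep the same public names as the updated imports in this diff show (ConfigParameters, Station, fs_join, get_filesystem, etc. are taken from the hunks above):

    # old layout (removed by this change)
    from noisepy.seis.datatypes import ConfigParameters, Station
    from noisepy.seis.stores import CrossCorrelationDataStore, RawDataStore
    from noisepy.seis.utils import fs_join, get_filesystem

    # new layout (used throughout the updated files in this diff)
    from noisepy.seis.io.datatypes import ConfigParameters, Station
    from noisepy.seis.io.stores import CrossCorrelationDataStore, RawDataStore
    from noisepy.seis.io.utils import fs_join, get_filesystem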