Skip to content

Commit

Permalink
ZarrCCStore implementation (#150)
Browse files Browse the repository at this point in the history
* ZarrCCStore implementation

* Fix json issue

* reduce set/list conversions
  • Loading branch information
carlosgjs authored Jul 18, 2023
1 parent e9cd804 commit c03b95b
Show file tree
Hide file tree
Showing 7 changed files with 255 additions and 93 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ dependencies = [
"pydantic>=2.0.0",
"PyYAML>=6.0",
"pydantic-yaml>=1.0",
"zarr>=2.14.2",
]


Expand Down
2 changes: 1 addition & 1 deletion src/noisepy/seis/S2_stacking.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def stack(
#######################################
def initializer():
timespans = cc_store.get_timespans()
pairs_all = list(set(pair for ts in timespans for pair in cc_store.get_station_pairs(ts)))
pairs_all = cc_store.get_station_pairs()
logger.info(f"Station pairs: {pairs_all}")

if len(timespans) == 0 or len(pairs_all) == 0:
Expand Down
57 changes: 22 additions & 35 deletions src/noisepy/seis/asdfstore.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import datetime
import glob
import logging
import os
Expand All @@ -11,9 +10,16 @@
from datetimerange import DateTimeRange

from . import noise_module
from .constants import DATE_FORMAT, DONE_PATH, PROGRESS_DATATYPE
from .constants import DONE_PATH, PROGRESS_DATATYPE
from .datatypes import Channel, ChannelData, ChannelType, Station
from .stores import CrossCorrelationDataStore, RawDataStore, StackStore
from .stores import (
CrossCorrelationDataStore,
RawDataStore,
StackStore,
parse_station_pair,
parse_timespan,
timespan_str,
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -78,7 +84,7 @@ class ASDFRawDataStore(RawDataStore):

def __init__(self, directory: str, mode: str = "r"):
super().__init__()
self.datasets = ASDFDirectory(directory, mode, _filename_from_timespan, _parse_timespan)
self.datasets = ASDFDirectory(directory, mode, _filename_from_timespan, parse_timespan)

def get_channels(self, timespan: DateTimeRange) -> List[Channel]:
with self.datasets[timespan] as ds:
Expand Down Expand Up @@ -116,7 +122,7 @@ class ASDFCCStore(CrossCorrelationDataStore):
def __init__(self, directory: str, mode: str = "a") -> None:
super().__init__()
Path(directory).mkdir(exist_ok=True)
self.datasets = ASDFDirectory(directory, mode, _filename_from_timespan, _parse_timespan)
self.datasets = ASDFDirectory(directory, mode, _filename_from_timespan, parse_timespan)

# CrossCorrelationDataStore implementation
def contains(self, timespan: DateTimeRange, src_chan: Channel, rec_chan: Channel) -> bool:
Expand Down Expand Up @@ -151,10 +157,14 @@ def is_done(self, timespan: DateTimeRange):
def get_timespans(self) -> List[DateTimeRange]:
return self.datasets.get_keys()

def get_station_pairs(self, timespan: DateTimeRange) -> List[Tuple[Station, Station]]:
    """Return the station pairs stored for a single timespan.

    Station pairs are the top-level auxiliary-data types in the ASDF file;
    the PROGRESS bookkeeping datatype is excluded.
    """
    with self.datasets[timespan] as ccf_ds:
        data = ccf_ds.auxiliary_data.list()
        return [_parse_station_pair(p) for p in data if p != PROGRESS_DATATYPE]
def get_station_pairs(self) -> List[Tuple[Station, Station]]:
    """Collect the unique station pairs present across all stored timespans."""
    unique_pairs = set()
    for ts in self.get_timespans():
        with self.datasets[ts] as ccf_ds:
            for name in ccf_ds.auxiliary_data.list():
                # PROGRESS is a bookkeeping datatype, not a station pair
                if name != PROGRESS_DATATYPE:
                    unique_pairs.add(parse_station_pair(name))
    return list(unique_pairs)

def get_channeltype_pairs(
self, timespan: DateTimeRange, src_sta: Station, rec_sta: Station
Expand All @@ -173,19 +183,11 @@ def read(
stream = ds.auxiliary_data[dtype][path]
return (stream.parameters, stream.data[:])

# private helper methods

def _get_station_pair(self, src_sta: Station, rec_sta: Station) -> str:
    """Key for a station pair, built from the stations' str() forms, e.g. 'CI.ARV_CI.BAK'."""
    return f"{src_sta}_{rec_sta}"

def _get_channel_pair(self, src_chan: ChannelType, rec_chan: ChannelType) -> str:
    """Key for a channel pair, joining the two ChannelType .name attributes with '_'."""
    return f"{src_chan.name}_{rec_chan.name}"


class ASDFStackStore(StackStore):
def __init__(self, directory: str, mode: str = "a"):
super().__init__()
self.datasets = ASDFDirectory(directory, mode, _filename_from_stations, _parse_station_pair)
self.datasets = ASDFDirectory(directory, mode, _filename_from_stations, parse_station_pair)

def mark_done(self, src: Station, rec: Station):
self.datasets.mark_done((src, rec))
Expand All @@ -210,24 +212,9 @@ def _get_dataset(filename: str, mode: str) -> pyasdf.ASDFDataSet:
return pyasdf.ASDFDataSet(filename, mode=mode, mpi=False, compression=None)


def _parse_timespan(filename: str) -> DateTimeRange:
    """Parse a '<start>T<end>' filename stem into a UTC DateTimeRange."""
    stem = os.path.splitext(os.path.basename(filename))[0]
    utc = datetime.timezone.utc
    dates = [obspy.UTCDateTime(part).datetime.replace(tzinfo=utc) for part in stem.split("T")]
    return DateTimeRange(dates[0], dates[1])


def _filename_from_timespan(timespan: DateTimeRange) -> str:
    """Build the '<start>T<end>.h5' filename for a timespan using DATE_FORMAT."""
    start = timespan.start_datetime.strftime(DATE_FORMAT)
    end = timespan.end_datetime.strftime(DATE_FORMAT)
    return start + "T" + end + ".h5"


def _filename_from_stations(pair: Tuple[Station, Station]) -> str:
return f"{pair[0]}/{pair[0]}_{pair[1]}.h5"


def _parse_station_pair(pair: str) -> Tuple[Station, Station]:
    """Parse a 'NET.STA_NET.STA' string (e.g. 'CI.ARV_CI.BAK') into two Stations."""
    stations = []
    for token in pair.split("_"):
        net, name = token.split(".")
        stations.append(Station(net, name))
    return tuple(stations)
def _filename_from_timespan(timespan: DateTimeRange) -> str:
    """Build the '.h5' filename for a timespan from its canonical string form."""
    return timespan_str(timespan) + ".h5"
41 changes: 33 additions & 8 deletions src/noisepy/seis/stores.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import datetime
import os
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Tuple

import numpy as np
import obspy
from datetimerange import DateTimeRange

from noisepy.seis.constants import DATE_FORMAT

from .datatypes import Channel, ChannelData, ChannelType, ConfigParameters, Station


Expand Down Expand Up @@ -44,17 +48,12 @@ def contains(
) -> bool:
pass

@abstractmethod
def save_parameters(self, parameters: ConfigParameters):
pass

@abstractmethod
def append(
self,
timespan: DateTimeRange,
chan1: Channel,
chan2: Channel,
parameters: ConfigParameters,
src_chan: Channel,
rec_chan: Channel,
cc_params: Dict[str, Any],
data: np.ndarray,
):
Expand All @@ -73,7 +72,7 @@ def get_timespans(self) -> List[DateTimeRange]:
pass

@abstractmethod
def get_station_pairs(self, timespan: DateTimeRange) -> List[Tuple[Station, Station]]:
def get_station_pairs(self) -> List[Tuple[Station, Station]]:
pass

@abstractmethod
Expand All @@ -88,6 +87,13 @@ def read(
) -> Tuple[Dict, np.ndarray]:
pass

# private helpers
def _get_station_pair(self, src_sta: Station, rec_sta: Station) -> str:
    """Key for a station pair, built from the stations' str() forms, e.g. 'CI.ARV_CI.BAK'."""
    return "{}_{}".format(src_sta, rec_sta)

def _get_channel_pair(self, src_chan: ChannelType, rec_chan: ChannelType) -> str:
    """Key for a channel pair, joining the two ChannelType .name attributes with '_'."""
    return "{}_{}".format(src_chan.name, rec_chan.name)


class StackStore:
"""
Expand All @@ -107,3 +113,22 @@ def append(
self, src: Station, rec: Station, components: str, name: str, stack_params: Dict[str, Any], data: np.ndarray
):
pass


def timespan_str(timespan: DateTimeRange) -> str:
    """Canonical '<start>T<end>' string for a timespan, formatted with DATE_FORMAT."""
    start = timespan.start_datetime.strftime(DATE_FORMAT)
    end = timespan.end_datetime.strftime(DATE_FORMAT)
    return start + "T" + end


def parse_station_pair(pair: str) -> Tuple[Station, Station]:
    """Parse a 'NET.STA_NET.STA' string (e.g. 'CI.ARV_CI.BAK') into two Stations."""
    result = []
    for token in pair.split("_"):
        net, name = token.split(".")
        result.append(Station(net, name))
    return tuple(result)


def parse_timespan(filename: str) -> DateTimeRange:
    """Parse a '<start>T<end>' filename (stem) into a UTC DateTimeRange."""
    stem, _ext = os.path.splitext(os.path.basename(filename))
    dates = []
    for part in stem.split("T"):
        dt = obspy.UTCDateTime(part).datetime
        dates.append(dt.replace(tzinfo=datetime.timezone.utc))
    return DateTimeRange(dates[0], dates[1])
123 changes: 123 additions & 0 deletions src/noisepy/seis/zarrstore.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
import logging
from typing import Any, Dict, List, Tuple

import numpy as np
import zarr
from datetimerange import DateTimeRange

from noisepy.seis.constants import DONE_PATH

from .datatypes import Channel, ChannelType, Station
from .stores import (
CrossCorrelationDataStore,
parse_station_pair,
parse_timespan,
timespan_str,
)

logger = logging.getLogger(__name__)


class ZarrCCStore(CrossCorrelationDataStore):
    """
    CrossCorrelationDataStore that uses hierarchical Zarr files for storage. The directory organization is as follows:
    / (root)
        station_pair (group)
            timespan (group)
                channel_pair (array)
        'done' (array)
    'done' is a dummy array used only to track completed timespans in its attribute dictionary.

    Args:
        root_dir: Storage location
        mode: "r" or "w" for read-only or writing mode
        chunks: Chunking or tile size for the arrays. The individual arrays should not be huge since the data
                is already partitioned by station pair, timespan and channel pair.
    """

    def __init__(self, root_dir: str, mode: str = "w", chunks: Tuple[int, int] = (4 * 1024, 4 * 1024)) -> None:
        super().__init__()
        self.chunks = chunks
        self.root = zarr.open(root_dir, mode=mode)
        # use the module-level logger (previously this went to the root logger via logging.info)
        logger.info(f"store created at {root_dir}")

    def contains(self, timespan: DateTimeRange, src_chan: Channel, rec_chan: Channel) -> bool:
        """Return True if an array exists for this (station pair, timespan, channel pair)."""
        path = self._get_path(timespan, src_chan, rec_chan)
        return path in self.root

    def append(
        self,
        timespan: DateTimeRange,
        src_chan: Channel,
        rec_chan: Channel,
        cc_params: Dict[str, Any],
        data: np.ndarray,
    ):
        """Write (or overwrite) the CC data array and store cc_params as array attributes."""
        path = self._get_path(timespan, src_chan, rec_chan)
        logger.debug(f"Appending to {path}: {data.shape}")
        array = self.root.require_dataset(
            path,
            shape=data.shape,
            chunks=self.chunks,
            dtype=data.dtype,
        )
        array[:] = data
        # zarr attributes must be JSON-serializable, so convert numpy values first
        for k, v in cc_params.items():
            array.attrs[k] = _to_json(v)

    def is_done(self, timespan: DateTimeRange):
        """Return True if mark_done() was previously called for this timespan."""
        if DONE_PATH not in self.root.array_keys():
            return False
        done_array = self.root[DONE_PATH]
        return timespan_str(timespan) in done_array.attrs.keys()

    def mark_done(self, timespan: DateTimeRange):
        """Record completion of a timespan as an attribute on the dummy 'done' array."""
        # NOTE(review): no dtype is passed; zarr's require_dataset default applies — confirm intended
        done_array = self.root.require_dataset(DONE_PATH, shape=(1, 1))
        done_array.attrs[timespan_str(timespan)] = True

    def get_timespans(self) -> List[DateTimeRange]:
        """Return the sorted, de-duplicated set of timespans across all station pairs."""
        pairs = [k for k in self.root.group_keys() if k != DONE_PATH]
        timespans = []
        for p in pairs:
            timespans.extend(k for k in self.root[p].group_keys())
        return list(parse_timespan(t) for t in sorted(set(timespans)))

    def get_station_pairs(self) -> List[Tuple[Station, Station]]:
        """Return all station pairs stored (top-level groups, excluding 'done')."""
        pairs = [parse_station_pair(k) for k in self.root.group_keys() if k != DONE_PATH]
        return pairs

    def get_channeltype_pairs(
        self, timespan: DateTimeRange, src_sta: Station, rec_sta: Station
    ) -> List[Tuple[ChannelType, ChannelType]]:
        """Return the channel-type pairs stored for a (station pair, timespan), or [] if absent."""
        path = self._get_station_path(timespan, src_sta, rec_sta)
        if path not in self.root:
            return []
        ch_pairs = self.root[path].array_keys()
        return [tuple(map(ChannelType, ch.split("_"))) for ch in ch_pairs]

    def read(
        self, timespan: DateTimeRange, src_sta: Station, rec_sta: Station, src_ch: ChannelType, rec_ch: ChannelType
    ) -> Tuple[Dict, np.ndarray]:
        """Return (parameters, data) for a CC array; ({}, empty array) if not found."""
        path = self._get_path(timespan, Channel(src_ch, src_sta), Channel(rec_ch, rec_sta))
        if path not in self.root:
            # BUG FIX: the original returned np.empty (the function object itself);
            # return an actual empty array so callers get a consistent type
            return ({}, np.zeros(0))
        array = self.root[path]
        data = array[:]
        params = {k: array.attrs[k] for k in array.attrs.keys()}
        return (params, data)

    def _get_station_path(self, timespan: DateTimeRange, src_sta: Station, rec_sta: Station) -> str:
        """Group path for a (station pair, timespan): '<src>_<rec>/<start>T<end>'."""
        stations = self._get_station_pair(src_sta, rec_sta)
        times = timespan_str(timespan)
        return f"{stations}/{times}"

    def _get_path(self, timespan: DateTimeRange, src_chan: Channel, rec_chan: Channel) -> str:
        """Full array path: '<station pair>/<timespan>/<channel pair>'."""
        channels = self._get_channel_pair(src_chan.type, rec_chan.type)
        station_path = self._get_station_path(timespan, src_chan.station, rec_chan.station)
        return f"{station_path}/{channels}"


def _to_json(value: Any) -> Any:
if type(value) == np.ndarray:
return value.tolist()
return value
50 changes: 1 addition & 49 deletions tests/test_asdfstore.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
from datetime import datetime, timedelta, timezone
from datetime import datetime
from pathlib import Path

import numpy as np
import pytest
from datetimerange import DateTimeRange

from noisepy.seis.asdfstore import ASDFCCStore, ASDFRawDataStore
from noisepy.seis.datatypes import Channel, ChannelType, Station


@pytest.fixture
Expand Down Expand Up @@ -45,48 +42,3 @@ def test_get_data(store: ASDFRawDataStore):
assert chdata.data.size == 72001
assert chdata.sampling_rate == 20.0
assert chdata.start_timestamp == ts.start_datetime.timestamp()


def test_ccstore(ccstore: ASDFCCStore):
    """End-to-end check of the CC store: contains/append, mark_done/is_done, and listing APIs."""

    def make_1dts(dt: datetime):
        # Normalize to UTC, drop sub-second precision, and span exactly one day
        dt = dt.replace(tzinfo=timezone.utc, microsecond=0)
        return DateTimeRange(dt, dt + timedelta(days=1))

    data = np.zeros(0)
    ts1 = make_1dts(datetime.now())
    # ts2 starts where ts1 ends: two consecutive one-day timespans
    ts2 = make_1dts(ts1.end_datetime)
    src = Channel(ChannelType("foo"), Station("nw", "sta1"))
    rec = Channel(ChannelType("bar"), Station("nw", "sta2"))

    # assert empty state
    assert not ccstore.is_done(ts1)
    assert not ccstore.is_done(ts2)
    assert not ccstore.contains(ts1, src, rec)
    assert not ccstore.contains(ts2, src, rec)

    # add CC (src->rec) for ts1
    ccstore.append(ts1, src, rec, {}, data)
    # assert ts1 is there, but not ts2
    assert ccstore.contains(ts1, src, rec)
    assert not ccstore.contains(ts2, src, rec)
    # also rec->src should not be there for ts1
    assert not ccstore.contains(ts1, rec, src)
    assert not ccstore.is_done(ts1)
    # now mark ts1 done and assert it
    ccstore.mark_done(ts1)
    assert ccstore.is_done(ts1)
    assert not ccstore.is_done(ts2)

    # now add CC for ts2
    ccstore.append(ts2, src, rec, {}, data)
    assert ccstore.contains(ts2, src, rec)
    assert not ccstore.is_done(ts2)
    ccstore.mark_done(ts2)
    assert ccstore.is_done(ts2)

    # listing APIs: timespans, station pairs, then channel-type pairs for a pair
    timespans = ccstore.get_timespans()
    assert timespans == [ts1, ts2]
    sta_pairs = ccstore.get_station_pairs(ts1)
    assert sta_pairs == [(src.station, rec.station)]
    cha_pairs = ccstore.get_channeltype_pairs(ts1, sta_pairs[0][0], sta_pairs[0][1])
    assert cha_pairs == [(src.type, rec.type)]
Loading

0 comments on commit c03b95b

Please sign in to comment.