Skip to content

Commit

Permalink
Merge pull request #283 from noisepy/kuanfu_ncedc2
Browse files Browse the repository at this point in the history
Adding scripts for using the NCEDC data stored on s3
  • Loading branch information
kuanfufeng authored Feb 12, 2024
2 parents cea7c95 + 3baf92e commit c58d6b8
Show file tree
Hide file tree
Showing 8 changed files with 659 additions and 29 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ src/noisepy/seis/_version.py
# Temp locations for tutorials
tutorials/get_started_data/
tutorials/scedc_data/
tutorials/ncedc_data/
tutorials/cli/tmpdata
tutorials/pnw_data

Expand Down
1 change: 1 addition & 0 deletions src/noisepy/seis/channelcatalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def __init__(self, xmlpath: str, path_format: str = "{network}_{name}.xml", stor
super().__init__()
self.xmlpath = xmlpath
self.path_format = path_format

self.fs = get_filesystem(xmlpath, storage_options=storage_options)
if not self.fs.exists(self.xmlpath):
raise Exception(f"The XML Station file directory '{xmlpath}' doesn't exist")
Expand Down
1 change: 1 addition & 0 deletions src/noisepy/seis/noise_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ def preprocess_raw(
st[ii].data = np.float32(st[ii].data)
st[ii].data = scipy.signal.detrend(st[ii].data, type="constant")
st[ii].data = scipy.signal.detrend(st[ii].data, type="linear")
st[ii] = st[ii].taper(max_percentage=0.05)

# merge, taper and filter the data
if len(st) > 1:
Expand Down
141 changes: 116 additions & 25 deletions src/noisepy/seis/scedc_s3store.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import os
import re
from abc import abstractmethod
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta, timezone
Expand Down Expand Up @@ -34,23 +35,19 @@ def filter(ch: Channel) -> bool:
return filter


class SCEDCS3DataStore(RawDataStore):
class MiniSeedS3DataStore(RawDataStore):
"""
A data store implementation to read from a directory of miniSEED (.ms) files from the SCEDC S3 bucket.
A data store implementation to read from a directory of miniSEED (.ms) files from an S3 bucket.
Every directory is a a day and each .ms file contains the data for a channel.
"""

# TODO: Support reading directly from the S3 bucket

# for checking the filename has the form: CIGMR__LHN___2022002.ms
file_re = re.compile(r".*[0-9]{7}\.ms$", re.IGNORECASE)

def __init__(
self,
path: str,
chan_catalog: ChannelCatalog,
ch_filter: Callable[[Channel], bool] = lambda s: True, # noqa: E731
date_range: DateTimeRange = None,
file_name_regex: str = None,
storage_options: dict = {},
):
"""
Expand All @@ -61,6 +58,7 @@ def __init__(
if None, all channels are used
"""
super().__init__()
self.file_re = re.compile(file_name_regex, re.IGNORECASE)
self.fs = get_filesystem(path, storage_options=storage_options)
self.channel_catalog = chan_catalog
self.path = path
Expand All @@ -80,12 +78,12 @@ def __init__(

def _load_channels(self, full_path: str, ch_filter: Callable[[Channel], bool]):
tlog = TimeLogger(logger=logger, level=logging.INFO)
msfiles = [f for f in self.fs.glob(fs_join(full_path, "*.ms")) if self.file_re.match(f) is not None]
msfiles = [f for f in self.fs.glob(fs_join(full_path, "*")) if self.file_re.match(f) is not None]
tlog.log(f"Loading {len(msfiles)} files from {full_path}")
for f in msfiles:
timespan = SCEDCS3DataStore._parse_timespan(f)
timespan = self._parse_timespan(f)
self.paths[timespan.start_datetime] = full_path
channel = SCEDCS3DataStore._parse_channel(os.path.basename(f))
channel = self._parse_channel(os.path.basename(f))
if not ch_filter(channel):
continue
key = str(timespan) # DataTimeFrame is not hashable
Expand All @@ -100,7 +98,7 @@ def _ensure_channels_loaded(self, date_range: DateTimeRange):
date = date_range.start_datetime + timedelta(days=d)
if self.date_range is None or date not in self.date_range:
continue
date_path = str(date.year) + "/" + str(date.year) + "_" + str(date.timetuple().tm_yday).zfill(3) + "/"
date_path = self._get_datepath(date)
full_path = fs_join(self.path, date_path)
self._load_channels(full_path, self.ch_filter)

Expand Down Expand Up @@ -128,13 +126,7 @@ def get_timespans(self) -> List[DateTimeRange]:
def read_data(self, timespan: DateTimeRange, chan: Channel) -> ChannelData:
self._ensure_channels_loaded(timespan)
# reconstruct the file name from the channel parameters
chan_str = (
f"{chan.station.network}{chan.station.name.ljust(5, '_')}{chan.type.name}"
f"{chan.station.location.ljust(3, '_')}"
)
filename = fs_join(
self.paths[timespan.start_datetime], f"{chan_str}{timespan.start_datetime.strftime('%Y%j')}.ms"
)
filename = self._get_filename(timespan, chan)
if not self.fs.exists(filename):
logger.warning(f"Could not find file {filename}")
return ChannelData.empty()
Expand All @@ -147,14 +139,43 @@ def read_data(self, timespan: DateTimeRange, chan: Channel) -> ChannelData:
def get_inventory(self, timespan: DateTimeRange, station: Station) -> obspy.Inventory:
return self.channel_catalog.get_inventory(timespan, station)

def _parse_timespan(filename: str) -> DateTimeRange:
# The SCEDC S3 bucket stores files in the form: CIGMR__LHN___2022002.ms
year = int(filename[-10:-6])
day = int(filename[-6:-3])
jan1 = datetime(year, 1, 1, tzinfo=timezone.utc)
return DateTimeRange(jan1 + timedelta(days=day - 1), jan1 + timedelta(days=day))
@abstractmethod
def _get_datepath(self, timespan: datetime) -> str:
pass

@abstractmethod
def _get_filename(self, timespan: DateTimeRange, channel: Channel) -> str:
pass

@abstractmethod
def _parse_channel(self, filename: str) -> Channel:
pass

@abstractmethod
def _parse_timespan(self, filename: str) -> DateTimeRange:
pass


def _parse_channel(filename: str) -> Channel:
class SCEDCS3DataStore(MiniSeedS3DataStore):
def __init__(
self,
path: str,
chan_catalog: ChannelCatalog,
ch_filter: Callable[[Channel], bool] = lambda s: True, # noqa: E731
date_range: DateTimeRange = None,
storage_options: dict = {},
):
super().__init__(
path,
chan_catalog,
ch_filter=ch_filter,
date_range=date_range,
# for checking the filename has the form: CIGMR__LHN___2022002.ms
file_name_regex=r".*[0-9]{7}\.ms$",
storage_options=storage_options,
)

def _parse_channel(self, filename: str) -> Channel:
# e.g.
# CIGMR__LHN___2022002
# CE13884HNZ10_2022002
Expand All @@ -167,3 +188,73 @@ def _parse_channel(filename: str) -> Channel:
# lat/lon/elev will be populated later
Station(network, station, location=location),
)

def _parse_timespan(self, filename: str) -> DateTimeRange:
# The SCEDC S3 bucket stores files in the form: CIGMR__LHN___2022002.ms
year = int(filename[-10:-6])
day = int(filename[-6:-3])
jan1 = datetime(year, 1, 1, tzinfo=timezone.utc)
return DateTimeRange(jan1 + timedelta(days=day - 1), jan1 + timedelta(days=day))

def _get_filename(self, timespan: DateTimeRange, channel: Channel) -> str:
chan_str = (
f"{channel.station.network}{channel.station.name.ljust(5, '_')}{channel.type.name}"
f"{channel.station.location.ljust(3, '_')}"
)
filename = fs_join(
self.paths[timespan.start_datetime], f"{chan_str}{timespan.start_datetime.strftime('%Y%j')}.ms"
)
return filename

def _get_datepath(self, date: datetime) -> str:
return str(date.year) + "/" + str(date.year) + "_" + str(date.timetuple().tm_yday).zfill(3) + "/"


class NCEDCS3DataStore(MiniSeedS3DataStore):
def __init__(
self,
path: str,
chan_catalog: ChannelCatalog,
ch_filter: Callable[[Channel], bool] = lambda s: True, # noqa: E731
date_range: DateTimeRange = None,
storage_options: dict = {},
):
super().__init__(
path,
chan_catalog,
ch_filter=ch_filter,
date_range=date_range,
# for checking the filename has the form: AAS.NC.EHZ..D.2020.002
file_name_regex=r".*[0-9]{4}.*[0-9]{3}$",
storage_options=storage_options,
)

def _parse_channel(self, filename: str) -> Channel:
# e.g.
# AAS.NC.EHZ..D.2020.002
split_fn = filename.split(".")
network = split_fn[1]
station = split_fn[0]
channel = split_fn[2]
location = split_fn[3]
if len(channel) > 3:
channel = channel[:3]
return Channel(
ChannelType(channel, location),
# lat/lon/elev will be populated later
Station(network, station, location=location),
)

def _parse_timespan(self, filename: str) -> DateTimeRange:
# The NCEDC S3 bucket stores files in the form: AAS.NC.EHZ..D.2020.002
year = int(filename[-8:-4])
day = int(filename[-3:])
jan1 = datetime(year, 1, 1, tzinfo=timezone.utc)
return DateTimeRange(jan1 + timedelta(days=day - 1), jan1 + timedelta(days=day))

def _get_datepath(self, date: datetime) -> str:
return str(date.year) + "/" + str(date.year) + "." + str(date.timetuple().tm_yday).zfill(3) + "/"

def _get_filename(self, timespan: DateTimeRange, chan: Channel) -> str:
chan_str = f"{chan.station.name}.{chan.station.network}.{chan.type.name}." f"{chan.station.location}.D"
return fs_join(self.paths[timespan.start_datetime], f"{chan_str}.{timespan.start_datetime.strftime('%Y.%j')}")
35 changes: 35 additions & 0 deletions tests/test_ncedc_s3store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import os
from datetime import datetime, timezone

import pytest
from datetimerange import DateTimeRange

from noisepy.seis.scedc_s3store import NCEDCS3DataStore

timespan1 = DateTimeRange(datetime(2022, 1, 2, tzinfo=timezone.utc), datetime(2022, 1, 3, tzinfo=timezone.utc))
timespan2 = DateTimeRange(datetime(2021, 2, 3, tzinfo=timezone.utc), datetime(2021, 2, 4, tzinfo=timezone.utc))
timespan3 = DateTimeRange(datetime(2023, 6, 1, tzinfo=timezone.utc), datetime(2023, 6, 2, tzinfo=timezone.utc))
files_dates = [
("AFD.NC.HHZ..D.2022.002", timespan1),
("KCPB.NC.HHN..D.2021.034", timespan2),
("LMC.NC.HHN..D.2023.152", timespan3),
]


@pytest.mark.parametrize("file,expected", files_dates)
def test_parsefilename2(file: str, expected: DateTimeRange):
assert expected == NCEDCS3DataStore._parse_timespan(None, file)


data_paths = [
(os.path.join(os.path.dirname(__file__), "./data/s3ncedc"), None),
("s3://ncedc-pds/continuous_waveforms/NC/2022/2022.002/", None),
("s3://ncedc-pds/continuous_waveforms/", timespan1),
]


read_channels = [
(NCEDCS3DataStore._parse_channel(None, "AFD.NC.HHZ..D.2022.002")),
(NCEDCS3DataStore._parse_channel(None, "NSP.NC.EHZ..D.2022.002")),
(NCEDCS3DataStore._parse_channel(None, "PSN.NC.EHZ..D.2022.002")),
]
8 changes: 4 additions & 4 deletions tests/test_scedc_s3store.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

@pytest.mark.parametrize("file,expected", files_dates)
def test_parsefilename(file: str, expected: DateTimeRange):
assert expected == SCEDCS3DataStore._parse_timespan(file)
assert expected == SCEDCS3DataStore._parse_timespan(None, file)


data_paths = [
Expand All @@ -31,9 +31,9 @@ def test_parsefilename(file: str, expected: DateTimeRange):


read_channels = [
(SCEDCS3DataStore._parse_channel("BKTHIS_LHZ00_2022002.ms")),
(SCEDCS3DataStore._parse_channel("CIFOX2_LHZ___2022002.ms")),
(SCEDCS3DataStore._parse_channel("CINCH__LHZ___2022002.ms")),
(SCEDCS3DataStore._parse_channel(None, "BKTHIS_LHZ00_2022002.ms")),
(SCEDCS3DataStore._parse_channel(None, "CIFOX2_LHZ___2022002.ms")),
(SCEDCS3DataStore._parse_channel(None, "CINCH__LHZ___2022002.ms")),
]


Expand Down
1 change: 1 addition & 0 deletions tutorials/_toc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ chapters:
- file: get_started.ipynb
- file: noisepy_datastore.ipynb
- file: noisepy_scedc_tutorial.ipynb
- file: noisepy_ncedc_tutorial.ipynb
- file: CLI.md
- file: cloud/checklist.md
- file: cloud/aws-ec2.md
Expand Down
Loading

0 comments on commit c58d6b8

Please sign in to comment.