Skip to content

Commit

Permalink
Update TimeSeriesDataset metadata (#912)
Browse files Browse the repository at this point in the history
Remove the current metadata, as it is essentially a duplicate
of what `.to_dict()` produces in the resulting machine.dataset
storage.

Updates the underlying `.join_timeseries` to record
information about each loaded series, along with
the final join of those series.
  • Loading branch information
milesgranger authored Feb 14, 2020
1 parent 1f1bbc5 commit a21f254
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 50 deletions.
20 changes: 16 additions & 4 deletions gordo/machine/dataset/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class InsufficientDataError(ValueError):
class GordoBaseDataset:

_params: Dict[Any, Any] = dict() # provided by @capture_args on child's __init__
_metadata: Dict[Any, Any] = dict()

@abc.abstractmethod
def get_data(self):
Expand Down Expand Up @@ -73,8 +74,8 @@ def get_metadata(self):
Return metadata about the dataset in primitive / json encode-able dict form.
"""

@staticmethod
def join_timeseries(
self,
series_iterable: Iterable[pd.Series],
resampling_startpoint: datetime,
resampling_endpoint: datetime,
Expand Down Expand Up @@ -121,7 +122,11 @@ def join_timeseries(
resampled_series = []
missing_data_series = []

key = "tag_loading_metadata"
self._metadata[key] = dict()

for series in series_iterable:
self._metadata[key][series.name] = dict(original_length=len(series))
try:
resampled = GordoBaseDataset._resample(
series,
Expand All @@ -134,17 +139,24 @@ def join_timeseries(
missing_data_series.append(series.name)
else:
resampled_series.append(resampled)

self._metadata[key][series.name].update(
dict(resampled_length=len(resampled))
)
if missing_data_series:
raise InsufficientDataError(
f"The following features are missing data: {missing_data_series}"
)

new_series = pd.concat(resampled_series, axis=1, join="inner")
joined_df = pd.concat(resampled_series, axis=1, join="inner")

# Before returning, delete all rows with NaN, they were introduced by the
# insertion of NaNs in the beginning of all timeseries
dropped_na = joined_df.dropna()

return new_series.dropna()
self._metadata[key]["aggregate_metadata"] = dict(
joined_length=len(joined_df), dropped_na_length=len(dropped_na)
)
return dropped_na

@staticmethod
def _resample(
Expand Down
21 changes: 7 additions & 14 deletions gordo/machine/dataset/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,21 +228,14 @@ def get_data(self) -> Tuple[pd.DataFrame, Optional[pd.DataFrame]]:
X = data[x_tag_names]
y = data[y_tag_names] if self.target_tag_list else None

if X.first_valid_index():
self._metadata["train_start_date_actual"] = X.index[0]
self._metadata["train_end_date_actual"] = X.index[-1]

return X, y

def get_metadata(self):
    """
    Return metadata about the dataset in primitive / JSON encode-able dict form.

    The metadata is accumulated in ``self._metadata`` as the dataset is built
    (e.g. tag-loading statistics recorded by ``join_timeseries`` and the
    actual train start/end dates recorded by ``get_data``).

    Returns
    -------
    dict
        A shallow copy of the accumulated metadata, so callers cannot
        mutate the dataset's internal state through the returned dict.
    """
    # Shallow copy: top-level mutations by the caller won't leak back in.
    return self._metadata.copy()


class RandomDataset(TimeSeriesDataset):
Expand All @@ -255,8 +248,8 @@ class RandomDataset(TimeSeriesDataset):
@capture_args
def __init__(
self,
train_start_date: datetime,
train_end_date: datetime,
train_start_date: Union[datetime, str],
train_end_date: Union[datetime, str],
tag_list: list,
**kwargs,
):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,22 +47,6 @@ def test_init():
), f"Failed to create dataset object of type {config['type']}"


def test_get_metadata():
dataset_config = _get_default_dataset_config()
dl_backed = dataset._get_dataset(dataset_config)
metadata = dl_backed.get_metadata()

assert metadata["train_start_date"] == dataset_config["train_start_date"]
assert metadata["train_end_date"] == dataset_config["train_end_date"]
assert metadata["tag_list"] == dataset_config["tag_list"]
assert metadata["resolution"] == "10T"

dataset_config["resolution"] = "10M"
dl_backed = dataset._get_dataset(dataset_config)
metadata = dl_backed.get_metadata()
assert metadata["resolution"] == dataset_config["resolution"]


@pytest.mark.skipif(
os.getenv("INTERACTIVE") is None,
reason="Skipping test, INTERACTIVE not set in environment variable",
Expand Down
32 changes: 16 additions & 16 deletions tests/gordo/machine/dataset/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@
from gordo.machine.dataset.sensor_tag import SensorTag


@pytest.fixture
def dataset():
    """Small RandomDataset fixture: a four-day window with two sensor tags."""
    tags = [SensorTag("Tag 1", None), SensorTag("Tag 2", None)]
    return RandomDataset(
        train_start_date="2017-12-25 06:00:00Z",
        train_end_date="2017-12-29 06:00:00Z",
        tag_list=tags,
    )


def create_timeseries_list():
"""Create three dataframes with different resolution and different start/ends"""
# Test for no NaNs, test for correct first and last date
Expand Down Expand Up @@ -57,20 +66,11 @@ def create_timeseries_list():
)


def test_random_dataset_attrs():
def test_random_dataset_attrs(dataset):
"""
Test expected attributes
"""

start = dateutil.parser.isoparse("2017-12-25 06:00:00Z")
end = dateutil.parser.isoparse("2017-12-29 06:00:00Z")

dataset = RandomDataset(
train_start_date=start,
train_end_date=end,
tag_list=[SensorTag("Tag 1", None), SensorTag("Tag 2", None)],
)

assert isinstance(dataset, GordoBaseDataset)
assert hasattr(dataset, "get_data")
assert hasattr(dataset, "get_metadata")
Expand All @@ -85,7 +85,7 @@ def test_random_dataset_attrs():
assert isinstance(metadata, dict)


def test_join_timeseries():
def test_join_timeseries(dataset):

timeseries_list, latest_start, earliest_end = create_timeseries_list()

Expand All @@ -95,7 +95,7 @@ def test_join_timeseries():
timedelta = pd.Timedelta("7 minutes")
resampling_start = dateutil.parser.isoparse("2017-12-25 06:00:00Z")
resampling_end = dateutil.parser.isoparse("2018-01-15 08:00:00Z")
all_in_frame = GordoBaseDataset.join_timeseries(
all_in_frame = dataset.join_timeseries(
timeseries_list, resampling_start, resampling_end, frequency
)

Expand Down Expand Up @@ -139,18 +139,18 @@ def test_join_timeseries_empty_series(value, n_rows, resolution, row_threshold,
TimeSeriesDataset(**kwargs).get_data()


def test_join_timeseries_nonutcstart(dataset):
    """
    Join the sample series with a non-UTC (+07:00) resampling window and
    verify the expected number of resulting rows.
    """
    # Stale pre-change diff line removed: the join is now an instance method,
    # called on the `dataset` fixture rather than on GordoBaseDataset.
    timeseries_list, latest_start, earliest_end = create_timeseries_list()
    frequency = "7T"
    resampling_start = dateutil.parser.isoparse("2017-12-25 06:00:00+07:00")
    resampling_end = dateutil.parser.isoparse("2018-01-12 13:07:00+07:00")
    all_in_frame = dataset.join_timeseries(
        timeseries_list, resampling_start, resampling_end, frequency
    )
    assert len(all_in_frame) == 1854


def test_join_timeseries_with_gaps():
def test_join_timeseries_with_gaps(dataset):

timeseries_list, latest_start, earliest_end = create_timeseries_list()

Expand All @@ -166,7 +166,7 @@ def test_join_timeseries_with_gaps():
resampling_start = dateutil.parser.isoparse("2017-12-25 06:00:00Z")
resampling_end = dateutil.parser.isoparse("2018-01-12 07:00:00Z")

all_in_frame = GordoBaseDataset.join_timeseries(
all_in_frame = dataset.join_timeseries(
timeseries_with_holes, resampling_start, resampling_end, frequency
)
assert all_in_frame.index[0] == pd.Timestamp(latest_start)
Expand Down

0 comments on commit a21f254

Please sign in to comment.