Memory optimizations when loading historic data for anomaly detection.
piotrczarnas committed Oct 17, 2024
1 parent 158c052 commit 1a91a09
Showing 4 changed files with 115 additions and 77 deletions.
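
In brief, the commit replaces eager caching with on-demand lookup: historic sensor readouts keep only a minimal column set, each loaded monthly partition gets its own hash indexes, and assembled time series are held through weak references so the garbage collector can reclaim them under memory pressure. A minimal sketch of that weak-reference idiom, with generic names rather than the DQOps API:

    import java.lang.ref.WeakReference;
    import java.util.HashMap;
    import java.util.Map;

    // Illustrative sketch: values are held through WeakReference, so the GC may
    // reclaim large cached objects at any time; a null result means the caller
    // must rebuild the value on demand.
    class WeakValueCache<K, V> {
        private final Map<K, WeakReference<V>> entries = new HashMap<>();

        V get(K key) {
            WeakReference<V> ref = entries.get(key);
            return ref != null ? ref.get() : null; // null: never cached or already collected
        }

        void put(K key, V value) {
            entries.put(key, new WeakReference<>(value));
        }
    }
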
SensorReadoutsColumnNames.java
@@ -227,6 +227,7 @@ public class SensorReadoutsColumnNames {

COLUMN_NAME_COLUMN_NAME,
DATA_GROUP_NAME_COLUMN_NAME,
DATA_GROUP_HASH_COLUMN_NAME,
TABLE_COMPARISON_NAME_COLUMN_NAME,

DURATION_MS_COLUMN_NAME,
@@ -259,4 +260,16 @@ public class SensorReadoutsColumnNames {
DATA_GROUPING_LEVEL_COLUMN_NAME_PREFIX + "8",
DATA_GROUPING_LEVEL_COLUMN_NAME_PREFIX + "9"
};

/**
* The minimum set of columns from sensor readouts that are used to feed time series sensors with historic data.
*/
public static final String[] SENSOR_READOUT_COLUMN_NAMES_HISTORIC_DATA = new String[] {
TIME_PERIOD_COLUMN_NAME,
TIME_PERIOD_UTC_COLUMN_NAME,
CHECK_HASH_COLUMN_NAME,
DATA_GROUP_HASH_COLUMN_NAME,
ACTUAL_VALUE_COLUMN_NAME,
EXPECTED_VALUE_COLUMN_NAME
};
}
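
The memory win in this file comes from projecting historic data down to those six columns before it is kept in memory. A sketch of such a projection over a Tablesaw table, assuming the partition was already loaded; the helper name is illustrative, not the DQOps loader API:

    import tech.tablesaw.api.Table;

    // Illustrative: build a narrow table that references only the columns needed
    // to feed time series sensors, so the wide readout table can be released.
    static Table projectHistoricColumns(Table readouts) {
        Table minimal = Table.create(readouts.name());
        for (String columnName : SensorReadoutsColumnNames.SENSOR_READOUT_COLUMN_NAMES_HISTORIC_DATA) {
            minimal.addColumns(readouts.column(columnName));
        }
        return minimal;
    }
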
SensorReadoutsSnapshot.java
@@ -19,10 +19,7 @@
import com.dqops.core.principal.UserDomainIdentity;
import com.dqops.core.synchronization.contract.DqoRoot;
import com.dqops.data.readouts.factory.SensorReadoutsColumnNames;
import com.dqops.data.storage.FileStorageSettings;
import com.dqops.data.storage.ParquetPartitionStorageService;
import com.dqops.data.storage.TableDataSnapshot;
import com.dqops.data.storage.TablePartitioningPattern;
import com.dqops.data.storage.*;
import com.dqops.metadata.sources.PhysicalTableName;
import com.dqops.utils.reflection.ObjectMemorySizeUtility;
import com.dqops.utils.tables.TableColumnUtility;
@@ -39,7 +36,6 @@
*/
public class SensorReadoutsSnapshot extends TableDataSnapshot {
public static String PARQUET_FILE_NAME = "sensor_readout.0.parquet";
public static boolean ENABLE_PRE_FILLING_TIME_SERIES_CACHE = false;
private SensorReadoutsTimeSeriesMap timeSeriesMap;

/**
@@ -101,30 +97,8 @@ public SensorReadoutsTimeSeriesMap getHistoricReadoutsTimeSeries() {
return this.timeSeriesMap;
}

Table allLoadedData = this.getAllData();
this.timeSeriesMap = new SensorReadoutsTimeSeriesMap(this.getFirstLoadedMonth(), this.getLastLoadedMonth(), allLoadedData);

if (allLoadedData != null && ENABLE_PRE_FILLING_TIME_SERIES_CACHE) {
// THIS SECTION is disabled for the moment in favor of using an index and searching for time series on demand

TableSliceGroup tableSlices = allLoadedData.splitOn(SensorReadoutsColumnNames.CHECK_HASH_COLUMN_NAME,
SensorReadoutsColumnNames.DATA_GROUP_HASH_COLUMN_NAME);

for (TableSlice tableSlice : tableSlices) {
Table timeSeriesTable = tableSlice.asTable();
LongColumn checkHashColumn = (LongColumn) timeSeriesTable.column(SensorReadoutsColumnNames.CHECK_HASH_COLUMN_NAME);
LongColumn dataStreamHashColumn = (LongColumn) TableColumnUtility.findColumn(timeSeriesTable,
SensorReadoutsColumnNames.DATA_GROUP_HASH_COLUMN_NAME);
long checkHashId = checkHashColumn.get(0); // the first row has the value
long dataStreamHash = dataStreamHashColumn.isMissing(0) ? 0L : dataStreamHashColumn.get(0);

SensorReadoutTimeSeriesKey timeSeriesKey = new SensorReadoutTimeSeriesKey(checkHashId, dataStreamHash);
Table sortedTimeSeriesTable = timeSeriesTable.sortOn(SensorReadoutsColumnNames.TIME_PERIOD_COLUMN_NAME);
SensorReadoutsTimeSeriesData timeSeriesData = new SensorReadoutsTimeSeriesData(timeSeriesKey, sortedTimeSeriesTable);
this.timeSeriesMap.add(timeSeriesData);
}
}

this.timeSeriesMap = new SensorReadoutsTimeSeriesMap(this.getFirstLoadedMonth(), this.getLastLoadedMonth(),
this.getLoadedMonthlyPartitions());
return this.timeSeriesMap;
}
}
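
The block removed above materialized every (check hash, data group hash) slice up front via splitOn, which kept a sub-table per time series alive for the whole run. The replacement looks a series up only when asked, using Tablesaw's LongIndex. A condensed sketch of that lookup (in the commit the indexes are built once per partition, not per call):

    import com.dqops.data.readouts.factory.SensorReadoutsColumnNames;
    import tech.tablesaw.api.LongColumn;
    import tech.tablesaw.api.Table;
    import tech.tablesaw.index.LongIndex;
    import tech.tablesaw.selection.Selection;

    // Illustrative: find the rows of one time series and sort them by time period,
    // without splitting the whole table into per-series slices beforehand.
    static Table onDemandTimeSeries(Table readouts, long checkHash, long dataGroupHash) {
        LongIndex checkHashIndex = new LongIndex(
                (LongColumn) readouts.column(SensorReadoutsColumnNames.CHECK_HASH_COLUMN_NAME));
        LongIndex groupHashIndex = new LongIndex(
                (LongColumn) readouts.column(SensorReadoutsColumnNames.DATA_GROUP_HASH_COLUMN_NAME));
        Selection rows = checkHashIndex.get(checkHash).and(groupHashIndex.get(dataGroupHash));
        return readouts.where(rows).sortOn(SensorReadoutsColumnNames.TIME_PERIOD_COLUMN_NAME);
    }
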
SensorReadoutsTimeSeriesMap.java
@@ -16,46 +16,58 @@
package com.dqops.data.readouts.snapshot;

import com.dqops.data.readouts.factory.SensorReadoutsColumnNames;
import com.dqops.data.storage.LoadedMonthlyPartition;
import com.dqops.data.storage.ParquetPartitionId;
import com.dqops.utils.tables.TableColumnUtility;
import lombok.Data;
import tech.tablesaw.api.LongColumn;
import tech.tablesaw.api.Table;
import tech.tablesaw.index.LongIndex;
import tech.tablesaw.selection.Selection;

import java.lang.ref.WeakReference;
import java.time.LocalDate;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

/**
* Dictionary of identified time series in the historic sensor readout results.
*/
public class SensorReadoutsTimeSeriesMap {
private final Map<SensorReadoutTimeSeriesKey, SensorReadoutsTimeSeriesData> entries = new LinkedHashMap<>();
private final Map<SensorReadoutTimeSeriesKey, WeakReference<SensorReadoutsTimeSeriesData>> entries = new LinkedHashMap<>();
private final Map<ParquetPartitionId, LoadedMonthlyPartition> partitionMap;
private final Map<ParquetPartitionId, PartitionIndexes> partitionIndexes = new TreeMap<>();
private LocalDate firstLoadedMonth;
private LocalDate lastLoadedMonth;
private Table allLoadedData;
private LongColumn checkHashColumn;
private LongColumn dataStreamHashColumn;
private LongIndex checkHashIndex;
private LongIndex dataStreamHashIndex;

/**
* Create a time series map.
* @param firstLoadedMonth The date of the first loaded month.
* @param lastLoadedMonth The date of the last loaded month.
* @param partitionMap Dictionary of loaded partitions.
*/
public SensorReadoutsTimeSeriesMap(LocalDate firstLoadedMonth, LocalDate lastLoadedMonth, Table allLoadedData) {
public SensorReadoutsTimeSeriesMap(LocalDate firstLoadedMonth, LocalDate lastLoadedMonth,
Map<ParquetPartitionId, LoadedMonthlyPartition> partitionMap) {
this.firstLoadedMonth = firstLoadedMonth;
this.lastLoadedMonth = lastLoadedMonth;
this.allLoadedData = allLoadedData;

if (allLoadedData != null) {
this.checkHashColumn = (LongColumn) allLoadedData.column(SensorReadoutsColumnNames.CHECK_HASH_COLUMN_NAME);
this.dataStreamHashColumn = (LongColumn) TableColumnUtility.findColumn(allLoadedData,
SensorReadoutsColumnNames.DATA_GROUP_HASH_COLUMN_NAME);
this.checkHashIndex = new LongIndex(this.checkHashColumn);
this.dataStreamHashIndex = new LongIndex(this.dataStreamHashColumn);
this.partitionMap = partitionMap;
if (partitionMap != null) {
for (Map.Entry<ParquetPartitionId, LoadedMonthlyPartition> partitionKeyValue : partitionMap.entrySet()) {
Table partitionData = partitionKeyValue.getValue().getData();
if (partitionData == null) {
return;
}

LongColumn checkHashColumn = (LongColumn) partitionData.column(SensorReadoutsColumnNames.CHECK_HASH_COLUMN_NAME);
LongColumn dataStreamHashColumn = (LongColumn) TableColumnUtility.findColumn(partitionData,
SensorReadoutsColumnNames.DATA_GROUP_HASH_COLUMN_NAME);
LongIndex checkHashIndex = new LongIndex(checkHashColumn);
LongIndex dataStreamHashIndex = new LongIndex(dataStreamHashColumn);

PartitionIndexes partitionIndexesEntry = new PartitionIndexes(checkHashIndex, dataStreamHashIndex, partitionKeyValue.getValue());
this.partitionIndexes.put(partitionKeyValue.getKey(), partitionIndexesEntry);
}
}
}

@@ -83,30 +95,66 @@ public LocalDate getLastLoadedMonth() {
*/
public SensorReadoutsTimeSeriesData findTimeSeriesData(long checkHashId, long dimensionId) {
SensorReadoutTimeSeriesKey key = new SensorReadoutTimeSeriesKey(checkHashId, dimensionId);
SensorReadoutsTimeSeriesData sensorReadoutsTimeSeriesData = this.entries.get(key);
WeakReference<SensorReadoutsTimeSeriesData> sensorReadoutsTimeSeriesDataRef = this.entries.get(key);
SensorReadoutsTimeSeriesData sensorReadoutsTimeSeriesData = sensorReadoutsTimeSeriesDataRef != null ?
sensorReadoutsTimeSeriesDataRef.get() : null;

if (sensorReadoutsTimeSeriesData != null) {
return sensorReadoutsTimeSeriesData;
}

if (this.checkHashIndex == null) {
return null;
}
Table allTimeSeriesData = null;

for (Map.Entry<ParquetPartitionId, PartitionIndexes> partitionIndexesKeyValue : this.partitionIndexes.entrySet()) {
PartitionIndexes partitionIndexesEntry = partitionIndexesKeyValue.getValue();
Selection checkHashRows = partitionIndexesEntry.checkHashIndex.get(checkHashId);
Selection groupHashRows = partitionIndexesEntry.dataStreamHashIndex.get(dimensionId);

Table partitionDataTable = partitionIndexesEntry.partitionData.getData();
if (partitionDataTable == null) {
continue;
}

Selection checkHashRows = this.checkHashIndex.get(checkHashId);
Selection groupHashRows = this.dataStreamHashIndex.get(dimensionId);
Table filteredPartitionRows = partitionDataTable.where(checkHashRows.and(groupHashRows));
Table sortedTimeSeriesTable = filteredPartitionRows.sortOn(SensorReadoutsColumnNames.TIME_PERIOD_COLUMN_NAME);

Table filteredRows = this.allLoadedData.where(checkHashRows.and(groupHashRows));
Table sortedTimeSeriesTable = filteredRows.sortOn(SensorReadoutsColumnNames.TIME_PERIOD_COLUMN_NAME);
if (allTimeSeriesData == null) {
allTimeSeriesData = sortedTimeSeriesTable;
} else {
allTimeSeriesData.append(sortedTimeSeriesTable);
}
}

SensorReadoutsTimeSeriesData timeSeriesDataSlice = new SensorReadoutsTimeSeriesData(key, allTimeSeriesData);

SensorReadoutsTimeSeriesData newSubset = new SensorReadoutsTimeSeriesData(key, sortedTimeSeriesTable);
return newSubset;
// TODO: we could store it in the cache, but not for the moment; maybe for a different use case.
return timeSeriesDataSlice;
}

/**
* Adds a time series object to the dictionary.
* @param timeSeries Time series object.
* Partition indexes container.
*/
public void add(SensorReadoutsTimeSeriesData timeSeries) {
this.entries.put(timeSeries.getKey(), timeSeries);
@Data
public static class PartitionIndexes {
/**
* Check hash index.
*/
private final LongIndex checkHashIndex;

/**
* Data stream (data group) hash index.
*/
private final LongIndex dataStreamHashIndex;

/**
* The partition data.
*/
private final LoadedMonthlyPartition partitionData;

public PartitionIndexes(LongIndex checkHashIndex, LongIndex dataStreamHashIndex, LoadedMonthlyPartition monthlyPartition) {
this.checkHashIndex = checkHashIndex;
this.dataStreamHashIndex = dataStreamHashIndex;
this.partitionData = monthlyPartition;
}
}
}
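
End to end, a hypothetical caller after this change looks like the sketch below; the map is built lazily from the loaded monthly partitions, and each lookup assembles one sorted slice that the caller holds only as long as it needs. The import paths are assumed from the surrounding files:

    import com.dqops.data.readouts.snapshot.SensorReadoutsSnapshot;
    import com.dqops.data.readouts.snapshot.SensorReadoutsTimeSeriesData;

    // Hypothetical usage: fetch the historic time series for one check and one
    // data group; the result is assembled on demand from per-partition indexes.
    static SensorReadoutsTimeSeriesData loadHistory(SensorReadoutsSnapshot snapshot,
                                                    long checkHash, long dataGroupHash) {
        return snapshot.getHistoricReadoutsTimeSeries()
                .findTimeSeriesData(checkHash, dataGroupHash);
    }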