diff --git a/docs/samples/Microsoft.ML.Samples/Dynamic/Transforms/TimeSeries/DetectAnomalyBySrCnnBatchPrediction.cs b/docs/samples/Microsoft.ML.Samples/Dynamic/Transforms/TimeSeries/DetectAnomalyBySrCnnBatchPrediction.cs index 5ef334bc76..493673e2af 100644 --- a/docs/samples/Microsoft.ML.Samples/Dynamic/Transforms/TimeSeries/DetectAnomalyBySrCnnBatchPrediction.cs +++ b/docs/samples/Microsoft.ML.Samples/Dynamic/Transforms/TimeSeries/DetectAnomalyBySrCnnBatchPrediction.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using Microsoft.ML; using Microsoft.ML.Data; +using Microsoft.ML.TimeSeries; namespace Samples.Dynamic { @@ -29,69 +30,44 @@ public static void Example() // Convert data to IDataView. var dataView = ml.Data.LoadFromEnumerable(data); - // Setup the estimator arguments + + // Setup the detection arguments string outputColumnName = nameof(SrCnnAnomalyDetection.Prediction); string inputColumnName = nameof(TimeSeriesData.Value); - // The transformed data. - var transformedData = ml.Transforms.DetectAnomalyBySrCnn( - outputColumnName, inputColumnName, 16, 5, 5, 3, 8, 0.35).Fit( - dataView).Transform(dataView); + // Do batch anomaly detection + var outputDataView = ml.Data.BatchDetectAnomalyBySrCnn(dataView, outputColumnName, inputColumnName, batchSize: 512, sensitivity: 70, detectMode: SrCnnDetectMode.AnomalyAndMargin); // Getting the data of the newly created column as an IEnumerable of // SrCnnAnomalyDetection. var predictionColumn = ml.Data.CreateEnumerable( - transformedData, reuseRowObject: false); + outputDataView, reuseRowObject: false); Console.WriteLine($"{outputColumnName} column obtained post-" + $"transformation."); - Console.WriteLine("Data\tAlert\tScore\tMag"); + Console.WriteLine("Data\tAlert\tScore\tMag\tExpectedValue\tBoundaryUnit\tUpperBoundary\tLowerBoundary"); int k = 0; foreach (var prediction in predictionColumn) PrintPrediction(data[k++].Value, prediction); //Prediction column obtained post-transformation. - //Data Alert Score Mag - //5 0 0.00 0.00 - //5 0 0.00 0.00 - //5 0 0.00 0.00 - //5 0 0.00 0.00 - //5 0 0.00 0.00 - //5 0 0.00 0.00 - //5 0 0.00 0.00 - //5 0 0.00 0.00 - //5 0 0.00 0.00 - //5 0 0.00 0.00 - //5 0 0.00 0.00 - //5 0 0.00 0.00 - //5 0 0.00 0.00 - //5 0 0.00 0.00 - //5 0 0.00 0.00 - //5 0 0.03 0.18 - //5 0 0.03 0.18 - //5 0 0.03 0.18 - //5 0 0.03 0.18 - //5 0 0.03 0.18 - //10 1 0.47 0.93 - //5 0 0.31 0.50 - //5 0 0.05 0.30 - //5 0 0.01 0.23 - //5 0 0.00 0.21 - //5 0 0.01 0.25 + //Data Alert Score Mag ExpectedValue BoundaryUnit UpperBoundary LowerBoundary + // TODO: update with actual output from SrCnn } - private static void PrintPrediction(float value, SrCnnAnomalyDetection + private static void PrintPrediction(double value, SrCnnAnomalyDetection prediction) => - Console.WriteLine("{0}\t{1}\t{2:0.00}\t{3:0.00}", value, prediction - .Prediction[0], prediction.Prediction[1], prediction.Prediction[2]); + Console.WriteLine("{0}\t{1}\t{2:0.00}\t{3:0.00}\t{4:0.00}\t{5:0.00}\t{6:0.00}", value, + prediction.Prediction[0], prediction.Prediction[1], prediction.Prediction[2], + prediction.Prediction[3], prediction.Prediction[4], prediction.Prediction[5]); private class TimeSeriesData { - public float Value; + public double Value; - public TimeSeriesData(float value) + public TimeSeriesData(double value) { Value = value; } @@ -99,7 +75,7 @@ public TimeSeriesData(float value) private class SrCnnAnomalyDetection { - [VectorType(3)] + [VectorType(6)] public double[] Prediction { get; set; } } } diff --git a/src/Microsoft.ML.Data/DataView/BatchDataViewMapperBase.cs b/src/Microsoft.ML.Data/DataView/BatchDataViewMapperBase.cs new file mode 100644 index 0000000000..e2d49abcbb --- /dev/null +++ b/src/Microsoft.ML.Data/DataView/BatchDataViewMapperBase.cs @@ -0,0 +1,172 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System; +using System.Collections.Generic; +using System.Linq; +using Microsoft.ML.Runtime; + +namespace Microsoft.ML.Data.DataView +{ + internal abstract class BatchDataViewMapperBase : IDataView + { + public bool CanShuffle => false; + + public DataViewSchema Schema => SchemaBindings.AsSchema; + + private readonly IDataView _source; + private readonly IHost _host; + + protected BatchDataViewMapperBase(IHostEnvironment env, string registrationName, IDataView input) + { + _host = env.Register(registrationName); + _source = input; + } + + public long? GetRowCount() => _source.GetRowCount(); + + public DataViewRowCursor GetRowCursor(IEnumerable columnsNeeded, Random rand = null) + { + _host.CheckValue(columnsNeeded, nameof(columnsNeeded)); + _host.CheckValueOrNull(rand); + + var predicate = RowCursorUtils.FromColumnsToPredicate(columnsNeeded, SchemaBindings.AsSchema); + + // If we aren't selecting any of the output columns, don't construct our cursor. + // Note that because we cannot support random due to the inherently + // stratified nature, neither can we allow the base data to be shuffled, + // even if it supports shuffling. + if (!SchemaBindings.AnyNewColumnsActive(predicate)) + { + var activeInput = SchemaBindings.GetActiveInput(predicate); + var inputCursor = _source.GetRowCursor(_source.Schema.Where(c => activeInput[c.Index]), null); + return new BindingsWrappedRowCursor(_host, inputCursor, SchemaBindings); + } + var active = SchemaBindings.GetActive(predicate); + Contracts.Assert(active.Length == SchemaBindings.ColumnCount); + + // REVIEW: We can get a different input predicate for the input cursor and for the lookahead cursor. The lookahead + // cursor is only used for getting the values from the input column, so it only needs that column activated. The + // other cursor is used to get source columns, so it needs the rest of them activated. + var predInput = GetSchemaBindingDependencies(predicate); + var inputCols = _source.Schema.Where(c => predInput(c.Index)); + return new Cursor(this, _source.GetRowCursor(inputCols), _source.GetRowCursor(inputCols), active); + } + + public DataViewRowCursor[] GetRowCursorSet(IEnumerable columnsNeeded, int n, Random rand = null) + { + return new[] { GetRowCursor(columnsNeeded, rand) }; + } + + protected abstract ColumnBindingsBase SchemaBindings { get; } + protected abstract TBatch InitializeBatch(DataViewRowCursor input); + protected abstract void ProcessBatch(TBatch currentBatch); + protected abstract void ProcessExample(TBatch currentBatch, TInput currentInput); + protected abstract Func GetLastInBatchDelegate(DataViewRowCursor lookAheadCursor); + protected abstract Func GetIsNewBatchDelegate(DataViewRowCursor lookAheadCursor); + protected abstract ValueGetter GetLookAheadGetter(DataViewRowCursor lookAheadCursor); + protected abstract Delegate[] CreateGetters(DataViewRowCursor input, TBatch currentBatch, bool[] active); + protected abstract Func GetSchemaBindingDependencies(Func predicate); + + private sealed class Cursor : RootCursorBase + { + private readonly BatchDataViewMapperBase _parent; + private readonly DataViewRowCursor _lookAheadCursor; + private readonly DataViewRowCursor _input; + + private readonly bool[] _active; + private readonly Delegate[] _getters; + + private readonly TBatch _currentBatch; + private readonly Func _lastInBatchInLookAheadCursorDel; + private readonly Func _firstInBatchInInputCursorDel; + private readonly ValueGetter _inputGetterInLookAheadCursor; + private TInput _currentInput; + + public override long Batch => 0; + + public override DataViewSchema Schema => _parent.Schema; + + public Cursor(BatchDataViewMapperBase parent, DataViewRowCursor input, DataViewRowCursor lookAheadCursor, bool[] active) + : base(parent._host) + { + _parent = parent; + _input = input; + _lookAheadCursor = lookAheadCursor; + _active = active; + + _currentBatch = _parent.InitializeBatch(_input); + + _getters = _parent.CreateGetters(_input, _currentBatch, _active); + + _lastInBatchInLookAheadCursorDel = _parent.GetLastInBatchDelegate(_lookAheadCursor); + _firstInBatchInInputCursorDel = _parent.GetIsNewBatchDelegate(_input); + _inputGetterInLookAheadCursor = _parent.GetLookAheadGetter(_lookAheadCursor); + } + + public override ValueGetter GetGetter(DataViewSchema.Column column) + { + Contracts.CheckParam(IsColumnActive(column), nameof(column), "requested column is not active"); + + var col = _parent.SchemaBindings.MapColumnIndex(out bool isSrc, column.Index); + if (isSrc) + { + Contracts.AssertValue(_input); + return _input.GetGetter(_input.Schema[col]); + } + + Ch.AssertValue(_getters); + var getter = _getters[col]; + Ch.Assert(getter != null); + var fn = getter as ValueGetter; + if (fn == null) + throw Ch.Except("Invalid TValue in GetGetter: '{0}'", typeof(TValue)); + return fn; + } + + public override ValueGetter GetIdGetter() + { + return + (ref DataViewRowId val) => + { + Ch.Check(IsGood, "Cannot call ID getter in current state"); + val = new DataViewRowId((ulong)Position, 0); + }; + } + + public override bool IsColumnActive(DataViewSchema.Column column) + { + Ch.Check(column.Index < _parent.SchemaBindings.AsSchema.Count); + return _active[column.Index]; + } + + protected override bool MoveNextCore() + { + if (!_input.MoveNext()) + return false; + if (!_firstInBatchInInputCursorDel()) + return true; + + // If we are here, this means that _input.MoveNext() has gotten us to the beginning of the next batch, + // so now we need to look ahead at the entire next batch in the _lookAheadCursor. + // The _lookAheadCursor's position should be on the last row of the previous batch (or -1). + Ch.Assert(_lastInBatchInLookAheadCursorDel()); + + var good = _lookAheadCursor.MoveNext(); + // The two cursors should have the same number of elements, so if _input.MoveNext() returned true, + // then it must return true here too. + Ch.Assert(good); + + do + { + _inputGetterInLookAheadCursor(ref _currentInput); + _parent.ProcessExample(_currentBatch, _currentInput); + } while (!_lastInBatchInLookAheadCursorDel() && _lookAheadCursor.MoveNext()); + + _parent.ProcessBatch(_currentBatch); + return true; + } + } + } +} diff --git a/src/Microsoft.ML.TimeSeries/ExtensionsCatalog.cs b/src/Microsoft.ML.TimeSeries/ExtensionsCatalog.cs index 2cc2aa3b0a..abb411f8e9 100644 --- a/src/Microsoft.ML.TimeSeries/ExtensionsCatalog.cs +++ b/src/Microsoft.ML.TimeSeries/ExtensionsCatalog.cs @@ -2,8 +2,6 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. -using System; -using System.Reflection; using Microsoft.ML.Data; using Microsoft.ML.Runtime; using Microsoft.ML.TimeSeries; @@ -150,6 +148,29 @@ public static SrCnnAnomalyEstimator DetectAnomalyBySrCnn(this TransformsCatalog int windowSize = 64, int backAddWindowSize = 5, int lookaheadWindowSize = 5, int averageingWindowSize = 3, int judgementWindowSize = 21, double threshold = 0.3) => new SrCnnAnomalyEstimator(CatalogUtils.GetEnvironment(catalog), outputColumnName, windowSize, backAddWindowSize, lookaheadWindowSize, averageingWindowSize, judgementWindowSize, threshold, inputColumnName); + /// + /// Create , which detects timeseries anomalies using SRCNN algorithm. + /// + /// The transform's catalog. + /// ... + /// Name of the column resulting from the transformation of . + /// The column data is a vector of . The vector contains 3 elements: alert (1 means anomaly while 0 means normal), raw score, and magnitude of spectual residual. + /// Name of column to transform. The column data must be . + /// The threshold to determine anomaly, score larger than the threshold is considered as anomaly. Should be in (0,1) + /// .Divide the input data into batches to fit SrCnn model. Must be -1 or a positive integer no less than 12. Default value is 1024. + /// The sensitivity of boundaries. Must be in the interval (0, 100). + /// The detect mode of the SrCnn model. + /// + /// + /// + /// + /// + public static IDataView BatchDetectAnomalyBySrCnn(this DataOperationsCatalog catalog, IDataView input, string outputColumnName, string inputColumnName, + double threshold = 0.3, int batchSize = 1024, double sensitivity = 99, SrCnnDetectMode detectMode = SrCnnDetectMode.AnomalyAndMargin) + => new SrCnnBatchAnomalyDetector(CatalogUtils.GetEnvironment(catalog), input, inputColumnName, outputColumnName, threshold, batchSize, sensitivity, detectMode); + /// /// Create , which localizes root causes using decision tree algorithm. /// diff --git a/src/Microsoft.ML.TimeSeries/SrCnnBatchAnomalyDetection.cs b/src/Microsoft.ML.TimeSeries/SrCnnBatchAnomalyDetection.cs new file mode 100644 index 0000000000..e724a9990e --- /dev/null +++ b/src/Microsoft.ML.TimeSeries/SrCnnBatchAnomalyDetection.cs @@ -0,0 +1,196 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System; +using System.Collections.Generic; +using System.Linq; +using Microsoft.ML.Data; +using Microsoft.ML.Data.DataView; +using Microsoft.ML.Numeric; +using Microsoft.ML.Runtime; + +namespace Microsoft.ML.TimeSeries +{ + /// + /// The detect modes of SrCnn models. + /// + public enum SrCnnDetectMode + { + /// + /// In this mode, output (IsAnomaly, RawScore, Mag). + /// + AnomalyOnly = 0, + + /// + /// In this mode, output (IsAnomaly, AnomalyScore, Mag, ExpectedValue, BoundaryUnit, UpperBoundary, LowerBoundary). + /// + AnomalyAndMargin = 1, + + /// + /// In this mode, output (IsAnomaly, RawScore, Mag, ExpectedValue). + /// + AnomalyAndExpectedValue = 2 + } + + // TODO: SrCnn + internal sealed class SrCnnBatchAnomalyDetector : BatchDataViewMapperBase + { + private const int MinBatchSize = 12; + private readonly int _batchSize; + private readonly string _inputColumnName; + private readonly SrCnnDetectMode _detectMode; + + private class Bindings : ColumnBindingsBase + { + private readonly VectorDataViewType _outputColumnType; + private readonly int _inputColumnIndex; + + public Bindings(DataViewSchema input, string inputColumnName, string outputColumnName, VectorDataViewType outputColumnType) + : base(input, true, outputColumnName) + { + _outputColumnType = outputColumnType; + _inputColumnIndex = Input[inputColumnName].Index; + } + + protected override DataViewType GetColumnTypeCore(int iinfo) + { + Contracts.Check(iinfo == 0); + return _outputColumnType; + } + + // Get a predicate for the input columns. + public Func GetDependencies(Func predicate) + { + Contracts.AssertValue(predicate); + + var active = new bool[Input.Count]; + for (int col = 0; col < ColumnCount; col++) + { + if (!predicate(col)) + continue; + + bool isSrc; + int index = MapColumnIndex(out isSrc, col); + if (isSrc) + active[index] = true; + else + active[_inputColumnIndex] = true; + } + + return col => 0 <= col && col < active.Length && active[col]; + } + } + + public SrCnnBatchAnomalyDetector(IHostEnvironment env, IDataView input, string inputColumnName, string outputColumnName, double threshold, int batchSize, double sensitivity, SrCnnDetectMode detectMode) + : base(env, "SrCnnBatchAnomalyDetector", input) + { + + Contracts.CheckParam(batchSize >= MinBatchSize, nameof(batchSize), "batch size is too small"); + _detectMode = detectMode; + int outputSize = 6; // TODO: determine based on detectMode + SchemaBindings = new Bindings(input.Schema, inputColumnName, outputColumnName, new VectorDataViewType(NumberDataViewType.Double, outputSize)); + _batchSize = batchSize; + _inputColumnName = inputColumnName; + } + + protected override ColumnBindingsBase SchemaBindings { get; } + + protected override Delegate[] CreateGetters(DataViewRowCursor input, Batch currentBatch, bool[] active) + { + if (!SchemaBindings.AnyNewColumnsActive(x => active[x])) + return new Delegate[1]; + return new[] { currentBatch.CreateGetter(input, _inputColumnName) }; + } + + protected override Batch InitializeBatch(DataViewRowCursor input) => new Batch(_batchSize, _detectMode); + + protected override Func GetIsNewBatchDelegate(DataViewRowCursor input) + { + return () => input.Position % _batchSize == 0; + } + + protected override Func GetLastInBatchDelegate(DataViewRowCursor input) + { + return () => (input.Position + 1) % _batchSize == 0; + } + + protected override ValueGetter GetLookAheadGetter(DataViewRowCursor input) + { + return input.GetGetter(input.Schema[_inputColumnName]); + } + + protected override Func GetSchemaBindingDependencies(Func predicate) + { + return (SchemaBindings as Bindings).GetDependencies(predicate); + } + + protected override void ProcessExample(Batch currentBatch, double currentInput) + { + currentBatch.AddValue(currentInput); + } + + protected override void ProcessBatch(Batch currentBatch) + { + currentBatch.Process(); + currentBatch.Reset(); + } + + public sealed class Batch + { + private List _previousBatch; + private List _batch; + private readonly SrCnnDetectMode _detectMode; + private double _cursor; + + public Batch(int batchSize, SrCnnDetectMode detectMode) + { + _detectMode = detectMode; + _previousBatch = new List(batchSize); + _batch = new List(batchSize); + } + + public void AddValue(double value) + { + _batch.Add(value); + } + + public int Count => _batch.Count; + + public void Process() + { + // TODO: replace with run of SrCnn + _cursor = _batch.Sum(); + } + + public void Reset() + { + var tempBatch = _previousBatch; + _previousBatch = _batch; + _batch = tempBatch; + _batch.Clear(); + } + + public ValueGetter> CreateGetter(DataViewRowCursor input, string inputCol) + { + ValueGetter srcGetter = input.GetGetter(input.Schema[inputCol]); + ValueGetter> getter = + (ref VBuffer dst) => + { + double src = default; + srcGetter(ref src); + // TODO: replace with SrCnn result + dst = new VBuffer(6, new[] { + src * _cursor, + (src + 1) * _cursor, + (src + 2) * _cursor, + (src + 3) * _cursor, + (src + 4) * _cursor, + (src + 5) * _cursor, + }); + }; + return getter; + } + } + } +} diff --git a/test/Microsoft.ML.TimeSeries.Tests/TimeSeriesDirectApi.cs b/test/Microsoft.ML.TimeSeries.Tests/TimeSeriesDirectApi.cs index f3f548fd7b..7811e72967 100644 --- a/test/Microsoft.ML.TimeSeries.Tests/TimeSeriesDirectApi.cs +++ b/test/Microsoft.ML.TimeSeries.Tests/TimeSeriesDirectApi.cs @@ -5,10 +5,9 @@ using System; using System.Collections.Generic; using System.IO; +using System.Linq; using Microsoft.ML.Data; using Microsoft.ML.TestFramework; -using Microsoft.ML.TestFramework.Attributes; -using Microsoft.ML.TestFrameworkCommon; using Microsoft.ML.TimeSeries; using Microsoft.ML.Transforms.TimeSeries; using Xunit; @@ -571,6 +570,37 @@ public void AnomalyDetectionWithSrCnn(bool loadDataFromFile) } } + [Fact] + public void TestSrCnnBatchAnomalyDetector() + { + // TODO: delete/replace with SrCnn tests + var ml = new MLContext(1); + var bldr = new ArrayDataViewBuilder(ml); + bldr.AddColumn("Input", NumberDataViewType.Double, new[] { 1.0, 2.0, 3.0, 2.0, 3.0, 4.0, 3.0, 4.0, 5.0, 4.0, 6.0, 7.0, 1.0, }); + var input = bldr.GetDataView(); + var output = new SrCnnBatchAnomalyDetector( + ml, + input, + "Input", + "Output", + 0.3, + 12, + 99, + SrCnnDetectMode.AnomalyAndExpectedValue); + var batchTransformOutput = ml.Data.CreateEnumerable(output, reuseRowObject: false).ToList(); + + var inputs = batchTransformOutput.Select(e => e.Input); + var outputs = batchTransformOutput.Select(e => e.Output); + } + + private class BatchTransformOutput + { + public double Input { get; set; } + + [VectorType] + public double[] Output { get; set; } + } + [Fact] public void RootCauseLocalization() {