diff --git a/src/DotNetBridge/RunGraph.cs b/src/DotNetBridge/RunGraph.cs
index d97c3249..55f02795 100644
--- a/src/DotNetBridge/RunGraph.cs
+++ b/src/DotNetBridge/RunGraph.cs
@@ -35,7 +35,14 @@ private static void SaveIdvToFile(IDataView idv, string path, IHost host)
             var extension = Path.GetExtension(path);
             IDataSaver saver;
             if (extension != ".csv" && extension != ".tsv" && extension != ".txt")
+            {
                 saver = new BinarySaver(host, new BinarySaver.Arguments());
+
+                // Write the schema as text to "<name>.schema" beside the data file.
+                // Path.ChangeExtension keeps a directory-less path relative; joining an
+                // empty directory name with a separator would point at the root.
+                SaveIdvSchemaToFile(idv, Path.ChangeExtension(path, ".schema"), host);
+            }
             else
             {
                 var saverArgs = new TextSaver.Arguments
@@ -57,6 +64,33 @@ private static void SaveIdvToFile(IDataView idv, string path, IHost host)
             }
         }
 
+        /// <summary>
+        /// Writes the schema of <paramref name="idv"/> to a text file, without any data rows.
+        /// </summary>
+        /// <param name="idv">Data view whose schema is saved.</param>
+        /// <param name="path">Destination file; an existing file is replaced.</param>
+        /// <param name="host">Host used to create the empty view and the text saver.</param>
+        private static void SaveIdvSchemaToFile(IDataView idv, string path, IHost host)
+        {
+            var emptyDataView = new EmptyDataView(host, idv.Schema);
+            var saverArgs = new TextSaver.Arguments
+            {
+                OutputHeader = false,
+                OutputSchema = true,
+                Dense = true
+            };
+            IDataSaver saver = new TextSaver(host, saverArgs);
+
+            // File.Create truncates; File.OpenWrite would leave the tail of a longer,
+            // older schema file in place after the new text.
+            using (var fs = File.Create(path))
+            {
+                saver.SaveData(fs, emptyDataView, Utils.GetIdentityPermutation(emptyDataView.Schema.Count)
+                    .Where(x => !emptyDataView.Schema[x].IsHidden && saver.IsColumnSavable(emptyDataView.Schema[x].Type))
+                    .ToArray());
+            }
+        }
+
         private static void SavePredictorModelToFile(PredictorModel model, string path, IHost host)
         {
             using (var fs = File.OpenWrite(path))
diff --git a/src/python/nimbusml/internal/utils/data_schema.py b/src/python/nimbusml/internal/utils/data_schema.py
index 8cd0ca1d..e1880dab 100644
--- a/src/python/nimbusml/internal/utils/data_schema.py
+++ b/src/python/nimbusml/internal/utils/data_schema.py
@@ -882,6 +882,25 @@ def clean_name(col):
         final_schema.sort()
         return DataSchema(final_schema, **opt)
 
+    @staticmethod
+    def extract_idv_schema_from_file(path):
+        """
+        Build a schema from the ``#@ col=...`` lines of a schema file.
+
+        :param path: text file to read
+        :return: DataSchema built from every column spec found in the
+            file, joined with spaces
+        """
+        col_regex = re.compile(r'#@\s*(col=.*)$')
+
+        with open(path, 'r') as f:
+            # Lines that are not column specs are skipped.
+            col_specs = [match.group(1)
+                         for match in map(col_regex.match, f)
+                         if match]
+
+        return DataSchema(' '.join(col_specs))
+
 
 class COL:
     """
diff --git a/src/python/nimbusml/internal/utils/data_stream.py b/src/python/nimbusml/internal/utils/data_stream.py
index 128d71b1..ea544307 100644
--- a/src/python/nimbusml/internal/utils/data_stream.py
+++ b/src/python/nimbusml/internal/utils/data_stream.py
@@ -402,10 +402,14 @@ class BinaryDataStream(DataStream):
     Data accessor for IDV data format, see here https://github.com/dotnet/machinelearning/blob/master/docs/code/IDataViewImplementation.md
     """
 
-    def __init__(self, filename):
-        # REVIEW: would be good to figure out a way to know the schema of the
-        # binary IDV.
-        super(BinaryDataStream, self).__init__(DataSchema(""))
+    def __init__(self, filename=None):
+        schema = DataSchema("")
+        if filename:
+            schema_path = os.path.splitext(filename)[0] + '.schema'
+            # An IDV file saved without a sidecar keeps the empty schema.
+            if os.path.isfile(schema_path):
+                schema = DataSchema.extract_idv_schema_from_file(schema_path)
+        super(BinaryDataStream, self).__init__(schema)
         self._filename = filename
 
     def __repr__(self):
@@ -460,6 +464,12 @@ def head(self, n=5, skip=0):
         (out_model, out_data, out_metrics, _) = graph.run(verbose=True, X=self)
         return out_data
 
+    def get_dataframe_schema(self):
+        # Schema read from head(n=1); reused once a non-empty one is stored.
+        if not getattr(self, '_df_schema', None):
+            self._df_schema = DataSchema.read_schema(self.head(n=1))
+        return self._df_schema
+
     def clone(self):
         """
         Copy/clone the object.
@@ -479,7 +489,7 @@ class DprepDataStream(BinaryDataStream):
     def __init__(self, dataflow=None, filename=None):
         if dataflow is None and filename is None:
             raise ValueError('Both dataflow object and filename are None')
-        super(DprepDataStream, self).__init__(DataSchema(""))
+        super(DprepDataStream, self).__init__()
         if dataflow is not None:
             (fd, filename) = tempfile.mkstemp(suffix='.dprep')
             fl = os.fdopen(fd, "wt")
diff --git a/src/python/nimbusml/pipeline.py b/src/python/nimbusml/pipeline.py
index fa2542c2..efa81735 100644
--- a/src/python/nimbusml/pipeline.py
+++ b/src/python/nimbusml/pipeline.py
@@ -567,9 +567,12 @@ def _init_graph_nodes(
             inputs = OrderedDict([(file_data.replace('$', ''), '')])
 
         # connect transform node inputs/outputs
-        if feature_columns is None and not isinstance(X, BinaryDataStream):
+        if feature_columns is None:
             if schema is None:
-                schema = DataSchema.read_schema(X)
+                if isinstance(X, BinaryDataStream):
+                    schema = X.schema
+                else:
+                    schema = DataSchema.read_schema(X)
             feature_columns = [c.Name for c in schema]
             if label_column:
                 # if label_column is a string, remove it from
diff --git a/src/python/nimbusml/tests/idv/test_idv.py b/src/python/nimbusml/tests/idv/test_idv.py
index e86f2226..c92fb092 100644
--- a/src/python/nimbusml/tests/idv/test_idv.py
+++ b/src/python/nimbusml/tests/idv/test_idv.py
@@ -9,8 +9,10 @@ import pandas as pd
 
 from nimbusml import Pipeline, FileDataStream, BinaryDataStream
 from nimbusml.datasets import get_dataset
-from nimbusml.linear_model import FastLinearRegressor
+from nimbusml.feature_extraction.categorical import OneHotVectorizer
+from nimbusml.linear_model import FastLinearRegressor, OnlineGradientDescentRegressor
 from nimbusml.preprocessing.normalization import MinMaxScaler
+from nimbusml.preprocessing.schema import ColumnDropper
 from sklearn.utils.testing import assert_true, assert_array_equal
 
 # data input (as a FileDataStream)
@@ -105,6 +107,116 @@ def test_test(self):
         assert_array_equal(scores, scores_df)
         assert_array_equal(metrics, metrics_df)
 
+    def test_fit_predictor_with_idv(self):
+        """Fit/predict on a BinaryDataStream matches the DataFrame path."""
+        train_data = {'c0': ['a', 'b', 'a', 'b'],
+                      'c1': [1, 2, 3, 4],
+                      'c2': [2, 3, 4, 5]}
+        train_df = pd.DataFrame(train_data).astype({'c1': np.float64,
+                                                    'c2': np.float64})
+
+        test_data = {'c0': ['a', 'b', 'b'],
+                     'c1': [1.5, 2.3, 3.7],
+                     'c2': [2.2, 4.9, 2.7]}
+        test_df = pd.DataFrame(test_data).astype({'c1': np.float64,
+                                                  'c2': np.float64})
+
+        # Fit a transform pipeline to the training data
+        transform_pipeline = Pipeline([OneHotVectorizer() << 'c0'])
+        transform_pipeline.fit(train_df)
+        train_stream = transform_pipeline.transform(train_df, as_binary_data_stream=True)
+
+        # Fit a predictor pipeline given a transformed BinaryDataStream
+        predictor = OnlineGradientDescentRegressor(label='c2', feature=['c0', 'c1'])
+        predictor_pipeline = Pipeline([predictor])
+        predictor_pipeline.fit(train_stream)
+
+        # Perform a prediction given the test data using
+        # the transform and predictor defined previously.
+        test_stream = transform_pipeline.transform(test_df, as_binary_data_stream=True)
+        stream_result = predictor_pipeline.predict(test_stream)
+
+        # Create expected result
+        xf = OneHotVectorizer() << 'c0'
+        train_xf_df = xf.fit_transform(train_df)
+        predictor = OnlineGradientDescentRegressor(label='c2', feature=['c0.a', 'c0.b', 'c1'])
+        predictor.fit(train_xf_df)
+        test_xf_df = xf.transform(test_df)
+        expected_result = predictor.predict(test_xf_df)
+
+        self.assertTrue(stream_result.loc[:, 'Score'].equals(expected_result))
+
+    def test_fit_transform_with_idv(self):
+        """Columns dropped from a BinaryDataStream leave its schema."""
+        path = get_dataset('infert').as_filepath()
+        data = FileDataStream.read_csv(path)
+
+        featurization_pipeline = Pipeline([OneHotVectorizer(columns={'education': 'education'})])
+        featurization_pipeline.fit(data)
+        featurized_data = featurization_pipeline.transform(data, as_binary_data_stream=True)
+
+        schema = featurized_data.schema
+        num_columns = len(schema)
+        self.assertIn('case', schema)
+        self.assertIn('row_num', schema)
+
+        pipeline = Pipeline([ColumnDropper() << ['case', 'row_num']])
+        pipeline.fit(featurized_data)
+        result = pipeline.transform(featurized_data, as_binary_data_stream=True)
+
+        schema = result.schema
+        self.assertEqual(len(schema), num_columns - 2)
+        self.assertNotIn('case', schema)
+        self.assertNotIn('row_num', schema)
+
+    def test_schema_with_vectorized_column(self):
+        """Stream schema keeps vector columns; DataFrame schema expands them."""
+        path = get_dataset('infert').as_filepath()
+        data = FileDataStream.read_csv(path)
+
+        featurization_pipeline = Pipeline([OneHotVectorizer(columns={'education': 'education'})])
+        featurization_pipeline.fit(data)
+        featurized_data = featurization_pipeline.transform(data, as_binary_data_stream=True)
+
+        # col=row_num:I8:0 col=education:R4:1-3 col=age:I8:4 col=parity:I8:5
+        # col=induced:I8:6 col=case:I8:7 col=spontaneous:I8:8 col=stratum:I8:9
+        # col=pooled.stratum:I8:10 quote+
+        schema = featurized_data.schema
+
+        self.assertEqual(len(schema), 9)
+        self.assertEqual(schema['age'].Type, 'I8')
+        self.assertEqual(schema['age'].Name, 'age')
+        self.assertEqual(schema['age'].IsVector, False)
+
+        self.assertEqual(schema['education'].Type, 'R4')
+        self.assertEqual(schema['education'].Name, 'education')
+        self.assertEqual(len(schema['education'].Pos), 3)
+        self.assertEqual(schema['education'].IsVector, True)
+
+        self.assertNotIn('education.0-5yrs', schema)
+        self.assertNotIn('education.6-11yrs', schema)
+        self.assertNotIn('education.12+yrs', schema)
+
+        # col=row_num:I8:0 col=education.0-5yrs:R4:1 col=education.6-11yrs:R4:2
+        # col=education.12+yrs:R4:3 col=age:I8:4 col=parity:I8:5 col=induced:I8:6
+        # col=case:I8:7 col=spontaneous:I8:8 col=stratum:I8:9 col=pooled.stratum:I8:10
+        # quote+ header=+
+        schema = featurized_data.get_dataframe_schema()
+
+        self.assertEqual(len(schema), 11)
+        self.assertEqual(schema['age'].Type, 'I8')
+        self.assertEqual(schema['age'].Name, 'age')
+        self.assertEqual(schema['age'].IsVector, False)
+
+        self.assertNotIn('education', schema)
+        self.assertIn('education.0-5yrs', schema)
+        self.assertIn('education.6-11yrs', schema)
+        self.assertIn('education.12+yrs', schema)
+
+        self.assertEqual(schema['education.0-5yrs'].Type, 'R4')
+        self.assertEqual(schema['education.0-5yrs'].Name, 'education.0-5yrs')
+        self.assertEqual(schema['education.0-5yrs'].IsVector, False)
+
 
 if __name__ == '__main__':
     unittest.main()