27 changes: 27 additions & 0 deletions src/DotNetBridge/RunGraph.cs
@@ -35,7 +35,15 @@ private static void SaveIdvToFile(IDataView idv, string path, IHost host)
var extension = Path.GetExtension(path);
IDataSaver saver;
if (extension != ".csv" && extension != ".tsv" && extension != ".txt")
{
saver = new BinarySaver(host, new BinarySaver.Arguments());

var schemaFilePath = Path.GetDirectoryName(path) +
Path.DirectorySeparatorChar +
Path.GetFileNameWithoutExtension(path) +
".schema";
SaveIdvSchemaToFile(idv, schemaFilePath, host);
}
else
{
var saverArgs = new TextSaver.Arguments
@@ -57,6 +65,25 @@ private static void SaveIdvToFile(IDataView idv, string path, IHost host)
}
}

private static void SaveIdvSchemaToFile(IDataView idv, string path, IHost host)
{
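    // Wrap the schema in an EmptyDataView so that TextSaver emits only the
    // "#@" schema header block and no data rows (OutputHeader is off,
    // OutputSchema is on).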
var emptyDataView = new EmptyDataView(host, idv.Schema);
var saverArgs = new TextSaver.Arguments
{
OutputHeader = false,
OutputSchema = true,
Dense = true
};
IDataSaver saver = new TextSaver(host, saverArgs);

using (var fs = File.OpenWrite(path))
{
saver.SaveData(fs, emptyDataView, Utils.GetIdentityPermutation(emptyDataView.Schema.Count)
.Where(x => !emptyDataView.Schema[x].IsHidden && saver.IsColumnSavable(emptyDataView.Schema[x].Type))
.ToArray());
}
}

private static void SavePredictorModelToFile(PredictorModel model, string path, IHost host)
{
using (var fs = File.OpenWrite(path))
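With this change, saving an IDV binary file also writes a sidecar .schema file alongside it: a TextSaver header dump whose "#@" lines carry the column specs. For the infert data used in the tests below, the sidecar contents would look roughly like this (the surrounding option lines are illustrative):

#@ TextLoader{
#@   sep=tab
#@   col=row_num:I8:0
#@   col=education:R4:1-3
#@   col=age:I8:4
#@   col=parity:I8:5
#@   col=induced:I8:6
#@   col=case:I8:7
#@   col=spontaneous:I8:8
#@   col=stratum:I8:9
#@   col=pooled.stratum:I8:10
#@ }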
15 changes: 15 additions & 0 deletions src/python/nimbusml/internal/utils/data_schema.py
@@ -882,6 +882,21 @@ def clean_name(col):
final_schema.sort()
return DataSchema(final_schema, **opt)

@staticmethod
def extract_idv_schema_from_file(path):
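        """
        Reconstruct a DataSchema from the '#@ ... col=...' header lines
        of a .schema sidecar file written by DotNetBridge's
        SaveIdvSchemaToFile.
        """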
with open(path, 'r') as f:
lines = f.readlines()

col_regex = re.compile(r'#@\s*(col=.*)$')
col_specs = []

for line in lines:
match = col_regex.match(line)
if match:
col_specs.append(match.group(1))

return DataSchema(' '.join(col_specs))


class COL:
"""
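A minimal sketch of the parsing step, assuming a hypothetical sidecar file example.schema like the one shown above:

from nimbusml.internal.utils.data_schema import DataSchema

# Only the 'col=' specs are kept; option lines such as 'sep=tab' do not
# match the r'#@\s*(col=.*)$' pattern and are skipped.
schema = DataSchema.extract_idv_schema_from_file('example.schema')
print([c.Name for c in schema])  # e.g. ['row_num', 'education', 'age', ...]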
20 changes: 15 additions & 5 deletions src/python/nimbusml/internal/utils/data_stream.py
@@ -402,10 +402,14 @@ class BinaryDataStream(DataStream):
Data accessor for the IDV data format; see https://github.com/dotnet/machinelearning/blob/master/docs/code/IDataViewImplementation.md
"""

def __init__(self, filename):
# REVIEW: would be good to figure out a way to know the schema of the
# binary IDV.
super(BinaryDataStream, self).__init__(DataSchema(""))
def __init__(self, filename=None):
if filename:
schema_file_path = os.path.splitext(filename)[0] + '.schema'
schema = DataSchema.extract_idv_schema_from_file(schema_file_path)
else:
schema = DataSchema("")

super(BinaryDataStream, self).__init__(schema)
self._filename = filename

def __repr__(self):
@@ -460,6 +464,12 @@ def head(self, n=5, skip=0):
(out_model, out_data, out_metrics, _) = graph.run(verbose=True, X=self)
return out_data

def get_dataframe_schema(self):
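        # Lazily cache the dataframe-level schema: materialize a single row
        # and read its schema, so vector columns show up expanded per slot
        # (e.g. 'education.0-5yrs') rather than as one vector column.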
if not hasattr(self, '_df_schema') or not self._df_schema:
head = self.head(n=1)
self._df_schema = DataSchema.read_schema(head)
return self._df_schema

def clone(self):
"""
Copy/clone the object.
@@ -479,7 +489,7 @@ class DprepDataStream(BinaryDataStream):
def __init__(self, dataflow=None, filename=None):
if dataflow is None and filename is None:
raise ValueError('Both dataflow object and filename are None')
super(DprepDataStream, self).__init__(DataSchema(""))
super(DprepDataStream, self).__init__()
if dataflow is not None:
(fd, filename) = tempfile.mkstemp(suffix='.dprep')
fl = os.fdopen(fd, "wt")
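A short usage sketch, assuming hypothetical files transformed.idv and transformed.schema produced by a prior transform(..., as_binary_data_stream=True):

from nimbusml import BinaryDataStream

# The constructor swaps the extension to find the sidecar, so the schema
# is known without touching the IDV body.
idv = BinaryDataStream('transformed.idv')  # loads transformed.schema
print(len(idv.schema))                     # column count from the sidecar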
7 changes: 5 additions & 2 deletions src/python/nimbusml/pipeline.py
@@ -567,9 +567,12 @@ def _init_graph_nodes(
inputs = OrderedDict([(file_data.replace('$', ''), '')])

# connect transform node inputs/outputs
if feature_columns is None and not isinstance(X, BinaryDataStream):
if feature_columns is None:
if schema is None:
schema = DataSchema.read_schema(X)
if isinstance(X, BinaryDataStream):
schema = X.schema
else:
schema = DataSchema.read_schema(X)
feature_columns = [c.Name for c in schema]
if label_column:
# if label_column is a string, remove it from
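A hedged sketch of what this change enables (idv_stream is a hypothetical BinaryDataStream from an earlier transform; the tests below exercise the real flow):

from nimbusml import Pipeline
from nimbusml.linear_model import OnlineGradientDescentRegressor

# With the schema known up front, feature_columns no longer has to be
# passed explicitly; every non-label column becomes a feature.
pipeline = Pipeline([OnlineGradientDescentRegressor(label='c2')])
pipeline.fit(idv_stream)  # features = [c.Name for c in idv_stream.schema]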
111 changes: 110 additions & 1 deletion src/python/nimbusml/tests/idv/test_idv.py
@@ -9,8 +9,10 @@
import pandas as pd
from nimbusml import Pipeline, FileDataStream, BinaryDataStream
from nimbusml.datasets import get_dataset
from nimbusml.linear_model import FastLinearRegressor
from nimbusml.feature_extraction.categorical import OneHotVectorizer
from nimbusml.linear_model import FastLinearRegressor, OnlineGradientDescentRegressor
from nimbusml.preprocessing.normalization import MinMaxScaler
from nimbusml.preprocessing.schema import ColumnDropper
from sklearn.utils.testing import assert_true, assert_array_equal

# data input (as a FileDataStream)
@@ -105,6 +107,113 @@ def test_test(self):
assert_array_equal(scores, scores_df)
assert_array_equal(metrics, metrics_df)

def test_fit_predictor_with_idv(self):
train_data = {'c0': ['a', 'b', 'a', 'b'],
'c1': [1, 2, 3, 4],
'c2': [2, 3, 4, 5]}
train_df = pd.DataFrame(train_data).astype({'c1': np.float64,
'c2': np.float64})

test_data = {'c0': ['a', 'b', 'b'],
'c1': [1.5, 2.3, 3.7],
'c2': [2.2, 4.9, 2.7]}
test_df = pd.DataFrame(test_data).astype({'c1': np.float64,
'c2': np.float64})

# Fit a transform pipeline to the training data
transform_pipeline = Pipeline([OneHotVectorizer() << 'c0'])
transform_pipeline.fit(train_df)
df = transform_pipeline.transform(train_df, as_binary_data_stream=True)

# Fit a predictor pipeline given a transformed BinaryDataStream
predictor = OnlineGradientDescentRegressor(label='c2', feature=['c0', 'c1'])
predictor_pipeline = Pipeline([predictor])
predictor_pipeline.fit(df)

# Perform a prediction given the test data using
# the transform and predictor defined previously.
df = transform_pipeline.transform(test_df, as_binary_data_stream=True)
result_1 = predictor_pipeline.predict(df)

# Create expected result
xf = OneHotVectorizer() << 'c0'
df = xf.fit_transform(train_df)
predictor = OnlineGradientDescentRegressor(label='c2', feature=['c0.a', 'c0.b', 'c1'])
predictor.fit(df)
df = xf.transform(test_df)
expected_result = predictor.predict(df)

self.assertTrue(result_1.loc[:, 'Score'].equals(expected_result))

def test_fit_transform_with_idv(self):
path = get_dataset('infert').as_filepath()
data = FileDataStream.read_csv(path)

featurization_pipeline = Pipeline([OneHotVectorizer(columns={'education': 'education'})])
featurization_pipeline.fit(data)
featurized_data = featurization_pipeline.transform(data, as_binary_data_stream=True)

schema = featurized_data.schema
num_columns = len(schema)
self.assertTrue('case' in schema)
self.assertTrue('row_num' in schema)

pipeline = Pipeline([ColumnDropper() << ['case', 'row_num']])
pipeline.fit(featurized_data)
result = pipeline.transform(featurized_data, as_binary_data_stream=True)

schema = result.schema
self.assertEqual(len(schema), num_columns - 2)
self.assertTrue('case' not in schema)
self.assertTrue('row_num' not in schema)

def test_schema_with_vectorized_column(self):
path = get_dataset('infert').as_filepath()
data = FileDataStream.read_csv(path)

featurization_pipeline = Pipeline([OneHotVectorizer(columns={'education': 'education'})])
featurization_pipeline.fit(data)
featurized_data = featurization_pipeline.transform(data, as_binary_data_stream=True)

# col=row_num:I8:0 col=education:R4:1-3 col=age:I8:4 col=parity:I8:5
# col=induced:I8:6 col=case:I8:7 col=spontaneous:I8:8 col=stratum:I8:9
# col=pooled.stratum:I8:10 quote+
schema = featurized_data.schema

self.assertEqual(len(schema), 9)
self.assertEqual(schema['age'].Type, 'I8')
self.assertEqual(schema['age'].Name, 'age')
self.assertEqual(schema['age'].IsVector, False)

self.assertEqual(schema['education'].Type, 'R4')
self.assertEqual(schema['education'].Name, 'education')
self.assertEqual(len(schema['education'].Pos), 3)
self.assertEqual(schema['education'].IsVector, True)

self.assertTrue('education.0-5yrs' not in schema)
self.assertTrue('education.6-11yrs' not in schema)
self.assertTrue('education.12+yrs' not in schema)

# col=row_num:I8:0 col=education.0-5yrs:R4:1 col=education.6-11yrs:R4:2
# col=education.12+yrs:R4:3 col=age:I8:4 col=parity:I8:5 col=induced:I8:6
# col=case:I8:7 col=spontaneous:I8:8 col=stratum:I8:9 col=pooled.stratum:I8:10
# quote+ header=+
schema = featurized_data.get_dataframe_schema()

self.assertEqual(len(schema), 11)
self.assertEqual(schema['age'].Type, 'I8')
self.assertEqual(schema['age'].Name, 'age')
self.assertEqual(schema['age'].IsVector, False)

self.assertTrue('education' not in schema)
self.assertTrue('education.0-5yrs' in schema)
self.assertTrue('education.6-11yrs' in schema)
self.assertTrue('education.12+yrs' in schema)

self.assertEqual(schema['education.0-5yrs'].Type, 'R4')
self.assertEqual(schema['education.0-5yrs'].Name, 'education.0-5yrs')
self.assertEqual(schema['education.0-5yrs'].IsVector, False)


if __name__ == '__main__':
unittest.main()