Skip to content

Commit 2dc2b38

Browse files
committed
work on data imputation
1 parent ecd40a6 commit 2dc2b38

File tree

8 files changed

+221
-21
lines changed

8 files changed

+221
-21
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,4 @@ build/
1111
*.egg-info/
1212
.DS_Store
1313
corl/wc_data/impute/impute_run.sh
14+
PIP_TARGET=/

.python-version

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
3.7.7
+90
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
import random
2+
import math
3+
from sklearn.ensemble import BaggingClassifier
4+
from sklearn.ensemble import BaggingRegressor
5+
from sklearn.ensemble import RandomForestClassifier
6+
import numpy as np
7+
import pandas as pd
8+
import matplotlib.pyplot as plt
9+
from sklearn import datasets
10+
import random
11+
import tqdm
12+
from sklearn.preprocessing import LabelEncoder
13+
14+
def impute_missing_values(df, var_deviation_tolerance=0.97, actual_or_gaussian_residuals='actual',
                          col_floor_ceiling_dict=None, scores=False):
    '''Impute missing values while minimizing distortion of variable distribution
    by creating a bagged model using other variables and adding residuals to output values

    Columns are processed from fewest to most missing values. For each one a
    bagged model is fitted on the rows where the column is observed, using the
    other columns that are fully populated in both the observed and the missing
    rows (one-hot encoded). Predictions for the missing rows are perturbed with
    residuals of the model on the observed rows.

    Columns of dtype ``object`` are label-encoded and modelled with a bagged
    random-forest classifier; the encoded integer labels replace the observed
    values in the returned frame and the imputed values are rounded.
    NOTE(review): the encoded labels are never mapped back to the original
    categories, and a rounded prediction-plus-residual is not constrained to an
    existing label - confirm this is what callers expect.

    Parameters:
    df: dataframe with missing values (it is copied, never modified in place)
    var_deviation_tolerance: target percent deviation from original variable distributions;
        must lie strictly between 0 and 1. It sets the number of bagged estimators:
        ceil(fraction_missing / (1 / tolerance - 1))
    actual_or_gaussian_residuals: 'actual' draws residuals (with replacement) from the observed
        residuals; any other value draws them from a gaussian distribution based on the
        residuals' mean and standard deviation
    col_floor_ceiling_dict: a dictionary with the variable name and a tuple of the min and max for variables
        with a finite range. Use float('inf') or float('-inf') for variables that are limited in only one direction
    scores: return the score of each column's model on its non-missing values

    Returns:
    df: df with imputed values
    problems: columns that failed to impute
    column_scores: (only when scores is truthy) result of the fitted model's ``score``
        on the non-missing values, keyed by column

    Raises:
    ValueError: if var_deviation_tolerance is not strictly between 0 and 1
    '''
    # A tolerance of 1 divides by zero and values outside (0, 1) give a
    # non-positive estimator count, so reject them before doing any work.
    if not 0 < var_deviation_tolerance < 1:
        raise ValueError(
            'var_deviation_tolerance must be strictly between 0 and 1, got {!r}'.format(
                var_deviation_tolerance))

    df = df.copy()
    columns = df.columns
    type_dict = df.dtypes.to_dict()
    missing_counts = df.isna().sum()
    missing_columns = list(missing_counts[missing_counts > 0].sort_values().index)
    column_scores = {}
    problems = []

    for col in tqdm.tqdm(missing_columns):
        try:
            is_categorical = type_dict[col] == 'object'
            na_mask = df[col].isna()
            percent_missing = na_mask.sum() / df.shape[0]
            m = math.ceil(percent_missing / ((1 / var_deviation_tolerance) - 1))

            other_columns = [c for c in columns if c != col]
            na_index = df.index[na_mask]
            have_index = df.index[~na_mask]

            # Only predictors that are complete in both row groups are usable.
            na_have_cols = set(df.loc[na_index, other_columns].dropna(axis=1).columns)
            have_have_cols = set(df.loc[have_index, other_columns].dropna(axis=1).columns)
            # Keep a list in the frame's own column order: indexing with a set
            # is rejected by recent pandas and has no stable ordering.
            both_cols = [c for c in other_columns
                         if c in na_have_cols and c in have_have_cols]

            int_df = pd.get_dummies(df.loc[:, both_cols], drop_first=True)
            X_have = int_df.loc[have_index, :]
            X_na = int_df.loc[na_index, :]
            y_have = df.loc[have_index, col]

            if is_categorical:
                le = LabelEncoder()
                y_have = le.fit_transform(y_have)
                df.loc[have_index, col] = y_have
                # The wrapped estimator is passed positionally so the call works
                # both before and after scikit-learn renamed ``base_estimator``.
                model = BaggingClassifier(RandomForestClassifier(), n_estimators=m)
            else:
                model = BaggingRegressor(n_estimators=m)

            model.fit(X_have, y_have)
            column_scores[col] = model.score(X_have, y_have)
            residuals = y_have - model.predict(X_have)
            preds = model.predict(X_na)

            if actual_or_gaussian_residuals == 'actual':
                rand_resids = np.random.choice(residuals, len(X_na), replace=True)
            else:
                rand_resids = np.random.normal(residuals.mean(), residuals.std(), len(X_na))
            preds = preds + rand_resids

            if is_categorical:
                preds = preds.round()
            if col_floor_ceiling_dict is not None and col in col_floor_ceiling_dict:
                floor, ceiling = col_floor_ceiling_dict[col][0], col_floor_ceiling_dict[col][1]
                preds = np.clip(preds, floor, ceiling)

            df.loc[na_index, col] = preds
        except Exception:
            # Best effort: record the column and carry on with the rest, but let
            # KeyboardInterrupt / SystemExit propagate.
            problems.append(col)

    if scores:
        return df, problems, column_scores
    return df, problems
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
1-
import pandas as pd
2-
import numpy as np
1+
import argparse
2+
from time import strftime
33
from mysql.connector.pooling import MySQLConnectionPool
44

5+
print_header = lambda msg: print(f"{msg}\n{'-'*len(msg)}")
6+
57
cnxpool = None
68

7-
def _parseArgs():
8-
parser = argparse.ArgumentParser()
9+
def parseArgs():
10+
parser = argparse.ArgumentParser(add_help=False)
911
parser.add_argument('--db_host',
1012
type=str,
1113
help='database host address',
@@ -20,8 +22,10 @@ def _parseArgs():
2022
default=None)
2123
return parser.parse_args()
2224

23-
def _init(db_pool_size=None, db_host=None, db_port=None, db_pwd=None):
25+
def init(db_pool_size=None, db_host=None, db_port=None, db_pwd=None):
2426
global cnxpool
27+
if cnxpool is not None:
28+
return cnxpool
2529
print("{} initializing mysql connection pool...".format(
2630
strftime("%H:%M:%S")))
2731
cnxpool = MySQLConnectionPool(
@@ -35,18 +39,4 @@ def _init(db_pool_size=None, db_host=None, db_port=None, db_pwd=None):
3539
# ssl_ca='',
3640
# use_pure=True,
3741
connect_timeout=90000)
38-
39-
def _impute():
40-
global cnxpool
41-
c = cnxpool.get_connection()
42-
query = 'select code, date, amount, xrate, close, high, high_close, open, open_close, low, low_close, volume from index_d_n_lr'
43-
df = pd.read_sql(query, c)
44-
45-
if __name__ == '__main__':
46-
args = _parseArgs()
47-
_init(4,
48-
db_host=args.db_host,
49-
db_port=args.db_port,
50-
db_pwd=args.db_pwd
51-
)
52-
_impute()
42+
return cnxpool

corl/wc_data/impute/impute_tmpl.sh

100644100755
+1-1
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
#!/bin/sh
2-
python data_imputer.py --db_host=['replace'] --db_port=['replace'] --db_pwd=['replace']
2+
python with_autoimpute.py --db_host=['replace'] --db_port=['replace'] --db_pwd=['replace']
+49
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
import pandas as pd
2+
import numpy as np
3+
from time import strftime
4+
from mysql.connector.pooling import MySQLConnectionPool
5+
from corl.wc_data.impute.common import parseArgs, init, print_header
6+
from autoimpute.imputations import SingleImputer, MultipleImputer, MiceImputer
7+
8+
import warnings
9+
warnings.filterwarnings("ignore")
10+
11+
cnxpool = None
12+
13+
14+
def _impute():
    """Load ``index_d_n_lr``, impute its numeric columns with MICE and print the outcome.

    Reads the table through a connection taken from the module-level
    ``cnxpool`` (which must have been assigned before this is called), prints
    the full table, the rows containing NaN and the numeric subset, then prints
    those same rows again for every imputed dataset produced by ``MiceImputer``.
    """
    query = 'select code, date, amount, xrate, close, high, high_close, open, open_close, low, low_close, volume from index_d_n_lr order by code asc, date asc'
    c = cnxpool.get_connection()
    try:
        df = pd.read_sql(query, c)
    finally:
        # Closing a pooled connection hands it back to the pool; without this
        # every call would permanently consume one pool slot.
        c.close()

    print_header('original table:')
    print(df)
    print_header('Rows having NaN:')
    nan_mask = df.isna().any(axis=1)
    nan_df = df[nan_mask]
    print(nan_df)
    sdf = df[['amount', 'xrate', 'close', 'high', 'high_close',
              'open', 'open_close', 'low', 'low_close', 'volume']]
    print_header('Subset of Table')
    print(sdf)
    mi = MiceImputer()
    mi_data_full = mi.fit_transform(sdf)

    for i, m in enumerate(mi_data_full):
        print_header("After Imputation #{}".format(i+1))
        # NOTE(review): autoimpute's multiple imputers appear to yield
        # (imputation number, DataFrame) pairs rather than bare frames -
        # confirm against the installed version.
        imputed = m[1] if isinstance(m, tuple) else m
        # Show the rows that had NaN before imputation so the output can be
        # compared with the "Rows having NaN" listing; filtering on NaN in the
        # imputed frame would only list rows the imputer failed to fill.
        imputed_filtered = imputed.loc[nan_df.index]
        print(imputed_filtered)
34+
35+
# print the results
36+
# print_header("Results from SingleImputer running PMM on column y one time")
37+
# conc = pd.concat([data_miss.head(20), si_data_full.head(20)], axis=1)
38+
# conc.columns = ["x", "y_orig", "x_imp", "y_imp"]
39+
# conc[["x", "y_orig", "y_imp"]]
40+
41+
42+
if __name__ == '__main__':
    # Script entry point: read the DB options from the command line, create
    # the shared connection pool (size 4) and run the imputation demo.
    cli_args = parseArgs()
    cnxpool = init(4, db_host=cli_args.db_host, db_port=cli_args.db_port, db_pwd=cli_args.db_pwd)
    _impute()

corl/wc_data/impute/with_multimp.py

+40
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
import pandas as pd
2+
import numpy as np
3+
from time import strftime
4+
from mysql.connector.pooling import MySQLConnectionPool
5+
from corl.wc_data.impute.common import parseArgs, init
6+
from corl.wc_data.impute.MultipleImputation import impute_missing_values
7+
8+
9+
10+
def _impute():
    """Load ``index_d_n_lr``, run a single imputation over it and print the outcome.

    Prints the full table and the rows containing NaN, fits a
    ``SingleImputer`` on the whole frame, and prints those same rows after
    imputation. ``init`` must already have been called with the connection
    settings (the ``__main__`` guard of this script does so).
    """
    # Imported here because this script's own import block provides neither
    # name; both are referenced below.
    from autoimpute.imputations import SingleImputer
    from corl.wc_data.impute.common import print_header

    query = 'select code, date, amount, xrate, close, high, high_close, open, open_close, low, low_close, volume from index_d_n_lr order by code asc, date asc'
    # This module never defines a ``cnxpool`` global. ``init`` returns the pool
    # it created earlier when one exists, so ask it for the pool instead.
    c = init().get_connection()
    try:
        df = pd.read_sql(query, c)
    finally:
        # Closing a pooled connection hands it back to the pool.
        c.close()

    print_header('original table:')
    print(df)
    print_header('Rows having NaN:')
    nan_df = df[df.isna().any(axis=1)]
    print(nan_df)
    si = SingleImputer()
    si_data_full = si.fit_transform(df)

    print_header("After Imputation")
    # Show the rows that had NaN before imputation so the output lines up with
    # the "Rows having NaN" listing above.
    imputed_filtered = si_data_full.loc[nan_df.index]
    print(imputed_filtered)
26+
27+
# print the results
28+
# print_header("Results from SingleImputer running PMM on column y one time")
29+
# conc = pd.concat([data_miss.head(20), si_data_full.head(20)], axis=1)
30+
# conc.columns = ["x", "y_orig", "x_imp", "y_imp"]
31+
# conc[["x", "y_orig", "y_imp"]]
32+
33+
if __name__ == '__main__':
    # Script entry point: read the DB options from the command line, create
    # the shared connection pool (size 4) and run the imputation demo.
    cli_args = parseArgs()
    init(4, db_host=cli_args.db_host, db_port=cli_args.db_port, db_pwd=cli_args.db_pwd)
    _impute()

corl/wc_data/impute/with_tfp.py

+29
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
import tensorflow_probability as tfp
2+
import tensorflow as tf
3+
import numpy as np
4+
5+
6+
# Toy series with gaps: NaN marks the observations to be imputed.
time_series_with_nans = [-1., 1., np.nan, 2.4, np.nan, 5]
observed_time_series = tfp.sts.MaskedTimeSeries(
    time_series=time_series_with_nans,
    is_missing=tf.math.is_nan(time_series_with_nans))

# Build model using observed time series to set heuristic priors.
linear_trend_model = tfp.sts.LocalLinearTrend(
    observed_time_series=observed_time_series)
model = tfp.sts.Sum([linear_trend_model],
                    observed_time_series=observed_time_series)

# Fit model to data
parameter_samples, _ = tfp.sts.fit_with_hmc(model, observed_time_series)

# Impute missing values
imputed_series_distribution = tfp.sts.impute_missing_values(
    model, observed_time_series, parameter_samples, include_observation_noise=True)
# The original call ended with a dangling "imputed_series_distribution." argument,
# which is a syntax error; only the mean and stddev are printed.
print('imputed means and stddevs: ',
      imputed_series_distribution.mean(),
      imputed_series_distribution.stddev())

0 commit comments

Comments
 (0)