-
Notifications
You must be signed in to change notification settings - Fork 2
/
main.py
120 lines (98 loc) · 4.07 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
"""
Copyright 2021 Schoening Consulting, LLC
"""
import json
import pandas as pd
from collections import namedtuple
import logging
from ff3 import FF3Cipher
from rules import clipl, date_generalize, fpe, redact, us_zipcode
# display data set on CLI
def display_df(dframe):
    """Print *dframe* to stdout with widened pandas display settings.

    Side effect: mutates global pandas display options (max_columns, width)
    so wide anonymized frames are readable on a CLI. Returns None.
    """
    pd.set_option("display.max_columns", 12)
    pd.set_option("display.width", 120)
    print(dframe)
def local_load_csv(fname, dtype=None, date_types=None):
    """Load a CSV into a DataFrame with deid-friendly typing.

    Parameters:
        fname: path to the CSV file.
        dtype: column-dtype mapping for read_csv; defaults to keeping
            Zipcode as str so leading zeros survive.
        date_types: column names to coerce with parse_dates (needed by
            the Generalize.Date rule); defaults to none.

    na_filter=False so that missing values stay "" rather than NaN.
    """
    # None sentinels instead of mutable defaults ({} / []), which would be
    # shared across calls. The previous version also ignored dtype entirely.
    if dtype is None:
        dtype = {"Zipcode": str}
    if date_types is None:
        date_types = []
    df = pd.read_csv(fname, dtype=dtype, parse_dates=date_types, na_filter=False)
    return df
# TODO: consider refactoring global
# Mutable module-level list: build_rules() appends the name of every column
# carrying a "Generalize.Date" rule, so callers can feed the list to
# read_csv(parse_dates=...) for date coercion.
date_types = []
class Cipher:
    # Module-wide singleton holder for the FF3 cipher used by FPE rules.
    # instance stays None until initialize_fpe() installs an FF3Cipher.
    instance = None
def initialize_fpe(key, tweak, radix=10):
    """Build an FF3 cipher from *key*/*tweak* and install it as the
    shared Cipher.instance used by the FF3 anonymization rules."""
    cipher = FF3Cipher(key, tweak, radix)
    Cipher.instance = cipher
def old_anonymize(df):
    """Anonymize *df* with a fixed legacy rule set.

    Placeholder until the rules are read from a .deid file; delegates the
    actual work to anonymize().
    """
    legacy_rules = [
        '{"column" : "First name", "type" : "clipl", "n" : "1"}',
        '{"column" : "Last name", "type" : "clipl", "n" : "3"}',
        '{"column" : "Zipcode", "type" : "us_zipcode"}',
        '{"column" : "SSN", "type" : "FF3", "format" : "000-00-0000", "sep" : "-"}',
        '{"column" : "Canadian SIN", "type" : "FF3", "format" : "000-000-000"}',
        '{"column" : "Employee ID", "type" : "redact"}',
        '{"column" : "Birth Date", "type" : "Generalize.Date", "format" : "5"}',
        '{"column" : "Acct num", "type" : "Mask", "format" : "5"}'
    ]
    return anonymize(df, legacy_rules)
def build_rules(rules):
    """Translate JSON rule strings into a {column_name: callable} map.

    Each entry of *rules* is a JSON object with at least "column" and
    "type"; extra keys ("format", "n", "sep", ...) parameterize the rule.
    Later rules for the same column overwrite earlier ones.

    Side effect: columns with a "Generalize.Date" rule are appended to the
    module-level date_types list so the caller can coerce them via
    read_csv(parse_dates=...).

    Raises:
        ValueError: for an unrecognized rule type.
    """
    rulemap = {}
    for row in rules:
        # Parse each JSON rule into a namedtuple so fields read as attributes.
        x = json.loads(row, object_hook=lambda d: namedtuple('X', d.keys())(*d.values()))
        if x.type == "FF3":
            # Cipher.instance is resolved at call time, i.e. after
            # initialize_fpe() has run.
            obj = lambda s: fpe(Cipher.instance, s)
        elif x.type == "Generalize.Date":
            obj = date_generalize
            date_types.append(x.column)
        elif x.type == "Mask":
            # Bind width as a default argument: a plain closure is
            # late-bound, so with two Mask rules of different widths both
            # callables would silently use the last width.
            width = int(x.format)
            obj = lambda s, width=width: len(s[:-width]) * "#" + s[-width:]
        elif x.type == "us_zipcode":
            obj = us_zipcode
        elif x.type == "clipl":
            # Same early-binding trick as Mask (replaces the lm factory).
            n = int(x.n)
            obj = lambda s, n=n: clipl(s, n)
        elif x.type == "redact":
            obj = redact
        else:
            raise ValueError(f"Unexpected rule {x.type}")
        rulemap[x.column] = obj
    return rulemap
def anonymize(df, rulelist):
    """Return a copy of *df* with each rule's transform applied to its column.

    Parameters:
        df: input DataFrame. Date columns must already be coerced by
            read_csv(parse_dates=...) for date rules to work.
        rulelist: list of JSON rule strings (see build_rules).

    Columns named by a rule but absent from the frame are logged as a
    warning and skipped. Prints a row-count summary for CLI use.
    """
    # Work on a copy so the caller's frame is never mutated.
    df = df.copy()
    rulemap = build_rules(rulelist)
    # Apply each bound rule to its column, if present.
    for colname, functor in rulemap.items():
        if colname in df.columns:
            logging.info("anonymize %s", colname)
            # Use the functor already unpacked (was a redundant rulemap lookup).
            df[colname] = df[colname].apply(functor)
        else:
            logging.warning("missing column %s", colname)
    print(f"processed {df.shape[0]} rows")
    return df
if __name__ == "__main__":
    # Birth Date must be parsed to datetime for the Generalize.Date rule;
    # na_filter=False keeps missing values as "" rather than NaN.
    df = pd.read_csv('../demo/CCSampleData.csv', dtype={"Zipcode": str}, parse_dates=['Birth Date'], na_filter=False)
    # TODO: load rules from a .json file
    # (the duplicate "Acct num" Mask rule was removed: identical rules for
    # one column just overwrite each other in the rulemap)
    rules = [
        '{"column" : "Zipcode", "type" : "us_zipcode"}',
        '{"column" : "Birth Date", "type" : "Generalize.Date", "format" : "5"}',
        '{"column" : "First name", "type" : "clipl", "n" : "1"}',
        '{"column" : "Last name", "type" : "clipl", "n" : "3"}',
        '{"column" : "SSN", "type" : "FF3", "format" : "000-00-0000", "sep" : "-"}',
        '{"column" : "Canadian SIN", "type" : "FF3", "format" : "000-000-000"}',
        '{"column" : "Acct num", "type" : "Mask", "format" : "5"}'
    ]
    # NOTE(review): hard-coded demo key/tweak -- fine for sample data, but
    # never reuse for real data; load from secure config instead.
    initialize_fpe("EF4359D8D580AA4F7F036D6F04FC6A94", "D8E7920AFA330A73")
    out_df = anonymize(df, rules)
    display_df(out_df)
    out_df.to_csv('../demo/CCSampleDataOut.csv')