@@ -38,34 +38,32 @@ def __init__(self, app_name, ppml_args=None, spark_conf=None):
38
38
for (k , v ) in spark_conf .getAll ():
39
39
conf [k ] = v
40
40
if ppml_args :
41
- kms_type = ppml_args .get ("kms_type" , "SimpleKeyManagementService " )
42
- conf ["spark.bigdl.kms.type" ] = kms_type
41
+ kms_type = ppml_args .get ("kms_type" , "" )
42
+ conf ["spark.bigdl.primaryKey.defaultKey.kms.type" ] = kms_type
43
43
if kms_type == "SimpleKeyManagementService" :
44
- conf ["spark.bigdl.kms.appId" ] = check (ppml_args , "app_id" )
45
- conf ["spark.bigdl.kms.apiKey" ] = check (ppml_args , "api_key" )
46
- conf ["spark.bigdl.kms.primaryKey" ] = check (ppml_args , "primary_key" )
47
- conf ["spark.bigdl.kms.dataKey" ] = check (ppml_args , "data_key" )
44
+ conf ["spark.bigdl.primaryKey.defaultKey.kms.appId" ] = check (ppml_args , "app_id" )
45
+ conf ["spark.bigdl.primaryKey.defaultKey.kms.apiKey" ] = check (ppml_args , "api_key" )
46
+ conf ["spark.bigdl.primaryKey.defaultKey.material" ] = check (ppml_args , "primary_key_material" )
48
47
elif kms_type == "EHSMKeyManagementService" :
49
- conf ["spark.bigdl.kms.ip" ] = check (ppml_args , "kms_server_ip" )
50
- conf ["spark.bigdl.kms.port" ] = check (ppml_args , "kms_server_port" )
51
- conf ["spark.bigdl.kms.id" ] = check (ppml_args , "app_id" )
52
- conf ["spark.bigdl.kms.apiKey" ] = check (ppml_args , "api_key" )
53
- conf ["spark.bigdl.kms.primaryKey" ] = check (ppml_args , "primary_key" )
54
- conf ["spark.bigdl.kms.dataKey" ] = check (ppml_args , "data_key" )
48
+ conf ["spark.bigdl.primaryKey.defaultKey.kms.ip" ] = check (ppml_args , "kms_server_ip" )
49
+ conf ["spark.bigdl.primaryKey.defaultKey.kms.port" ] = check (ppml_args , "kms_server_port" )
50
+ conf ["spark.bigdl.primaryKey.defaultKey.kms.id" ] = check (ppml_args , "app_id" )
51
+ conf ["spark.bigdl.primaryKey.defaultKey.kms.apiKey" ] = check (ppml_args , "api_key" )
52
+ conf ["spark.bigdl.primaryKey.defaultKey.material" ] = check (ppml_args , "primary_key_material" )
55
53
elif kms_type == "AzureKeyManagementService" :
56
- conf ["spark.bigdl.kms.vault" ] = check (ppml_args , "vault" )
57
- conf ["spark.bigdl.kms.clientId" ] = ppml_args .get ("client_id" , "" )
58
- conf ["spark.bigdl.kms.primaryKey" ] = check (ppml_args , "primary_key" )
59
- conf ["spark.bigdl.kms.dataKey" ] = check (ppml_args , "data_key" )
54
+ conf ["spark.bigdl.primaryKey.defaultKey.kms.vault" ] = check (ppml_args , "vault" )
55
+ conf ["spark.bigdl.primaryKey.defaultKey.kms.clientId" ] = ppml_args .get ("client_id" , "" )
56
+ conf ["spark.bigdl.primaryKey.defaultKey.material" ] = check (ppml_args , "primary_key_material" )
60
57
elif kms_type == "BigDLKeyManagementService" :
61
- conf ["spark.bigdl.kms.ip" ] = check (ppml_args , "kms_server_ip" )
62
- conf ["spark.bigdl.kms.port" ] = check (ppml_args , "kms_server_port" )
63
- conf ["spark.bigdl.kms.user" ] = check (ppml_args , "kms_user_name" )
64
- conf ["spark.bigdl.kms.token" ] = check (ppml_args , "kms_user_token" )
65
- conf ["spark.bigdl.kms.primaryKey" ] = check (ppml_args , "primary_key" )
66
- conf ["spark.bigdl.kms.dataKey" ] = check (ppml_args , "data_key" )
58
+ conf ["spark.bigdl.primaryKey.defaultKey.kms.ip" ] = check (ppml_args , "kms_server_ip" )
59
+ conf ["spark.bigdl.primaryKey.defaultKey.kms.port" ] = check (ppml_args , "kms_server_port" )
60
+ conf ["spark.bigdl.primaryKey.defaultKey.kms.user" ] = check (ppml_args , "kms_user_name" )
61
+ conf ["spark.bigdl.primaryKey.defaultKey.kms.token" ] = check (ppml_args , "kms_user_token" )
62
+ conf ["spark.bigdl.primaryKey.defaultKey.material" ] = check (ppml_args , "primary_key_material" )
63
+ elif kms_type == "" :
64
+ conf ["spark.bigdl.primaryKey.defaultKey.plainText" ] = check (ppml_args , "primary_key_plaintext" )
67
65
else :
68
- invalidInputError (False , "invalid KMS type" )
66
+ invalidInputError (False , "invalid KMS type. " )
69
67
70
68
conf ["spark.hadoop.io.compression.codecs" ] = "com.intel.analytics.bigdl.ppml.crypto.CryptoCodec"
71
69
spark_conf = init_spark_conf (conf )
@@ -76,32 +74,29 @@ def __init__(self, app_name, ppml_args=None, spark_conf=None):
76
74
args = [self .spark ._jsparkSession ]
77
75
super ().__init__ (None , self .bigdl_type , * args )
78
76
79
- def load_keys (self , primary_key_path , data_key_path ):
80
- self .value = callBigDlFunc (self .bigdl_type , "loadKeys" , self .value , primary_key_path , data_key_path )
81
-
82
- def read (self , crypto_mode , kms_name = "" , primary_key = "" , data_key = "" ):
77
+ def read (self , crypto_mode , primary_key_name = "defaultKey" ):
83
78
if isinstance (crypto_mode , CryptoMode ):
84
79
crypto_mode = crypto_mode .value
85
- df_reader = callBigDlFunc (self .bigdl_type , "read" , self . value , crypto_mode ,
86
- kms_name , primary_key , data_key )
80
+ df_reader = callBigDlFunc (self .bigdl_type , "read" ,
81
+ self . value , crypto_mode , primary_key_name )
87
82
return EncryptedDataFrameReader (self .bigdl_type , df_reader )
88
83
89
- def write (self , dataframe , crypto_mode , kms_name = "" , primary_key = "" , data_key = " " ):
84
+ def write (self , dataframe , crypto_mode , primary_key_name = "defaultKey" ):
90
85
if isinstance (crypto_mode , CryptoMode ):
91
86
crypto_mode = crypto_mode .value
92
- df_writer = callBigDlFunc (self .bigdl_type , "write" , self .value , dataframe , crypto_mode ,
93
- kms_name , primary_key , data_key )
87
+ df_writer = callBigDlFunc (self .bigdl_type , "write" , self .value ,
88
+ dataframe , crypto_mode , primary_key_name )
94
89
return EncryptedDataFrameWriter (self .bigdl_type , df_writer )
95
90
96
- def textfile (self , path , min_partitions = None , crypto_mode = "plain_text" ,
97
- kms_name = " " , primary_key = "" , data_key = " " ):
91
+ def textfile (self , path , min_partitions = None ,
92
+ crypto_mode = "plain_text" , primary_key_name = "defaultKey" ):
98
93
if min_partitions is None :
99
94
min_partitions = self .spark .sparkContext .defaultMinPartitions
100
95
if isinstance (crypto_mode , CryptoMode ):
101
96
crypto_mode = crypto_mode .value
102
- return callBigDlFunc (self .bigdl_type , "textFile" , self . value ,
103
- path , min_partitions , crypto_mode ,
104
- kms_name , primary_key , data_key )
97
+ return callBigDlFunc (self .bigdl_type , "textFile" ,
98
+ self . value , path , min_partitions ,
99
+ crypto_mode , primary_key_name )
105
100
106
101
107
102
class EncryptedDataFrameReader :
@@ -168,9 +163,8 @@ class CryptoMode(Enum):
168
163
# CryptoMode AES_GCM_CTR_V1 for parquet only
169
164
AES_GCM_CTR_V1 = "AES_GCM_CTR_V1"
170
165
171
-
172
- def init_keys (app_id , api_key , primary_key_path , data_key_path ):
173
- return callBigDlFunc ("float" , "initKeys" , app_id , api_key , primary_key_path , data_key_path )
166
+ def init_keys (app_id , api_key , primary_key_path ):
167
+ return callBigDlFunc ("float" , "initKeys" , app_id , api_key , primary_key_path )
174
168
175
169
176
170
def generate_encrypted_file (kms , primary_key_path , data_key_path , input_path , output_path ):
0 commit comments