import random
from datetime import datetime, timedelta
-
- import awswrangler as wr
- import boto3
- import botocore
- from decimal import Decimal
import numpy as np
- import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, StringType, TimestampType, BooleanType
+ import boto3
+ import awswrangler as wr
+ import os

# Initialize Spark session
spark = SparkSession.builder.appName("FraudClassificationSchema").getOrCreate()

- try:
-     dynamodb = boto3.client('dynamodb')
- except botocore.exceptions.NoCredentialsError:
-     print("AWS credentials not found. Please configure your AWS credentials.")
-     raise
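+ # Point boto3 and awswrangler at the same DynamoDB endpoint; DYNAMO_ENDPOINT
+ # overrides the default, which targets a local instance (e.g. DynamoDB Local).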
+ ENDPOINT_URL = os.environ.get("DYNAMO_ENDPOINT", "http://localhost:8000")
+
+ wr.config.dynamodb_endpoint_url = ENDPOINT_URL
+ dynamodb = boto3.client('dynamodb', endpoint_url=ENDPOINT_URL)
+

def time_to_value(t, base_value, amplitude, noise_level, scale=1):
    if scale is None:
@@ -106,18 +103,18 @@ def generate_timeseries_with_anomalies(num_samples=1000, base_value=100, amplitu

    # User features (dim_user) – 7
    StructField("user_account_age", IntegerType(), True),
-     StructField("user_account_balance", DoubleType(), True),
-     StructField("user_credit_score", IntegerType(), True),
-     StructField("user_number_of_devices", IntegerType(), True),
+     StructField("account_balance", DoubleType(), True),
+     StructField("credit_score", IntegerType(), True),
+     StructField("number_of_devices", IntegerType(), True),
    StructField("user_country", StringType(), True),
    StructField("user_account_type", IntegerType(), True),
    StructField("user_preferred_language", StringType(), True),

    # Merchant features (dim_merchant) – 6
    StructField("merchant_account_age", IntegerType(), True),
-     StructField("merchant_zipcode", IntegerType(), True),
+     StructField("zipcode", IntegerType(), True),
    # Set to True for 100 of the merchant_ids
-     StructField("merchant_is_big_merchant", BooleanType(), True),
+     StructField("is_big_merchant", BooleanType(), True),
    StructField("merchant_country", StringType(), True),
    StructField("merchant_account_type", IntegerType(), True),
    StructField("merchant_preferred_language", StringType(), True),
@@ -260,80 +257,8 @@ def generate_fraud_sample_data(num_samples=10000):
        data.append(tuple(row))
    return data

- fraud_data = generate_fraud_sample_data(5000)
+ fraud_data = generate_fraud_sample_data(20000)
fraud_df = spark.createDataFrame(fraud_data, schema=fraud_schema)

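+ # Persist the generated dataset as parquet files under the local "data" directory.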
fraud_df.write.mode("overwrite").parquet("data")
- print("Successfully wrote user data to parquet", flush=True)
-
-
- dynamodb = boto3.client('dynamodb')
-
- skip_data_upload = False
- table_name = "transaction_risk"
- try:
-     dynamodb.create_table(
-         AttributeDefinitions=[
-             {
-                 'AttributeName': 'user_id',
-                 'AttributeType': 'N',
-             },
-             {
-                 'AttributeName': 'ts',
-                 'AttributeType': 'N',
-             },
-         ],
-         KeySchema=[
-             {
-                 'AttributeName': 'user_id',
-                 'KeyType': 'HASH'
-             },
-             {
-                 'AttributeName': 'ts',
-                 'KeyType': 'RANGE',
-             },
-         ],
-         ProvisionedThroughput={
-             'ReadCapacityUnits': 5,
-             'WriteCapacityUnits': 5,
-         },
-         TableName=table_name,
-     )
-     dynamodb.get_waiter('table_exists').wait(TableName=table_name)
- except botocore.exceptions.ClientError as error:
-     if error.response['Error']['Code'] == 'ResourceInUseException':
-         # Table already exists
-         print("Dynamo Table already exists. Skipping upload")
-         skip_data_upload = True
-     else:
-         raise error
-
- if not skip_data_upload:
-     print("Done creating table. Beginning data upload (This may take a few minutes)", flush=True)
-
-     fraud_df = fraud_df.toPandas()
-
-     # Convert Float types to Decimal
-     for column in fraud_df.columns:
-         if str(fraud_df.dtypes[column]) == "float64":
-             fraud_df[column] = fraud_df[column].apply(str).apply(Decimal)
-
-     # Convert datetime to string and int
-     fraud_df['ds'] = pd.to_datetime(fraud_df['transaction_time']).dt.date.apply(str)
-     fraud_df['ts'] = pd.to_datetime(fraud_df['transaction_time']).astype('int64')
-     fraud_df.drop('transaction_time', axis=1, inplace=True)
-
-     print("Uploading to DynamoDB")
-
-     # Upload data in batches
-     batch_size = 1000  # Adjust based on your needs
-     for i in range(0, len(fraud_df), batch_size):
-         batch = fraud_df.iloc[i:i + batch_size]
-         wr.dynamodb.put_df(df=batch, table_name=table_name)
-         print(f"Uploaded batch {i // batch_size + 1}/{len(fraud_df) // batch_size + 1}", flush=True)
-
-     # wr.dynamodb.put_df(df=fraud_df, table_name=table_name)
-
- print("Wrote parquet to Dynamo")
+ print("Successfully wrote user data to parquet")
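+
+ # Optional sanity check: reload the parquet output and confirm the row count
+ # matches the generated sample size.
+ # check_df = spark.read.parquet("data")
+ # assert check_df.count() == 20000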