-
Notifications
You must be signed in to change notification settings - Fork 0
/
handler_py.py
144 lines (115 loc) · 5.75 KB
/
handler_py.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import time
import cProfile
import os
import logging
from datetime import datetime
from helpers import generate_random_seeds, generate_payments, setup_environment, initialize_profiler, log_time
import math
import joblib
import pandas as pd
# Configure the root logger for this script: records at INFO and above,
# each prefixed with timestamp, logger name and level. (basicConfig does
# nothing if the root logger already has handlers.)
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-level logger used by main() for progress and timing messages.
logger = logging.getLogger(__name__)
class PaymentHandler:
    """Track per-merchant payments, running balances and derived statistics.

    All state lives in dictionaries keyed by merchant id:

    * ``payments`` -- every payment amount, in arrival order.
    * ``payment_counts`` -- number of payments processed.
    * ``current_balances`` -- running sum of all payment amounts.
    * ``cumulative_balances`` -- history of the running balance, seeded with 0.0.
    * ``periodic_average_payments`` / ``periodic_std_payments`` -- one entry per
      successful call to :meth:`update_periodic_statistics` (created lazily).
    * ``var_cumulative_balances`` -- one entry per call to
      :meth:`calculate_balance_var`.
    """

    def __init__(self):
        self.payments = {}
        self.payment_counts = {}
        self.current_balances = {}
        self.cumulative_balances = {}
        self.periodic_average_payments = {}
        self.periodic_std_payments = {}
        self.var_cumulative_balances = {}

    def initialize_merchant(self, merchant_id):
        """Initialize merchant data structures if not already initialized.

        Idempotent: an already-known merchant is left untouched.
        """
        if merchant_id not in self.payments:
            self.payments[merchant_id] = []
            self.payment_counts[merchant_id] = 0
            self.current_balances[merchant_id] = 0.0
            # Seeded with the opening balance so the history always has a start point.
            self.cumulative_balances[merchant_id] = [0.0]
            self.var_cumulative_balances[merchant_id] = []

    def process_payment(self, merchant_id, amount):
        """Record ``amount`` for ``merchant_id`` and update its running balance."""
        self.initialize_merchant(merchant_id)
        self.payments[merchant_id].append(amount)
        self.payment_counts[merchant_id] += 1
        self.current_balances[merchant_id] += amount
        # Snapshot the new running balance into the balance history.
        self.cumulative_balances[merchant_id].append(self.current_balances[merchant_id])

    def get_payment_count(self, merchant_id):
        """Return how many payments have been processed for ``merchant_id``.

        Raises:
            ValueError: if the merchant has never been seen.
        """
        if merchant_id in self.payment_counts:
            return self.payment_counts[merchant_id]
        raise ValueError("Merchant ID not found")

    def calculate_periodic_average(self, values, window_size):
        """Return the arithmetic mean of the last ``window_size`` values.

        Raises:
            ValueError: if ``window_size`` is not positive, or ``values`` has
                fewer than ``window_size`` items.
        """
        # Guard: window_size == 0 would divide by zero, and a negative size
        # would silently average the wrong slice.
        if window_size <= 0:
            raise ValueError("window_size must be a positive integer")
        if len(values) < window_size:
            raise ValueError("Not enough values to calculate average")
        return sum(values[-window_size:]) / window_size

    def calculate_periodic_std(self, values, window_size):
        """Return the population standard deviation of the last ``window_size`` values.

        Raises:
            ValueError: under the same conditions as
                :meth:`calculate_periodic_average`.
        """
        avg = self.calculate_periodic_average(values, window_size)
        window = values[-window_size:]
        # Population variance: divide by N, not N - 1.
        variance = sum((x - avg) ** 2 for x in window) / window_size
        return math.sqrt(variance)

    def update_periodic_statistics(self, merchant_id, window_size):
        """Append the rolling mean and std of the merchant's last ``window_size`` payments.

        Does nothing if the merchant has fewer than ``window_size`` payments.

        Raises:
            ValueError: if the merchant has never been seen, or ``window_size``
                is not positive.
        """
        if merchant_id not in self.payments:
            raise ValueError("Merchant ID not found")
        payments = self.payments[merchant_id]
        if len(payments) >= window_size:
            avg = self.calculate_periodic_average(payments, window_size)
            std = self.calculate_periodic_std(payments, window_size)
            self._append_to_dict_list(self.periodic_average_payments, merchant_id, avg)
            self._append_to_dict_list(self.periodic_std_payments, merchant_id, std)

    def calculate_var(self, values, confidence_level):
        """Calculate the empirical Value at Risk (VaR) for a list of values.

        Returns the element at position ``int((1 - confidence_level) * n)`` of
        the ascending-sorted values, i.e. a lower empirical quantile.

        Args:
            values: non-empty sequence of numbers.
            confidence_level: fraction in ``[0, 1]`` (e.g. 0.95, not 95).

        Raises:
            ValueError: if ``values`` is empty or ``confidence_level`` is
                outside ``[0, 1]``.
        """
        if not values:
            raise ValueError("The values list is empty")
        # Out-of-range levels produce negative or too-large indices, which
        # previously either crashed or silently picked an element from the end.
        if not 0.0 <= confidence_level <= 1.0:
            raise ValueError("confidence_level must be between 0 and 1")
        sorted_values = sorted(values)
        last_index = len(sorted_values) - 1
        # At confidence_level == 0 the raw index equals len(values); clamp it.
        index = min(int((1 - confidence_level) * len(sorted_values)), last_index)
        return sorted_values[index]

    def calculate_balance_var(self, merchant_id, confidence_level):
        """Compute, record and return the VaR of the merchant's balance history.

        Raises:
            ValueError: if the merchant is unknown, has no payments yet (only
                the seed balance), or ``confidence_level`` is outside ``[0, 1]``.
        """
        if merchant_id not in self.cumulative_balances:
            raise ValueError("Merchant ID not found")
        cumulative_balances = self.cumulative_balances[merchant_id]
        # Only the 0.0 seed present means no payment has been processed.
        if len(cumulative_balances) <= 1:
            raise ValueError("Not enough balance data to calculate VaR")
        var = self.calculate_var(cumulative_balances, confidence_level)
        self._append_to_dict_list(self.var_cumulative_balances, merchant_id, var)
        return var

    def _append_to_dict_list(self, dictionary, key, value):
        """Append ``value`` to ``dictionary[key]``, creating the list if needed."""
        if key not in dictionary:
            dictionary[key] = []
        dictionary[key].append(value)
def main():
    """Benchmark PaymentHandler over pre-generated payment files.

    For each configured run this:

    1. loads ``artefacts/payments/payments_{run_index}.parquet`` as a list of
       records;
    2. replays every payment through a fresh PaymentHandler with the profiler
       enabled, refreshing periodic statistics and balance VaR for a merchant
       every ``periodic_statistics_interval`` payments;
    3. dumps the profile to ``artefacts/python/run_{run_index + 1}.prof`` and
       records the elapsed processing time.

    All timings are saved to ``artefacts/python/time_taken.joblib`` and
    summarised (average / max / min) in the log.
    """
    config = setup_environment()
    profiler = initialize_profiler()
    # NOTE(review): one profiler object is reused for every run. If it is a
    # cProfile.Profile, its stats accumulate, so run_N.prof would also contain
    # runs 1..N-1. Confirm whether initialize_profiler() should be called per run.

    # Hoisted loop invariants: looked up once instead of once per payment.
    interval = config['periodic_statistics_interval']
    window_size = config['periodic_statistics_window_size']
    confidence_level = config['confidence_interval']

    time_taken = []
    for run_index in range(config['runs']):
        logger.info("Loading payments")
        payments = pd.read_parquet(
            f'artefacts/payments/payments_{run_index}.parquet'
        ).to_dict(orient='records')

        logger.info(f"Starting run {run_index + 1}/{config['runs']}")
        start_time = time.time()
        profiler.enable()
        payment_handler = PaymentHandler()
        for payment in payments:
            merchant_id = payment["merchant_id"]
            payment_handler.process_payment(merchant_id, payment["amount"])
            if payment_handler.get_payment_count(merchant_id) % interval == 0:
                payment_handler.update_periodic_statistics(merchant_id, window_size)
                payment_handler.calculate_balance_var(merchant_id, confidence_level)
        # Stop the clock before disabling and dumping the profiler, so the
        # .prof file write is not counted as processing time.
        end_time = time.time()
        profiler.disable()
        profiler.dump_stats(f"artefacts/python/run_{run_index + 1}.prof")

        elapsed_time = log_time(start_time, end_time)
        time_taken.append(elapsed_time)

    joblib.dump(time_taken, 'artefacts/python/time_taken.joblib')
    # With zero runs, average/max/min are undefined (ZeroDivisionError / empty max()).
    if not time_taken:
        logger.warning("No runs were executed; no timing summary available")
        return
    logger.info(f"Average time taken: {sum(time_taken)/len(time_taken):.2f} seconds")
    logger.info(f"Max time taken: {max(time_taken):.2f} seconds")
    logger.info(f"Min time taken: {min(time_taken):.2f} seconds")
if __name__ == "__main__":
    # Start the benchmark only when executed as a script, never on import.
    main()