|
11 | 11 | from datetime import datetime, timezone
|
12 | 12 | from monitoring.traffic_monitor import collect_region_traffic
|
13 | 13 | from utils.history_manager import update_traffic_history
|
14 |
| -from prediction.placement_predictor import predict_placement_actions |
| 14 | +from prediction.placement_predictor import PlacementPredictor |
15 | 15 | from utils.state_manager import load_deployment_state, save_deployment_state
|
16 | 16 | from utils.fancy_logger import get_logger
|
17 | 17 | from utils.history_manager import load_traffic_history
|
18 | 18 | from dateutil.parser import isoparse
|
19 | 19 | from utils.config_loader import Config
|
20 | 20 | from logging.handlers import RotatingFileHandler
|
21 | 21 | from utils.metrics_fetcher import MetricsFetcher
|
| 22 | +from metrics.metrics_client import MetricsClient |
22 | 23 |
|
23 | 24 | # Load configuration
|
24 | 25 | config = Config.get_config()
|
|
64 | 65 | # set the app name to the current directory name
|
65 | 66 | FLY_APP_NAME = os.path.basename(os.getcwd())
|
66 | 67 |
|
67 |
| -def get_current_regions(): |
68 |
| - deployment_state = load_deployment_state(dry_run=DRY_RUN) |
69 |
| - current_regions = list(deployment_state.keys()) |
70 |
| - print(f"Current deployment regions: {current_regions}") |
71 |
| - return current_regions |
class AutoPlacer:
    """Turn traffic history into per-region scale actions and apply them.

    Settings are read from ``config`` with ``config.get(...)``:

    * ``dry_run`` (default ``True``) is forwarded to the state/history helpers.
    * ``excluded_regions`` are never processed.
    * ``allowed_regions``, when non-empty, is the only set of regions processed.
    * ``cooldown_period`` is the number of seconds a region must rest after
      its last recorded action (default ``0``, i.e. no cooldown).
    """

    def __init__(self, config, metrics_client=None):
        """Read settings from ``config`` and build the placement predictor.

        Args:
            config: Mapping-like object exposing ``get(key, default)``.
            metrics_client: Optional client object. It is only stored on the
                instance; accepting it keeps ``AutoPlacer(config, client)``
                callers from failing with ``TypeError``.
        """
        self.dry_run = config.get('dry_run', True)
        self.excluded_regions = config.get('excluded_regions', [])
        self.allowed_regions = config.get('allowed_regions', [])
        # NOTE(review): loaded from config but not consulted anywhere in this
        # class yet -- confirm whether these regions must be protected from
        # 'scale_down'.
        self.always_running_regions = config.get('always_running_regions', [])
        # _is_in_cooldown() reads this attribute; it was previously never set,
        # so any call raised AttributeError.
        # NOTE(review): the config key name is an assumption -- confirm it
        # matches the project's configuration schema. The default of 0 means
        # "never in cooldown", which keeps behaviour unchanged when unset.
        self.cooldown_period = config.get('cooldown_period', 0)
        self.metrics_client = metrics_client
        self.predictor = PlacementPredictor(config)
        self.logger = get_logger(__name__)

    async def process_traffic_data(self):
        """Run one full pass: collect traffic, predict actions, execute them.

        Returns:
            The dictionary produced by ``_execute_actions``.
        """
        metrics_fetcher = MetricsFetcher(dry_run=self.dry_run)
        app_name = metrics_fetcher.get_app_name()

        self.logger.info(f"Starting auto-placer execution for app: {app_name}")

        # Record the latest sample before reading the history back, so the
        # prediction below sees the current data point as well.
        current_data = collect_region_traffic()
        update_traffic_history(current_data, dry_run=self.dry_run)

        current_state = load_deployment_state(dry_run=self.dry_run)
        traffic_history = load_traffic_history(dry_run=self.dry_run)

        actions_needed = []
        for region, traffic_stats in traffic_history.items():
            if not self._should_process_region(region):
                continue
            action = self.predictor.predict_placement_actions(region, traffic_stats)
            if action:
                actions_needed.append((region, action))

        return await self._execute_actions(actions_needed, current_state)

    def _should_process_region(self, region: str) -> bool:
        """Return True when configuration permits acting on ``region``.

        A region is rejected when it is listed in ``excluded_regions``, or
        when ``allowed_regions`` is non-empty and does not contain it.
        """
        if region in self.excluded_regions:
            return False
        if self.allowed_regions and region not in self.allowed_regions:
            return False
        return True

    async def _execute_actions(self, actions_needed, current_state):
        """Apply the predicted actions, honouring the cooldown period.

        Args:
            actions_needed: Iterable of ``(region, action)`` pairs. Only the
                action strings ``'scale_up'`` and ``'scale_down'`` are acted
                on; anything else is ignored.
            current_state: Mapping of deployed region -> value understood by
                ``_is_in_cooldown`` (an ISO-8601 timestamp string).

        Returns:
            A dict with ``actions_taken``, ``updated_regions`` and a UTC
            ISO-8601 ``timestamp``.
        """
        regions_to_deploy = []
        regions_to_remove = []
        skipped = []

        for region, action in actions_needed:
            if action == 'scale_up' and region not in current_state:
                target, label = regions_to_deploy, "deploy"
            elif action == 'scale_down' and region in current_state:
                target, label = regions_to_remove, "remove"
            else:
                # Nothing to do: the region is already in the requested state
                # or the action string is not one handled here.
                continue

            # The cooldown helper existed but was never called, so regions
            # could be toggled on every run.
            if self._is_in_cooldown(region, current_state):
                self.logger.info(f"Skipping {label} for {region} due to cooldown period.")
                skipped.append({
                    "region": region,
                    "action": label,
                    "reason": "Cooldown period",
                })
                continue

            target.append(region)

        updated_regions, action_results = update_placements(
            regions_to_deploy,
            regions_to_remove,
        )
        if skipped:
            action_results.setdefault("skipped", []).extend(skipped)

        return {
            "actions_taken": action_results,
            "updated_regions": updated_regions,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }

    def _is_in_cooldown(self, region: str, current_state: dict) -> bool:
        """Return True when ``region``'s last action is within the cooldown.

        ``current_state[region]`` is parsed with ``isoparse``; a timestamp
        without timezone information is treated as UTC. A region with no
        (or an empty) entry is never in cooldown.
        """
        last_action_time_str = current_state.get(region)
        if not last_action_time_str:
            return False

        last_action_time = isoparse(last_action_time_str)
        if last_action_time.tzinfo is None:
            # Naive timestamps cannot be subtracted from an aware "now".
            last_action_time = last_action_time.replace(tzinfo=timezone.utc)

        elapsed_time = (datetime.now(timezone.utc) - last_action_time).total_seconds()
        return elapsed_time < self.cooldown_period
75 | 148 |
|
def update_placements(regions_to_deploy, regions_to_remove):
    """Scale Fly.io regions up or down and persist the resulting deployment state.

    Deployments are processed first, then removals. Unless ``DRY_RUN`` is set,
    each region is scaled with ``fly scale count <n> --region <region>``
    (``1`` to deploy, ``0`` to remove). A failure for one region is logged and
    recorded under ``"errors"``; the remaining regions are still processed.

    After processing, the deployment state is loaded, updated and saved again:
    successfully deployed regions are stamped with the current UTC time and
    successfully removed regions are dropped. Without this the state returned
    by ``load_deployment_state`` never changes and later runs repeat the same
    actions.

    Args:
        regions_to_deploy: Iterable of region names to scale to one machine.
        regions_to_remove: Iterable of region names to scale to zero machines.

    Returns:
        A tuple ``(updated_regions, action_results)``: the regions whose action
        succeeded (in processing order), and a dict with the keys
        ``"deployed"``, ``"removed"``, ``"skipped"`` and ``"errors"``.
    """
    # Local import: the module's visible import block does not provide subprocess.
    import subprocess

    action_results = {
        "deployed": [],
        "removed": [],
        "skipped": [],
        "errors": [],
    }
    updated_regions = []

    # Work on a copy so the loaded state object is never mutated in place.
    state = dict(load_deployment_state(dry_run=DRY_RUN))
    now_iso = datetime.now(timezone.utc).isoformat()

    # (action label, result key, machine count, regions), in execution order.
    plans = (
        ("deploy", "deployed", "1", regions_to_deploy),
        ("remove", "removed", "0", regions_to_remove),
    )

    for action, result_key, count, regions in plans:
        for region in regions:
            try:
                if not DRY_RUN:
                    subprocess.run(
                        ['fly', 'scale', 'count', count, '--region', region],
                        check=True,
                    )
            except (subprocess.SubprocessError, OSError) as e:
                # SubprocessError covers a non-zero exit (check=True); OSError
                # covers a missing or non-executable `fly` binary.
                logger.error(f"Failed to {action} region {region}: {e}")
                action_results["errors"].append(
                    {"region": region, "action": action, "error": str(e)}
                )
                continue

            action_results[result_key].append(region)
            updated_regions.append(region)
            if action == "deploy":
                state[region] = now_iso
            else:
                state.pop(region, None)

    save_deployment_state(state, dry_run=DRY_RUN)
    return updated_regions, action_results
149 | 180 |
|
def main():
    """Run one auto-placer pass and return its result dictionary.

    ``AutoPlacer.process_traffic_data`` is a coroutine function, so it is
    driven to completion with ``asyncio.run``. Calling it without awaiting
    only creates a coroutine object and performs no work.

    Returns:
        The value returned by ``AutoPlacer.process_traffic_data``.

    Raises:
        Exception: Any error raised during execution is logged with its
            traceback and then re-raised unchanged.

    Note:
        ``asyncio.run`` cannot be used from inside an already running event
        loop; call ``await AutoPlacer(config).process_traffic_data()`` there.
    """
    # Local import so this function does not rely on the module's import block.
    import asyncio

    config = Config.get_config()
    # AutoPlacer only needs the configuration; it builds its own
    # MetricsFetcher internally, so no separate metrics client is passed.
    auto_placer = AutoPlacer(config)

    try:
        action_results = asyncio.run(auto_placer.process_traffic_data())
    except Exception as e:
        logger.error(f"Error during auto-placer execution: {e}", exc_info=True)
        raise
    else:
        logger.info(f"Auto-placer execution completed. Results: {action_results}")
        return action_results


if __name__ == "__main__":
    main()
0 commit comments