Skip to content

Commit

Permalink
wait until threads complete and max threads control
Browse files Browse the repository at this point in the history
  • Loading branch information
james-jdgtl committed Mar 3, 2025
1 parent cd2281c commit 5758081
Showing 1 changed file with 52 additions and 31 deletions.
83 changes: 52 additions & 31 deletions health_ping.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from datetime import datetime, timezone
import os
import threading
from concurrent.futures import ThreadPoolExecutor
import logging
import requests
import json
Expand All @@ -13,7 +14,11 @@
import http.server
import socketserver
import github
import psutil

process = psutil.Process(os.getpid())

# Upper bound on the number of concurrently active threads.
# Environment variables are always strings, so convert explicitly: the value
# is compared against threading.active_count() using integer arithmetic, and
# a str here would raise TypeError as soon as MAX_THREADS is set.
max_threads = int(os.getenv('MAX_THREADS', '200'))
sc_api_endpoint = os.getenv('SERVICE_CATALOGUE_API_ENDPOINT')
sc_api_token = os.getenv('SERVICE_CATALOGUE_API_KEY')
redis_host = os.getenv('REDIS_ENDPOINT')
Expand Down Expand Up @@ -98,6 +103,7 @@ def git_compare_commits(github_repo, from_sha, to_sha):


def update_app_version(app_version, c_name, e_name, github_repo):
log.debug(f'Starting update_app_version for {c_name}-{e_name}')
version_key = f'version:{c_name}:{e_name}'
version_data = {'v': app_version, 'dateAdded': datetime.now(timezone.utc).isoformat()}
try:
Expand Down Expand Up @@ -134,23 +140,28 @@ def update_app_version(app_version, c_name, e_name, github_repo):
version_data.update({'git_compare': json.dumps(commits)})
redis.xadd(version_key, version_data, maxlen=200, approximate=False)
log.info(
f'Updating redis stream with new version. {version_key} = {version_data}'
f'Updated redis stream with new version. {version_key} = {version_data}'
)
else:
# Must be first time entry to version redis stream
redis.xadd(version_key, version_data, maxlen=200, approximate=False)
log.debug(f'First version entry = {version_key}:{version_data}')
log.debug(f'Adding first entry to version: {version_key} = {version_data}')
return

# Always update the latest version key
redis.json().set('latest:versions', f'$.{version_key}', version_data)
log.info(f'Updating redis key with latest version. {version_key} = {version_data}')
log.info(f'Updated redis key with latest version. {version_key} = {version_data}')

except Exception as e:
log.error(e)
log.debug(f'Completed update_app_version for {c_name}-{e_name}')


def process_env(c_name, e_id, endpoint, endpoint_type, component, env_attributes):
log.debug(
f'Starting process_env for {c_name}-{env_attributes.get("name")}:{endpoint_type}'
)
log.debug(f'Memory usage: {process.memory_info().rss / 1024**2} MB')
output = {}
# Redis key to use for stream
e_name = env_attributes['name']
Expand All @@ -176,7 +187,7 @@ def process_env(c_name, e_id, endpoint, endpoint_type, component, env_attributes
log.error(e)

stream_data.update({'http_s': r.status_code})
log.info(f'{r.status_code}: {endpoint}')
log.info(f'{c_name}-{e_type} response: {r.status_code}: {endpoint}')

except requests.exceptions.RequestException as e:
# Set status code to 0 for failed connections
Expand Down Expand Up @@ -235,6 +246,10 @@ def process_env(c_name, e_id, endpoint, endpoint_type, component, env_attributes
except Exception as e:
log.error(f'Unable to add data to redis stream. {e}')

log.debug(
f'Completed process_env for {c_name}-{env_attributes.get("name")}:{endpoint_type}'
)


class HealthHttpRequestHandler(http.server.SimpleHTTPRequestHandler):
def do_GET(self):
Expand All @@ -257,10 +272,11 @@ def startHttpServer():
)
log = logging.getLogger(__name__)

threads = list()
main_threads = list()
http_thread = list()
# Start health endpoint.
httpHealth = threading.Thread(target=startHttpServer, daemon=True)
threads.append(httpHealth)
http_thread.append(httpHealth)
httpHealth.start()

# Test connection to redis
Expand Down Expand Up @@ -322,7 +338,9 @@ def startHttpServer():
sc_endpoint = f'{sc_api_endpoint}/v1/components?populate=envs{sc_api_filter}'

while True:
log.info(sc_endpoint)
log.info(
f'Starting a new run. Service Catalogue endpoint: {sc_endpoint}. Current memory usage: {process.memory_info().rss / 1024**2} MB'
)
try:
r = requests.get(sc_endpoint, headers=sc_api_headers, timeout=20)
log.debug(r)
Expand All @@ -340,33 +358,36 @@ def startHttpServer():
c_name = component['attributes']['name']
env_attributes = env['attributes']
e_id = env['id']
if (env_attributes['url']) and (env_attributes['monitor'] == True):
if env_attributes['health_path']:
endpoint = f'{env_attributes["url"]}{env_attributes["health_path"]}'
endpoint_type = 'health'
t_health = threading.Thread(
target=process_env,
args=(c_name, e_id, endpoint, endpoint_type, component, env_attributes),
daemon=True,
)
threads.append(t_health)
t_health.start()
log.info(f'Started thread for {c_name}:{endpoint_type}')
if env_attributes['info_path']:
endpoint = f'{env_attributes["url"]}{env_attributes["info_path"]}'
endpoint_type = 'info'
t_info = threading.Thread(
target=process_env,
args=(c_name, e_id, endpoint, endpoint_type, component, env_attributes),
daemon=True,
)
threads.append(t_info)
t_info.start()
log.info(f'Started thread for {c_name}:{endpoint_type}')
if env_attributes.get('url') and env_attributes.get('monitor'):
attributes = [('health_path', 'health'), ('info_path', 'info')]
for each_attribute in attributes:
if endpoint_uri := env_attributes.get(each_attribute[0]):
endpoint = f'{env_attributes["url"]}{endpoint_uri}'
endpoint_type = each_attribute[1]
thread = threading.Thread(
target=process_env,
args=(c_name, e_id, endpoint, endpoint_type, component, env_attributes),
daemon=True,
)
main_threads.append(thread)
# Apply limit on total active threads, avoid github secondary API rate limit
while threading.active_count() > (max_threads - 1):
log.info(
f'Active Threads={threading.active_count()}, Max Threads={max_threads} - backing off for a few seconds'
)
sleep(3)
thread.start()
log.info(
f'Started thread for {c_name}-{env_attributes.get("name")}:{endpoint_type} (active threads: {threading.active_count()})'
)
else:
continue
log.debug(f'Active threads: {threading.active_count()}')

# Allow the threads to finish before sleeping
threads.join()
# Wait for every worker started during this run to finish.
for thread in main_threads:
    thread.join()
# main_threads is created once, outside the polling loop; without emptying it
# here it would keep a reference to every thread ever started and each run
# would re-join all of the already-finished threads from previous runs.
main_threads.clear()
log.info(
f'Completed all threads. Sleeping for {refresh_interval} seconds. Current memory usage: {process.memory_info().rss / 1024**2} MB.'
)
sleep(refresh_interval)

0 comments on commit 5758081

Please sign in to comment.