Skip to content

Commit

Permalink
Support for the carbon, file transfer node, and new sla added
Browse files Browse the repository at this point in the history
  • Loading branch information
jgoldverg committed Nov 12, 2024
1 parent e422905 commit 0fbd089
Show file tree
Hide file tree
Showing 10 changed files with 434 additions and 36 deletions.
2 changes: 1 addition & 1 deletion _version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "1.1.4"
__version__ = "1.1.5"
6 changes: 4 additions & 2 deletions odscli/ods_cli.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import click
from odscli.sdk.schedule_job import schedule_cli
from odscli.sdk.scheduler import schedule_cli
from odscli.sdk.credential import credential_cli
from odscli.sdk.endpoint_management import endpoint_cli
from odscli.sdk.query_transfer_data import query_cli
from odscli.sdk.login import auth_cli
from odscli.sdk.measure import measure_cli
from odscli.sdk.ftn_nodes import nodes_cli
from odscli.sdk.carbon_scheduler import carbon_cli

odscli = click.CommandCollection(sources=[schedule_cli, credential_cli, endpoint_cli, query_cli, auth_cli, measure_cli])
odscli = click.CommandCollection(sources=[schedule_cli, credential_cli, endpoint_cli, query_cli, auth_cli, measure_cli, nodes_cli, carbon_cli])

def main():
odscli()
154 changes: 154 additions & 0 deletions odscli/sdk/carbon_scheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
import json
from datetime import datetime
from inspect import trace

from rich.columns import Columns
from rich.panel import Panel

from odscli.sdk import token_utils
import odscli.sdk.constants as constants
import requests
import click
from rich.console import Console
import odscli.sdk.carbon_scheduler_gui as scheduler_gui

console = Console()


@click.group('carbon_cli')
@click.pass_context
def carbon_cli():
pass


@carbon_cli.group('carbon')
def carbon():
pass


@carbon.command('entries', help="List the measurements for a job measured on a given node")
@click.argument('job_uuid', type=click.UUID)
@click.argument('transfer_node_name', type=click.STRING)
@click.option('--save_to_file', type=click.Path(), default=None, help="File path to save the output. Format is in json")
def queryCarbonEntries(job_uuid, transfer_node_name, save_to_file):
host, user, token = token_utils.readConfig()
uri = constants.ODS_PROTOCOL + host + constants.CARBON_API + constants.CARBON_NODE_AND_JOB.format(
transferNodeName=transfer_node_name, jobUuid=job_uuid)
cookies = dict(ATOKEN=token)
headers = {"Authorization": "Bearer " + token + ""}
data = send_request(uri=uri, cookies=cookies, headers=headers, params={})
if data is None: return

if save_to_file:
save_data_to_file(data, save_to_file)
scheduler_gui.buildMainCarbonTable(data, console)


@carbon.command('user', help="List carbon measurements for a user across all jobs and nodes")
@click.option('--save_to_file', type=click.Path(), default=None, help="File path to save the output. Format is in json")
def getCarbonEntriesForUser(save_to_file):
host, user, token = token_utils.readConfig()
uri = constants.ODS_PROTOCOL + host + constants.CARBON_API + constants.CARBON_USER
cookies = dict(ATOKEN=token)
headers = {"Authorization": "Bearer " + token + ""}

data = send_request(uri, cookies, headers, {})
if data is None: return

if save_to_file:
save_data_to_file(data, save_to_file)

data_sorted = sorted(data, key=lambda job: datetime.fromisoformat(job["timeMeasuredAt"]))
scheduler_gui.buildMainCarbonTable(data_sorted, console)


@carbon.command("job", help="List all carbon measurements for a job")
@click.argument('job_uuid', type=click.UUID)
@click.option('--save_to_file', type=click.Path(), default=None, help="File path to save the output. Format is in json")
def job_measurements(job_uuid, save_to_file):
host, user, token = token_utils.readConfig()
uri = constants.ODS_PROTOCOL + host + constants.CARBON_API + constants.CARBON_JOB.format(jobUuid=job_uuid)
cookies = dict(ATOKEN=token)
headers = {"Authorization": "Bearer " + token + ""}
data = send_request(uri, cookies, headers, params={})
print(data)
if data is None: return
if save_to_file: save_data_to_file(data, save_to_file)

scheduler_gui.buildMainCarbonTable(data, console)


@carbon.command("latest", help="Get latest carbon measurement for scheduled job")
@click.argument('job_uuid', type=click.UUID)
@click.option('--save_to_file', type=click.Path(), default=None, help="File path to save the output. Format is in json")
def query_latest_job_measurement(job_uuid, save_to_file):
host, user, token = token_utils.readConfig()
uri = constants.ODS_PROTOCOL + host + constants.CARBON_API + constants.CARBON_LATEST.format(jobUuid=job_uuid)
cookies = dict(ATOKEN=token)
headers = {"Authorization": "Bearer " + token + ""}
data = send_request(uri, cookies, headers, params={})
if data is None: return
if save_to_file: save_data_to_file(data, save_to_file)

trace_route_table = scheduler_gui.buildTraceRouteTable(data['transferNodeName'], data['jobUuid'],
data['timeMeasuredAt'], data['traceRouteCarbon'])
console.print(trace_route_table)


@carbon.command("node", help="Get the carbon measurements produced by a Node")
@click.argument('transfer_node_name', type=click.STRING)
@click.option('--save_to_file', type=click.Path(), default=None, help="File path to save the output. Format is in json")
def query_measurements_made_by_node(transfer_node_name, save_to_file):
host, user, token = token_utils.readConfig()
uri = constants.ODS_PROTOCOL + host + constants.CARBON_API + constants.CARBON_NODE.format(
transferNodeName=transfer_node_name)
cookies = dict(ATOKEN=token)
headers = {"Authorization": "Bearer " + token + ""}
data = send_request(uri, cookies, headers, params={})
if data is None: return
if save_to_file: save_data_to_file(data, save_to_file)

scheduler_gui.buildMainCarbonTable(data, console)


@carbon.command("result",
help="The initial and final measurement for a scheduled job. Allows to compare the carbon intensity of the job that caused it to launch")
@click.argument('job_uuid', type=click.UUID)
@click.option('--save_to_file', type=click.Path(), default=None, help="File path to save the output. Format is in json")
def query_result(job_uuid, save_to_file):
host, user, token = token_utils.readConfig()
uri = constants.ODS_PROTOCOL + host + constants.CARBON_API + constants.CARBON_RESULT.format(job_uuid=job_uuid)
cookies = dict(ATOKEN=token)
headers = {"Authorization": "Bearer " + token + ""}
data = send_request(uri, cookies, headers, params={})
if data is None: return
if save_to_file: save_data_to_file(data, save_to_file)
start_json = data['start']
end_json = data['end']
start_table = scheduler_gui.buildTraceRouteTable(transferNodeName=start_json['transferNodeName'],
jobUuid=start_json['jobUuid'],
timeMeasuredAt=start_json['timeMeasuredAt'],
trace_route_data=start_json['traceRouteCarbon'])
end_table = scheduler_gui.buildTraceRouteTable(transferNodeName=end_json['transferNodeName'],
jobUuid=end_json['jobUuid'],
timeMeasuredAt=end_json['timeMeasuredAt'],
trace_route_data=end_json['traceRouteCarbon'])
columns = Columns([Panel(start_table, title="Initial Job Measurement"),
Panel(end_table, title="Last Job Measurement Before Job Execution")])
console.print(columns)


def send_request(uri, cookies, headers, params):
try:
console.print(f"Sending request: {uri}")
resp = requests.get(uri, cookies=cookies, headers=headers, params=params)
return resp.json()
except requests.RequestException as e:
console.print(f"[red]Error fetching data: {e.errno} ")
return None


def save_data_to_file(data, file_path):
with open(file_path, 'w') as file:
json.dump(data, file, indent=2)
console.print(f"[green]Data saved to {file_path}[/green]")
67 changes: 67 additions & 0 deletions odscli/sdk/carbon_scheduler_gui.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from rich.columns import Columns
from rich.table import Table
from rich.panel import Panel
from rich.console import Console


def buildMainCarbonTable(data, console:Console):
terminal_width = console.width

# Estimate width per subtable (you can adjust this based on actual subtable width)
subtable_width_estimate = 50 # Approximate width per subtable

# Calculate the number of columns per row based on terminal width
columns_per_row = max(1, terminal_width // subtable_width_estimate)

# Create the main table (one row of subtables)
main_table = Table(title="Carbon Intensity Report by Job", show_lines=True)

# Add columns to the main table based on the calculated columns per row
for i in range(columns_per_row):
main_table.add_column(f"Subtable {i + 1}", style="white")

# Populate the main table with subtables in each row
row = []
for idx, job in enumerate(data):
# Build the subtable (TraceRoute table)
trace_route_table = buildTraceRouteTable(
job["transferNodeName"],
job["jobUuid"],
job["timeMeasuredAt"],
job["traceRouteCarbon"]
)

# Convert the trace_route_table to a Panel
trace_route_panel = Panel(trace_route_table, border_style="bold blue", title="Carbon Trace Route")

# Add the rendered panel (subtable) to the row
row.append(trace_route_panel)

# Once we've filled the row (columns_per_row), add it to the main table and reset the row for the next batch
if (idx + 1) % columns_per_row == 0 or (idx + 1) == len(data):
# Create columns layout from the current row of panels
columns_layout = Columns(row)
console.print(columns_layout)
row = []


def buildTraceRouteTable(transferNodeName, jobUuid, timeMeasuredAt, trace_route_data):
# Create a nested table for trace route details
title = f"Transfer Node: {transferNodeName} - Job Uuid: {jobUuid} - Measured At: {timeMeasuredAt}"
trace_route_table = Table(title=title, show_header=True, header_style="bold blue")
trace_route_table.add_column("Index", justify="right", style="cyan")
trace_route_table.add_column("IP", style="cyan")
trace_route_table.add_column("Carbon Intensity", justify="right", style="green")
trace_route_table.add_column("Latitude", justify="right")
trace_route_table.add_column("Longitude", justify="right")

# Populate nested table with each hop
for idx, hop in enumerate(trace_route_data, start=1):
trace_route_table.add_row(
str(idx),
hop["ip"],
str(hop["carbonIntensity"]),
str(hop["lat"]),
str(hop["lon"])
)
return trace_route_table
21 changes: 21 additions & 0 deletions odscli/sdk/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,24 @@
CRED_OAUTH_REGISTERV2 = "/api/oauth"
CRED_ACCOUNT_GETV2 = "/endpoint-cred/{userId}/{type}"
CRED_ACCOUNTID_GETV2 = "/endpoint-cred/{userId}/{type}/{accountId}"

NODE_LIST_CONNECTORS = "/api/nodes/{user}"
NODE_LIST_ODS = "/api/nodes/ods"
NODE_COUNT = "/api/nodes/count"

CARBON_API = "/api/carbon"
CARBON_NODE_AND_JOB = "/query/{transferNodeName}/{jobUuid}"
CARBON_USER = "/user"
CARBON_JOB = "/job/{jobUuid}"
CARBON_NODE = "/node/{transferNodeName}"
CARBON_LATEST = "/latest/{jobUuid}"
CARBON_RESULT = "/result/{job_uuid}"


def human_readable_size(size_in_bytes):
"""Convert bytes to a human-readable string with appropriate units."""
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if size_in_bytes < 1024:
return f"{size_in_bytes:.2f} {unit}"
size_in_bytes /= 1024
return f"{size_in_bytes:.2f} PB" # Handles very large sizes
69 changes: 69 additions & 0 deletions odscli/sdk/ftn_nodes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import click
import requests
from rich.console import Console
from rich.table import Table
from rich import print

from odscli.sdk import token_utils, constants

console = Console()


@click.group('nodes_cli')
@click.pass_context
def nodes_cli():
pass


@nodes_cli.group('nodes')
def nodes():
pass


@nodes.command("ls")
@click.argument('type', type=click.Choice(['ods', 'connectors']))
def ls_connectors(type):
host, user, token = token_utils.readConfig()
if type == 'connectors':
req = constants.ODS_PROTOCOL + host + constants.NODE_LIST_CONNECTORS
req = req.format(user=user)
node_table_name = "{user} ODS Connectors".format(user=user)
else:
req = constants.ODS_PROTOCOL + host + constants.NODE_LIST_ODS
node_table_name = "ODS File Transfer Nodes"

cookies = dict(ATOKEN=token)
res = requests.get(req, cookies=cookies)
node_table = build_node_table(node_table_name)
for entry in res.json():
odsOwner = entry['odsOwner']
nodeName = entry['nodeName']
nodeUuid = entry['nodeUuid']
runningJob = entry['runningJob']
online = entry['online']
jobId = entry['jobId']
jobUuid = entry['jobUuid']
node_table.add_row(odsOwner, nodeName, nodeUuid, str(runningJob), str(online), str(jobId), jobUuid)

console.print(node_table)


@nodes.command("count")
def ls_node_count():
host, user, token = token_utils.readConfig()
req = constants.ODS_PROTOCOL + host + constants.NODE_COUNT
cookies = dict(ATOKEN=token)
res = requests.get(req, cookies=cookies)
print(res.json())


def build_node_table(name):
node_table = Table(title=name)
node_table.add_column("ODS Owner")
node_table.add_column("Node Name")
node_table.add_column("Node UUID")
node_table.add_column("Running Job")
node_table.add_column("Online")
node_table.add_column("Job Id")
node_table.add_column("Job UUID")
return node_table
2 changes: 1 addition & 1 deletion odscli/sdk/meta_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def query_all_jobs_ids(self):

def query_job_ids_direct(self, transfer_url):
# http://localhost:8092
hostStr = transfer_url + "/api/v1/job/ids"
hostStr = transfer_url + "/api/v1/job/id"
r = requests.get(hostStr)
return r.json()

Expand Down
Loading

0 comments on commit 0fbd089

Please sign in to comment.