Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added deepspeed/monitor/__init__.py
Empty file.
50 changes: 50 additions & 0 deletions deepspeed/monitor/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
"""
Copyright (c) Microsoft Corporation
Licensed under the MIT license.
"""

from typing import Optional
from deepspeed.runtime.config_utils import get_scalar_param
from pydantic import BaseModel, validator, ValidationError, create_model
from .constants import *


class MonitorConfig(BaseModel):
    """Common pydantic base class for the per-backend monitor config models."""
    class Config:
        # Model-wide pydantic settings; the four flags are independent of each other.
        extra = 'forbid'
        use_enum_values = True
        validate_assignment = True
        validate_all = True


class TensorBoardConfig(MonitorConfig):
    """Validated settings built from the ``tensorboard`` section of the DeepSpeed config."""
    # Each default comes from the matching TENSORBOARD_*_DEFAULT constant
    # (star-imported from .constants).
    enabled: bool = TENSORBOARD_ENABLED_DEFAULT
    output_path: str = TENSORBOARD_OUTPUT_PATH_DEFAULT
    job_name: str = TENSORBOARD_JOB_NAME_DEFAULT


class WandbConfig(MonitorConfig):
    """Validated settings built from the ``wandb`` section of the DeepSpeed config.

    ``group`` and ``team`` are declared ``Optional[str]`` so that the annotation
    states outright that "not set" is a legal value, instead of relying on
    pydantic inferring optionality from a ``None`` default.
    """
    enabled: bool = WANDB_ENABLED_DEFAULT
    group: Optional[str] = WANDB_GROUP_NAME_DEFAULT
    team: Optional[str] = WANDB_TEAM_NAME_DEFAULT
    project: str = WANDB_PROJECT_NAME_DEFAULT


class CSVConfig(MonitorConfig):
    """Validated settings built from the ``csv_monitor`` section of the DeepSpeed config."""
    # Each default comes from the matching CSV_MONITOR_*_DEFAULT constant
    # (star-imported from .constants).
    enabled: bool = CSV_MONITOR_ENABLED_DEFAULT
    output_path: str = CSV_MONITOR_OUTPUT_PATH_DEFAULT
    job_name: str = CSV_MONITOR_JOB_NAME_DEFAULT


class DeepSpeedMonitorConfig:
    """Parse the monitoring sections of a DeepSpeed config dictionary.

    Attributes:
        tensorboard_config / wandb_config / csv_monitor_config: validated
            config models. They are always defined; a section missing from
            ``ds_config`` yields a model populated with its defaults.
        tensorboard_enabled / wandb_enabled / csv_monitor_enabled: the
            ``enabled`` value of the corresponding model.

    Args:
        ds_config: mapping that may contain the ``tensorboard``, ``wandb`` and
            ``csv_monitor`` sections.

    Raises:
        pydantic.ValidationError: if a section holds unknown keys or values of
            the wrong type.
    """
    def __init__(self, ds_config):
        # Build every model unconditionally so readers of ``*_config`` never
        # hit an AttributeError when a section was omitted.
        tensorboard_section = ds_config['tensorboard'] if 'tensorboard' in ds_config else {}
        wandb_section = ds_config['wandb'] if 'wandb' in ds_config else {}
        csv_monitor_section = ds_config['csv_monitor'] if 'csv_monitor' in ds_config else {}

        self.tensorboard_config = TensorBoardConfig(**tensorboard_section)
        self.wandb_config = WandbConfig(**wandb_section)
        self.csv_monitor_config = CSVConfig(**csv_monitor_section)

        # A backend counts as enabled only when its section says so. The mere
        # presence of a section (e.g. with "enabled": false) must not enable it.
        self.tensorboard_enabled = self.tensorboard_config.enabled
        self.wandb_enabled = self.wandb_config.enabled
        self.csv_monitor_enabled = self.csv_monitor_config.enabled
85 changes: 85 additions & 0 deletions deepspeed/monitor/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
#########################################
# Tensorboard
#########################################
# TensorBoard monitoring. Disabled by default (TENSORBOARD_ENABLED_DEFAULT is False).
# Users can configure it in ds_config.json as in the example below:
TENSORBOARD_FORMAT = '''
Tensorboard can be specified as:
"tensorboard": {
"enabled": true,
"output_path": "/home/myname/foo",
"job_name": "model_lr2e-5_epoch3_seed2_seq64"
}
'''
# Name of the tensorboard section in the config
TENSORBOARD = "tensorboard"

# Key and default value of the tensorboard enable flag
TENSORBOARD_ENABLED = "enabled"
TENSORBOARD_ENABLED_DEFAULT = False

# Key and default value of the tensorboard output path (empty string, not None)
TENSORBOARD_OUTPUT_PATH = "output_path"
TENSORBOARD_OUTPUT_PATH_DEFAULT = ""

# Key and default value of the tensorboard job name
TENSORBOARD_JOB_NAME = "job_name"
TENSORBOARD_JOB_NAME_DEFAULT = "DeepSpeedJobName"

#########################################
# Wandb
#########################################
# Wandb monitoring. Disabled by default (WANDB_ENABLED_DEFAULT is False).
# Users can configure it in ds_config.json as in the example below.
# The example uses the real option keys ("team", "project", "group") and is
# valid JSON, so it can be copied into a config as-is.
WANDB_FORMAT = '''
Wandb can be specified as:
"wandb": {
"enabled": true,
"team": "deepspeed",
"project": "zero",
"group": "zero: stage 3"
}
'''
# Name of the wandb section in the config
WANDB = "wandb"

# Key and default value of the wandb enable flag
WANDB_ENABLED = "enabled"
WANDB_ENABLED_DEFAULT = False

# Key and default value of the wandb team (None means "not set")
WANDB_TEAM_NAME = "team"
WANDB_TEAM_NAME_DEFAULT = None

# Key and default value of the wandb project
WANDB_PROJECT_NAME = "project"
WANDB_PROJECT_NAME_DEFAULT = "deepspeed"

# Key and default value of the wandb group (None means "not set")
WANDB_GROUP_NAME = "group"
WANDB_GROUP_NAME_DEFAULT = None

#########################################
# csv monitor
#########################################
# Basic CSV monitoring. Disabled by default (CSV_MONITOR_ENABLED_DEFAULT is False).
# Users can configure it in ds_config.json as in the example below:
CSV_FORMAT = '''
The basic csv monitor can be specified as:
"csv_monitor": {
"enabled": true,
"output_path": "/home/myname/foo",
"job_name": "model_lr2e-5_epoch3_seed2_seq64"
}
'''
# Name of the csv monitor section in the config
CSV_MONITOR = "csv_monitor"

# Key and default value of the csv monitor enable flag
CSV_MONITOR_ENABLED = "enabled"
CSV_MONITOR_ENABLED_DEFAULT = False

# Key and default value of the csv monitor output path (empty string, not None)
CSV_MONITOR_OUTPUT_PATH = "output_path"
CSV_MONITOR_OUTPUT_PATH_DEFAULT = ""

# Key and default value of the csv monitor job name
CSV_MONITOR_JOB_NAME = "job_name"
CSV_MONITOR_JOB_NAME_DEFAULT = "DeepSpeedJobName"
62 changes: 62 additions & 0 deletions deepspeed/monitor/csv_monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
from .monitor import Monitor
import os

import deepspeed.comm as dist


class csvMonitor(Monitor):
    """Monitor backend that appends scalar events to one CSV file per metric.

    Directories are created and files are written only when the monitor is
    enabled and ``dist.get_rank() == 0``; otherwise every method is a no-op.
    """
    def __init__(self, monitor_config):
        """Read the csv monitor settings and prepare the log directory.

        Args:
            monitor_config: object exposing ``csv_monitor_config`` with the
                ``enabled``, ``output_path`` and ``job_name`` attributes.
        """
        super().__init__(monitor_config)
        # Sanitized names of the files this instance already wrote a header to.
        self.filenames = []
        self.enabled = monitor_config.csv_monitor_config.enabled
        self.output_path = monitor_config.csv_monitor_config.output_path
        self.job_name = monitor_config.csv_monitor_config.job_name
        self.log_dir = self.setup_log_dir()

    def setup_log_dir(self, base=os.path.join(os.path.expanduser("~"), "csv_monitor")):
        """Create the directory that will hold the CSV files and return it.

        Args:
            base: root used only when ``self.output_path`` is None.

        Returns:
            The directory path, or None when the monitor is disabled or this
            process is not rank 0.
        """
        if not (self.enabled and dist.get_rank() == 0):
            return None

        if self.output_path is not None:
            log_dir = os.path.join(self.output_path, self.job_name)
        # NOTE: This code path currently is never used since the default csv monitor output_path is an empty string and not None. Saving it in case we want this functionality in the future.
        else:
            if "DLWS_JOB_ID" in os.environ:
                infra_job_id = os.environ["DLWS_JOB_ID"]
            elif "DLTS_JOB_ID" in os.environ:
                infra_job_id = os.environ["DLTS_JOB_ID"]
            else:
                infra_job_id = "unknown-job-id"

            csv_monitor_dir_name = os.path.join(infra_job_id, "logs")
            log_dir = os.path.join(base, csv_monitor_dir_name, self.job_name)
        os.makedirs(log_dir, exist_ok=True)
        return log_dir

    def write_events(self, event_list):
        """Append each event to the CSV file named after its ``log_name``.

        Args:
            event_list: iterable of tensorboard-style tuples in the format
                ``(log_name: str, value, step: int)``.
        """
        if not (self.enabled and dist.get_rank() == 0):
            return

        import csv
        for event in event_list:
            log_name = event[0]
            value = event[1]
            step = event[2]

            # The deepspeed engine currently separates log-name components with
            # '/'; the column header is the last component (the whole name when
            # there is no '/').
            header = log_name.split('/')[-1]

            # sanitize common naming conventions into filename
            filename = log_name.replace('/', '_').replace(' ', '_')
            fname = os.path.join(self.log_dir, filename + '.csv')

            # newline='' lets the csv module control line endings itself;
            # without it, platforms that translate '\n' emit blank rows.
            with open(fname, 'a', newline='') as csv_monitor_file:
                csv_monitor_writer = csv.writer(csv_monitor_file)
                # Insert the header the first time this instance writes to the file.
                if filename not in self.filenames:
                    self.filenames.append(filename)
                    csv_monitor_writer.writerow(['step', header])
                csv_monitor_writer.writerow([step, value])
47 changes: 47 additions & 0 deletions deepspeed/monitor/monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
"""
Support different forms of monitoring such as wandb and tensorboard
"""

from abc import ABC, abstractmethod
import deepspeed.comm as dist


class Monitor(ABC):
    """Abstract interface shared by the monitoring backends in this package."""

    @abstractmethod
    def __init__(self, monitor_config):
        """Keep a reference to the monitor configuration on the instance."""
        self.monitor_config = monitor_config

    @abstractmethod
    def write_events(self, event_list):
        """Record ``event_list``; the base implementation does nothing."""
        return None


from .wandb import WandbMonitor
from .tensorboard import TensorBoardMonitor
from .csv_monitor import csvMonitor


class MonitorMaster(Monitor):
    """Fan-out monitor that forwards events to every configured backend.

    Backends are instantiated, and events forwarded, only in the process for
    which ``dist.get_rank() == 0``.
    """
    def __init__(self, monitor_config):
        """Create the backends selected by the ``*_enabled`` flags of ``monitor_config``."""
        super().__init__(monitor_config)
        self.tb_monitor = None
        self.wandb_monitor = None
        self.csv_monitor = None
        self.enabled = (monitor_config.tensorboard_enabled
                        or monitor_config.csv_monitor_enabled
                        or monitor_config.wandb_enabled)

        # Other ranks keep all three backends as None.
        if dist.get_rank() != 0:
            return

        if monitor_config.tensorboard_enabled:
            self.tb_monitor = TensorBoardMonitor(monitor_config)
        if monitor_config.wandb_enabled:
            self.wandb_monitor = WandbMonitor(monitor_config)
        if monitor_config.csv_monitor_enabled:
            self.csv_monitor = csvMonitor(monitor_config)

    def write_events(self, event_list):
        """Pass ``event_list`` to each existing backend: tensorboard, wandb, then csv."""
        if dist.get_rank() != 0:
            return

        for backend in (self.tb_monitor, self.wandb_monitor, self.csv_monitor):
            if backend is not None:
                backend.write_events(event_list)
52 changes: 52 additions & 0 deletions deepspeed/monitor/tensorboard.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from .utils import check_tb_availability
from .monitor import Monitor
import os

import deepspeed.comm as dist


class TensorBoardMonitor(Monitor):
    """Monitor backend that writes scalar events through a TensorBoard ``SummaryWriter``.

    The writer is created, and events are written, only when the monitor is
    enabled and ``dist.get_rank() == 0``.
    """
    def __init__(self, monitor_config):
        """Read the tensorboard settings and create the summary writer.

        Args:
            monitor_config: object exposing ``tensorboard_config`` with the
                ``enabled``, ``output_path`` and ``job_name`` attributes.

        Raises:
            ImportError: re-raised by ``check_tb_availability`` when the
                ``tensorboard`` package cannot be imported.
        """
        super().__init__(monitor_config)
        check_tb_availability()

        self.summary_writer = None
        self.enabled = monitor_config.tensorboard_config.enabled
        self.output_path = monitor_config.tensorboard_config.output_path
        self.job_name = monitor_config.tensorboard_config.job_name

        if self.enabled and dist.get_rank() == 0:
            self.get_summary_writer()

    def get_summary_writer(self,
                           base=os.path.join(os.path.expanduser("~"),
                                             "tensorboard")):
        """Create the log directory and the ``SummaryWriter`` bound to it.

        Args:
            base: root used only when ``self.output_path`` is None.

        Returns:
            The new ``SummaryWriter``, or None when the monitor is disabled or
            this process is not rank 0.
        """
        if not (self.enabled and dist.get_rank() == 0):
            return None

        from torch.utils.tensorboard import SummaryWriter
        if self.output_path is not None:
            log_dir = os.path.join(self.output_path, self.job_name)
        # NOTE: This code path currently is never used since the default output_path is an empty string and not None. Saving it in case we want this functionality in the future.
        else:
            if "DLWS_JOB_ID" in os.environ:
                infra_job_id = os.environ["DLWS_JOB_ID"]
            elif "DLTS_JOB_ID" in os.environ:
                infra_job_id = os.environ["DLTS_JOB_ID"]
            else:
                infra_job_id = "unknown-job-id"

            summary_writer_dir_name = os.path.join(infra_job_id, "logs")
            # output_path is None in this branch, so joining it would raise
            # TypeError; the job name is the intended last path component.
            log_dir = os.path.join(base, summary_writer_dir_name, self.job_name)
        os.makedirs(log_dir, exist_ok=True)
        self.summary_writer = SummaryWriter(log_dir=log_dir)
        return self.summary_writer

    def write_events(self, event_list, flush=True):
        """Write each event with ``add_scalar`` and optionally flush.

        Args:
            event_list: iterable of argument tuples, each unpacked into
                ``SummaryWriter.add_scalar``.
            flush: when true, flush the writer after all events are added.
        """
        if self.enabled and self.summary_writer is not None and dist.get_rank() == 0:
            for event in event_list:
                self.summary_writer.add_scalar(*event)
            if flush:
                self.summary_writer.flush()

    def flush(self):
        """Flush the summary writer if one exists on this rank."""
        if self.enabled and self.summary_writer is not None and dist.get_rank() == 0:
            self.summary_writer.flush()
18 changes: 18 additions & 0 deletions deepspeed/monitor/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
def check_tb_availability():
    """Verify that ``tensorboard`` can be imported.

    Raises:
        ImportError: re-raised, after printing an install hint, when the
            import fails.
    """
    install_hint = 'If you want to use tensorboard logging, please `pip install tensorboard`'
    try:
        # torch.utils.tensorboard will fail if `tensorboard` is not available,
        # see their docs for more details: https://pytorch.org/docs/1.8.0/tensorboard.html
        import tensorboard  # noqa: F401 -- imported only to probe availability
    except ImportError:
        print(install_hint)
        raise


def check_wandb_availability():
    """Verify that ``wandb`` can be imported.

    Raises:
        ImportError: re-raised, after printing an install hint, when the
            import fails.
    """
    install_hint = 'If you want to use wandb logging, please `pip install wandb` and follow the instructions at https://docs.wandb.ai/quickstart'
    try:
        import wandb  # noqa: F401 -- imported only to probe availability
    except ImportError:
        print(install_hint)
        raise
32 changes: 32 additions & 0 deletions deepspeed/monitor/wandb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from .utils import check_wandb_availability
from .monitor import Monitor

import deepspeed.comm as dist


class WandbMonitor(Monitor):
    """Monitor backend that logs scalar events to Weights & Biases.

    The run is initialised, and events are logged, only when the monitor is
    enabled and ``dist.get_rank() == 0``.
    """
    def __init__(self, monitor_config):
        """Read the wandb settings and start the run.

        Args:
            monitor_config: object exposing ``wandb_config`` with the
                ``enabled``, ``group``, ``team`` and ``project`` attributes.

        Raises:
            ImportError: re-raised by ``check_wandb_availability`` when the
                ``wandb`` package cannot be imported.
        """
        super().__init__(monitor_config)
        check_wandb_availability()
        import wandb

        self.enabled = monitor_config.wandb_config.enabled
        self.group = monitor_config.wandb_config.group
        self.team = monitor_config.wandb_config.team
        self.project = monitor_config.wandb_config.project

        if self.enabled and dist.get_rank() == 0:
            wandb.init(project=self.project, group=self.group, entity=self.team)

    def log(self, data, step=None, commit=None, sync=None):
        """Forward ``data`` to ``wandb.log``.

        Args:
            data: mapping of metric names to values.
            step: step number passed through to ``wandb.log``.
            commit: passed through to ``wandb.log``.
            sync: passed through only when it is not None. wandb documents this
                argument as deprecated, so it is left out of the call unless
                the caller explicitly supplies it.

        Returns:
            Whatever ``wandb.log`` returns, or None when the monitor is
            disabled or this process is not rank 0.
        """
        if self.enabled and dist.get_rank() == 0:
            import wandb
            log_kwargs = {"step": step, "commit": commit}
            if sync is not None:
                log_kwargs["sync"] = sync
            return wandb.log(data, **log_kwargs)

    def write_events(self, event_list):
        """Log every event as a single-entry dict ``{label: value}`` at its step.

        Args:
            event_list: iterable of sequences whose first three items are
                ``(label, value, step)``.
        """
        if self.enabled and dist.get_rank() == 0:
            for event in event_list:
                label = event[0]
                value = event[1]
                step = event[2]
                self.log({label: value}, step=step)
Loading