From 8f770f4a410ff0ce122b0f6bf774566f167dda30 Mon Sep 17 00:00:00 2001
From: Shreya Rajpal
Date: Tue, 15 Mar 2022 14:40:20 -0700
Subject: [PATCH] Added lightweight benchmarking

---
Reviewer notes (commentary only; text in this section is ignored by git am).

Summary
  * criteo.yaml: Ludwig config with 13 numerical and 26 category inputs, one
    binary `label` output, a tabnet combiner, and a Ray backend that asks
    for 4 trainer workers with `GPU: 1` each.
  * cluster_config.yaml: Ray cluster definition for AWS us-west-2 that runs
    the ludwigai/ludwig-ray:master image on c5.9xlarge nodes.
  * train_criteo.py: hydra entry point that builds a LudwigModel from
    criteo.yaml, trains on the dataset selected by `dataset_size`, and
    prints the wall-clock time measured around `model.train`.

Things to confirm or follow up on
  * Leading whitespace inside the hunks below was reconstructed from a
    whitespace-collapsed copy of this patch. Line counts match the hunk
    headers (146 / 65 / 141), but indent widths are assumed; verify against
    blobs ef26642, 4709460 and 270beed before applying.
  * cluster_config.yaml `file_mounts` and the `MODE == 'local'` CONFIG_DIR in
    train_criteo.py use absolute /Users/shreyarajpal/... paths, so they only
    resolve on the author's machine.
  * cluster_config.yaml sets top-level `max_workers: 3`, but
    `ray.worker.default` sets `min_workers: 0` and `max_workers: 0`;
    presumably no worker nodes get launched. Confirm this is intended.
  * criteo.yaml requests `GPU: 1` per worker while the cluster uses
    c5.9xlarge instances. NOTE(review): c5 looks like a CPU-only family;
    confirm the GPU request can actually be scheduled.
  * train_criteo.py: if MODE is neither 'local' nor 'ray', CONFIG_DIR and
    experiment_name are never assigned, so the `@hydra.main(...)` decorator
    line raises NameError at import time.
  * train_criteo.py: `DATASETS_MAP[dataset_size]` raises a bare KeyError for
    any `dataset_size` other than 100MB, 1GB, 10GB, 100GB or 1TB.
  * train_criteo.py: `threading` and `dd` are imported but only referenced
    from commented-out code. CRITEO_DATASET_PATH, datasets_dir,
    time_per_epoch, train_stats and TimerCallback's local `epoch_duration`
    are assigned but never read by the active code.
  * train_criteo.py: TimerCallback logs through the root `logging` module
    although the file defines a module-level `logger`. The model is built
    with `logging_level=logging.WARNING`; TODO confirm the INFO-level timing
    messages are not filtered out as a result.
  * train_criteo.py: `on_train_end` logs "Total Preprocessing Time" a second
    time (already logged in `on_preprocess_end`); presumably a summary, but
    confirm it is not a copy/paste slip.
  * train_criteo.py passes the whole YAML, including the benchmark-only
    `dataset_size` key, to `LudwigModel(config=...)`. TODO confirm Ludwig
    tolerates that extra top-level key.
  * criteo.yaml has no trailing newline (see the marker at the end of its
    hunk).

 benchmarks/configs/ludwig/criteo.yaml      | 146 +++++++++++++++++++++
 benchmarks/configs/ray/cluster_config.yaml |  65 +++++++++
 benchmarks/scripts/train_criteo.py         | 141 ++++++++++++++++++++
 3 files changed, 352 insertions(+)
 create mode 100644 benchmarks/configs/ludwig/criteo.yaml
 create mode 100644 benchmarks/configs/ray/cluster_config.yaml
 create mode 100644 benchmarks/scripts/train_criteo.py

diff --git a/benchmarks/configs/ludwig/criteo.yaml b/benchmarks/configs/ludwig/criteo.yaml
new file mode 100644
index 0000000..ef26642
--- /dev/null
+++ b/benchmarks/configs/ludwig/criteo.yaml
@@ -0,0 +1,146 @@
+input_features:
+    -
+        name: numerical_1
+        type: numerical
+    -
+        name: numerical_2
+        type: numerical
+    -
+        name: numerical_3
+        type: numerical
+    -
+        name: numerical_4
+        type: numerical
+    -
+        name: numerical_5
+        type: numerical
+    -
+        name: numerical_6
+        type: numerical
+    -
+        name: numerical_7
+        type: numerical
+    -
+        name: numerical_8
+        type: numerical
+    -
+        name: numerical_9
+        type: numerical
+    -
+        name: numerical_10
+        type: numerical
+    -
+        name: numerical_11
+        type: numerical
+    -
+        name: numerical_12
+        type: numerical
+    -
+        name: numerical_13
+        type: numerical
+    -
+        name: categorical_1
+        type: category
+    -
+        name: categorical_2
+        type: category
+    -
+        name: categorical_3
+        type: category
+    -
+        name: categorical_4
+        type: category
+    -
+        name: categorical_5
+        type: category
+    -
+        name: categorical_6
+        type: category
+    -
+        name: categorical_7
+        type: category
+    -
+        name: categorical_8
+        type: category
+    -
+        name: categorical_9
+        type: category
+    -
+        name: categorical_10
+        type: category
+    -
+        name: categorical_11
+        type: category
+    -
+        name: categorical_12
+        type: category
+    -
+        name: categorical_13
+        type: category
+    -
+        name: categorical_14
+        type: category
+    -
+        name: categorical_15
+        type: category
+    -
+        name: categorical_16
+        type: category
+    -
+        name: categorical_17
+        type: category
+    -
+        name: categorical_18
+        type: category
+    -
+        name: categorical_19
+        type: category
+    -
+        name: categorical_20
+        type: category
+    -
+        name: categorical_21
+        type: category
+    -
+        name: categorical_22
+        type: category
+    -
+        name: categorical_23
+        type: category
+    -
+        name: categorical_24
+        type: category
+    -
+        name: categorical_25
+        type: category
+    -
+        name: categorical_26
+        type: category
+output_features:
+    -
+        name: label
+        type: binary
+combiner:
+    type: tabnet
+    bn_momentum: 0.95
+    bn_virtual_bs: 1024
+    dropout: 0.05252744300130521
+    fc_size: 128
+    num_fc_layers: 3
+    num_steps: 3
+    output_size: 128
+    relaxation_factor: 1.5
+    size: 32
+    sparsity: 0.0001
+trainer:
+    batch_size: 32768
+backend:
+    type: ray
+    processor:
+        type: modin
+    # cache_dir: s3://ludwig-cache.us-west-2.predibase.com/criteo
+    trainer:
+        num_workers: 4
+        resources_per_worker:
+            GPU: 1
+dataset_size: 10GB
\ No newline at end of file
diff --git a/benchmarks/configs/ray/cluster_config.yaml b/benchmarks/configs/ray/cluster_config.yaml
new file mode 100644
index 0000000..4709460
--- /dev/null
+++ b/benchmarks/configs/ray/cluster_config.yaml
@@ -0,0 +1,65 @@
+cluster_name: shreya-ludwig-ray-c5_9xlarge
+
+max_workers: 3
+
+docker:
+    image: "ludwigai/ludwig-ray:master"
+    # image: "ludwigai/ludwig-ray:sha-1db913b"
+    container_name: "ray_container"
+    pull_before_run: True
+    run_options: # Extra options to pass into "docker run"
+        - --ulimit nofile=65536:65536
+        - --cap-add SYS_PTRACE
+
+provider:
+    type: aws
+    region: us-west-2
+    availability_zone: us-west-2a
+    cache_stopped_nodes: False
+
+available_node_types:
+    ray.head.default:
+        resources: {}
+        node_config:
+            InstanceType: c5.9xlarge
+            ImageId: latest_dlami
+            BlockDeviceMappings:
+                - DeviceName: /dev/sda1
+                  Ebs:
+                      VolumeSize: 200
+    ray.worker.default:
+        min_workers: 0
+        max_workers: 0
+        resources: {}
+        node_config:
+            InstanceType: c5.9xlarge
+            ImageId: latest_dlami
+
+head_node_type: ray.head.default
+
+file_mounts: {
+    /home/ubuntu/ludwig/: /Users/shreyarajpal/Predibase/predibase/ludwig,
+    /home/ubuntu/benchmarks/: /Users/shreyarajpal/Predibase/benchmarks,
+    /home/ray/.aws: /Users/shreyarajpal/.aws,
+}
+
+rsync_exclude:
+    - "**/.git"
+    - "**/.git/**"
+
+rsync_filter:
+    - ".gitignore"
+
+setup_commands:
+    - pip uninstall -y ludwig && pip install -e /home/ubuntu/ludwig/.
+    - pip install s3fs==2021.10.0 aiobotocore==1.4.2 boto3==1.17.106
+    - pip install pandas==1.1.4
+    - pip install hydra-core --upgrade
+
+head_start_ray_commands:
+    - ray stop --force
+    - ray start --head --port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml
+
+worker_start_ray_commands:
+    - ray stop --force
+    - ray start --address=$RAY_HEAD_IP:6379 --object-manager-port=8076
diff --git a/benchmarks/scripts/train_criteo.py b/benchmarks/scripts/train_criteo.py
new file mode 100644
index 0000000..270beed
--- /dev/null
+++ b/benchmarks/scripts/train_criteo.py
@@ -0,0 +1,141 @@
+import contextlib
+import logging
+import threading
+import time
+
+import dask.dataframe as dd
+import hydra
+from omegaconf import DictConfig, OmegaConf
+
+from ludwig.api import LudwigModel
+# from ludwig.backend import get_local_backend
+from ludwig.callbacks import Callback
+
+logger = logging.getLogger(__name__)
+
+
+MODE = 'ray'
+CRITEO_DATASET_PATH = 's3://datasets.us-west-2.predibase.com/criteo/10GB.parquet'
+DATASETS_MAP = {
+    '100MB': 's3://datasets.us-west-2.predibase.com/criteo/all.parquet/part.0.parquet',
+    '1GB': 's3://datasets.us-west-2.predibase.com/criteo/1GB.parquet',
+    '10GB': 's3://datasets.us-west-2.predibase.com/criteo/10GB.parquet',
+    '100GB': 's3://datasets.us-west-2.predibase.com/criteo/100GB.parquet',
+    '1TB': 's3://datasets.us-west-2.predibase.com/criteo/all.parquet',
+}
+
+
+if MODE == 'local':
+    CONFIG_DIR = '/Users/shreyarajpal/Predibase/benchmarks/configs/ludwig'
+    experiment_name = 'criteo_local'
+    # backend_config = {'type': get_local_backend()}
+elif MODE == 'ray':
+    CONFIG_DIR = '/home/ubuntu/benchmarks/configs/ludwig'
+    experiment_name = 'criteo_ray'
+    datasets_dir = 's3://datasets.us-west-2.predibase.com'
+
+
+class TimerCallback(Callback):
+    # def __init__(self, queue):
+    def __init__(self):
+        self.epoch = 0
+        # self.queue = queue
+        self.train_duration = 0
+        self.preproc_duration = 0
+
+    def on_preprocess_start(self, *args, **kwargs):
+        self.preproc_start_t = time.time()
+        logging.info("Starting Preprocessing Now")
+
+    def on_preprocess_end(self, *args, **kwargs):
+        self.preproc_duration = time.time() - self.preproc_start_t
+        logging.info("Finished Preprocessing")
+        logging.info(f"Total Preprocessing Time: {self.preproc_duration}")
+
+    def on_train_start(self, *args, **kwargs):
+        self.train_start_t = time.time()
+
+    def on_train_end(self, output_directory):
+        self.train_duration = time.time() - self.train_start_t
+        logging.info(f"Total Preprocessing Time: {self.preproc_duration}")
+        logging.info(f"Total Training Time: {self.train_duration}")
+
+    def on_epoch_start(self, trainer, progress_tracker, save_path):
+        self.epoch += 1
+        self.epoch_start_t = time.time()
+
+    def on_epoch_end(self, trainer, progress_tracker, save_path):
+        epoch_duration = time.time() - self.epoch_start_t
+        # self.queue.put((self.epoch, epoch_duration))
+
+
+@contextlib.contextmanager
+def timeit(duration=None):
+    start_t = time.time()
+    try:
+        yield
+    finally:
+        if duration is not None:
+            duration.append(round(time.time() - start_t, 2))
+
+
+@hydra.main(config_path=CONFIG_DIR, config_name="criteo")
+def app(cfg: DictConfig) -> None:
+
+    logging.warning(f"Experiment: {experiment_name}")
+
+    # from ray.util.queue import Queue as RayQueue
+    # queue = RayQueue(actor_options={"num_cpus": 0})
+
+    time_per_epoch = []
+
+    # def read_queue():
+    #     while True:
+    #         epoch_num, epoch_time = queue.get()
+    #         logging.warning(f"Epoch: {epoch_num}, Time: {epoch_time}")
+    #         time_per_epoch.append(epoch_time)
+
+    # t = threading.Thread(target=read_queue, daemon=True)
+    # t.start()
+
+    # timer_callback = TimerCallback(queue)
+    timer_callback = TimerCallback()
+
+    config = OmegaConf.to_container(cfg)
+
+    model = LudwigModel(
+        config=config,
+        logging_level=logging.WARNING,
+        callbacks=[timer_callback],
+    )
+
+    dataset_size = config['dataset_size']
+    # PQ_FILES = [f's3://datasets.us-west-2.predibase.com/criteo/all.parquet/part.{i}.parquet' for i in range(int(dataset_size[:-2]) * 10)]
+    # dataset = dd.read_parquet(PQ_FILES)
+    dataset = DATASETS_MAP[dataset_size]
+    num_workers = config['backend']['trainer']['num_workers']
+
+    duration = []
+    with timeit(duration):
+        train_stats, _, _ = model.train(
+            dataset=dataset,
+            # dataset=PQ_FILES,
+            experiment_name=f'{experiment_name}_{num_workers}_workers_{dataset_size}',
+            model_name='simple_model',
+            skip_save_model=True,
+            skip_save_progress=True,
+            skip_save_processed_output=True,
+            skip_save_processed_input=True,
+        )
+
+    # time_per_epoch = sum(time_per_epoch) / len(time_per_epoch)
+
+    # print(f'Average time per epoch: {time_per_epoch}')
+    # print(f'Samples per sec: {config["trainer"]["batch_size"] / time_per_epoch}')
+    print(f'Wall Clock Time: {duration[0]}')
+    # print(f'Total number of epochs: {len(train_stats["training"]["combined"])}')
+    print('Done')
+
+
+if __name__ == "__main__":
+    app()