Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 146 additions & 0 deletions benchmarks/configs/ludwig/criteo.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
# Model inputs for the Criteo benchmark: 13 numerical columns
# (numerical_1..numerical_13) followed by 26 categorical columns
# (categorical_1..categorical_26). No per-feature preprocessing or encoder
# options are set here.
input_features:
  -
    name: numerical_1
    type: numerical
  -
    name: numerical_2
    type: numerical
  -
    name: numerical_3
    type: numerical
  -
    name: numerical_4
    type: numerical
  -
    name: numerical_5
    type: numerical
  -
    name: numerical_6
    type: numerical
  -
    name: numerical_7
    type: numerical
  -
    name: numerical_8
    type: numerical
  -
    name: numerical_9
    type: numerical
  -
    name: numerical_10
    type: numerical
  -
    name: numerical_11
    type: numerical
  -
    name: numerical_12
    type: numerical
  -
    name: numerical_13
    type: numerical
  -
    name: categorical_1
    type: category
  -
    name: categorical_2
    type: category
  -
    name: categorical_3
    type: category
  -
    name: categorical_4
    type: category
  -
    name: categorical_5
    type: category
  -
    name: categorical_6
    type: category
  -
    name: categorical_7
    type: category
  -
    name: categorical_8
    type: category
  -
    name: categorical_9
    type: category
  -
    name: categorical_10
    type: category
  -
    name: categorical_11
    type: category
  -
    name: categorical_12
    type: category
  -
    name: categorical_13
    type: category
  -
    name: categorical_14
    type: category
  -
    name: categorical_15
    type: category
  -
    name: categorical_16
    type: category
  -
    name: categorical_17
    type: category
  -
    name: categorical_18
    type: category
  -
    name: categorical_19
    type: category
  -
    name: categorical_20
    type: category
  -
    name: categorical_21
    type: category
  -
    name: categorical_22
    type: category
  -
    name: categorical_23
    type: category
  -
    name: categorical_24
    type: category
  -
    name: categorical_25
    type: category
  -
    name: categorical_26
    type: category
# Single prediction target: the binary `label` column.
output_features:
  -
    name: label
    type: binary
# Combiner settings: selects the `tabnet` combiner and fixes its
# hyperparameters for the benchmark run.
# NOTE(review): the un-rounded dropout value looks like the output of a
# hyperparameter search -- confirm its provenance before changing it.
combiner:
  type: tabnet
  bn_momentum: 0.95
  bn_virtual_bs: 1024
  dropout: 0.05252744300130521
  fc_size: 128
  num_fc_layers: 3
  num_steps: 3
  output_size: 128
  relaxation_factor: 1.5
  size: 32
  sparsity: 0.0001
# Model-level trainer settings (distinct from `backend.trainer`, which
# configures the distributed workers).
trainer:
  batch_size: 32768
# Execution backend: Ray, with a Modin dataframe processor and four training
# workers. train_criteo.py reads `backend.trainer.num_workers` to build the
# experiment name.
# NOTE(review): each worker requests one GPU, but the Ray cluster config in
# this same change provisions c5.9xlarge nodes -- confirm the intended
# instance type / resource request before running.
backend:
  type: ray
  processor:
    type: modin
  # cache_dir: s3://ludwig-cache.us-west-2.predibase.com/criteo
  trainer:
    num_workers: 4
    resources_per_worker:
      GPU: 1
# Benchmark-only key: train_criteo.py uses it to look up the dataset location
# in its DATASETS_MAP (keys there: 100MB, 1GB, 10GB, 100GB, 1TB).
dataset_size: 10GB
65 changes: 65 additions & 0 deletions benchmarks/configs/ray/cluster_config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# Ray cluster definition used for the Ludwig Criteo benchmarks.
cluster_name: shreya-ludwig-ray-c5_9xlarge

# NOTE(review): the global cap is 3 workers, but the `ray.worker.default`
# node type below sets max_workers: 0 -- confirm whether worker nodes are
# actually meant to be launched.
max_workers: 3

# Container that Ray runs inside on every node.
docker:
  image: "ludwigai/ludwig-ray:master"
  # image: "ludwigai/ludwig-ray:sha-1db913b"
  container_name: "ray_container"
  pull_before_run: True
  run_options: # Extra options to pass into "docker run"
    - --ulimit nofile=65536:65536
    - --cap-add SYS_PTRACE

# Cloud provider placement.
provider:
  type: aws
  region: us-west-2
  availability_zone: us-west-2a
  cache_stopped_nodes: False

# Node types: head and worker both use c5.9xlarge with the `latest_dlami`
# image; only the head node overrides the root volume size (200).
available_node_types:
  ray.head.default:
    resources: {}
    node_config:
      InstanceType: c5.9xlarge
      ImageId: latest_dlami
      BlockDeviceMappings:
        - DeviceName: /dev/sda1
          Ebs:
            VolumeSize: 200
  ray.worker.default:
    min_workers: 0
    max_workers: 0
    resources: {}
    node_config:
      InstanceType: c5.9xlarge
      ImageId: latest_dlami

head_node_type: ray.head.default

# Remote path -> local path pairs copied onto the cluster nodes.
# NOTE(review): the local paths are absolute paths on one developer's
# machine; anyone else must edit them before launching the cluster.
file_mounts: {
  /home/ubuntu/ludwig/: /Users/shreyarajpal/Predibase/predibase/ludwig,
  /home/ubuntu/benchmarks/: /Users/shreyarajpal/Predibase/benchmarks,
  /home/ray/.aws: /Users/shreyarajpal/.aws,
}

# Keep git metadata out of the sync.
rsync_exclude:
  - "**/.git"
  - "**/.git/**"

rsync_filter:
  - ".gitignore"

# Node setup: replace the installed ludwig with an editable install of the
# mounted checkout, then install the S3 / pandas / hydra dependencies.
# NOTE(review): hydra-core is the only dependency installed without a pinned
# version -- confirm that is intentional.
setup_commands:
  - pip uninstall -y ludwig && pip install -e /home/ubuntu/ludwig/.
  - pip install s3fs==2021.10.0 aiobotocore==1.4.2 boto3==1.17.106
  - pip install pandas==1.1.4
  - pip install hydra-core --upgrade

# Restart Ray on the head node (port 6379, object manager on 8076).
head_start_ray_commands:
  - ray stop --force
  - ray start --head --port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml

# Restart Ray on workers and join the head node.
worker_start_ray_commands:
  - ray stop --force
  - ray start --address=$RAY_HEAD_IP:6379 --object-manager-port=8076
141 changes: 141 additions & 0 deletions benchmarks/scripts/train_criteo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
import contextlib
import logging
import threading
import time

import dask.dataframe as dd
import hydra
from omegaconf import DictConfig, OmegaConf

from ludwig.api import LudwigModel
# from ludwig.backend import get_local_backend
from ludwig.callbacks import Callback

# Module-level logger for this benchmark script.
logger = logging.getLogger(__name__)


# Execution mode; selects the config directory and experiment name below.
# Supported values: 'local' and 'ray'.
MODE = 'ray'
CRITEO_DATASET_PATH = 's3://datasets.us-west-2.predibase.com/criteo/10GB.parquet'
# Maps the `dataset_size` value from the experiment config to the location of
# the corresponding Criteo Parquet data.
DATASETS_MAP = {
    '100MB': 's3://datasets.us-west-2.predibase.com/criteo/all.parquet/part.0.parquet',
    '1GB': 's3://datasets.us-west-2.predibase.com/criteo/1GB.parquet',
    '10GB': 's3://datasets.us-west-2.predibase.com/criteo/10GB.parquet',
    '100GB': 's3://datasets.us-west-2.predibase.com/criteo/100GB.parquet',
    '1TB': 's3://datasets.us-west-2.predibase.com/criteo/all.parquet',
}


if MODE == 'local':
    CONFIG_DIR = '/Users/shreyarajpal/Predibase/benchmarks/configs/ludwig'
    experiment_name = 'criteo_local'
    # backend_config = {'type': get_local_backend()}
elif MODE == 'ray':
    CONFIG_DIR = '/home/ubuntu/benchmarks/configs/ludwig'
    experiment_name = 'criteo_ray'
    datasets_dir = 's3://datasets.us-west-2.predibase.com'
else:
    # Fail here with a clear message instead of a NameError on CONFIG_DIR
    # later, when the hydra decorator is evaluated.
    raise ValueError(f"Unsupported MODE {MODE!r}; expected 'local' or 'ray'")


class TimerCallback(Callback):
    """Callback that times preprocessing, training and individual epochs.

    An instance is passed to ``LudwigModel`` through its ``callbacks``
    argument; the ``on_*`` hooks below are presumably invoked by Ludwig at
    the matching stage of the run.

    Attributes:
        epoch: Number of epochs started so far.
        preproc_duration: Seconds spent in preprocessing (0 until finished).
        train_duration: Seconds spent in training (0 until finished).
        epoch_durations: Seconds taken by each finished epoch, in order.
    """

    def __init__(self):
        self.epoch = 0
        self.train_duration = 0
        self.preproc_duration = 0
        self.epoch_durations = []
        # Start timestamps; None until the corresponding start hook fires.
        self.preproc_start_t = None
        self.train_start_t = None
        self.epoch_start_t = None

    @staticmethod
    def _elapsed_since(start_t):
        """Return seconds elapsed since ``start_t``, or 0 if it was never set."""
        if start_t is None:
            # An end hook fired without its start hook; report zero rather
            # than failing the whole benchmark run.
            logger.warning("Timer end hook fired without a matching start hook")
            return 0
        return time.time() - start_t

    def on_preprocess_start(self, *args, **kwargs):
        """Record when preprocessing starts."""
        self.preproc_start_t = time.time()
        logger.info("Starting Preprocessing Now")

    def on_preprocess_end(self, *args, **kwargs):
        """Store and log the total preprocessing time."""
        self.preproc_duration = self._elapsed_since(self.preproc_start_t)
        logger.info("Finished Preprocessing")
        logger.info("Total Preprocessing Time: %s", self.preproc_duration)

    def on_train_start(self, *args, **kwargs):
        """Record when training starts."""
        self.train_start_t = time.time()

    def on_train_end(self, output_directory):
        """Store the training time and log both totals."""
        self.train_duration = self._elapsed_since(self.train_start_t)
        logger.info("Total Preprocessing Time: %s", self.preproc_duration)
        logger.info("Total Training Time: %s", self.train_duration)

    def on_epoch_start(self, trainer, progress_tracker, save_path):
        """Count the epoch and record when it starts."""
        self.epoch += 1
        self.epoch_start_t = time.time()

    def on_epoch_end(self, trainer, progress_tracker, save_path):
        """Record how long the epoch that just finished took."""
        self.epoch_durations.append(self._elapsed_since(self.epoch_start_t))


@contextlib.contextmanager
def timeit(duration=None):
    """Time the enclosed ``with`` block.

    Args:
        duration: Optional list. When given, the elapsed time in seconds,
            rounded to two decimals, is appended to it when the block exits,
            including when the block raises.

    Yields:
        None.
    """
    # perf_counter is monotonic, so the measurement is not affected by
    # system clock adjustments the way time.time() is.
    start_t = time.perf_counter()
    try:
        yield
    finally:
        if duration is not None:
            duration.append(round(time.perf_counter() - start_t, 2))


@hydra.main(config_path=CONFIG_DIR, config_name="criteo")
def app(cfg: DictConfig) -> None:
    """Train a Ludwig model on a Criteo dataset and print the wall-clock time.

    Args:
        cfg: Hydra config loaded from ``criteo`` in ``CONFIG_DIR``. Besides
            the Ludwig model settings it must provide ``dataset_size`` (a key
            of ``DATASETS_MAP``) and ``backend.trainer.num_workers``.

    Raises:
        KeyError: If ``dataset_size`` is not a key of ``DATASETS_MAP``.
    """
    logger.warning("Experiment: %s", experiment_name)

    config = OmegaConf.to_container(cfg)

    # Resolve the dataset before building the model so a bad size fails fast
    # with a message listing the valid choices.
    dataset_size = config['dataset_size']
    if dataset_size not in DATASETS_MAP:
        raise KeyError(
            f"Unknown dataset_size {dataset_size!r}; "
            f"expected one of {sorted(DATASETS_MAP)}"
        )
    dataset = DATASETS_MAP[dataset_size]
    num_workers = config['backend']['trainer']['num_workers']

    timer_callback = TimerCallback()
    model = LudwigModel(
        config=config,
        logging_level=logging.WARNING,
        callbacks=[timer_callback],
    )

    # `duration` receives a single entry: the elapsed seconds for train().
    duration = []
    with timeit(duration):
        model.train(
            dataset=dataset,
            experiment_name=f'{experiment_name}_{num_workers}_workers_{dataset_size}',
            model_name='simple_model',
            skip_save_model=True,
            skip_save_progress=True,
            skip_save_processed_output=True,
            skip_save_processed_input=True,
        )

    print(f'Wall Clock Time: {duration[0]}')
    print('Done')


# Script entry point: hydra loads the config and passes it to app().
if __name__ == "__main__":
    app()