Skip to content

Commit

Permalink
[fine_tuning] Add CLI for fine_tuning.jobs (#592)
Browse files Browse the repository at this point in the history
  • Loading branch information
jhallard authored Aug 30, 2023
1 parent 2942bf4 commit 9d559b0
Show file tree
Hide file tree
Showing 2 changed files with 290 additions and 0 deletions.
18 changes: 18 additions & 0 deletions openai/api_resources/file.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
import os
from typing import cast
import time

import openai
from openai import api_requestor, util, error
Expand Down Expand Up @@ -259,3 +260,20 @@ async def afind_matching_files(
)
).get("data", [])
return cls.__find_matching_files(name, bytes, all_files, purpose)

@classmethod
def wait_for_processing(cls, id, max_wait_seconds=30 * 60):
TERMINAL_STATES = ["processed", "error", "deleted"]

start = time.time()
file = cls.retrieve(id=id)
while file.status not in TERMINAL_STATES:
file = cls.retrieve(id=id)
time.sleep(5.0)
if time.time() - start > max_wait_seconds:
raise openai.error.OpenAIError(
message="Giving up on waiting for file {id} to finish processing after {max_wait_seconds} seconds.".format(
id=id, max_wait_seconds=max_wait_seconds
)
)
return file.status
272 changes: 272 additions & 0 deletions openai/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,201 @@ def prepare_data(cls, args):
)


class FineTuningJob:
@classmethod
def list(cls, args):
has_ft_jobs = False
for fine_tune_job in openai.FineTuningJob.auto_paging_iter():
has_ft_jobs = True
print(fine_tune_job)
if not has_ft_jobs:
print("No fine-tuning jobs found.")

@classmethod
def _is_url(cls, file: str):
return file.lower().startswith("http")

@classmethod
def _download_file_from_public_url(cls, url: str) -> Optional[bytes]:
resp = requests.get(url)
if resp.status_code == 200:
return resp.content
else:
return None

@classmethod
def _maybe_upload_file(
cls,
file: Optional[str] = None,
content: Optional[bytes] = None,
user_provided_file: Optional[str] = None,
check_if_file_exists: bool = True,
):
# Exactly one of `file` or `content` must be provided
if (file is None) == (content is None):
raise ValueError("Exactly one of `file` or `content` must be provided")

if content is None:
assert file is not None
with open(file, "rb") as f:
content = f.read()

if check_if_file_exists:
bytes = len(content)
matching_files = openai.File.find_matching_files(
name=user_provided_file or f.name,
bytes=bytes,
purpose="fine-tune",
)
if len(matching_files) > 0:
file_ids = [f["id"] for f in matching_files]
sys.stdout.write(
"Found potentially duplicated files with name '{name}', purpose 'fine-tune', and size {size} bytes\n".format(
name=os.path.basename(matching_files[0]["filename"]),
size=matching_files[0]["bytes"]
if "bytes" in matching_files[0]
else matching_files[0]["size"],
)
)
sys.stdout.write("\n".join(file_ids))
while True:
sys.stdout.write(
"\nEnter file ID to reuse an already uploaded file, or an empty string to upload this file anyway: "
)
inp = sys.stdin.readline().strip()
if inp in file_ids:
sys.stdout.write(
"Reusing already uploaded file: {id}\n".format(id=inp)
)
return inp
elif inp == "":
break
else:
sys.stdout.write(
"File id '{id}' is not among the IDs of the potentially duplicated files\n".format(
id=inp
)
)

buffer_reader = BufferReader(content, desc="Upload progress")
resp = openai.File.create(
file=buffer_reader,
purpose="fine-tune",
user_provided_filename=user_provided_file or file,
)
sys.stdout.write(
"Uploaded file from {file}: {id}\n".format(
file=user_provided_file or file, id=resp["id"]
)
)
sys.stdout.write("Waiting for file to finish processing before proceeding..\n")
sys.stdout.flush()
status = openai.File.wait_for_processing(resp["id"])
if status != "processed":
raise openai.error.OpenAIError(
"File {id} failed to process, status={status}.".format(
id=resp["id"], status=status
)
)

sys.stdout.write(
"File {id} finished processing and is ready for use in fine-tuning".format(
id=resp["id"]
)
)
sys.stdout.flush()
return resp["id"]

@classmethod
def _get_or_upload(cls, file, check_if_file_exists=True):
try:
# 1. If it's a valid file, use it
openai.File.retrieve(file)
return file
except openai.error.InvalidRequestError:
pass
if os.path.isfile(file):
# 2. If it's a file on the filesystem, upload it
return cls._maybe_upload_file(
file=file, check_if_file_exists=check_if_file_exists
)
if cls._is_url(file):
# 3. If it's a URL, download it temporarily
content = cls._download_file_from_public_url(file)
if content is not None:
return cls._maybe_upload_file(
content=content,
check_if_file_exists=check_if_file_exists,
user_provided_file=file,
)
return file

@classmethod
def create(cls, args):
create_args = {
"training_file": cls._get_or_upload(
args.training_file, args.check_if_files_exist
),
}
if args.validation_file:
create_args["validation_file"] = cls._get_or_upload(
args.validation_file, args.check_if_files_exist
)

for param in ("model", "suffix"):
attr = getattr(args, param)
if attr is not None:
create_args[param] = attr

if getattr(args, "n_epochs"):
create_args["hyperparameters"] = {
"n_epochs": args.n_epochs,
}

resp = openai.FineTuningJob.create(**create_args)
print(resp)
return

@classmethod
def get(cls, args):
resp = openai.FineTuningJob.retrieve(id=args.id)
print(resp)

@classmethod
def results(cls, args):
fine_tune = openai.FineTuningJob.retrieve(id=args.id)
if "result_files" not in fine_tune or len(fine_tune["result_files"]) == 0:
raise openai.error.InvalidRequestError(
f"No results file available for fine-tune {args.id}", "id"
)
result_file = openai.FineTuningJob.retrieve(id=args.id)["result_files"][0]
resp = openai.File.download(id=result_file)
print(resp.decode("utf-8"))

@classmethod
def events(cls, args):
seen, has_more = 0, True
while has_more:
resp = openai.FineTuningJob.list_events(id=args.id) # type: ignore
for event in resp["data"]:
print(event)
seen += 1
if args.limit is not None and seen >= args.limit:
return
has_more = resp["has_more"]

@classmethod
def follow(cls, args):
raise openai.error.OpenAIError(
message="Event streaming is not yet supported for `fine_tuning.job` events"
)

@classmethod
def cancel(cls, args):
resp = openai.FineTuningJob.cancel(id=args.id)
print(resp)


class WandbLogger:
@classmethod
def sync(cls, args):
Expand Down Expand Up @@ -1098,6 +1293,83 @@ def help(args):
sub.add_argument("--prompt", type=str)
sub.set_defaults(func=Audio.translate)

# FineTuning Jobs
sub = subparsers.add_parser("fine_tuning.job.list")
sub.set_defaults(func=FineTuningJob.list)

sub = subparsers.add_parser("fine_tuning.job.create")
sub.add_argument(
"-t",
"--training_file",
required=True,
help="JSONL file containing either chat-completion or prompt-completion examples for training. "
"This can be the ID of a file uploaded through the OpenAI API (e.g. file-abcde12345), "
'a local file path, or a URL that starts with "http".',
)
sub.add_argument(
"-v",
"--validation_file",
help="JSONL file containing either chat-completion or prompt-completion examples for validation. "
"This can be the ID of a file uploaded through the OpenAI API (e.g. file-abcde12345), "
'a local file path, or a URL that starts with "http".',
)
sub.add_argument(
"--no_check_if_files_exist",
dest="check_if_files_exist",
action="store_false",
help="If this argument is set and training_file or validation_file are file paths, immediately upload them. If this argument is not set, check if they may be duplicates of already uploaded files before uploading, based on file name and file size.",
)
sub.add_argument(
"-m",
"--model",
help="The model to start fine-tuning from",
)
sub.add_argument(
"--suffix",
help="If set, this argument can be used to customize the generated fine-tuned model name."
"All punctuation and whitespace in `suffix` will be replaced with a "
"single dash, and the string will be lower cased. The max "
"length of `suffix` is 18 chars. "
"The generated name will match the form `ft:{base_model}:{org-title}:{suffix}:{rstring}` where `rstring` "
"is a random string sortable as a timestamp. "
'For example, `openai api fine_tuning.job.create -t test.jsonl -m gpt-3.5-turbo-0613 --suffix "first finetune!" '
"could generate a model with the name "
"ft:gpt-3.5-turbo-0613:your-org:first-finetune:7p4PqAoY",
)
sub.add_argument(
"--n_epochs",
type=int,
help="The number of epochs to train the model for. An epoch refers to one "
"full cycle through the training dataset.",
)
sub.set_defaults(func=FineTuningJob.create)

sub = subparsers.add_parser("fine_tuning.job.get")
sub.add_argument("-i", "--id", required=True, help="The id of the fine-tune job")
sub.set_defaults(func=FineTuningJob.get)

sub = subparsers.add_parser("fine_tuning.job.results")
sub.add_argument("-i", "--id", required=True, help="The id of the fine-tune job")
sub.set_defaults(func=FineTuningJob.results)

sub = subparsers.add_parser("fine_tuning.job.events")
sub.add_argument("-i", "--id", required=True, help="The id of the fine-tune job")
sub.add_argument(
"--limit",
type=int,
required=False,
help="The number of events to return, starting from most recent. If not specified, all events will be returned.",
)
sub.set_defaults(func=FineTuningJob.events)

sub = subparsers.add_parser("fine_tuning.job.follow")
sub.add_argument("-i", "--id", required=True, help="The id of the fine-tune job")
sub.set_defaults(func=FineTuningJob.follow)

sub = subparsers.add_parser("fine_tuning.job.cancel")
sub.add_argument("-i", "--id", required=True, help="The id of the fine-tune job")
sub.set_defaults(func=FineTuningJob.cancel)


def wandb_register(parser):
subparsers = parser.add_subparsers(
Expand Down

0 comments on commit 9d559b0

Please sign in to comment.