diff --git a/flair/data.py b/flair/data.py index 1593de6963..5ccd47405f 100644 --- a/flair/data.py +++ b/flair/data.py @@ -27,7 +27,7 @@ def _iter_dataset(dataset: Optional[Dataset]) -> typing.Iterable: return [] from flair.datasets import DataLoader - return map(lambda x: x[0], DataLoader(dataset, batch_size=1, num_workers=0)) + return map(lambda x: x[0], DataLoader(dataset, batch_size=1)) def _len_dataset(dataset: Optional[Dataset]) -> int: @@ -35,7 +35,7 @@ def _len_dataset(dataset: Optional[Dataset]) -> int: return 0 from flair.datasets import DataLoader - loader = DataLoader(dataset, batch_size=1, num_workers=0) + loader = DataLoader(dataset, batch_size=1) return len(loader) diff --git a/flair/datasets/base.py b/flair/datasets/base.py index fd31ec630c..04b3ab881f 100644 --- a/flair/datasets/base.py +++ b/flair/datasets/base.py @@ -1,12 +1,10 @@ import logging -import os from abc import abstractmethod from pathlib import Path from typing import Generic, List, Union import torch.utils.data.dataloader from deprecated import deprecated -from torch.utils.data.dataset import ConcatDataset, Subset from flair.data import DT, FlairDataset, Sentence, Tokenizer from flair.tokenization import SegtokTokenizer, SpaceTokenizer @@ -22,55 +20,23 @@ def __init__( shuffle=False, sampler=None, batch_sampler=None, - num_workers=None, drop_last=False, timeout=0, worker_init_fn=None, ): - # in certain cases, multi-CPU data loading makes no sense and slows - # everything down. For this reason, we detect if a dataset is in-memory: - # if so, num_workers is set to 0 for faster processing - flair_dataset = dataset - while True: - if type(flair_dataset) is Subset: - flair_dataset = flair_dataset.dataset - elif type(flair_dataset) is ConcatDataset: - flair_dataset = flair_dataset.datasets[0] - else: - break - - if type(flair_dataset) is list: - num_workers = 0 - elif isinstance(flair_dataset, FlairDataset) and flair_dataset.is_in_memory(): - num_workers = 0 - - if num_workers is None: - num_workers = min(self.estimate_max_workers(), 8) - else: - num_workers = min(num_workers, self.estimate_max_workers()) - super(DataLoader, self).__init__( dataset, batch_size=batch_size, shuffle=shuffle, sampler=sampler, batch_sampler=batch_sampler, - num_workers=num_workers, + num_workers=0, collate_fn=list, drop_last=drop_last, timeout=timeout, worker_init_fn=worker_init_fn, ) - @staticmethod - def estimate_max_workers(): - if hasattr(os, "sched_getaffinity"): - try: - return len(os.sched_getaffinity(0)) - except Exception: - pass - return os.cpu_count() or 1 - class FlairDatapointDataset(FlairDataset, Generic[DT]): """ diff --git a/flair/hyperparameter/__init__.py b/flair/hyperparameter/__init__.py deleted file mode 100644 index 8286cd30e9..0000000000 --- a/flair/hyperparameter/__init__.py +++ /dev/null @@ -1,21 +0,0 @@ -from .param_selection import ( - SearchSpace, - SequenceTaggerParamSelector, - TextClassifierParamSelector, -) -from .parameter import ( - SEQUENCE_TAGGER_PARAMETERS, - TEXT_CLASSIFICATION_PARAMETERS, - TRAINING_PARAMETERS, - Parameter, -) - -__all__ = [ - "Parameter", - "SEQUENCE_TAGGER_PARAMETERS", - "TRAINING_PARAMETERS", - "TEXT_CLASSIFICATION_PARAMETERS", - "SequenceTaggerParamSelector", - "TextClassifierParamSelector", - "SearchSpace", -] diff --git a/flair/hyperparameter/param_selection.py b/flair/hyperparameter/param_selection.py deleted file mode 100644 index 4a0bd09f26..0000000000 --- a/flair/hyperparameter/param_selection.py +++ /dev/null @@ -1,258 +0,0 @@ -import logging -from abc import 
abstractmethod -from enum import Enum -from pathlib import Path -from typing import Union - -import numpy as np -from hyperopt import fmin, hp, tpe - -import flair.nn -from flair.data import Corpus -from flair.embeddings import TransformerDocumentEmbeddings -from flair.hyperparameter.parameter import ( - SEQUENCE_TAGGER_PARAMETERS, - TEXT_CLASSIFICATION_PARAMETERS, - TRAINING_PARAMETERS, - Parameter, -) -from flair.models import SequenceTagger, TextClassifier -from flair.trainers import ModelTrainer -from flair.training_utils import EvaluationMetric, init_output_file, log_line - -log = logging.getLogger("flair") - - -class OptimizationValue(Enum): - DEV_LOSS = "loss" - DEV_SCORE = "score" - - -class SearchSpace(object): - def __init__(self): - self.search_space = {} - - def add(self, parameter: Parameter, func, **kwargs): - self.search_space[parameter.value] = func(parameter.value, **kwargs) - - def get_search_space(self): - return hp.choice("parameters", [self.search_space]) - - -class ParamSelector(object): - def __init__( - self, - corpus: Corpus, - base_path: Union[str, Path], - max_epochs: int, - evaluation_metric: EvaluationMetric, - training_runs: int, - optimization_value: OptimizationValue, - ): - if type(base_path) is str: - base_path = Path(base_path) - - self.corpus = corpus - self.max_epochs = max_epochs - self.base_path = base_path - self.evaluation_metric = evaluation_metric - self.run = 1 - self.training_runs = training_runs - self.optimization_value = optimization_value - - self.param_selection_file = init_output_file(base_path, "param_selection.txt") - - @abstractmethod - def _set_up_model(self, params: dict) -> flair.nn.Model: - pass - - def _objective(self, params: dict): - log_line(log) - log.info(f"Evaluation run: {self.run}") - log.info("Evaluating parameter combination:") - for k, v in params.items(): - if isinstance(v, tuple): - v = ",".join([str(x) for x in v]) - log.info(f"\t{k}: {str(v)}") - log_line(log) - - scores = [] - vars = [] - - for i in range(0, self.training_runs): - log_line(log) - log.info(f"Training run: {i + 1}") - - for sent in self.corpus.get_all_sentences(): # type: ignore - sent.clear_embeddings() - - model = self._set_up_model(params) - - training_params = {key: params[key] for key in params if key in TRAINING_PARAMETERS} - - trainer: ModelTrainer = ModelTrainer(model, self.corpus) - - result = trainer.train( - self.base_path, - max_epochs=self.max_epochs, - param_selection_mode=True, - **training_params, - ) - - # take the average over the last three scores of training - if self.optimization_value == OptimizationValue.DEV_LOSS: - curr_scores = result["dev_loss_history"][-3:] - else: - curr_scores = list(map(lambda s: 1 - s, result["dev_score_history"][-3:])) - - score = sum(curr_scores) / float(len(curr_scores)) - var = np.var(curr_scores) - scores.append(score) - vars.append(var) - - # take average over the scores from the different training runs - final_score = sum(scores) / float(len(scores)) - final_var = sum(vars) / float(len(vars)) - - test_score = result["test_score"] - log_line(log) - log.info("Done evaluating parameter combination:") - for k, v in params.items(): - if isinstance(v, tuple): - v = ",".join([str(x) for x in v]) - log.info(f"\t{k}: {v}") - log.info(f"{self.optimization_value.value}: {final_score}") - log.info(f"variance: {final_var}") - log.info(f"test_score: {test_score}\n") - log_line(log) - - with open(self.param_selection_file, "a") as f: - f.write(f"evaluation run {self.run}\n") - for k, v in params.items(): - if 
isinstance(v, tuple): - v = ",".join([str(x) for x in v]) - f.write(f"\t{k}: {str(v)}\n") - f.write(f"{self.optimization_value.value}: {final_score}\n") - f.write(f"variance: {final_var}\n") - f.write(f"test_score: {test_score}\n") - f.write("-" * 100 + "\n") - - self.run += 1 - - return {"status": "ok", "loss": final_score, "loss_variance": final_var} - - def optimize(self, space: SearchSpace, max_evals=100): - search_space = space.search_space - best = fmin(self._objective, search_space, algo=tpe.suggest, max_evals=max_evals) - - log_line(log) - log.info("Optimizing parameter configuration done.") - log.info("Best parameter configuration found:") - for k, v in best.items(): - log.info(f"\t{k}: {v}") - log_line(log) - - with open(self.param_selection_file, "a") as f: - f.write("best parameter combination\n") - for k, v in best.items(): - if isinstance(v, tuple): - v = ",".join([str(x) for x in v]) - f.write(f"\t{k}: {str(v)}\n") - - -class SequenceTaggerParamSelector(ParamSelector): - def __init__( - self, - corpus: Corpus, - tag_type: str, - base_path: Union[str, Path], - max_epochs: int = 50, - evaluation_metric: EvaluationMetric = EvaluationMetric.MICRO_F1_SCORE, - training_runs: int = 1, - optimization_value: OptimizationValue = OptimizationValue.DEV_LOSS, - ): - """ - :param corpus: the corpus - :param tag_type: tag type to use - :param base_path: the path to the result folder (results will be written to that folder) - :param max_epochs: number of epochs to perform on every evaluation run - :param evaluation_metric: evaluation metric used during training - :param training_runs: number of training runs per evaluation run - :param optimization_value: value to optimize - """ - super().__init__( - corpus, - base_path, - max_epochs, - evaluation_metric, - training_runs, - optimization_value, - ) - - self.tag_type = tag_type - self.tag_dictionary = self.corpus.make_label_dictionary(self.tag_type) - - def _set_up_model(self, params: dict): - sequence_tagger_params = {key: params[key] for key in params if key in SEQUENCE_TAGGER_PARAMETERS} - - tagger: SequenceTagger = SequenceTagger( - tag_dictionary=self.tag_dictionary, - tag_type=self.tag_type, - **sequence_tagger_params, - ) - return tagger - - -class TextClassifierParamSelector(ParamSelector): - def __init__( - self, - corpus: Corpus, - label_type: str, - multi_label: bool, - base_path: Union[str, Path], - max_epochs: int = 50, - fine_tune: bool = True, - evaluation_metric: EvaluationMetric = EvaluationMetric.MICRO_F1_SCORE, - training_runs: int = 1, - optimization_value: OptimizationValue = OptimizationValue.DEV_LOSS, - ): - """ - Text classifier hyperparameter selector that leverages TransformerDocumentEmbeddings - :param corpus: the corpus - :param label_type: string to identify the label type ('question_class', 'sentiment', etc.) 
- :param multi_label: true, if the dataset is multi label, false otherwise - :param base_path: the path to the result folder (results will be written to that folder) - :param max_epochs: number of epochs to perform on every evaluation run - :param fine_tune: if True, allows transformers to be fine-tuned during training - :param evaluation_metric: evaluation metric used during training - :param training_runs: number of training runs per evaluation run - :param optimization_value: value to optimize - """ - super().__init__( - corpus, - base_path, - max_epochs, - evaluation_metric, - training_runs, - optimization_value, - ) - - self.multi_label = multi_label - self.label_type = label_type - self.fine_tune = fine_tune - - self.label_dictionary = self.corpus.make_label_dictionary(self.label_type) - - def _set_up_model(self, params: dict): - text_classification_params = {key: params[key] for key in params if key in TEXT_CLASSIFICATION_PARAMETERS} - - embedding = TransformerDocumentEmbeddings(fine_tune=self.fine_tune, **text_classification_params) - - text_classifier: TextClassifier = TextClassifier( - label_dictionary=self.label_dictionary, - multi_label=self.multi_label, - label_type=self.label_type, - embeddings=embedding, - ) - - return text_classifier diff --git a/flair/hyperparameter/parameter.py b/flair/hyperparameter/parameter.py deleted file mode 100644 index 8f26a5f433..0000000000 --- a/flair/hyperparameter/parameter.py +++ /dev/null @@ -1,58 +0,0 @@ -from enum import Enum - - -class Parameter(Enum): - EMBEDDINGS = "embeddings" - HIDDEN_SIZE = "hidden_size" - USE_CRF = "use_crf" - USE_RNN = "use_rnn" - RNN_LAYERS = "rnn_layers" - DROPOUT = "dropout" - WORD_DROPOUT = "word_dropout" - LOCKED_DROPOUT = "locked_dropout" - LEARNING_RATE = "learning_rate" - MINI_BATCH_SIZE = "mini_batch_size" - ANNEAL_FACTOR = "anneal_factor" - ANNEAL_WITH_RESTARTS = "anneal_with_restarts" - PATIENCE = "patience" - OPTIMIZER = "optimizer" - MOMENTUM = "momentum" - DAMPENING = "dampening" - WEIGHT_DECAY = "weight_decay" - NESTEROV = "nesterov" - AMSGRAD = "amsgrad" - BETAS = "betas" - EPS = "eps" - TRANSFORMER_MODEL = "model" - LAYERS = "LAYERS" - - -TRAINING_PARAMETERS = [ - Parameter.LEARNING_RATE.value, - Parameter.MINI_BATCH_SIZE.value, - Parameter.OPTIMIZER.value, - Parameter.ANNEAL_FACTOR.value, - Parameter.PATIENCE.value, - Parameter.ANNEAL_WITH_RESTARTS.value, - Parameter.MOMENTUM.value, - Parameter.DAMPENING.value, - Parameter.WEIGHT_DECAY.value, - Parameter.NESTEROV.value, - Parameter.AMSGRAD.value, - Parameter.BETAS.value, - Parameter.EPS.value, -] -SEQUENCE_TAGGER_PARAMETERS = [ - Parameter.EMBEDDINGS.value, - Parameter.HIDDEN_SIZE.value, - Parameter.RNN_LAYERS.value, - Parameter.USE_CRF.value, - Parameter.USE_RNN.value, - Parameter.DROPOUT.value, - Parameter.LOCKED_DROPOUT.value, - Parameter.WORD_DROPOUT.value, -] -TEXT_CLASSIFICATION_PARAMETERS = [ - Parameter.LAYERS.value, - Parameter.TRANSFORMER_MODEL.value, -] diff --git a/flair/models/multitask_model.py b/flair/models/multitask_model.py index 74d4e48c82..a59bac606c 100644 --- a/flair/models/multitask_model.py +++ b/flair/models/multitask_model.py @@ -113,7 +113,6 @@ def evaluate( out_path: Union[str, Path] = None, embedding_storage_mode: str = "none", mini_batch_size: int = 32, - num_workers: Optional[int] = 8, main_evaluation_metric: Tuple[str, str] = ("micro avg", "f1-score"), exclude_labels: List[str] = [], gold_label_dictionary: Optional[Dictionary] = None, @@ -126,7 +125,6 @@ def evaluate( :param embeddings_storage_mode: One of 'none' 
(all embeddings are deleted and freshly recomputed), 'cpu' (embeddings are stored on CPU) or 'gpu' (embeddings are stored on GPU) :param mini_batch_size: size of batches - :param num_workers: number of workers for DataLoader class :param evaluate_all: choose if all tasks should be evaluated, or a single one, depending on gold_label_type :return: Tuple of Result object and loss value (float) """ @@ -147,7 +145,6 @@ def evaluate( out_path=out_path, embedding_storage_mode=embedding_storage_mode, mini_batch_size=mini_batch_size, - num_workers=num_workers, main_evaluation_metric=main_evaluation_metric, exclude_labels=exclude_labels, gold_label_dictionary=gold_label_dictionary, @@ -169,7 +166,6 @@ def evaluate( out_path=f"{out_path}_{task_id}.txt" if out_path is not None else None, embedding_storage_mode=embedding_storage_mode, mini_batch_size=mini_batch_size, - num_workers=mini_batch_size, main_evaluation_metric=main_evaluation_metric, exclude_labels=exclude_labels, gold_label_dictionary=gold_label_dictionary, @@ -197,12 +193,12 @@ def evaluate( ) all_classification_report[task_id] = result.classification_report + scores = {"loss": loss.item() / len(batch_split)} + return Result( - loss=loss.item() / len(batch_split), main_score=main_score / len(batch_split), detailed_results=all_detailed_results, - log_header="", - log_line="", + scores=scores, classification_report=all_classification_report, ) diff --git a/flair/models/relation_classifier_model.py b/flair/models/relation_classifier_model.py index 5c18bc3f9e..f64d3cdc6e 100644 --- a/flair/models/relation_classifier_model.py +++ b/flair/models/relation_classifier_model.py @@ -541,7 +541,7 @@ def transform_dataset(self, dataset: Dataset[Sentence]) -> FlairDatapointDataset :param dataset: A dataset of sentences to transform :return: A dataset of encoded sentences specific to the `RelationClassifier` """ - data_loader: DataLoader = DataLoader(dataset, batch_size=1, num_workers=0) + data_loader: DataLoader = DataLoader(dataset, batch_size=1) original_sentences: List[Sentence] = [batch[0] for batch in iter(data_loader)] return FlairDatapointDataset(self.transform_sentence(original_sentences)) diff --git a/flair/models/text_regression_model.py b/flair/models/text_regression_model.py index 906f873f32..a10840cffd 100644 --- a/flair/models/text_regression_model.py +++ b/flair/models/text_regression_model.py @@ -136,7 +136,6 @@ def evaluate( out_path: Union[str, Path] = None, embedding_storage_mode: str = "none", mini_batch_size: int = 32, - num_workers: Optional[int] = 8, main_evaluation_metric: Tuple[str, str] = ("micro avg", "f1-score"), exclude_labels: List[str] = [], gold_label_dictionary: Optional[Dictionary] = None, @@ -146,7 +145,7 @@ def evaluate( # read Dataset into data loader, if list of sentences passed, make Dataset first if not isinstance(data_points, Dataset): data_points = FlairDatapointDataset(data_points) - data_loader = DataLoader(data_points, batch_size=mini_batch_size, num_workers=num_workers) + data_loader = DataLoader(data_points, batch_size=mini_batch_size) with torch.no_grad(): eval_loss = torch.zeros(1, device=flair.device) @@ -186,8 +185,6 @@ def evaluate( if out_path is not None: with open(out_path, "w", encoding="utf-8") as outfile: outfile.write("".join(lines)) - log_line = f"{metric.mean_squared_error()}\t{metric.spearmanr()}" f"\t{metric.pearsonr()}" - log_header = "MSE\tSPEARMAN\tPEARSON" detailed_result = ( f"AVG: mse: {metric.mean_squared_error():.4f} - " @@ -198,10 +195,14 @@ def evaluate( result: Result = Result( 
main_score=metric.pearsonr(), - loss=eval_loss.item(), - log_header=log_header, - log_line=log_line, detailed_results=detailed_result, + scores={ + "loss": eval_loss.item(), + "mse": metric.mean_squared_error(), + "mae": metric.mean_absolute_error(), + "pearson": metric.pearsonr(), + "spearman": metric.spearmanr(), + }, ) return result diff --git a/flair/nn/model.py b/flair/nn/model.py index 2fc45f42ba..fdb8652423 100644 --- a/flair/nn/model.py +++ b/flair/nn/model.py @@ -2,7 +2,6 @@ import itertools import logging import typing -import warnings from abc import ABC, abstractmethod from collections import Counter from pathlib import Path @@ -52,7 +51,6 @@ def evaluate( out_path: Union[str, Path] = None, embedding_storage_mode: str = "none", mini_batch_size: int = 32, - num_workers: Optional[int] = 8, main_evaluation_metric: Tuple[str, str] = ("micro avg", "f1-score"), exclude_labels: List[str] = [], gold_label_dictionary: Optional[Dictionary] = None, @@ -103,42 +101,13 @@ def save(self, model_file: Union[str, Path], checkpoint: bool = False): """ model_state = self._get_state_dict() - # in Flair <0.9.1, optimizer and scheduler used to train model are not saved - optimizer = scheduler = None - # write out a "model card" if one is set if self.model_card is not None: - # special handling for optimizer: - # remember optimizer class and state dictionary - if "training_parameters" in self.model_card: - training_parameters = self.model_card["training_parameters"] - - if "optimizer" in training_parameters: - optimizer = training_parameters["optimizer"] - if checkpoint: - training_parameters["optimizer_state_dict"] = optimizer.state_dict() - training_parameters["optimizer"] = optimizer.__class__ - - if "scheduler" in training_parameters: - scheduler = training_parameters["scheduler"] - if checkpoint: - with warnings.catch_warnings(): - warnings.simplefilter("ignore") - training_parameters["scheduler_state_dict"] = scheduler.state_dict() - training_parameters["scheduler"] = scheduler.__class__ - model_state["model_card"] = self.model_card # save model torch.save(model_state, str(model_file), pickle_protocol=4) - # restore optimizer and scheduler to model card if set - if self.model_card is not None: - if optimizer: - self.model_card["training_parameters"]["optimizer"] = optimizer - if scheduler: - self.model_card["training_parameters"]["scheduler"] = scheduler - @classmethod def load(cls, model_path: Union[str, Path, Dict[str, Any]]) -> "Model": """ @@ -253,7 +222,6 @@ def evaluate( out_path: Union[str, Path] = None, embedding_storage_mode: str = "none", mini_batch_size: int = 32, - num_workers: Optional[int] = 8, main_evaluation_metric: Tuple[str, str] = ("micro avg", "f1-score"), exclude_labels: List[str] = [], gold_label_dictionary: Optional[Dictionary] = None, @@ -284,7 +252,7 @@ def evaluate( all_true_values = {} all_predicted_values = {} - loader = DataLoader(data_points, batch_size=mini_batch_size, num_workers=0) + loader = DataLoader(data_points, batch_size=mini_batch_size) sentence_id = 0 for batch in Tqdm.tqdm(loader): @@ -384,7 +352,7 @@ def evaluate( multi_label = True break - log.info(f"Evaluating as a multi-label problem: {multi_label}") + log.debug(f"Evaluating as a multi-label problem: {multi_label}") # compute numbers by formatting true and predicted such that Scikit-Learn can use them y_true = [] @@ -455,13 +423,9 @@ def evaluate( if "micro avg" in classification_report_dict: # micro average is only computed if zero-label exists (for instance "O") - precision_score = 
round(classification_report_dict["micro avg"]["precision"], 4) - recall_score = round(classification_report_dict["micro avg"]["recall"], 4) micro_f_score = round(classification_report_dict["micro avg"]["f1-score"], 4) else: # if no zero-label exists (such as in POS tagging) micro average is equal to accuracy - precision_score = round(classification_report_dict["accuracy"], 4) - recall_score = round(classification_report_dict["accuracy"], 4) micro_f_score = round(classification_report_dict["accuracy"], 4) # same for the main score @@ -477,7 +441,7 @@ def evaluate( "Could be an error in your corpus or how you " "initialize the trainer!" ) - accuracy_score = precision_score = recall_score = micro_f_score = macro_f_score = main_score = 0.0 + accuracy_score = micro_f_score = macro_f_score = main_score = 0.0 classification_report = "" classification_report_dict = {} @@ -489,20 +453,29 @@ def evaluate( "\n\nBy class:\n" + classification_report ) - # line for log file - log_header = "PRECISION\tRECALL\tF1\tACCURACY" - log_line = f"{precision_score}\t" f"{recall_score}\t" f"{micro_f_score}\t" f"{accuracy_score}" + scores: Dict[Union[Tuple[str, ...], str], Any] = {} + + for avg_type in ("micro avg", "macro avg"): + for metric_type in ("f1-score", "precision", "recall"): + if avg_type == "micro avg" and avg_type not in classification_report_dict: + value = classification_report_dict["accuracy"] + + else: + value = classification_report_dict[avg_type][metric_type] + + scores[(avg_type, metric_type)] = value + + scores["accuracy"] = accuracy_score if average_over > 0: eval_loss /= average_over + scores["loss"] = eval_loss.item() result = Result( main_score=main_score, - log_line=log_line, - log_header=log_header, detailed_results=detailed_result, classification_report=classification_report_dict, - loss=eval_loss.item(), + scores=scores, ) return result diff --git a/flair/trainers/plugins/__init__.py b/flair/trainers/plugins/__init__.py new file mode 100644 index 0000000000..be02970a09 --- /dev/null +++ b/flair/trainers/plugins/__init__.py @@ -0,0 +1,29 @@ +from .base import BasePlugin, Pluggable, TrainerPlugin, TrainingInterrupt +from .functional.amp import AmpPlugin +from .functional.anneal_on_plateau import AnnealingPlugin +from .functional.checkpoints import CheckpointPlugin +from .functional.linear_scheduler import LinearSchedulerPlugin +from .functional.weight_extractor import WeightExtractorPlugin +from .loggers.log_file import LogFilePlugin +from .loggers.loss_file import LossFilePlugin +from .loggers.metric_history import MetricHistoryPlugin +from .loggers.tensorboard import TensorboardLogger +from .metric_records import MetricName, MetricRecord + +__all__ = [ + "AmpPlugin", + "AnnealingPlugin", + "CheckpointPlugin", + "LinearSchedulerPlugin", + "WeightExtractorPlugin", + "LogFilePlugin", + "LossFilePlugin", + "MetricHistoryPlugin", + "TensorboardLogger", + "BasePlugin", + "Pluggable", + "TrainerPlugin", + "TrainingInterrupt", + "MetricName", + "MetricRecord", +] diff --git a/flair/trainers/plugins/base.py b/flair/trainers/plugins/base.py new file mode 100644 index 0000000000..070eb2166e --- /dev/null +++ b/flair/trainers/plugins/base.py @@ -0,0 +1,270 @@ +import logging +from collections import defaultdict +from inspect import isclass, signature +from itertools import count +from queue import Queue +from typing import ( + Callable, + Dict, + Iterator, + List, + NewType, + Optional, + Sequence, + Set, + Type, + Union, + cast, +) + +log = logging.getLogger("flair") + + +PluginArgument = 
Union["BasePlugin", Type["BasePlugin"]] +HookHandleId = NewType("HookHandleId", int) + +EventIdenifier = str + + +class TrainingInterrupt(Exception): + """Allows plugins to interrupt the training loop.""" + + +class Pluggable: + """Dispatches events which attached plugins can react to.""" + + valid_events: Optional[Set[EventIdenifier]] = None + + def __init__(self, *, plugins: Sequence[PluginArgument] = []): + """Initialize a `Pluggable`. + + :param plugins: Plugins which should be attached to this `Pluggable`. + """ + self._hook_handles: Dict[EventIdenifier, Dict[HookHandleId, HookHandle]] = defaultdict(dict) + + self._hook_handle_id_counter = count() + + self._plugins: List[BasePlugin] = [] + + # This flag tracks, whether an event is currently being processed (otherwise it is added to the queue) + self._processing_events = False + self._event_queue: Queue = Queue() + + for plugin in plugins: + if isclass(plugin): + # instantiate plugin + plugin = plugin() + + plugin = cast("BasePlugin", plugin) + plugin.attach_to(self) + + @property + def plugins(self): + return self._plugins + + def append_plugin(self, plugin): + self._plugins.append(plugin) + + def validate_event(self, *events: EventIdenifier): + for event in events: + assert isinstance(event, EventIdenifier) + + if self.valid_events is not None: + if event not in self.valid_events: + raise RuntimeError(f"Event '{event}' not recognized. Available: {', '.join(self.valid_events)}") + return event + + def register_hook(self, func: Callable, *events: EventIdenifier): + """Register a hook. + + :param func: Function to be called when the event is emitted. + :param *events: List of events to call this function on. + """ + + self.validate_event(*events) + + handle: HookHandle = HookHandle( + HookHandleId(next(self._hook_handle_id_counter)), events=events, func=func, pluggable=self + ) + + for event in events: + self._hook_handles[event][handle.id] = handle + return handle + + def dispatch(self, event: EventIdenifier, *args, **kwargs) -> None: + """Call all functions hooked to a certain event.""" + self.validate_event(event) + + self._event_queue.put((event, args, kwargs)) + + if not self._processing_events: + try: + self._processing_events = True + + while not self._event_queue.empty(): + event, args, kwargs = self._event_queue.get() + + for hook in self._hook_handles[event].values(): + hook(*args, **kwargs) + finally: + # Reset the flag, since an exception event might be dispatched + self._processing_events = False + + def remove_hook(self, handle: "HookHandle"): + """Remove a hook handle from this instance.""" + for event in handle.events: + del self._hook_handles[event][handle.id] + + +class HookHandle: + """Represents the registration information of a hook callback.""" + + def __init__(self, _id: HookHandleId, *, events: Sequence[EventIdenifier], func: Callable, pluggable: Pluggable): + """Intitialize `HookHandle`. + + :param _id: Id, the callback is stored as in the `Pluggable`. + :param *events: List of events, the callback is registered for. + :param func: The callback function. + :param pluggable: The `Pluggable` where the callback is registered. 
+ """ + pluggable.validate_event(*events) + + self._id = _id + self._events = events + self._func = func + self._pluggable = pluggable + + @property + def id(self) -> HookHandleId: + """Return the id of this `HookHandle`.""" + return self._id + + @property + def func_name(self): + return self._func.__qualname__ + + @property + def events(self) -> Iterator[EventIdenifier]: + """Return iterator of events whis `HookHandle` is registered for.""" + yield from self._events + + def remove(self): + """Remove a hook from the `Pluggable` it is attached to.""" + self._pluggable.remove_hook(self) + + def __call__(self, *args, **kw): + """Call the hook this `HookHandle` is associated with.""" + try: + return self._func(*args, **kw) + except TypeError as err: + sig = signature(self._func) + + if not any((p.kind == p.VAR_KEYWORD for p in sig.parameters.values())): + # If there is no **kw argument in the callback, check if any of the passed kw args is not accepted by + # the callback + for name in kw.keys(): + if name not in sig.parameters: + raise TypeError( + f"Hook callback {self.func_name}() does not accept keyword argument '{name}'" + ) from err + + raise err + + +class BasePlugin: + """Base class for all plugins.""" + + def __init__(self): + """Initialize the base plugin.""" + self._hook_handles: List[HookHandle] = [] + self._pluggable: Optional[Pluggable] = None + + def attach_to(self, pluggable: Pluggable): + """Attach this plugin to a `Pluggable`.""" + assert self._pluggable is None + assert len(self._hook_handles) == 0 + + self._pluggable = pluggable + + pluggable.append_plugin(self) + + # go through all attributes + for name in dir(self): + try: + func = getattr(self, name) + + # get attribute hook events (mayr aise an AttributeError) + events = getattr(func, "_plugin_hook_events") + + # register function as a hook + handle = pluggable.register_hook(func, *events) + self._hook_handles.append(handle) + + except AttributeError: + continue + + def detach(self): + """Detach a plugin from the `Pluggable` it is attached to.""" + assert self._pluggable is not None + + for handle in self._hook_handles: + handle.remove() + + self._pluggable = None + self._hook_handles = [] + + @classmethod + def mark_func_as_hook(cls, func: Callable, *events: EventIdenifier) -> Callable: + """Mark method as a hook triggered by the `Pluggable`.""" + if len(events) == 0: + events = (func.__name__,) + setattr(func, "_plugin_hook_events", events) + return func + + @classmethod + def hook( + cls, + first_arg: Union[Callable, EventIdenifier] = None, + *other_args: EventIdenifier, + ) -> Callable: + """Convience function for `BasePlugin.mark_func_as_hook`). + + Enables using the `@BasePlugin.hook` syntax. 
+ + Can also be used as: + `@BasePlugin.hook("some_event", "another_event")` + """ + if first_arg is None: + # Decorator was used with parentheses, but no args + return cls.mark_func_as_hook + + if isinstance(first_arg, EventIdenifier): + # Decorator was used with args (strings specifiying the events) + def decorator_func(func: Callable): + return cls.mark_func_as_hook(func, cast(EventIdenifier, first_arg), *other_args) + + return decorator_func + + # Decorator was used without args + return cls.mark_func_as_hook(first_arg, *other_args) + + @property + def pluggable(self) -> Optional[Pluggable]: + return self._pluggable + + def __str__(self) -> str: + return self.__class__.__name__ + + +class TrainerPlugin(BasePlugin): + @property + def trainer(self): + return self.pluggable + + @property + def model(self): + return self.trainer.model + + @property + def corpus(self): + return self.trainer.corpus diff --git a/flair/trainers/plugins/functional/amp.py b/flair/trainers/plugins/functional/amp.py new file mode 100644 index 0000000000..6d3e8600d5 --- /dev/null +++ b/flair/trainers/plugins/functional/amp.py @@ -0,0 +1,64 @@ +import sys + +from flair.trainers.plugins.base import TrainerPlugin + + +class AmpPlugin(TrainerPlugin): + """ + Simple plugin for AMP + """ + + def __init__(self, opt_level): + super().__init__() + + self.opt_level = opt_level + + self.wrapped_backward = None + + if sys.version_info < (3, 0): + raise RuntimeError("Apex currently only supports Python 3. Aborting.") + + try: + from apex import amp + + self.amp = amp + except ImportError as exc: + raise RuntimeError( + "Failed to import apex. Please install apex from " + "https://www.github.com/nvidia/apex " + "to enable mixed-precision training." + ) from exc + + def detach(self, *args, **kwargs): + # TODO: what does this do? + super().detach(*args, **kwargs) + + # unwrap trainer backward function + self.trainer.backward = self.wrapped_backward + self.wrapped_backward = None + + def backward(self, loss): + assert self.amp is not None + optimizer = self.trainer.optimizer + + with self.amp.scale_loss(loss, optimizer) as scaled_loss: + scaled_loss.backward() + + @TrainerPlugin.hook + def after_setup(self, **kw): + """ + Wraps with AMP + :param kw: + :return: + """ + + optimizer = self.trainer.optimizer + + self.trainer.model, self.trainer.optimizer = self.amp.initialize( + self.model, optimizer, opt_level=self.opt_level + ) + + # replace trainers backward function + self.wrapped_backward = self.trainer.backward + + self.trainer.backward = self.backward diff --git a/flair/trainers/plugins/functional/anneal_on_plateau.py b/flair/trainers/plugins/functional/anneal_on_plateau.py new file mode 100644 index 0000000000..cef793b135 --- /dev/null +++ b/flair/trainers/plugins/functional/anneal_on_plateau.py @@ -0,0 +1,123 @@ +import logging +import os + +from flair.trainers.plugins.base import TrainerPlugin, TrainingInterrupt +from flair.trainers.plugins.metric_records import MetricRecord +from flair.training_utils import AnnealOnPlateau + +log = logging.getLogger("flair") + + +class AnnealingPlugin(TrainerPlugin): + """ + Plugin for annealing logic in Flair. 
+ """ + + def __init__( + self, + base_path, + min_learning_rate, + anneal_factor, + patience, + initial_extra_patience, + anneal_with_restarts, + ): + super().__init__() + + # path to store the model + self.base_path = base_path + + # special annealing modes + self.anneal_with_restarts = anneal_with_restarts + + # determine the min learning rate + self.min_learning_rate = min_learning_rate + + self.anneal_factor = anneal_factor + self.patience = patience + self.initial_extra_patience = initial_extra_patience + + def store_learning_rate(self): + optimizer = self.trainer.optimizer + + self.current_learning_rate = [group["lr"] for group in optimizer.param_groups] + + self.current_momentum = [ + group["betas"][0] if "betas" in group else group.get("momentum", 0) for group in optimizer.param_groups + ] + + @TrainerPlugin.hook + def after_setup( + self, + train_with_dev, + optimizer, + **kw, + ): + """ + initialize different schedulers, including anneal target for AnnealOnPlateau, batch_growth_annealing, loading schedulers + :param train_with_dev: + :param optimizer: + :param kw: + :return: + """ + + # minimize training loss if training with dev data, else maximize dev score + anneal_mode = "min" if train_with_dev else "max" + + # instantiate the scheduler + self.scheduler: AnnealOnPlateau = AnnealOnPlateau( + factor=self.anneal_factor, + patience=self.patience, + initial_extra_patience=self.initial_extra_patience, + mode=anneal_mode, + verbose=False, + optimizer=self.trainer.optimizer, + ) + + self.store_learning_rate() + + @TrainerPlugin.hook + def after_evaluation(self, current_model_is_best, validation_scores, **kw): + """ + Scheduler step of AnnealOnPlateau + :param current_model_is_best: + :param validation_scores: + :param kw: + :return: + """ + reduced_learning_rate: bool = self.scheduler.step(*validation_scores) + + self.store_learning_rate() + + bad_epochs = self.scheduler.num_bad_epochs + if reduced_learning_rate: + bad_epochs = self.patience + 1 + log.info( + f" - {bad_epochs} epochs without improvement (above 'patience')" + f"-> annealing learning_rate to {self.current_learning_rate}" + ) + else: + log.info(f" - {bad_epochs} epochs without improvement") + + self.trainer.dispatch( + "metric_recorded", + MetricRecord.scalar(name="bad_epochs", value=bad_epochs, global_step=self.scheduler.last_epoch + 1), + ) + + # stop training if learning rate becomes too small + for lr in self.current_learning_rate: + if lr < self.min_learning_rate: + raise TrainingInterrupt("learning rate too small - quitting training!") + + # reload last best model if annealing with restarts is enabled + if self.anneal_with_restarts and reduced_learning_rate and os.path.exists(self.base_path / "best-model.pt"): + log.info("resetting to best model") + self.model.load_state_dict(self.model.load(self.base_path / "best-model.pt").state_dict()) + + def __str__(self): + return ( + f"AnnealOnPlateau | " + f"patience: '{self.patience}', " + f"anneal_factor: '{self.anneal_factor}', " + f"min_learning_rate: '{self.min_learning_rate}'" + ) diff --git a/flair/trainers/plugins/functional/checkpoints.py b/flair/trainers/plugins/functional/checkpoints.py new file mode 100644 index 0000000000..2133bc96d6 --- /dev/null +++ b/flair/trainers/plugins/functional/checkpoints.py @@ -0,0 +1,34 @@ +import logging + +from flair.trainers.plugins.base import TrainerPlugin + +log = logging.getLogger("flair") + + +class CheckpointPlugin(TrainerPlugin): + def __init__( + self, + save_model_each_k_epochs, + save_optimizer_state, + base_path, + 
): + super().__init__() + self.save_optimizer_state = save_optimizer_state + self.save_model_each_k_epochs = save_model_each_k_epochs + self.base_path = base_path + + @TrainerPlugin.hook + def after_training_epoch(self, epoch, **kw): + """ + Executes save_model_each_k_epochs + :param epoch: + :param kw: + :return: + """ + if self.save_model_each_k_epochs > 0 and epoch % self.save_model_each_k_epochs == 0: + log.info( + f"Saving model at current epoch since 'save_model_each_k_epochs={self.save_model_each_k_epochs}' " + f"was set" + ) + model_name = "model_epoch_" + str(epoch) + ".pt" + self.model.save(self.base_path / model_name, checkpoint=self.save_optimizer_state) diff --git a/flair/trainers/plugins/functional/linear_scheduler.py b/flair/trainers/plugins/functional/linear_scheduler.py new file mode 100644 index 0000000000..b77f6893a9 --- /dev/null +++ b/flair/trainers/plugins/functional/linear_scheduler.py @@ -0,0 +1,78 @@ +import logging + +from flair.optim import LinearSchedulerWithWarmup +from flair.trainers.plugins.base import TrainerPlugin + +log = logging.getLogger("flair") + + +class LinearSchedulerPlugin(TrainerPlugin): + """ + Plugin for LinearSchedulerWithWarmup. + """ + + def __init__(self, warmup_fraction: float, **kwargs): + super().__init__() + + self.warmup_fraction = warmup_fraction + + def store_learning_rate(self): + optimizer = self.trainer.optimizer + + self.current_learning_rate = [group["lr"] for group in optimizer.param_groups] + + self.current_momentum = [ + group["betas"][0] if "betas" in group else group.get("momentum", 0) for group in optimizer.param_groups + ] + + @TrainerPlugin.hook + def after_setup( + self, + dataset_size, + mini_batch_size, + max_epochs, + **kw, + ): + """ + initialize different schedulers, including anneal target for AnnealOnPlateau, batch_growth_annealing, loading schedulers + :param dataset_size: + :param mini_batch_size: + :param max_epochs: + :param kw: + :return: + """ + + # calculate warmup steps + steps_per_epoch = (dataset_size + mini_batch_size - 1) / mini_batch_size + num_train_steps = int(steps_per_epoch * max_epochs) + num_warmup_steps = int(num_train_steps * self.warmup_fraction) + + self.scheduler = LinearSchedulerWithWarmup( + num_train_steps=num_train_steps, num_warmup_steps=num_warmup_steps, optimizer=self.trainer.optimizer + ) + + self.store_learning_rate() + + @TrainerPlugin.hook + def before_training_epoch(self, **kw): + """ + load state for anneal_with_restarts, batch_growth_annealing, logic for early stopping + :param kw: + :return: + """ + self.store_learning_rate() + self.previous_learning_rate = self.current_learning_rate + + @TrainerPlugin.hook + def after_training_batch(self, **kw): + """ + do the scheduler step if one-cycle or linear decay + + :param kw: + :return: + """ + self.scheduler.step() + self.store_learning_rate() + + def __str__(self): + return f"LinearScheduler | warmup_fraction: '{self.warmup_fraction}'" diff --git a/flair/trainers/plugins/functional/weight_extractor.py b/flair/trainers/plugins/functional/weight_extractor.py new file mode 100644 index 0000000000..cdb8e5fabb --- /dev/null +++ b/flair/trainers/plugins/functional/weight_extractor.py @@ -0,0 +1,28 @@ +from flair.trainers.plugins.base import TrainerPlugin +from flair.training_utils import WeightExtractor + + +class WeightExtractorPlugin(TrainerPlugin): + """ + Simple Plugin for weight extraction + """ + + def __init__(self, base_path): + super().__init__() + self.weight_extractor = WeightExtractor(base_path) + + @TrainerPlugin.hook + def 
after_training_batch(self, batch_no, epoch, total_number_of_batches, **kw): + """ + extracts weights + :param batch_no: + :param epoch: + :param total_number_of_batches: + :param kw: + :return: + """ + modulo = max(1, int(total_number_of_batches / 10)) + iteration = epoch * total_number_of_batches + batch_no + + if (iteration + 1) % modulo == 0: + self.weight_extractor.extract_weights(self.model.state_dict(), iteration) diff --git a/flair/trainers/plugins/loggers/log_file.py b/flair/trainers/plugins/loggers/log_file.py new file mode 100644 index 0000000000..e1388f4d33 --- /dev/null +++ b/flair/trainers/plugins/loggers/log_file.py @@ -0,0 +1,23 @@ +import logging +from pathlib import Path + +from flair.trainers.plugins.base import TrainerPlugin +from flair.training_utils import add_file_handler + +log = logging.getLogger("flair") + + +class LogFilePlugin(TrainerPlugin): + """ + Plugin for the training.log file + """ + + def __init__(self, base_path): + super().__init__() + + self.log_handler = add_file_handler(log, Path(base_path) / "training.log") + + @TrainerPlugin.hook("_training_exception", "after_training") + def close_file_handler(self, **kw): + self.log_handler.close() + log.removeHandler(self.log_handler) diff --git a/flair/trainers/plugins/loggers/loss_file.py b/flair/trainers/plugins/loggers/loss_file.py new file mode 100644 index 0000000000..b145c46ecf --- /dev/null +++ b/flair/trainers/plugins/loggers/loss_file.py @@ -0,0 +1,121 @@ +from datetime import datetime +from typing import Dict, Optional, Tuple, Union + +from flair.trainers.plugins.base import TrainerPlugin +from flair.trainers.plugins.metric_records import MetricName +from flair.training_utils import init_output_file + + +class LossFilePlugin(TrainerPlugin): + """ + Plugin that manages the loss.tsv file output + """ + + def __init__(self, base_path, epoch: int, metrics_to_collect: Dict[Union[Tuple, str], str] = None): + super().__init__() + + self.first_epoch = epoch + 1 + + # prepare loss logging file and set up header + self.loss_txt = init_output_file(base_path, "loss.tsv") + + # set up all metrics to collect + self.metrics_to_collect = metrics_to_collect + if self.metrics_to_collect is not None: + metrics_to_collect = self.metrics_to_collect + else: + metrics_to_collect = { + "loss": "LOSS", + ("micro avg", "precision"): "PRECISION", + ("micro avg", "recall"): "RECALL", + ("micro avg", "f1-score"): "F1", + "accuracy": "ACCURACY", + } + + # set up headers + self.headers = { + # name: HEADER + MetricName("epoch"): "EPOCH", + MetricName("timestamp"): "TIMESTAMP", + MetricName("bad_epochs"): "BAD_EPOCHS", + MetricName("learning_rate"): "LEARNING_RATE", + } + + # Add all potentially relevant metrics. If a metric is not published + # after the first epoch (when the header is written), the column is + # removed at that point. 
+ for prefix in ["train", "train_sample", "dev", "test"]: + for name, header in metrics_to_collect.items(): + metric_name = MetricName(name) + + if prefix == "train" and metric_name != "loss": + metric_name = "train_eval" + metric_name + else: + metric_name = prefix + metric_name + + self.headers[metric_name] = f"{prefix.upper()}_{header}" + + # initialize the first log line + self.current_row: Optional[Dict[MetricName, str]] = None + + @TrainerPlugin.hook + def before_training_epoch(self, epoch, **kw): + """ + Get the current epoch for loss file logging + :param epoch: + :param kw: + :return: + """ + self.current_row = {MetricName("epoch"): epoch} + + @TrainerPlugin.hook + def metric_recorded(self, record): + """ + :param record: + :return: + """ + if record.name in self.headers and self.current_row is not None: + if record.name == "learning_rate" and not record.is_scalar: + # record is a list of scalars + value = ",".join([f"{lr:.4f}" for lr in record.value]) + elif record.is_scalar and isinstance(record.value, int): + value = str(record.value) + else: + assert record.is_scalar + + value = f"{record.value:.4f}" + + self.current_row[record.name] = value + + @TrainerPlugin.hook + def after_evaluation(self, epoch, **kw): + """ + This prints all relevant metrics + :param epoch: + :param kw: + :return: + """ + if self.loss_txt is not None: + self.current_row[MetricName("timestamp")] = f"{datetime.now():%H:%M:%S}" + + # output log file + with open(self.loss_txt, "a") as f: + # remove columns where no value was found on the first epoch (could be != 1 if training was resumed) + if epoch == self.first_epoch: + for k in list(self.headers.keys()): + if k not in self.current_row: + del self.headers[k] + + # make headers on epoch 1 + if epoch == 1: + # write header + f.write("\t".join(self.headers.values()) + "\n") + + for col in self.headers.keys(): + assert col in self.current_row, str(col) + " " + str(self.current_row.keys()) + + assert all(col in self.current_row for col in self.headers.keys()) + + f.write("\t".join([str(self.current_row[col]) for col in self.headers.keys()]) + "\n") + + self.current_row = {} diff --git a/flair/trainers/plugins/loggers/metric_history.py b/flair/trainers/plugins/loggers/metric_history.py new file mode 100644 index 0000000000..3bb47f7299 --- /dev/null +++ b/flair/trainers/plugins/loggers/metric_history.py @@ -0,0 +1,38 @@ +import logging +from typing import Dict, Mapping + +from flair.trainers.plugins.base import TrainerPlugin + +log = logging.getLogger("flair") + + +default_metrics_to_collect = { + ("train", "loss"): "train_loss_history", + ("dev", "score"): "dev_score_history", + ("dev", "loss"): "dev_loss_history", +} + + +class MetricHistoryPlugin(TrainerPlugin): + def __init__(self, metrics_to_collect: Mapping = default_metrics_to_collect): + super().__init__() + + self.metric_history: Dict[str, list] = {} + self.metrics_to_collect: Mapping = metrics_to_collect + for target in self.metrics_to_collect.values(): + self.metric_history[target] = list() + + @TrainerPlugin.hook + def metric_recorded(self, record): + if tuple(record.name) in self.metrics_to_collect: + target = self.metrics_to_collect[tuple(record.name)] + self.metric_history[target].append(record.value) + + @TrainerPlugin.hook + def after_training(self, **kw): + """ + Returns metric history + :param kw: + :return: + """ + self.trainer.return_values.update(self.metric_history) diff --git a/flair/trainers/plugins/loggers/tensorboard.py b/flair/trainers/plugins/loggers/tensorboard.py new file mode 
100644 index 0000000000..e12e572fde --- /dev/null +++ b/flair/trainers/plugins/loggers/tensorboard.py @@ -0,0 +1,60 @@ +import logging +import os + +from flair.trainers.plugins.base import TrainerPlugin +from flair.training_utils import log_line + +log = logging.getLogger("flair") + + +class TensorboardLogger(TrainerPlugin): + """ + Plugin that takes care of tensorboard logging + """ + + def __init__(self, log_dir=None, comment="", tracked_metrics=()): + """ + :param log_dir: Directory into which tensorboard log files will be written # noqa: E501 + :param tracked_metrics: List of tuples that specify which metrics (in addition to the main_score) shall be plotted in tensorboard, could be [("macro avg", 'f1-score'), ("macro avg", 'precision')] for example # noqa: E501 + """ + super().__init__() + self.comment = comment + self.tracked_metrics = tracked_metrics + + try: + from torch.utils.tensorboard import SummaryWriter + + if log_dir is not None and not os.path.exists(log_dir): + os.mkdir(log_dir) + + self.writer = SummaryWriter(log_dir=log_dir, comment=self.comment) + + log.info(f"tensorboard logging path is {log_dir}") + + except ImportError: + log_line(log) + log.warning("ATTENTION! PyTorch >= 1.1.0 and pillow are required for TensorBoard support!") + log_line(log) + + self._warned = False + + @TrainerPlugin.hook + def metric_recorded(self, record): + assert self.writer is not None + # TODO: check if metric is in tracked metrics + if record.is_scalar: + self.writer.add_scalar(str(record.name), record.value, record.global_step, walltime=record.walltime) + else: + if not self._warned: + log.warning("Logging anything other than scalars to TensorBoard is currently not supported.") + self._warned = True + + @TrainerPlugin.hook + def _training_finally(self, **kw): + """ + Closes the writer + :param kw: + :return: + """ + assert self.writer is not None + self.writer.close() diff --git a/flair/trainers/plugins/loggers/wandb.py b/flair/trainers/plugins/loggers/wandb.py new file mode 100644 index 0000000000..93b794f8d0 --- /dev/null +++ b/flair/trainers/plugins/loggers/wandb.py @@ -0,0 +1,72 @@ +import logging + +from flair.trainers.plugins.base import TrainerPlugin + +log = logging.getLogger("flair") + + +class WandbLoggingHandler(logging.Handler): + def __init__(self, wandb, *args, **kwargs): + super().__init__(*args, **kwargs) + + self.wandb = wandb + + def emit(self, record): + try: + # adjust alert level + if record.level >= logging.ERROR: + level = self.wandb.AlertLevel.ERROR + elif record.level >= logging.WARNING: + level = self.wandb.AlertLevel.WARN + else: + level = self.wandb.AlertLevel.INFO + + self.wandb.alert( + title=f"Alert from {record.module}:{record.lineno}", + text=self.format(record), + level=level, + ) + + except Exception: + self.handleError(record) + + +class WandbLogger(TrainerPlugin): + def __init__(self, wandb, emit_alerts=True, alert_level=logging.WARNING, **kwargs): + super().__init__(**kwargs) + + self.wandb = wandb + self.emit_alerts = emit_alerts + self.alert_level = alert_level + self._emitted_record_type_warning = False + + @TrainerPlugin.hook + def after_training_setup(self, **kw): + if self.emit_alerts: + self.log_handler = WandbLoggingHandler(self.wandb) + self.log_handler.setLevel(self.alert_level) + + formatter = logging.Formatter("%(asctime)-15s %(message)s") + self.log_handler.setFormatter(formatter) + log.addHandler(self.log_handler) + else: + self.log_handler = None + + @TrainerPlugin.hook("_training_exception", "after_teardown") + def 
close_file_handler(self, **kw): + if self.emit_alerts: + self.log_handler.close() + log.removeHandler(self.log_handler) + + @TrainerPlugin.hook + def metric_recorded(self, record): + if record.is_scalar: + self.wandb.log({record.name: record.value}) + else: + if not self._emitted_record_type_warning: + log.warning("Logging anything other than scalars to W&B is currently not supported.") + self._emitted_record_type_warning = True + + @TrainerPlugin.hook + def _training_finally(self, **kw): + self.writer.close() diff --git a/flair/trainers/plugins/metric_records.py b/flair/trainers/plugins/metric_records.py new file mode 100644 index 0000000000..6e02f172d7 --- /dev/null +++ b/flair/trainers/plugins/metric_records.py @@ -0,0 +1,126 @@ +import time +from dataclasses import dataclass +from enum import Enum +from typing import Any, Iterable, Iterator, Tuple, Union + +RecordType = Enum("RecordType", ["scalar", "image", "histogram", "string", "scalar_list"]) + + +class MetricName: + def __init__(self, name): + self.parts: Tuple[str, ...] + + if isinstance(name, str): + self.parts = tuple(name.split("/")) + else: + self.parts = tuple(name) + + def __str__(self) -> str: + return "/".join(self.parts) + + def __repr__(self) -> str: + return str(self) + + def __iter__(self) -> Iterator[str]: + return iter(self.parts) + + def __getitem__(self, i) -> Union["MetricName", str]: + item = self.parts[i] + + if isinstance(i, slice): + item = self.__class__(item) + + return item + + def __add__(self, other) -> "MetricName": + if isinstance(other, str): + return self.__class__(self.parts + (other,)) + elif isinstance(other, MetricName): + return self.__class__(self.parts + other.parts) + else: + return self.__class__(self.parts + tuple(other)) + + def __radd__(self, other) -> "MetricName": + if isinstance(other, str): + return self.__class__((other,) + self.parts) + else: + # no need to check for MetricName, as __add__ of other would be called in this case + return self.__class__(tuple(other) + self.parts) + + def __eq__(self, other) -> bool: + if isinstance(other, str): + return self.parts == tuple(other.split("/")) + elif isinstance(other, MetricName): + return self.parts == other.parts + elif other is None: + return False + else: + return self.parts == tuple(other) + + def __hash__(self): + return hash(self.parts) + + +@dataclass +class MetricRecord: + """Represents a recorded metric value.""" + + def __init__( + self, name: Union[Iterable[str], str], value: Any, global_step: int, typ: RecordType, *, walltime: float = None + ): + """Create a metric record. + + :param name: Name of the metric. + :param typ: Type of metric. + :param value: Value of the metric (can be anything: scalar, tensor, + image, etc.). + :param walltime: Time of recording this metric. 
+ """ + + self.name: MetricName = MetricName(name) + self.typ: RecordType = typ + self.value: Any = value + self.global_step: int = global_step + self.walltime: float = walltime if walltime is not None else time.time() + + @property + def joined_name(self) -> str: + return str(self.name) + + @classmethod + def scalar(cls, name: Iterable[str], value: Any, global_step: int, *, walltime=None): + return cls(name=name, value=value, global_step=global_step, typ=RecordType.scalar, walltime=walltime) + + @classmethod + def scalar_list(cls, name: Iterable[str], value: list, global_step: int, *, walltime=None): + return cls(name=name, value=value, global_step=global_step, typ=RecordType.scalar_list, walltime=walltime) + + @classmethod + def string(cls, name: Iterable[str], value: str, global_step: int, *, walltime=None): + return cls(name=name, value=value, global_step=global_step, typ=RecordType.string, walltime=walltime) + + @classmethod + def histogram(cls, name: Iterable[str], value: str, global_step: int, *, walltime=None): + return cls(name=name, value=value, global_step=global_step, typ=RecordType.histogram, walltime=walltime) + + def is_type(self, typ): + return self.typ == typ + + @property + def is_scalar(self): + return self.is_type(RecordType.scalar) + + @property + def is_scalar_list(self): + return self.is_type(RecordType.scalar_list) + + @property + def is_string(self): + return self.is_type(RecordType.string) + + @property + def is_histogram(self): + return self.is_type(RecordType.histogram) + + def __repr__(self): + return f"{self.__class__.__name__}({self.joined_name} at step {self.global_step}, {self.walltime:.4f})" diff --git a/flair/trainers/trainer.py b/flair/trainers/trainer.py index bbdbba5d43..71658b4a47 100644 --- a/flair/trainers/trainer.py +++ b/flair/trainers/trainer.py @@ -1,67 +1,78 @@ -import contextlib -import copy -import datetime import inspect import logging import os -import sys +import random import time import warnings from inspect import signature from pathlib import Path -from typing import Any, Dict, List, Optional, Tuple, Type, Union, cast +from typing import List, Optional, Tuple, Type, Union import torch from torch.optim.sgd import SGD from torch.utils.data.dataset import ConcatDataset -from transformer_smaller_training_vocab import reduce_train_vocab - -from flair.embeddings import Embeddings, StackedEmbeddings, TransformerEmbeddings -from flair.models import FewshotClassifier -from flair.nn import Model -from flair.nn.model import ReduceTransformerVocabMixin - -try: - from apex import amp -except ImportError: - amp = None - -import random - -from torch.optim.lr_scheduler import OneCycleLR # type: ignore import flair import flair.nn from flair.data import Corpus, Dictionary, _len_dataset from flair.datasets import DataLoader -from flair.optim import ExpAnnealLR, LinearSchedulerWithWarmup -from flair.training_utils import ( - AnnealOnPlateau, - WeightExtractor, - add_file_handler, - identify_dynamic_embeddings, - init_output_file, - log_line, - store_embeddings, +from flair.trainers.plugins import ( + AnnealingPlugin, + CheckpointPlugin, + LinearSchedulerPlugin, + LogFilePlugin, + LossFilePlugin, + MetricName, + MetricRecord, + Pluggable, + TrainerPlugin, + TrainingInterrupt, + WeightExtractorPlugin, ) +from flair.training_utils import identify_dynamic_embeddings, log_line, store_embeddings log = logging.getLogger("flair") -class ModelTrainer: - def __init__( - self, - model: flair.nn.Model, - corpus: Corpus, - ): +class ModelTrainer(Pluggable): + 
valid_events = { + "after_setup", + "before_training_epoch", + "before_training_batch", + "before_training_optimizer_step", + "after_training_batch", + "after_training_epoch", + "after_evaluation", + "after_training_loop", + "training_interrupt", + "_training_finally", + "_training_exception", + "after_training", + "metric_recorded", + } + + def __init__(self, model: flair.nn.Model, corpus: Corpus): """ Initialize a model trainer :param model: The model that you want to train. The model should inherit from flair.nn.Model # noqa: E501 :param corpus: The dataset used to train the model, should be of type Corpus """ + super().__init__() self.model: flair.nn.Model = model self.corpus: Corpus = corpus + self.reset_training_attributes() + self.return_values: dict = {} + + def reset_training_attributes(self): + if hasattr(self, "optimizer") and self.optimizer is not None: + self.optimizer.zero_grad(set_to_none=True) + del self.optimizer + + self.optimizer = None + self.mini_batch_size = None + self.return_values: dict = {} + @staticmethod def check_for_and_delete_previous_best_models(base_path): all_best_model_names = [filename for filename in os.listdir(base_path) if filename.startswith("best-model")] @@ -76,1071 +87,763 @@ def check_for_and_delete_previous_best_models(base_path): if os.path.exists(previous_best_path): os.remove(previous_best_path) + @staticmethod + def get_batch_steps(batch, mini_batch_chunk_size): + # if necessary, make batch_steps + if mini_batch_chunk_size is not None and len(batch) > mini_batch_chunk_size: + # break up the batch into slices of size + # mini_batch_chunk_size + return [batch[i : i + mini_batch_chunk_size] for i in range(0, len(batch), mini_batch_chunk_size)] + else: + return [batch] + + def _get_train_data(self, train_with_dev, train_with_test): + # if training also uses dev/train data, include in training set + train_data = self.corpus.train + + if train_with_dev or train_with_test: + parts = [self.corpus.train] + if train_with_dev and self.corpus.dev: + parts.append(self.corpus.dev) + if train_with_test and self.corpus.test: + parts.append(self.corpus.test) + + train_data = ConcatDataset(parts) + + return train_data + + def _backward(self, loss): + """Calls backward on the loss. + + This allows plugins to overwrite the backward call. 
+ """ + loss.backward() + def train( self, - base_path: Union[Path, str], + base_path, + anneal_factor: float = 0.5, + patience: int = 3, + min_learning_rate: Union[float, List[float]] = 0.0001, + initial_extra_patience: int = 0, + anneal_with_restarts: bool = False, learning_rate: float = 0.1, + decoder_learning_rate: Optional[float] = None, mini_batch_size: int = 32, - eval_batch_size: int = None, + eval_batch_size: int = 64, mini_batch_chunk_size: Optional[int] = None, max_epochs: int = 100, + optimizer: Type[torch.optim.Optimizer] = torch.optim.SGD, train_with_dev: bool = False, train_with_test: bool = False, - monitor_train: bool = False, - monitor_test: bool = False, + # evaluation and monitoring main_evaluation_metric: Tuple[str, str] = ("micro avg", "f1-score"), - scheduler=AnnealOnPlateau, - anneal_factor: float = 0.5, - patience: int = 3, - min_learning_rate: Union[float, List[float]] = 0.0001, - initial_extra_patience: int = 0, - optimizer: Union[torch.optim.Optimizer, Type[torch.optim.Optimizer]] = SGD, - cycle_momentum: bool = False, - warmup_fraction: float = 0.1, + monitor_test: bool = False, + monitor_train_sample: Union[float, int] = 0.0, + use_final_model_for_eval: bool = False, + gold_label_dictionary_for_eval: Optional[Dictionary] = None, + exclude_labels: List[str] = [], + # sampling and shuffling + sampler=None, + shuffle: bool = True, + shuffle_first_epoch: bool = True, + # evaluation and monitoring embeddings_storage_mode: str = "cpu", - checkpoint: bool = False, + epoch: int = 0, + # when and what to save save_final_model: bool = True, - anneal_with_restarts: bool = False, - anneal_with_prestarts: bool = False, - anneal_against_dev_loss: bool = False, - batch_growth_annealing: bool = False, - shuffle: bool = True, - param_selection_mode: bool = False, + save_optimizer_state: bool = False, + save_model_each_k_epochs: int = 0, + # logging parameters + create_file_logs: bool = True, + create_loss_file: bool = True, write_weights: bool = False, - num_workers: Optional[int] = None, + # plugins + plugins: List[TrainerPlugin] = None, + **kwargs, + ): + # activate annealing plugin + if plugins is None: + plugins = [] + plugins.append( + AnnealingPlugin( + base_path=base_path, + anneal_factor=anneal_factor, + patience=patience, + min_learning_rate=min_learning_rate, + initial_extra_patience=initial_extra_patience, + anneal_with_restarts=anneal_with_restarts, + ) + ) + + # call self.train_custom with all parameters (minus the ones specific to the AnnealingPlugin) + local_variables = locals() + for var in [ + "self", + "anneal_factor", + "patience", + "min_learning_rate", + "initial_extra_patience", + "anneal_with_restarts", + "kwargs", + ]: + local_variables.pop(var) + return self.train_custom(**local_variables, **kwargs) + + def fine_tune( + self, + base_path: Union[Path, str], + # training parameters + warmup_fraction: float = 0.1, + learning_rate: float = 5e-5, + decoder_learning_rate: Optional[float] = None, + mini_batch_size: int = 4, + eval_batch_size: int = 16, + mini_batch_chunk_size: Optional[int] = None, + max_epochs: int = 10, + optimizer: Type[torch.optim.Optimizer] = torch.optim.AdamW, + train_with_dev: bool = False, + train_with_test: bool = False, + # evaluation and monitoring + main_evaluation_metric: Tuple[str, str] = ("micro avg", "f1-score"), + monitor_test: bool = False, + monitor_train_sample: Union[float, int] = 0.0, + use_final_model_for_eval: bool = True, + gold_label_dictionary_for_eval: Optional[Dictionary] = None, + exclude_labels: List[str] = [], 
+ # sampling and shuffling sampler=None, - use_amp: bool = False, - amp_opt_level: str = "O1", - eval_on_train_fraction: Union[float, str] = 0.0, - eval_on_train_shuffle: bool = False, + shuffle: bool = True, + shuffle_first_epoch: bool = True, + # evaluation and monitoring + embeddings_storage_mode: str = "none", + epoch: int = 0, + # when and what to save + save_final_model: bool = True, + save_optimizer_state: bool = False, save_model_each_k_epochs: int = 0, - tensorboard_comment: str = "", - use_swa: bool = False, + # logging parameters + create_file_logs: bool = True, + create_loss_file: bool = True, + write_weights: bool = False, + # plugins + plugins: List[TrainerPlugin] = None, + **kwargs, + ): + # annealing logic + if plugins is None: + plugins = [] + plugins.append(LinearSchedulerPlugin(warmup_fraction=warmup_fraction)) + + return self.train_custom( + base_path=base_path, + # training parameters + learning_rate=learning_rate, + decoder_learning_rate=decoder_learning_rate, + mini_batch_size=mini_batch_size, + eval_batch_size=eval_batch_size, + mini_batch_chunk_size=mini_batch_chunk_size, + max_epochs=max_epochs, + optimizer=optimizer, + train_with_dev=train_with_dev, + train_with_test=train_with_test, + # evaluation and monitoring + main_evaluation_metric=main_evaluation_metric, + monitor_test=monitor_test, + monitor_train_sample=monitor_train_sample, + use_final_model_for_eval=use_final_model_for_eval, + gold_label_dictionary_for_eval=gold_label_dictionary_for_eval, + exclude_labels=exclude_labels, + # sampling and shuffling + sampler=sampler, + shuffle=shuffle, + shuffle_first_epoch=shuffle_first_epoch, + # evaluation and monitoring + embeddings_storage_mode=embeddings_storage_mode, + epoch=epoch, + # when and what to save + save_final_model=save_final_model, + save_optimizer_state=save_optimizer_state, + save_model_each_k_epochs=save_model_each_k_epochs, + # logging parameters + create_file_logs=create_file_logs, + create_loss_file=create_loss_file, + write_weights=write_weights, + # plugins + plugins=plugins, + **kwargs, + ) + + def train_custom( + self, + base_path: Union[Path, str], + # training parameters + learning_rate: float = 0.1, + decoder_learning_rate: Optional[float] = None, + mini_batch_size: int = 32, + eval_batch_size: int = 64, + mini_batch_chunk_size: Optional[int] = None, + max_epochs: int = 100, + optimizer: Type[torch.optim.Optimizer] = SGD, + train_with_dev: bool = False, + train_with_test: bool = False, + # evaluation and monitoring + main_evaluation_metric: Tuple[str, str] = ("micro avg", "f1-score"), + monitor_test: bool = False, + monitor_train_sample: Union[float, int] = 0.0, use_final_model_for_eval: bool = False, gold_label_dictionary_for_eval: Optional[Dictionary] = None, exclude_labels: List[str] = [], - create_file_logs: bool = True, - create_loss_file: bool = True, + # sampling and shuffling + sampler=None, + shuffle: bool = True, + shuffle_first_epoch: bool = True, + # evaluation and monitoring + embeddings_storage_mode: str = "cpu", epoch: int = 0, - use_tensorboard: bool = False, - tensorboard_log_dir=None, - metrics_for_tensorboard=[], - optimizer_state_dict: Optional[Dict[str, Any]] = None, - scheduler_state_dict: Optional[Dict[str, Any]] = None, + # when and what to save + save_final_model: bool = True, save_optimizer_state: bool = False, - reduce_transformer_vocab: bool = False, - shuffle_first_epoch: bool = False, + save_model_each_k_epochs: int = 0, + # logging parameters + create_file_logs: bool = True, + create_loss_file: bool = True, 
+ write_weights: bool = False, + # plugins + plugins: List[TrainerPlugin] = [], **kwargs, ) -> dict: """ Trains any class that implements the flair.nn.Model interface. - :param base_path: Main path to which all output during training is logged and models are saved # noqa: E501 - :param learning_rate: Initial learning rate (or max, if scheduler is OneCycleLR) # noqa: E501 - :param mini_batch_size: Size of mini-batches during training # noqa: E501 - :param eval_batch_size: Size of mini-batches during evaluation. Defaults to mini_batch_size. # noqa: E501 - :param mini_batch_chunk_size: If mini-batches are larger than this number, they get broken down into chunks of this size for processing purposes # noqa: E501 - :param max_epochs: Maximum number of epochs to train. Terminates training if this number is surpassed. # noqa: E501 - :param scheduler: The learning rate scheduler to use - :param checkpoint: If True, a full checkpoint is saved at end of each epoch # noqa: E501 - :param cycle_momentum: If scheduler is OneCycleLR, whether the scheduler should cycle also the momentum # noqa: E501 - :param anneal_factor: The factor by which the learning rate is annealed - :param patience: Patience is the number of epochs with no improvement the Trainer waits # noqa: E501 - until annealing the learning rate - :param min_learning_rate: If the (in multi lr case: all) learning rate falls below this threshold, training terminates # noqa: E501 - :param initial_extra_patience: Extra patience on top of the base patience value before the first learning rate annealment # noqa: E501 - :param warmup_fraction: Fraction of warmup steps if the scheduler is LinearSchedulerWithWarmup # noqa: E501 - :param train_with_dev: If True, the data from dev split is added to the training data # noqa: E501 - :param train_with_test: If True, the data from test split is added to the training data # noqa: E501 - :param monitor_train: If True, training data is evaluated at end of each epoch - :param monitor_test: If True, test data is evaluated at end of each epoch - :param embeddings_storage_mode: One of 'none' (all embeddings are deleted and freshly recomputed), # noqa: E501 - 'cpu' (embeddings are stored on CPU) or 'gpu' (embeddings are stored on GPU) - :param save_final_model: If True, final model is saved - :param anneal_with_restarts: If True, the last best model is restored when annealing the learning rate # noqa: E501 - :param anneal_with_prestarts: If True, the model preceding the last best model is restored when annealing the learning rate # noqa: E501 - :param anneal_against_dev_loss: If True, the annealment is triggered when dev loss plateaus. # noqa: E501 - If False (default), it is triggered when dev score plateaus. - :param batch_growth_annealing: If True, mini_batch_size doubles every time learning_rate is annealed. # noqa: E501 - :param shuffle: If True, data is shuffled during training - :param param_selection_mode: If True, testing is performed against dev data. Use this mode when doing # noqa: E501 - parameter selection. - :param write_weights: If True, write weights to weights.txt on each batch logging event. - :param num_workers: Number of workers in your data loader. - :param sampler: You can pass a data sampler here for special sampling of data. # noqa: E501 - :param eval_on_train_fraction: the fraction of train data to do the evaluation on, # noqa: E501 - if 0. 
the evaluation is not performed on fraction of training data, - if 'dev' the size is determined from dev set size - :param eval_on_train_shuffle: if True the train data fraction is determined on the start of training # noqa: E501 - and kept fixed during training, otherwise it's sampled at beginning of each epoch # noqa: E501 - :param save_model_each_k_epochs: Each k epochs, a model state will be written out. If set to '5', a model will # noqa: E501 - be saved each 5 epochs. Default is 0 which means no model saving. - :param main_evaluation_metric: Type of metric to use for best model tracking and learning rate scheduling (if dev data is available, otherwise loss will be used), currently only applicable for text_classification_model # noqa: E501 - :param tensorboard_comment: Comment to use for tensorboard logging - :param create_file_logs: If True, the logs will also be stored in a file 'training.log' in the model folder # noqa: E501 - :param create_loss_file: If True, the loss will be writen to a file 'loss.tsv' in the model folder # noqa: E501 - :param optimizer: The optimizer to use (typically SGD or Adam) - :param epoch: The starting epoch (normally 0 but could be higher if you continue training model) # noqa: E501 - :param use_tensorboard: If True, writes out tensorboard information - :param tensorboard_log_dir: Directory into which tensorboard log files will be written # noqa: E501 - :param metrics_for_tensorboard: List of tuples that specify which metrics (in addition to the main_score) shall be plotted in tensorboard, could be [("macro avg", 'f1-score'), ("macro avg", 'precision')] for example # noqa: E501 - :param kwargs: Other arguments for the Optimizer - :return: - """ - # create a model card for this model with Flair and PyTorch version - model_card: Dict[str, Any] = { - "flair_version": flair.__version__, - "pytorch_version": torch.__version__, - } + Args: + base_path: Main path to which all output during training is logged and models are saved + learning_rate (float): The learning rate of the optimizer + decoder_learning_rate (Optional[float]): Optional, if set, the decoder is trained with a separate learning rate + mini_batch_size (int): Size of mini-batches during training + eval_batch_size (int): Size of mini-batches during evaluation + mini_batch_chunk_size (int): If mini-batches are larger than this number, they get broken down into chunks of + this size for processing purposes + max_epochs (int): Maximum number of epochs to train. Terminates training if this number is surpassed. + optimizer: The optimizer to use (typically SGD or Adam) + train_with_dev (bool): If True, the data from dev split is added to the training data + train_with_test (bool): If True, the data from test split is added to the training data + main_evaluation_metric: The metric to optimize (often micro-average or macro-average F1-score, or accuracy) + monitor_test (bool): If True, test data is evaluated at end of each epoch + monitor_train_sample: Set this to evaluate on a sample of the train data at the end of each epoch. + If you set an int, it will sample this many sentences to evaluate on. If you set a float, it will sample + a percentage of data points from train. + use_final_model_for_eval (bool): If True, the final model is used for the final evaluation. If False, the + model from the best epoch as determined by main_evaluation_metric is used for the final evaluation. 
+ gold_label_dictionary_for_eval: Set to force evaluation to use a particular label dictionary + exclude_labels: Optionally define a list of labels to exclude from the evaluation + sampler: You can pass a data sampler here for special sampling of data. + shuffle: If True, data is shuffled during training + shuffle_first_epoch: If True, data is shuffled during the first epoch of training + embeddings_storage_mode: One of 'none' (all embeddings are deleted and freshly recomputed), + 'cpu' (embeddings stored on CPU) or 'gpu' (embeddings stored on GPU) + epoch: The starting epoch (normally 0 but could be higher if you continue training model) + save_final_model: If True, the final model is saved at the end of training. + save_optimizer_state (bool): If True, the optimizer state is saved alongside the model + save_model_each_k_epochs: Each k epochs, a model state will be written out. If set to '5', a model will + be saved each 5 epochs. Default is 0 which means no model saving. + create_file_logs (bool): If True, logging output is written to a file + create_loss_file (bool): If True, a loss file logging output is created + write_weights (bool): If True, write weights to weights.txt on each batch logging event. + plugins: Any additional plugins you want to pass to the trainer + **kwargs: Additional arguments, for instance for the optimizer + + Returns: + dict: A dictionary with at least the key "test_score" containing the final evaluation score. Some plugins + add additional information to this dictionary, such as the :class:`MetricHistoryPlugin` + """ - # also record Transformers version if library is loaded - try: - import transformers + # Create output folder + base_path = Path(base_path) + base_path.mkdir(exist_ok=True, parents=True) - model_card["transformers_version"] = transformers.__version__ - except ImportError: - pass + # === START BLOCK: ACTIVATE PLUGINS === # + # We first activate all optional plugins. These take care of optional functionality such as various + # logging techniques and checkpointing - # remember all parameters used in train() call - local_variables = locals() - training_parameters = {} - for parameter in signature(self.train).parameters: - if isinstance(local_variables[parameter], Path): - training_parameters[parameter] = str(local_variables[parameter]) - else: - training_parameters[parameter] = local_variables[parameter] - model_card["training_parameters"] = training_parameters + for plugin in plugins: + plugin.attach_to(self) - if epoch >= max_epochs: - log.warning(f"Starting at epoch {epoch + 1}/{max_epochs}. No training will be done.") + # log file plugin + if create_file_logs: + LogFilePlugin(base_path=base_path).attach_to(self) - # add model card to model - self.model.model_card = model_card - assert self.corpus.train - if use_tensorboard: - try: - from torch.utils.tensorboard import SummaryWriter + # loss file plugin + if create_loss_file: + LossFilePlugin(base_path=base_path, epoch=epoch).attach_to(self) - if tensorboard_log_dir is not None and not os.path.exists(tensorboard_log_dir): - os.mkdir(tensorboard_log_dir) - writer = SummaryWriter(log_dir=tensorboard_log_dir, comment=tensorboard_comment) - log.info(f"tensorboard logging path is {tensorboard_log_dir}") + # plugin for writing weights + if write_weights: + WeightExtractorPlugin(base_path=base_path).attach_to(self) - except ImportError: - log_line(log) - log.warning("ATTENTION! 
PyTorch >= 1.1.0 and pillow are required" "for TensorBoard support!") - log_line(log) - use_tensorboard = False - pass - - if use_amp: - if sys.version_info < (3, 0): - raise RuntimeError("Apex currently only supports Python 3. Aborting.") - if amp is None: - raise RuntimeError( - "Failed to import apex. Please install apex from " - "https://www.github.com/nvidia/apex " - "to enable mixed-precision training." - ) + # plugin for checkpointing + if save_model_each_k_epochs > 0: + CheckpointPlugin( + save_model_each_k_epochs=save_model_each_k_epochs, + save_optimizer_state=save_optimizer_state, + base_path=base_path, + ).attach_to(self) - if not eval_batch_size: - eval_batch_size = mini_batch_size - if mini_batch_chunk_size is None: - mini_batch_chunk_size = mini_batch_size + # === END BLOCK: ACTIVATE PLUGINS === # - # if optimizer class is passed, instantiate: - if inspect.isclass(optimizer): - kwargs["lr"] = learning_rate - optimizer_instance = optimizer(self.model.parameters(), **kwargs) + # derive parameters the function was called with (or defaults) + local_variables = locals() + training_parameters = { + parameter: local_variables[parameter] for parameter in signature(self.train_custom).parameters + } + training_parameters.update(kwargs) + + # initialize model card with these parameters + self.model.model_card = self._initialize_model_card(**training_parameters) + + # Prepare training data and get dataset size + train_data = self._get_train_data(train_with_dev=train_with_dev, train_with_test=train_with_test) + dataset_size = _len_dataset(train_data) + parameters = {"dataset_size": dataset_size, **training_parameters} + + # determine what splits (train, dev, test) to evaluate + evaluation_splits = {} + if not train_with_dev and self.corpus.dev: + evaluation_splits["dev"] = self.corpus.dev + if self.corpus.test and monitor_test: + evaluation_splits["test"] = self.corpus.test + if monitor_train_sample > 0.0: + evaluation_splits["train_sample"] = self._sample_train_split(monitor_train_sample) + + # determine how to determine best model and whether to save it + determine_best_epoch_using_dev_score = not train_with_dev and self.corpus.dev + best_epoch_score = 0 if determine_best_epoch_using_dev_score else float("inf") + save_best_model = not train_with_dev and not use_final_model_for_eval + + # instantiate the optimizer + kwargs["lr"] = learning_rate + if decoder_learning_rate: + params = [ + { + "params": [param for name, param in self.model.named_parameters() if "embeddings" not in name], + "lr": decoder_learning_rate, + }, + { + "params": [param for name, param in self.model.named_parameters() if "embeddings" in name], + "lr": learning_rate, + }, + ] + self.optimizer = optimizer(params=params, **kwargs) + log.info( + f"Modifying learning rate to {decoder_learning_rate} for the following " + f"parameters: {[name for name, param in self.model.named_parameters() if 'embeddings' not in name]}" + ) else: - optimizer_instance = optimizer - - initial_learning_rate = [group["lr"] for group in optimizer_instance.param_groups] - - if not isinstance(min_learning_rate, list): - min_learning_rate = [min_learning_rate] * len(initial_learning_rate) - - for i, lr in enumerate(initial_learning_rate): - if lr < min_learning_rate[i]: - min_learning_rate[i] = lr / 10 - - base_path = Path(base_path) - base_path.mkdir(exist_ok=True, parents=True) - + self.optimizer = optimizer(params=self.model.parameters(), **kwargs) + + # initialize sampler if provided + if sampler is not None: + # init with default values if 
only class is provided + if inspect.isclass(sampler): + sampler = sampler() + # set dataset to sample from + sampler.set_dataset(train_data) + shuffle = False + + # this field stores the names of all dynamic embeddings in the model (determined after first forward pass) + dynamic_embeddings = None + + # Sanity checks + assert len(train_data) > 0 + if epoch >= max_epochs: + log.warning(f"Starting at epoch {epoch + 1}/{max_epochs}. No training will be done.") if epoch == 0: self.check_for_and_delete_previous_best_models(base_path) - # determine what splits (train, dev, test) to evaluate and log - log_train = True if monitor_train else False - log_test = True if (not param_selection_mode and self.corpus.test and monitor_test) else False - log_dev = False if train_with_dev or not self.corpus.dev else True - log_train_part = True if (eval_on_train_fraction == "dev" or float(eval_on_train_fraction) > 0.0) else False - - if log_train_part: - train_part_size = ( - _len_dataset(self.corpus.dev) - if eval_on_train_fraction == "dev" - else int(_len_dataset(self.corpus.train) * eval_on_train_fraction) - ) - - assert train_part_size > 0 - if not eval_on_train_shuffle: - train_part_indices = list(range(train_part_size)) - train_part = torch.utils.data.dataset.Subset(self.corpus.train, train_part_indices) - - # prepare loss logging file and set up header - loss_txt = init_output_file(base_path, "loss.tsv") if create_loss_file else None - - weight_extractor = WeightExtractor(base_path) - - with contextlib.ExitStack() as context_stack: - if reduce_transformer_vocab and isinstance(self.model, ReduceTransformerVocabMixin): - transformer_embeddings = get_transformer_embeddings(self) - if not transformer_embeddings: - reduce_transformer_vocab = False - else: - tokens = list(self.model.get_used_tokens(self.corpus)) - for emb in transformer_embeddings: - context_stack.enter_context( - reduce_train_vocab( - model=emb.model, tokenizer=emb.tokenizer, texts=tokens, optimizer=optimizer_instance - ) - ) - else: - reduce_transformer_vocab = False + # -- AmpPlugin -> wraps with AMP + # -- AnnealingPlugin -> initialize schedulers (requires instantiated optimizer) + self.dispatch("after_setup", **parameters) - if use_swa: - import torchcontrib - - optimizer_instance = torchcontrib.optim.SWA( - optimizer_instance, swa_start=10, swa_freq=5, swa_lr=learning_rate - ) + final_eval_info = ( + "model after last epoch (final-model.pt)" + if use_final_model_for_eval + else "model from best epoch (best-model.pt)" + ) - # from here on, use list of learning rates - current_learning_rate: List = [group["lr"] for group in optimizer_instance.param_groups] - - if use_amp: - self.model, optimizer_instance = amp.initialize(self.model, optimizer_instance, opt_level=amp_opt_level) - - optimizer_instance = cast(torch.optim.Optimizer, optimizer_instance) - - # load existing optimizer state dictionary if it exists - if optimizer_state_dict: - optimizer_instance.load_state_dict(optimizer_state_dict) - - # minimize training loss if training with dev data, else maximize dev score - anneal_mode = "min" if train_with_dev or anneal_against_dev_loss else "max" - best_validation_score = 100000000000 if train_with_dev or anneal_against_dev_loss else -1.0 - - dataset_size = _len_dataset(self.corpus.train) - if train_with_dev: - dataset_size += _len_dataset(self.corpus.dev) - - # if scheduler is passed as a class, instantiate - if inspect.isclass(scheduler): - if scheduler == OneCycleLR: - scheduler = OneCycleLR( - optimizer_instance, - 
max_lr=current_learning_rate, - steps_per_epoch=dataset_size // mini_batch_size + 1, - epochs=max_epochs - epoch, - # if we load a checkpoint, we have already trained for epoch - pct_start=0.0, - cycle_momentum=cycle_momentum, - ) - elif scheduler == LinearSchedulerWithWarmup: - steps_per_epoch = (dataset_size + mini_batch_size - 1) / mini_batch_size - num_train_steps = int(steps_per_epoch * max_epochs) - num_warmup_steps = int(num_train_steps * warmup_fraction) - - scheduler = LinearSchedulerWithWarmup( - optimizer_instance, - num_train_steps=num_train_steps, - num_warmup_steps=num_warmup_steps, - ) - else: - scheduler = scheduler( - optimizer_instance, - factor=anneal_factor, - patience=patience, - initial_extra_patience=initial_extra_patience, - mode=anneal_mode, - verbose=True, - ) + log_line(log) + log.info(f'Model: "{self.model}"') + log_line(log) + log.info(f"{self.corpus}") + log_line(log) + log.info(f"Train: {len(train_data)} sentences") + log.info(f" (train_with_dev={train_with_dev}, train_with_test={train_with_test})") + log_line(log) + log.info("Training Params:") + log.info( + f' - learning_rate: "{learning_rate}" ' + f'{"(decoder: " + str(decoder_learning_rate) + ")" if decoder_learning_rate else ""}' + ) + log.info(f' - mini_batch_size: "{mini_batch_size}"') + log.info(f' - max_epochs: "{max_epochs}"') + log.info(f' - shuffle: "{shuffle}"') + log_line(log) + log.info("Plugins:") + for plugin in plugins: + log.info(" - " + str(plugin)) + log_line(log) + log.info(f"Final evaluation on {final_eval_info}") + log.info(f' - metric: "{main_evaluation_metric}"') + log_line(log) + log.info("Computation:") + log.info(f" - compute on device: {flair.device}") + log.info(f" - embedding storage: {embeddings_storage_mode}") + log_line(log) + log.info(f'Model training base path: "{base_path}"') + log_line(log) - # Determine whether to log "bad epochs" information - log_bad_epochs = True if scheduler.__class__ == AnnealOnPlateau else False + # At any point you can hit Ctrl + C to break out of training early. 
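Before the training loop below, a usage sketch of the restructured entry points: train() and fine_tune() now only assemble a plugin list (AnnealingPlugin and LinearSchedulerPlugin respectively) and forward all remaining arguments to train_custom(). The model, corpus and output paths here are placeholders, not taken from this patch:

from flair.trainers import ModelTrainer

trainer = ModelTrainer(model, corpus)  # placeholders: any flair.nn.Model plus a Corpus

# classic training: train() attaches an AnnealingPlugin, then calls train_custom()
trainer.train("resources/taggers/example", learning_rate=0.1, mini_batch_size=32, max_epochs=100)

# transformer fine-tuning: fine_tune() attaches a LinearSchedulerPlugin(warmup_fraction=0.1) instead
trainer.fine_tune("resources/taggers/example-ft", learning_rate=5e-5, mini_batch_size=4, max_epochs=10)

# additional behaviour is added via the plugins argument rather than boolean flags, e.g.
# trainer.train_custom("resources/taggers/example", plugins=[my_plugin])  # my_plugin: hypothetical TrainerPlugin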
+ try: + total_train_samples = 0 - # load existing scheduler state dictionary if it exists - if scheduler_state_dict: - scheduler.load_state_dict(scheduler_state_dict) + for epoch in range(epoch + 1, max_epochs + 1): + log_line(log) - # update optimizer and scheduler in model card - model_card["training_parameters"]["optimizer"] = optimizer_instance - model_card["training_parameters"]["scheduler"] = scheduler + # - SchedulerPlugin -> load state for anneal_with_restarts, batch_growth_annealing, logic for early stopping + # - LossFilePlugin -> get the current epoch for loss file logging + self.dispatch("before_training_epoch", epoch=epoch) + self.model.model_card["training_parameters"]["epoch"] = epoch # type: ignore - if isinstance(scheduler, OneCycleLR) and batch_growth_annealing: - raise ValueError("Batch growth with OneCycle policy is not implemented.") + lr_info, momentum_info = self._get_current_lr_and_momentum(epoch) - train_data = self.corpus.train + # if shuffle_first_epoch==False, the first epoch is not shuffled + shuffle_data_this_epoch = shuffle + if not shuffle_first_epoch and epoch == 1: + shuffle_data_this_epoch = False - # if training also uses dev/train data, include in training set - if train_with_dev or train_with_test: - parts = [self.corpus.train] - if train_with_dev and self.corpus.dev: - parts.append(self.corpus.dev) - if train_with_test and self.corpus.test: - parts.append(self.corpus.test) + batch_loader = DataLoader( + train_data, + batch_size=mini_batch_size, + shuffle=shuffle_data_this_epoch, + sampler=sampler, + ) - train_data = ConcatDataset(parts) + self.model.train() - # initialize sampler if provided - if sampler is not None: - # init with default values if only class is provided - if inspect.isclass(sampler): - sampler = sampler() - # set dataset to sample from - sampler.set_dataset(train_data) - shuffle = False + epoch_train_loss: float = 0.0 + epoch_train_samples: int = 0 - dev_score_history = [] - dev_loss_history = [] - train_loss_history = [] + epoch_start_time = time.time() - micro_batch_size = mini_batch_chunk_size + # log infos on training progress every `log_modulo` batches + log_modulo = max(1, int(len(batch_loader) / 10)) - # this field stores the names of all dynamic embeddings in the model (determined after first forward pass) - dynamic_embeddings = None + # process mini-batches + for batch_no, batch in enumerate(batch_loader): + # zero the gradients on the model and optimizer + self.model.zero_grad() + self.optimizer.zero_grad() - # At any point you can hit Ctrl + C to break out of training early. 
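A side note on the chunking used in the batch loop that follows: get_batch_steps (added earlier in this diff) merely slices an oversized mini-batch into pieces of mini_batch_chunk_size; a standalone sketch of the same logic:

def get_batch_steps(batch, mini_batch_chunk_size):
    # break an oversized mini-batch into chunks of mini_batch_chunk_size, otherwise keep it whole
    if mini_batch_chunk_size is not None and len(batch) > mini_batch_chunk_size:
        return [batch[i : i + mini_batch_chunk_size] for i in range(0, len(batch), mini_batch_chunk_size)]
    return [batch]

assert get_batch_steps(list(range(10)), 4) == [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
assert get_batch_steps(list(range(10)), None) == [list(range(10))]

Gradients from all chunks accumulate before the single optimizer step, so the effective batch size remains mini_batch_size.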
- try: - if create_file_logs: - log_handler = add_file_handler(log, base_path / "training.log") - else: - log_handler = None + batch_train_loss = 0.0 + batch_train_samples = 0 - lr_info = ",".join([f"{lr:.6f}" for lr in current_learning_rate]) + batch_kw = { + "batch_no": batch_no, + "batch": batch, + "total_number_of_batches": len(batch_loader), + "epoch": epoch, + } - log_line(log) - log.info(f'Model: "{self.model}"') - log_line(log) - log.info(f'Corpus: "{self.corpus}"') - log_line(log) - log.info("Parameters:") - log.info(f' - learning_rate: "{lr_info}"') - log.info(f' - mini_batch_size: "{mini_batch_size}"') - log.info(f' - patience: "{patience}"') - log.info(f' - anneal_factor: "{anneal_factor}"') - log.info(f' - max_epochs: "{max_epochs}"') - log.info(f' - shuffle: "{shuffle}"') - log.info(f' - train_with_dev: "{train_with_dev}"') - log.info(f' - batch_growth_annealing: "{batch_growth_annealing}"') - log_line(log) - log.info(f'Model training base path: "{base_path}"') - log_line(log) - log.info(f"Device: {flair.device}") - log_line(log) - log.info(f"Embeddings storage mode: {embeddings_storage_mode}") + self.dispatch("before_training_batch", **batch_kw) - previous_learning_rate = current_learning_rate + batch_steps = self.get_batch_steps(batch, mini_batch_chunk_size=mini_batch_chunk_size) - momentum = [ - group["momentum"] if "momentum" in group else 0 for group in optimizer_instance.param_groups - ] + # forward and backward for batch + for batch_step in batch_steps: + # forward pass + loss, datapoint_count = self.model.forward_loss(batch_step) - for epoch in range(epoch + 1, max_epochs + 1): - log_line(log) + batch_train_samples += datapoint_count + batch_train_loss += loss.item() - # update epoch in model card - model_card["training_parameters"]["epoch"] = epoch + self._backward(loss) - if anneal_with_prestarts: - last_epoch_model_state_dict = copy.deepcopy(self.model.state_dict()) + # identify dynamic embeddings (always deleted) on first sentence + if dynamic_embeddings is None: + dynamic_embeddings = identify_dynamic_embeddings(batch) - if eval_on_train_shuffle: - train_part_indices = list(range(_len_dataset(self.corpus.train))) - random.shuffle(train_part_indices) - train_part_indices = train_part_indices[:train_part_size] - train_part = torch.utils.data.dataset.Subset(self.corpus.train, train_part_indices) + # depending on memory mode, embeddings are moved to CPU, GPU or deleted + store_embeddings(batch_step, embeddings_storage_mode, dynamic_embeddings) - # get new learning rate - current_learning_rate = [group["lr"] for group in optimizer_instance.param_groups] + self.dispatch("before_training_optimizer_step", **batch_kw) - lr_changed = any( - [lr != prev_lr for lr, prev_lr in zip(current_learning_rate, previous_learning_rate)] - ) + # do the optimizer step + torch.nn.utils.clip_grad_norm_(self.model.parameters(), 5.0) + self.optimizer.step() - if lr_changed and batch_growth_annealing: - mini_batch_size *= 2 - - # reload last best model if annealing with restarts is enabled - if ( - (anneal_with_restarts or anneal_with_prestarts) - and lr_changed - and os.path.exists(base_path / "best-model.pt") - ): - if anneal_with_restarts: - log.info("resetting to best model") - self.model.load_state_dict(self.model.load(base_path / "best-model.pt").state_dict()) - if anneal_with_prestarts: - log.info("resetting to pre-best model") - self.model.load_state_dict(self.model.load(base_path / "pre-best-model.pt").state_dict()) - - previous_learning_rate = current_learning_rate - if 
use_tensorboard: - if len(current_learning_rate) == 1: - writer.add_scalar("learning_rate", current_learning_rate[0], epoch) - else: - for i, lr in enumerate(current_learning_rate): - writer.add_scalar(f"learning_rate_{i}", lr, epoch) - - all_lrs_too_small = all( - [lr < min_lr for lr, min_lr in zip(current_learning_rate, min_learning_rate)] - ) + if batch_train_samples > 0: + train_loss = batch_train_loss / batch_train_samples + self._record(MetricRecord.scalar(("train", "batch_loss"), train_loss, total_train_samples)) - # stop training if learning rate becomes too small - if not isinstance(scheduler, (OneCycleLR, LinearSchedulerWithWarmup)) and all_lrs_too_small: - log_line(log) - log.info("learning rate too small - quitting training!") - log_line(log) - break - - start_time = time.time() - - # if shuffle_first_epoch==False, the first epoch is not shuffled - shuffle_data_this_epoch = shuffle - if not shuffle_first_epoch and epoch == 1: - shuffle_data_this_epoch = False - - batch_loader = DataLoader( - train_data, - batch_size=mini_batch_size, - shuffle=shuffle_data_this_epoch, - num_workers=0 if num_workers is None else num_workers, - sampler=sampler, - ) + epoch_train_loss += batch_train_loss + epoch_train_samples += batch_train_samples - self.model.train() - - train_loss: float = 0 - - seen_batches = 0 - total_number_of_batches = len(batch_loader) - - modulo = max(1, int(total_number_of_batches / 10)) - - # process mini-batches - average_over = 0 - for batch_no, batch in enumerate(batch_loader): - # zero the gradients on the model and optimizer - self.model.zero_grad() - optimizer_instance.zero_grad() - - # if necessary, make batch_steps - batch_steps = [batch] - if len(batch) > micro_batch_size: - batch_steps = [ - batch[x : x + micro_batch_size] for x in range(0, len(batch), micro_batch_size) - ] - - # forward and backward for batch - for batch_step in batch_steps: - # forward pass - loss, datapoint_count = self.model.forward_loss(batch_step) - average_over += datapoint_count - # Backward - if use_amp: - with amp.scale_loss(loss, optimizer_instance) as scaled_loss: - scaled_loss.backward() - else: - loss.backward() - train_loss += loss.item() - - # identify dynamic embeddings (always deleted) on first sentence - - if dynamic_embeddings is None: - dynamic_embeddings = identify_dynamic_embeddings(batch) - - # depending on memory mode, embeddings are moved to CPU, GPU or deleted - store_embeddings(batch, embeddings_storage_mode, dynamic_embeddings) - - # do the optimizer step - torch.nn.utils.clip_grad_norm_(self.model.parameters(), 5.0) - optimizer_instance.step() - - # do the scheduler step if one-cycle or linear decay - if isinstance(scheduler, (OneCycleLR, LinearSchedulerWithWarmup)): - scheduler.step() - # get new learning rate - current_learning_rate = [group["lr"] for group in optimizer_instance.param_groups] - - momentum = [ - group["betas"][0] if "betas" in group else group.get("momentum", 0) - for group in optimizer_instance.param_groups - ] - - seen_batches += 1 - - if seen_batches % modulo == 0: - momentum_info = "" - if cycle_momentum: - momentum_info = " - momentum:" + ",".join([f"{m:.4f}" for m in momentum]) - - lr_info = ",".join([f"{lr:.6f}" for lr in current_learning_rate]) - - intermittent_loss = ( - train_loss / average_over if average_over > 0 else train_loss / seen_batches - ) - end_time = time.time() - log.info( - f"epoch {epoch}" - f" - iter {seen_batches}/{total_number_of_batches}" - f" - loss {intermittent_loss:.8f}" - f" - time (sec): {end_time - 
start_time:.2f}" - f" - samples/sec: {average_over / (end_time - start_time):.2f}" - f" - lr: {lr_info}{momentum_info}" - ) - iteration = epoch * total_number_of_batches + batch_no - if not param_selection_mode and write_weights: - weight_extractor.extract_weights(self.model.state_dict(), iteration) - - if average_over != 0: - train_loss /= average_over - - self.model.eval() - - if save_model_each_k_epochs > 0 and epoch % save_model_each_k_epochs == 0: - log.info("saving model of current epoch") - model_name = "model_epoch_" + str(epoch) + ".pt" - self.model.save(base_path / model_name, checkpoint=save_optimizer_state) - - log_line(log) - log.info(f"EPOCH {epoch} done: loss {train_loss:.4f} - lr {lr_info}") - - if use_tensorboard: - writer.add_scalar("train_loss", train_loss, epoch) - - # evaluate on train / dev / test split depending on training settings - result_line: str = "" - - if log_train: - train_eval_result = self.model.evaluate( - self.corpus.train, - gold_label_type=self.model.label_type, - mini_batch_size=eval_batch_size, - num_workers=num_workers, - embedding_storage_mode=embeddings_storage_mode, - main_evaluation_metric=main_evaluation_metric, - gold_label_dictionary=gold_label_dictionary_for_eval, - exclude_labels=exclude_labels, - ) - result_line += f"\t{train_eval_result.loss}\t{train_eval_result.log_line}" - log.info( - f"TRAIN : loss {train_eval_result.loss} -" - f" {main_evaluation_metric[1]}" - f" ({main_evaluation_metric[0]}) " - f" {round(train_eval_result.main_score, 4)}" - ) - # depending on memory mode, embeddings are moved to CPU, GPU or deleted - store_embeddings(self.corpus.train, embeddings_storage_mode, dynamic_embeddings) - - if log_train_part: - train_part_eval_result = self.model.evaluate( - train_part, - gold_label_type=self.model.label_type, - mini_batch_size=eval_batch_size, - num_workers=num_workers, - embedding_storage_mode=embeddings_storage_mode, - main_evaluation_metric=main_evaluation_metric, - gold_label_dictionary=gold_label_dictionary_for_eval, - exclude_labels=exclude_labels, - ) - result_line += f"\t{train_part_eval_result.loss}" f"\t{train_part_eval_result.log_line}" - log.info( - f"TRAIN_SPLIT : loss {train_part_eval_result.loss}" - f" - {main_evaluation_metric[1]}" - f" ({main_evaluation_metric[0]})" - f" {round(train_part_eval_result.main_score, 4)}" + if (batch_no + 1) % log_modulo == 0: + intermittent_loss = ( + epoch_train_loss / epoch_train_samples + if epoch_train_samples > 0 + else epoch_train_samples / (batch_no + 1) ) - if use_tensorboard: - for metric_class_avg_type, metric_type in metrics_for_tensorboard: - writer.add_scalar( - f"train_{metric_class_avg_type}_{metric_type}", - train_part_eval_result.classification_report[metric_class_avg_type][metric_type], - epoch, - ) - - if log_dev: - assert self.corpus.dev - dev_eval_result = self.model.evaluate( - self.corpus.dev, - gold_label_type=self.model.label_type, - mini_batch_size=eval_batch_size, - num_workers=num_workers, - out_path=base_path / "dev.tsv", - embedding_storage_mode=embeddings_storage_mode, - main_evaluation_metric=main_evaluation_metric, - gold_label_dictionary=gold_label_dictionary_for_eval, - exclude_labels=exclude_labels, - ) - result_line += f"\t{dev_eval_result.loss}\t{dev_eval_result.log_line}" - log.info( - f"DEV : loss {dev_eval_result.loss}" - f" - {main_evaluation_metric[1]}" - f" ({main_evaluation_metric[0]})" - f" {round(dev_eval_result.main_score, 4)}" - ) - # calculate scores using dev data if available - # append dev score to score history - 
dev_score_history.append(dev_eval_result.main_score) - dev_loss_history.append(dev_eval_result.loss) - dev_score = dev_eval_result.main_score + current_time = time.time() - # depending on memory mode, embeddings are moved to CPU, GPU or deleted - store_embeddings(self.corpus.dev, embeddings_storage_mode, dynamic_embeddings) - - if use_tensorboard: - writer.add_scalar("dev_loss", dev_eval_result.loss, epoch) - writer.add_scalar("dev_score", dev_eval_result.main_score, epoch) - for ( - metric_class_avg_type, - metric_type, - ) in metrics_for_tensorboard: - writer.add_scalar( - f"dev_{metric_class_avg_type}_{metric_type}", - dev_eval_result.classification_report[metric_class_avg_type][metric_type], - epoch, - ) - - if log_test: - assert self.corpus.test - test_eval_result = self.model.evaluate( - self.corpus.test, - gold_label_type=self.model.label_type, - mini_batch_size=eval_batch_size, - num_workers=num_workers, - out_path=base_path / "test.tsv", - embedding_storage_mode=embeddings_storage_mode, - main_evaluation_metric=main_evaluation_metric, - gold_label_dictionary=gold_label_dictionary_for_eval, - exclude_labels=exclude_labels, - ) - result_line += f"\t{test_eval_result.loss}\t{test_eval_result.log_line}" + lr_info, momentum_info = self._get_current_lr_and_momentum(epoch) log.info( - f"TEST : loss {test_eval_result.loss} -" - f" {main_evaluation_metric[1]}" - f" ({main_evaluation_metric[0]}) " - f" {round(test_eval_result.main_score, 4)}" + f"epoch {epoch}" + f" - iter {batch_no + 1}/{len(batch_loader)}" + f" - loss {intermittent_loss:.8f}" + f" - time (sec): {(current_time - epoch_start_time):.2f}" + f" - samples/sec: {epoch_train_samples / (current_time - epoch_start_time):.2f}" + f"{lr_info}{momentum_info}" ) - # depending on memory mode, embeddings are moved to CPU, GPU or deleted - store_embeddings(self.corpus.test, embeddings_storage_mode, dynamic_embeddings) - - if use_tensorboard: - writer.add_scalar("test_loss", test_eval_result.loss, epoch) - writer.add_scalar("test_score", test_eval_result.main_score, epoch) - for ( - metric_class_avg_type, - metric_type, - ) in metrics_for_tensorboard: - writer.add_scalar( - f"test_{metric_class_avg_type}_{metric_type}", - test_eval_result.classification_report[metric_class_avg_type][metric_type], - epoch, - ) - - # determine if this is the best model or if we need to anneal - current_epoch_has_best_model_so_far = False - # default mode: anneal against dev score - if not train_with_dev and not anneal_against_dev_loss and self.corpus.dev is not None: - if dev_score > best_validation_score: - current_epoch_has_best_model_so_far = True - best_validation_score = dev_score + # - SchedulerPlugin -> do the scheduler step if one-cycle or linear decay + # - WeightExtractorPlugin -> extracts weights + self.dispatch("after_training_batch", **batch_kw) - if isinstance(scheduler, AnnealOnPlateau): - scheduler.step(dev_score, dev_eval_result.loss) - - # alternative: anneal against dev loss - if not train_with_dev and anneal_against_dev_loss and self.corpus.dev is not None: - if dev_eval_result.loss < best_validation_score: - current_epoch_has_best_model_so_far = True - best_validation_score = dev_eval_result.loss + train_loss = epoch_train_loss / epoch_train_samples + self._record(MetricRecord.scalar(("train", "loss"), train_loss, epoch)) - if isinstance(scheduler, AnnealOnPlateau): - scheduler.step(dev_eval_result.loss) + total_train_samples += epoch_train_samples - # alternative: anneal against train loss - if train_with_dev: - if train_loss < 
best_validation_score: - current_epoch_has_best_model_so_far = True - best_validation_score = train_loss - - if isinstance(scheduler, AnnealOnPlateau): - scheduler.step(train_loss) - - train_loss_history.append(train_loss) - - # determine bad epoch number - try: - bad_epochs = scheduler.num_bad_epochs - except AttributeError: - bad_epochs = 0 - - new_learning_rate = [group["lr"] for group in optimizer_instance.param_groups] - - if any([new_lr != prev_lr for new_lr, prev_lr in zip(new_learning_rate, previous_learning_rate)]): - bad_epochs = patience + 1 - - # lr unchanged - if all( - [ - prev_lr == initial_lr - for prev_lr, initial_lr in zip(previous_learning_rate, initial_learning_rate) - ] - ): - bad_epochs += initial_extra_patience - - # log bad epochs - if log_bad_epochs: - log.info(f"BAD EPOCHS (no improvement): {bad_epochs}") - - if loss_txt is not None: - # output log file - with open(loss_txt, "a") as f: - # make headers on first epoch - if epoch == 1: - bad_epoch_header = "BAD_EPOCHS\t" if log_bad_epochs else "" - f.write(f"EPOCH\tTIMESTAMP\t{bad_epoch_header}LEARNING_RATE\tTRAIN_LOSS") - - if log_train: - f.write("\tTRAIN_" + "\tTRAIN_".join(train_eval_result.log_header.split("\t"))) - - if log_train_part: - f.write( - "\tTRAIN_PART_LOSS\tTRAIN_PART_" - + "\tTRAIN_PART_".join(train_part_eval_result.log_header.split("\t")) - ) - - if log_dev: - f.write("\tDEV_LOSS\tDEV_" + "\tDEV_".join(dev_eval_result.log_header.split("\t"))) - - if log_test: - f.write( - "\tTEST_LOSS\tTEST_" + "\tTEST_".join(test_eval_result.log_header.split("\t")) - ) - - lr_info = ",".join([f"{lr:.4f}" for lr in current_learning_rate]) - - bad_epoch_info = "\t" + str(bad_epochs) if log_bad_epochs else "" - f.write( - f"\n{epoch}\t{datetime.datetime.now():%H:%M:%S}" - f"{bad_epoch_info}" - f"\t{lr_info}\t{train_loss}" - ) - f.write(result_line) - - # if checkpoint is enabled, save model at each epoch - if checkpoint and not param_selection_mode: - self.model.save(base_path / "checkpoint.pt", checkpoint=True) - - # Check whether to save best model - if ( - (not train_with_dev or anneal_with_restarts or anneal_with_prestarts) - and not param_selection_mode - and current_epoch_has_best_model_so_far - and not use_final_model_for_eval - ): - log.info("saving best model") - self.model.save(base_path / "best-model.pt", checkpoint=save_optimizer_state) - - if anneal_with_prestarts: - current_state_dict = self.model.state_dict() - self.model.load_state_dict(last_epoch_model_state_dict) - self.model.save(base_path / "pre-best-model.pt") - self.model.load_state_dict(current_state_dict) - - if use_swa: - import torchcontrib - - cast(torchcontrib.optim.SWA, optimizer_instance).swap_swa_sgd() - - # if we do not use dev data for model selection, save final model - if save_final_model and not param_selection_mode: - self.model.save(base_path / "final-model.pt", checkpoint=save_optimizer_state) - - except KeyboardInterrupt: log_line(log) - log.info("Exiting from training early.") - - if not param_selection_mode: - log.info("Saving model ...") - self.model.save(base_path / "final-model.pt", checkpoint=save_optimizer_state) - log.info("Done.") - except Exception: - if create_file_logs: - log_handler.close() - log.removeHandler(log_handler) - raise - finally: - if use_tensorboard: - writer.close() - optimizer_instance.zero_grad(set_to_none=True) - del optimizer_instance - - # test best model if test data is present - if self.corpus.test and not train_with_test: - final_score = self.final_test( - base_path=base_path, - 
eval_mini_batch_size=eval_batch_size, - num_workers=num_workers, - main_evaluation_metric=main_evaluation_metric, - gold_label_dictionary_for_eval=gold_label_dictionary_for_eval, - exclude_labels=exclude_labels, - ) - else: - final_score = 0 - log.info("Test data not provided setting final score to 0") - if reduce_transformer_vocab: - if (base_path / "best-model.pt").exists(): - self.model.save(base_path / "best-model.pt") - elif save_final_model and not param_selection_mode: - self.model.save(base_path / "final-model.pt", checkpoint=save_optimizer_state) - - if create_file_logs: - log_handler.close() - log.removeHandler(log_handler) - - return { - "test_score": final_score, - "dev_score_history": dev_score_history, - "train_loss_history": train_loss_history, - "dev_loss_history": dev_loss_history, - } - - def resume( - self, - model: Model, - additional_epochs: Optional[int] = None, - **trainer_args, - ): - assert model.model_card is not None - self.model = model - # recover all arguments that were used to train this model - args_used_to_train_model = model.model_card["training_parameters"] - - # you can overwrite params with your own - for param in trainer_args: - args_used_to_train_model[param] = trainer_args[param] - if param == "optimizer" and "optimizer_state_dict" in args_used_to_train_model: - del args_used_to_train_model["optimizer_state_dict"] - if param == "scheduler" and "scheduler_state_dict" in args_used_to_train_model: - del args_used_to_train_model["scheduler_state_dict"] - - # surface nested arguments - kwargs = args_used_to_train_model["kwargs"] - del args_used_to_train_model["kwargs"] - - if additional_epochs is not None: - args_used_to_train_model["max_epochs"] = ( - args_used_to_train_model.get("epoch", kwargs.get("epoch", 0)) + additional_epochs - ) + log.info(f"EPOCH {epoch} done: loss {train_loss:.4f}{lr_info}") + + # - CheckpointPlugin -> executes save_model_each_k_epochs + # - SchedulerPlugin -> log bad epochs + self.dispatch("after_training_epoch", epoch=epoch) + + self.model.eval() + + # Determine if this is the best model or if we need to anneal + current_epoch_has_best_model_so_far = False + validation_scores: tuple + + for evaluation_split, evaluation_split_data in evaluation_splits.items(): + eval_result = self.model.evaluate( + evaluation_split_data, + out_path=base_path / f"{evaluation_split}.tsv", + mini_batch_size=eval_batch_size, + exclude_labels=exclude_labels, + main_evaluation_metric=main_evaluation_metric, + gold_label_dictionary=gold_label_dictionary_for_eval, + embedding_storage_mode=embeddings_storage_mode, + gold_label_type=self.model.label_type, + gold_label_dictionary_for_eval=gold_label_dictionary_for_eval, + ) - # resume training with these parameters - self.train(**args_used_to_train_model, **kwargs) + # log results + log.info( + f"{evaluation_split.upper()} : loss {eval_result.loss}" + f" - {main_evaluation_metric[1]}" + f" ({main_evaluation_metric[0]})" + f" {round(eval_result.main_score, 4)}" + ) - def fine_tune( - self, - base_path: Union[Path, str], - learning_rate: float = 5e-5, - max_epochs: int = 10, - optimizer=torch.optim.AdamW, - scheduler=LinearSchedulerWithWarmup, - warmup_fraction: float = 0.1, - mini_batch_size: int = 4, - embeddings_storage_mode: str = "none", - use_final_model_for_eval: bool = True, - decoder_lr_factor: float = 1.0, - **trainer_args, - ): - # If set, add a factor to the learning rate of all parameters with 'embeddings' not in name - if decoder_lr_factor != 1.0: - optimizer = optimizer( - [ - { - "params": 
[param for name, param in self.model.named_parameters() if "embeddings" not in name], - "lr": learning_rate * decoder_lr_factor, - }, - { - "params": [param for name, param in self.model.named_parameters() if "embeddings" in name], - "lr": learning_rate, - }, - ] - ) - log.info( - f"Modifying learning rate to {learning_rate * decoder_lr_factor} for the following " - f"parameters: {[name for name, param in self.model.named_parameters() if 'embeddings' not in name]}" - ) + # depending on memory mode, embeddings are moved to CPU, GPU or deleted + store_embeddings(evaluation_split_data, embeddings_storage_mode) - return self.train( - base_path=base_path, - learning_rate=learning_rate, - max_epochs=max_epochs, - optimizer=optimizer, - scheduler=scheduler, - warmup_fraction=warmup_fraction, - mini_batch_size=mini_batch_size, - embeddings_storage_mode=embeddings_storage_mode, - use_final_model_for_eval=use_final_model_for_eval, - **trainer_args, - ) + self._publish_eval_result(eval_result, evaluation_split, global_step=epoch) - def final_test( - self, - base_path: Union[Path, str], - eval_mini_batch_size: int, - main_evaluation_metric: Tuple[str, str], - num_workers: Optional[int] = 8, - gold_label_dictionary_for_eval: Optional[Dictionary] = None, - exclude_labels: List[str] = [], - ): - base_path = Path(base_path) - base_path.mkdir(exist_ok=True, parents=True) + # use DEV split to determine if this is the best model so far + if determine_best_epoch_using_dev_score and evaluation_split == "dev": + validation_scores = eval_result.main_score, eval_result.loss - log_line(log) + if eval_result.main_score > best_epoch_score: + current_epoch_has_best_model_so_far = True + best_validation_score = eval_result.main_score + + # if not using DEV score, determine best model using train loss + if not determine_best_epoch_using_dev_score: + validation_scores = (train_loss,) + + if epoch_train_loss < best_epoch_score: + current_epoch_has_best_model_so_far = True + best_validation_score = train_loss + + # - LossFilePlugin -> somehow prints all relevant metrics + # - AnnealPlugin -> scheduler step + self.dispatch( + "after_evaluation", + epoch=epoch, + current_model_is_best=current_epoch_has_best_model_so_far, + validation_scores=validation_scores, + ) - self.model.eval() + if save_best_model and current_epoch_has_best_model_so_far: + log.info("saving best model") + self.model.save(base_path / "best-model.pt", checkpoint=save_optimizer_state) - if (base_path / "best-model.pt").exists(): - self.model.load_state_dict(self.model.load(base_path / "best-model.pt").state_dict()) - else: - log.info("Testing using last state of model ...") - - assert self.corpus.test - test_results = self.model.evaluate( - self.corpus.test, - gold_label_type=self.model.label_type, - mini_batch_size=eval_mini_batch_size, - num_workers=num_workers, - out_path=base_path / "test.tsv", - embedding_storage_mode="none", - main_evaluation_metric=main_evaluation_metric, - gold_label_dictionary=gold_label_dictionary_for_eval, - exclude_labels=exclude_labels, - return_loss=False, - ) + # - SWAPlugin -> restores SGD weights from SWA + self.dispatch("after_training_loop") - log.info(test_results.log_line) - log.info(test_results.detailed_results) - log_line(log) + # if we do not use dev data for model selection, save final model + if save_final_model: + self.model.save(base_path / "final-model.pt", checkpoint=save_optimizer_state) - # get and return the final test score of best model - final_score = test_results.main_score + except KeyboardInterrupt: 
+                log_line(log)
+                log.info("Exiting from training early.")
-        return final_score
+                self.dispatch("training_interrupt")  # TODO: no plugin calls this event
-    def find_learning_rate(
-        self,
-        base_path: Union[Path, str],
-        optimizer,
-        file_name: str = "learning_rate.tsv",
-        start_learning_rate: float = 1e-7,
-        end_learning_rate: float = 10,
-        iterations: int = 100,
-        mini_batch_size: int = 32,
-        stop_early: bool = True,
-        smoothing_factor: float = 0.98,
-        **kwargs,
-    ) -> Path:
-        best_loss = None
-        moving_avg_loss = 0.0
+                log.info("Saving model ...")
+                self.model.save(base_path / "final-model.pt", checkpoint=save_optimizer_state)
+                log.info("Done.")
-        # cast string to Path
-        if type(base_path) is str:
-            base_path = Path(base_path)
-        learning_rate_tsv = init_output_file(base_path, file_name)
+            except TrainingInterrupt as exc:
+                log_line(log)
+                log.info(str(exc))
+                log_line(log)
+                self.dispatch("training_interrupt")  # TODO: no plugin calls this event
-        with open(learning_rate_tsv, "a") as f:
-            f.write("ITERATION\tTIMESTAMP\tLEARNING_RATE\tTRAIN_LOSS\n")
+                log.info("Saving model ...")
+                self.model.save(base_path / "final-model.pt", checkpoint=save_optimizer_state)
+                log.info("Done.")
-        optimizer = optimizer(self.model.parameters(), lr=start_learning_rate, **kwargs)
+            except Exception:
+                self.dispatch("_training_exception")
+                raise
+            finally:
+                # TensorboardLogger -> closes writer
+                self.dispatch("_training_finally")
-        train_data = self.corpus.train
+            # test best model if test data is present
+            if self.corpus.test and not train_with_test:
+                log_line(log)
-        scheduler = ExpAnnealLR(optimizer, end_learning_rate, iterations)
+                self.model.eval()
-        model_state = self.model.state_dict()
-        self.model.train()
+                if (base_path / "best-model.pt").exists():
+                    log.info("Loading model from best epoch ...")
+                    self.model.load_state_dict(self.model.load(base_path / "best-model.pt").state_dict())
+                else:
+                    log.info("Testing using last state of model ...")
+
+                test_results = self.model.evaluate(
+                    self.corpus.test,
+                    gold_label_type=self.model.label_type,
+                    mini_batch_size=eval_batch_size,
+                    out_path=base_path / "test.tsv",
+                    embedding_storage_mode="none",
+                    main_evaluation_metric=main_evaluation_metric,
+                    gold_label_dictionary=gold_label_dictionary_for_eval,
+                    exclude_labels=exclude_labels,
+                    return_loss=False,
+                )
-        step = 0
-        while step < iterations:
-            batch_loader = DataLoader(train_data, batch_size=mini_batch_size, shuffle=True)
-            for batch in batch_loader:
-                step += 1
+                log.info(test_results.detailed_results)
+                log_line(log)
-                # forward pass
-                loss, datapoint_count = self.model.forward_loss(batch)
+                # get and return the final test score of best model
+                self.return_values["test_score"] = test_results.main_score
-                # update optimizer and scheduler
-                optimizer.zero_grad()
-                loss.backward()
-                torch.nn.utils.clip_grad_norm_(self.model.parameters(), 5.0)
-                optimizer.step()
-                scheduler.step()
+            else:
+                self.return_values["test_score"] = 0
+                log.info("Test data not provided setting final score to 0")
+
+        # MetricHistoryPlugin -> stores the loss history in return_values
+        self.dispatch("after_training")
+
+        # Store return values, as they will be erased by reset_training_attributes
+        return_values = self.return_values
+
+        self.reset_training_attributes()
+
+        return return_values
+
+    def _get_current_lr_and_momentum(self, epoch):
+        current_learning_rate = [group["lr"] for group in self.optimizer.param_groups]
+        momentum = [group["momentum"] if "momentum" in group else 0 for group in self.optimizer.param_groups]
+        lr_info = " - lr: " + ",".join([f"{m:.6f}" for m in current_learning_rate])
+        momentum_info = " - momentum: " + ",".join([f"{m:.6f}" for m in momentum])
+        self._record(MetricRecord.scalar_list("learning_rate", current_learning_rate, epoch))
+        self._record(MetricRecord.scalar_list(("optimizer", "momentum"), momentum, epoch))
+        return lr_info, momentum_info
+
+    def _sample_train_split(self, monitor_train_sample):
+        train_part_size = 0
+        if isinstance(monitor_train_sample, float):
+            train_part_size = int(_len_dataset(self.corpus.train) * monitor_train_sample)
+        if isinstance(monitor_train_sample, int):
+            train_part_size = monitor_train_sample
+        assert train_part_size > 0
+        # get a random sample of training sentences
+        train_part_indices = list(range(_len_dataset(self.corpus.train)))
+        random.shuffle(train_part_indices)
+        train_part_indices = train_part_indices[:train_part_size]
+        train_part = torch.utils.data.dataset.Subset(self.corpus.train, train_part_indices)
+        return train_part
+
+    def _flat_dict_items(self, d, composite_key=()):
+        for key, value in d.items():
+            if isinstance(key, str):
+                key = composite_key + (key,)
+            else:
+                key = composite_key + tuple(key)
-                learning_rate = scheduler.get_lr()[0]
+            if isinstance(value, dict):
+                yield from self._flat_dict_items(value, composite_key=key)
+            else:
+                yield key, value
-                loss_item = loss.item()
-                if step == 1:
-                    best_loss = loss_item
+    def _publish_eval_result(self, result, prefix=(), **kw):
+        for key, value in self._flat_dict_items(result.scores, composite_key=MetricName(prefix)):
+            try:
+                self._record(MetricRecord.scalar(name=key, value=float(value), **kw))
+            except TypeError:
+                if isinstance(value, list):
+                    self._record(MetricRecord.scalar_list(name=key, value=value, **kw))
+                elif isinstance(value, torch.Tensor):
+                    self._record(MetricRecord.histogram(name=key, value=value, **kw))
                 else:
-                    if smoothing_factor > 0:
-                        moving_avg_loss = smoothing_factor * moving_avg_loss + (1 - smoothing_factor) * loss_item
-                        loss_item = moving_avg_loss / (1 - smoothing_factor ** (step + 1))
-                    if loss_item < best_loss:  # type: ignore
-                        best_loss = loss  # type: ignore
-
-                if step > iterations:
-                    break
-
-                if stop_early and (loss_item > 4 * best_loss or torch.isnan(loss)):  # type: ignore
-                    log_line(log)
-                    log.info("loss diverged - stopping early!")
-                    step = iterations
-                    break
-
-                with open(str(learning_rate_tsv), "a") as f:
-                    f.write(f"{step}\t{datetime.datetime.now():%H:%M:%S}\t{learning_rate}\t{loss_item}\n")
+                    value = str(value)
+                    self._record(MetricRecord.string(name=key, value=value, **kw))
-        self.model.load_state_dict(model_state)
-        self.model.to(flair.device)
+        self._record(MetricRecord.string(name=MetricName(prefix) + "score", value=result.main_score, **kw))
-        log_line(log)
-        log.info(f"learning rate finder finished - plot {learning_rate_tsv}")
-        log_line(log)
-
-        return Path(learning_rate_tsv)
+        self._record(
+            MetricRecord.string(name=MetricName(prefix) + "detailed_result", value=result.detailed_results, **kw)
+        )
+    def _initialize_model_card(self, **training_parameters):
+        """
+        initializes model card with library versions and parameters
+        :param training_parameters:
+        :return:
+        """
+        # create a model card for this model with Flair and PyTorch version
+        model_card = {
+            "flair_version": flair.__version__,
+            "pytorch_version": torch.__version__,
+        }
-def get_transformer_embeddings(trainer: ModelTrainer) -> List[TransformerEmbeddings]:
-    if isinstance(trainer.model, FewshotClassifier):
-        embeddings = trainer.model.tars_embeddings
-    else:
-        embeddings = getattr(trainer.model, "embeddings", None)
+        # record Transformers version if library is loaded
+        try:
+            import transformers
-    if embeddings is None:
-        log.warning(f"Could not extract embeddings of Model of type {type(trainer.model)}")
-        return []
+            model_card["transformers_version"] = transformers.__version__
+        except ImportError:
+            pass
-    transformer_embeddings = set()
+        # remember all parameters used in train() call
+        model_card["training_parameters"] = {
+            k: str(v) if isinstance(v, Path) else v for k, v in training_parameters.items()
+        }
-    def scan_embeddings(emb: Embeddings):
-        if isinstance(emb, StackedEmbeddings):
-            for sub_emb in emb.embeddings:
-                scan_embeddings(sub_emb)
-        if isinstance(emb, TransformerEmbeddings):
-            transformer_embeddings.add(emb)
+        plugins = [plugin.__class__ for plugin in model_card["training_parameters"]["plugins"]]
+        model_card["training_parameters"]["plugins"] = plugins
-    scan_embeddings(embeddings)
+        return model_card
-    return list(transformer_embeddings)
+    def _record(self, metric):
+        self.dispatch("metric_recorded", metric)
diff --git a/flair/training_utils.py b/flair/training_utils.py
index 1d520073b4..1efc291dfe 100644
--- a/flair/training_utils.py
+++ b/flair/training_utils.py
@@ -23,18 +23,20 @@ class Result(object):
     def __init__(
         self,
         main_score: float,
-        log_header: str,
-        log_line: str,
         detailed_results: str,
-        loss: float,
         classification_report: dict = {},
+        scores: dict = {},
     ):
+        assert "loss" in scores, "No loss provided."
+
         self.main_score: float = main_score
-        self.log_header: str = log_header
-        self.log_line: str = log_line
+        self.scores = scores
         self.detailed_results: str = detailed_results
         self.classification_report = classification_report
-        self.loss: float = loss
+
+    @property
+    def loss(self):
+        return self.scores["loss"]
 
     def __str__(self):
         return f"{str(self.detailed_results)}\nLoss: {self.loss}'"
@@ -249,7 +251,7 @@ def _reset(self):
         self.cooldown_counter = 0
         self.num_bad_epochs = 0
 
-    def step(self, metric, auxiliary_metric=None):
+    def step(self, metric, auxiliary_metric=None) -> bool:
         # convert `metrics` to float, in case it's a zero-dim Tensor
         current = float(metric)
         epoch = self.last_epoch + 1
@@ -287,7 +289,8 @@ def step(self, metric, auxiliary_metric=None):
             self.cooldown_counter -= 1
             self.num_bad_epochs = 0  # ignore any bad epochs in cooldown
 
-        if self.num_bad_epochs > self.effective_patience:
+        reduce_learning_rate = True if self.num_bad_epochs > self.effective_patience else False
+        if reduce_learning_rate:
             self._reduce_lr(epoch)
             self.cooldown_counter = self.cooldown
             self.num_bad_epochs = 0
@@ -295,6 +298,8 @@ def step(self, metric, auxiliary_metric=None):
 
         self._last_lr = [group["lr"] for group in self.optimizer.param_groups]
 
+        return reduce_learning_rate
+
     def _reduce_lr(self, epoch):
         for i, param_group in enumerate(self.optimizer.param_groups):
             old_lr = float(param_group["lr"])
@@ -302,7 +307,7 @@ def _reduce_lr(self, epoch):
             if old_lr - new_lr > self.eps:
                 param_group["lr"] = new_lr
                 if self.verbose:
-                    log.info("Epoch {:5d}: reducing learning rate" " of group {} to {:.4e}.".format(epoch, i, new_lr))
+                    log.info(f" - reducing learning rate of group {epoch} to {new_lr}")
 
     @property
     def in_cooldown(self):
diff --git a/requirements.txt b/requirements.txt
index 882660fe93..57dda99ba1 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -8,7 +8,6 @@ mpld3==0.3
 scikit-learn>=0.21.3
 sqlitedict>=1.6.0
 deprecated>=1.2.4
-hyperopt>=0.2.7
 boto3
 transformers[sentencepiece]>=4.18.0
 bpemb>=0.3.2
diff --git a/tests/model_test_utils.py b/tests/model_test_utils.py
index 499c7bc751..509782f0b3 100644
--- a/tests/model_test_utils.py
+++ b/tests/model_test_utils.py
@@ -156,32 +156,6 @@ def test_train_load_use_model_multi_corpus(
         loaded_model.predict([self.empty_sentence])
         del loaded_model
 
-    @pytest.mark.integration
-    def test_train_resume_classifier(
-        self, results_base_path, corpus, embeddings, example_sentence, train_test_sentence
-    ):
-        flair.set_seed(123)
-        label_dict = corpus.make_label_dictionary(label_type=self.train_label_type)
-
-        model = self.build_model(embeddings, label_dict)
-        corpus = self.transform_corpus(model, corpus)
-
-        trainer = ModelTrainer(model, corpus)
-        if self.finetune_instead_of_train:
-            trainer.fine_tune(results_base_path, shuffle=False, **self.training_args, checkpoint=True)
-        else:
-            trainer.train(results_base_path, shuffle=False, **self.training_args, checkpoint=True)
-
-        del model
-        checkpoint_model = self.model_cls.load(results_base_path / "checkpoint.pt")
-
-        trainer.resume(model=checkpoint_model, max_epochs=self.training_args.get("max_epochs", 2) + 1)
-        checkpoint_model.predict(train_test_sentence)
-
-        self.assert_training_example(train_test_sentence)
-
-        del trainer, checkpoint_model, corpus
-
     def test_forward_loss(self, labeled_sentence, embeddings):
         label_dict = Dictionary()
         for label in labeled_sentence.get_labels(self.train_label_type):
diff --git a/tests/models/test_relation_classifier.py b/tests/models/test_relation_classifier.py
index c7c25e5e5a..d21d3e516c 100644
--- a/tests/models/test_relation_classifier.py
+++ b/tests/models/test_relation_classifier.py
@@ -169,7 +169,7 @@ def check_transformation_correctness(
         """Ground truth is a set of tuples of (, )"""
         assert split is not None
 
-        data_loader = DataLoader(split, batch_size=1, num_workers=0)
+        data_loader = DataLoader(split, batch_size=1)
         assert all(isinstance(sentence, EncodedSentence) for sentence in map(itemgetter(0), data_loader))
         assert {
             (sentence.to_tokenized_string(), tuple(label.value for label in sentence.get_labels("relation")))
diff --git a/tests/test_hyperparameter.py b/tests/test_hyperparameter.py
deleted file mode 100644
index 3fbdbfe273..0000000000
--- a/tests/test_hyperparameter.py
+++ /dev/null
@@ -1,80 +0,0 @@
-import shutil
-
-import pytest
-from hyperopt import hp
-from torch.optim import SGD
-
-import flair.datasets
-from flair.embeddings import StackedEmbeddings, WordEmbeddings
-from flair.hyperparameter import (
-    Parameter,
-    SearchSpace,
-    SequenceTaggerParamSelector,
-    TextClassifierParamSelector,
-)
-
-glove_embedding: WordEmbeddings = WordEmbeddings("glove")
-
-
-@pytest.mark.integration
-def test_sequence_tagger_param_selector(results_base_path, tasks_base_path):
-    corpus = flair.datasets.ColumnCorpus(data_folder=tasks_base_path / "fashion", column_format={0: "text", 3: "ner"})
-
-    # define search space
-    search_space = SearchSpace()
-
-    # sequence tagger parameter
-    search_space.add(
-        Parameter.EMBEDDINGS,
-        hp.choice,
-        options=[StackedEmbeddings([glove_embedding])],
-    )
-    search_space.add(Parameter.USE_CRF, hp.choice, options=[True, False])
-    search_space.add(Parameter.DROPOUT, hp.uniform, low=0.25, high=0.75)
-    search_space.add(Parameter.WORD_DROPOUT, hp.uniform, low=0.0, high=0.25)
-    search_space.add(Parameter.LOCKED_DROPOUT, hp.uniform, low=0.0, high=0.5)
-    search_space.add(Parameter.HIDDEN_SIZE, hp.choice, options=[64, 128])
-    search_space.add(Parameter.RNN_LAYERS, hp.choice, options=[1, 2])
-
-    # model trainer parameter
-    search_space.add(Parameter.OPTIMIZER, hp.choice, options=[SGD])
-
-    # training parameter
-    search_space.add(Parameter.MINI_BATCH_SIZE, hp.choice, options=[4, 8, 32])
-    search_space.add(Parameter.LEARNING_RATE, hp.uniform, low=0.01, high=1)
-    search_space.add(Parameter.ANNEAL_FACTOR, hp.uniform, low=0.3, high=0.75)
-    search_space.add(Parameter.PATIENCE, hp.choice, options=[3, 5])
-    search_space.add(Parameter.WEIGHT_DECAY, hp.uniform, low=0.01, high=1)
-
-    # find best parameter settings
-    optimizer = SequenceTaggerParamSelector(corpus, "ner", results_base_path, max_epochs=2)
-    optimizer.optimize(search_space, max_evals=2)
-
-    # clean up results directory
-    shutil.rmtree(results_base_path)
-    del optimizer, search_space
-
-
-@pytest.mark.integration
-def test_text_classifier_param_selector(results_base_path, tasks_base_path):
-    corpus = flair.datasets.ClassificationCorpus(tasks_base_path / "imdb", label_type="sentiment")
-    label_type = "sentiment"
-
-    search_space = SearchSpace()
-
-    # document embeddings parameter
-    search_space.add(Parameter.TRANSFORMER_MODEL, hp.choice, options=["sshleifer/tiny-distilbert-base-cased"])
-    search_space.add(Parameter.LAYERS, hp.choice, options=["-1", "-2"])
-
-    # training parameter
-    search_space.add(Parameter.LEARNING_RATE, hp.uniform, low=0, high=1)
-    search_space.add(Parameter.MINI_BATCH_SIZE, hp.choice, options=[4, 8, 16, 32])
-    search_space.add(Parameter.ANNEAL_FACTOR, hp.uniform, low=0, high=0.75)
-    search_space.add(Parameter.PATIENCE, hp.choice, options=[3, 5])
-
-    param_selector = TextClassifierParamSelector(corpus, label_type, False, results_base_path, max_epochs=2)
-    param_selector.optimize(search_space, max_evals=2)
-
-    # clean up results directory
-    shutil.rmtree(results_base_path)
-    del param_selector, search_space
diff --git a/tests/test_trainer.py b/tests/test_trainer.py
index ba4a727322..6864e638b4 100644
--- a/tests/test_trainer.py
+++ b/tests/test_trainer.py
@@ -1,8 +1,8 @@
 import pytest
-from torch.optim import SGD, Adam
+from torch.optim import Adam
 
 import flair
-from flair.data import MultiCorpus, Sentence
+from flair.data import Sentence
 from flair.datasets import ClassificationCorpus
 from flair.embeddings import DocumentPoolEmbeddings, FlairEmbeddings, WordEmbeddings
 from flair.models import SequenceTagger, TextClassifier
@@ -37,12 +37,11 @@ def test_text_classifier_multi(results_base_path, tasks_base_path):
     assert train_log_file.exists()
     lines = train_log_file.read_text(encoding="utf-8").split("\n")
    expected_substrings = [
-        "Device: ",
+        "compute on device: ",
         "Corpus: ",
-        "Parameters:",
         "- learning_rate: ",
-        "- patience: ",
-        "Embeddings storage mode:",
+        "patience",
+        "embedding storage:",
         "epoch 1 - iter",
         "EPOCH 1 done: loss",
         "Results:",
@@ -126,60 +125,6 @@ def test_train_load_use_tagger_adam(results_base_path, tasks_base_path):
     del loaded_model
 
 
-@pytest.mark.integration
-def test_train_resume_tagger_with_additional_epochs(results_base_path, tasks_base_path):
-    flair.set_seed(1337)
-    corpus_1 = flair.datasets.ColumnCorpus(data_folder=tasks_base_path / "fashion", column_format={0: "text", 3: "ner"})
-    corpus_2 = flair.datasets.NER_GERMAN_GERMEVAL(base_path=tasks_base_path).downsample(0.1)
-
-    corpus = MultiCorpus([corpus_1, corpus_2])
-    tag_dictionary = corpus.make_label_dictionary("ner", add_unk=False)
-
-    model: SequenceTagger = SequenceTagger(
-        hidden_size=64,
-        embeddings=turian_embeddings,
-        tag_dictionary=tag_dictionary,
-        tag_type="ner",
-        use_crf=False,
-    )
-
-    # train model for 2 epochs
-    trainer = ModelTrainer(model, corpus)
-    trainer.train(results_base_path, max_epochs=1, shuffle=False, checkpoint=True)
-
-    del model
-
-    # load the checkpoint model and train until epoch 4
-    checkpoint_model = SequenceTagger.load(results_base_path / "checkpoint.pt")
-    trainer.resume(model=checkpoint_model, additional_epochs=1)
-
-    assert checkpoint_model.model_card["training_parameters"]["max_epochs"] == 2
-
-    # clean up results directory
-    del trainer
-
-
-@pytest.mark.integration
-def test_find_learning_rate(results_base_path, tasks_base_path):
-    corpus = flair.datasets.ColumnCorpus(data_folder=tasks_base_path / "fashion", column_format={0: "text", 3: "ner"})
-    tag_dictionary = corpus.make_label_dictionary("ner", add_unk=False)
-
-    tagger: SequenceTagger = SequenceTagger(
-        hidden_size=64,
-        embeddings=turian_embeddings,
-        tag_dictionary=tag_dictionary,
-        tag_type="ner",
-        use_crf=False,
-    )
-
-    # initialize trainer
-    trainer: ModelTrainer = ModelTrainer(tagger, corpus)
-
-    trainer.find_learning_rate(results_base_path, optimizer=SGD, iterations=5)
-
-    del trainer, tagger, tag_dictionary, corpus
-
-
 def test_missing_validation_split(results_base_path, tasks_base_path):
     corpus = flair.datasets.ColumnCorpus(
         data_folder=tasks_base_path / "fewshot_conll",