Source code for tlt.models.text_generation.pytorch_hf_text_generation_model

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2023 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# SPDX-License-Identifier: Apache-2.0
#

import math
import os
import time
import torch
from requests.adapters import ProxyError

# Hugging Face imports
from peft import LoraConfig, TaskType, get_peft_model, PeftModelForCausalLM
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    DataCollatorForSeq2Seq,
    TrainingArguments,
    GenerationConfig,
    Trainer
)

from downloader.models import ModelDownloader
from tlt import TLT_BASE_DIR
from tlt.utils.file_utils import read_json_file, validate_model_name, verify_directory
from tlt.utils.platform_util import PlatformUtil
from tlt.utils.types import FrameworkType, UseCaseType
from tlt.models.hf_model import HFModel
from tlt.models.pytorch_model import PyTorchModel
from tlt.models.text_generation.text_generation_model import TextGenerationModel
from tlt.datasets.text_generation.text_generation_dataset import TextGenerationDataset
from tlt.datasets.text_generation.hf_custom_text_generation_dataset import HFCustomTextGenerationDataset


MODEL_CONFIG_DIR = os.path.join(TLT_BASE_DIR, "models/configs")


class PyTorchHFTextGenerationModel(TextGenerationModel, HFModel, PyTorchModel):
    """
    Class to represent a PyTorch Hugging Face pretrained model that can be used for text generation fine tuning.
    """

    def __init__(self, model_name: str, model=None, **kwargs):
        hf_model_map = read_json_file(os.path.join(
            TLT_BASE_DIR, "models/configs/pytorch_hf_text_generation_models.json"))

        # extra properties that will become configurable in the future
        self._model_name = model_name
        self._generate_checkpoints = True
        self._device = 'cpu'
        self._tokenizer = None
        self._enable_auto_mixed_precision = False

        TextGenerationModel.__init__(self, model_name, FrameworkType.PYTORCH, UseCaseType.TEXT_GENERATION)
        HFModel.__init__(self, model_name, FrameworkType.PYTORCH, UseCaseType.TEXT_GENERATION)
        PyTorchModel.__init__(self, model_name, framework=FrameworkType.PYTORCH,
                              use_case=UseCaseType.TEXT_GENERATION)

        # Store the dataset type that this model type can use for Intel Neural Compressor
        self._inc_compatible_dataset = (HFCustomTextGenerationDataset)

        # model definition
        self.hub_name = hf_model_map[model_name]["hub_name"]
        self._model = None
        self._trainer = None
        self._history = None

        if model and isinstance(model, str):
            self.load_from_directory(model)

    def _get_hub_model(self, model_name, force_download=False):
        downloader = ModelDownloader(model_name, model_dir=None, hub='hugging_face',
                                     hf_model_class='AutoModelForCausalLM', force_download=force_download)
        try:
            model = downloader.download()
        except ProxyError:
            print('Max retries reached. Sleeping for 10 sec...')
            time.sleep(10)
            model = downloader.download()

        return model

    def train(
        self,
        dataset,
        output_dir: str,
        epochs: int = 1,
        initial_checkpoints=None,
        temperature=1.0,
        lora_rank=8,
        lora_alpha=32,
        lora_dropout=0.05,
        max_train_samples=None,
        do_eval: bool = True,
        device: str = "cpu",
        ipex_optimize: bool = True,
        use_trainer: bool = True,
        force_download: bool = False,
        enable_auto_mixed_precision: bool = None,
        **kwargs
    ):
        """
        Trains the model using the specified text generation dataset.

        Args:
            dataset (TextGenerationDataset): The dataset to use for training. If a train subset has been defined,
                that subset will be used to fit the model. Otherwise, the entire non-partitioned dataset will be used.
            output_dir (str): A writeable output directory to write checkpoint files during training
            epochs (int): The number of training epochs [default: 1]
            initial_checkpoints (str): Path to checkpoint weights to load. If the path provided is a directory, the
                latest checkpoint will be used.
            temperature (float): The value used to modulate the next token probabilities [default: 1.0]
            lora_rank (int): LoRA rank parameter [default: 8]
            lora_alpha (int): LoRA alpha parameter [default: 32]
            lora_dropout (float): LoRA dropout parameter [default: 0.05]
            max_train_samples (int or None): Use this to truncate the training set to a maximum number of samples
                for quick testing [default: None]
            do_eval (bool): If do_eval is True and the dataset has a validation subset, the model will be evaluated
                at the end of each epoch. If the dataset does not have a validation split, the test subset will be
                used.
            device (str): Device to train the model. Defaults to "cpu"
            ipex_optimize (bool): Optimize the model using Intel® Extension for PyTorch. Defaults to True
            use_trainer (bool): Placeholder argument, model training is done using the Hugging Face Trainer and a
                native PyTorch training loop is not yet implemented.
            force_download (bool): Downloads the model with default parameters. Defaults to False.
            enable_auto_mixed_precision (bool or None): Enable auto mixed precision for training. Mixed precision
                uses both 16-bit and 32-bit floating point types to make training run faster and use less memory.
                It is recommended to enable auto mixed precision training when running on platforms that support
                bfloat16 (Intel third or fourth generation Xeon processors). If it is enabled on a platform that
                does not support bfloat16, it can be detrimental to the training performance. If
                enable_auto_mixed_precision is set to None, auto mixed precision will be automatically enabled when
                running with Intel fourth generation Xeon processors, and disabled for other platforms.
                Defaults to None.

        Returns:
            Hugging Face TrainOutput object

        Raises:
            TypeError: if the dataset specified is not a TextGenerationDataset
            ValueError: if the given dataset has not been preprocessed yet
        """
        self._check_train_inputs(output_dir, dataset, TextGenerationDataset, None, epochs, False, None,
                                 enable_auto_mixed_precision)

        if enable_auto_mixed_precision is None:
            try:
                # Only automatically enable auto mixed precision for SPR
                enable_auto_mixed_precision = PlatformUtil().cpu_type == 'SPR'
            except Exception as e:
                enable_auto_mixed_precision = False
                print("Unable to determine the CPU type: {}.\n"
                      "Mixed precision training will be disabled.".format(str(e)))

        self._enable_auto_mixed_precision = enable_auto_mixed_precision

        if not self._model:
            self._model = self._get_hub_model(model_name=self.hub_name, force_download=force_download)

        self._model.train()
        self._device = device
        self.train_data_loader = None
        self.validation_data_loader = None

        # Get the eval_dataset
        eval_dataset = None
        try:
            eval_dataset = dataset.validation_subset
        except ValueError:
            try:
                eval_dataset = dataset.test_subset
            except ValueError:
                if do_eval:
                    print("Warning: The dataset provided does not have a validation or test subset.")

        # Truncate the train dataset if desired
        train_dataset = dataset.train_subset
        if max_train_samples is not None:
            print("Truncating training dataset to size {}".format(max_train_samples))
            train_dataset = train_dataset.select(range(max_train_samples))

        # Initialize tokenizer
        if self._tokenizer is None:
            self._tokenizer = dataset._tokenizer
        self._tokenizer.pad_token_id = (0)
        self._tokenizer.padding_side = "left"

        print('Using Low-Rank Adaptation (LoRA) for {}'.format(self.model_name))

        # PEFT settings
        peft_config = LoraConfig(
            r=lora_rank,
            lora_alpha=lora_alpha,
            lora_dropout=lora_dropout,
            bias="none",
            task_type=TaskType.CAUSAL_LM,
        )
        self._model = get_peft_model(self._model, peft_config)
        self._model.print_trainable_parameters()
        self._model.train()

        if use_trainer:
            # Dynamically pad the inputs and labels in each batch
            data_collator = DataCollatorForSeq2Seq(self._tokenizer, pad_to_multiple_of=8, return_tensors="pt",
                                                   padding=True)

            training_args = TrainingArguments(
                output_dir=output_dir,
                num_train_epochs=epochs,
                do_eval=do_eval,
                do_train=True,
                no_cuda=True,
                overwrite_output_dir=True,
                per_device_train_batch_size=dataset.info['preprocessing_info']['batch_size'],
                per_device_eval_batch_size=dataset.info['preprocessing_info']['batch_size'],
                use_ipex=ipex_optimize,
                bf16=enable_auto_mixed_precision
            )

            # Initialize our Trainer
            self._trainer = Trainer(
                model=self._model,
                args=training_args,
                train_dataset=train_dataset,
                eval_dataset=eval_dataset,
                tokenizer=self._tokenizer,
                data_collator=data_collator
            )

            self._history = self._trainer.train(resume_from_checkpoint=initial_checkpoints)
        else:
            raise ValueError("Training without the Hugging Face Trainer is not implemented yet")

        return self._history

    def evaluate(self, dataset=None, enable_auto_mixed_precision=None):
        """
        Evaluates the model on the 'eval_dataset' given in the Trainer arguments

        Args:
            dataset (TextGenerationDataset): The dataset to use for evaluation. If a Hugging Face Trainer object was
                used to train the model, it evaluates on the 'eval_dataset' given in the Trainer arguments
            enable_auto_mixed_precision (bool or None): Enable auto mixed precision for evaluation. Mixed precision
                uses both 16-bit and 32-bit floating point types to make evaluation run faster and use less memory.
                It is recommended to enable auto mixed precision when running on platforms that support bfloat16
                (Intel third or fourth generation Xeon processors). If it is enabled on a platform that does not
                support bfloat16, it can be detrimental to the evaluation performance. If
                enable_auto_mixed_precision is set to None, auto mixed precision will be automatically enabled when
                running with Intel fourth generation Xeon processors, and disabled for other platforms.
                Defaults to None.

        Returns:
            Perplexity metric

        Raises:
            RuntimeError: if the model has not been trained yet and does not have an associated Trainer
        """
        if enable_auto_mixed_precision is None:
            try:
                # Only automatically enable auto mixed precision for SPR
                enable_auto_mixed_precision = PlatformUtil().cpu_type == 'SPR'
            except Exception as e:
                enable_auto_mixed_precision = False
                print("Unable to determine the CPU type: {}.\n"
                      "Mixed precision will be disabled for evaluation.".format(str(e)))

        self._enable_auto_mixed_precision = enable_auto_mixed_precision

        self._model.eval()
        if self._trainer:
            eval_results = self._trainer.evaluate()
        else:
            if not isinstance(dataset, TextGenerationDataset):
                raise ValueError("Expected a dataset of type TextGenerationDataset and got {}".format(type(dataset)))

            train_dataset = dataset.train_subset
            eval_dataset = dataset.validation_subset
            batch_size = dataset.info['preprocessing_info']['batch_size']

            tokenizer = dataset._tokenizer
            tokenizer.pad_token_id = (0)
            tokenizer.padding_side = "left"

            data_collator = DataCollatorForSeq2Seq(tokenizer, pad_to_multiple_of=8, return_tensors="pt",
                                                   padding=True)

            training_args = TrainingArguments(
                output_dir='/tmp/output',
                do_eval=True,
                do_train=False,
                no_cuda=True,
                per_device_train_batch_size=batch_size,
                per_device_eval_batch_size=batch_size,
                bf16=enable_auto_mixed_precision
            )

            # Initialize Trainer
            trainer = Trainer(
                model=self._model,
                args=training_args,
                train_dataset=train_dataset,
                eval_dataset=eval_dataset,
                tokenizer=tokenizer,
                data_collator=data_collator
            )

            eval_results = trainer.evaluate()

        print(f"Perplexity: {math.exp(eval_results['eval_loss']):.2f}")

        return eval_results

    def generate(self, input_samples, temperature=1.0, top_p=0.75, top_k=40, repetition_penalty=1.0, num_beams=4,
                 max_new_tokens=128, decode=True, enable_auto_mixed_precision=None):
        """
        Generates text completions for the specified input samples.

        Args:
            input_samples (str, list, dict, or encoded dict): Input sample(s) to use to generate text completions.
            temperature (float): The value used to modulate the next token probabilities [default: 1.0]
            top_p (float): If set to float < 1, only the smallest set of most probable tokens with probabilities
                that add up to top_p or higher are kept for generation [default: 0.75]
            top_k (int): The number of highest probability vocabulary tokens to keep for top-k-filtering
                [default: 40]
            repetition_penalty (float): The parameter for repetition penalty. 1.0 means no penalty. [default: 1.0]
            num_beams (int): Number of beams for beam search. 1 means no beam search. [default: 4]
            max_new_tokens (int): The maximum number of new tokens generated [default: 128]
            decode (bool): Set to False if the tokenized output is desired, otherwise if True, the decoded response
                will be returned [default: True]
            enable_auto_mixed_precision (bool or None): Enable auto mixed precision for generation. Mixed precision
                uses both 16-bit and 32-bit floating point types to make generation run faster and use less memory.
                It is recommended to enable auto mixed precision when running on platforms that support bfloat16
                (Intel third or fourth generation Xeon processors). If it is enabled on a platform that does not
                support bfloat16, it can be detrimental to the generation performance. If
                enable_auto_mixed_precision is set to None, auto mixed precision will be automatically enabled when
                running with Intel fourth generation Xeon processors, and disabled for other platforms.
                Defaults to None.

        Returns:
            List of strings

        Raises:
            NotImplementedError: if the given input_samples is of type DataLoader
        """
        if enable_auto_mixed_precision is None:
            try:
                # Only automatically enable auto mixed precision for SPR
                enable_auto_mixed_precision = PlatformUtil().cpu_type == 'SPR'
            except Exception as e:
                enable_auto_mixed_precision = False
                print("Unable to determine the CPU type: {}.\n"
                      "Mixed precision will be disabled for generation.".format(str(e)))

        self._enable_auto_mixed_precision = enable_auto_mixed_precision

        if self._model is None:
            print("The model has not been fine-tuned yet, so generation is being done using the original model")
            self._model = self._get_hub_model(model_name=self.hub_name)
        self._model.eval()

        if self._tokenizer is None:
            try:
                self._tokenizer = AutoTokenizer.from_pretrained(self.hub_name)
            except ProxyError:
                print("Max retries reached. Sleeping for 10 sec...")
                time.sleep(10)
                self._tokenizer = AutoTokenizer.from_pretrained(self.hub_name)
        self._tokenizer.pad_token_id = (0)
        self._tokenizer.padding_side = "left"

        # If 'input_samples' is a single text string or a list of text strings
        if isinstance(input_samples, str) or isinstance(input_samples, list):
            encoded_input = self._tokenizer(input_samples, padding=True, return_tensors='pt')
        # If 'input_samples' is an encoded input dict
        elif isinstance(input_samples, dict) and 'input_ids' in input_samples.keys():
            # Requires at least the keys below
            required_keys = ['input_ids', 'attention_mask', 'labels']
            encoded_input = {k: v for k, v in input_samples.items() if k in required_keys}
        # If 'input_samples' is a single unencoded dict
        elif isinstance(input_samples, dict):
            encoded_input = self._tokenizer(input_samples, padding=True, return_tensors='pt')
        # If 'input_samples' is any other kind of object
        else:
            raise NotImplementedError("Generation using a List, Dataset, or Dataloader hasn't been implemented yet. "
                                      "Use an unencoded or encoded dictionary.")

        generation_config = GenerationConfig(
            temperature=temperature,
            top_p=top_p,
            top_k=top_k,
            repetition_penalty=repetition_penalty,
            num_beams=num_beams
        )

        if self._enable_auto_mixed_precision:
            with torch.no_grad():
                with torch.cpu.amp.autocast(dtype=torch.bfloat16):
                    output = self._model.generate(input_ids=encoded_input['input_ids'],
                                                  generation_config=generation_config,
                                                  max_new_tokens=max_new_tokens)
        else:
            with torch.no_grad():
                output = self._model.generate(input_ids=encoded_input['input_ids'],
                                              generation_config=generation_config,
                                              max_new_tokens=max_new_tokens)

        if not decode:
            return output
        else:
            return self._tokenizer.batch_decode(output)

    def export(self, output_dir: str):
        """
        Saves the model and tokenizer to the given output_dir directory.

        Args:
            output_dir (str): Path to save the model.
        """
        if self._model:
            verify_directory(output_dir)
            valid_model_name = validate_model_name(self.model_name)
            saved_model_dir = os.path.join(output_dir, valid_model_name)
            if os.path.exists(saved_model_dir) and len(os.listdir(saved_model_dir)):
                saved_model_dir = os.path.join(saved_model_dir, "{}".format(len(os.listdir(saved_model_dir)) + 1))
            else:
                saved_model_dir = os.path.join(saved_model_dir, "1")
            verify_directory(saved_model_dir)
            self._trainer.save_model(saved_model_dir)
            print("Saved model directory:", saved_model_dir)

            return saved_model_dir
        else:
            raise ValueError("Unable to export the model, because it hasn't been trained yet")

    def load_from_directory(self, model_dir: str):
        """
        Loads a saved PyTorch model from the given model_dir directory. Requires a 'config.json' and
        'pytorch_model.bin' file corresponding to a transformers model in the model_dir.

        Args:
            model_dir (str): Path to the transformers model directory
        """
        verify_directory(model_dir, require_directory_exists=True)

        try:
            model = AutoModelForCausalLM.from_pretrained(self.hub_name)
            self._model = PeftModelForCausalLM.from_pretrained(model, model_dir)
        except Exception:
            raise ValueError("Unable to load model from {}".format(model_dir))