6. Accelerate + DeepSpeed

Author: 猴君
Based on the tutorial series by Bilibili creator 你可是处女座啊.

Theory

https://huggingface.co/docs/accelerate/usage_guides/deepspeed

DP & DDP

(slide: DP vs. DDP comparison)
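
In short: DataParallel (DP) keeps a single process that replicates the model to every visible GPU on each forward pass and is throttled by the Python GIL, while DistributedDataParallel (DDP) runs one process per GPU and synchronizes gradients with all-reduce. Below is a minimal illustrative sketch of the two wrappers (not from the original article; the two halves are alternatives shown together only for contrast, and the DDP half assumes the script is started by torchrun or accelerate launch so that LOCAL_RANK is set):

import os
import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

model = nn.Linear(128, 2)

# DP: a single process drives all visible GPUs; the model is replicated and the
# input batch is scattered across devices on every forward pass.
dp_model = nn.DataParallel(model.cuda())

# DDP: one process per GPU; each process owns its own replica and gradients are
# all-reduced during backward.
dist.init_process_group(backend="nccl")
local_rank = int(os.environ.get("LOCAL_RANK", 0))
torch.cuda.set_device(local_rank)
ddp_model = DDP(model.to(local_rank), device_ids=[local_rank])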

DeepSpeed

Overview

(slides: DeepSpeed ZeRO overview)
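
Roughly, ZeRO stage 1 partitions the optimizer states across data-parallel ranks, stage 2 additionally partitions the gradients, and stage 3 also partitions the parameters themselves; optimizer states and parameters can further be offloaded to CPU or NVMe. As a hedged sketch (the helper comes from DeepSpeed's documented memory-estimation utilities, and "rbt3" is the same checkpoint used in ddp_accelerate.py below), per-GPU memory needs under ZeRO-3 can be estimated before training:

# Hedged sketch: estimate ZeRO-3 GPU/CPU memory needs for the rbt3 model
# on a single node with 2 GPUs, using DeepSpeed's estimation helper.
from transformers import BertForSequenceClassification
from deepspeed.runtime.zero.stage3 import estimate_zero3_model_states_mem_needs_all_live

model = BertForSequenceClassification.from_pretrained("rbt3")
estimate_zero3_model_states_mem_needs_all_live(model, num_gpus_per_node=2, num_nodes=1)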

Caveats

(slide: DeepSpeed caveats)

Multi-node, multi-GPU

(slide: multi-node, multi-GPU launch)


Hands-on

ddp_accelerate.py

  • ddp_accelerate.py

import time
import math
import torch
import pandas as pd

from torch.optim import Adam
from accelerate import Accelerator
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from torch.utils.data import random_split
from peft import LoraConfig, get_peft_model
from transformers import BertTokenizer, BertForSequenceClassification


class MyDataset(Dataset):
    """ChnSentiCorp hotel-review sentiment dataset."""

    def __init__(self) -> None:
        super().__init__()
        self.data = pd.read_csv("./ChnSentiCorp_htl_all.csv")
        self.data = self.data.dropna()

    def __getitem__(self, index):
        return self.data.iloc[index]["review"], self.data.iloc[index]["label"]

    def __len__(self):
        return len(self.data)


def prepare_dataloader():

    dataset = MyDataset()

    # 90/10 train/validation split with a fixed seed so every process sees the same split
    trainset, validset = random_split(dataset, lengths=[0.9, 0.1], generator=torch.Generator().manual_seed(42))

    tokenizer = BertTokenizer.from_pretrained("rbt3")

    def collate_func(batch):
        texts, labels = [], []
        for item in batch:
            texts.append(item[0])
            labels.append(item[1])
        inputs = tokenizer(texts, max_length=128, padding="max_length", truncation=True, return_tensors="pt")
        inputs["labels"] = torch.tensor(labels)
        return inputs

    trainloader = DataLoader(trainset, batch_size=32, collate_fn=collate_func, shuffle=True)
    validloader = DataLoader(validset, batch_size=64, collate_fn=collate_func, shuffle=False)

    return trainloader, validloader


def prepare_model_and_optimizer():

    model = BertForSequenceClassification.from_pretrained("rbt3")

    # LoRA on the attention projections only
    lora_config = LoraConfig(target_modules=["query", "key", "value"])

    model = get_peft_model(model, lora_config)

    model.print_trainable_parameters()

    optimizer = Adam(model.parameters(), lr=2e-5, weight_decay=0.001)

    return model, optimizer


def evaluate(model, validloader, accelerator: Accelerator):
    model.eval()
    acc_num = 0
    with torch.no_grad():
        for batch in validloader:
            output = model(**batch)
            pred = torch.argmax(output.logits, dim=-1)
            # gather predictions and labels from every process, dropping duplicated samples
            pred, refs = accelerator.gather_for_metrics((pred, batch["labels"]))
            acc_num += (pred.long() == refs.long()).float().sum()
    return acc_num / len(validloader.dataset)


def train(model, optimizer, trainloader, validloader, accelerator: Accelerator, resume, epoch=3, log_step=10):
    global_step = 0
    start_time = time.time()

    resume_step = 0
    resume_epoch = 0

    if resume is not None:
        # restore model/optimizer/RNG state and work out where to continue from
        accelerator.load_state(resume)
        steps_per_epoch = math.ceil(len(trainloader) / accelerator.gradient_accumulation_steps)
        resume_step = global_step = int(resume.split("step_")[-1])
        resume_epoch = resume_step // steps_per_epoch
        resume_step -= resume_epoch * steps_per_epoch
        accelerator.print(f"resume from checkpoint -> {resume}")

    for ep in range(resume_epoch, epoch):
        model.train()
        if resume and ep == resume_epoch and resume_step != 0:
            # skip the batches that were already consumed before the checkpoint
            active_dataloader = accelerator.skip_first_batches(trainloader, resume_step * accelerator.gradient_accumulation_steps)
        else:
            active_dataloader = trainloader
        for batch in active_dataloader:
            with accelerator.accumulate(model):
                optimizer.zero_grad()
                output = model(**batch)
                loss = output.loss
                accelerator.backward(loss)
                optimizer.step()

                if accelerator.sync_gradients:
                    # only count a global step when gradients are actually synchronized
                    global_step += 1

                    if global_step % log_step == 0:
                        loss = accelerator.reduce(loss, "mean")
                        accelerator.print(f"ep: {ep}, global_step: {global_step}, loss: {loss.item()}")
                        accelerator.log({"loss": loss.item()}, global_step)

                    if global_step % 50 == 0 and global_step != 0:
                        accelerator.print(f"save checkpoint -> step_{global_step}")
                        # full training state (for resuming) plus the unwrapped model weights
                        accelerator.save_state(accelerator.project_dir + f"/step_{global_step}")
                        accelerator.unwrap_model(model).save_pretrained(
                            save_directory=accelerator.project_dir + f"/step_{global_step}/model",
                            is_main_process=accelerator.is_main_process,
                            state_dict=accelerator.get_state_dict(model),
                            save_func=accelerator.save
                        )
        acc = evaluate(model, validloader, accelerator)
        accelerator.print(f"ep: {ep}, acc: {acc}, time: {time.time() - start_time}")
        accelerator.log({"acc": acc}, global_step)

    accelerator.end_training()


def main():

    accelerator = Accelerator(log_with="tensorboard", project_dir="ckpts")

    accelerator.init_trackers("runs")

    trainloader, validloader = prepare_dataloader()

    model, optimizer = prepare_model_and_optimizer()

    model, optimizer, trainloader, validloader = accelerator.prepare(model, optimizer, trainloader, validloader)

    train(model, optimizer, trainloader, validloader, accelerator, resume=None)


if __name__ == "__main__":
    main()
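
Checkpoints are written every 50 optimizer steps under the project_dir ("ckpts") with a step_{N} naming scheme. To resume, pass one of those directories instead of resume=None; the path below is only an illustration of that pattern:

# Resume training from a checkpoint directory previously written by
# accelerator.save_state; "ckpts/step_50" is an illustrative path that follows
# the step_{N} naming used in train().
train(model, optimizer, trainloader, validloader, accelerator, resume="ckpts/step_50")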

Baseline GPU memory usage

(screenshot: baseline GPU memory usage)

Running with DDP

accelerate launch ddp_accelerate.py 

(screenshot: GPU memory usage under DDP)

Running with DeepSpeed

pip install deepspeed 

Method 1: ZeRO-2 (via accelerate config)


accelerate config  

(screenshots: accelerate config interactive prompts)

~/.cache/huggingface/accelerate/default_config.yaml 
compute_environment: LOCAL_MACHINE
debug: false
deepspeed_config:
  gradient_accumulation_steps: 1
  offload_optimizer_device: none
  offload_param_device: none
  zero3_init_flag: false
  zero_stage: 2
distributed_type: DEEPSPEED
downcast_bf16: 'no'
machine_rank: 0
main_training_function: main
mixed_precision: bf16
num_machines: 1
num_processes: 2
rdzv_backend: static
same_network: true
tpu_env: []
tpu_use_cluster: false
tpu_use_sudo: false
use_cpu: false
  • Run
accelerate launch --config_file default_config.yaml ddp_accelerate.py 

(screenshot: training run and GPU memory usage under ZeRO-2)


Method 2: ZeRO-2 (via a DeepSpeed config file)

https://huggingface.co/docs/accelerate/usage_guides/deepspeed

  • default_config.yaml
compute_environment: LOCAL_MACHINE
debug: false
deepspeed_config:
  # gradient_accumulation_steps: 1
  # offload_optimizer_device: none
  # offload_param_device: none
  # zero3_init_flag: false
  # zero_stage: 2
  deepspeed_config_file: zero_stage2_config.json
  zero3_init_flag: false
distributed_type: DEEPSPEED
downcast_bf16: 'no'
machine_rank: 0
main_training_function: main
# mixed_precision: bf16  # commented out; bf16 is configured in the DeepSpeed JSON instead
num_machines: 1
num_processes: 2
rdzv_backend: static
same_network: true
tpu_env: []
tpu_use_cluster: false
tpu_use_sudo: false
use_cpu: false
  • zero_stage2_config.json
{
    "bf16": {
        "enabled": true
    },
    "zero_optimization": {
        "stage": 2,
        "allgather_partitions": true,
        "allgather_bucket_size": 2e8,
        "overlap_comm": true,
        "reduce_scatter": true,
        "reduce_bucket_size": "auto",
        "contiguous_gradients": true
    },
    "gradient_accumulation_steps": 1,
    "gradient_clipping": "auto",
    "steps_per_print": 2000,
    "train_batch_size": "auto",
    "train_micro_batch_size_per_gpu": "auto",
    "wall_clock_breakdown": false
}
  • Run
accelerate launch --config_file default_config.yaml ddp_accelerate.py 
  • mixed_precision: bf16 must be commented out in default_config.yaml, otherwise launching fails (bf16 is already enabled in zero_stage2_config.json).

(screenshots: ZeRO-2 run with the DeepSpeed config file)


Method 1: ZeRO-3 (via accelerate config)
  • default_config.yaml
compute_environment: LOCAL_MACHINE
debug: false
deepspeed_config:
  gradient_accumulation_steps: 1
  offload_optimizer_device: none
  offload_param_device: none
  zero3_init_flag: false
  zero_stage: 3
  zero3_save_16bit_model: true
  # deepspeed_config_file: zero_stage2_config.json
  # zero3_init_flag: false
distributed_type: DEEPSPEED
downcast_bf16: 'no'
machine_rank: 0
main_training_function: main
mixed_precision: bf16
num_machines: 1
num_processes: 2
rdzv_backend: static
same_network: true
tpu_env: []
tpu_use_cluster: false
tpu_use_sudo: false
use_cpu: false
  • Run
accelerate launch --config_file default_config.yaml ddp_accelerate.py 

(screenshot: training run under ZeRO-3)

  • Without zero3_save_16bit_model: true, training runs normally but saving the model raises an error.

(screenshot: save error when zero3_save_16bit_model is not set)

  • With zero3_save_16bit_model: true, training and model saving work, but evaluation raises an error.
# The evaluation loop must use torch.no_grad() rather than torch.inference_mode()
# (inference-mode tensors do not play well with ZeRO-3's parameter gathering).
def evaluate(model, validloader, accelerator: Accelerator):
    model.eval()
    acc_num = 0
    with torch.no_grad():
        for batch in validloader:
            output = model(**batch)
            pred = torch.argmax(output.logits, dim=-1)
            pred, refs = accelerator.gather_for_metrics((pred, batch["labels"]))
            acc_num += (pred.long() == refs.long()).float().sum()
    return acc_num / len(validloader.dataset)

Method 2: ZeRO-3 (via a DeepSpeed config file)
  • default_config.yaml
compute_environment: LOCAL_MACHINE
debug: false
deepspeed_config:
  # gradient_accumulation_steps: 1
  # offload_optimizer_device: none
  # offload_param_device: none
  # zero3_init_flag: false
  # zero_stage: 3
  # zero3_save_16bit_model: true
  deepspeed_config_file: zero_stage3_config.json
  zero3_init_flag: false
distributed_type: DEEPSPEED
downcast_bf16: 'no'
machine_rank: 0
main_training_function: main
# mixed_precision: bf16  # when using a deepspeed_config_file, remember to comment out the AMP setting here
num_machines: 1
num_processes: 2
rdzv_backend: static
same_network: true
tpu_env: []
tpu_use_cluster: false
tpu_use_sudo: false
use_cpu: false
  • zero_stage3_config.json
{
    "bf16": {
        "enabled": true
    },
    "zero_optimization": {
        "stage": 3,
        "allgather_partitions": true,
        "allgather_bucket_size": 2e8,
        "overlap_comm": true,
        "reduce_scatter": true,
        "reduce_bucket_size": "auto",
        "contiguous_gradients": true,
        "stage3_gather_16bit_weights_on_model_save": true
    },
    "gradient_accumulation_steps": 1,
    "gradient_clipping": "auto",
    "steps_per_print": 2000,
    "train_batch_size": "auto",
    "train_micro_batch_size_per_gpu": "auto",
    "wall_clock_breakdown": false
}

Note: stage3_gather_16bit_weights_on_model_save: true is what allows the full 16-bit model to be saved under ZeRO-3.
  • Run
accelerate launch --config_file default_config.yaml ddp_accelerate.py 

(screenshot: ZeRO-3 run with the DeepSpeed config file)


ddp_trainer.py

# %% [markdown]
# # Text classification example

# %% [markdown]
# ## Step 1: Import the required packages

# %%
from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments, BertTokenizer, BertForSequenceClassification
from datasets import load_dataset

# %% [markdown]
# ## Step 2: Load the dataset

# %%
dataset = load_dataset("csv", data_files="./ChnSentiCorp_htl_all.csv", split="train")
dataset = dataset.filter(lambda x: x["review"] is not None)  # drop rows with empty reviews
dataset

# %% [markdown]
# ## Step 3: Split the dataset

# %%
datasets = dataset.train_test_split(test_size=0.1, seed=42)
datasets

# %% [markdown]
# ## Step 4: Preprocess the dataset

# %%
import torch

tokenizer = BertTokenizer.from_pretrained("/gemini/code/model")

def process_function(examples):
    tokenized_examples = tokenizer(examples["review"], max_length=128, truncation=True)
    tokenized_examples["labels"] = examples["label"]
    return tokenized_examples

tokenized_datasets = datasets.map(process_function, batched=True, remove_columns=datasets["train"].column_names)
tokenized_datasets

# %% [markdown]
# ## Step 5: Create the model

# %%
model = BertForSequenceClassification.from_pretrained("/gemini/code/model")

# %%
model.config

# %% [markdown]
# ## Step 6: Create the evaluation function

# %%
import evaluate

acc_metric = evaluate.load("./metric_accuracy.py")
f1_metric = evaluate.load("./metric_f1.py")

# %%
def eval_metric(eval_predict):
    predictions, labels = eval_predict
    predictions = predictions.argmax(axis=-1)
    acc = acc_metric.compute(predictions=predictions, references=labels)
    f1 = f1_metric.compute(predictions=predictions, references=labels)
    acc.update(f1)
    return acc

# %% [markdown]
# ## Step 7: Create the TrainingArguments

# %%
train_args = TrainingArguments(output_dir="./ckpts",              # output directory
                               per_device_train_batch_size=32,    # training batch size per device
                               per_device_eval_batch_size=128,    # evaluation batch size per device
                               logging_steps=10,                  # logging frequency
                               evaluation_strategy="epoch",       # evaluate at the end of each epoch
                               save_strategy="epoch",             # save a checkpoint at the end of each epoch
                               save_total_limit=3,                # keep at most 3 checkpoints
                               bf16=True,
                               learning_rate=2e-5,                # learning rate
                               weight_decay=0.01,                 # weight decay
                               metric_for_best_model="f1",        # metric used to select the best model
                               load_best_model_at_end=True)       # reload the best model after training

# %% [markdown]
# ## Step 8: Create the Trainer

# %%
from transformers import DataCollatorWithPadding
trainer = Trainer(model=model,
                  args=train_args,
                  train_dataset=tokenized_datasets["train"],
                  eval_dataset=tokenized_datasets["test"],
                  data_collator=DataCollatorWithPadding(tokenizer=tokenizer),
                  compute_metrics=eval_metric)

# %% [markdown]
# ## Step 9: Train the model

# %%
trainer.train()
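
The Trainer can also be pointed at the DeepSpeed JSON directly instead of going through the Accelerate config file. Below is a minimal hedged sketch (reusing zero_stage2_config.json from above; the other arguments stay as in Step 7, and the script is still started with accelerate launch or torchrun):

# Alternative wiring: pass the DeepSpeed config straight to the Trainer via
# TrainingArguments(deepspeed=...); the "auto" fields in the JSON are then
# filled in from these arguments at runtime.
from transformers import TrainingArguments

train_args = TrainingArguments(output_dir="./ckpts",
                               per_device_train_batch_size=32,
                               per_device_eval_batch_size=128,
                               bf16=True,
                               learning_rate=2e-5,
                               weight_decay=0.01,
                               deepspeed="zero_stage2_config.json")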

ZeRO-2

accelerate launch --config_file default_config.yaml ddp_trainer.py 

(screenshot: Trainer run under ZeRO-2)

ZeRO-3

accelerate launch --config_file default_config.yaml ddp_trainer.py 

(screenshot: Trainer run under ZeRO-3)


Caveats

  • The parameters in the Accelerate DeepSpeed config (Method 1) must stay consistent with the arguments passed to the HF Trainer, otherwise an error is raised; see the sketch after these notes.

(screenshot: parameter-mismatch error)

  • The same issue applies when using a DeepSpeed config file (Method 2).
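
For instance, if the Accelerate config sets gradient_accumulation_steps: 1 and bf16 mixed precision, the TrainingArguments should request the same values. A minimal hedged sketch of keeping the two sides consistent (values taken from the configs above):

# The deepspeed_config section above uses gradient_accumulation_steps: 1 and
# bf16 mixed precision, so the Trainer arguments must agree with it; asking for,
# say, gradient_accumulation_steps=4 or fp16=True here is the kind of mismatch
# that triggers the error shown above.
from transformers import TrainingArguments

train_args = TrainingArguments(output_dir="./ckpts",
                               per_device_train_batch_size=32,
                               gradient_accumulation_steps=1,  # matches the accelerate config
                               bf16=True)                      # matches mixed_precision: bf16 / the bf16 block in the JSON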
