From 5023dd04224eddd4c9a047bd946695c3932743ae Mon Sep 17 00:00:00 2001
From: zhifu gao <zhifu.gzf@alibaba-inc.com>
Date: 星期五, 15 三月 2024 16:24:29 +0800
Subject: [PATCH] Dev gzf llm (#1503)

---
 funasr/bin/train_llm.py |  140 +++++++++++++++++++++++++---------------------
 1 files changed, 77 insertions(+), 63 deletions(-)

diff --git a/funasr/bin/train_llm.py b/funasr/bin/train_llm.py
index a33cd53..8742bf1 100644
--- a/funasr/bin/train_llm.py
+++ b/funasr/bin/train_llm.py
@@ -6,17 +6,22 @@
 import torch
 import hydra
 import logging
+import time
 import argparse
 from io import BytesIO
+
 import torch.distributed as dist
 from collections.abc import Sequence
 from omegaconf import DictConfig, OmegaConf
+from torch.cuda.amp import autocast, GradScaler
 from torch.nn.parallel import DistributedDataParallel as DDP
 from torch.distributed.fsdp import FullyShardedDataParallel as FSDP
+from torch.distributed.fsdp.sharded_grad_scaler import ShardedGradScaler
+from funasr.train_utils.average_nbest_models import average_checkpoints
 
 from funasr.register import tables
 from funasr.optimizers import optim_classes
-from funasr.train_utils.trainer import Trainer
+from funasr.train_utils.trainer_llm import Trainer
 from funasr.schedulers import scheduler_classes
 from funasr.train_utils.initialize import initialize
 from funasr.download.download_from_hub import download_model
@@ -61,14 +66,9 @@
         dist.init_process_group(backend=kwargs.get("backend", "nccl"), init_method='env://')
         torch.cuda.set_device(local_rank)
         
-    device = kwargs.get("device", "cpu")
+    device = kwargs.get("device", "cuda")
     kwargs["device"] = "cpu"
     model = AutoModel(**kwargs)
-    kwargs["device"] = device
-    model = model.model
-    tokenizer = kwargs["tokenizer"]
-    frontend = kwargs["frontend"]
-    
     
     
     # save config.yaml
@@ -77,35 +77,14 @@
         yaml_file = os.path.join(kwargs.get("output_dir", "./"), "config.yaml")
         OmegaConf.save(config=kwargs, f=yaml_file)
         logging.info("config.yaml is saved to: %s", yaml_file)
-
-
     
-    
-
-    # init_param
-    init_param = kwargs.get("init_param", None)
-    if init_param is not None:
-        if not isinstance(init_param, (list, tuple)):
-            init_param = (init_param,)
-        logging.info("init_param is not None: %s", init_param)
-        for p in init_param:
-            if os.path.exists(p):
-                logging.info(f"Loading pretrained params from {p}")
-                load_pretrained_model(
-                    model=model,
-                    path=p,
-                    ignore_init_mismatch=kwargs.get("ignore_init_mismatch", True),
-                    oss_bucket=kwargs.get("oss_bucket", None),
-                    scope_map=kwargs.get("scope_map", []),
-                    excludes=kwargs.get("excludes", None),
-                )
-            else:
-                logging.info(f"Checkpoint does not exist, init randomly: {p}")
-    elif kwargs.get("init", None):
-        initialize(model, kwargs.get("init", "kaiming_normal"))
-    else:
-        print("No initialize method")
-
+    # parse kwargs
+    kwargs = model.kwargs
+    kwargs["device"] = device
+    tokenizer = kwargs["tokenizer"]
+    frontend = kwargs["frontend"]
+    model = model.model
+    del kwargs["model"]
 
     # freeze_param
     freeze_param = kwargs.get("freeze_param", None)
@@ -129,7 +108,8 @@
         model = FSDP(model).cuda(local_rank)
     else:
         model = model.to(device=kwargs.get("device", "cuda"))
-        
+
+    kwargs["device"] = next(model.parameters()).device
         
     # optim
     optim = kwargs.get("optim", "adam")
@@ -156,34 +136,68 @@
         batch_sampler_class = tables.batch_sampler_classes.get(batch_sampler)
         batch_sampler = batch_sampler_class(dataset_tr, **kwargs.get("dataset_conf"))
         batch_sampler_val = batch_sampler_class(dataset_val, is_training=False, **kwargs.get("dataset_conf"))
-    dataloader_tr = torch.utils.data.DataLoader(dataset_tr,
-                                                collate_fn=dataset_tr.collator,
-                                                batch_sampler=batch_sampler,
-                                                num_workers=kwargs.get("dataset_conf").get("num_workers", 4),
-                                                pin_memory=True)
+        
+    dataloader_tr = torch.utils.data.DataLoader(dataset_tr, collate_fn=dataset_tr.collator, **batch_sampler)
+    dataloader_val = torch.utils.data.DataLoader(dataset_val, collate_fn=dataset_val.collator, **batch_sampler_val)
+
+    trainer = Trainer(local_rank=local_rank,
+                      use_ddp=use_ddp,
+                      resume=kwargs.get("resume", True),
+                      device=kwargs["device"],
+                      **kwargs.get("train_conf"),
+                      )
+
+    scaler = GradScaler(enabled=trainer.use_fp16) if trainer.use_fp16 else None
+    scaler = ShardedGradScaler(enabled=trainer.use_fp16) if trainer.use_fsdp else scaler
+
+    trainer.resume_checkpoint(model=model, optim=optim, scheduler=scheduler, scaler=scaler)
+
+    tensorboard_dir = os.path.join(kwargs.get("output_dir"), "tensorboard")
+    os.makedirs(tensorboard_dir, exist_ok=True)
+    try:
+        from tensorboardX import SummaryWriter
+        writer = SummaryWriter(tensorboard_dir) if trainer.rank == 0 else None
+    except:
+        writer = None
     
-    dataloader_val = torch.utils.data.DataLoader(dataset_val,
-                                                collate_fn=dataset_val.collator,
-                                                batch_sampler=batch_sampler_val,
-                                                num_workers=kwargs.get("dataset_conf").get("num_workers", 4),
-                                                pin_memory=True)
-    trainer = Trainer(
-        model=model,
-        optim=optim,
-        scheduler=scheduler,
-        dataloader_train=dataloader_tr,
-        dataloader_val=dataloader_val,
-        local_rank=local_rank,
-        use_ddp=use_ddp,
-        use_fsdp=use_fsdp,
-        output_dir=kwargs.get("output_dir", "./exp"),
-        resume=kwargs.get("resume", True),
-        **kwargs.get("train_conf"),
-    )
-    trainer.run()
-    
-    if use_ddp or use_fsdp:
-        torch.distributed.destroy_process_group()
+    for epoch in range(trainer.start_epoch, trainer.max_epoch + 1):
+        time1 = time.perf_counter()
+        trainer.train_epoch(
+                            model=model,
+                            optim=optim,
+                            scheduler=scheduler,
+                            scaler=scaler,
+                            dataloader_train=dataloader_tr,
+                            dataloader_val=dataloader_val,
+                            epoch=epoch,
+                            writer=writer
+                            )
+
+        trainer.validate_epoch(
+            model=model,
+            dataloader_val=dataloader_val,
+            epoch=epoch,
+            writer=writer
+        )
+
+        trainer.save_checkpoint(epoch, model=model, optim=optim, scheduler=scheduler, scaler=scaler)
+
+        scheduler.step()
+
+        time2 = time.perf_counter()
+        time_escaped = (time2 - time1) / 3600.0
+        logging.info(
+            f"\nrank: {local_rank}, "
+            f"time_escaped_epoch: {time_escaped:.3f} hours, "
+            f"estimated to finish {trainer.max_epoch} "
+            f"epoch: {(trainer.max_epoch - epoch) * time_escaped:.3f} hours\n")
+
+
+    if trainer.rank == 0:
+        average_checkpoints(trainer.output_dir, trainer.avg_nbest_model)
+
+    trainer.close()
+
 
     
 

--
Gitblit v1.9.1