From e8fd84f5a4c8a7528e474f37b47d9fecde3534b0 Mon Sep 17 00:00:00 2001
From: 游雁 <zhifu.gzf@alibaba-inc.com>
Date: 星期三, 22 五月 2024 14:14:42 +0800
Subject: [PATCH] wenetspeech

---
 funasr/train_utils/trainer_ds.py |   47 ++++++++++++++++++++++++-----------------------
 1 files changed, 24 insertions(+), 23 deletions(-)

diff --git a/funasr/train_utils/trainer_ds.py b/funasr/train_utils/trainer_ds.py
index 78cfceb..1a553f8 100644
--- a/funasr/train_utils/trainer_ds.py
+++ b/funasr/train_utils/trainer_ds.py
@@ -15,6 +15,7 @@
 from funasr.train_utils.recursive_op import recursive_average
 from funasr.train_utils.average_nbest_models import average_checkpoints
 from torch.distributed.fsdp.sharded_grad_scaler import ShardedGradScaler
+import funasr.utils.misc as misc_utils
 
 try:
     import wandb
@@ -168,8 +169,7 @@
         """
         step_in_epoch = None if step is None else step_in_epoch
         if self.use_deepspeed:
-            with torch.no_grad():
-                model.save_checkpoint(save_dir=model_dir, tag=tag, client_state=info_dict)
+
             logging.info(f"Save checkpoint: {epoch}, rank: {self.local_rank}\n")
             # self.step_or_epoch += 1
             state = {
@@ -269,12 +269,12 @@
                     filename = os.path.join(self.output_dir, key)
                     logging.info(f"Delete: {filename}")
                     if os.path.exists(filename):
-                        os.remove(filename)
+                        # os.remove(filename)
+                        misc_utils.smart_remove(filename)
 
         elif self.use_fsdp:
             pass
-        step_in_epoch = None if step is None else step_in_epoch
-        if self.rank == 0:
+        elif self.rank == 0:
             logging.info(f"Save checkpoint: {epoch}, rank: {self.local_rank}\n")
             # self.step_or_epoch += 1
             state = {
@@ -362,7 +362,8 @@
                     filename = os.path.join(self.output_dir, key)
                     logging.info(f"Delete: {filename}")
                     if os.path.exists(filename):
-                        os.remove(filename)
+                        # os.remove(filename)
+                        misc_utils.smart_remove(filename)
 
         if self.use_ddp or self.use_fsdp:
             dist.barrier()
@@ -385,9 +386,9 @@
 
             if self.use_deepspeed:
                 ckpt = os.path.join(self.output_dir, "model.pt")
-                if os.path.isfile(ckpt):
-                    _, checkpoint = model_engine.load_checkpoint(self.output_dir, "model.pt")
-
+                if os.path.exists(ckpt):
+                    _, checkpoint = model.load_checkpoint(self.output_dir, "model.pt")
+                    self.start_epoch = checkpoint["epoch"]
                     self.saved_ckpts = checkpoint["saved_ckpts"]
                     self.val_acc_step_or_eoch = (
                         checkpoint["val_acc_step_or_eoch"]
@@ -574,12 +575,12 @@
             loss_dict["lr"] = scheduler.get_last_lr()[0]
             loss_dict["batch_num_epoch"] = len(dataloader_train)
 
-            self.val_loss_avg = (
-                self.val_loss_avg * batch_idx + loss_dict["loss"].detach().cpu().item()
+            self.train_loss_avg = (
+                self.train_loss_avg * batch_idx + loss_dict["loss"].detach().cpu().item()
             ) / (batch_idx + 1)
-            if "acc" in stats:
-                self.val_acc_avg = (
-                    self.val_acc_avg * batch_idx + loss_dict["stats"]["acc"].detach().cpu().item()
+            if "acc" in loss_dict["stats"]:
+                self.train_acc_avg = (
+                    self.train_acc_avg * batch_idx + loss_dict["stats"]["acc"].detach().cpu().item()
                 ) / (batch_idx + 1)
 
             self.log(loss_dict, tag="train")
@@ -612,12 +613,12 @@
             time_beg = time.perf_counter()
 
         if self.use_ddp or self.use_fsdp or self.use_deepspeed:
-            val_loss_avg = torch.tensor(self.val_loss_avg, dtype=torch.float32).to(self.device)
-            val_acc_avg = torch.tensor(self.val_acc_avg, dtype=torch.float32).to(self.device)
-            dist.all_reduce(val_loss_avg, op=dist.ReduceOp.SUM)
-            dist.all_reduce(val_acc_avg, op=dist.ReduceOp.SUM)
-            self.val_loss_avg = val_loss_avg.detach().cpu().item() / self.world_size
-            self.val_acc_avg = val_acc_avg.detach().cpu().item() / self.world_size
+            train_loss_avg = torch.tensor(self.train_loss_avg, dtype=torch.float32).to(self.device)
+            train_acc_avg = torch.tensor(self.train_acc_avg, dtype=torch.float32).to(self.device)
+            dist.all_reduce(train_loss_avg, op=dist.ReduceOp.SUM)
+            dist.all_reduce(train_acc_avg, op=dist.ReduceOp.SUM)
+            self.train_loss_avg = train_loss_avg.detach().cpu().item() / self.world_size
+            self.train_acc_avg = train_acc_avg.detach().cpu().item() / self.world_size
 
     def forward_step(self, model, batch, loss_dict={}):
         dtype = torch.bfloat16
@@ -711,8 +712,8 @@
                     "data_split_i": kwargs.get("data_split_i", 0),
                     "data_split_num": kwargs.get("data_split_num", 1),
                     "log_step": batch_idx + kwargs.get("start_step", 0),
-                    "batch_total": batch_idx,
-                    "step_in_epoch": step_in_epoch,
+                    "batch_total": batch_idx + 1,
+                    "step_in_epoch": batch_idx + 1,
                     "lr": 0.0,
                 }
 
@@ -740,7 +741,7 @@
                 self.val_loss_avg = (
                     self.val_loss_avg * batch_idx + loss_dict["loss"].detach().cpu().item()
                 ) / (batch_idx + 1)
-                if "acc" in stats:
+                if "acc" in loss_dict["stats"]:
                     self.val_acc_avg = (
                         self.val_acc_avg * batch_idx
                         + loss_dict["stats"]["acc"].detach().cpu().item()

--
Gitblit v1.9.1