From 1c8b46a233ac4a782d7170e20533f536761e25c4 Mon Sep 17 00:00:00 2001
From: 游雁 <zhifu.gzf@alibaba-inc.com>
Date: Sun, 09 Jun 2024 00:21:44 +0800
Subject: [PATCH] fix mid-epoch resume: honor start_step in sampler, build split datasets lazily, sync ranks around checkpointing
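
Resuming a run mid-epoch replayed batches that had already been
consumed: BatchSampler read start_step from kwargs with a spurious
default of 2048 and never sliced the per-rank batch list. Pass
start_step explicitly, apply it as rank_batches[rank][start_step:],
and log the batch count before and after the skip.

Also build the training dataset lazily when data_split_num > 1,
log a per-slice time estimate for split training, and add
dist.barrier() around checkpoint saving and validation so that
ranks stay in sync.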

---
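Notes (dropped by git-am):

The resume logic now skips already-consumed batches inside the
sampler. Below is a minimal, self-contained sketch of the skip; the
names rank_batches/start_step mirror samplers.py, the data and the
concrete values are illustrative only:

    # distribute batches round-robin over ranks, then drop the first
    # start_step batches on this rank to resume where training stopped
    num_replicas, rank, start_step = 2, 0, 3
    batches = [[i] for i in range(10)]  # toy batch list
    rank_batches = [[] for _ in range(num_replicas)]
    for i, b in enumerate(batches):
        rank_batches[i % num_replicas].append(b)
    final_batches = rank_batches[rank][start_step:]
    print(len(rank_batches[rank]), len(final_batches))  # -> 5 2
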
 funasr/bin/train_ds.py                     |   10 ++++++++++
 funasr/train_utils/trainer_ds.py           |    6 ++++++
 funasr/datasets/audio_datasets/samplers.py |   17 ++++++++++-------
 funasr/datasets/dataloader_entry.py        |   21 ++++++++++++---------
 4 files changed, 38 insertions(+), 16 deletions(-)

diff --git a/funasr/bin/train_ds.py b/funasr/bin/train_ds.py
index a4ae11b..21d747a 100644
--- a/funasr/bin/train_ds.py
+++ b/funasr/bin/train_ds.py
@@ -158,6 +158,8 @@
         time1 = time.perf_counter()
 
         for data_split_i in range(trainer.start_data_split_i, dataloader.data_split_num):
+            time_slice_start = time.perf_counter()
+
             dataloader_tr, dataloader_val = dataloader.build_iter(
                 epoch, data_split_i=data_split_i, start_step=trainer.start_step
             )
@@ -178,6 +180,14 @@
 
             torch.cuda.empty_cache()
 
+            time_slice = (time.perf_counter() - time_slice_start) / 3600.0  # hours spent on this data slice
+            logging.info(
+                f"rank: {local_rank}, "
+                f"time_elapsed_epoch: {(time.perf_counter() - time1) / 3600.0:.3f} hours, "
+                f"estimated remaining for {dataloader.data_split_num} data_slices this epoch: {(dataloader.data_split_num-data_split_i-1)*time_slice:.3f} hours, "
+                f"all epochs: {((trainer.max_epoch - epoch - 1)*dataloader.data_split_num + dataloader.data_split_num-data_split_i-1)*time_slice:.3f} hours\n"
+            )
+
         trainer.start_data_split_i = 0
         trainer.validate_epoch(model=model, dataloader_val=dataloader_val, epoch=epoch + 1)
         scheduler.step()
diff --git a/funasr/datasets/audio_datasets/samplers.py b/funasr/datasets/audio_datasets/samplers.py
index 2e271af..94e9209 100644
--- a/funasr/datasets/audio_datasets/samplers.py
+++ b/funasr/datasets/audio_datasets/samplers.py
@@ -334,6 +334,7 @@
         drop_last=False,
         is_training: bool = True,
         sort_size: int = 1024,
+        start_step: int = 0,
         **kwargs,
     ):
 
@@ -364,12 +365,14 @@
         self.sort_size = sort_size * num_replicas
         self.max_token_length = kwargs.get("max_token_length", 2048)
         self.length_scale_source = kwargs.get("length_scale_source", 1.0)
-        self.start_step = kwargs.get("start_step", 2048)
         self.batch_size_sample_max = kwargs.get("batch_size_sample_max", 200)
-
-        super().__init__(
-            dataset, num_replicas=num_replicas, rank=rank, shuffle=shuffle, drop_last=drop_last
-        )
+        self.start_step = start_step
+        self.batch_num = 1
+        if self.start_step > 0:
+            logging.info(f"Warning, start_step > 0, dataloader start from step: {self.start_step}")
+        # Note: the base sampler __init__ is deliberately skipped here;
+        # this class manages num_replicas/rank/shuffle itself.
+        # super().__init__(dataset, num_replicas=num_replicas, rank=rank, shuffle=shuffle, drop_last=drop_last)
 
     def __iter__(self):
         if self.shuffle:
@@ -424,11 +427,11 @@
             rank_batches[i % self.num_replicas].append(batch)
 
         # Assign all batches for the current rank directly
-        final_batches = rank_batches[self.rank]  # [self.start_step :]
+        final_batches = rank_batches[self.rank][self.start_step :]
         self.batch_num = len(final_batches)
 
         logging.info(
-            f"rank: {self.rank}, dataloader start from step: {self.start_step}, batch_num: {self.batch_num}"
+            f"rank: {self.rank}, dataloader start from step: {self.start_step}, batch_num: {rank_batches[self.rank]}, after: {self.batch_num}"
         )
         return iter(final_batches)
 
diff --git a/funasr/datasets/dataloader_entry.py b/funasr/datasets/dataloader_entry.py
index 925b1d3..055e4c8 100644
--- a/funasr/datasets/dataloader_entry.py
+++ b/funasr/datasets/dataloader_entry.py
@@ -49,14 +49,19 @@
     def __init__(self, frontend=None, tokenizer=None, **kwargs):
         # dataset
         logging.info("Build dataloader")
+
         dataset_class = tables.dataset_classes.get(kwargs.get("dataset", "AudioDataset"))
-        dataset_tr = dataset_class(
-            kwargs.get("train_data_set_list"),
-            frontend=frontend,
-            tokenizer=tokenizer,
-            is_training=True,
-            **kwargs.get("dataset_conf"),
-        )
+        dataset_tr = None
+        # split dataset: with data_split_num > 1, training slices are built lazily per split in build_iter
+        self.data_split_num = kwargs["dataset_conf"].get("data_split_num", 1)
+        if self.data_split_num == 1:
+            dataset_tr = dataset_class(
+                kwargs.get("train_data_set_list"),
+                frontend=frontend,
+                tokenizer=tokenizer,
+                is_training=True,
+                **kwargs.get("dataset_conf"),
+            )
         dataset_val = dataset_class(
             kwargs.get("valid_data_set_list"),
             frontend=frontend,
@@ -69,8 +74,6 @@
         self.dataset_val = dataset_val
         self.kwargs = kwargs
 
-        # split dataset
-        self.data_split_num = kwargs["dataset_conf"].get("data_split_num", 1)
         self.dataset_class = dataset_class
         self.frontend = frontend
         self.tokenizer = tokenizer
diff --git a/funasr/train_utils/trainer_ds.py b/funasr/train_utils/trainer_ds.py
index ec887cc..ec76531 100644
--- a/funasr/train_utils/trainer_ds.py
+++ b/funasr/train_utils/trainer_ds.py
@@ -167,6 +167,8 @@
         Args:
             epoch (int): The epoch number at which the checkpoint is being saved.
         """
+        if self.use_ddp or self.use_fsdp:
+            dist.barrier()  # sync all ranks before writing the checkpoint
         step_in_epoch = None if step is None else step_in_epoch
         if self.use_deepspeed:
 
@@ -760,6 +762,10 @@
             ckpt_name = f'model.pt.ep{epoch}.{kwargs.get("step_in_epoch")}'
         self.val_acc_step_or_eoch[ckpt_name] = self.val_acc_avg
         self.val_loss_step_or_eoch[ckpt_name] = self.val_loss_avg
+
+        if self.use_ddp or self.use_fsdp or self.use_deepspeed:
+            dist.barrier()  # all ranks finish validation before training resumes
+
         model.train()
 
     def log(

--
Gitblit v1.9.1