From 2175736ab0e2752837db102ffc27277339f19b5b Mon Sep 17 00:00:00 2001
From: zhifu gao <zhifu.gzf@alibaba-inc.com>
Date: Tue, 11 Jun 2024 14:02:18 +0800
Subject: [PATCH] Merge branch 'dev_gzf_deepspeed' into main

---
 funasr/datasets/audio_datasets/samplers.py |   27 ++++++++++++++++++++-------
 1 file changed, 20 insertions(+), 7 deletions(-)

diff --git a/funasr/datasets/audio_datasets/samplers.py b/funasr/datasets/audio_datasets/samplers.py
index 18f8f91..f7057de 100644
--- a/funasr/datasets/audio_datasets/samplers.py
+++ b/funasr/datasets/audio_datasets/samplers.py
@@ -334,6 +334,7 @@
         drop_last=False,
         is_training: bool = True,
         sort_size: int = 1024,
+        start_step: int = 0,
         **kwargs,
     ):
 
@@ -364,9 +365,14 @@
         self.sort_size = sort_size * num_replicas
         self.max_token_length = kwargs.get("max_token_length", 2048)
         self.length_scale_source = kwargs.get("length_scale_source", 1.0)
-        super().__init__(
-            dataset, num_replicas=num_replicas, rank=rank, shuffle=shuffle, drop_last=drop_last
-        )
+        self.batch_size_sample_max = kwargs.get("batch_size_sample_max", 200)
+        self.start_step = start_step
+        self.batch_num = 1
+        if self.start_step > 0:
+            logging.info(f"Warning, start_step > 0, dataloader start from step: {self.start_step}")
+        # super().__init__(
+        #     dataset, num_replicas=num_replicas, rank=rank, shuffle=shuffle, drop_last=drop_last
+        # )
 
     def __iter__(self):
         if self.shuffle:
@@ -386,19 +392,22 @@
             )
             batch = []
             max_len_in_batch = 0
+            count = 1
             for idx in buffer:
                 original_sample_length = self.dataset.get_source_len(idx)
                 if original_sample_length > self.max_token_length:
                     continue
                 sample_length = 1 if self.batch_type == "example" else original_sample_length
                 potential_batch_length = max(max_len_in_batch, sample_length) * (len(batch) + 1)
-                if potential_batch_length <= self.batch_size:
+                if potential_batch_length <= self.batch_size and count < self.batch_size_sample_max:
                     batch.append(idx)
                     max_len_in_batch = max(max_len_in_batch, sample_length)
+                    count += 1
                 else:
                     buffer_batches.append(batch)
                     batch = [idx]
                     max_len_in_batch = sample_length
+                    count = 1
             if batch:
                 buffer_batches.append(batch)
 
@@ -415,13 +424,17 @@
             rank_batches[i % self.num_replicas].append(batch)
 
         # Assign all batches for the current rank directly
-        final_batches = rank_batches[self.rank]
+        final_batches = rank_batches[self.rank][self.start_step :]
+        self.batch_num = len(final_batches)
 
+        logging.info(
+            f"rank: {self.rank}, dataloader start from step: {self.start_step}, batch_num: {len(rank_batches[self.rank])}, after: {self.batch_num}"
+        )
         return iter(final_batches)
 
     def __len__(self):
-
-        return 1
+        # Calculate the number of batches per epoch for the current rank
+        return self.batch_num
 
     def set_epoch(self, epoch):
         self.epoch = epoch

--
Gitblit v1.9.1