From 4a99b828a834d1d3870abbe3ee477518470f3dd9 Mon Sep 17 00:00:00 2001
From: 游雁 <zhifu.gzf@alibaba-inc.com>
Date: 星期五, 07 六月 2024 01:58:14 +0800
Subject: [PATCH] auto frontend

---
 funasr/datasets/openai_datasets/datasets.py |   29 ++++++++++++++++++++++-------
 funasr/datasets/openai_datasets/index_ds.py |   19 +++++++++++++++----
 funasr/datasets/audio_datasets/samplers.py  |   12 +++++++++---
 3 files changed, 46 insertions(+), 14 deletions(-)

diff --git a/funasr/datasets/audio_datasets/samplers.py b/funasr/datasets/audio_datasets/samplers.py
index 18f8f91..7195add 100644
--- a/funasr/datasets/audio_datasets/samplers.py
+++ b/funasr/datasets/audio_datasets/samplers.py
@@ -364,6 +364,8 @@
         self.sort_size = sort_size * num_replicas
         self.max_token_length = kwargs.get("max_token_length", 2048)
         self.length_scale_source = kwargs.get("length_scale_source", 1.0)
+        self.start_step = kwargs.get("start_step", 2048)
+
         super().__init__(
             dataset, num_replicas=num_replicas, rank=rank, shuffle=shuffle, drop_last=drop_last
         )
@@ -415,13 +417,17 @@
             rank_batches[i % self.num_replicas].append(batch)
 
         # Assign all batches for the current rank directly
-        final_batches = rank_batches[self.rank]
+        final_batches = rank_batches[self.rank][self.start_step :]
+        self.batch_num = len(final_batches)
 
+        logging.info(
+            f"rank: {self.rank}, dataloader start from step: {self.start_step}, batch_num: {self.batch_num}"
+        )
         return iter(final_batches)
 
     def __len__(self):
-
-        return 1
+        # Calculate the number of batches per epoch for the current rank
+        return self.batch_num
 
     def set_epoch(self, epoch):
         self.epoch = epoch
diff --git a/funasr/datasets/openai_datasets/datasets.py b/funasr/datasets/openai_datasets/datasets.py
index 9bd0698..5813c3b 100644
--- a/funasr/datasets/openai_datasets/datasets.py
+++ b/funasr/datasets/openai_datasets/datasets.py
@@ -51,7 +51,7 @@
         self.batch_size = kwargs.get("batch_size")
         self.batch_type = kwargs.get("batch_type")
         self.prompt_ids_len = 0
-        self.retry = kwargs.get("retry", 5)
+        self.retry = kwargs.get("retry", 10)
 
         self.permute = False
         from funasr.frontends.whisper_frontend import WhisperFrontend
@@ -60,6 +60,8 @@
             self.permute = True
 
         self.pattern = re.compile(r"(<\|startofspeech\|>.*?<\|endofspeech\|>)")
+        # self.kwargs = kwargs
+        self.max_token_length = kwargs.get("max_token_length", 1024)
 
     def get_source_len(self, index):
         item = self.index_ds[index]
@@ -77,7 +79,9 @@
         # pdb.set_trace()
 
         output = None
+
         for idx in range(self.retry):
+            badcase_flag = False
             if idx == 0:
                 index_cur = index
             else:
@@ -112,9 +116,14 @@
                             "<|endofspeech|>", ""
                         )
                         if sub_str.startswith("!"):
-
-                            data_src = load_audio_text_image_video(sub_str[1:], fs=self.fs)
-
+                            try:
+                                data_src = load_audio_text_image_video(sub_str[1:], fs=self.fs)
+                            except Exception as e:
+                                logging.error(
+                                    f"Loading wav failed! {str(e)}, {traceback.format_exc()}"
+                                )
+                                badcase_flag = True
+                                continue
                             speech, speech_lengths = extract_fbank(
                                 data_src,
                                 data_type=self.data_type,
@@ -134,6 +143,8 @@
                             source_ids += sub_token
                             fbank_mask_i += [1] * len(sub_token)
 
+                if badcase_flag:
+                    continue
                 source_mask = [-100] * len(source_ids)
                 target_out = f"{target_out}<|im_end|>"
                 target_ids = self.tokenizer.encode(target_out)
@@ -142,6 +153,10 @@
                 fbank_mask += fbank_mask_i
                 fbank_beg.append(fbank_beg_i)
 
+            if len(input_ids) > self.max_token_length:
+                badcase_flag = True
+            if badcase_flag:
+                continue
             input_ids = torch.tensor(input_ids, dtype=torch.int64)
             attention_mask = torch.tensor([len(input_ids)], dtype=torch.int32)
             labels = torch.tensor(labels, dtype=torch.int64)
@@ -186,9 +201,9 @@
                     data_list, batch_first=True, padding_value=pad_value
                 )
 
-        if self.batch_type != "example":
-            for i in range(10):
-                outputs = self._filter_badcase(outputs, i=i)
+        # if self.batch_type != "example":
+        #     for i in range(10):
+        #         outputs = self._filter_badcase(outputs, i=i)
 
         return outputs
 
diff --git a/funasr/datasets/openai_datasets/index_ds.py b/funasr/datasets/openai_datasets/index_ds.py
index 1c48cd2..cc518f8 100644
--- a/funasr/datasets/openai_datasets/index_ds.py
+++ b/funasr/datasets/openai_datasets/index_ds.py
@@ -48,7 +48,10 @@
         for file_json in file_list:
             with open(file_json.strip(), encoding="utf-8") as fin:
                 for line in fin:
-                    data = json.loads(line.strip())["messages"]
+                    data_dict = json.loads(line.strip())
+                    data = data_dict["messages"]
+                    speech_length = data_dict.get("speech_length", -1) // 8
+                    text_length = data_dict.get("text_length", 0)
 
                     system, user, assistant = [], [], []
                     for i, item in enumerate(data):
@@ -63,7 +66,12 @@
 
                     system = system * len(user)
 
-                    contents_i = {"system": system, "user": user, "assistant": assistant}
+                    contents_i = {
+                        "system": system,
+                        "user": user,
+                        "assistant": assistant,
+                        "source_len": speech_length + text_length,
+                    }
                     contents.append(contents_i)
 
         self.contents = contents
@@ -80,11 +88,14 @@
         return data
 
     def get_source_len(self, data_dict):
-        return len(data_dict["system"]) + len(data_dict["user"])
+        source_len = data_dict.get("source_len", -1)
+        if source_len < 0:
+            source_len = len(data_dict["system"]) + len(data_dict["user"])
+        return source_len
 
     def get_target_len(self, data_dict):
 
-        return len(data_dict["assistant"])
+        return 0
 
 
 if __name__ == "__main__":

--
Gitblit v1.9.1