From 0a4a1d5257dace9561d95b38a9386539908dcd5e Mon Sep 17 00:00:00 2001
From: zhifu gao <zhifu.gzf@alibaba-inc.com>
Date: 星期二, 23 四月 2024 12:48:52 +0800
Subject: [PATCH] Dev gzf exp (#1645)

---
 funasr/datasets/audio_datasets/index_ds.py |  202 +++++++++++++++++++++++++++++++++++---------------
 1 files changed, 142 insertions(+), 60 deletions(-)

diff --git a/funasr/datasets/audio_datasets/index_ds.py b/funasr/datasets/audio_datasets/index_ds.py
index 53419e8..3270531 100644
--- a/funasr/datasets/audio_datasets/index_ds.py
+++ b/funasr/datasets/audio_datasets/index_ds.py
@@ -9,66 +9,66 @@
 from funasr.register import tables
 
 
-@tables.register("index_ds_classes", "IndexDSJsonlRankSplit")
-class IndexDSJsonlRankSplit(torch.utils.data.Dataset):
-    
-    def __init__(self, path):
-        super().__init__()
-        
-        contents = []
-        with open(path, encoding='utf-8') as fin:
-            for line in fin:
-                data = json.loads(line.strip())
-                if "text" in data:  # for sft
-                    self.contents.append(data['text'])
-                if "source" in data:  # for speech lab pretrain
-                    prompt = data["prompt"]
-                    source = data["source"]
-                    target = data["target"]
-                    source_len = data["source_len"]
-                    target_len = data["target_len"]
-
-                    contents.append({"source": source,
-                                     "prompt": prompt,
-                                     "target": target,
-                                     "source_len": source_len,
-                                     "target_len": target_len,
-                                     }
-                                    )
-        
-        self.contents = []
-        total_num = len(contents)
-        try:
-            rank = dist.get_rank()
-            world_size = dist.get_world_size()
-        except:
-            rank = 0
-            world_size = 1
-            logging.warning("distributed is not initialized, only single shard")
-        num_per_rank = total_num // world_size
-        
-        # rank = 0
-        # import ipdb; ipdb.set_trace()
-        self.contents = contents[rank * num_per_rank:(rank + 1) * num_per_rank]
-    
-        logging.info("in rank: {}, num of samplers: {}, total_num of samplers across ranks: {}".format(rank, len(self.contents), len(contents)))
-
-    def __len__(self):
-        return len(self.contents)
-    
-    def __getitem__(self, index):
-        try:
-            data = self.contents[index]
-        except:
-            print(index)
-        return data
-    
-    def get_source_len(self, data_dict):
-        return data_dict["source_len"]
-
-    def get_target_len(self, data_dict):
-        
-        return data_dict["target_len"] if "target_len" in data_dict else 0
+# @tables.register("index_ds_classes", "IndexDSJsonlRankSplit")
+# class IndexDSJsonlRankSplit(torch.utils.data.Dataset):
+#
+#     def __init__(self, path):
+#         super().__init__()
+#
+#         contents = []
+#         with open(path, encoding='utf-8') as fin:
+#             for line in fin:
+#                 data = json.loads(line.strip())
+#                 if "text" in data:  # for sft
+#                     self.contents.append(data['text'])
+#                 if "source" in data:  # for speech lab pretrain
+#                     prompt = data["prompt"]
+#                     source = data["source"]
+#                     target = data["target"]
+#                     source_len = data["source_len"]
+#                     target_len = data["target_len"]
+#
+#                     contents.append({"source": source,
+#                                      "prompt": prompt,
+#                                      "target": target,
+#                                      "source_len": source_len,
+#                                      "target_len": target_len,
+#                                      }
+#                                     )
+#
+#         self.contents = []
+#         total_num = len(contents)
+#         try:
+#             rank = dist.get_rank()
+#             world_size = dist.get_world_size()
+#         except:
+#             rank = 0
+#             world_size = 1
+#             logging.warning("distributed is not initialized, only single shard")
+#         num_per_rank = total_num // world_size
+#
+#         # rank = 0
+#         # import ipdb; ipdb.set_trace()
+#         self.contents = contents[rank * num_per_rank:(rank + 1) * num_per_rank]
+#
+#         logging.info("in rank: {}, num of samplers: {}, total_num of samplers across ranks: {}".format(rank, len(self.contents), len(contents)))
+#
+#     def __len__(self):
+#         return len(self.contents)
+#
+#     def __getitem__(self, index):
+#         try:
+#             data = self.contents[index]
+#         except:
+#             print(index)
+#         return data
+#
+#     def get_source_len(self, data_dict):
+#         return data_dict["source_len"]
+#
+#     def get_target_len(self, data_dict):
+#
+#         return data_dict["target_len"] if "target_len" in data_dict else 0
 
 @tables.register("index_ds_classes", "IndexDSJsonl")
 @tables.register("index_ds_classes", "IndexDSJsonlRankFull")
@@ -143,3 +143,85 @@
     def get_target_len(self, data_dict):
         
         return data_dict.get("target_len", 0)
+
+
+@tables.register("index_ds_classes", "IndexDSJsonlRankSplit")
+class IndexDSJsonlRankSplit(torch.utils.data.Dataset):
+    
+    def __init__(self, path: str, **kwargs):
+        super().__init__()
+        self.max_source_length = kwargs.get("max_source_length", 2048)
+        self.min_source_length = kwargs.get("min_source_length", 0)
+        self.max_target_length = kwargs.get("max_target_length", 2048)
+        self.min_target_length = kwargs.get("min_target_length", 0)
+
+        with open(path, encoding='utf-8') as fin:
+            file_list = fin.readlines()
+
+        total_num = len(file_list)
+        try:
+            rank = dist.get_rank()
+            world_size = dist.get_world_size()
+        except:
+            rank = 0
+            world_size = 1
+            logging.warning("distributed is not initialized, only single shard")
+        num_per_rank = total_num // world_size
+        if num_per_rank * world_size < total_num:
+            logging.warning(f"Warning, jsonl file:{total_num} could not be divided by world_size: {world_size}, {path}")
+
+        file_list_rank = file_list[rank * num_per_rank:(rank + 1) * num_per_rank]
+
+        contents = []
+        for file_json in file_list_rank:
+            
+            with open(file_json.strip(), encoding='utf-8') as fin:
+                for line in fin:
+                    data = json.loads(line.strip())
+                    if "text" in data:  # for sft
+                        contents.append(data['text'])
+                    if "source" in data:  # for speech lab pretrain
+                        prompt = data.get("prompt", "<ASR>")
+                        source = data["source"].replace("/cpfs01", "/cpfs_speech/data")
+                        target = data["target"]
+                        source_len = data.get("source_len", 1)
+                        target_len = data.get("target_len", 0)
+                        
+                        if source_len < self.min_source_length or source_len > self.max_source_length:
+                            continue
+                        if target_len < self.min_target_length or target_len > self.max_target_length:
+                            continue
+                        contents_i = {"source": source,
+                                      "prompt": prompt,
+                                      "target": target,
+                                      "source_len": source_len,
+                                      "target_len": target_len,
+                                      }
+                        text_language = data.get("text_language", None)
+                        if text_language is not None:
+                            contents_i["text_language"] = text_language
+                        audio_language = data.get("audio_language", None)
+                        if audio_language is not None:
+                            contents_i["audio_language"] = audio_language
+                        contents.append(contents_i)
+        
+        self.contents = contents
+        
+        logging.info(f"total_num: {len(self.contents)} of samplers in ranks: {rank}")
+    
+    def __len__(self):
+        return len(self.contents)
+    
+    def __getitem__(self, index):
+        try:
+            data = self.contents[index]
+        except:
+            print(index)
+        return data
+    
+    def get_source_len(self, data_dict):
+        return data_dict.get("source_len", 1)
+    
+    def get_target_len(self, data_dict):
+        
+        return data_dict.get("target_len", 0)

--
Gitblit v1.9.1