From 2ac38adbe5f4e1374a079e032ed4b504351a207c Mon Sep 17 00:00:00 2001
From: zhifu gao <zhifu.gzf@alibaba-inc.com>
Date: 星期二, 23 四月 2024 18:08:57 +0800
Subject: [PATCH] Dev gzf exp (#1647)

---
 funasr/models/sense_voice/decoder.py       |   24 ---
 funasr/train_utils/trainer.py              |    7 +
 funasr/bin/train.py                        |   29 ++--
 funasr/datasets/audio_datasets/index_ds.py |  246 +++++++++++++++++++++++++----------------
 funasr/datasets/audio_datasets/samplers.py |    7 +
 funasr/datasets/dataloader_entry.py        |   16 ++
 6 files changed, 194 insertions(+), 135 deletions(-)

diff --git a/funasr/bin/train.py b/funasr/bin/train.py
index ab49c82..c02a66f 100644
--- a/funasr/bin/train.py
+++ b/funasr/bin/train.py
@@ -176,15 +176,12 @@
     except:
         writer = None
 
-    # if use_ddp or use_fsdp:
-    #     context = Join([model])
-    # else:
-    #     context = nullcontext()
-    context = nullcontext()
+
     for epoch in range(trainer.start_epoch, trainer.max_epoch + 1):
         time1 = time.perf_counter()
-        with context:
-            dataloader_tr, dataloader_val = dataloader.build_iter(epoch)
+        
+        for data_split_i in range(dataloader.data_split_num):
+            dataloader_tr, dataloader_val = dataloader.build_iter(epoch, data_split_i=data_split_i)
             trainer.train_epoch(
                                 model=model,
                                 optim=optim,
@@ -193,15 +190,17 @@
                                 dataloader_train=dataloader_tr,
                                 dataloader_val=dataloader_val,
                                 epoch=epoch,
-                                writer=writer
+                                writer=writer,
+                                data_split_i=data_split_i,
+                                data_split_num=dataloader.data_split_num,
                                 )
-        with context:
-            trainer.validate_epoch(
-                model=model,
-                dataloader_val=dataloader_val,
-                epoch=epoch,
-                writer=writer
-            )
+        
+        trainer.validate_epoch(
+            model=model,
+            dataloader_val=dataloader_val,
+            epoch=epoch,
+            writer=writer
+        )
         scheduler.step()
 
         
diff --git a/funasr/datasets/audio_datasets/index_ds.py b/funasr/datasets/audio_datasets/index_ds.py
index 3270531..de0d653 100644
--- a/funasr/datasets/audio_datasets/index_ds.py
+++ b/funasr/datasets/audio_datasets/index_ds.py
@@ -2,8 +2,9 @@
 import json
 import torch
 import logging
-import concurrent.futures
+
 import librosa
+import random
 import torch.distributed as dist
 
 from funasr.register import tables
@@ -44,7 +45,7 @@
 #         except:
 #             rank = 0
 #             world_size = 1
-#             logging.warning("distributed is not initialized, only single shard")
+#             logging.info("distributed is not initialized, only single shard")
 #         num_per_rank = total_num // world_size
 #
 #         # rank = 0
@@ -72,6 +73,7 @@
 
 @tables.register("index_ds_classes", "IndexDSJsonl")
 @tables.register("index_ds_classes", "IndexDSJsonlRankFull")
+@tables.register("index_ds_classes", "IndexDSJsonlRankSplit")
 class IndexDSJsonlRankFull(torch.utils.data.Dataset):
     
     def __init__(self, path: str, **kwargs):
@@ -80,83 +82,27 @@
         self.min_source_length = kwargs.get("min_source_length", 0)
         self.max_target_length = kwargs.get("max_target_length", 2048)
         self.min_target_length = kwargs.get("min_target_length", 0)
-        if isinstance(path, (list, tuple)): # wav.scp, text.txt/text.trans
-            from funasr.datasets.audio_datasets.scp2jsonl import gen_jsonl_from_wav_text_list
-            jsonl_outdir = os.path.dirname(path[0])
-            jsonl_name = "datalist_train.jsonl" if kwargs.get("is_training", True) else "datalist_val.jsonl"
-            jsonl_file_out = os.path.join(jsonl_outdir, jsonl_name)
-            if not os.path.exists(jsonl_file_out):
-                print(f"datalist is: {path}, generate jsonl from it")
-                gen_jsonl_from_wav_text_list(path, jsonl_file_out=jsonl_file_out, **kwargs)
-            path = jsonl_file_out
 
-        contents = []
-        with open(path, encoding='utf-8') as fin:
-            for line in fin:
-                data = json.loads(line.strip())
-                if "text" in data:  # for sft
-                    contents.append(data['text'])
-                if "source" in data:  # for speech lab pretrain
-                    prompt = data.get("prompt", "<ASR>")
-                    source = data["source"]
-                    target = data["target"]
-                    source_len = data.get("source_len", 1)
-                    target_len = data.get("target_len", 0)
-                    if "aishell" in source:
-                        target = target.replace(" ", "")
-                    if source_len < self.min_source_length or source_len > self.max_source_length:
-                        continue
-                    if target_len < self.min_target_length or target_len > self.max_target_length:
-                        continue
-                    contents_i = {"source": source,
-                                 "prompt": prompt,
-                                 "target": target,
-                                 "source_len": source_len,
-                                 "target_len": target_len,
-                                 }
-                    text_language = data.get("text_language", None)
-                    if text_language is not None:
-                        contents_i["text_language"] = text_language
-                    audio_language = data.get("audio_language", None)
-                    if audio_language is not None:
-                        contents_i["audio_language"] = audio_language
-                    contents.append(contents_i)
-
-        self.contents = contents
+        is_training = kwargs.get("is_training", True)
+        if not (path.endswith(".jsonl") or path.endswith(".json")):
+            # jsonl list file
+            data_split_num = kwargs.get("data_split_num", 1)
+            data_split_i = kwargs.get("data_split_i", 0)
+            
+            if not is_training:
+                data_split_num = 1
+                data_split_i = 0
+            with open(path, encoding='utf-8') as fin:
+                file_list_all = fin.readlines()
+    
+                num_per_slice = len(file_list_all) // data_split_num
+                file_list = file_list_all[data_split_i * num_per_slice:(data_split_i + 1) * num_per_slice]
+                logging.info(
+                    f"is_training: {is_training}, data_split_num: {data_split_num}, data_split_i: {data_split_i}, \nfile_list: {file_list}, \nfile_list_all: {file_list_all}")
         
-        logging.info(
-            "total_num of samplers across ranks: {}".format(len(self.contents)))
-    
-    def __len__(self):
-        return len(self.contents)
-    
-    def __getitem__(self, index):
-        try:
-            data = self.contents[index]
-        except:
-            print(index)
-        return data
-    
-    def get_source_len(self, data_dict):
-        return data_dict.get("source_len", 1)
-    
-    def get_target_len(self, data_dict):
-        
-        return data_dict.get("target_len", 0)
-
-
-@tables.register("index_ds_classes", "IndexDSJsonlRankSplit")
-class IndexDSJsonlRankSplit(torch.utils.data.Dataset):
-    
-    def __init__(self, path: str, **kwargs):
-        super().__init__()
-        self.max_source_length = kwargs.get("max_source_length", 2048)
-        self.min_source_length = kwargs.get("min_source_length", 0)
-        self.max_target_length = kwargs.get("max_target_length", 2048)
-        self.min_target_length = kwargs.get("min_target_length", 0)
-
-        with open(path, encoding='utf-8') as fin:
-            file_list = fin.readlines()
+        else:
+            file_list = [path]
+            
 
         total_num = len(file_list)
         try:
@@ -165,16 +111,30 @@
         except:
             rank = 0
             world_size = 1
-            logging.warning("distributed is not initialized, only single shard")
+            logging.info("distributed is not initialized, only single shard")
+        
+        if not kwargs.get("rank_split", False):
+            logging.info(f"Warning, rank_split disenabled, batch and shuffle data in global")
+            rank = 0
+            world_size = 1
+        
         num_per_rank = total_num // world_size
         if num_per_rank * world_size < total_num:
-            logging.warning(f"Warning, jsonl file:{total_num} could not be divided by world_size: {world_size}, {path}")
+            logging.info(f"Warning, jsonl file:{total_num} could not be divided by world_size: {world_size}, {path}")
+            total_num_needed = num_per_rank * world_size
+
+            extra_num = total_num_needed - total_num
+            file_list_tmp = random.choices(file_list, k=extra_num)
+            file_list += file_list_tmp
+            logging.info(f"Warning, after random choices: {file_list}")
 
         file_list_rank = file_list[rank * num_per_rank:(rank + 1) * num_per_rank]
 
+        logging.info(
+            f"is_training: {is_training}, file_list_rank: {file_list_rank}")
+
         contents = []
         for file_json in file_list_rank:
-            
             with open(file_json.strip(), encoding='utf-8') as fin:
                 for line in fin:
                     data = json.loads(line.strip())
@@ -182,41 +142,42 @@
                         contents.append(data['text'])
                     if "source" in data:  # for speech lab pretrain
                         prompt = data.get("prompt", "<ASR>")
-                        source = data["source"].replace("/cpfs01", "/cpfs_speech/data")
+                        source = data["source"].replace("/cpfs01", "/cpfs_speech/data") # only use in alibaba gpu group: .replace("/cpfs01", "/cpfs_speech/data")
                         target = data["target"]
                         source_len = data.get("source_len", 1)
                         target_len = data.get("target_len", 0)
-                        
+                        if "aishell" in source:
+                            target = target.replace(" ", "")
                         if source_len < self.min_source_length or source_len > self.max_source_length:
                             continue
                         if target_len < self.min_target_length or target_len > self.max_target_length:
                             continue
                         contents_i = {"source": source,
-                                      "prompt": prompt,
-                                      "target": target,
-                                      "source_len": source_len,
-                                      "target_len": target_len,
-                                      }
+                                     "prompt": prompt,
+                                     "target": target,
+                                     "source_len": source_len,
+                                     "target_len": target_len,
+                                     }
                         text_language = data.get("text_language", None)
                         if text_language is not None:
                             contents_i["text_language"] = text_language
-                        audio_language = data.get("audio_language", None)
-                        if audio_language is not None:
-                            contents_i["audio_language"] = audio_language
+                        # audio_language = data.get("audio_language", None)
+                        # if audio_language is not None:
+                        #     contents_i["audio_language"] = audio_language
                         contents.append(contents_i)
-        
+
         self.contents = contents
         
-        logging.info(f"total_num: {len(self.contents)} of samplers in ranks: {rank}")
+        logging.info(
+            "total_num of samplers: {}, {}".format(len(self.contents), path))
     
     def __len__(self):
         return len(self.contents)
     
     def __getitem__(self, index):
-        try:
-            data = self.contents[index]
-        except:
-            print(index)
+        
+        data = self.contents[index]
+
         return data
     
     def get_source_len(self, data_dict):
@@ -225,3 +186,96 @@
     def get_target_len(self, data_dict):
         
         return data_dict.get("target_len", 0)
+
+# 
+# @tables.register("index_ds_classes", "IndexDSJsonlRankSplit")
+# class IndexDSJsonlRankSplit(torch.utils.data.Dataset):
+# 
+#     def __init__(self, path: str, **kwargs):
+#         super().__init__()
+#         logging.info("building IndexDS")
+#         self.max_source_length = kwargs.get("max_source_length", 2048)
+#         self.min_source_length = kwargs.get("min_source_length", 0)
+#         self.max_target_length = kwargs.get("max_target_length", 2048)
+#         self.min_target_length = kwargs.get("min_target_length", 0)
+# 
+#         data_split_num = kwargs.get("data_split_num", 1)
+#         data_split_i = kwargs.get("data_split_i", 0)
+#         if not kwargs.get("is_training", True):
+#             data_split_num = 1
+#             data_split_i = 0
+#         with open(path, encoding='utf-8') as fin:
+#             file_list_all = fin.readlines()
+# 
+#             num_per_slice = len(file_list_all) // data_split_num
+#             file_list = file_list_all[data_split_i * num_per_slice:(data_split_i + 1) * num_per_slice]
+#             logging.info(f"data_split_num: {data_split_num}, data_split_i: {data_split_i}, file_list: {file_list}, file_list_all: {file_list_all}")
+# 
+# 
+#         total_num = len(file_list)
+#         try:
+#             rank = dist.get_rank()
+#             world_size = dist.get_world_size()
+#         except:
+#             rank = 0
+#             world_size = 1
+#             logging.info("distributed is not initialized, only single shard")
+#         num_per_rank = total_num // world_size
+#         if num_per_rank * world_size < total_num:
+#             logging.info(f"Warning, jsonl file:{total_num} could not be divided by world_size: {world_size}, {path}")
+# 
+#         file_list_rank = file_list[rank * num_per_rank:(rank + 1) * num_per_rank]
+# 
+#         contents = []
+#         for file_json in file_list_rank:
+# 
+#             with open(file_json.strip(), encoding='utf-8') as fin:
+#                 for line in fin:
+#                     data = json.loads(line.strip())
+#                     if "text" in data:  # for sft
+#                         contents.append(data['text'])
+#                     if "source" in data:  # for speech lab pretrain
+#                         prompt = data.get("prompt", "<ASR>")
+#                         source = data["source"].replace("/cpfs01", "/cpfs_speech/data")
+#                         target = data["target"]
+#                         source_len = data.get("source_len", 1)
+#                         target_len = data.get("target_len", 0)
+# 
+#                         if source_len < self.min_source_length or source_len > self.max_source_length:
+#                             continue
+#                         if target_len < self.min_target_length or target_len > self.max_target_length:
+#                             continue
+#                         contents_i = {"source": source,
+#                                       "prompt": prompt,
+#                                       "target": target,
+#                                       "source_len": source_len,
+#                                       "target_len": target_len,
+#                                       }
+#                         text_language = data.get("text_language", None)
+#                         if text_language is not None:
+#                             contents_i["text_language"] = text_language
+#                         # audio_language = data.get("audio_language", None)
+#                         # if audio_language is not None:
+#                         #     contents_i["audio_language"] = audio_language
+#                         contents.append(contents_i)
+# 
+#         self.contents = contents
+# 
+#         logging.info(f"total_num: {len(self.contents)} of samplers in ranks: {rank}, file_list_rank: {file_list_rank}")
+# 
+#     def __len__(self):
+#         return len(self.contents)
+# 
+#     def __getitem__(self, index):
+#         try:
+#             data = self.contents[index]
+#         except:
+#             print(index)
+#         return data
+# 
+#     def get_source_len(self, data_dict):
+#         return data_dict.get("source_len", 1)
+# 
+#     def get_target_len(self, data_dict):
+# 
+#         return data_dict.get("target_len", 0)
diff --git a/funasr/datasets/audio_datasets/samplers.py b/funasr/datasets/audio_datasets/samplers.py
index 108e68a..fdf630e 100644
--- a/funasr/datasets/audio_datasets/samplers.py
+++ b/funasr/datasets/audio_datasets/samplers.py
@@ -301,6 +301,7 @@
                  batch_type="token",
                  num_replicas=None,
                  rank=None,
+                 rank_split=False,
                  shuffle=True,
                  drop_last=False,
                  is_training: bool = True,
@@ -314,6 +315,12 @@
         except:
             rank = 0
             num_replicas = 1
+
+        if rank_split:
+            logging.info(f"Warning, rank_split: {rank_split}, batch and shuffle data in local rank")
+            rank = 0
+            num_replicas = 1
+            
         self.rank = rank
         self.num_replicas = num_replicas
         self.dataset = dataset
diff --git a/funasr/datasets/dataloader_entry.py b/funasr/datasets/dataloader_entry.py
index abb2828..70da722 100644
--- a/funasr/datasets/dataloader_entry.py
+++ b/funasr/datasets/dataloader_entry.py
@@ -40,7 +40,21 @@
 		self.dataset_val = dataset_val
 		self.kwargs = kwargs
 		
-	def build_iter(self, epoch=0):
+		# split dataset
+		self.data_split_num = kwargs["dataset_conf"].get("data_split_num", 1)
+		self.dataset_class = dataset_class
+		self.frontend = frontend
+		self.tokenizer = tokenizer
+		self.kwargs = kwargs
+		
+	def build_iter(self, epoch=0, data_split_i=0, **kwargs):
+		
+		# reload dataset slice
+		if self.data_split_num > 1:
+			del self.dataset_tr
+			self.dataset_tr = self.dataset_class(self.kwargs.get("train_data_set_list"), frontend=self.frontend, tokenizer=self.tokenizer,
+									   is_training=True, **self.kwargs.get("dataset_conf"), data_split_i=data_split_i)
+		
 		# dataloader
 		batch_sampler = self.kwargs["dataset_conf"].get("batch_sampler", "BatchSampler")
 		batch_sampler_val = None
diff --git a/funasr/models/sense_voice/decoder.py b/funasr/models/sense_voice/decoder.py
index 9087ea1..9fdb3bd 100644
--- a/funasr/models/sense_voice/decoder.py
+++ b/funasr/models/sense_voice/decoder.py
@@ -245,29 +245,7 @@
 		self.register_buffer("mask", mask, persistent=False)
 		
 		self.use_padmask = kwargs.get("use_padmask", True)
-	# def forward(self, x: Tensor, xa: Tensor, kv_cache: Optional[dict] = None):
-	# 	"""
-	# 	x : torch.LongTensor, shape = (batch_size, <= n_ctx)
-	# 		the text tokens
-	# 	xa : torch.Tensor, shape = (batch_size, n_audio_ctx, n_audio_state)
-	# 		the encoded audio features to be attended on
-	# 	"""
-	# 	offset = next(iter(kv_cache.values())).shape[1] if kv_cache else 0
-	# 	x = (
-	# 		self.token_embedding(x)
-	# 		+ self.positional_embedding[offset: offset + x.shape[-1]]
-	# 	)
-	# 	x = x.to(xa.dtype)
-	#
-	# 	for block in self.blocks:
-	# 		x = block(x, xa, mask=self.mask, kv_cache=kv_cache)
-	#
-	# 	x = self.ln(x)
-	# 	logits = (
-	# 		x @ torch.transpose(self.token_embedding.weight.to(x.dtype), 0, 1)
-	# 	).float()
-	#
-	# 	return logits
+
 
 	
 	def forward(
diff --git a/funasr/train_utils/trainer.py b/funasr/train_utils/trainer.py
index caaef38..3ee6885 100644
--- a/funasr/train_utils/trainer.py
+++ b/funasr/train_utils/trainer.py
@@ -252,6 +252,7 @@
                 dataloader_val=None,
                 epoch=None,
                 writer=None,
+                **kwargs,
                     ):
         """
         Defines the training process for a single epoch with gradient accumulation.
@@ -374,6 +375,8 @@
                          stats=stats,
                          writer=writer,
                          tag="train",
+                         data_split_i=kwargs.get("data_split_i", 0),
+                         data_split_num=kwargs.get("data_split_num", 1),
                          )
 
             if (batch_idx + 1) % self.validate_interval == 0:
@@ -507,6 +510,9 @@
             stats=None,
             writer=None,
             tag="train",
+            data_split_i=0,
+            data_split_num=1,
+            **kwargs,
             ):
         
         if (batch_idx + 1) % self.log_interval == 0:
@@ -526,6 +532,7 @@
                 f"{tag}, "
                 f"rank: {self.local_rank}, "
                 f"epoch: {epoch}/{self.max_epoch}, "
+                f"data_slice: {data_split_i}/{data_split_num}, "
                 f"step: {batch_idx + 1}/{batch_num_epoch}, total step: {self.batch_total}, "
                 f"(loss_avg_rank: {loss:.3f}), "
                 f"(loss_avg_epoch: {loss_avg_epoch:.3f}), "

--
Gitblit v1.9.1