From 94de39dde2e616a01683c518023d0fab72b4e103 Mon Sep 17 00:00:00 2001
From: 游雁 <zhifu.gzf@alibaba-inc.com>
Date: Mon, 19 Feb 2024 22:21:50 +0800
Subject: [PATCH] aishell example
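
Rename load_audio_and_text_image_video to load_audio_text_image_video
and extend it: http(s) URLs are downloaded to a local file first,
multi-channel audio is averaged to mono, cache flags (is_final,
is_streaming_input) are set for file and URL inputs, and **kwargs are
forwarded to the frontend in extract_fbank. Also fix the data_i type
check in extract_fbank's list branch and convert the file from tabs to
four-space indentation.

A rough usage sketch of the updated loader follows; the wav path is an
illustrative placeholder, not part of this patch:

    import numpy as np
    from funasr.utils.load_utils import load_audio_text_image_video, load_bytes

    # A local path (or an http(s) URL, downloaded automatically) is loaded
    # with torchaudio, reduced to mono, and resampled to fs if needed.
    speech = load_audio_text_image_video("/path/to/example.wav", fs=16000,
                                         data_type="sound")

    # Raw int16 PCM bytes are rescaled to float32 in [-1.0, 1.0).
    pcm = load_bytes(np.zeros(16000, dtype=np.int16).tobytes())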

---
 funasr/utils/load_utils.py |  177 +++++++++++++++++++++++++++++++----------------------------
 1 file changed, 93 insertions(+), 84 deletions(-)

diff --git a/funasr/utils/load_utils.py b/funasr/utils/load_utils.py
index 7f1b850..7748172 100644
--- a/funasr/utils/load_utils.py
+++ b/funasr/utils/load_utils.py
@@ -9,94 +9,103 @@
 import time
 import logging
 from torch.nn.utils.rnn import pad_sequence
-
-# def load_audio(audio_or_path_or_list, fs: int=16000, audio_fs: int=16000):
-#
-# 	if isinstance(audio_or_path_or_list, (list, tuple)):
-# 		return [load_audio(audio, fs=fs, audio_fs=audio_fs) for audio in audio_or_path_or_list]
-#
-# 	if isinstance(audio_or_path_or_list, str) and os.path.exists(audio_or_path_or_list):
-# 		audio_or_path_or_list, audio_fs = torchaudio.load(audio_or_path_or_list)
-# 		audio_or_path_or_list = audio_or_path_or_list[0, :]
-# 	elif isinstance(audio_or_path_or_list, np.ndarray): # audio sample point
-# 		audio_or_path_or_list = np.squeeze(audio_or_path_or_list) #[n_samples,]
-#
-# 	if audio_fs != fs:
-# 		resampler = torchaudio.transforms.Resample(audio_fs, fs)
-# 		audio_or_path_or_list = resampler(audio_or_path_or_list[None, :])[0, :]
-# 	return audio_or_path_or_list
+try:
+    from funasr.download.file import download_from_url
+except ImportError:
+    print("urllib is not installed; if you want to run inference from a URL, please install it first.")
 
 
-def load_audio_and_text_image_video(audio_or_path_or_list, fs: int = 16000, audio_fs: int = 16000, data_type=None, tokenizer=None):
-	if isinstance(audio_or_path_or_list, (list, tuple)):
-		if data_type is not None and isinstance(data_type, (list, tuple)):
 
-			data_types = [data_type] * len(audio_or_path_or_list)
-			audio_or_path_or_list_ret = [[] for d in data_type]
-			for i, (data_type_i, audio_or_path_or_list_i) in enumerate(zip(data_types, audio_or_path_or_list)):
-				
-				for j, (data_type_j, audio_or_path_or_list_j) in enumerate(zip(data_type_i, audio_or_path_or_list_i)):
-					
-					audio_or_path_or_list_j = load_audio_and_text_image_video(audio_or_path_or_list_j, fs=fs, audio_fs=audio_fs, data_type=data_type_j, tokenizer=tokenizer)
-					audio_or_path_or_list_ret[j].append(audio_or_path_or_list_j)
+def load_audio_text_image_video(data_or_path_or_list, fs: int = 16000, audio_fs: int = 16000, data_type="sound", tokenizer=None, **kwargs):
+    if isinstance(data_or_path_or_list, (list, tuple)):
+        if data_type is not None and isinstance(data_type, (list, tuple)):
 
-			return audio_or_path_or_list_ret
-		else:
-			return [load_audio_and_text_image_video(audio, fs=fs, audio_fs=audio_fs) for audio in audio_or_path_or_list]
-	
-	if isinstance(audio_or_path_or_list, str) and os.path.exists(audio_or_path_or_list):
-		audio_or_path_or_list, audio_fs = torchaudio.load(audio_or_path_or_list)
-		audio_or_path_or_list = audio_or_path_or_list[0, :]
-	elif isinstance(audio_or_path_or_list, np.ndarray):  # audio sample point
-		audio_or_path_or_list = np.squeeze(audio_or_path_or_list)  # [n_samples,]
-	elif isinstance(audio_or_path_or_list, str) and data_type is not None and data_type == "text" and tokenizer is not None:
-		audio_or_path_or_list = tokenizer.encode(audio_or_path_or_list)
-		
-	if audio_fs != fs and data_type != "text":
-		resampler = torchaudio.transforms.Resample(audio_fs, fs)
-		audio_or_path_or_list = resampler(audio_or_path_or_list[None, :])[0, :]
-	return audio_or_path_or_list
+            data_types = [data_type] * len(data_or_path_or_list)
+            data_or_path_or_list_ret = [[] for d in data_type]
+            for i, (data_type_i, data_or_path_or_list_i) in enumerate(zip(data_types, data_or_path_or_list)):
+                
+                for j, (data_type_j, data_or_path_or_list_j) in enumerate(zip(data_type_i, data_or_path_or_list_i)):
+                    
+                    data_or_path_or_list_j = load_audio_text_image_video(data_or_path_or_list_j, fs=fs, audio_fs=audio_fs, data_type=data_type_j, tokenizer=tokenizer, **kwargs)
+                    data_or_path_or_list_ret[j].append(data_or_path_or_list_j)
+
+            return data_or_path_or_list_ret
+        else:
+            return [load_audio_text_image_video(audio, fs=fs, audio_fs=audio_fs, data_type=data_type, tokenizer=tokenizer, **kwargs) for audio in data_or_path_or_list]
+    
+    if isinstance(data_or_path_or_list, str) and data_or_path_or_list.startswith('http'): # download the URL to a local file first
+        data_or_path_or_list = download_from_url(data_or_path_or_list)
+    
+    if isinstance(data_or_path_or_list, str) and os.path.exists(data_or_path_or_list): # local file
+        if data_type is None or data_type == "sound":
+            data_or_path_or_list, audio_fs = torchaudio.load(data_or_path_or_list)
+            if kwargs.get("reduce_channels", True):
+                data_or_path_or_list = data_or_path_or_list.mean(0)
+        elif data_type == "text" and tokenizer is not None:
+            data_or_path_or_list = tokenizer.encode(data_or_path_or_list)
+        elif data_type == "image":  # TODO: image loading is not implemented yet
+            pass
+        elif data_type == "video":  # TODO: video loading is not implemented yet
+            pass
+        
+        # a complete file or URL input: mark the cache as final, non-streaming
+        if "cache" in kwargs:
+            kwargs["cache"]["is_final"] = True
+            kwargs["cache"]["is_streaming_input"] = False
+    elif isinstance(data_or_path_or_list, str) and data_type == "text" and tokenizer is not None:
+        data_or_path_or_list = tokenizer.encode(data_or_path_or_list)
+    elif isinstance(data_or_path_or_list, np.ndarray):  # raw audio samples
+        data_or_path_or_list = torch.from_numpy(data_or_path_or_list).squeeze()  # [n_samples,]
+    else:
+        pass
+        # print(f"unsupported data type: {data_or_path_or_list}, returning raw data")
+        
+    if audio_fs != fs and data_type != "text":
+        resampler = torchaudio.transforms.Resample(audio_fs, fs)
+        data_or_path_or_list = resampler(data_or_path_or_list[None, :])[0, :]
+    return data_or_path_or_list
 
 def load_bytes(input):
-	middle_data = np.frombuffer(input, dtype=np.int16)
-	middle_data = np.asarray(middle_data)
-	if middle_data.dtype.kind not in 'iu':
-		raise TypeError("'middle_data' must be an array of integers")
-	dtype = np.dtype('float32')
-	if dtype.kind != 'f':
-		raise TypeError("'dtype' must be a floating point type")
-	
-	i = np.iinfo(middle_data.dtype)
-	abs_max = 2 ** (i.bits - 1)
-	offset = i.min + abs_max
-	array = np.frombuffer((middle_data.astype(dtype) - offset) / abs_max, dtype=np.float32)
-	return array
+    middle_data = np.frombuffer(input, dtype=np.int16)
+    middle_data = np.asarray(middle_data)
+    if middle_data.dtype.kind not in 'iu':
+        raise TypeError("'middle_data' must be an array of integers")
+    dtype = np.dtype('float32')
+    if dtype.kind != 'f':
+        raise TypeError("'dtype' must be a floating point type")
+    
+    i = np.iinfo(middle_data.dtype)
+    abs_max = 2 ** (i.bits - 1)
+    offset = i.min + abs_max
+    array = (middle_data.astype(dtype) - offset) / abs_max  # scale int16 PCM to [-1.0, 1.0)
+    return array
 
-def extract_fbank(data, data_len = None, data_type: str="sound", frontend=None):
-	# import pdb;
-	# pdb.set_trace()
-	if isinstance(data, np.ndarray):
-		data = torch.from_numpy(data)
-		if len(data.shape) < 2:
-			data = data[None, :] # data: [batch, N]
-		data_len = [data.shape[1]] if data_len is None else data_len
-	elif isinstance(data, torch.Tensor):
-		if len(data.shape) < 2:
-			data = data[None, :] # data: [batch, N]
-		data_len = [data.shape[1]] if data_len is None else data_len
-	elif isinstance(data, (list, tuple)):
-		data_list, data_len = [], []
-		for data_i in data:
-			if isinstance(data, np.ndarray):
-				data_i = torch.from_numpy(data_i)
-			data_list.append(data_i)
-			data_len.append(data_i.shape[0])
-		data = pad_sequence(data_list, batch_first=True) # data: [batch, N]
-	# import pdb;
-	# pdb.set_trace()
-	# if data_type == "sound":
-	data, data_len = frontend(data, data_len)
-	
-	if isinstance(data_len, (list, tuple)):
-		data_len = torch.tensor([data_len])
-	return data.to(torch.float32), data_len.to(torch.int32)
\ No newline at end of file
+def extract_fbank(data, data_len=None, data_type: str = "sound", frontend=None, **kwargs):
+    # import pdb;
+    # pdb.set_trace()
+    if isinstance(data, np.ndarray):
+        data = torch.from_numpy(data)
+        if len(data.shape) < 2:
+            data = data[None, :] # data: [batch, N]
+        data_len = [data.shape[1]] if data_len is None else data_len
+    elif isinstance(data, torch.Tensor):
+        if len(data.shape) < 2:
+            data = data[None, :] # data: [batch, N]
+        data_len = [data.shape[1]] if data_len is None else data_len
+    elif isinstance(data, (list, tuple)):
+        data_list, data_len = [], []
+        for data_i in data:
+            if isinstance(data_i, np.ndarray):
+                data_i = torch.from_numpy(data_i)
+            data_list.append(data_i)
+            data_len.append(data_i.shape[0])
+        data = pad_sequence(data_list, batch_first=True) # data: [batch, N]
+    # import pdb;
+    # pdb.set_trace()
+    # if data_type == "sound":
+    data, data_len = frontend(data, data_len, **kwargs)
+    
+    if isinstance(data_len, (list, tuple)):
+        data_len = torch.tensor([data_len])
+    return data.to(torch.float32), data_len.to(torch.int32)
+

--
Gitblit v1.9.1