| | |
| | | from funasr.download.file import download_from_url |
| | | except: |
| | | print("urllib is not installed, if you infer from url, please install it first.") |
| | | import pdb |
| | | |
| | | |
| | | |
| | | # Load one input, or a (possibly nested) list of inputs, and normalise it for downstream use. |
| | | # |
| | | # Parameters: |
| | | #   data_or_path_or_list: a URL/path string, a NumPy array, or a list/tuple of such items. |
| | | #   fs: sample rate the audio is resampled to (second argument of torchaudio.transforms.Resample). |
| | | #   audio_fs: sample rate assumed for the input; overwritten by the rate read from a sound file |
| | | #             or from a Kaldi ark entry that carries one. |
| | | #   data_type: "sound", "text", "kaldi_ark", None, or a list/tuple of these for multi-field items. |
| | | #   tokenizer: object exposing .encode(); used by the text branch. |
| | | #   **kwargs: in the visible rows these are only forwarded to the recursive calls. |
| | | # |
| | | # NOTE(review): this extract has lost indentation and elides rows inside the function |
| | | # (including the final return statement), so the nesting described below is inferred from syntax. |
| | | def load_audio_text_image_video(data_or_path_or_list, fs: int = 16000, audio_fs: int = 16000, data_type="sound", tokenizer=None, **kwargs): |
| | | if isinstance(data_or_path_or_list, (list, tuple)): |
| | | if data_type is not None and isinstance(data_type, (list, tuple)): |
| | | # Multi-field items: each input item is zipped with data_type, and the results are |
| | | # regrouped into one output list per data_type entry (field-major, not item-major). |
| | | |
| | | data_types = [data_type] * len(data_or_path_or_list) |
| | | data_or_path_or_list_ret = [[] for d in data_type] |
| | | for i, (data_type_i, data_or_path_or_list_i) in enumerate(zip(data_types, data_or_path_or_list)): |
| | | |
| | | for j, (data_type_j, data_or_path_or_list_j) in enumerate(zip(data_type_i, data_or_path_or_list_i)): |
| | | |
| | | data_or_path_or_list_j = load_audio_text_image_video(data_or_path_or_list_j, fs=fs, audio_fs=audio_fs, data_type=data_type_j, tokenizer=tokenizer, **kwargs) |
| | | data_or_path_or_list_ret[j].append(data_or_path_or_list_j) |
| | | |
| | | return data_or_path_or_list_ret |
| | | else: |
| | | # Single data_type for every item: load each element recursively and return a flat list. |
| | | # NOTE(review): tokenizer is not forwarded here although the nested branch forwards it, |
| | | # so the recursive calls run with tokenizer=None -- confirm whether this is intentional. |
| | | return [load_audio_text_image_video(audio, fs=fs, audio_fs=audio_fs, data_type=data_type, **kwargs) for audio in data_or_path_or_list] |
| | | |
| | | if isinstance(data_or_path_or_list, str) and data_or_path_or_list.startswith('http'): # download url to local file |
| | | # The input is replaced by whatever download_from_url returns (presumably a local path -- verify). |
| | | data_or_path_or_list = download_from_url(data_or_path_or_list) |
| | | |
| | | |
| | | if isinstance(data_or_path_or_list, str) and os.path.exists(data_or_path_or_list): # local file |
| | | if data_type is None or data_type == "sound": |
| | | # torchaudio.load returns (waveform, sample_rate); the file's own rate replaces audio_fs. |
| | | data_or_path_or_list, audio_fs = torchaudio.load(data_or_path_or_list) |
| | |
| | | # NOTE(review): rows are elided from this extract just above; the branch header that guards |
| | | # the tokenizer.encode call below is not visible here. |
| | | data_or_path_or_list = tokenizer.encode(data_or_path_or_list) |
| | | elif isinstance(data_or_path_or_list, np.ndarray): # audio sample point |
| | | data_or_path_or_list = torch.from_numpy(data_or_path_or_list).squeeze() # [n_samples,] |
| | | elif isinstance(data_or_path_or_list, str) and data_type == "kaldi_ark": |
| | | # kaldiio.load_mat may return either a matrix or a (rate, matrix) tuple; both are handled. |
| | | data_mat = kaldiio.load_mat(data_or_path_or_list) |
| | | if isinstance(data_mat, tuple): |
| | | audio_fs, mat = data_mat |
| | | else: |
| | | mat = data_mat |
| | | if mat.dtype == 'int16' or mat.dtype == 'int32': |
| | | # Integer samples are converted to float64 and divided by 32768; the same divisor is |
| | | # applied to int32 data (presumably 16-bit full scale is assumed -- confirm). |
| | | mat = mat.astype(np.float64) |
| | | mat = mat / 32768 |
| | | if mat.ndim ==2: |
| | | # For 2-D data only column 0 is kept. |
| | | mat = mat[:,0] |
| | | # NOTE(review): mat is not converted with torch.from_numpy in this branch, unlike the |
| | | # np.ndarray branch above -- confirm the resampling step below accepts it. |
| | | data_or_path_or_list = mat |
| | | else: |
| | | # Any other input is passed through unchanged. |
| | | pass |
| | | # print(f"unsupport data type: {data_or_path_or_list}, return raw data") |
| | | |
| | | |
| | | # Resample from audio_fs to fs for every non-text data_type when the rates differ; |
| | | # [None, :] adds a leading dimension for the resampler and [0, :] removes it again. |
| | | if audio_fs != fs and data_type != "text": |
| | | resampler = torchaudio.transforms.Resample(audio_fs, fs) |
| | | data_or_path_or_list = resampler(data_or_path_or_list[None, :])[0, :] |
| | |
| | | return array |
| | | |
| | | def extract_fbank(data, data_len = None, data_type: str="sound", frontend=None, **kwargs): |
| | | # import pdb; |
| | | # pdb.set_trace() |
| | | if isinstance(data, np.ndarray): |
| | | data = torch.from_numpy(data) |
| | | if len(data.shape) < 2: |
| | |
| | | data_list.append(data_i) |
| | | data_len.append(data_i.shape[0]) |
| | | data = pad_sequence(data_list, batch_first=True) # data: [batch, N] |
| | | |
| | | # import pdb; |
| | | # pdb.set_trace() |
| | | # if data_type == "sound": |
| | | data, data_len = frontend(data, data_len, **kwargs) |
| | | |
| | | if isinstance(data_len, (list, tuple)): |