| | |
| | | import logging |
| | | from torch.nn.utils.rnn import pad_sequence |
| | | try: |
| | | from urllib.parse import urlparse |
| | | from funasr.download.file import HTTPStorage |
| | | import tempfile |
| | | from funasr.download.file import download_from_url |
| | | except: |
| | | print("urllib is not installed, if you infer from url, please install it first.") |
| | | # def load_audio(data_or_path_or_list, fs: int=16000, audio_fs: int=16000): |
| | | # |
| | | # if isinstance(data_or_path_or_list, (list, tuple)): |
| | | # return [load_audio(audio, fs=fs, audio_fs=audio_fs) for audio in data_or_path_or_list] |
| | | # |
| | | # if isinstance(data_or_path_or_list, str) and os.path.exists(data_or_path_or_list): |
| | | # data_or_path_or_list, audio_fs = torchaudio.load(data_or_path_or_list) |
| | | # data_or_path_or_list = data_or_path_or_list[0, :] |
| | | # elif isinstance(data_or_path_or_list, np.ndarray): # audio sample point |
| | | # data_or_path_or_list = np.squeeze(data_or_path_or_list) #[n_samples,] |
| | | # |
| | | # if audio_fs != fs: |
| | | # resampler = torchaudio.transforms.Resample(audio_fs, fs) |
| | | # data_or_path_or_list = resampler(data_or_path_or_list[None, :])[0, :] |
| | | # return data_or_path_or_list |
| | | |
| | | |
| | | def load_audio_text_image_video(data_or_path_or_list, fs: int = 16000, audio_fs: int = 16000, data_type=None, tokenizer=None): |
| | | |
| | | def load_audio_text_image_video(data_or_path_or_list, fs: int = 16000, audio_fs: int = 16000, data_type="sound", tokenizer=None): |
| | | if isinstance(data_or_path_or_list, (list, tuple)): |
| | | if data_type is not None and isinstance(data_type, (list, tuple)): |
| | | |
| | |
| | | |
| | | return data_or_path_or_list_ret |
| | | else: |
| | | return [load_audio_text_image_video(audio, fs=fs, audio_fs=audio_fs) for audio in data_or_path_or_list] |
| | | return [load_audio_text_image_video(audio, fs=fs, audio_fs=audio_fs, data_type=data_type) for audio in data_or_path_or_list] |
| | | if isinstance(data_or_path_or_list, str) and data_or_path_or_list.startswith('http'): |
| | | data_or_path_or_list = download_from_url(data_or_path_or_list) |
| | | if isinstance(data_or_path_or_list, str) and os.path.exists(data_or_path_or_list): |
| | | if data_type is None or data_type == "sound": |
| | | data_or_path_or_list, audio_fs = torchaudio.load(data_or_path_or_list) |
| | | data_or_path_or_list = data_or_path_or_list[0, :] |
| | | # elif data_type == "text" and tokenizer is not None: |
| | | # data_or_path_or_list = tokenizer.encode(data_or_path_or_list) |
| | | elif isinstance(data_or_path_or_list, str) and data_type == "text" and tokenizer is not None: |
| | | data_or_path_or_list = tokenizer.encode(data_or_path_or_list) |
| | | elif isinstance(data_or_path_or_list, np.ndarray): # audio sample point |
| | | data_or_path_or_list = np.squeeze(data_or_path_or_list) # [n_samples,] |
| | | elif isinstance(data_or_path_or_list, str) and data_type is not None and data_type == "text" and tokenizer is not None: |
| | | data_or_path_or_list = tokenizer.encode(data_or_path_or_list) |
| | | else: |
| | | pass |
| | | # print(f"unsupport data type: {data_or_path_or_list}, return raw data") |
| | | |
| | | if audio_fs != fs and data_type != "text": |
| | | resampler = torchaudio.transforms.Resample(audio_fs, fs) |
| | |
| | | data_len = torch.tensor([data_len]) |
| | | return data.to(torch.float32), data_len.to(torch.int32) |
| | | |
def download_from_url(url):
    """Download ``url`` into a fresh temporary directory and return the local path.

    The URL is parsed with ``urlparse``. When it carries a scheme, its content
    is read through ``HTTPStorage().read`` and written in binary mode to a
    file named after the last component of the URL path.

    Args:
        url: Location of the resource to fetch.

    Returns:
        Path of the downloaded file on the local filesystem. The containing
        temporary directory is not removed by this function.

    Raises:
        AssertionError: If ``url`` has no scheme, so nothing can be downloaded.
    """
    result = urlparse(url)
    if not result.scheme:
        # Raised explicitly (instead of via ``assert``) so the check survives
        # ``python -O`` while callers still receive an AssertionError.
        raise AssertionError(f"failed to download: {url}")

    # Payload is written with mode "wb" below, so it is presumably bytes.
    data = HTTPStorage().read(url)

    # mkdtemp() creates the directory and leaves it in place; relying on
    # TemporaryDirectory().name would hand back a path whose directory is
    # removed as soon as the temporary object is finalized.
    work_dir = tempfile.mkdtemp()

    # Use only the path component so a query string or fragment does not end
    # up in the file name; fall back to a fixed name when the path has no
    # final component (e.g. it ends with "/").
    file_name = os.path.basename(result.path) or "download"
    file_path = os.path.join(work_dir, file_name)
    with open(file_path, "wb") as fb:
        fb.write(data)
    return file_path