From f14f9f8d15037c7b81cbdc880d61d05e23382a8f Mon Sep 17 00:00:00 2001
From: 游雁 <zhifu.gzf@alibaba-inc.com>
Date: 星期二, 09 一月 2024 00:13:51 +0800
Subject: [PATCH] funasr1.0 infer url modelscope
---
funasr/utils/load_utils.py | 87 +++++++++++++++++++++++++++----------------
1 files changed, 55 insertions(+), 32 deletions(-)
diff --git a/funasr/utils/load_utils.py b/funasr/utils/load_utils.py
index 7f1b850..c5c3ffc 100644
--- a/funasr/utils/load_utils.py
+++ b/funasr/utils/load_utils.py
@@ -9,53 +9,59 @@
import time
import logging
from torch.nn.utils.rnn import pad_sequence
-
-# def load_audio(audio_or_path_or_list, fs: int=16000, audio_fs: int=16000):
+try:
+ from urllib.parse import urlparse
+ from funasr.download.file import HTTPStorage
+ import tempfile
+except:
+ print("urllib is not installed, if you infer from url, please install it first.")
+# def load_audio(data_or_path_or_list, fs: int=16000, audio_fs: int=16000):
#
-# if isinstance(audio_or_path_or_list, (list, tuple)):
-# return [load_audio(audio, fs=fs, audio_fs=audio_fs) for audio in audio_or_path_or_list]
+# if isinstance(data_or_path_or_list, (list, tuple)):
+# return [load_audio(audio, fs=fs, audio_fs=audio_fs) for audio in data_or_path_or_list]
#
-# if isinstance(audio_or_path_or_list, str) and os.path.exists(audio_or_path_or_list):
-# audio_or_path_or_list, audio_fs = torchaudio.load(audio_or_path_or_list)
-# audio_or_path_or_list = audio_or_path_or_list[0, :]
-# elif isinstance(audio_or_path_or_list, np.ndarray): # audio sample point
-# audio_or_path_or_list = np.squeeze(audio_or_path_or_list) #[n_samples,]
+# if isinstance(data_or_path_or_list, str) and os.path.exists(data_or_path_or_list):
+# data_or_path_or_list, audio_fs = torchaudio.load(data_or_path_or_list)
+# data_or_path_or_list = data_or_path_or_list[0, :]
+# elif isinstance(data_or_path_or_list, np.ndarray): # audio sample point
+# data_or_path_or_list = np.squeeze(data_or_path_or_list) #[n_samples,]
#
# if audio_fs != fs:
# resampler = torchaudio.transforms.Resample(audio_fs, fs)
-# audio_or_path_or_list = resampler(audio_or_path_or_list[None, :])[0, :]
-# return audio_or_path_or_list
+# data_or_path_or_list = resampler(data_or_path_or_list[None, :])[0, :]
+# return data_or_path_or_list
-def load_audio_and_text_image_video(audio_or_path_or_list, fs: int = 16000, audio_fs: int = 16000, data_type=None, tokenizer=None):
- if isinstance(audio_or_path_or_list, (list, tuple)):
+def load_audio_text_image_video(data_or_path_or_list, fs: int = 16000, audio_fs: int = 16000, data_type=None, tokenizer=None):
+ if isinstance(data_or_path_or_list, (list, tuple)):
if data_type is not None and isinstance(data_type, (list, tuple)):
- data_types = [data_type] * len(audio_or_path_or_list)
- audio_or_path_or_list_ret = [[] for d in data_type]
- for i, (data_type_i, audio_or_path_or_list_i) in enumerate(zip(data_types, audio_or_path_or_list)):
+ data_types = [data_type] * len(data_or_path_or_list)
+ data_or_path_or_list_ret = [[] for d in data_type]
+ for i, (data_type_i, data_or_path_or_list_i) in enumerate(zip(data_types, data_or_path_or_list)):
- for j, (data_type_j, audio_or_path_or_list_j) in enumerate(zip(data_type_i, audio_or_path_or_list_i)):
+ for j, (data_type_j, data_or_path_or_list_j) in enumerate(zip(data_type_i, data_or_path_or_list_i)):
- audio_or_path_or_list_j = load_audio_and_text_image_video(audio_or_path_or_list_j, fs=fs, audio_fs=audio_fs, data_type=data_type_j, tokenizer=tokenizer)
- audio_or_path_or_list_ret[j].append(audio_or_path_or_list_j)
+ data_or_path_or_list_j = load_audio_text_image_video(data_or_path_or_list_j, fs=fs, audio_fs=audio_fs, data_type=data_type_j, tokenizer=tokenizer)
+ data_or_path_or_list_ret[j].append(data_or_path_or_list_j)
- return audio_or_path_or_list_ret
+ return data_or_path_or_list_ret
else:
- return [load_audio_and_text_image_video(audio, fs=fs, audio_fs=audio_fs) for audio in audio_or_path_or_list]
-
- if isinstance(audio_or_path_or_list, str) and os.path.exists(audio_or_path_or_list):
- audio_or_path_or_list, audio_fs = torchaudio.load(audio_or_path_or_list)
- audio_or_path_or_list = audio_or_path_or_list[0, :]
- elif isinstance(audio_or_path_or_list, np.ndarray): # audio sample point
- audio_or_path_or_list = np.squeeze(audio_or_path_or_list) # [n_samples,]
- elif isinstance(audio_or_path_or_list, str) and data_type is not None and data_type == "text" and tokenizer is not None:
- audio_or_path_or_list = tokenizer.encode(audio_or_path_or_list)
+ return [load_audio_text_image_video(audio, fs=fs, audio_fs=audio_fs) for audio in data_or_path_or_list]
+ if isinstance(data_or_path_or_list, str) and data_or_path_or_list.startswith('http'):
+ data_or_path_or_list = download_from_url(data_or_path_or_list)
+ if isinstance(data_or_path_or_list, str) and os.path.exists(data_or_path_or_list):
+ data_or_path_or_list, audio_fs = torchaudio.load(data_or_path_or_list)
+ data_or_path_or_list = data_or_path_or_list[0, :]
+ elif isinstance(data_or_path_or_list, np.ndarray): # audio sample point
+ data_or_path_or_list = np.squeeze(data_or_path_or_list) # [n_samples,]
+ elif isinstance(data_or_path_or_list, str) and data_type is not None and data_type == "text" and tokenizer is not None:
+ data_or_path_or_list = tokenizer.encode(data_or_path_or_list)
if audio_fs != fs and data_type != "text":
resampler = torchaudio.transforms.Resample(audio_fs, fs)
- audio_or_path_or_list = resampler(audio_or_path_or_list[None, :])[0, :]
- return audio_or_path_or_list
+ data_or_path_or_list = resampler(data_or_path_or_list[None, :])[0, :]
+ return data_or_path_or_list
def load_bytes(input):
middle_data = np.frombuffer(input, dtype=np.int16)
@@ -99,4 +105,21 @@
if isinstance(data_len, (list, tuple)):
data_len = torch.tensor([data_len])
- return data.to(torch.float32), data_len.to(torch.int32)
\ No newline at end of file
+ return data.to(torch.float32), data_len.to(torch.int32)
+
+def download_from_url(url):
+
+ result = urlparse(url)
+ file_path = None
+ if result.scheme is not None and len(result.scheme) > 0:
+ storage = HTTPStorage()
+ # bytes
+ data = storage.read(url)
+ work_dir = tempfile.TemporaryDirectory().name
+ if not os.path.exists(work_dir):
+ os.makedirs(work_dir)
+ file_path = os.path.join(work_dir, os.path.basename(url))
+ with open(file_path, 'wb') as fb:
+ fb.write(data)
+ assert file_path is not None, f"failed to download: {url}"
+ return file_path
\ No newline at end of file
--
Gitblit v1.9.1