kongdeqiang
5 天以前 28ccfbfc51068a663a80764e14074df5edf2b5ba
funasr/utils/load_utils.py
@@ -1,6 +1,7 @@
import os
import torch
import json
from io import BytesIO
import torch.distributed as dist
import numpy as np
import kaldiio
@@ -9,117 +10,261 @@
import time
import logging
from torch.nn.utils.rnn import pad_sequence
try:
   from urllib.parse import urlparse
   from funasr.download.file import HTTPStorage
   import tempfile
    from funasr.download.file import download_from_url
except:
   print("urllib is not installed, if you infer from url, please install it first.")
# def load_audio(data_or_path_or_list, fs: int=16000, audio_fs: int=16000):
#
#    if isinstance(data_or_path_or_list, (list, tuple)):
#       return [load_audio(audio, fs=fs, audio_fs=audio_fs) for audio in data_or_path_or_list]
#
#    if isinstance(data_or_path_or_list, str) and os.path.exists(data_or_path_or_list):
#       data_or_path_or_list, audio_fs = torchaudio.load(data_or_path_or_list)
#       data_or_path_or_list = data_or_path_or_list[0, :]
#    elif isinstance(data_or_path_or_list, np.ndarray): # audio sample point
#       data_or_path_or_list = np.squeeze(data_or_path_or_list) #[n_samples,]
#
#    if audio_fs != fs:
#       resampler = torchaudio.transforms.Resample(audio_fs, fs)
#       data_or_path_or_list = resampler(data_or_path_or_list[None, :])[0, :]
#    return data_or_path_or_list
    print("urllib is not installed, if you infer from url, please install it first.")
import pdb
import subprocess
from subprocess import CalledProcessError, run
try:
    from pydub import AudioSegment
except:
    pass
def load_audio_text_image_video(data_or_path_or_list, fs: int = 16000, audio_fs: int = 16000, data_type=None, tokenizer=None):
   if isinstance(data_or_path_or_list, (list, tuple)):
      if data_type is not None and isinstance(data_type, (list, tuple)):
def is_ffmpeg_installed():
    try:
        output = subprocess.check_output(["ffmpeg", "-version"], stderr=subprocess.STDOUT)
        return "ffmpeg version" in output.decode("utf-8")
    except (subprocess.CalledProcessError, FileNotFoundError):
        return False
         data_types = [data_type] * len(data_or_path_or_list)
         data_or_path_or_list_ret = [[] for d in data_type]
         for i, (data_type_i, data_or_path_or_list_i) in enumerate(zip(data_types, data_or_path_or_list)):
            for j, (data_type_j, data_or_path_or_list_j) in enumerate(zip(data_type_i, data_or_path_or_list_i)):
               data_or_path_or_list_j = load_audio_text_image_video(data_or_path_or_list_j, fs=fs, audio_fs=audio_fs, data_type=data_type_j, tokenizer=tokenizer)
               data_or_path_or_list_ret[j].append(data_or_path_or_list_j)
         return data_or_path_or_list_ret
      else:
         return [load_audio_text_image_video(audio, fs=fs, audio_fs=audio_fs) for audio in data_or_path_or_list]
   if isinstance(data_or_path_or_list, str) and data_or_path_or_list.startswith('http'):
      data_or_path_or_list = download_from_url(data_or_path_or_list)
   if isinstance(data_or_path_or_list, str) and os.path.exists(data_or_path_or_list):
      data_or_path_or_list, audio_fs = torchaudio.load(data_or_path_or_list)
      data_or_path_or_list = data_or_path_or_list[0, :]
   elif isinstance(data_or_path_or_list, np.ndarray):  # audio sample point
      data_or_path_or_list = np.squeeze(data_or_path_or_list)  # [n_samples,]
   elif isinstance(data_or_path_or_list, str) and data_type is not None and data_type == "text" and tokenizer is not None:
      data_or_path_or_list = tokenizer.encode(data_or_path_or_list)
   if audio_fs != fs and data_type != "text":
      resampler = torchaudio.transforms.Resample(audio_fs, fs)
      data_or_path_or_list = resampler(data_or_path_or_list[None, :])[0, :]
   return data_or_path_or_list
use_ffmpeg = False
if is_ffmpeg_installed():
    use_ffmpeg = True
else:
    print(
        "Notice: ffmpeg is not installed. torchaudio is used to load audio\n"
        "If you want to use ffmpeg backend to load audio, please install it by:"
        "\n\tsudo apt install ffmpeg # ubuntu"
        "\n\t# brew install ffmpeg # mac"
    )
def load_audio_text_image_video(
    data_or_path_or_list,
    fs: int = 16000,
    audio_fs: int = 16000,
    data_type="sound",
    tokenizer=None,
    **kwargs,
):
    if isinstance(data_or_path_or_list, (list, tuple)):
        if data_type is not None and isinstance(data_type, (list, tuple)):
            data_types = [data_type] * len(data_or_path_or_list)
            data_or_path_or_list_ret = [[] for d in data_type]
            for i, (data_type_i, data_or_path_or_list_i) in enumerate(
                zip(data_types, data_or_path_or_list)
            ):
                for j, (data_type_j, data_or_path_or_list_j) in enumerate(
                    zip(data_type_i, data_or_path_or_list_i)
                ):
                    data_or_path_or_list_j = load_audio_text_image_video(
                        data_or_path_or_list_j,
                        fs=fs,
                        audio_fs=audio_fs,
                        data_type=data_type_j,
                        tokenizer=tokenizer,
                        **kwargs,
                    )
                    data_or_path_or_list_ret[j].append(data_or_path_or_list_j)
            return data_or_path_or_list_ret
        else:
            return [
                load_audio_text_image_video(
                    audio, fs=fs, audio_fs=audio_fs, data_type=data_type, **kwargs
                )
                for audio in data_or_path_or_list
            ]
    if isinstance(data_or_path_or_list, str) and data_or_path_or_list.startswith(
        ("http://", "https://")
    ):  # download url to local file
        data_or_path_or_list = download_from_url(data_or_path_or_list)
    if (isinstance(data_or_path_or_list, str) and os.path.exists(data_or_path_or_list)) or hasattr(data_or_path_or_list, 'read'):  # local file or bytes io
        if data_type is None or data_type == "sound":
            if hasattr(data_or_path_or_list, "read") and hasattr(data_or_path_or_list, "seek"):
                data_or_path_or_list.seek(0)
            # if use_ffmpeg:
            #     data_or_path_or_list = _load_audio_ffmpeg(data_or_path_or_list, sr=fs)
            #     data_or_path_or_list = torch.from_numpy(data_or_path_or_list).squeeze()  # [n_samples,]
            # else:
            #     data_or_path_or_list, audio_fs = torchaudio.load(data_or_path_or_list)
            #     if kwargs.get("reduce_channels", True):
            #         data_or_path_or_list = data_or_path_or_list.mean(0)
            try:
                data_or_path_or_list, audio_fs = torchaudio.load(data_or_path_or_list)
                if kwargs.get("reduce_channels", True):
                    data_or_path_or_list = data_or_path_or_list.mean(0)
            except:
                data_or_path_or_list = _load_audio_ffmpeg(data_or_path_or_list, sr=fs)
                data_or_path_or_list = torch.from_numpy(
                    data_or_path_or_list
                ).squeeze()  # [n_samples,]
        elif data_type == "text" and tokenizer is not None:
            with open(data_or_path_or_list, "r") as f:
                data_or_path_or_list = tokenizer.encode(f.read().strip())
        elif data_type == "image":  # undo
            pass
        elif data_type == "video":  # undo
            pass
        # if data_in is a file or url, set is_final=True
        if "cache" in kwargs:
            kwargs["cache"]["is_final"] = True
            kwargs["cache"]["is_streaming_input"] = False
    elif isinstance(data_or_path_or_list, str) and data_type == "text" and tokenizer is not None:
        data_or_path_or_list = tokenizer.encode(data_or_path_or_list)
    elif isinstance(data_or_path_or_list, np.ndarray):  # audio sample point
        data_or_path_or_list = torch.from_numpy(data_or_path_or_list)  # .squeeze()  # [n_samples,]
    elif isinstance(data_or_path_or_list, str) and data_type == "kaldi_ark":
        data_mat = kaldiio.load_mat(data_or_path_or_list)
        if isinstance(data_mat, tuple):
            audio_fs, mat = data_mat
        else:
            mat = data_mat
        if mat.dtype == "int16" or mat.dtype == "int32":
            mat = mat.astype(np.float64)
            mat = mat / 32768
        if mat.ndim == 2:
            mat = mat[:, 0]
        data_or_path_or_list = mat
    else:
        pass
        # print(f"unsupport data type: {data_or_path_or_list}, return raw data")
    if audio_fs != fs and data_type != "text":
        resampler = torchaudio.transforms.Resample(audio_fs, fs)
        data_or_path_or_list = resampler(data_or_path_or_list[None, :])[0, :]
    return data_or_path_or_list
def load_bytes(input):
   middle_data = np.frombuffer(input, dtype=np.int16)
   middle_data = np.asarray(middle_data)
   if middle_data.dtype.kind not in 'iu':
      raise TypeError("'middle_data' must be an array of integers")
   dtype = np.dtype('float32')
   if dtype.kind != 'f':
      raise TypeError("'dtype' must be a floating point type")
   i = np.iinfo(middle_data.dtype)
   abs_max = 2 ** (i.bits - 1)
   offset = i.min + abs_max
   array = np.frombuffer((middle_data.astype(dtype) - offset) / abs_max, dtype=np.float32)
   return array
    try:
        input = validate_frame_rate(input)
    except:
        pass
    middle_data = np.frombuffer(input, dtype=np.int16)
    middle_data = np.asarray(middle_data)
    if middle_data.dtype.kind not in "iu":
        raise TypeError("'middle_data' must be an array of integers")
    dtype = np.dtype("float32")
    if dtype.kind != "f":
        raise TypeError("'dtype' must be a floating point type")
def extract_fbank(data, data_len = None, data_type: str="sound", frontend=None):
   # import pdb;
   # pdb.set_trace()
   if isinstance(data, np.ndarray):
      data = torch.from_numpy(data)
      if len(data.shape) < 2:
         data = data[None, :] # data: [batch, N]
      data_len = [data.shape[1]] if data_len is None else data_len
   elif isinstance(data, torch.Tensor):
      if len(data.shape) < 2:
         data = data[None, :] # data: [batch, N]
      data_len = [data.shape[1]] if data_len is None else data_len
   elif isinstance(data, (list, tuple)):
      data_list, data_len = [], []
      for data_i in data:
         if isinstance(data, np.ndarray):
            data_i = torch.from_numpy(data_i)
         data_list.append(data_i)
         data_len.append(data_i.shape[0])
      data = pad_sequence(data_list, batch_first=True) # data: [batch, N]
   # import pdb;
   # pdb.set_trace()
   # if data_type == "sound":
   data, data_len = frontend(data, data_len)
   if isinstance(data_len, (list, tuple)):
      data_len = torch.tensor([data_len])
   return data.to(torch.float32), data_len.to(torch.int32)
    i = np.iinfo(middle_data.dtype)
    abs_max = 2 ** (i.bits - 1)
    offset = i.min + abs_max
    array = np.frombuffer((middle_data.astype(dtype) - offset) / abs_max, dtype=np.float32)
    return array
def download_from_url(url):
   result = urlparse(url)
   file_path = None
   if result.scheme is not None and len(result.scheme) > 0:
      storage = HTTPStorage()
      # bytes
      data = storage.read(url)
      work_dir = tempfile.TemporaryDirectory().name
      if not os.path.exists(work_dir):
         os.makedirs(work_dir)
      file_path = os.path.join(work_dir, os.path.basename(url))
      with open(file_path, 'wb') as fb:
         fb.write(data)
   assert file_path is not None, f"failed to download: {url}"
   return file_path
def validate_frame_rate(
    input,
    fs: int = 16000,
):
    # 将文件读取为字节流
    byte_data = BytesIO(input)
    # 使用 pydub 加载音频
    try:
        audio = AudioSegment.from_file(byte_data)
    except:
        raise RuntimeError(
            "You are decoding the pcm data, please install pydub first. via `pip install pydub`."
        )
    # 确保采样率为 16000 Hz
    if audio.frame_rate != fs:
        audio = audio.set_frame_rate(fs)
        # 将重新采样后的音频导出为字节流
        output = BytesIO()
        audio.export(output, format="wav")
        output.seek(0)
        # 获取重新采样后的字节流数据
        input = output.read()
    return input
def extract_fbank(data, data_len=None, data_type: str = "sound", frontend=None, **kwargs):
    if isinstance(data, np.ndarray):
        data = torch.from_numpy(data)
        if len(data.shape) < 2:
            data = data[None, :]  # data: [batch, N]
        data_len = [data.shape[1]] if data_len is None else data_len
    elif isinstance(data, torch.Tensor):
        if len(data.shape) < 2:
            data = data[None, :]  # data: [batch, N]
        data_len = [data.shape[1]] if data_len is None else data_len
    elif isinstance(data, (list, tuple)):
        data_list, data_len = [], []
        for data_i in data:
            if isinstance(data_i, np.ndarray):
                data_i = torch.from_numpy(data_i)
            data_list.append(data_i)
            data_len.append(data_i.shape[0])
        data = pad_sequence(data_list, batch_first=True)  # data: [batch, N]
    data, data_len = frontend(data, data_len, **kwargs)
    if isinstance(data_len, (list, tuple)):
        data_len = torch.tensor([data_len])
    return data.to(torch.float32), data_len.to(torch.int32)
def _load_audio_ffmpeg(file: str, sr: int = 16000):
    """
    Open an audio file and read as mono waveform, resampling as necessary
    Parameters
    ----------
    file: str
        The audio file to open
    sr: int
        The sample rate to resample the audio if necessary
    Returns
    -------
    A NumPy array containing the audio waveform, in float32 dtype.
    """
    # This launches a subprocess to decode audio while down-mixing
    # and resampling as necessary.  Requires the ffmpeg CLI in PATH.
    # fmt: off
    pcm_params = []
    if file.lower().endswith('.pcm'):
        pcm_params = [
            "-f", "s16le",
            "-ar", str(sr),
            "-ac", "1"
        ]
    cmd = [
        "ffmpeg",
        "-nostdin",
        "-threads", "0",
        *pcm_params,  # PCM files need input format specified before -i since PCM is raw data without headers
        "-i", file,
        "-f", "s16le",
        "-ac", "1",
        "-acodec", "pcm_s16le",
        "-ar", str(sr),
        "-"
    ]
    # fmt: on
    try:
        out = run(cmd, capture_output=True, check=True).stdout
    except CalledProcessError as e:
        raise RuntimeError(f"Failed to load audio: {e.stderr.decode()}") from e
    return np.frombuffer(out, np.int16).flatten().astype(np.float32) / 32768.0