| | |
| | | import os |
| | | import torch |
| | | import json |
| | | from io import BytesIO |
| | | import torch.distributed as dist |
| | | import numpy as np |
| | | import kaldiio |
| | |
| | | import pdb |
| | | import subprocess |
| | | from subprocess import CalledProcessError, run |
| | | |
| | | try: |
| | | from pydub import AudioSegment |
| | | except: |
| | | pass |
| | | |
| | | |
| | | def is_ffmpeg_installed(): |
| | |
| | | ): # download url to local file |
| | | data_or_path_or_list = download_from_url(data_or_path_or_list) |
| | | |
| | | if isinstance(data_or_path_or_list, str) and os.path.exists(data_or_path_or_list): # local file |
| | | if (isinstance(data_or_path_or_list, str) and os.path.exists(data_or_path_or_list)) or hasattr(data_or_path_or_list, 'read'): # local file or bytes io |
| | | if data_type is None or data_type == "sound": |
| | | if hasattr(data_or_path_or_list, "read") and hasattr(data_or_path_or_list, "seek"): |
| | | data_or_path_or_list.seek(0) |
| | | # if use_ffmpeg: |
| | | # data_or_path_or_list = _load_audio_ffmpeg(data_or_path_or_list, sr=fs) |
| | | # data_or_path_or_list = torch.from_numpy(data_or_path_or_list).squeeze() # [n_samples,] |
| | |
| | | data_or_path_or_list |
| | | ).squeeze() # [n_samples,] |
| | | elif data_type == "text" and tokenizer is not None: |
| | | data_or_path_or_list = tokenizer.encode(data_or_path_or_list) |
| | | with open(data_or_path_or_list, "r") as f: |
| | | data_or_path_or_list = tokenizer.encode(f.read().strip()) |
| | | elif data_type == "image": # undo |
| | | pass |
| | | elif data_type == "video": # undo |
| | |
| | | |
| | | |
| | | def load_bytes(input): |
| | | try: |
| | | input = validate_frame_rate(input) |
| | | except: |
| | | pass |
| | | middle_data = np.frombuffer(input, dtype=np.int16) |
| | | middle_data = np.asarray(middle_data) |
| | | if middle_data.dtype.kind not in "iu": |
| | |
| | | offset = i.min + abs_max |
| | | array = np.frombuffer((middle_data.astype(dtype) - offset) / abs_max, dtype=np.float32) |
| | | return array |
| | | |
| | | |
def validate_frame_rate(
    input,
    fs: int = 16000,
):
    """Decode audio bytes with pydub and re-encode them as WAV at ``fs`` Hz.

    Args:
        input: Encoded audio content as a bytes-like object; it is wrapped in
            a ``BytesIO`` and handed to ``AudioSegment.from_file``.
        fs: Target frame rate in Hz. The audio is resampled only when its
            current frame rate differs from this value.

    Returns:
        bytes: The audio exported in WAV format (after optional resampling).

    Raises:
        RuntimeError: If pydub is not available, or if pydub fails to decode
            the given bytes. The original exception is attached as the cause.
    """
    try:
        audio = AudioSegment.from_file(BytesIO(input))
    except NameError as err:
        # pydub is imported optionally at module level; when that import
        # failed, ``AudioSegment`` is simply undefined here.
        raise RuntimeError(
            "You are decoding the pcm data, please install pydub first. via `pip install pydub`."
        ) from err
    except Exception as err:
        # pydub is present but could not decode the data: report the real
        # reason instead of misleadingly asking the user to install pydub.
        raise RuntimeError(f"Failed to decode audio bytes with pydub: {err}") from err

    # Make sure the audio is at the requested frame rate.
    if audio.frame_rate != fs:
        audio = audio.set_frame_rate(fs)

    # Export the (possibly resampled) audio into an in-memory WAV container
    # and return its raw bytes.
    output = BytesIO()
    audio.export(output, format="wav")
    return output.getvalue()
| | | |
| | | |
| | | def extract_fbank(data, data_len=None, data_type: str = "sound", frontend=None, **kwargs): |
| | |
| | | # This launches a subprocess to decode audio while down-mixing |
| | | # and resampling as necessary. Requires the ffmpeg CLI in PATH. |
| | | # fmt: off |
| | | pcm_params = [] |
| | | if file.lower().endswith('.pcm'): |
| | | pcm_params = [ |
| | | "-f", "s16le", |
| | | "-ar", str(sr), |
| | | "-ac", "1" |
| | | ] |
| | | |
| | | cmd = [ |
| | | "ffmpeg", |
| | | "-nostdin", |
| | | "-threads", "0", |
| | | *pcm_params, # PCM files need input format specified before -i since PCM is raw data without headers |
| | | "-i", file, |
| | | "-f", "s16le", |
| | | "-ac", "1", |