| | |
| | | import struct |
| | | from typing import Any, Dict, List, Union |
| | | |
| | | import torchaudio |
| | | import librosa |
| | | import numpy as np |
| | | import pkg_resources |
| | |
| | | |
# Module-wide language tag; set_parameters() and compute_wer() rebind it.
global_asr_language = 'zh-cn'

# Extension names (without the leading dot) that the path helpers in this
# module accept as audio files.
SUPPORT_AUDIO_TYPE_SETS = [
    'flac',
    'mp3',
    'ogg',
    'opus',
    'wav',
    'pcm',
]
| | | |
def get_version():
    """Return the installed ``easyasr`` distribution version as a float.

    The version string reported by ``pkg_resources`` is handed directly to
    ``float``, so a version that is not a valid float literal makes this
    function raise ``ValueError``.
    """
    distribution = pkg_resources.get_distribution('easyasr')
    return float(distribution.version)
| | |
def sample_rate_checking(audio_in: Union[str, bytes], audio_format: str):
    """Look up the sample rate of an audio input.

    Args:
        audio_in: Value passed to ``get_sr_from_wav`` when ``audio_format``
            is ``'wav'`` or ``'scp'``, or bytes passed to
            ``get_sr_from_bytes`` when ``audio_format`` is ``'pcm'``.
        audio_format: Format tag that selects which helper is used.

    Returns:
        The value produced by the selected helper, or ``None`` when no
        helper applies (any other format tag, or ``'pcm'`` input that is
        not a ``bytes`` object).
    """
    r_audio_fs = None

    if audio_format in ('wav', 'scp'):
        r_audio_fs = get_sr_from_wav(audio_in)
    elif audio_format == 'pcm' and isinstance(audio_in, bytes):
        r_audio_fs = get_sr_from_bytes(audio_in)

    return r_audio_fs
| | |
| | | if r_recog_type is None and audio_in is not None: |
| | | # audio_in is wav, recog_type is wav_file |
| | | if os.path.isfile(audio_in): |
| | | if audio_in.endswith('.wav') or audio_in.endswith('.WAV'): |
| | | r_recog_type = 'wav' |
| | | r_audio_format = 'wav' |
| | | elif audio_in.endswith('.scp') or audio_in.endswith('.SCP'): |
| | | audio_type = os.path.basename(audio_in).lower() |
| | | for support_audio_type in SUPPORT_AUDIO_TYPE_SETS: |
| | | if audio_type.rfind(".{}".format(support_audio_type)) >= 0: |
| | | r_recog_type = 'wav' |
| | | r_audio_format = 'wav' |
| | | if audio_type.rfind(".scp") >= 0: |
| | | r_recog_type = 'wav' |
| | | r_audio_format = 'scp' |
| | | if r_recog_type is None: |
| | | raise NotImplementedError( |
| | | f'Not supported audio type: {audio_type}') |
| | | |
| | | # recog_type is datasets_file |
| | | elif os.path.isdir(audio_in): |
| | |
def get_sr_from_wav(fname: str):
    """Return the sample rate of the first readable audio found at ``fname``.

    Args:
        fname: Path to an audio file, to a ``.scp`` list file, or to a
            directory that is searched recursively.

    Returns:
        The sample rate reported by ``torchaudio.load`` (or by
        ``librosa.load`` when torchaudio fails), or ``None`` when nothing
        yields a rate: the path is neither a file nor a directory, the file
        name matches no entry of ``SUPPORT_AUDIO_TYPE_SETS``, or the match
        is ``pcm``.
    """
    fs = None
    if os.path.isfile(fname):
        audio_type = os.path.basename(fname).lower()
        for support_audio_type in SUPPORT_AUDIO_TYPE_SETS:
            if audio_type.rfind(".{}".format(support_audio_type)) >= 0:
                # No rate is read from '.pcm' files; fs stays None for them.
                if support_audio_type != "pcm":
                    try:
                        _, fs = torchaudio.load(fname)
                    except Exception:
                        # sr=None keeps the file's native rate; without it
                        # librosa resamples and reports its default rate.
                        _, fs = librosa.load(fname, sr=None)
                break
        if audio_type.rfind(".scp") >= 0:
            with open(fname, encoding="utf-8") as f:
                for line in f:
                    fields = line.split()
                    # Blank or one-field lines have no path column.
                    if len(fields) < 2:
                        continue
                    fs = get_sr_from_wav(fields[1])
                    if fs is not None:
                        break
        return fs
    elif os.path.isdir(fname):
        for file in os.listdir(fname):
            # The recursive call handles files and sub-directories alike
            # and yields None for anything else.
            fs = get_sr_from_wav(os.path.join(fname, file))
            # Stop at the first entry that yields a rate, mirroring the
            # .scp loop, so a later entry cannot reset the result to None.
            if fs is not None:
                break

    return fs
| | | |
| | |
| | | for file in dir_files: |
| | | file_path = os.path.join(dir_path, file) |
| | | if os.path.isfile(file_path): |
| | | if file_path.endswith(ends): |
| | | if ends == ".wav" or ends == ".WAV": |
| | | audio_type = os.path.basename(file_path).lower() |
| | | for support_audio_type in SUPPORT_AUDIO_TYPE_SETS: |
| | | if audio_type.rfind(".{}".format(support_audio_type)) >= 0: |
| | | return True |
| | | raise NotImplementedError( |
| | | f'Not supported audio type: {audio_type}') |
| | | elif file_path.endswith(ends): |
| | | return True |
| | | elif os.path.isdir(file_path): |
| | | if find_file_by_ends(file_path, ends): |
| | |
| | | for file in dir_files: |
| | | file_path = os.path.join(dir_path, file) |
| | | if os.path.isfile(file_path): |
| | | if file_path.endswith('.wav') or file_path.endswith('.WAV'): |
| | | wav_list.append(file_path) |
| | | audio_type = os.path.basename(file_path).lower() |
| | | for support_audio_type in SUPPORT_AUDIO_TYPE_SETS: |
| | | if audio_type.rfind(".{}".format(support_audio_type)) >= 0: |
| | | wav_list.append(file_path) |
| | | elif os.path.isdir(file_path): |
| | | recursion_dir_all_wav(wav_list, file_path) |
| | | |
| | | return wav_list |
| | | |
| | | |
def set_parameters(language: str = None):
    """Rebind the module-level ``global_asr_language``.

    Args:
        language: New language tag. When ``None`` (the default) the
            current value is left untouched.
    """
    global global_asr_language
    if language is None:
        return
    global_asr_language = language
| | | |
| | | |
| | | def compute_wer(hyp_list: List[Any], |
| | | ref_list: List[Any], |
| | | lang: str = None) -> Dict[str, Any]: |
| | | assert len(hyp_list) > 0, 'hyp list is empty' |
| | | assert len(ref_list) > 0, 'ref list is empty' |
| | | |
| | | if lang is not None: |
| | | global global_asr_language |
| | | global_asr_language = lang |
| | | |
| | | rst = { |
| | | 'Wrd': 0, |
| | |
| | | 'wrong_sentences': 0 |
| | | } |
| | | |
| | | if lang is None: |
| | | lang = global_asr_language |
| | | |
| | | for h_item in hyp_list: |
| | | for r_item in ref_list: |
| | | if h_item['key'] == r_item['key']: |
| | | out_item = compute_wer_by_line(h_item['value'], |
| | | r_item['value'], |
| | | global_asr_language) |
| | | lang) |
| | | rst['Wrd'] += out_item['nwords'] |
| | | rst['Corr'] += out_item['cor'] |
| | | rst['wrong_words'] += out_item['wrong'] |