from abc import ABC
from typing import Dict
from typing import Union

import librosa
import numpy as np
import scipy.signal
import soundfile
from typeguard import check_argument_types
from typeguard import check_return_type

# NOTE(review): funasr.text.* and funasr.tokenizer.* export the same names;
# the later (tokenizer) imports shadow the earlier ones.  Confirm which
# package path is current and drop the stale set.
from funasr.text.build_tokenizer import build_tokenizer
from funasr.text.cleaner import TextCleaner
from funasr.text.token_id_converter import TokenIDConverter
from funasr.tokenizer.build_tokenizer import build_tokenizer
from funasr.tokenizer.cleaner import TextCleaner
from funasr.tokenizer.token_id_converter import TokenIDConverter
| | | |
class AbsPreprocessor(ABC):

    def _speech_process(
        self, data: Dict[str, Union[str, np.ndarray]]
    ) -> Dict[str, Union[str, np.ndarray]]:
        """Apply training-time augmentation and volume normalisation to speech.

        When ``self.train`` is set, optionally convolves the waveform with a
        randomly drawn room impulse response (probability
        ``self.rir_apply_prob``) and mixes in a randomly drawn noise file at
        an SNR sampled uniformly from ``[noise_db_low, noise_db_high]`` dB.
        Independently, when ``self.speech_volume_normalize`` is not ``None``,
        the waveform is rescaled so its peak equals that value.

        Args:
            data: mapping that may hold a waveform under ``self.speech_name``
                (assumed a 1-D mono float array — TODO confirm with callers).

        Returns:
            The same mapping, with the speech entry replaced in place.
        """
        if self.speech_name in data:
            if self.train and (self.rirs is not None or self.noises is not None):
                speech = data[self.speech_name]
                # Target length for the noise segment: match the utterance.
                nsamples = len(speech)

                # ---- Reverberation ------------------------------------------
                if self.rirs is not None and self.rir_apply_prob >= np.random.random():
                    rir_path = np.random.choice(self.rirs)
                    if rir_path is not None:
                        # The merge had fused two conflicting load calls here
                        # (soundfile vs librosa) into a syntax error; soundfile
                        # is kept — it matches the dtype/always_2d keywords.
                        rir, _ = soundfile.read(
                            rir_path, dtype=np.float64, always_2d=True
                        )
                        # rir: (Time, Nmic) -> (Nmic, Time)
                        rir = rir.T
                        # NOTE(review): the convolution itself was missing from
                        # the merged source; reconstructed from the standard
                        # ESPnet recipe (scipy.signal is imported for exactly
                        # this) — confirm against upstream.
                        power = (speech ** 2).mean()
                        speech = scipy.signal.convolve(
                            speech[None, :], rir, mode="full"
                        )[0, :nsamples]
                        # Restore the pre-reverberation mean power.
                        power2 = (speech ** 2).mean()
                        speech = np.sqrt(power / max(power2, 1e-10)) * speech

                # ---- Additive noise -----------------------------------------
                # NOTE(review): the guard and the noise_path draw were lost in
                # the merge; reconstructed symmetrically to the RIR branch.
                if self.noises is not None and self.noise_apply_prob >= np.random.random():
                    noise_path = np.random.choice(self.noises)
                    if noise_path is not None:
                        noise_db = np.random.uniform(
                            self.noise_db_low, self.noise_db_high
                        )
                        with soundfile.SoundFile(noise_path) as f:
                            if f.frames == nsamples:
                                noise = f.read(dtype=np.float64, always_2d=True)
                            elif f.frames < nsamples:
                                # Too short: place at a random offset and
                                # wrap-pad to the utterance length.
                                offset = np.random.randint(0, nsamples - f.frames)
                                # noise: (Time, Nmic)
                                noise = f.read(dtype=np.float64, always_2d=True)
                                # Repeat noise
                                noise = np.pad(
                                    noise,
                                    [(offset, nsamples - f.frames - offset), (0, 0)],
                                    mode="wrap",
                                )
                            else:
                                # Too long: read a random nsamples-long window.
                                offset = np.random.randint(0, f.frames - nsamples)
                                f.seek(offset)
                                # noise: (Time, Nmic)
                                noise = f.read(
                                    nsamples, dtype=np.float64, always_2d=True
                                )
                                if len(noise) != nsamples:
                                    raise RuntimeError(f"Something wrong: {noise_path}")
                        # A second, librosa-based copy of this loader — merge
                        # residue, and inconsistent in layout (it was already
                        # (1, Time) before the transpose) — was removed.
                        # noise: (Nmic, Time)
                        noise = noise.T
                        # NOTE(review): the mixing step was missing from the
                        # merged source; reconstructed so that the resulting
                        # SNR equals noise_db — confirm against upstream.
                        noise_power = (noise ** 2).mean()
                        speech_power = (speech ** 2).mean()
                        scale = np.sqrt(
                            10 ** (-noise_db / 10)
                            * speech_power
                            / max(noise_power, 1e-10)
                        )
                        speech = speech + scale * noise[0]

                data[self.speech_name] = speech

            if self.speech_volume_normalize is not None:
                speech = data[self.speech_name]
                ma = np.max(np.abs(speech))
                # Peak-normalise; silent input (ma == 0) would yield inf/nan,
                # as in the original code.
                data[self.speech_name] = speech * self.speech_volume_normalize / ma
        return data
| | | |
| | | def _text_process( |
| | |
| | | tokens = self.tokenizer.text2tokens(text) |
| | | text_ints = self.token_id_converter.tokens2ids(tokens) |
| | | data[self.text_name] = np.array(text_ints, dtype=np.int64) |
| | | assert check_return_type(data) |
| | | return data |
| | | |
| | | def __call__( |
| | | self, uid: str, data: Dict[str, Union[str, np.ndarray]] |
| | | ) -> Dict[str, np.ndarray]: |
| | | assert check_argument_types() |
| | | |
| | | data = self._speech_process(data) |
| | | data = self._text_process(data) |
| | |
| | | tokens = self.tokenizer.text2tokens(text) |
| | | text_ints = self.token_id_converter.tokens2ids(tokens) |
| | | data[self.text_name] = np.array(text_ints, dtype=np.int64) |
| | | assert check_return_type(data) |
| | | return data |
| | | |
| | | |
| | |
        # NOTE(review): orphan tail of a second ``_text_process``-style method.
        # Its ``def`` line and the statements assigning ``text`` and ``text_n``
        # were lost in a bad merge, so these lines currently reference
        # undefined names.  Recover the method header from upstream.
        tokens = self.tokenizer.text2tokens(text)
        text_ints = self.token_id_converter.tokens2ids(tokens)
        # ``text_n`` (not ``text_name``) — presumably a per-field key from a
        # multi-text preprocessor variant; verify against the original source.
        data[text_n] = np.array(text_ints, dtype=np.int64)
        assert check_return_type(data)
        return data
| | | |
    def __call__(
        self, uid: str, data: Dict[str, Union[str, np.ndarray]]
    ) -> Dict[str, np.ndarray]:
        # NOTE(review): the body between the speech check and the tokenization
        # tail was lost in a bad merge — ``i``, ``text`` and ``text_name`` are
        # referenced but never defined here.  The indexed ``self.tokenizer[i]``
        # / ``self.token_id_converter[i]`` accesses suggest this belongs to a
        # multi-tokenizer preprocessor variant; restore the missing loop from
        # upstream before use.
        assert check_argument_types()

        if self.speech_name in data:
            # Nothing now: candidates:

        tokens = self.tokenizer[i].text2tokens(text)
        text_ints = self.token_id_converter[i].tokens2ids(tokens)
        data[text_name] = np.array(text_ints, dtype=np.int64)
        assert check_return_type(data)
        return data
| | | |
| | | |
| | |
    def __call__(
        self, uid: str, data: Dict[str, Union[list, str, np.ndarray]]
    ) -> Dict[str, Union[list, np.ndarray]]:
        # NOTE(review): truncated — everything after the word-split line was
        # lost in a bad merge (no return statement survives).  The
        # ``split_words`` call implies this is the word-level preprocessor
        # variant; restore the remainder of the body from upstream.
        assert check_argument_types()
        # Split words.
        if isinstance(data[self.text_name], str):
            split_text = self.split_words(data[self.text_name])
| | | |
| | | |
| | | def build_preprocess(args, train): |
| | | if args.use_preprocessor: |
| | | if not args.use_preprocessor: |
| | | return None |
| | | if args.task_name in ["asr", "data2vec", "diar", "sv"]: |
| | | retval = CommonPreprocessor( |
| | |
| | | token_type=args.token_type, |
| | | token_list=args.token_list, |
| | | bpemodel=args.bpemodel, |
| | | non_linguistic_symbols=args.non_linguistic_symbols, |
| | | non_linguistic_symbols=args.non_linguistic_symbols if hasattr(args, "non_linguistic_symbols") else None, |
| | | text_cleaner=args.cleaner, |
| | | g2p_type=args.g2p, |
| | | split_with_space=args.split_with_space if hasattr(args, "split_with_space") else False, |