游雁
2023-11-23 dc682db808eb5f425f0dbed4c5e7feb0a334955f
funasr/datasets/small_datasets/preprocessor.py
@@ -9,13 +9,11 @@
import numpy as np
import scipy.signal
import soundfile
from typeguard import check_argument_types
from typeguard import check_return_type
import librosa
from funasr.text.build_tokenizer import build_tokenizer
from funasr.text.cleaner import TextCleaner
from funasr.text.token_id_converter import TokenIDConverter
from funasr.tokenizer.build_tokenizer import build_tokenizer
from funasr.tokenizer.cleaner import TextCleaner
from funasr.tokenizer.token_id_converter import TokenIDConverter
class AbsPreprocessor(ABC):
@@ -260,7 +258,6 @@
    def _speech_process(
            self, data: Dict[str, Union[str, np.ndarray]]
    ) -> Dict[str, Union[str, np.ndarray]]:
        assert check_argument_types()
        if self.speech_name in data:
            if self.train and (self.rirs is not None or self.noises is not None):
                speech = data[self.speech_name]
@@ -278,7 +275,7 @@
                if self.rirs is not None and self.rir_apply_prob >= np.random.random():
                    rir_path = np.random.choice(self.rirs)
                    if rir_path is not None:
                        rir, _ = soundfile.read(
                        rir, _ = librosa.load(
                            rir_path, dtype=np.float64, always_2d=True
                        )
@@ -304,28 +301,30 @@
                        noise_db = np.random.uniform(
                            self.noise_db_low, self.noise_db_high
                        )
                        with soundfile.SoundFile(noise_path) as f:
                            if f.frames == nsamples:
                                noise = f.read(dtype=np.float64, always_2d=True)
                            elif f.frames < nsamples:
                                offset = np.random.randint(0, nsamples - f.frames)
                                # noise: (Time, Nmic)
                                noise = f.read(dtype=np.float64, always_2d=True)
                                # Repeat noise
                                noise = np.pad(
                                    noise,
                                    [(offset, nsamples - f.frames - offset), (0, 0)],
                                    mode="wrap",
                                )
                            else:
                                offset = np.random.randint(0, f.frames - nsamples)
                                f.seek(offset)
                                # noise: (Time, Nmic)
                                noise = f.read(
                                    nsamples, dtype=np.float64, always_2d=True
                                )
                                if len(noise) != nsamples:
                                    raise RuntimeError(f"Something wrong: {noise_path}")
                        audio_data = librosa.load(noise_path, dtype='float32')[0][None, :]
                        frames = len(audio_data[0])
                        if frames == nsamples:
                            noise = audio_data
                        elif frames < nsamples:
                            offset = np.random.randint(0, nsamples - frames)
                            # noise: (Time, Nmic)
                            noise = audio_data
                            # Repeat noise
                            noise = np.pad(
                                noise,
                                [(offset, nsamples - frames - offset), (0, 0)],
                                mode="wrap",
                            )
                        else:
                            noise = audio_data[:, nsamples]
                            # offset = np.random.randint(0, frames - nsamples)
                            # f.seek(offset)
                            # noise: (Time, Nmic)
                            # noise = f.read(
                            #     nsamples, dtype=np.float64, always_2d=True
                            # )
                            # if len(noise) != nsamples:
                            #     raise RuntimeError(f"Something wrong: {noise_path}")
                        # noise: (Nmic, Time)
                        noise = noise.T
@@ -347,7 +346,6 @@
                speech = data[self.speech_name]
                ma = np.max(np.abs(speech))
                data[self.speech_name] = speech * self.speech_volume_normalize / ma
        assert check_return_type(data)
        return data
    def _text_process(
@@ -365,13 +363,11 @@
                tokens = self.tokenizer.text2tokens(text)
            text_ints = self.token_id_converter.tokens2ids(tokens)
            data[self.text_name] = np.array(text_ints, dtype=np.int64)
        assert check_return_type(data)
        return data
    def __call__(
            self, uid: str, data: Dict[str, Union[str, np.ndarray]]
    ) -> Dict[str, np.ndarray]:
        assert check_argument_types()
        data = self._speech_process(data)
        data = self._text_process(data)
@@ -439,7 +435,6 @@
                tokens = self.tokenizer.text2tokens(text)
            text_ints = self.token_id_converter.tokens2ids(tokens)
            data[self.text_name] = np.array(text_ints, dtype=np.int64)
        assert check_return_type(data)
        return data
@@ -496,13 +491,11 @@
                tokens = self.tokenizer.text2tokens(text)
                text_ints = self.token_id_converter.tokens2ids(tokens)
                data[text_n] = np.array(text_ints, dtype=np.int64)
        assert check_return_type(data)
        return data
    def __call__(
            self, uid: str, data: Dict[str, Union[str, np.ndarray]]
    ) -> Dict[str, np.ndarray]:
        assert check_argument_types()
        if self.speech_name in data:
            # Nothing now: candidates:
@@ -606,7 +599,6 @@
                tokens = self.tokenizer[i].text2tokens(text)
                text_ints = self.token_id_converter[i].tokens2ids(tokens)
                data[text_name] = np.array(text_ints, dtype=np.int64)
        assert check_return_type(data)
        return data
@@ -685,7 +677,6 @@
    def __call__(
            self, uid: str, data: Dict[str, Union[list, str, np.ndarray]]
    ) -> Dict[str, Union[list, np.ndarray]]:
        assert check_argument_types()
        # Split words.
        if isinstance(data[self.text_name], str):
            split_text = self.split_words(data[self.text_name])
@@ -820,7 +811,7 @@
def build_preprocess(args, train):
    if args.use_preprocessor:
    if not args.use_preprocessor:
        return None
    if args.task_name in ["asr", "data2vec", "diar", "sv"]:
        retval = CommonPreprocessor(
@@ -828,7 +819,7 @@
            token_type=args.token_type,
            token_list=args.token_list,
            bpemodel=args.bpemodel,
            non_linguistic_symbols=args.non_linguistic_symbols,
            non_linguistic_symbols=args.non_linguistic_symbols if hasattr(args, "non_linguistic_symbols") else None,
            text_cleaner=args.cleaner,
            g2p_type=args.g2p,
            split_with_space=args.split_with_space if hasattr(args, "split_with_space") else False,