游雁
2023-11-23 dc682db808eb5f425f0dbed4c5e7feb0a334955f
funasr/datasets/preprocessor.py
@@ -10,13 +10,12 @@
import numpy as np
import scipy.signal
import soundfile
from typeguard import check_argument_types
from typeguard import check_return_type
import librosa
import jieba
from funasr.text.build_tokenizer import build_tokenizer
from funasr.text.cleaner import TextCleaner
from funasr.text.token_id_converter import TokenIDConverter
from funasr.tokenizer.build_tokenizer import build_tokenizer
from funasr.tokenizer.cleaner import TextCleaner
from funasr.tokenizer.token_id_converter import TokenIDConverter
class AbsPreprocessor(ABC):
@@ -44,18 +43,31 @@
        i += len(longest_word)
    return word_list
def seg_tokenize(txt, seg_dict):
    pattern = re.compile(r'^[\u4E00-\u9FA50-9]+$')
    out_txt = ""
    pattern = re.compile(r"([\u4E00-\u9FA5A-Za-z0-9])")
    for word in txt:
        if pattern.match(word):
            if word in seg_dict:
                out_txt += seg_dict[word] + " "
        word = word.lower()
        if word in seg_dict:
            out_txt += seg_dict[word] + " "
        else:
            if pattern.match(word):
                for char in word:
                    if char in seg_dict:
                        out_txt += seg_dict[char] + " "
                    else:
                        out_txt += "<unk>" + " "
            else:
                out_txt += "<unk>" + " "
    return out_txt.strip().split()
def seg_tokenize_wo_pattern(txt, seg_dict):
    out_txt = ""
    for word in txt:
        if word in seg_dict:
            out_txt += seg_dict[word] + " "
        else:
            continue
            out_txt += "<unk>" + " "
    return out_txt.strip().split()
@@ -189,7 +201,7 @@
        self.seg_dict = None
        if seg_dict_file is not None:
            self.seg_dict = {}
            with open(seg_dict_file) as f:
            with open(seg_dict_file, "r", encoding="utf8") as f:
                lines = f.readlines()
            for line in lines:
                s = line.strip().split()
@@ -255,7 +267,6 @@
    def _speech_process(
            self, data: Dict[str, Union[str, np.ndarray]]
    ) -> Dict[str, Union[str, np.ndarray]]:
        assert check_argument_types()
        if self.speech_name in data:
            if self.train and (self.rirs is not None or self.noises is not None):
                speech = data[self.speech_name]
@@ -273,7 +284,7 @@
                if self.rirs is not None and self.rir_apply_prob >= np.random.random():
                    rir_path = np.random.choice(self.rirs)
                    if rir_path is not None:
                        rir, _ = soundfile.read(
                        rir, _ = librosa.load(
                            rir_path, dtype=np.float64, always_2d=True
                        )
@@ -299,28 +310,31 @@
                        noise_db = np.random.uniform(
                            self.noise_db_low, self.noise_db_high
                        )
                        with soundfile.SoundFile(noise_path) as f:
                            if f.frames == nsamples:
                                noise = f.read(dtype=np.float64, always_2d=True)
                            elif f.frames < nsamples:
                                offset = np.random.randint(0, nsamples - f.frames)
                                # noise: (Time, Nmic)
                                noise = f.read(dtype=np.float64, always_2d=True)
                                # Repeat noise
                                noise = np.pad(
                                    noise,
                                    [(offset, nsamples - f.frames - offset), (0, 0)],
                                    mode="wrap",
                                )
                            else:
                                offset = np.random.randint(0, f.frames - nsamples)
                                f.seek(offset)
                                # noise: (Time, Nmic)
                                noise = f.read(
                                    nsamples, dtype=np.float64, always_2d=True
                                )
                                if len(noise) != nsamples:
                                    raise RuntimeError(f"Something wrong: {noise_path}")
                        audio_data = librosa.load(noise_path, dtype='float32')[0][None, :]
                        frames = len(audio_data[0])
                        if frames == nsamples:
                            noise = audio_data
                        elif frames < nsamples:
                            offset = np.random.randint(0, nsamples - frames)
                            # noise: (Time, Nmic)
                            noise = audio_data
                            # Repeat noise
                            noise = np.pad(
                                noise,
                                [(offset, nsamples - frames - offset), (0, 0)],
                                mode="wrap",
                            )
                        else:
                            noise = audio_data[:, nsamples]
                            # offset = np.random.randint(0, frames - nsamples)
                            # f.seek(offset)
                            # noise: (Time, Nmic)
                            # noise = f.read(
                            #     nsamples, dtype=np.float64, always_2d=True
                            # )
                            # if len(noise) != nsamples:
                            #     raise RuntimeError(f"Something wrong: {noise_path}")
                        # noise: (Nmic, Time)
                        noise = noise.T
@@ -342,7 +356,6 @@
                speech = data[self.speech_name]
                ma = np.max(np.abs(speech))
                data[self.speech_name] = speech * self.speech_volume_normalize / ma
        assert check_return_type(data)
        return data
    def _text_process(
@@ -354,22 +367,82 @@
            if self.split_with_space:
                tokens = text.strip().split(" ")
                if self.seg_dict is not None:
                    tokens = forward_segment("".join(tokens).lower(), self.seg_dict)
                    tokens = seg_tokenize(tokens, self.seg_dict)
            else:
                tokens = self.tokenizer.text2tokens(text)
            text_ints = self.token_id_converter.tokens2ids(tokens)
            data[self.text_name] = np.array(text_ints, dtype=np.int64)
        assert check_return_type(data)
        return data
    def __call__(
            self, uid: str, data: Dict[str, Union[str, np.ndarray]]
    ) -> Dict[str, np.ndarray]:
        assert check_argument_types()
        data = self._speech_process(data)
        data = self._text_process(data)
        return data
## FIXME
class LMPreprocessor(CommonPreprocessor):
    def __init__(
            self,
            train: bool,
            token_type: str = None,
            token_list: Union[Path, str, Iterable[str]] = None,
            bpemodel: Union[Path, str, Iterable[str]] = None,
            text_cleaner: Collection[str] = None,
            g2p_type: str = None,
            unk_symbol: str = "<unk>",
            space_symbol: str = "<space>",
            non_linguistic_symbols: Union[Path, str, Iterable[str]] = None,
            delimiter: str = None,
            rir_scp: str = None,
            rir_apply_prob: float = 1.0,
            noise_scp: str = None,
            noise_apply_prob: float = 1.0,
            noise_db_range: str = "3_10",
            speech_volume_normalize: float = None,
            speech_name: str = "speech",
            text_name: str = "text",
            split_with_space: bool = False,
            seg_dict_file: str = None,
    ):
        super().__init__(train,
                         token_type,
                         token_list,
                         bpemodel,
                         text_cleaner,
                         g2p_type,
                         unk_symbol,
                         space_symbol,
                         non_linguistic_symbols,
                         delimiter,
                         rir_scp,
                         rir_apply_prob,
                         noise_scp,
                         noise_apply_prob,
                         noise_db_range,
                         speech_volume_normalize,
                         speech_name,
                         text_name,
                         split_with_space,
                         seg_dict_file,
                         )
    def _text_process(
            self, data: Dict[str, Union[str, np.ndarray]]
    ) -> Dict[str, np.ndarray]:
        if self.text_name in data and self.tokenizer is not None:
            text = data[self.text_name]
            text = self.text_cleaner(text)
            if self.split_with_space:
                tokens = text.strip().split(" ")
                if self.seg_dict is not None:
                    tokens = seg_tokenize_wo_pattern(tokens, self.seg_dict)
            else:
                tokens = self.tokenizer.text2tokens(text)
            text_ints = self.token_id_converter.tokens2ids(tokens)
            data[self.text_name] = np.array(text_ints, dtype=np.int64)
        return data
@@ -426,13 +499,11 @@
                tokens = self.tokenizer.text2tokens(text)
                text_ints = self.token_id_converter.tokens2ids(tokens)
                data[text_n] = np.array(text_ints, dtype=np.int64)
        assert check_return_type(data)
        return data
    def __call__(
            self, uid: str, data: Dict[str, Union[str, np.ndarray]]
    ) -> Dict[str, np.ndarray]:
        assert check_argument_types()
        if self.speech_name in data:
            # Nothing now: candidates:
@@ -536,5 +607,275 @@
                tokens = self.tokenizer[i].text2tokens(text)
                text_ints = self.token_id_converter[i].tokens2ids(tokens)
                data[text_name] = np.array(text_ints, dtype=np.int64)
        assert check_return_type(data)
        return data
class CodeMixTokenizerCommonPreprocessor(CommonPreprocessor):
    def __init__(
            self,
            train: bool,
            token_type: str = None,
            token_list: Union[Path, str, Iterable[str]] = None,
            bpemodel: Union[Path, str, Iterable[str]] = None,
            text_cleaner: Collection[str] = None,
            g2p_type: str = None,
            unk_symbol: str = "<unk>",
            space_symbol: str = "<space>",
            non_linguistic_symbols: Union[Path, str, Iterable[str]] = None,
            delimiter: str = None,
            rir_scp: str = None,
            rir_apply_prob: float = 1.0,
            noise_scp: str = None,
            noise_apply_prob: float = 1.0,
            noise_db_range: str = "3_10",
            speech_volume_normalize: float = None,
            speech_name: str = "speech",
            text_name: str = "text",
            split_text_name: str = "split_text",
            split_with_space: bool = False,
            seg_jieba: bool = False,
            seg_dict_file: str = None,
    ):
        super().__init__(
            train=train,
            # Force to use word.
            token_type="word",
            token_list=token_list,
            bpemodel=bpemodel,
            text_cleaner=text_cleaner,
            g2p_type=g2p_type,
            unk_symbol=unk_symbol,
            space_symbol=space_symbol,
            non_linguistic_symbols=non_linguistic_symbols,
            delimiter=delimiter,
            speech_name=speech_name,
            text_name=text_name,
            rir_scp=rir_scp,
            rir_apply_prob=rir_apply_prob,
            noise_scp=noise_scp,
            noise_apply_prob=noise_apply_prob,
            noise_db_range=noise_db_range,
            speech_volume_normalize=speech_volume_normalize,
            split_with_space=split_with_space,
            seg_dict_file=seg_dict_file,
        )
        # The data field name for split text.
        self.split_text_name = split_text_name
        self.seg_jieba = seg_jieba
        if self.seg_jieba:
            jieba.load_userdict(seg_dict_file)
    @classmethod
    def split_words(cls, text: str):
        words = []
        segs = text.split()
        for seg in segs:
            # There is no space in seg.
            current_word = ""
            for c in seg:
                if len(c.encode()) == 1:
                    # This is an ASCII char.
                    current_word += c
                else:
                    # This is a Chinese char.
                    if len(current_word) > 0:
                        words.append(current_word)
                        current_word = ""
                    words.append(c)
            if len(current_word) > 0:
                words.append(current_word)
        return words
    @classmethod
    def isEnglish(cls, text:str):
        if re.search('^[a-zA-Z\']+$', text):
            return True
        else:
            return False
    @classmethod
    def join_chinese_and_english(cls, input_list):
        line = ''
        for token in input_list:
            if cls.isEnglish(token):
                line = line + ' ' + token
            else:
                line = line + token
        line = line.strip()
        return line
    @classmethod
    def split_words_jieba(cls, text: str):
        input_list = text.split()
        token_list_all = []
        langauge_list = []
        token_list_tmp = []
        language_flag = None
        for token in input_list:
            if cls.isEnglish(token) and language_flag == 'Chinese':
                token_list_all.append(token_list_tmp)
                langauge_list.append('Chinese')
                token_list_tmp = []
            elif not cls.isEnglish(token) and language_flag == 'English':
                token_list_all.append(token_list_tmp)
                langauge_list.append('English')
                token_list_tmp = []
            token_list_tmp.append(token)
            if cls.isEnglish(token):
                language_flag = 'English'
            else:
                language_flag = 'Chinese'
        if token_list_tmp:
            token_list_all.append(token_list_tmp)
            langauge_list.append(language_flag)
        result_list = []
        for token_list_tmp, language_flag in zip(token_list_all, langauge_list):
            if language_flag == 'English':
                result_list.extend(token_list_tmp)
            else:
                seg_list = jieba.cut(cls.join_chinese_and_english(token_list_tmp), HMM=False)
                result_list.extend(seg_list)
        return result_list
    def __call__(
            self, uid: str, data: Dict[str, Union[list, str, np.ndarray]]
    ) -> Dict[str, Union[list, np.ndarray]]:
        # Split words.
        if isinstance(data[self.text_name], str):
            if self.seg_jieba:
  #              jieba.load_userdict(seg_dict_file)
                split_text = self.split_words_jieba(data[self.text_name])
            else:
                split_text = self.split_words(data[self.text_name])
        else:
            split_text = data[self.text_name]
        data[self.text_name] = " ".join(split_text)
        data = self._speech_process(data)
        data = self._text_process(data)
        data[self.split_text_name] = split_text
        return data
    def pop_split_text_data(self, data: Dict[str, Union[str, np.ndarray]]):
        result = data[self.split_text_name]
        del data[self.split_text_name]
        return result
class PuncTrainTokenizerCommonPreprocessor(CommonPreprocessor):
    def __init__(
            self,
            train: bool,
            token_type: List[str] = [None],
            token_list: List[Union[Path, str, Iterable[str]]] = [None],
            bpemodel: List[Union[Path, str, Iterable[str]]] = [None],
            text_cleaner: Collection[str] = None,
            g2p_type: str = None,
            unk_symbol: str = "<unk>",
            space_symbol: str = "<space>",
            non_linguistic_symbols: Union[Path, str, Iterable[str]] = None,
            delimiter: str = None,
            rir_scp: str = None,
            rir_apply_prob: float = 1.0,
            noise_scp: str = None,
            noise_apply_prob: float = 1.0,
            noise_db_range: str = "3_10",
            speech_volume_normalize: float = None,
            speech_name: str = "speech",
            text_name: List[str] = ["text"],
            vad_name: str = "vad_indexes",
    ):
        # TODO(jiatong): sync with Kamo and Jing on interface for preprocessor
        super().__init__(
            train=train,
            token_type=token_type[0],
            token_list=token_list[0],
            bpemodel=bpemodel[0],
            text_cleaner=text_cleaner,
            g2p_type=g2p_type,
            unk_symbol=unk_symbol,
            space_symbol=space_symbol,
            non_linguistic_symbols=non_linguistic_symbols,
            delimiter=delimiter,
            speech_name=speech_name,
            text_name=text_name[0],
            rir_scp=rir_scp,
            rir_apply_prob=rir_apply_prob,
            noise_scp=noise_scp,
            noise_apply_prob=noise_apply_prob,
            noise_db_range=noise_db_range,
            speech_volume_normalize=speech_volume_normalize,
        )
        assert (
                len(token_type) == len(token_list) == len(bpemodel) == len(text_name)
        ), "token_type, token_list, bpemodel, or processing text_name mismatched"
        self.num_tokenizer = len(token_type)
        self.tokenizer = []
        self.token_id_converter = []
        for i in range(self.num_tokenizer):
            if token_type[i] is not None:
                if token_list[i] is None:
                    raise ValueError("token_list is required if token_type is not None")
                self.tokenizer.append(
                    build_tokenizer(
                        token_type=token_type[i],
                        bpemodel=bpemodel[i],
                        delimiter=delimiter,
                        space_symbol=space_symbol,
                        non_linguistic_symbols=non_linguistic_symbols,
                        g2p_type=g2p_type,
                    )
                )
                self.token_id_converter.append(
                    TokenIDConverter(
                        token_list=token_list[i],
                        unk_symbol=unk_symbol,
                    )
                )
            else:
                self.tokenizer.append(None)
                self.token_id_converter.append(None)
        self.text_cleaner = TextCleaner(text_cleaner)
        self.text_name = text_name  # override the text_name from CommonPreprocessor
        self.vad_name = vad_name
    def _text_process(
            self, data: Dict[str, Union[str, np.ndarray]]
    ) -> Dict[str, np.ndarray]:
        for i in range(self.num_tokenizer):
            text_name = self.text_name[i]
            if text_name in data and self.tokenizer[i] is not None:
                text = data[text_name]
                text = self.text_cleaner(text)
                tokens = self.tokenizer[i].text2tokens(text)
                if "vad:" in tokens[-1]:
                    vad = tokens[-1][4:]
                    tokens = tokens[:-1]
                    if len(vad) == 0:
                        vad = -1
                    else:
                        vad = int(vad)
                    data[self.vad_name] = np.array([vad], dtype=np.int64)
                text_ints = self.token_id_converter[i].tokens2ids(tokens)
                data[text_name] = np.array(text_ints, dtype=np.int64)
        return data
def split_to_mini_sentence(words: list, word_limit: int = 20):
    assert word_limit > 1
    if len(words) <= word_limit:
        return [words]
    sentences = []
    length = len(words)
    sentence_len = length // word_limit
    for i in range(sentence_len):
        sentences.append(words[i * word_limit:(i + 1) * word_limit])
    if length % word_limit > 0:
        sentences.append(words[sentence_len * word_limit:])
    return sentences