游雁
2023-02-14 1d4ab65c8bfebaecbcb0eec0064bae9a321cad75
funasr/datasets/preprocessor.py
@@ -58,6 +58,15 @@
            continue
    return out_txt.strip().split()
def seg_tokenize_wo_pattern(txt, seg_dict):
    """Map every item of ``txt`` through ``seg_dict`` and return a flat token list.

    Args:
        txt: Iterable of lookup keys. A list of words is looked up word by
            word; a plain string is walked character by character.
        seg_dict: Mapping from a key to a string of space-separated tokens.

    Returns:
        List of tokens. A key that is missing from ``seg_dict`` contributes
        the single token ``"<unk>"``.
    """
    # Join once instead of growing a string with ``+=`` inside the loop, and
    # do a single ``get`` lookup per item. The trailing ``split()`` is still
    # required: it breaks multi-token dictionary entries apart and drops any
    # empty pieces.
    return " ".join(seg_dict.get(word, "<unk>") for word in txt).split()
def framing(
        x,
@@ -354,7 +363,7 @@
            if self.split_with_space:
                tokens = text.strip().split(" ")
                if self.seg_dict is not None:
                    tokens = forward_segment("".join(tokens).lower(), self.seg_dict)
                    tokens = forward_segment("".join(tokens), self.seg_dict)
                    tokens = seg_tokenize(tokens, self.seg_dict)
            else:
                tokens = self.tokenizer.text2tokens(text)
@@ -370,6 +379,70 @@
        data = self._speech_process(data)
        data = self._text_process(data)
        return data
## FIXME
class LMPreprocessor(CommonPreprocessor):
    """``CommonPreprocessor`` variant with its own text tokenization step.

    The constructor accepts exactly the arguments it hands on to the parent
    and adds no state of its own; the only overridden behavior is
    ``_text_process``.
    """

    def __init__(
        self,
        train: bool,
        token_type: str = None,
        token_list: Union[Path, str, Iterable[str]] = None,
        bpemodel: Union[Path, str, Iterable[str]] = None,
        text_cleaner: Collection[str] = None,
        g2p_type: str = None,
        unk_symbol: str = "<unk>",
        space_symbol: str = "<space>",
        non_linguistic_symbols: Union[Path, str, Iterable[str]] = None,
        delimiter: str = None,
        rir_scp: str = None,
        rir_apply_prob: float = 1.0,
        noise_scp: str = None,
        noise_apply_prob: float = 1.0,
        noise_db_range: str = "3_10",
        speech_volume_normalize: float = None,
        speech_name: str = "speech",
        text_name: str = "text",
        split_with_space: bool = False,
        seg_dict_file: str = None,
    ):
        # Pure pass-through: every argument is forwarded to the parent under
        # its own name.
        super().__init__(
            train=train,
            token_type=token_type,
            token_list=token_list,
            bpemodel=bpemodel,
            text_cleaner=text_cleaner,
            g2p_type=g2p_type,
            unk_symbol=unk_symbol,
            space_symbol=space_symbol,
            non_linguistic_symbols=non_linguistic_symbols,
            delimiter=delimiter,
            rir_scp=rir_scp,
            rir_apply_prob=rir_apply_prob,
            noise_scp=noise_scp,
            noise_apply_prob=noise_apply_prob,
            noise_db_range=noise_db_range,
            speech_volume_normalize=speech_volume_normalize,
            speech_name=speech_name,
            text_name=text_name,
            split_with_space=split_with_space,
            seg_dict_file=seg_dict_file,
        )

    def _text_process(
        self, data: Dict[str, Union[str, np.ndarray]]
    ) -> Dict[str, np.ndarray]:
        """Replace the text entry of ``data`` with an int64 array of token ids.

        Nothing is converted when ``data`` has no ``self.text_name`` entry or
        when ``self.tokenizer`` is ``None``; ``data`` is returned either way.
        """
        has_text = self.text_name in data and self.tokenizer is not None
        if has_text:
            cleaned = self.text_cleaner(data[self.text_name])
            if not self.split_with_space:
                tokens = self.tokenizer.text2tokens(cleaned)
            else:
                # Split on single spaces; when a segmentation dictionary is
                # available each piece is looked up in it and pieces that are
                # not in the dictionary become "<unk>".
                tokens = cleaned.strip().split(" ")
                if self.seg_dict is not None:
                    tokens = seg_tokenize_wo_pattern(tokens, self.seg_dict)
            ids = self.token_id_converter.tokens2ids(tokens)
            data[self.text_name] = np.array(ids, dtype=np.int64)
        assert check_return_type(data)
        return data
@@ -538,3 +611,96 @@
                data[text_name] = np.array(text_ints, dtype=np.int64)
        assert check_return_type(data)
        return data
class CodeMixTokenizerCommonPreprocessor(CommonPreprocessor):
    """``CommonPreprocessor`` variant that pre-splits mixed-script text into words.

    The parent is always configured with the ``"word"`` token type; the
    ``token_type`` argument is accepted but not forwarded. In addition to the
    parent's processing, ``__call__`` stores the list of split words in the
    sample under ``split_text_name``.
    """

    def __init__(
        self,
        train: bool,
        token_type: str = None,
        token_list: Union[Path, str, Iterable[str]] = None,
        bpemodel: Union[Path, str, Iterable[str]] = None,
        text_cleaner: Collection[str] = None,
        g2p_type: str = None,
        unk_symbol: str = "<unk>",
        space_symbol: str = "<space>",
        non_linguistic_symbols: Union[Path, str, Iterable[str]] = None,
        delimiter: str = None,
        rir_scp: str = None,
        rir_apply_prob: float = 1.0,
        noise_scp: str = None,
        noise_apply_prob: float = 1.0,
        noise_db_range: str = "3_10",
        speech_volume_normalize: float = None,
        speech_name: str = "speech",
        text_name: str = "text",
        split_text_name: str = "split_text",
        split_with_space: bool = False,
        seg_dict_file: str = None,
    ):
        super().__init__(
            train=train,
            token_type="word",  # Word-level tokens are forced here.
            token_list=token_list,
            bpemodel=bpemodel,
            text_cleaner=text_cleaner,
            g2p_type=g2p_type,
            unk_symbol=unk_symbol,
            space_symbol=space_symbol,
            non_linguistic_symbols=non_linguistic_symbols,
            delimiter=delimiter,
            rir_scp=rir_scp,
            rir_apply_prob=rir_apply_prob,
            noise_scp=noise_scp,
            noise_apply_prob=noise_apply_prob,
            noise_db_range=noise_db_range,
            speech_volume_normalize=speech_volume_normalize,
            speech_name=speech_name,
            text_name=text_name,
            split_with_space=split_with_space,
            seg_dict_file=seg_dict_file,
        )
        # Key under which ``__call__`` stores the split word list.
        self.split_text_name = split_text_name

    @classmethod
    def split_words(cls, text: str):
        """Split ``text`` into words.

        The text is first split on whitespace. Inside each piece, consecutive
        characters whose UTF-8 encoding is one byte long (ASCII) are kept
        together as one word, while every other character (treated as a
        Chinese character) becomes a word of its own.
        """
        words = []
        for chunk in text.split():
            # ``chunk`` contains no whitespace at this point.
            ascii_run = ""
            for ch in chunk:
                if len(ch.encode()) != 1:
                    # Multi-byte character: close any pending ASCII word,
                    # then emit the character by itself.
                    if ascii_run:
                        words.append(ascii_run)
                        ascii_run = ""
                    words.append(ch)
                else:
                    ascii_run += ch
            if ascii_run:
                words.append(ascii_run)
        return words

    def __call__(
        self, uid: str, data: Dict[str, Union[list, str, np.ndarray]]
    ) -> Dict[str, Union[list, np.ndarray]]:
        """Process one sample and attach its split word list.

        A string text entry is split with ``split_words``; any other value is
        used as the word list as-is. The text entry is rewritten as the words
        joined by single spaces before the speech and text processing steps
        run, and the word list is stored under ``self.split_text_name``
        afterwards.
        """
        assert check_argument_types()
        raw_text = data[self.text_name]
        split_text = self.split_words(raw_text) if isinstance(raw_text, str) else raw_text
        data[self.text_name] = " ".join(split_text)
        data = self._text_process(self._speech_process(data))
        data[self.split_text_name] = split_text
        return data

    def pop_split_text_data(self, data: Dict[str, Union[str, np.ndarray]]):
        """Remove the split-text entry from ``data`` and return it.

        Raises ``KeyError`` when ``data`` has no ``self.split_text_name`` key.
        """
        return data.pop(self.split_text_name)