| | |
| | | |
| | | import numpy as np |
| | | import scipy.signal |
| | | import soundfile |
| | | from typeguard import check_argument_types |
| | | from typeguard import check_return_type |
| | | import librosa |
| | | import jieba |
| | | |
| | | from funasr.text.build_tokenizer import build_tokenizer |
| | | from funasr.text.cleaner import TextCleaner |
| | | from funasr.text.token_id_converter import TokenIDConverter |
| | | from funasr.tokenizer.build_tokenizer import build_tokenizer |
| | | from funasr.tokenizer.cleaner import TextCleaner |
| | | from funasr.tokenizer.token_id_converter import TokenIDConverter |
| | | |
| | | |
| | | class AbsPreprocessor(ABC): |
| | |
| | | i += len(longest_word) |
| | | return word_list |
| | | |
| | | |
def seg_tokenize(txt, seg_dict):
    """Map every word of ``txt`` to its segmented tokens from ``seg_dict``.

    Each word is lower-cased and looked up in ``seg_dict``. A word that is
    missing but consists only of characters in U+4E00-U+9FA5 and/or ASCII
    digits is looked up character by character instead. Anything that still
    cannot be resolved is emitted as a single ``"<unk>"`` token.

    Args:
        txt: Iterable of words (a plain string is iterated per character).
        seg_dict: Mapping from a word or character to a string of
            whitespace-separated tokens.

    Returns:
        A flat list of token strings.
    """
    pattern = re.compile(r'^[\u4E00-\u9FA50-9]+$')
    tokens = []
    for word in txt:
        word = word.lower()
        if word in seg_dict:
            # Dictionary values may hold several space-separated tokens.
            tokens.extend(seg_dict[word].split())
        elif pattern.match(word):
            # Unknown CJK/digit word: fall back to per-character lookup so
            # that only the truly unknown characters become <unk>.
            for char in word:
                if char in seg_dict:
                    tokens.extend(seg_dict[char].split())
                else:
                    tokens.append("<unk>")
        else:
            # Exactly one <unk> per unresolvable word.
            tokens.append("<unk>")
    return tokens
| | | |
| | | def seg_tokenize_wo_pattern(txt, seg_dict): |
| | |
| | | self.seg_dict = None |
| | | if seg_dict_file is not None: |
| | | self.seg_dict = {} |
| | | with open(seg_dict_file) as f: |
| | | with open(seg_dict_file, "r", encoding="utf8") as f: |
| | | lines = f.readlines() |
| | | for line in lines: |
| | | s = line.strip().split() |
| | |
| | | def _speech_process( |
| | | self, data: Dict[str, Union[str, np.ndarray]] |
| | | ) -> Dict[str, Union[str, np.ndarray]]: |
| | | assert check_argument_types() |
| | | if self.speech_name in data: |
| | | if self.train and (self.rirs is not None or self.noises is not None): |
| | | speech = data[self.speech_name] |
| | |
| | | if self.rirs is not None and self.rir_apply_prob >= np.random.random(): |
| | | rir_path = np.random.choice(self.rirs) |
| | | if rir_path is not None: |
| | | rir, _ = soundfile.read( |
| | | rir, _ = librosa.load( |
| | | rir_path, dtype=np.float64, always_2d=True |
| | | ) |
| | | |
| | |
| | | noise_db = np.random.uniform( |
| | | self.noise_db_low, self.noise_db_high |
| | | ) |
| | | with soundfile.SoundFile(noise_path) as f: |
| | | if f.frames == nsamples: |
| | | noise = f.read(dtype=np.float64, always_2d=True) |
| | | elif f.frames < nsamples: |
| | | offset = np.random.randint(0, nsamples - f.frames) |
| | | # noise: (Time, Nmic) |
| | | noise = f.read(dtype=np.float64, always_2d=True) |
| | | # Repeat noise |
| | | noise = np.pad( |
| | | noise, |
| | | [(offset, nsamples - f.frames - offset), (0, 0)], |
| | | mode="wrap", |
| | | ) |
| | | else: |
| | | offset = np.random.randint(0, f.frames - nsamples) |
| | | f.seek(offset) |
| | | # noise: (Time, Nmic) |
| | | noise = f.read( |
| | | nsamples, dtype=np.float64, always_2d=True |
| | | ) |
| | | if len(noise) != nsamples: |
| | | raise RuntimeError(f"Something wrong: {noise_path}") |
| | | |
| | | audio_data = librosa.load(noise_path, dtype='float32')[0][None, :] |
| | | frames = len(audio_data[0]) |
| | | if frames == nsamples: |
| | | noise = audio_data |
| | | elif frames < nsamples: |
| | | offset = np.random.randint(0, nsamples - frames) |
| | | # noise: (Time, Nmic) |
| | | noise = audio_data |
| | | # Repeat noise |
| | | noise = np.pad( |
| | | noise, |
| | | [(offset, nsamples - frames - offset), (0, 0)], |
| | | mode="wrap", |
| | | ) |
| | | else: |
| | | noise = audio_data[:, nsamples] |
| | | # offset = np.random.randint(0, frames - nsamples) |
| | | # f.seek(offset) |
| | | # noise: (Time, Nmic) |
| | | # noise = f.read( |
| | | # nsamples, dtype=np.float64, always_2d=True |
| | | # ) |
| | | # if len(noise) != nsamples: |
| | | # raise RuntimeError(f"Something wrong: {noise_path}") |
| | | # noise: (Nmic, Time) |
| | | noise = noise.T |
| | | |
| | |
| | | speech = data[self.speech_name] |
| | | ma = np.max(np.abs(speech)) |
| | | data[self.speech_name] = speech * self.speech_volume_normalize / ma |
| | | assert check_return_type(data) |
| | | return data |
| | | |
| | | def _text_process( |
| | |
| | | tokens = self.tokenizer.text2tokens(text) |
| | | text_ints = self.token_id_converter.tokens2ids(tokens) |
| | | data[self.text_name] = np.array(text_ints, dtype=np.int64) |
| | | assert check_return_type(data) |
| | | return data |
| | | |
| | | def __call__( |
| | | self, uid: str, data: Dict[str, Union[str, np.ndarray]] |
| | | ) -> Dict[str, np.ndarray]: |
| | | assert check_argument_types() |
| | | |
| | | data = self._speech_process(data) |
| | | data = self._text_process(data) |
| | |
| | | tokens = self.tokenizer.text2tokens(text) |
| | | text_ints = self.token_id_converter.tokens2ids(tokens) |
| | | data[self.text_name] = np.array(text_ints, dtype=np.int64) |
| | | assert check_return_type(data) |
| | | return data |
| | | |
| | | |
| | |
| | | tokens = self.tokenizer.text2tokens(text) |
| | | text_ints = self.token_id_converter.tokens2ids(tokens) |
| | | data[text_n] = np.array(text_ints, dtype=np.int64) |
| | | assert check_return_type(data) |
| | | return data |
| | | |
| | | def __call__( |
| | | self, uid: str, data: Dict[str, Union[str, np.ndarray]] |
| | | ) -> Dict[str, np.ndarray]: |
| | | assert check_argument_types() |
| | | |
| | | if self.speech_name in data: |
| | | # Nothing now: candidates: |
| | |
| | | tokens = self.tokenizer[i].text2tokens(text) |
| | | text_ints = self.token_id_converter[i].tokens2ids(tokens) |
| | | data[text_name] = np.array(text_ints, dtype=np.int64) |
| | | assert check_return_type(data) |
| | | return data |
| | | |
| | | class CodeMixTokenizerCommonPreprocessor(CommonPreprocessor): |
| | |
| | | text_name: str = "text", |
| | | split_text_name: str = "split_text", |
| | | split_with_space: bool = False, |
| | | seg_jieba: bool = False, |
| | | seg_dict_file: str = None, |
| | | ): |
| | | super().__init__( |
| | |
| | | ) |
| | | # The data field name for split text. |
| | | self.split_text_name = split_text_name |
| | | self.seg_jieba = seg_jieba |
| | | if self.seg_jieba: |
| | | jieba.load_userdict(seg_dict_file) |
| | | |
| | | @classmethod |
| | | def split_words(cls, text: str): |
| | |
| | | words.append(current_word) |
| | | return words |
| | | |
@classmethod
def isEnglish(cls, text: str):
    """Return True when ``text`` is made up solely of ASCII letters and apostrophes.

    An empty string yields False because the pattern requires at least one
    character.
    """
    return re.search('^[a-zA-Z\']+$', text) is not None
| | | |
@classmethod
def join_chinese_and_english(cls, input_list):
    """Concatenate tokens, putting a space in front of each English token.

    Tokens for which ``cls.isEnglish`` is false are appended with no
    separator. Leading and trailing whitespace is stripped from the result.
    """
    pieces = [
        ' ' + token if cls.isEnglish(token) else token
        for token in input_list
    ]
    return ''.join(pieces).strip()
| | | |
@classmethod
def split_words_jieba(cls, text: str):
    """Split whitespace-separated text into words, segmenting non-English runs with jieba.

    The text is split on whitespace and consecutive tokens are grouped into
    runs according to ``cls.isEnglish``. English runs are passed through
    unchanged. Every other run is joined with
    ``cls.join_chinese_and_english`` and cut by ``jieba.cut`` with
    ``HMM=False``.

    Returns:
        A flat list of words in their original order.
    """
    # Each entry is (is_english, tokens) for one run of same-language tokens.
    runs = []
    for token in text.split():
        english = cls.isEnglish(token)
        if runs and runs[-1][0] == english:
            runs[-1][1].append(token)
        else:
            runs.append((english, [token]))

    words = []
    for english, run_tokens in runs:
        if english:
            words.extend(run_tokens)
            continue
        joined = cls.join_chinese_and_english(run_tokens)
        words.extend(jieba.cut(joined, HMM=False))
    return words
| | | |
| | | def __call__( |
| | | self, uid: str, data: Dict[str, Union[list, str, np.ndarray]] |
| | | ) -> Dict[str, Union[list, np.ndarray]]: |
| | | assert check_argument_types() |
| | | # Split words. |
| | | if isinstance(data[self.text_name], str): |
| | | split_text = self.split_words(data[self.text_name]) |
| | | if self.seg_jieba: |
| | | # jieba.load_userdict(seg_dict_file) |
| | | split_text = self.split_words_jieba(data[self.text_name]) |
| | | else: |
| | | split_text = self.split_words(data[self.text_name]) |
| | | else: |
| | | split_text = data[self.text_name] |
| | | data[self.text_name] = " ".join(split_text) |
| | |
| | | ) -> Dict[str, np.ndarray]: |
| | | for i in range(self.num_tokenizer): |
| | | text_name = self.text_name[i] |
| | | #import pdb; pdb.set_trace() |
| | | if text_name in data and self.tokenizer[i] is not None: |
| | | text = data[text_name] |
| | | text = self.text_cleaner(text) |