From 24f73665e2d8ea8e4de2fe4f900bc539d7f7b989 Mon Sep 17 00:00:00 2001
From: hnluo <haoneng.lhn@alibaba-inc.com>
Date: Mon, 17 Apr 2023 15:49:45 +0800
Subject: [PATCH] Merge pull request #367 from alibaba-damo-academy/dev_lhn2
---
funasr/datasets/preprocessor.py | 479 ++++++++++++++++++++++++++++++++++++++++++++++++++---------
1 file changed, 400 insertions(+), 79 deletions(-)
diff --git a/funasr/datasets/preprocessor.py b/funasr/datasets/preprocessor.py
index 80d1adc..1adca05 100644
--- a/funasr/datasets/preprocessor.py
+++ b/funasr/datasets/preprocessor.py
@@ -1,3 +1,4 @@
+import re
from abc import ABC
from abc import abstractmethod
from pathlib import Path
@@ -24,17 +25,51 @@
@abstractmethod
def __call__(
- self, uid: str, data: Dict[str, Union[str, np.ndarray]]
+ self, uid: str, data: Dict[str, Union[str, np.ndarray]]
) -> Dict[str, np.ndarray]:
raise NotImplementedError
+def forward_segment(text, dic):
+    """Split ``text`` into words by greedy forward maximum matching.
+
+    At each position the longest substring of two or more characters that
+    is contained in ``dic`` becomes the next word.  When no such substring
+    exists, the single character at that position is emitted, whether or
+    not it is itself in ``dic``.
+
+    Args:
+        text: String to segment.
+        dic: Container of known words (only ``in`` is used).
+
+    Returns:
+        List of consecutive slices of ``text`` covering it from start to end.
+    """
+    segments = []
+    total = len(text)
+    start = 0
+    while start < total:
+        # Fall back to one character when nothing longer matches.
+        best = text[start]
+        # Probe candidates from longest to shortest; the first hit is the
+        # longest match, so the scan can stop there.
+        for end in range(total, start + 1, -1):
+            candidate = text[start:end]
+            if candidate in dic:
+                best = candidate
+                break
+        segments.append(best)
+        start += len(best)
+    return segments
+
+
+def seg_tokenize(txt, seg_dict):
+    """Replace every word of ``txt`` by its token string from ``seg_dict``.
+
+    Words missing from ``seg_dict`` become ``"<unk>"``.  The mapped strings
+    are joined with spaces and split on whitespace again, so a dictionary
+    value holding several space-separated tokens contributes several
+    output tokens.
+
+    Args:
+        txt: Iterable of words (a plain string is iterated per character).
+        seg_dict: Mapping from word to a space-separated token string.
+
+    Returns:
+        Flat list of token strings.
+    """
+    pieces = []
+    for word in txt:
+        mapped = seg_dict[word] if word in seg_dict else "<unk>"
+        pieces.append(mapped + " ")
+    return "".join(pieces).strip().split()
+
+def seg_tokenize_wo_pattern(txt, seg_dict):
+    """Look up each word of ``txt`` in ``seg_dict`` and flatten the result.
+
+    A word found in ``seg_dict`` contributes the whitespace-separated
+    tokens of its dictionary value; any other word contributes the single
+    token ``"<unk>"``.
+
+    Args:
+        txt: Iterable of words.
+        seg_dict: Mapping from word to a space-separated token string.
+
+    Returns:
+        Flat list of token strings.
+    """
+    mapped = (seg_dict[word] if word in seg_dict else "<unk>" for word in txt)
+    return "".join(piece + " " for piece in mapped).strip().split()
+
+
def framing(
- x,
- frame_length: int = 512,
- frame_shift: int = 256,
- centered: bool = True,
- padded: bool = True,
+ x,
+ frame_length: int = 512,
+ frame_shift: int = 256,
+ centered: bool = True,
+ padded: bool = True,
):
if x.size == 0:
raise ValueError("Input array size is zero")
@@ -73,11 +108,11 @@
def detect_non_silence(
- x: np.ndarray,
- threshold: float = 0.01,
- frame_length: int = 1024,
- frame_shift: int = 512,
- window: str = "boxcar",
+ x: np.ndarray,
+ threshold: float = 0.01,
+ frame_length: int = 1024,
+ frame_shift: int = 512,
+ window: str = "boxcar",
) -> np.ndarray:
"""Power based voice activity detection.
@@ -103,7 +138,7 @@
)
framed_w *= scipy.signal.get_window(window, frame_length).astype(framed_w.dtype)
# power: (C, T)
- power = (framed_w**2).mean(axis=-1)
+ power = (framed_w ** 2).mean(axis=-1)
# mean_power: (C, 1)
mean_power = np.mean(power, axis=-1, keepdims=True)
if np.all(mean_power == 0):
@@ -126,26 +161,27 @@
class CommonPreprocessor(AbsPreprocessor):
def __init__(
- self,
- train: bool,
- token_type: str = None,
- token_list: Union[Path, str, Iterable[str]] = None,
- bpemodel: Union[Path, str, Iterable[str]] = None,
- text_cleaner: Collection[str] = None,
- g2p_type: str = None,
- unk_symbol: str = "<unk>",
- space_symbol: str = "<space>",
- non_linguistic_symbols: Union[Path, str, Iterable[str]] = None,
- delimiter: str = None,
- rir_scp: str = None,
- rir_apply_prob: float = 1.0,
- noise_scp: str = None,
- noise_apply_prob: float = 1.0,
- noise_db_range: str = "3_10",
- speech_volume_normalize: float = None,
- speech_name: str = "speech",
- text_name: str = "text",
- split_with_space: bool = False,
+ self,
+ train: bool,
+ token_type: str = None,
+ token_list: Union[Path, str, Iterable[str]] = None,
+ bpemodel: Union[Path, str, Iterable[str]] = None,
+ text_cleaner: Collection[str] = None,
+ g2p_type: str = None,
+ unk_symbol: str = "<unk>",
+ space_symbol: str = "<space>",
+ non_linguistic_symbols: Union[Path, str, Iterable[str]] = None,
+ delimiter: str = None,
+ rir_scp: str = None,
+ rir_apply_prob: float = 1.0,
+ noise_scp: str = None,
+ noise_apply_prob: float = 1.0,
+ noise_db_range: str = "3_10",
+ speech_volume_normalize: float = None,
+ speech_name: str = "speech",
+ text_name: str = "text",
+ split_with_space: bool = False,
+ seg_dict_file: str = None,
):
super().__init__(train)
self.train = train
@@ -155,6 +191,16 @@
self.rir_apply_prob = rir_apply_prob
self.noise_apply_prob = noise_apply_prob
self.split_with_space = split_with_space
+ self.seg_dict = None
+ if seg_dict_file is not None:
+ self.seg_dict = {}
+ with open(seg_dict_file) as f:
+ lines = f.readlines()
+ for line in lines:
+ s = line.strip().split()
+ key = s[0]
+ value = s[1:]
+ self.seg_dict[key] = " ".join(value)
if token_type is not None:
if token_list is None:
@@ -212,7 +258,7 @@
self.noises = None
def _speech_process(
- self, data: Dict[str, Union[str, np.ndarray]]
+ self, data: Dict[str, Union[str, np.ndarray]]
) -> Dict[str, Union[str, np.ndarray]]:
assert check_argument_types()
if self.speech_name in data:
@@ -242,16 +288,16 @@
# speech: (Nmic, Time)
# Note that this operation doesn't change the signal length
speech = scipy.signal.convolve(speech, rir, mode="full")[
- :, : speech.shape[1]
- ]
+ :, : speech.shape[1]
+ ]
# Reverse mean power to the original power
power2 = (speech[detect_non_silence(speech)] ** 2).mean()
speech = np.sqrt(power / max(power2, 1e-10)) * speech
# 2. Add Noise
if (
- self.noises is not None
- and self.noise_apply_prob >= np.random.random()
+ self.noises is not None
+ and self.noise_apply_prob >= np.random.random()
):
noise_path = np.random.choice(self.noises)
if noise_path is not None:
@@ -283,11 +329,11 @@
# noise: (Nmic, Time)
noise = noise.T
- noise_power = (noise**2).mean()
+ noise_power = (noise ** 2).mean()
scale = (
- 10 ** (-noise_db / 20)
- * np.sqrt(power)
- / np.sqrt(max(noise_power, 1e-10))
+ 10 ** (-noise_db / 20)
+ * np.sqrt(power)
+ / np.sqrt(max(noise_power, 1e-10))
)
speech = speech + scale * noise
@@ -305,13 +351,16 @@
return data
def _text_process(
- self, data: Dict[str, Union[str, np.ndarray]]
+ self, data: Dict[str, Union[str, np.ndarray]]
) -> Dict[str, np.ndarray]:
if self.text_name in data and self.tokenizer is not None:
text = data[self.text_name]
text = self.text_cleaner(text)
if self.split_with_space:
tokens = text.strip().split(" ")
+ if self.seg_dict is not None:
+ tokens = forward_segment("".join(tokens), self.seg_dict)
+ tokens = seg_tokenize(tokens, self.seg_dict)
else:
tokens = self.tokenizer.text2tokens(text)
text_ints = self.token_id_converter.tokens2ids(tokens)
@@ -320,7 +369,7 @@
return data
def __call__(
- self, uid: str, data: Dict[str, Union[str, np.ndarray]]
+ self, uid: str, data: Dict[str, Union[str, np.ndarray]]
) -> Dict[str, np.ndarray]:
assert check_argument_types()
@@ -328,22 +377,86 @@
data = self._text_process(data)
return data
+# FIXME: marker left by the original author without further detail.
+# NOTE(review): as written, this class differs from CommonPreprocessor only
+# in _text_process; __init__ merely forwards every argument to the parent.
+class LMPreprocessor(CommonPreprocessor):
+ # Preprocessor whose text path maps space-separated words through seg_dict
+ # with seg_tokenize_wo_pattern (presumably meant for language-model text,
+ # judging by the name - TODO confirm).
+ def __init__(
+ self,
+ train: bool,
+ token_type: str = None,
+ token_list: Union[Path, str, Iterable[str]] = None,
+ bpemodel: Union[Path, str, Iterable[str]] = None,
+ text_cleaner: Collection[str] = None,
+ g2p_type: str = None,
+ unk_symbol: str = "<unk>",
+ space_symbol: str = "<space>",
+ non_linguistic_symbols: Union[Path, str, Iterable[str]] = None,
+ delimiter: str = None,
+ rir_scp: str = None,
+ rir_apply_prob: float = 1.0,
+ noise_scp: str = None,
+ noise_apply_prob: float = 1.0,
+ noise_db_range: str = "3_10",
+ speech_volume_normalize: float = None,
+ speech_name: str = "speech",
+ text_name: str = "text",
+ split_with_space: bool = False,
+ seg_dict_file: str = None,
+ ):
+ # The arguments are forwarded positionally, so their order has to stay
+ # in sync with the parameter order of CommonPreprocessor.__init__.
+ super().__init__(train,
+ token_type,
+ token_list,
+ bpemodel,
+ text_cleaner,
+ g2p_type,
+ unk_symbol,
+ space_symbol,
+ non_linguistic_symbols,
+ delimiter,
+ rir_scp,
+ rir_apply_prob,
+ noise_scp,
+ noise_apply_prob,
+ noise_db_range,
+ speech_volume_normalize,
+ speech_name,
+ text_name,
+ split_with_space,
+ seg_dict_file,
+ )
+
+ def _text_process(
+ self, data: Dict[str, Union[str, np.ndarray]]
+ ) -> Dict[str, np.ndarray]:
+ # Replace data[self.text_name] by an int64 array of token ids; data is
+ # left untouched when the key is absent or no tokenizer is configured.
+ if self.text_name in data and self.tokenizer is not None:
+ text = data[self.text_name]
+ text = self.text_cleaner(text)
+ if self.split_with_space:
+ # Words are taken from a plain split on single spaces.
+ tokens = text.strip().split(" ")
+ if self.seg_dict is not None:
+ # Each word is looked up in seg_dict directly; unknown words
+ # become "<unk>".
+ tokens = seg_tokenize_wo_pattern(tokens, self.seg_dict)
+ else:
+ tokens = self.tokenizer.text2tokens(text)
+ text_ints = self.token_id_converter.tokens2ids(tokens)
+ data[self.text_name] = np.array(text_ints, dtype=np.int64)
+ assert check_return_type(data)
+ return data
+
class CommonPreprocessor_multi(AbsPreprocessor):
def __init__(
- self,
- train: bool,
- token_type: str = None,
- token_list: Union[Path, str, Iterable[str]] = None,
- bpemodel: Union[Path, str, Iterable[str]] = None,
- text_cleaner: Collection[str] = None,
- g2p_type: str = None,
- unk_symbol: str = "<unk>",
- space_symbol: str = "<space>",
- non_linguistic_symbols: Union[Path, str, Iterable[str]] = None,
- delimiter: str = None,
- speech_name: str = "speech",
- text_name: List[str] = ["text"],
+ self,
+ train: bool,
+ token_type: str = None,
+ token_list: Union[Path, str, Iterable[str]] = None,
+ bpemodel: Union[Path, str, Iterable[str]] = None,
+ text_cleaner: Collection[str] = None,
+ g2p_type: str = None,
+ unk_symbol: str = "<unk>",
+ space_symbol: str = "<space>",
+ non_linguistic_symbols: Union[Path, str, Iterable[str]] = None,
+ delimiter: str = None,
+ speech_name: str = "speech",
+ text_name: List[str] = ["text"],
):
super().__init__(train)
self.train = train
@@ -373,7 +486,7 @@
self.token_id_converter = None
def _text_process(
- self, data: Dict[str, Union[str, np.ndarray]]
+ self, data: Dict[str, Union[str, np.ndarray]]
) -> Dict[str, np.ndarray]:
for text_n in self.text_name:
if text_n in data and self.tokenizer is not None:
@@ -386,7 +499,7 @@
return data
def __call__(
- self, uid: str, data: Dict[str, Union[str, np.ndarray]]
+ self, uid: str, data: Dict[str, Union[str, np.ndarray]]
) -> Dict[str, np.ndarray]:
assert check_argument_types()
@@ -404,25 +517,25 @@
class MutliTokenizerCommonPreprocessor(CommonPreprocessor):
def __init__(
- self,
- train: bool,
- token_type: List[str] = [None],
- token_list: List[Union[Path, str, Iterable[str]]] = [None],
- bpemodel: List[Union[Path, str, Iterable[str]]] = [None],
- text_cleaner: Collection[str] = None,
- g2p_type: str = None,
- unk_symbol: str = "<unk>",
- space_symbol: str = "<space>",
- non_linguistic_symbols: Union[Path, str, Iterable[str]] = None,
- delimiter: str = None,
- rir_scp: str = None,
- rir_apply_prob: float = 1.0,
- noise_scp: str = None,
- noise_apply_prob: float = 1.0,
- noise_db_range: str = "3_10",
- speech_volume_normalize: float = None,
- speech_name: str = "speech",
- text_name: List[str] = ["text"],
+ self,
+ train: bool,
+ token_type: List[str] = [None],
+ token_list: List[Union[Path, str, Iterable[str]]] = [None],
+ bpemodel: List[Union[Path, str, Iterable[str]]] = [None],
+ text_cleaner: Collection[str] = None,
+ g2p_type: str = None,
+ unk_symbol: str = "<unk>",
+ space_symbol: str = "<space>",
+ non_linguistic_symbols: Union[Path, str, Iterable[str]] = None,
+ delimiter: str = None,
+ rir_scp: str = None,
+ rir_apply_prob: float = 1.0,
+ noise_scp: str = None,
+ noise_apply_prob: float = 1.0,
+ noise_db_range: str = "3_10",
+ speech_volume_normalize: float = None,
+ speech_name: str = "speech",
+ text_name: List[str] = ["text"],
):
# TODO(jiatong): sync with Kamo and Jing on interface for preprocessor
super().__init__(
@@ -447,7 +560,7 @@
)
assert (
- len(token_type) == len(token_list) == len(bpemodel) == len(text_name)
+ len(token_type) == len(token_list) == len(bpemodel) == len(text_name)
), "token_type, token_list, bpemodel, or processing text_name mismatched"
self.num_tokenizer = len(token_type)
self.tokenizer = []
@@ -482,7 +595,7 @@
self.text_name = text_name # override the text_name from CommonPreprocessor
def _text_process(
- self, data: Dict[str, Union[str, np.ndarray]]
+ self, data: Dict[str, Union[str, np.ndarray]]
) -> Dict[str, np.ndarray]:
for i in range(self.num_tokenizer):
text_name = self.text_name[i]
@@ -494,3 +607,211 @@
data[text_name] = np.array(text_ints, dtype=np.int64)
assert check_return_type(data)
return data
+
+class CodeMixTokenizerCommonPreprocessor(CommonPreprocessor):
+ # CommonPreprocessor variant for text that mixes ASCII words with
+ # non-ASCII (e.g. Chinese) characters: __call__ first splits the raw text
+ # into words with split_words, runs the inherited speech/text processing
+ # on the space-joined words, and returns the split words in the same dict.
+ def __init__(
+ self,
+ train: bool,
+ token_type: str = None,
+ token_list: Union[Path, str, Iterable[str]] = None,
+ bpemodel: Union[Path, str, Iterable[str]] = None,
+ text_cleaner: Collection[str] = None,
+ g2p_type: str = None,
+ unk_symbol: str = "<unk>",
+ space_symbol: str = "<space>",
+ non_linguistic_symbols: Union[Path, str, Iterable[str]] = None,
+ delimiter: str = None,
+ rir_scp: str = None,
+ rir_apply_prob: float = 1.0,
+ noise_scp: str = None,
+ noise_apply_prob: float = 1.0,
+ noise_db_range: str = "3_10",
+ speech_volume_normalize: float = None,
+ speech_name: str = "speech",
+ text_name: str = "text",
+ split_text_name: str = "split_text",
+ split_with_space: bool = False,
+ seg_dict_file: str = None,
+ ):
+ # NOTE(review): the token_type argument is accepted but ignored; the
+ # parent is always constructed with token_type="word".
+ super().__init__(
+ train=train,
+ # Force to use word.
+ token_type="word",
+ token_list=token_list,
+ bpemodel=bpemodel,
+ text_cleaner=text_cleaner,
+ g2p_type=g2p_type,
+ unk_symbol=unk_symbol,
+ space_symbol=space_symbol,
+ non_linguistic_symbols=non_linguistic_symbols,
+ delimiter=delimiter,
+ speech_name=speech_name,
+ text_name=text_name,
+ rir_scp=rir_scp,
+ rir_apply_prob=rir_apply_prob,
+ noise_scp=noise_scp,
+ noise_apply_prob=noise_apply_prob,
+ noise_db_range=noise_db_range,
+ speech_volume_normalize=speech_volume_normalize,
+ split_with_space=split_with_space,
+ seg_dict_file=seg_dict_file,
+ )
+ # The data field name for split text.
+ self.split_text_name = split_text_name
+
+ @classmethod
+ def split_words(cls, text: str):
+ # Split text into words: the text is first split on whitespace; inside
+ # each piece, consecutive ASCII characters are grouped into one word and
+ # every non-ASCII character becomes a word of its own.
+ words = []
+ segs = text.split()
+ for seg in segs:
+ # There is no space in seg.
+ current_word = ""
+ for c in seg:
+ # str.encode() defaults to UTF-8, where only ASCII takes one byte.
+ if len(c.encode()) == 1:
+ # This is an ASCII char.
+ current_word += c
+ else:
+ # This is a non-ASCII char (e.g. a Chinese character): emit the
+ # pending ASCII word, if any, then the character itself.
+ if len(current_word) > 0:
+ words.append(current_word)
+ current_word = ""
+ words.append(c)
+ # Emit an ASCII word that is still pending.
+ if len(current_word) > 0:
+ words.append(current_word)
+ return words
+
+ def __call__(
+ self, uid: str, data: Dict[str, Union[list, str, np.ndarray]]
+ ) -> Dict[str, Union[list, np.ndarray]]:
+ # Returns data after speech and text processing, with the split words
+ # stored under data[self.split_text_name].
+ # NOTE(review): data[self.text_name] is read unconditionally, so a sample
+ # without that key raises KeyError here - confirm callers always supply it.
+ assert check_argument_types()
+ # Split words.
+ if isinstance(data[self.text_name], str):
+ split_text = self.split_words(data[self.text_name])
+ else:
+ # Non-str input is used as-is; presumably an already split sequence
+ # of words - TODO confirm.
+ split_text = data[self.text_name]
+ data[self.text_name] = " ".join(split_text)
+ data = self._speech_process(data)
+ data = self._text_process(data)
+ data[self.split_text_name] = split_text
+ return data
+
+ def pop_split_text_data(self, data: Dict[str, Union[str, np.ndarray]]):
+ # Remove the split-text entry from data and return it; raises KeyError
+ # when the entry is missing.
+ result = data[self.split_text_name]
+ del data[self.split_text_name]
+ return result
+
+class PuncTrainTokenizerCommonPreprocessor(CommonPreprocessor):
+ # Multi-stream text preprocessor: keeps one tokenizer and one token-id
+ # converter per entry of text_name and, while tokenizing, moves a trailing
+ # "vad:<int>" token into data[vad_name]. Presumably used for
+ # punctuation-model training, judging by the name - TODO confirm.
+ def __init__(
+ self,
+ train: bool,
+ token_type: List[str] = [None],
+ token_list: List[Union[Path, str, Iterable[str]]] = [None],
+ bpemodel: List[Union[Path, str, Iterable[str]]] = [None],
+ text_cleaner: Collection[str] = None,
+ g2p_type: str = None,
+ unk_symbol: str = "<unk>",
+ space_symbol: str = "<space>",
+ non_linguistic_symbols: Union[Path, str, Iterable[str]] = None,
+ delimiter: str = None,
+ rir_scp: str = None,
+ rir_apply_prob: float = 1.0,
+ noise_scp: str = None,
+ noise_apply_prob: float = 1.0,
+ noise_db_range: str = "3_10",
+ speech_volume_normalize: float = None,
+ speech_name: str = "speech",
+ text_name: List[str] = ["text"],
+ vad_name: str = "vad_indexes",
+ ):
+ # NOTE(review): token_type, token_list, bpemodel and text_name default to
+ # list literals, which Python creates once and shares between calls.
+ # Nothing in this class mutates them, but self.text_name below aliases
+ # the shared text_name default.
+ # The parent is initialised with the first entry of each per-stream list;
+ # tokenizer, token_id_converter, text_cleaner and text_name are then
+ # overwritten further down.
+ # TODO(jiatong): sync with Kamo and Jing on interface for preprocessor
+ super().__init__(
+ train=train,
+ token_type=token_type[0],
+ token_list=token_list[0],
+ bpemodel=bpemodel[0],
+ text_cleaner=text_cleaner,
+ g2p_type=g2p_type,
+ unk_symbol=unk_symbol,
+ space_symbol=space_symbol,
+ non_linguistic_symbols=non_linguistic_symbols,
+ delimiter=delimiter,
+ speech_name=speech_name,
+ text_name=text_name[0],
+ rir_scp=rir_scp,
+ rir_apply_prob=rir_apply_prob,
+ noise_scp=noise_scp,
+ noise_apply_prob=noise_apply_prob,
+ noise_db_range=noise_db_range,
+ speech_volume_normalize=speech_volume_normalize,
+ )
+
+ # All per-stream lists must have the same length.
+ # NOTE(review): this check runs only after the [0] lookups above and is
+ # skipped entirely under "python -O".
+ assert (
+ len(token_type) == len(token_list) == len(bpemodel) == len(text_name)
+ ), "token_type, token_list, bpemodel, or processing text_name mismatched"
+ self.num_tokenizer = len(token_type)
+ self.tokenizer = []
+ self.token_id_converter = []
+
+ # Build one (tokenizer, converter) pair per stream; a stream whose
+ # token_type is None gets (None, None) placeholders.
+ for i in range(self.num_tokenizer):
+ if token_type[i] is not None:
+ if token_list[i] is None:
+ raise ValueError("token_list is required if token_type is not None")
+
+ self.tokenizer.append(
+ build_tokenizer(
+ token_type=token_type[i],
+ bpemodel=bpemodel[i],
+ delimiter=delimiter,
+ space_symbol=space_symbol,
+ non_linguistic_symbols=non_linguistic_symbols,
+ g2p_type=g2p_type,
+ )
+ )
+ self.token_id_converter.append(
+ TokenIDConverter(
+ token_list=token_list[i],
+ unk_symbol=unk_symbol,
+ )
+ )
+ else:
+ self.tokenizer.append(None)
+ self.token_id_converter.append(None)
+
+ self.text_cleaner = TextCleaner(text_cleaner)
+ self.text_name = text_name # override the text_name from CommonPreprocessor
+ self.vad_name = vad_name
+
+ def _text_process(
+ self, data: Dict[str, Union[str, np.ndarray]]
+ ) -> Dict[str, np.ndarray]:
+ # Replace each configured text field of data by an int64 array of token
+ # ids and, when present, store the VAD index under data[self.vad_name].
+ for i in range(self.num_tokenizer):
+ text_name = self.text_name[i]
+ # Only fields present in data that have a tokenizer are processed.
+ if text_name in data and self.tokenizer[i] is not None:
+ text = data[text_name]
+ text = self.text_cleaner(text)
+ tokens = self.tokenizer[i].text2tokens(text)
+ # A final token containing "vad:" carries a VAD index: everything
+ # after its first four characters is parsed as an int (-1 when
+ # empty), the token is dropped, and the value is stored as a
+ # one-element int64 array.
+ # NOTE(review): tokens[-1] raises IndexError when the tokenizer
+ # returns no tokens, and the "in" test matches "vad:" anywhere in
+ # the token although the [4:] slice assumes it is a prefix.
+ if "vad:" in tokens[-1]:
+ vad = tokens[-1][4:]
+ tokens = tokens[:-1]
+ if len(vad) == 0:
+ vad = -1
+ else:
+ vad = int(vad)
+ data[self.vad_name] = np.array([vad], dtype=np.int64)
+ text_ints = self.token_id_converter[i].tokens2ids(tokens)
+ data[text_name] = np.array(text_ints, dtype=np.int64)
+ return data
+
+def split_to_mini_sentence(words: list, word_limit: int = 20):
+    """Cut ``words`` into consecutive chunks of at most ``word_limit`` items.
+
+    Args:
+        words: Sequence to split.
+        word_limit: Maximum chunk length; must be greater than 1.
+
+    Returns:
+        List of chunks in original order.  When ``words`` already fits in
+        one chunk, the result is ``[words]`` holding the very same object
+        (no copy), which also covers empty input.
+    """
+    assert word_limit > 1
+    total = len(words)
+    if total <= word_limit:
+        return [words]
+    # Stepping by word_limit yields every full chunk and, when the length is
+    # not a multiple of word_limit, one shorter trailing chunk.
+    return [
+        words[begin:begin + word_limit]
+        for begin in range(0, total, word_limit)
+    ]
--
Gitblit v1.9.1