From 1d4ab65c8bfebaecbcb0eec0064bae9a321cad75 Mon Sep 17 00:00:00 2001
From: 游雁 <zhifu.gzf@alibaba-inc.com>
Date: 星期二, 14 二月 2023 16:27:37 +0800
Subject: [PATCH] export model
---
funasr/datasets/preprocessor.py | 368 +++++++++++++++++++++++++++++++++++++++++-----------
1 files changed, 289 insertions(+), 79 deletions(-)
diff --git a/funasr/datasets/preprocessor.py b/funasr/datasets/preprocessor.py
index 80d1adc..8e86794 100644
--- a/funasr/datasets/preprocessor.py
+++ b/funasr/datasets/preprocessor.py
@@ -1,3 +1,4 @@
+import re
from abc import ABC
from abc import abstractmethod
from pathlib import Path
@@ -24,17 +25,55 @@
@abstractmethod
def __call__(
- self, uid: str, data: Dict[str, Union[str, np.ndarray]]
+ self, uid: str, data: Dict[str, Union[str, np.ndarray]]
) -> Dict[str, np.ndarray]:
raise NotImplementedError
+def forward_segment(text, dic):
+    """Segment ``text`` by greedy forward maximum matching against ``dic``.
+
+    Starting at the left, the longest substring beginning at the current
+    position that is contained in ``dic`` is emitted.  When no substring of
+    two or more characters is found, the single character at that position
+    is emitted instead, whether or not it is itself in ``dic``.
+
+    Args:
+        text: String to segment.
+        dic: Container supporting ``in`` tests on candidate substrings.
+
+    Returns:
+        List of substrings in order; joined together they give ``text``.
+    """
+    segments = []
+    total = len(text)
+    start = 0
+    while start < total:
+        # Probe candidates from longest to shortest: the first hit is the
+        # longest match.  The default covers the "no match" case.
+        match = next(
+            (
+                text[start:end]
+                for end in range(total, start + 1, -1)
+                if text[start:end] in dic
+            ),
+            text[start],
+        )
+        segments.append(match)
+        start += len(match)
+    return segments
+
+
+def seg_tokenize(txt, seg_dict):
+    """Translate the items of ``txt`` through ``seg_dict`` into a flat token list.
+
+    Only items whose first character is in U+4E00-U+9FA5, ``A-Z``, ``a-z``
+    or ``0-9`` are considered; every other item is dropped.  A considered
+    item contributes the whitespace-separated pieces of ``seg_dict[item]``
+    when it is a key of ``seg_dict``, and the single token ``"<unk>"``
+    otherwise.
+
+    Args:
+        txt: Iterable of strings.
+        seg_dict: Mapping from an item to a string of space-separated tokens.
+
+    Returns:
+        List of token strings.
+    """
+    leading_char = re.compile(r"([\u4E00-\u9FA5A-Za-z0-9])")
+    pieces = []
+    for item in txt:
+        if not leading_char.match(item):
+            # Item does not start with a CJK/alphanumeric character: skip it.
+            continue
+        if item in seg_dict:
+            pieces.extend(seg_dict[item].split())
+        else:
+            pieces.append("<unk>")
+    return pieces
+
+def seg_tokenize_wo_pattern(txt, seg_dict):
+    """Translate every item of ``txt`` through ``seg_dict``, filtering nothing.
+
+    Each item contributes the whitespace-separated pieces of
+    ``seg_dict[item]`` when it is a key of ``seg_dict``, and the single
+    token ``"<unk>"`` otherwise.
+
+    Args:
+        txt: Iterable of items to look up.
+        seg_dict: Mapping from an item to a string of space-separated tokens.
+
+    Returns:
+        List of token strings.
+    """
+    pieces = []
+    for item in txt:
+        # "<unk>".split() is ["<unk>"], so a missing key yields one token.
+        pieces.extend(seg_dict.get(item, "<unk>").split())
+    return pieces
+
+
def framing(
- x,
- frame_length: int = 512,
- frame_shift: int = 256,
- centered: bool = True,
- padded: bool = True,
+ x,
+ frame_length: int = 512,
+ frame_shift: int = 256,
+ centered: bool = True,
+ padded: bool = True,
):
if x.size == 0:
raise ValueError("Input array size is zero")
@@ -73,11 +112,11 @@
def detect_non_silence(
- x: np.ndarray,
- threshold: float = 0.01,
- frame_length: int = 1024,
- frame_shift: int = 512,
- window: str = "boxcar",
+ x: np.ndarray,
+ threshold: float = 0.01,
+ frame_length: int = 1024,
+ frame_shift: int = 512,
+ window: str = "boxcar",
) -> np.ndarray:
"""Power based voice activity detection.
@@ -103,7 +142,7 @@
)
framed_w *= scipy.signal.get_window(window, frame_length).astype(framed_w.dtype)
# power: (C, T)
- power = (framed_w**2).mean(axis=-1)
+ power = (framed_w ** 2).mean(axis=-1)
# mean_power: (C, 1)
mean_power = np.mean(power, axis=-1, keepdims=True)
if np.all(mean_power == 0):
@@ -126,26 +165,27 @@
class CommonPreprocessor(AbsPreprocessor):
def __init__(
- self,
- train: bool,
- token_type: str = None,
- token_list: Union[Path, str, Iterable[str]] = None,
- bpemodel: Union[Path, str, Iterable[str]] = None,
- text_cleaner: Collection[str] = None,
- g2p_type: str = None,
- unk_symbol: str = "<unk>",
- space_symbol: str = "<space>",
- non_linguistic_symbols: Union[Path, str, Iterable[str]] = None,
- delimiter: str = None,
- rir_scp: str = None,
- rir_apply_prob: float = 1.0,
- noise_scp: str = None,
- noise_apply_prob: float = 1.0,
- noise_db_range: str = "3_10",
- speech_volume_normalize: float = None,
- speech_name: str = "speech",
- text_name: str = "text",
- split_with_space: bool = False,
+ self,
+ train: bool,
+ token_type: str = None,
+ token_list: Union[Path, str, Iterable[str]] = None,
+ bpemodel: Union[Path, str, Iterable[str]] = None,
+ text_cleaner: Collection[str] = None,
+ g2p_type: str = None,
+ unk_symbol: str = "<unk>",
+ space_symbol: str = "<space>",
+ non_linguistic_symbols: Union[Path, str, Iterable[str]] = None,
+ delimiter: str = None,
+ rir_scp: str = None,
+ rir_apply_prob: float = 1.0,
+ noise_scp: str = None,
+ noise_apply_prob: float = 1.0,
+ noise_db_range: str = "3_10",
+ speech_volume_normalize: float = None,
+ speech_name: str = "speech",
+ text_name: str = "text",
+ split_with_space: bool = False,
+ seg_dict_file: str = None,
):
super().__init__(train)
self.train = train
@@ -155,6 +195,16 @@
self.rir_apply_prob = rir_apply_prob
self.noise_apply_prob = noise_apply_prob
self.split_with_space = split_with_space
+ self.seg_dict = None
+ if seg_dict_file is not None:
+ self.seg_dict = {}
+ with open(seg_dict_file) as f:
+ lines = f.readlines()
+ for line in lines:
+ s = line.strip().split()
+ key = s[0]
+ value = s[1:]
+ self.seg_dict[key] = " ".join(value)
if token_type is not None:
if token_list is None:
@@ -212,7 +262,7 @@
self.noises = None
def _speech_process(
- self, data: Dict[str, Union[str, np.ndarray]]
+ self, data: Dict[str, Union[str, np.ndarray]]
) -> Dict[str, Union[str, np.ndarray]]:
assert check_argument_types()
if self.speech_name in data:
@@ -242,16 +292,16 @@
# speech: (Nmic, Time)
# Note that this operation doesn't change the signal length
speech = scipy.signal.convolve(speech, rir, mode="full")[
- :, : speech.shape[1]
- ]
+ :, : speech.shape[1]
+ ]
# Reverse mean power to the original power
power2 = (speech[detect_non_silence(speech)] ** 2).mean()
speech = np.sqrt(power / max(power2, 1e-10)) * speech
# 2. Add Noise
if (
- self.noises is not None
- and self.noise_apply_prob >= np.random.random()
+ self.noises is not None
+ and self.noise_apply_prob >= np.random.random()
):
noise_path = np.random.choice(self.noises)
if noise_path is not None:
@@ -283,11 +333,11 @@
# noise: (Nmic, Time)
noise = noise.T
- noise_power = (noise**2).mean()
+ noise_power = (noise ** 2).mean()
scale = (
- 10 ** (-noise_db / 20)
- * np.sqrt(power)
- / np.sqrt(max(noise_power, 1e-10))
+ 10 ** (-noise_db / 20)
+ * np.sqrt(power)
+ / np.sqrt(max(noise_power, 1e-10))
)
speech = speech + scale * noise
@@ -305,13 +355,16 @@
return data
def _text_process(
- self, data: Dict[str, Union[str, np.ndarray]]
+ self, data: Dict[str, Union[str, np.ndarray]]
) -> Dict[str, np.ndarray]:
if self.text_name in data and self.tokenizer is not None:
text = data[self.text_name]
text = self.text_cleaner(text)
if self.split_with_space:
tokens = text.strip().split(" ")
+ if self.seg_dict is not None:
+ tokens = forward_segment("".join(tokens), self.seg_dict)
+ tokens = seg_tokenize(tokens, self.seg_dict)
else:
tokens = self.tokenizer.text2tokens(text)
text_ints = self.token_id_converter.tokens2ids(tokens)
@@ -320,7 +373,7 @@
return data
def __call__(
- self, uid: str, data: Dict[str, Union[str, np.ndarray]]
+ self, uid: str, data: Dict[str, Union[str, np.ndarray]]
) -> Dict[str, np.ndarray]:
assert check_argument_types()
@@ -328,22 +381,86 @@
data = self._text_process(data)
return data
+## FIXME
+class LMPreprocessor(CommonPreprocessor):
+    """``CommonPreprocessor`` with a different text path for segmented input.
+
+    The constructor takes the same arguments as ``CommonPreprocessor`` and
+    forwards them unchanged.  Only ``_text_process`` differs: when
+    ``split_with_space`` is set and a segmentation dictionary is loaded, the
+    space-split tokens are passed directly to ``seg_tokenize_wo_pattern``
+    (no ``forward_segment`` step and no character filtering).
+    """
+
+    def __init__(
+        self,
+        train: bool,
+        token_type: str = None,
+        token_list: Union[Path, str, Iterable[str]] = None,
+        bpemodel: Union[Path, str, Iterable[str]] = None,
+        text_cleaner: Collection[str] = None,
+        g2p_type: str = None,
+        unk_symbol: str = "<unk>",
+        space_symbol: str = "<space>",
+        non_linguistic_symbols: Union[Path, str, Iterable[str]] = None,
+        delimiter: str = None,
+        rir_scp: str = None,
+        rir_apply_prob: float = 1.0,
+        noise_scp: str = None,
+        noise_apply_prob: float = 1.0,
+        noise_db_range: str = "3_10",
+        speech_volume_normalize: float = None,
+        speech_name: str = "speech",
+        text_name: str = "text",
+        split_with_space: bool = False,
+        seg_dict_file: str = None,
+    ):
+        # Forward by keyword so that a reordering of the base-class
+        # parameters cannot silently bind a value to the wrong argument.
+        super().__init__(
+            train=train,
+            token_type=token_type,
+            token_list=token_list,
+            bpemodel=bpemodel,
+            text_cleaner=text_cleaner,
+            g2p_type=g2p_type,
+            unk_symbol=unk_symbol,
+            space_symbol=space_symbol,
+            non_linguistic_symbols=non_linguistic_symbols,
+            delimiter=delimiter,
+            rir_scp=rir_scp,
+            rir_apply_prob=rir_apply_prob,
+            noise_scp=noise_scp,
+            noise_apply_prob=noise_apply_prob,
+            noise_db_range=noise_db_range,
+            speech_volume_normalize=speech_volume_normalize,
+            speech_name=speech_name,
+            text_name=text_name,
+            split_with_space=split_with_space,
+            seg_dict_file=seg_dict_file,
+        )
+
+    def _text_process(
+        self, data: Dict[str, Union[str, np.ndarray]]
+    ) -> Dict[str, np.ndarray]:
+        """Replace ``data[self.text_name]`` with an int64 array of token ids.
+
+        Nothing is changed when the text field is absent or no tokenizer is
+        configured.
+
+        Args:
+            data: Sample dictionary; modified in place.
+
+        Returns:
+            The same ``data`` dictionary.
+        """
+        if self.text_name in data and self.tokenizer is not None:
+            text = data[self.text_name]
+            text = self.text_cleaner(text)
+            if self.split_with_space:
+                tokens = text.strip().split(" ")
+                if self.seg_dict is not None:
+                    # Map each space-separated word through the dictionary;
+                    # unknown words become "<unk>".
+                    tokens = seg_tokenize_wo_pattern(tokens, self.seg_dict)
+            else:
+                tokens = self.tokenizer.text2tokens(text)
+            text_ints = self.token_id_converter.tokens2ids(tokens)
+            data[self.text_name] = np.array(text_ints, dtype=np.int64)
+        assert check_return_type(data)
+        return data
+
class CommonPreprocessor_multi(AbsPreprocessor):
def __init__(
- self,
- train: bool,
- token_type: str = None,
- token_list: Union[Path, str, Iterable[str]] = None,
- bpemodel: Union[Path, str, Iterable[str]] = None,
- text_cleaner: Collection[str] = None,
- g2p_type: str = None,
- unk_symbol: str = "<unk>",
- space_symbol: str = "<space>",
- non_linguistic_symbols: Union[Path, str, Iterable[str]] = None,
- delimiter: str = None,
- speech_name: str = "speech",
- text_name: List[str] = ["text"],
+ self,
+ train: bool,
+ token_type: str = None,
+ token_list: Union[Path, str, Iterable[str]] = None,
+ bpemodel: Union[Path, str, Iterable[str]] = None,
+ text_cleaner: Collection[str] = None,
+ g2p_type: str = None,
+ unk_symbol: str = "<unk>",
+ space_symbol: str = "<space>",
+ non_linguistic_symbols: Union[Path, str, Iterable[str]] = None,
+ delimiter: str = None,
+ speech_name: str = "speech",
+ text_name: List[str] = ["text"],
):
super().__init__(train)
self.train = train
@@ -373,7 +490,7 @@
self.token_id_converter = None
def _text_process(
- self, data: Dict[str, Union[str, np.ndarray]]
+ self, data: Dict[str, Union[str, np.ndarray]]
) -> Dict[str, np.ndarray]:
for text_n in self.text_name:
if text_n in data and self.tokenizer is not None:
@@ -386,7 +503,7 @@
return data
def __call__(
- self, uid: str, data: Dict[str, Union[str, np.ndarray]]
+ self, uid: str, data: Dict[str, Union[str, np.ndarray]]
) -> Dict[str, np.ndarray]:
assert check_argument_types()
@@ -404,25 +521,25 @@
class MutliTokenizerCommonPreprocessor(CommonPreprocessor):
def __init__(
- self,
- train: bool,
- token_type: List[str] = [None],
- token_list: List[Union[Path, str, Iterable[str]]] = [None],
- bpemodel: List[Union[Path, str, Iterable[str]]] = [None],
- text_cleaner: Collection[str] = None,
- g2p_type: str = None,
- unk_symbol: str = "<unk>",
- space_symbol: str = "<space>",
- non_linguistic_symbols: Union[Path, str, Iterable[str]] = None,
- delimiter: str = None,
- rir_scp: str = None,
- rir_apply_prob: float = 1.0,
- noise_scp: str = None,
- noise_apply_prob: float = 1.0,
- noise_db_range: str = "3_10",
- speech_volume_normalize: float = None,
- speech_name: str = "speech",
- text_name: List[str] = ["text"],
+ self,
+ train: bool,
+ token_type: List[str] = [None],
+ token_list: List[Union[Path, str, Iterable[str]]] = [None],
+ bpemodel: List[Union[Path, str, Iterable[str]]] = [None],
+ text_cleaner: Collection[str] = None,
+ g2p_type: str = None,
+ unk_symbol: str = "<unk>",
+ space_symbol: str = "<space>",
+ non_linguistic_symbols: Union[Path, str, Iterable[str]] = None,
+ delimiter: str = None,
+ rir_scp: str = None,
+ rir_apply_prob: float = 1.0,
+ noise_scp: str = None,
+ noise_apply_prob: float = 1.0,
+ noise_db_range: str = "3_10",
+ speech_volume_normalize: float = None,
+ speech_name: str = "speech",
+ text_name: List[str] = ["text"],
):
# TODO(jiatong): sync with Kamo and Jing on interface for preprocessor
super().__init__(
@@ -447,7 +564,7 @@
)
assert (
- len(token_type) == len(token_list) == len(bpemodel) == len(text_name)
+ len(token_type) == len(token_list) == len(bpemodel) == len(text_name)
), "token_type, token_list, bpemodel, or processing text_name mismatched"
self.num_tokenizer = len(token_type)
self.tokenizer = []
@@ -482,7 +599,7 @@
self.text_name = text_name # override the text_name from CommonPreprocessor
def _text_process(
- self, data: Dict[str, Union[str, np.ndarray]]
+ self, data: Dict[str, Union[str, np.ndarray]]
) -> Dict[str, np.ndarray]:
for i in range(self.num_tokenizer):
text_name = self.text_name[i]
@@ -494,3 +611,96 @@
data[text_name] = np.array(text_ints, dtype=np.int64)
assert check_return_type(data)
return data
+
+class CodeMixTokenizerCommonPreprocessor(CommonPreprocessor):
+    """Preprocessor for text that mixes ASCII words with non-ASCII characters.
+
+    The text is split so that each run of ASCII characters forms one word
+    and each non-ASCII character forms a word of its own.  The resulting
+    word list is kept in the sample under ``split_text_name`` alongside the
+    token ids produced by the inherited text processing.
+
+    ``token_type`` is accepted for signature compatibility but ignored: the
+    base class is always configured with ``token_type="word"``.
+    """
+
+    def __init__(
+        self,
+        train: bool,
+        token_type: str = None,
+        token_list: Union[Path, str, Iterable[str]] = None,
+        bpemodel: Union[Path, str, Iterable[str]] = None,
+        text_cleaner: Collection[str] = None,
+        g2p_type: str = None,
+        unk_symbol: str = "<unk>",
+        space_symbol: str = "<space>",
+        non_linguistic_symbols: Union[Path, str, Iterable[str]] = None,
+        delimiter: str = None,
+        rir_scp: str = None,
+        rir_apply_prob: float = 1.0,
+        noise_scp: str = None,
+        noise_apply_prob: float = 1.0,
+        noise_db_range: str = "3_10",
+        speech_volume_normalize: float = None,
+        speech_name: str = "speech",
+        text_name: str = "text",
+        split_text_name: str = "split_text",
+        split_with_space: bool = False,
+        seg_dict_file: str = None,
+    ):
+        super().__init__(
+            train=train,
+            # Force to use word.
+            token_type="word",
+            token_list=token_list,
+            bpemodel=bpemodel,
+            text_cleaner=text_cleaner,
+            g2p_type=g2p_type,
+            unk_symbol=unk_symbol,
+            space_symbol=space_symbol,
+            non_linguistic_symbols=non_linguistic_symbols,
+            delimiter=delimiter,
+            speech_name=speech_name,
+            text_name=text_name,
+            rir_scp=rir_scp,
+            rir_apply_prob=rir_apply_prob,
+            noise_scp=noise_scp,
+            noise_apply_prob=noise_apply_prob,
+            noise_db_range=noise_db_range,
+            speech_volume_normalize=speech_volume_normalize,
+            split_with_space=split_with_space,
+            seg_dict_file=seg_dict_file,
+        )
+        # The data field name for split text.
+        self.split_text_name = split_text_name
+
+    @classmethod
+    def split_words(cls, text: str):
+        """Split ``text`` into ASCII words and single non-ASCII characters.
+
+        ``text`` is first split on whitespace.  Inside each piece,
+        consecutive ASCII characters are grouped into one word, while every
+        non-ASCII character ends the current word and is emitted on its own.
+
+        Args:
+            text: Text to split.
+
+        Returns:
+            List of words in their original order.
+        """
+        words = []
+        for seg in text.split():
+            # There is no whitespace in seg.
+            current_word = ""
+            for c in seg:
+                # Code points below 128 are exactly the characters that take
+                # one byte in UTF-8; comparing the code point avoids encoding
+                # each character and cannot fail on lone surrogates.
+                if ord(c) < 128:
+                    current_word += c
+                else:
+                    # Non-ASCII character: flush the pending ASCII word and
+                    # emit the character as its own word.
+                    if current_word:
+                        words.append(current_word)
+                        current_word = ""
+                    words.append(c)
+            if current_word:
+                words.append(current_word)
+        return words
+
+    def __call__(
+        self, uid: str, data: Dict[str, Union[list, str, np.ndarray]]
+    ) -> Dict[str, Union[list, np.ndarray]]:
+        """Process one sample and attach its word-level split text.
+
+        When the text field is present it is split into words (a string via
+        ``split_words``; any other value is taken to be a word list already)
+        and rewritten as the space-joined words before the inherited speech
+        and text processing run.  The word list is then stored under
+        ``split_text_name``.  A sample without a text field only goes
+        through the inherited processing.
+
+        Args:
+            uid: Sample identifier; not used here.
+            data: Sample dictionary; modified in place.
+
+        Returns:
+            The processed sample dictionary.
+        """
+        assert check_argument_types()
+        split_text = None
+        # The inherited _text_process tolerates a missing text field, so do
+        # not fail on such samples here either.
+        if self.text_name in data:
+            raw_text = data[self.text_name]
+            if isinstance(raw_text, str):
+                split_text = self.split_words(raw_text)
+            else:
+                split_text = raw_text
+            data[self.text_name] = " ".join(split_text)
+        data = self._speech_process(data)
+        data = self._text_process(data)
+        if split_text is not None:
+            data[self.split_text_name] = split_text
+        return data
+
+    def pop_split_text_data(self, data: Dict[str, Union[str, np.ndarray]]):
+        """Remove the split-text entry from ``data`` and return it.
+
+        Raises:
+            KeyError: If ``data`` has no ``split_text_name`` entry.
+        """
+        return data.pop(self.split_text_name)
+
--
Gitblit v1.9.1