| | |
| | | logger_initialized = {} |
| | | |
| | | |
| | | class WavFrontend(): |
| | | """Conventional frontend structure for ASR. |
| | | """ |
| | | class WavFrontend: |
| | | """Conventional frontend structure for ASR.""" |
| | | |
| | | def __init__( |
| | | self, |
| | | cmvn_file: str = None, |
| | | fs: int = 16000, |
| | | window: str = 'hamming', |
| | | n_mels: int = 80, |
| | | frame_length: int = 25, |
| | | frame_shift: int = 10, |
| | | lfr_m: int = 1, |
| | | lfr_n: int = 1, |
| | | dither: float = 1.0, |
| | | **kwargs, |
| | | self, |
| | | cmvn_file: str = None, |
| | | fs: int = 16000, |
| | | window: str = "hamming", |
| | | n_mels: int = 80, |
| | | frame_length: int = 25, |
| | | frame_shift: int = 10, |
| | | lfr_m: int = 1, |
| | | lfr_n: int = 1, |
| | | dither: float = 1.0, |
| | | **kwargs, |
| | | ) -> None: |
| | | |
| | | opts = knf.FbankOptions() |
| | |
| | | self.fbank_beg_idx = 0 |
| | | self.reset_status() |
| | | |
| | | def fbank(self, |
| | | waveform: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: |
| | | def fbank(self, waveform: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: |
| | | waveform = waveform * (1 << 15) |
| | | self.fbank_fn = knf.OnlineFbank(self.opts) |
| | | self.fbank_fn.accept_waveform(self.opts.frame_opts.samp_freq, waveform.tolist()) |
| | |
| | | feat_len = np.array(mat.shape[0]).astype(np.int32) |
| | | return feat, feat_len |
| | | |
| | | def fbank_online(self, |
| | | waveform: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: |
| | | def fbank_online(self, waveform: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: |
| | | waveform = waveform * (1 << 15) |
| | | # self.fbank_fn = knf.OnlineFbank(self.opts) |
| | | self.fbank_fn.accept_waveform(self.opts.frame_opts.samp_freq, waveform.tolist()) |
| | |
| | | T = T + (lfr_m - 1) // 2 |
| | | for i in range(T_lfr): |
| | | if lfr_m <= T - i * lfr_n: |
| | | LFR_inputs.append( |
| | | (inputs[i * lfr_n:i * lfr_n + lfr_m]).reshape(1, -1)) |
| | | LFR_inputs.append((inputs[i * lfr_n : i * lfr_n + lfr_m]).reshape(1, -1)) |
| | | else: |
| | | # process last LFR frame |
| | | num_padding = lfr_m - (T - i * lfr_n) |
| | | frame = inputs[i * lfr_n:].reshape(-1) |
| | | frame = inputs[i * lfr_n :].reshape(-1) |
| | | for _ in range(num_padding): |
| | | frame = np.hstack((frame, inputs[-1])) |
| | | |
| | |
| | | inputs = (inputs + means) * vars |
| | | return inputs |
| | | |
| | | def load_cmvn(self,) -> np.ndarray: |
| | | with open(self.cmvn_file, 'r', encoding='utf-8') as f: |
| | | def load_cmvn( |
| | | self, |
| | | ) -> np.ndarray: |
| | | with open(self.cmvn_file, "r", encoding="utf-8") as f: |
| | | lines = f.readlines() |
| | | |
| | | means_list = [] |
| | | vars_list = [] |
| | | for i in range(len(lines)): |
| | | line_item = lines[i].split() |
| | | if line_item[0] == '<AddShift>': |
| | | if line_item[0] == "<AddShift>": |
| | | line_item = lines[i + 1].split() |
| | | if line_item[0] == '<LearnRateCoef>': |
| | | add_shift_line = line_item[3:(len(line_item) - 1)] |
| | | if line_item[0] == "<LearnRateCoef>": |
| | | add_shift_line = line_item[3 : (len(line_item) - 1)] |
| | | means_list = list(add_shift_line) |
| | | continue |
| | | elif line_item[0] == '<Rescale>': |
| | | elif line_item[0] == "<Rescale>": |
| | | line_item = lines[i + 1].split() |
| | | if line_item[0] == '<LearnRateCoef>': |
| | | rescale_line = line_item[3:(len(line_item) - 1)] |
| | | if line_item[0] == "<LearnRateCoef>": |
| | | rescale_line = line_item[3 : (len(line_item) - 1)] |
| | | vars_list = list(rescale_line) |
| | | continue |
| | | |
| | |
| | | cmvn = np.array([means, vars]) |
| | | return cmvn |
| | | |
| | | |
| | | def load_bytes(input): |
| | | middle_data = np.frombuffer(input, dtype=np.int16) |
| | | middle_data = np.asarray(middle_data) |
| | | if middle_data.dtype.kind not in 'iu': |
| | | if middle_data.dtype.kind not in "iu": |
| | | raise TypeError("'middle_data' must be an array of integers") |
| | | dtype = np.dtype('float32') |
| | | if dtype.kind != 'f': |
| | | dtype = np.dtype("float32") |
| | | if dtype.kind != "f": |
| | | raise TypeError("'dtype' must be a floating point type") |
| | | |
| | | i = np.iinfo(middle_data.dtype) |
| | |
| | | def test(): |
| | | path = "/nfs/zhifu.gzf/export/damo/speech_paraformer-large_asr_nat-zh-cn-16k-common-vocab8404-pytorch/example/asr_example.wav" |
| | | import librosa |
| | | |
| | | cmvn_file = "/nfs/zhifu.gzf/export/damo/speech_paraformer-large_asr_nat-zh-cn-16k-common-vocab8404-pytorch/am.mvn" |
| | | config_file = "/nfs/zhifu.gzf/export/damo/speech_paraformer-large_asr_nat-zh-cn-16k-common-vocab8404-pytorch/config.yaml" |
| | | from funasr.runtime.python.onnxruntime.rapid_paraformer.utils.utils import read_yaml |
| | | |
| | | config = read_yaml(config_file) |
| | | waveform, _ = librosa.load(path, sr=None) |
| | | frontend = WavFrontend( |
| | | cmvn_file=cmvn_file, |
| | | **config['frontend_conf'], |
| | | **config["frontend_conf"], |
| | | ) |
| | | speech, _ = frontend.fbank_online(waveform) #1d, (sample,), numpy |
| | | feat, feat_len = frontend.lfr_cmvn(speech) # 2d, (frame, 450), np.float32 -> torch, torch.from_numpy(), dtype, (1, frame, 450) |
| | | |
| | | frontend.reset_status() # clear cache |
| | | speech, _ = frontend.fbank_online(waveform) # 1d, (sample,), numpy |
| | | feat, feat_len = frontend.lfr_cmvn( |
| | | speech |
| | | ) # 2d, (frame, 450), np.float32 -> torch, torch.from_numpy(), dtype, (1, frame, 450) |
| | | |
| | | frontend.reset_status() # clear cache |
| | | return feat, feat_len |
| | | |
| | | if __name__ == '__main__': |
| | | test() |
| | | |
| | | if __name__ == "__main__": |
| | | test() |