def export(self,
           tag_name: str = 'damo/speech_paraformer-large_asr_nat-zh-cn-16k-common-vocab8404-pytorch',
           mode: str = None,
           ):

    model_dir = tag_name
    if model_dir.startswith('damo'):
        from modelscope.hub.snapshot_download import snapshot_download
        model_dir = snapshot_download(model_dir, cache_dir=self.cache_dir)

    # Infer the export mode from the model's configuration.json when the
    # caller does not pin one explicitly.
    if mode is None:
        import json
        json_file = os.path.join(model_dir, 'configuration.json')
        with open(json_file, 'r') as f:
            config_data = json.load(f)
        mode = config_data['model']['model_config']['mode']

    if mode.startswith('paraformer'):
        from funasr.tasks.asr import ASRTaskParaformer as ASRTask
    elif mode.startswith('uniasr'):
        from funasr.tasks.asr import ASRTaskUniASR as ASRTask

    if mode.startswith('paraformer') or mode.startswith('uniasr'):
        config = os.path.join(model_dir, 'config.yaml')
        model_file = os.path.join(model_dir, 'model.pb')
        cmvn_file = os.path.join(model_dir, 'am.mvn')
        model, asr_train_args = ASRTask.build_model_from_file(
            config, model_file, cmvn_file, 'cpu'
        )
        self.frontend = model.frontend
    elif mode.startswith('offline'):
        from funasr.tasks.vad import VADTask
        config = os.path.join(model_dir, 'vad.yaml')
        model_file = os.path.join(model_dir, 'vad.pb')
        cmvn_file = os.path.join(model_dir, 'vad.mvn')
        model, vad_infer_args = VADTask.build_model_from_file(
            config, model_file, 'cpu'
        )
    self._export(model, tag_name)
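
# Minimal driver sketch (hypothetical: the enclosing exporter class and its
# constructor arguments are assumptions, not part of this diff):
#
#     exporter = ModelExport(cache_dir='./export_cache')
#     # mode=None lets export() read the mode from configuration.json
#     exporter.export(tag_name='damo/speech_paraformer-large_asr_nat-zh-cn-16k-common-vocab8404-pytorch',
#                     mode=None)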

from funasr.models.e2e_asr_paraformer import Paraformer, BiCifParaformer
from funasr.export.models.e2e_asr_paraformer import Paraformer as Paraformer_export
from funasr.export.models.e2e_asr_paraformer import BiCifParaformer as BiCifParaformer_export
from funasr.models.e2e_uni_asr import UniASR

from funasr.models.e2e_vad import E2EVadModel
from funasr.export.models.e2e_vad import E2EVadModel as E2EVadModel_export


def get_model(model, export_config=None):
    # Map a training-time model instance onto its export-time wrapper.
    # BiCifParaformer is checked before the more general Paraformer.
    if isinstance(model, BiCifParaformer):
        return BiCifParaformer_export(model, **export_config)
    elif isinstance(model, Paraformer):
        return Paraformer_export(model, **export_config)
    elif isinstance(model, E2EVadModel):
        return E2EVadModel_export(model, **export_config)
    else:
        raise TypeError("FunASR does not support the given model type currently.")
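
# Usage sketch (the export_config keys below come from E2EVadModel's
# constructor in this diff; the exact dict is an assumption):
#
#     export_model = get_model(vad_model, export_config={'feats_dim': 560,
#                                                        'model_name': 'vad'})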
New file

from enum import Enum
from typing import List, Tuple, Dict, Any

import torch
from torch import nn
import math

from funasr.models.encoder.fsmn_encoder import FSMN
from funasr.export.models.encoder.fsmn_encoder import FSMN as FSMN_export


class E2EVadModel(nn.Module):
    def __init__(self, model,
                 max_seq_len=512,
                 feats_dim=560,
                 model_name='model',
                 **kwargs,):
        super(E2EVadModel, self).__init__()
        self.feats_dim = feats_dim
        self.max_seq_len = max_seq_len
        self.model_name = model_name
        if isinstance(model.encoder, FSMN):
            self.encoder = FSMN_export(model.encoder)
        else:
            raise TypeError("unsupported encoder type: {}".format(type(model.encoder)))

    def forward(self, feats: torch.Tensor,
                in_cache0: torch.Tensor,
                in_cache1: torch.Tensor,
                in_cache2: torch.Tensor,
                in_cache3: torch.Tensor,
                ):
        scores, cache0, cache1, cache2, cache3 = self.encoder(feats,
                                                              in_cache0,
                                                              in_cache1,
                                                              in_cache2,
                                                              in_cache3)  # return B * T * D
        return scores, cache0, cache1, cache2, cache3

    def get_dummy_inputs(self, frame=30):
        speech = torch.randn(1, frame, self.feats_dim)
        # Each cache is (B, proj_dim, (lorder - 1) * lstride, 1); 19 is
        # consistent with lorder=20, lstride=1.
        in_cache0 = torch.randn(1, 128, 19, 1)
        in_cache1 = torch.randn(1, 128, 19, 1)
        in_cache2 = torch.randn(1, 128, 19, 1)
        in_cache3 = torch.randn(1, 128, 19, 1)
        return (speech, in_cache0, in_cache1, in_cache2, in_cache3)

    # def get_dummy_inputs_txt(self, txt_file: str = "/mnt/workspace/data_fbank/0207/12345.wav.fea.txt"):
    #     import numpy as np
    #     fbank = np.loadtxt(txt_file)
    #     fbank_lengths = np.array([fbank.shape[0], ], dtype=np.int32)
    #     speech = torch.from_numpy(fbank[None, :, :].astype(np.float32))
    #     speech_lengths = torch.from_numpy(fbank_lengths.astype(np.int32))
    #     return (speech, speech_lengths)

    def get_input_names(self):
        return ['speech', 'in_cache0', 'in_cache1', 'in_cache2', 'in_cache3']

    def get_output_names(self):
        return ['logits', 'out_cache0', 'out_cache1', 'out_cache2', 'out_cache3']

    def get_dynamic_axes(self):
        return {
            'speech': {
                1: 'feats_length'
            },
        }
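
# How these hooks typically feed torch.onnx.export (a sketch; the output path
# and opset are assumptions, and `vad_model` is a loaded training-time model):
#
#     m = E2EVadModel(vad_model)
#     torch.onnx.export(
#         m,
#         m.get_dummy_inputs(),
#         'vad.onnx',
#         input_names=m.get_input_names(),
#         output_names=m.get_output_names(),
#         dynamic_axes=m.get_dynamic_axes(),
#         opset_version=13,
#     )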
New file

from typing import Tuple, Dict
import copy

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from funasr.models.encoder.fsmn_encoder import BasicBlock


class LinearTransform(nn.Module):

    def __init__(self, input_dim, output_dim):
        super(LinearTransform, self).__init__()
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.linear = nn.Linear(input_dim, output_dim, bias=False)

    def forward(self, input):
        output = self.linear(input)
        return output


class AffineTransform(nn.Module):

    def __init__(self, input_dim, output_dim):
        super(AffineTransform, self).__init__()
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.linear = nn.Linear(input_dim, output_dim)

    def forward(self, input):
        output = self.linear(input)
        return output


class RectifiedLinear(nn.Module):

    def __init__(self, input_dim, output_dim):
        super(RectifiedLinear, self).__init__()
        self.dim = input_dim
        self.relu = nn.ReLU()
        # Note: dropout is defined but not applied in forward.
        self.dropout = nn.Dropout(0.1)

    def forward(self, input):
        out = self.relu(input)
        return out

class FSMNBlock(nn.Module):

    def __init__(
            self,
            input_dim: int,
            output_dim: int,
            lorder=None,
            rorder=None,
            lstride=1,
            rstride=1,
    ):
        super(FSMNBlock, self).__init__()

        self.dim = input_dim

        if lorder is None:
            return

        self.lorder = lorder
        self.rorder = rorder
        self.lstride = lstride
        self.rstride = rstride

        self.conv_left = nn.Conv2d(
            self.dim, self.dim, [lorder, 1], dilation=[lstride, 1], groups=self.dim, bias=False)

        if self.rorder > 0:
            self.conv_right = nn.Conv2d(
                self.dim, self.dim, [rorder, 1], dilation=[rstride, 1], groups=self.dim, bias=False)
        else:
            self.conv_right = None

    def forward(self, input: torch.Tensor, cache: torch.Tensor):
        x = torch.unsqueeze(input, 1)
        x_per = x.permute(0, 3, 2, 1)  # B D T C

        # Prepend the cached left context, then keep the last
        # (lorder - 1) * lstride frames as the cache for the next chunk.
        cache = cache.to(x_per.device)
        y_left = torch.cat((cache, x_per), dim=2)
        cache = y_left[:, :, -(self.lorder - 1) * self.lstride:, :]
        y_left = self.conv_left(y_left)
        out = x_per + y_left

        if self.conv_right is not None:
            # Right-context (lookahead) path; still to be verified.
            y_right = F.pad(x_per, [0, 0, 0, self.rorder * self.rstride])
            y_right = y_right[:, :, self.rstride:, :]
            y_right = self.conv_right(y_right)
            out += y_right

        out_per = out.permute(0, 3, 2, 1)
        output = out_per.squeeze(1)

        return output, cache
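
# For the first streaming chunk there is no history, so a zero cache of shape
# (B, dim, (lorder - 1) * lstride, 1) stands in for it. A hypothetical helper
# (not part of FunASR) that makes the shape rule explicit:

def init_cache(batch: int, dim: int, lorder: int, lstride: int = 1) -> torch.Tensor:
    # e.g. init_cache(1, 128, 20) -> torch.zeros(1, 128, 19, 1), matching the
    # dummy caches used by E2EVadModel.get_dummy_inputs earlier in this diff.
    return torch.zeros(batch, dim, (lorder - 1) * lstride, 1)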

class BasicBlock_export(nn.Module):
    def __init__(self,
                 model,
                 ):
        super(BasicBlock_export, self).__init__()
        self.linear = model.linear
        self.fsmn_block = model.fsmn_block
        self.affine = model.affine
        self.relu = model.relu

    def forward(self, input: torch.Tensor, in_cache: torch.Tensor):
        x = self.linear(input)  # B T D
        # cache_layer_name = 'cache_layer_{}'.format(self.stack_layer)
        # if cache_layer_name not in in_cache:
        #     in_cache[cache_layer_name] = torch.zeros(x1.shape[0], x1.shape[-1], (self.lorder - 1) * self.lstride, 1)
        x, out_cache = self.fsmn_block(x, in_cache)
        x = self.affine(x)
        x = self.relu(x)
        return x, out_cache

# class FsmnStack(nn.Sequential):
#     def __init__(self, *args):
#         super(FsmnStack, self).__init__(*args)
#
#     def forward(self, input: torch.Tensor, in_cache: Dict[str, torch.Tensor]):
#         x = input
#         for module in self._modules.values():
#             x = module(x, in_cache)
#         return x


'''
FSMN net for keyword spotting
input_dim:              input dimension
linear_dim:             fsmn input dimension
proj_dim:               fsmn projection dimension
lorder:                 fsmn left order
rorder:                 fsmn right order
num_syn:                output dimension
fsmn_layers:            number of sequential fsmn layers
'''

class FSMN(nn.Module):
    def __init__(
            self,
            model,
    ):
        super(FSMN, self).__init__()

        # self.input_dim = input_dim
        # self.input_affine_dim = input_affine_dim
        # self.fsmn_layers = fsmn_layers
        # self.linear_dim = linear_dim
        # self.proj_dim = proj_dim
        # self.output_affine_dim = output_affine_dim
        # self.output_dim = output_dim
        #
        # self.in_linear1 = AffineTransform(input_dim, input_affine_dim)
        # self.in_linear2 = AffineTransform(input_affine_dim, linear_dim)
        # self.relu = RectifiedLinear(linear_dim, linear_dim)
        # self.fsmn = FsmnStack(*[BasicBlock(linear_dim, proj_dim, lorder, rorder, lstride, rstride, i) for i in
        #                         range(fsmn_layers)])
        # self.out_linear1 = AffineTransform(linear_dim, output_affine_dim)
        # self.out_linear2 = AffineTransform(output_affine_dim, output_dim)
        # self.softmax = nn.Softmax(dim=-1)
        self.in_linear1 = model.in_linear1
        self.in_linear2 = model.in_linear2
        self.relu = model.relu
        self.fsmn = model.fsmn
        self.out_linear1 = model.out_linear1
        self.out_linear2 = model.out_linear2
        self.softmax = model.softmax

        # Swap every training-time BasicBlock for its cache-explicit export twin.
        for i, d in enumerate(self.fsmn):
            if isinstance(d, BasicBlock):
                self.fsmn[i] = BasicBlock_export(d)

    def fuse_modules(self):
        pass

    def forward(
            self,
            input: torch.Tensor,
            *args,
    ):
        """
        Args:
            input (torch.Tensor): Input tensor (B, T, D)
            *args: one cache tensor per FSMN layer, holding that layer's left
                context of length (lorder - 1) * lstride; all-zeros for the
                first streaming chunk.
        """
        x = self.in_linear1(input)
        x = self.in_linear2(x)
        x = self.relu(x)
        # x4 = self.fsmn(x3, in_cache)  # self.in_cache will update automatically in self.fsmn
        out_caches = list()
        for i, d in enumerate(self.fsmn):
            in_cache = args[i]
            x, out_cache = d(x, in_cache)
            out_caches.append(out_cache)
        x = self.out_linear1(x)
        x = self.out_linear2(x)
        x = self.softmax(x)

        return x, *out_caches
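
# Streaming sketch (hypothetical driver, assuming 4 FSMN layers with 128-dim
# projections and lorder=20, as in the dummy caches above): thread each
# chunk's output caches back in as the next chunk's input caches.
#
#     caches = [torch.zeros(1, 128, 19, 1) for _ in range(4)]
#     for chunk in feature_chunks:          # each chunk: (1, T, D)
#         scores, *caches = fsmn_export(chunk, *caches)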

'''
one deep fsmn layer
dimproj:                projection dimension, input and output dimension of memory blocks
dimlinear:              dimension of mapping layer
lorder:                 left order
rorder:                 right order
lstride:                left stride
rstride:                right stride
'''


class DFSMN(nn.Module):

    def __init__(self, dimproj=64, dimlinear=128, lorder=20, rorder=1, lstride=1, rstride=1):
        super(DFSMN, self).__init__()

        self.lorder = lorder
        self.rorder = rorder
        self.lstride = lstride
        self.rstride = rstride

        self.expand = AffineTransform(dimproj, dimlinear)
        self.shrink = LinearTransform(dimlinear, dimproj)

        self.conv_left = nn.Conv2d(
            dimproj, dimproj, [lorder, 1], dilation=[lstride, 1], groups=dimproj, bias=False)

        if rorder > 0:
            self.conv_right = nn.Conv2d(
                dimproj, dimproj, [rorder, 1], dilation=[rstride, 1], groups=dimproj, bias=False)
        else:
            self.conv_right = None

    def forward(self, input):
        f1 = F.relu(self.expand(input))
        p1 = self.shrink(f1)

        x = torch.unsqueeze(p1, 1)
        x_per = x.permute(0, 3, 2, 1)  # B D T C

        y_left = F.pad(x_per, [0, 0, (self.lorder - 1) * self.lstride, 0])

        if self.conv_right is not None:
            y_right = F.pad(x_per, [0, 0, 0, self.rorder * self.rstride])
            y_right = y_right[:, :, self.rstride:, :]
            out = x_per + self.conv_left(y_left) + self.conv_right(y_right)
        else:
            out = x_per + self.conv_left(y_left)

        out1 = out.permute(0, 3, 2, 1)
        # Residual connection back to the block input.
        output = input + out1.squeeze(1)

        return output
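
# Quick shape check for a single layer (a sketch; sizes are arbitrary):
#
#     layer = DFSMN(dimproj=64, dimlinear=128, lorder=20, rorder=1)
#     out = layer(torch.randn(2, 100, 64))   # -> (2, 100, 64)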

'''
build stacked dfsmn layers
'''


def buildDFSMNRepeats(linear_dim=128, proj_dim=64, lorder=20, rorder=1, fsmn_layers=6):
    repeats = [
        nn.Sequential(
            DFSMN(proj_dim, linear_dim, lorder, rorder, 1, 1))
        for i in range(fsmn_layers)
    ]

    return nn.Sequential(*repeats)
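
# Example: a 6-layer stack with the defaults; shapes stay (B, T, proj_dim):
#
#     stack = buildDFSMNRepeats()
#     y = stack(torch.randn(2, 100, 64))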

if __name__ == '__main__':
    # Smoke test against the training-side FSMN (built from hyper-parameters);
    # the export-side FSMN above wraps an already-trained model instead.
    from funasr.models.encoder.fsmn_encoder import FSMN as FSMN_train

    fsmn = FSMN_train(400, 140, 4, 250, 128, 10, 2, 1, 1, 140, 2599)
    print(fsmn)

    num_params = sum(p.numel() for p in fsmn.parameters())
    print('the number of model params: {}'.format(num_params))
    x = torch.zeros(128, 200, 400)  # batch-size * time * dim
    y, _ = fsmn(x)  # batch-size * time * dim
    print('input shape: {}'.format(x.shape))
    print('output shape: {}'.format(y.shape))

    print(fsmn.to_kaldi_net())