kongdeqiang
5 天以前 28ccfbfc51068a663a80764e14074df5edf2b5ba
funasr/frontends/default.py
@@ -3,10 +3,10 @@
from typing import Tuple
from typing import Union
import logging
import humanfriendly
import numpy as np
import torch
import torch.nn as nn
try:
    from torch_complex.tensor import ComplexTensor
except:
@@ -16,38 +16,41 @@
from funasr.frontends.utils.stft import Stft
from funasr.frontends.utils.frontend import Frontend
from funasr.models.transformer.utils.nets_utils import make_pad_mask
from funasr.register import tables
@tables.register("frontend_classes", "DefaultFrontend")
@tables.register("frontend_classes", "EspnetFrontend")
class DefaultFrontend(nn.Module):
    """Conventional frontend structure for ASR.
    Stft -> WPE -> MVDR-Beamformer -> Power-spec -> Mel-Fbank -> CMVN
    """
    def __init__(
            self,
            fs: Union[int, str] = 16000,
            n_fft: int = 512,
            win_length: int = None,
            hop_length: int = 128,
            window: Optional[str] = "hann",
            center: bool = True,
            normalized: bool = False,
            onesided: bool = True,
            n_mels: int = 80,
            fmin: int = None,
            fmax: int = None,
            htk: bool = False,
            frontend_conf: Optional[dict] = None,
            apply_stft: bool = True,
            use_channel: int = None,
        self,
        fs: int = 16000,
        n_fft: int = 512,
        win_length: int = None,
        hop_length: int = 128,
        window: Optional[str] = "hann",
        center: bool = True,
        normalized: bool = False,
        onesided: bool = True,
        n_mels: int = 80,
        fmin: int = None,
        fmax: int = None,
        htk: bool = False,
        frontend_conf: Optional[dict] = None,
        apply_stft: bool = True,
        use_channel: int = None,
        **kwargs,
    ):
        super().__init__()
        if isinstance(fs, str):
            fs = humanfriendly.parse_size(fs)
        # Deepcopy (In general, dict shouldn't be used as default arg)
        frontend_conf = copy.deepcopy(frontend_conf)
        self.hop_length = hop_length
        self.fs = fs
        if apply_stft:
            self.stft = Stft(
@@ -84,8 +87,12 @@
        return self.n_mels
    def forward(
            self, input: torch.Tensor, input_lengths: torch.Tensor
        self, input: torch.Tensor, input_lengths: Union[torch.Tensor, list]
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        if isinstance(input_lengths, list):
            input_lengths = torch.tensor(input_lengths)
        if input.dtype == torch.float64:
            input = input.float()
        # 1. Domain-conversion: e.g. Stft: time -> time-freq
        if self.stft is not None:
            input_stft, feats_lens = self._compute_stft(input, input_lengths)
@@ -114,7 +121,7 @@
        # 4. STFT -> Power spectrum
        # h: ComplexTensor(B, T, F) -> torch.Tensor(B, T, F)
        input_power = input_stft.real ** 2 + input_stft.imag ** 2
        input_power = input_stft.real**2 + input_stft.imag**2
        # 5. Feature transform e.g. Stft -> Log-Mel-Fbank
        # input_power: (Batch, [Channel,] Length, Freq)
@@ -123,9 +130,7 @@
        return input_feats, feats_lens
    def _compute_stft(
            self, input: torch.Tensor, input_lengths: torch.Tensor
    ) -> torch.Tensor:
    def _compute_stft(self, input: torch.Tensor, input_lengths: torch.Tensor) -> torch.Tensor:
        input_stft, feats_lens = self.stft(input, input_lengths)
        assert input_stft.dim() >= 4, input_stft.shape
@@ -144,33 +149,30 @@
    """
    def __init__(
            self,
            fs: Union[int, str] = 16000,
            n_fft: int = 512,
            win_length: int = None,
            hop_length: int = None,
            frame_length: int = None,
            frame_shift: int = None,
            window: Optional[str] = "hann",
            center: bool = True,
            normalized: bool = False,
            onesided: bool = True,
            n_mels: int = 80,
            fmin: int = None,
            fmax: int = None,
            htk: bool = False,
            frontend_conf: Optional[dict] = None,
            apply_stft: bool = True,
            use_channel: int = None,
            lfr_m: int = 1,
            lfr_n: int = 1,
            cmvn_file: str = None,
            mc: bool = True
        self,
        fs: int = 16000,
        n_fft: int = 512,
        win_length: int = None,
        hop_length: int = None,
        frame_length: int = None,
        frame_shift: int = None,
        window: Optional[str] = "hann",
        center: bool = True,
        normalized: bool = False,
        onesided: bool = True,
        n_mels: int = 80,
        fmin: int = None,
        fmax: int = None,
        htk: bool = False,
        frontend_conf: Optional[dict] = None,
        apply_stft: bool = True,
        use_channel: int = None,
        lfr_m: int = 1,
        lfr_n: int = 1,
        cmvn_file: str = None,
        mc: bool = True,
    ):
        super().__init__()
        if isinstance(fs, str):
            fs = humanfriendly.parse_size(fs)
        # Deepcopy (In general, dict shouldn't be used as default arg)
        frontend_conf = copy.deepcopy(frontend_conf)
        if win_length is None and hop_length is None:
@@ -181,8 +183,7 @@
            self.hop_length = self.hop_length
        else:
            logging.error(
                "Only one of (win_length, hop_length) and (frame_length, frame_shift)"
                "can be set."
                "Only one of (win_length, hop_length) and (frame_length, frame_shift)" "can be set."
            )
            exit(1)
@@ -232,10 +233,9 @@
        return self.n_mels
    def forward(
            self, input: torch.Tensor, input_lengths: torch.Tensor
        self, input: torch.Tensor, input_lengths: torch.Tensor
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        # 1. Domain-conversion: e.g. Stft: time -> time-freq
        #import pdb;pdb.set_trace()
        if self.stft is not None:
            input_stft, feats_lens = self._compute_stft(input, input_lengths)
        else:
@@ -253,7 +253,7 @@
            if self.training:
                if self.use_channel is not None:
                    input_stft = input_stft[:, :, self.use_channel, :]
                else:
                    # Select 1ch randomly
                    ch = np.random.randint(input_stft.size(2))
@@ -264,7 +264,7 @@
        # 4. STFT -> Power spectrum
        # h: ComplexTensor(B, T, F) -> torch.Tensor(B, T, F)
        input_power = input_stft.real ** 2 + input_stft.imag ** 2
        input_power = input_stft.real**2 + input_stft.imag**2
        # 5. Feature transform e.g. Stft -> Log-Mel-Fbank
        # input_power: (Batch, [Channel,] Length, Freq)
@@ -272,11 +272,13 @@
        input_feats, _ = self.logmel(input_power, feats_lens)
        if self.mc:
            # MFCCA
            if input_feats.dim() ==4:
            if input_feats.dim() == 4:
                bt = input_feats.size(0)
                channel_size = input_feats.size(2)
                input_feats = input_feats.transpose(1,2).reshape(bt*channel_size,-1,80).contiguous()
                feats_lens = feats_lens.repeat(1,channel_size).squeeze()
                input_feats = (
                    input_feats.transpose(1, 2).reshape(bt * channel_size, -1, 80).contiguous()
                )
                feats_lens = feats_lens.repeat(1, channel_size).squeeze()
            else:
                channel_size = 1
            return input_feats, feats_lens, channel_size
@@ -302,9 +304,7 @@
            return input_feats, feats_lens
    def _compute_stft(
            self, input: torch.Tensor, input_lengths: torch.Tensor
    ) -> torch.Tensor:
    def _compute_stft(self, input: torch.Tensor, input_lengths: torch.Tensor) -> torch.Tensor:
        input_stft, feats_lens = self.stft(input, input_lengths)
        assert input_stft.dim() >= 4, input_stft.shape
@@ -317,22 +317,22 @@
        return input_stft, feats_lens
    def _load_cmvn(self, cmvn_file):
        with open(cmvn_file, 'r', encoding='utf-8') as f:
        with open(cmvn_file, "r", encoding="utf-8") as f:
            lines = f.readlines()
        means_list = []
        vars_list = []
        for i in range(len(lines)):
            line_item = lines[i].split()
            if line_item[0] == '<AddShift>':
            if line_item[0] == "<AddShift>":
                line_item = lines[i + 1].split()
                if line_item[0] == '<LearnRateCoef>':
                    add_shift_line = line_item[3:(len(line_item) - 1)]
                if line_item[0] == "<LearnRateCoef>":
                    add_shift_line = line_item[3 : (len(line_item) - 1)]
                    means_list = list(add_shift_line)
                    continue
            elif line_item[0] == '<Rescale>':
            elif line_item[0] == "<Rescale>":
                line_item = lines[i + 1].split()
                if line_item[0] == '<LearnRateCoef>':
                    rescale_line = line_item[3:(len(line_item) - 1)]
                if line_item[0] == "<LearnRateCoef>":
                    rescale_line = line_item[3 : (len(line_item) - 1)]
                    vars_list = list(rescale_line)
                    continue
        means = np.array(means_list).astype(np.float)