From 012903e42ec890ab5c50137beb365c3d94e731d1 Mon Sep 17 00:00:00 2001
From: nichongjia-2007 <nichongjia@gmail.com>
Date: 星期五, 30 六月 2023 11:21:28 +0800
Subject: [PATCH] Merge branch 'main' of https://github.com/alibaba-damo-academy/FunASR

---
 funasr/models/frontend/default.py |  139 ++++++++++++++++++++++++++++++++++++----------
 1 files changed, 108 insertions(+), 31 deletions(-)

diff --git a/funasr/models/frontend/default.py b/funasr/models/frontend/default.py
index 19994f0..b41af80 100644
--- a/funasr/models/frontend/default.py
+++ b/funasr/models/frontend/default.py
@@ -2,18 +2,18 @@
 from typing import Optional
 from typing import Tuple
 from typing import Union
-
+import logging
 import humanfriendly
 import numpy as np
 import torch
 from torch_complex.tensor import ComplexTensor
-from typeguard import check_argument_types
 
 from funasr.layers.log_mel import LogMel
 from funasr.layers.stft import Stft
 from funasr.models.frontend.abs_frontend import AbsFrontend
 from funasr.modules.frontends.frontend import Frontend
 from funasr.utils.get_default_kwargs import get_default_kwargs
+from funasr.modules.nets_utils import make_pad_mask
 
 
 class DefaultFrontend(AbsFrontend):
@@ -39,7 +39,6 @@
             apply_stft: bool = True,
             use_channel: int = None,
     ):
-        assert check_argument_types()
         super().__init__()
         if isinstance(fs, str):
             fs = humanfriendly.parse_size(fs)
@@ -76,8 +75,8 @@
             htk=htk,
         )
         self.n_mels = n_mels
-        self.frontend_type = "default"
         self.use_channel = use_channel
+        self.frontend_type = "default"
 
     def output_size(self) -> int:
         return self.n_mels
@@ -137,8 +136,6 @@
         return input_stft, feats_lens
 
 
-
-
 class MultiChannelFrontend(AbsFrontend):
     """Conventional frontend structure for ASR.
     Stft -> WPE -> MVDR-Beamformer -> Power-spec -> Mel-Fbank -> CMVN
@@ -149,7 +146,9 @@
             fs: Union[int, str] = 16000,
             n_fft: int = 512,
             win_length: int = None,
-            hop_length: int = 128,
+            hop_length: int = None,
+            frame_length: int = None,
+            frame_shift: int = None,
             window: Optional[str] = "hann",
             center: bool = True,
             normalized: bool = False,
@@ -160,25 +159,36 @@
             htk: bool = False,
             frontend_conf: Optional[dict] = get_default_kwargs(Frontend),
             apply_stft: bool = True,
-            frame_length: int = None,
-            frame_shift: int = None,
-            lfr_m: int = None,
-            lfr_n: int = None,
+            use_channel: int = None,
+            lfr_m: int = 1,
+            lfr_n: int = 1,
+            cmvn_file: str = None,
+            mc: bool = True
     ):
-        assert check_argument_types()
         super().__init__()
         if isinstance(fs, str):
             fs = humanfriendly.parse_size(fs)
 
         # Deepcopy (In general, dict shouldn't be used as default arg)
         frontend_conf = copy.deepcopy(frontend_conf)
-        self.hop_length = hop_length
+        if win_length is None and hop_length is None:
+            self.win_length = frame_length * 16
+            self.hop_length = frame_shift * 16
+        elif frame_length is None and frame_shift is None:
+            self.win_length = self.win_length
+            self.hop_length = self.hop_length
+        else:
+            logging.error(
+                "Only one of (win_length, hop_length) and (frame_length, frame_shift)"
+                "can be set."
+            )
+            exit(1)
 
         if apply_stft:
             self.stft = Stft(
                 n_fft=n_fft,
-                win_length=win_length,
-                hop_length=hop_length,
+                win_length=self.win_length,
+                hop_length=self.hop_length,
                 center=center,
                 window=window,
                 normalized=normalized,
@@ -202,6 +212,18 @@
             htk=htk,
         )
         self.n_mels = n_mels
+        self.use_channel = use_channel
+        self.mc = mc
+        if not self.mc:
+            if self.use_channel is not None:
+                logging.info("use the channel %d" % (self.use_channel))
+            else:
+                logging.info("random select channel")
+            self.cmvn_file = cmvn_file
+            if self.cmvn_file is not None:
+                mean, std = self._load_cmvn(self.cmvn_file)
+                self.register_buffer("mean", torch.from_numpy(mean))
+                self.register_buffer("std", torch.from_numpy(std))
         self.frontend_type = "multichannelfrontend"
 
     def output_size(self) -> int:
@@ -215,16 +237,29 @@
         if self.stft is not None:
             input_stft, feats_lens = self._compute_stft(input, input_lengths)
         else:
-            if isinstance(input, ComplexTensor):
-                input_stft = input
-            else:
-                input_stft = ComplexTensor(input[..., 0], input[..., 1])
+            input_stft = ComplexTensor(input[..., 0], input[..., 1])
             feats_lens = input_lengths
         # 2. [Option] Speech enhancement
         if self.frontend is not None:
             assert isinstance(input_stft, ComplexTensor), type(input_stft)
             # input_stft: (Batch, Length, [Channel], Freq)
             input_stft, _, mask = self.frontend(input_stft, feats_lens)
+
+        # 3. [Multi channel case]: Select a channel(sa_asr)
+        if input_stft.dim() == 4 and not self.mc:
+            # h: (B, T, C, F) -> h: (B, T, F)
+            if self.training:
+                if self.use_channel is not None:
+                    input_stft = input_stft[:, :, self.use_channel, :]
+                    
+                else:
+                    # Select 1ch randomly
+                    ch = np.random.randint(input_stft.size(2))
+                    input_stft = input_stft[:, :, ch, :]
+            else:
+                # Use the first channel
+                input_stft = input_stft[:, :, 0, :]
+
         # 4. STFT -> Power spectrum
         # h: ComplexTensor(B, T, F) -> torch.Tensor(B, T, F)
         input_power = input_stft.real ** 2 + input_stft.imag ** 2
@@ -233,18 +268,37 @@
         # input_power: (Batch, [Channel,] Length, Freq)
         #       -> input_feats: (Batch, Length, Dim)
         input_feats, _ = self.logmel(input_power, feats_lens)
-        bt = input_feats.size(0)
-        if input_feats.dim() ==4:
-            channel_size = input_feats.size(2)
-            # batch * channel * T * D
-            #pdb.set_trace()
-            input_feats = input_feats.transpose(1,2).reshape(bt*channel_size,-1,80).contiguous()
-            # input_feats = input_feats.transpose(1,2)
-            # batch * channel
-            feats_lens = feats_lens.repeat(1,channel_size).squeeze()
+        if self.mc:
+            # MFCCA
+            if input_feats.dim() ==4:
+                bt = input_feats.size(0)
+                channel_size = input_feats.size(2)
+                input_feats = input_feats.transpose(1,2).reshape(bt*channel_size,-1,80).contiguous()
+                feats_lens = feats_lens.repeat(1,channel_size).squeeze()
+            else:
+                channel_size = 1
+            return input_feats, feats_lens, channel_size
         else:
-            channel_size = 1
-        return input_feats, feats_lens, channel_size
+            # 6. Apply CMVN
+            if self.cmvn_file is not None:
+                if feats_lens is None:
+                    feats_lens = input_feats.new_full([input_feats.size(0)], input_feats.size(1))
+                self.mean = self.mean.to(input_feats.device, input_feats.dtype)
+                self.std = self.std.to(input_feats.device, input_feats.dtype)
+                mask = make_pad_mask(feats_lens, input_feats, 1)
+
+                if input_feats.requires_grad:
+                    input_feats = input_feats + self.mean
+                else:
+                    input_feats += self.mean
+                if input_feats.requires_grad:
+                    input_feats = input_feats.masked_fill(mask, 0.0)
+                else:
+                    input_feats.masked_fill_(mask, 0.0)
+
+                input_feats *= self.std
+
+            return input_feats, feats_lens
 
     def _compute_stft(
             self, input: torch.Tensor, input_lengths: torch.Tensor
@@ -258,4 +312,27 @@
         # Change torch.Tensor to ComplexTensor
         # input_stft: (..., F, 2) -> (..., F)
         input_stft = ComplexTensor(input_stft[..., 0], input_stft[..., 1])
-        return input_stft, feats_lens
\ No newline at end of file
+        return input_stft, feats_lens
+
+    def _load_cmvn(self, cmvn_file):
+        with open(cmvn_file, 'r', encoding='utf-8') as f:
+            lines = f.readlines()
+        means_list = []
+        vars_list = []
+        for i in range(len(lines)):
+            line_item = lines[i].split()
+            if line_item[0] == '<AddShift>':
+                line_item = lines[i + 1].split()
+                if line_item[0] == '<LearnRateCoef>':
+                    add_shift_line = line_item[3:(len(line_item) - 1)]
+                    means_list = list(add_shift_line)
+                    continue
+            elif line_item[0] == '<Rescale>':
+                line_item = lines[i + 1].split()
+                if line_item[0] == '<LearnRateCoef>':
+                    rescale_line = line_item[3:(len(line_item) - 1)]
+                    vars_list = list(rescale_line)
+                    continue
+        means = np.array(means_list).astype(np.float)
+        vars = np.array(vars_list).astype(np.float)
+        return means, vars

--
Gitblit v1.9.1