From 2868fe3df4e92a6ae3e327faf6e57ea492e04124 Mon Sep 17 00:00:00 2001
From: 志浩 <neo.dzh@alibaba-inc.com>
Date: 星期四, 16 三月 2023 19:24:21 +0800
Subject: [PATCH] Merge branch 'main' into dev_dzh

---
 funasr/models/e2e_tp.py |  175 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 175 insertions(+), 0 deletions(-)

diff --git a/funasr/models/e2e_tp.py b/funasr/models/e2e_tp.py
new file mode 100644
index 0000000..887439c
--- /dev/null
+++ b/funasr/models/e2e_tp.py
@@ -0,0 +1,175 @@
+import logging
+from contextlib import contextmanager
+from distutils.version import LooseVersion
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import torch
+import numpy as np
+from typeguard import check_argument_types
+
+from funasr.models.encoder.abs_encoder import AbsEncoder
+from funasr.models.frontend.abs_frontend import AbsFrontend
+from funasr.models.predictor.cif import mae_loss
+from funasr.modules.add_sos_eos import add_sos_eos
+from funasr.modules.nets_utils import make_pad_mask, pad_list
+from funasr.torch_utils.device_funcs import force_gatherable
+from funasr.train.abs_espnet_model import AbsESPnetModel
+from funasr.models.predictor.cif import CifPredictorV3
+
+
+if LooseVersion(torch.__version__) >= LooseVersion("1.6.0"):
+    from torch.cuda.amp import autocast
+else:
+    # Nothing to do if torch<1.6.0
+    @contextmanager
+    def autocast(enabled=True):
+        yield
+
+
+class TimestampPredictor(AbsESPnetModel):
+    """
+    Author: Speech Lab, Alibaba Group, China
+    """
+
+    def __init__(
+            self,
+            frontend: Optional[AbsFrontend],
+            encoder: AbsEncoder,
+            predictor: CifPredictorV3,
+            predictor_bias: int = 0,
+            token_list=None,
+    ):
+        assert check_argument_types()
+
+        super().__init__()
+        # note that eos is the same as sos (equivalent ID)
+
+        self.frontend = frontend
+        self.encoder = encoder
+        self.encoder.interctc_use_conditioning = False
+
+        self.predictor = predictor
+        self.predictor_bias = predictor_bias
+        self.criterion_pre = mae_loss()
+        self.token_list = token_list
+    
+    def forward(
+            self,
+            speech: torch.Tensor,
+            speech_lengths: torch.Tensor,
+            text: torch.Tensor,
+            text_lengths: torch.Tensor,
+    ) -> Tuple[torch.Tensor, Dict[str, torch.Tensor], torch.Tensor]:
+        """Frontend + Encoder + Decoder + Calc loss
+
+        Args:
+                speech: (Batch, Length, ...)
+                speech_lengths: (Batch, )
+                text: (Batch, Length)
+                text_lengths: (Batch,)
+        """
+        assert text_lengths.dim() == 1, text_lengths.shape
+        # Check that batch_size is unified
+        assert (
+                speech.shape[0]
+                == speech_lengths.shape[0]
+                == text.shape[0]
+                == text_lengths.shape[0]
+        ), (speech.shape, speech_lengths.shape, text.shape, text_lengths.shape)
+        batch_size = speech.shape[0]
+        # for data-parallel
+        text = text[:, : text_lengths.max()]
+        speech = speech[:, :speech_lengths.max()]
+
+        # 1. Encoder
+        encoder_out, encoder_out_lens = self.encode(speech, speech_lengths)
+
+        encoder_out_mask = (~make_pad_mask(encoder_out_lens, maxlen=encoder_out.size(1))[:, None, :]).to(
+            encoder_out.device)
+        if self.predictor_bias == 1:
+            _, text = add_sos_eos(text, 1, 2, -1)
+            text_lengths = text_lengths + self.predictor_bias
+        _, _, _, _, pre_token_length2 = self.predictor(encoder_out, text, encoder_out_mask, ignore_id=-1)
+
+        # loss_pre = self.criterion_pre(ys_pad_lens.type_as(pre_token_length), pre_token_length)
+        loss_pre = self.criterion_pre(text_lengths.type_as(pre_token_length2), pre_token_length2)
+
+        loss = loss_pre
+        stats = dict()
+
+        # Collect Attn branch stats
+        stats["loss_pre"] = loss_pre.detach().cpu() if loss_pre is not None else None
+        stats["loss"] = torch.clone(loss.detach())
+
+        # force_gatherable: to-device and to-tensor if scalar for DataParallel
+        loss, stats, weight = force_gatherable((loss, stats, batch_size), loss.device)
+        return loss, stats, weight
+
+    def encode(
+            self, speech: torch.Tensor, speech_lengths: torch.Tensor
+    ) -> Tuple[torch.Tensor, torch.Tensor]:
+        """Frontend + Encoder. Note that this method is used by asr_inference.py
+
+        Args:
+                speech: (Batch, Length, ...)
+                speech_lengths: (Batch, )
+        """
+        with autocast(False):
+            # 1. Extract feats
+            feats, feats_lengths = self._extract_feats(speech, speech_lengths)
+
+        # 4. Forward encoder
+        # feats: (Batch, Length, Dim)
+        # -> encoder_out: (Batch, Length2, Dim2)
+        encoder_out, encoder_out_lens, _ = self.encoder(feats, feats_lengths)
+
+        return encoder_out, encoder_out_lens
+    
+    def _extract_feats(
+            self, speech: torch.Tensor, speech_lengths: torch.Tensor
+    ) -> Tuple[torch.Tensor, torch.Tensor]:
+        assert speech_lengths.dim() == 1, speech_lengths.shape
+
+        # for data-parallel
+        speech = speech[:, : speech_lengths.max()]
+        if self.frontend is not None:
+            # Frontend
+            #  e.g. STFT and Feature extract
+            #       data_loader may send time-domain signal in this case
+            # speech (Batch, NSamples) -> feats: (Batch, NFrames, Dim)
+            feats, feats_lengths = self.frontend(speech, speech_lengths)
+        else:
+            # No frontend and no feature extract
+            feats, feats_lengths = speech, speech_lengths
+        return feats, feats_lengths
+
+    def calc_predictor_timestamp(self, encoder_out, encoder_out_lens, token_num):
+        encoder_out_mask = (~make_pad_mask(encoder_out_lens, maxlen=encoder_out.size(1))[:, None, :]).to(
+            encoder_out.device)
+        ds_alphas, ds_cif_peak, us_alphas, us_peaks = self.predictor.get_upsample_timestamp(encoder_out,
+                                                                                               encoder_out_mask,
+                                                                                               token_num)
+        return ds_alphas, ds_cif_peak, us_alphas, us_peaks
+
+    def collect_feats(
+            self,
+            speech: torch.Tensor,
+            speech_lengths: torch.Tensor,
+            text: torch.Tensor,
+            text_lengths: torch.Tensor,
+    ) -> Dict[str, torch.Tensor]:
+        if self.extract_feats_in_collect_stats:
+            feats, feats_lengths = self._extract_feats(speech, speech_lengths)
+        else:
+            # Generate dummy stats if extract_feats_in_collect_stats is False
+            logging.warning(
+                "Generating dummy stats for feats and feats_lengths, "
+                "because encoder_conf.extract_feats_in_collect_stats is "
+                f"{self.extract_feats_in_collect_stats}"
+            )
+            feats, feats_lengths = speech, speech_lengths
+        return {"feats": feats, "feats_lengths": feats_lengths}

--
Gitblit v1.9.1