| | |
| | | from funasr.models.transformer.scorers.scorer_interface import PartialScorerInterface |
| | | from funasr.models.transformer.scorers.scorer_interface import ScorerInterface |
| | | |
| | | |
| | | class Hypothesis(NamedTuple): |
| | | """Hypothesis data type.""" |
| | | |
| | |
| | | score=float(self.score), |
| | | scores={k: float(v) for k, v in self.scores.items()}, |
| | | )._asdict() |
| | | |
| | | |
| | | |
| | | class BeamSearch(torch.nn.Module): |
| | |
| | | new_states[k] = d.select_state(part_states[k], part_idx) |
| | | return new_states |
| | | |
| | | def search( |
| | | self, running_hyps: List[Hypothesis], x: torch.Tensor |
| | | ) -> List[Hypothesis]: |
| | | def search(self, running_hyps: List[Hypothesis], x: torch.Tensor) -> List[Hypothesis]: |
| | | """Search new tokens for running hypotheses and encoded speech x. |
| | | |
| | | Args: |
| | |
| | | Hypothesis( |
| | | score=weighted_scores[j], |
| | | yseq=self.append_token(hyp.yseq, j), |
| | | scores=self.merge_scores( |
| | | hyp.scores, scores, j, part_scores, part_j |
| | | ), |
| | | scores=self.merge_scores(hyp.scores, scores, j, part_scores, part_j), |
| | | states=self.merge_states(states, part_states, part_j), |
| | | ) |
| | | ) |
| | |
| | | # check the number of hypotheses reaching to eos |
| | | if len(nbest_hyps) == 0: |
| | | logging.warning( |
| | | "there is no N-best results, perform recognition " |
| | | "again with smaller minlenratio." |
| | | "there is no N-best results, perform recognition " "again with smaller minlenratio." |
| | | ) |
| | | return ( |
| | | [] |
| | |
| | | # report the best result |
| | | best = nbest_hyps[0] |
| | | for k, v in best.scores.items(): |
| | | logging.info( |
| | | f"{v:6.2f} * {self.weights[k]:3} = {v * self.weights[k]:6.2f} for {k}" |
| | | ) |
| | | logging.info(f"{v:6.2f} * {self.weights[k]:3} = {v * self.weights[k]:6.2f} for {k}") |
| | | logging.info(f"total log probability: {best.score:.2f}") |
| | | logging.info(f"normalized log probability: {best.score / len(best.yseq):.2f}") |
| | | logging.info(f"total number of ended hypotheses: {len(nbest_hyps)}") |
| | | if self.token_list is not None: |
| | | logging.info( |
| | | "best hypo: " |
| | | + "".join([self.token_list[x] for x in best.yseq[1:-1]]) |
| | | + "\n" |
| | | "best hypo: " + "".join([self.token_list[x] for x in best.yseq[1:-1]]) + "\n" |
| | | ) |
| | | return nbest_hyps |
| | | |
| | |
| | | logging.debug(f"the number of running hypotheses: {len(running_hyps)}") |
| | | if self.token_list is not None: |
| | | logging.debug( |
| | | "best hypo: " |
| | | + "".join([self.token_list[x] for x in running_hyps[0].yseq[1:]]) |
| | | "best hypo: " + "".join([self.token_list[x] for x in running_hyps[0].yseq[1:]]) |
| | | ) |
| | | # add eos in the final loop to avoid that there are no ended hyps |
| | | if i == maxlen - 1: |
| | | logging.info("adding <eos> in the last position in the loop") |
| | | running_hyps = [ |
| | | h._replace(yseq=self.append_token(h.yseq, self.eos)) |
| | | for h in running_hyps |
| | | h._replace(yseq=self.append_token(h.yseq, self.eos)) for h in running_hyps |
| | | ] |
| | | |
| | | # add ended hypotheses to a final list, and removed them from current hypotheses |