雾聪
2023-06-28 54931dd4e1a099d7d6f144c4e12e5453deb3aa26
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
# Copyright FunASR (https://github.com/alibaba-damo-academy/FunASR). All Rights Reserved.
#  MIT License  (https://opensource.org/licenses/MIT)
 
import logging
from pathlib import Path
from typing import Any
from typing import Optional
from typing import Tuple
from typing import Union
 
import numpy as np
import torch
from typeguard import check_argument_types
from typeguard import check_return_type
 
from funasr.build_utils.build_model_from_file import build_model_from_file
from funasr.torch_utils.device_funcs import to_device
from funasr.utils.misc import statistic_model_parameters
 
 
class Speech2Xvector:
    """Extract speaker embeddings (x-vectors) from speech with an SV model.

    The model is loaded once in the constructor through
    ``build_model_from_file``. Calling the instance runs the model's
    encoder, pooling layer and decoder, and returns the embedding stored
    under ``embedding_node``. When a reference utterance is supplied as
    well, its embedding and the cosine similarity between both embeddings
    are also returned.

    Examples:
        >>> import soundfile
        >>> speech2xvector = Speech2Xvector("sv_config.yml", "sv.pb")
        >>> audio, rate = soundfile.read("speech.wav")
        >>> speech2xvector(audio)
        (embedding, None, None)
        >>> ref_audio, rate = soundfile.read("ref_speech.wav")
        >>> speech2xvector(audio, ref_audio)
        (embedding, ref_embedding, similarity_score)

    """

    def __init__(
            self,
            sv_train_config: Optional[Union[Path, str]] = None,
            sv_model_file: Optional[Union[Path, str]] = None,
            device: str = "cpu",
            batch_size: int = 1,
            dtype: str = "float32",
            streaming: bool = False,
            embedding_node: str = "resnet1_dense",
    ):
        """Load the SV model and remember the inference settings.

        Args:
            sv_train_config: Training config file handed to
                ``build_model_from_file``.
            sv_model_file: Model checkpoint handed to
                ``build_model_from_file``.
            device: Device the model is built for; input batches are moved
                to the same device before the forward pass.
            batch_size: Accepted for interface compatibility; not read by
                this class.
            dtype: Name of a ``torch`` dtype attribute (e.g. ``"float32"``)
                used for both the model parameters and the input speech.
            streaming: Accepted for interface compatibility; not read by
                this class.
            embedding_node: Key of the decoder's embedding mapping that is
                returned as the utterance embedding.
        """
        assert check_argument_types()

        # 1. Build SV model
        sv_model, sv_train_args = build_model_from_file(
            config_file=sv_train_config,
            model_file=sv_model_file,
            cmvn_file=None,
            device=device,
            task_name="sv",
            mode="sv",
        )
        logging.info("sv_model: {}".format(sv_model))
        logging.info("model parameter number: {}".format(statistic_model_parameters(sv_model)))
        logging.info("sv_train_args: {}".format(sv_train_args))
        # Cast the parameters to the requested dtype and switch to eval mode.
        sv_model.to(dtype=getattr(torch, dtype)).eval()

        self.sv_model = sv_model
        self.sv_train_args = sv_train_args
        self.device = device
        self.dtype = dtype
        self.embedding_node = embedding_node

    @torch.no_grad()
    def calculate_embedding(self, speech: Union[torch.Tensor, np.ndarray]) -> torch.Tensor:
        """Compute the embedding of a single utterance.

        Args:
            speech: Audio signal of one utterance. A batch axis is prepended
                and axis 1 of the result is used as the length, so a 1-D
                array of samples is expected.

        Returns:
            The entry of the decoder's embedding mapping stored under
            ``self.embedding_node``.

        Raises:
            ValueError: If ``self.embedding_node`` is not among the
                embeddings produced by the decoder.
        """
        # Input as audio signal
        if isinstance(speech, np.ndarray):
            speech = torch.tensor(speech)

        # data: (Nsamples,) -> (1, Nsamples)
        speech = speech.unsqueeze(0).to(getattr(torch, self.dtype))
        # lengths: (1,)
        lengths = speech.new_full([1], dtype=torch.long, fill_value=speech.size(1))
        batch = {"speech": speech, "speech_lengths": lengths}

        # a. To device
        batch = to_device(batch, device=self.device)

        # b. Forward Encoder (the returned lengths are not needed afterwards)
        enc, _ = self.sv_model.encode(**batch)

        # c. Forward Pooling
        pooling = self.sv_model.pooling_layer(enc)

        # d. Forward Decoder (only the embedding mapping is used here)
        _, embeddings = self.sv_model.decoder(pooling)

        if self.embedding_node not in embeddings:
            raise ValueError("Required embedding node {} not in {}".format(
                self.embedding_node, embeddings.keys()))

        return embeddings[self.embedding_node]

    @torch.no_grad()
    def __call__(
            self, speech: Union[torch.Tensor, np.ndarray],
            ref_speech: Optional[Union[torch.Tensor, np.ndarray]] = None,
    ) -> Tuple[torch.Tensor, Optional[torch.Tensor], Optional[torch.Tensor]]:
        """Inference

        Args:
            speech: Input speech data
            ref_speech: Reference speech to compare
        Returns:
            embedding, ref_embedding, similarity_score.
            ``ref_embedding`` and ``similarity_score`` are ``None`` when no
            reference speech is given; otherwise the score is the cosine
            similarity between the two embeddings.

        """
        assert check_argument_types()
        # Make sure the model is in eval mode even if it was switched since
        # construction.
        self.sv_model.eval()
        embedding = self.calculate_embedding(speech)
        ref_emb, score = None, None
        if ref_speech is not None:
            ref_emb = self.calculate_embedding(ref_speech)
            score = torch.cosine_similarity(embedding, ref_emb)

        results = (embedding, ref_emb, score)
        assert check_return_type(results)
        return results

    @classmethod
    def from_pretrained(
            cls,
            model_tag: Optional[str] = None,
            **kwargs: Optional[Any],
    ):
        """Build an instance from the pretrained model.

        This is a classmethod so that subclasses calling it get an instance
        of their own type rather than of ``Speech2Xvector``.

        Args:
            model_tag (Optional[str]): Model tag of the pretrained models.
                Currently, the tags of espnet_model_zoo are supported.
            **kwargs: Constructor arguments. When ``model_tag`` is given,
                the entries returned by the downloader override entries
                with the same name.

        Returns:
            Speech2Xvector: Instance of the class the method was called on.

        Raises:
            ImportError: If ``model_tag`` is given but ``espnet_model_zoo``
                is not installed.

        """
        if model_tag is not None:
            try:
                from espnet_model_zoo.downloader import ModelDownloader

            except ImportError:
                logging.error(
                    "`espnet_model_zoo` is not installed. "
                    "Please install via `pip install -U espnet_model_zoo`."
                )
                raise
            d = ModelDownloader()
            kwargs.update(**d.download_and_unpack(model_tag))

        return cls(**kwargs)