北念
2023-02-21 f53c55cff30ec34bdfde61e318e6910d57d184a8
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
from concurrent import futures
import grpc
import json
import time
 
import paraformer_pb2_grpc
from paraformer_pb2 import Response
from rapid_paraformer.utils.frontend import load_bytes
 
 
class ASRServicer(paraformer_pb2_grpc.ASRServicer):
    """Streaming speech-recognition servicer.

    Audio chunks received while a client reports ``speaking`` are appended to
    a per-user buffer.  When the client reports silence, the buffered audio is
    handed to the configured backend and the transcription is streamed back.
    Every response carries a JSON document in ``sentence`` with the keys
    ``success``, ``detail``, ``text`` and, for decode results,
    ``server_delay_ms``.

    NOTE(review): buffers are keyed by ``req.user`` only, so two concurrent
    streams using the same user name share one buffer -- confirm this is the
    intended behaviour.
    """

    # Backends understood by __init__ and _transcribe.
    SUPPORTED_BACKENDS = ("pipeline", "onnxruntime")

    # Buffers shorter than this many bytes are not sent to the model.  The
    # original author's note describes it as the minimum model input (~300 ms);
    # presumably that assumes 16 kHz 16-bit mono audio -- TODO confirm.
    MIN_DECODE_BYTES = 9600

    def __init__(self, user_allowed, model, sample_rate, backend, onnx_dir):
        """Load the recognition backend and prepare per-user state.

        Args:
            user_allowed: ``|``-separated list of user names allowed to call
                the service.
            model: Model identifier passed to the modelscope pipeline
                (used only by the ``"pipeline"`` backend).
            sample_rate: Sample rate forwarded to the pipeline backend as
                ``audio_fs``.
            backend: ``"pipeline"`` or ``"onnxruntime"``.
            onnx_dir: Model directory passed to ``Paraformer``
                (used only by the ``"onnxruntime"`` backend).

        Raises:
            ImportError: If the package required by ``backend`` is missing.
            ValueError: If ``backend`` is not one of ``SUPPORTED_BACKENDS``.
        """
        print("ASRServicer init")
        self.backend = backend
        self.init_flag = 0
        self.client_buffers = {}
        self.client_transcription = {}
        self.auth_user = user_allowed.split("|")
        self.sample_rate = sample_rate
        if self.backend == "pipeline":
            try:
                from modelscope.pipelines import pipeline
                from modelscope.utils.constant import Tasks
            except ImportError as err:
                raise ImportError("Please install modelscope") from err
            self.inference_16k_pipeline = pipeline(task=Tasks.auto_speech_recognition, model=model)
        elif self.backend == "onnxruntime":
            try:
                from rapid_paraformer.paraformer_onnx import Paraformer
            except ImportError as err:
                raise ImportError("Please install onnxruntime environment") from err
            self.inference_16k_pipeline = Paraformer(model_dir=onnx_dir)
        else:
            # Fail fast: without this the servicer would be constructed with no
            # model and only blow up later, in the middle of a client stream.
            raise ValueError(
                "Unsupported backend: %r (expected one of %s)"
                % (backend, ", ".join(self.SUPPORTED_BACKENDS))
            )

    def clear_states(self, user):
        """Drop both the audio buffer and the stored transcription of ``user``."""
        self.clear_buffers(user)
        self.clear_transcriptions(user)

    def clear_buffers(self, user):
        """Discard any buffered audio for ``user``; no-op if there is none."""
        self.client_buffers.pop(user, None)

    def clear_transcriptions(self, user):
        """Discard any stored transcription for ``user``; no-op if there is none."""
        self.client_transcription.pop(user, None)

    def disconnect(self, user):
        """Clear all state kept for ``user`` and log the disconnect."""
        self.clear_states(user)
        print("Disconnecting user: %s" % str(user))

    def Recognize(self, request_iterator, context):
        """Handle one bidirectional recognition stream.

        For every incoming request exactly one of the following happens:

        * unknown user          -> ``terminate`` response with ``success`` false
        * ``isEnd`` set         -> user state cleared, ``terminate`` response
        * ``speaking`` set      -> audio appended to the buffer, ``speaking``
        * silence, empty buffer -> ``waiting`` response
        * silence, buffered     -> ``decoding`` response followed by either
          ``waiting`` (buffer too short) or ``finish`` (transcription)

        Args:
            request_iterator: Iterator of client requests; the fields read are
                ``user``, ``language``, ``isEnd``, ``speaking`` and
                ``audio_data``.
            context: gRPC call context (unused).

        Yields:
            ``Response`` messages as described above.
        """
        for req in request_iterator:
            user = req.user
            if user not in self.auth_user:
                yield self._reply(req, "terminate", False, "Not Authorized user: %s " % user)
            elif req.isEnd:
                print("asr end")
                self.disconnect(user)
                yield self._reply(req, "terminate", True, "asr end")
            elif req.speaking:
                if req.audio_data:
                    if user in self.client_buffers:
                        self.client_buffers[user] += req.audio_data
                    else:
                        self.client_buffers[user] = req.audio_data
                yield self._reply(req, "speaking", True, "speaking")
            elif user not in self.client_buffers:
                # Silence, but nothing has been buffered for this user yet.
                yield self._reply(req, "waiting", True, "waiting_for_more_voice")
            else:
                # Silence after speech: decode what has been collected.  The
                # ``speaking`` / not ``speaking`` cases are exhaustive, so no
                # further fallback branch is needed.
                yield from self._decode_buffered_audio(req)

    def _decode_buffered_audio(self, req):
        """Consume the buffer of ``req.user`` and yield the decode responses."""
        begin_ms = self._now_ms()
        audio = self.client_buffers[req.user]
        # State is cleared before decoding so new audio starts a fresh buffer.
        self.clear_states(req.user)
        yield self._reply(req, "decoding", True, "decoding data: %d bytes" % len(audio))

        if len(audio) < self.MIN_DECODE_BYTES:
            delay_str = str(self._now_ms() - begin_ms)
            print ("user: %s , delay(ms): %s, info: %s " % (req.user, delay_str, "waiting_for_more_voice"))
            yield self._reply(
                req, "waiting", True, "waiting_for_more_voice", server_delay_ms=delay_str
            )
            return

        asr_result = self._transcribe(audio)
        delay_str = str(self._now_ms() - begin_ms)
        print ("user: %s , delay(ms): %s, text: %s " % (req.user, delay_str, asr_result))
        yield self._reply(
            req, "finish", True, "finish_sentence", text=asr_result, server_delay_ms=delay_str
        )

    def _transcribe(self, audio):
        """Run the configured backend on ``audio`` and return its result text."""
        if self.backend == "pipeline":
            output = self.inference_16k_pipeline(audio_in=audio, audio_fs=self.sample_rate)
            return output["text"] if "text" in output else ""
        if self.backend == "onnxruntime":
            array = load_bytes(audio)
            return self.inference_16k_pipeline(array)[0]
        raise ValueError("Unsupported backend: %r" % (self.backend,))

    @staticmethod
    def _now_ms():
        """Return a monotonic timestamp in whole milliseconds.

        A monotonic clock is used so the reported delay cannot be distorted by
        wall-clock adjustments while a request is being decoded.
        """
        return int(round(time.monotonic() * 1000))

    @staticmethod
    def _reply(req, action, success, detail, text="", server_delay_ms=None):
        """Build a ``Response`` for ``req`` with a JSON status payload.

        Keys are inserted in the order ``success``, ``detail``,
        ``server_delay_ms`` (only when given), ``text`` so the serialized JSON
        keeps a stable layout.
        """
        result = {"success": success, "detail": detail}
        if server_delay_ms is not None:
            result["server_delay_ms"] = server_delay_ms
        result["text"] = text
        return Response(
            sentence=json.dumps(result),
            user=req.user,
            action=action,
            language=req.language,
        )