游雁
2023-07-03 4ee715e70e36cdba7b05fe044fecab9cf4fa16ff
funasr/runtime/python/websocket/wss_client_asr.py
@@ -40,7 +40,7 @@
                    help="audio_in")
parser.add_argument("--send_without_sleep",
                    action="store_true",
                    default=False,
                    default=True,
                    help="if audio_in is set, send_without_sleep")
parser.add_argument("--test_thread_num",
                    type=int,
@@ -71,6 +71,8 @@
from queue import Queue
voices = Queue()
offline_msg_done=False
ibest_writer = None
if args.output_dir is not None:
    writer = DatadirWriter(args.output_dir)
@@ -158,22 +160,45 @@
                message = json.dumps({"is_speaking": is_speaking})
                #voices.put(message)
                await websocket.send(message)
            # print("data_chunk: ", len(data_chunk))
            # print(voices.qsize())
            sleep_duration = 0.001 if args.send_without_sleep else 60 * args.chunk_size[1] / args.chunk_interval / 1000
            sleep_duration = 0.001 if args.mode == "offline" else 60 * args.chunk_size[1] / args.chunk_interval / 1000
            await asyncio.sleep(sleep_duration)
    # when all data sent, we need to close websocket
    while not voices.empty():
         await asyncio.sleep(1)
    await asyncio.sleep(3)
    # in offline mode, wait until the result message has been received
    if args.mode=="offline":
      global offline_msg_done
      while  not  offline_msg_done:
         await asyncio.sleep(1)
    await websocket.close()
async def ws_send():
    """Pump queued payloads from ``voices`` to the server over ``websocket``.

    Loops forever. On each pass it checks the module-level ``voices`` queue;
    if an item is waiting, the item is removed, marked done, and sent through
    the module-level ``websocket``. Every pass ends with a 5 ms sleep so other
    coroutines get a chance to run.

    If sending raises, the exception and its traceback are printed and the
    process is terminated with ``exit(0)`` (note: exit status 0 even though
    the send failed).
    """
    global voices
    global websocket

    poll_interval = 0.005  # seconds to yield between queue checks / sends
    print("started to sending data!")
    while True:
        if not voices.empty():
            # empty() was checked just above with no await in between, so this
            # blocking get() should return at once -- assumes no other thread
            # consumes from the queue; TODO confirm.
            payload = voices.get()
            voices.task_done()
            try:
                await websocket.send(payload)
            except Exception as err:
                print('Exception occurred:', err)
                traceback.print_exc()
                exit(0)
        await asyncio.sleep(poll_interval)
 
             
async def message(id):
    global websocket,voices
    global websocket,voices,offline_msg_done
    text_print = ""
    text_print_2pass_online = ""
    text_print_2pass_offline = ""
@@ -183,7 +208,6 @@
            meg = await websocket.recv()
            meg = json.loads(meg)
            wav_name = meg.get("wav_name", "demo")
            # print(wav_name)
            text = meg["text"]
            if ibest_writer is not None:
                ibest_writer["text"][wav_name] = text
@@ -198,6 +222,7 @@
                text_print = text_print[-args.words_max_print:]
                os.system('clear')
                print("\rpid" + str(id) + ": " + text_print)
                offline_msg_done=True
            else:
                if meg["mode"] == "2pass-online":
                    text_print_2pass_online += "{}".format(text)
@@ -233,8 +258,10 @@
  if args.audio_in is None:
       chunk_begin=0
       chunk_size=1
  global websocket,voices
  global websocket,voices,offline_msg_done
  for i in range(chunk_begin,chunk_begin+chunk_size):
    offline_msg_done=False
    voices = Queue()
    if args.ssl == 1:
        ssl_context = ssl.SSLContext()
@@ -250,9 +277,9 @@
            task = asyncio.create_task(record_from_scp(i, 1))
        else:
            task = asyncio.create_task(record_microphone())
        #task2 = asyncio.create_task(ws_send())
        task3 = asyncio.create_task(message(id))
        await asyncio.gather(task, task3)
        task2 = asyncio.create_task(ws_send())
        task3 = asyncio.create_task(message(str(id)+"_"+str(i))) #processid+fileid
        await asyncio.gather(task, task2, task3)
  exit(0)