From bc723ea200144bd6fa8a5dff4b9a780feda144fc Mon Sep 17 00:00:00 2001
From: 游雁 <zhifu.gzf@alibaba-inc.com>
Date: 星期四, 29 六月 2023 18:55:01 +0800
Subject: [PATCH] dcos
---
funasr/runtime/python/websocket/wss_client_asr.py | 92 +++++++++++++++++++++++++++-------------------
1 files changed, 54 insertions(+), 38 deletions(-)
diff --git a/funasr/runtime/python/websocket/wss_client_asr.py b/funasr/runtime/python/websocket/wss_client_asr.py
index dec598a..2ea8a16 100644
--- a/funasr/runtime/python/websocket/wss_client_asr.py
+++ b/funasr/runtime/python/websocket/wss_client_asr.py
@@ -71,7 +71,8 @@
from queue import Queue
voices = Queue()
-
+offline_msg_done=False
+
ibest_writer = None
if args.output_dir is not None:
writer = DatadirWriter(args.output_dir)
@@ -118,9 +119,11 @@
wavs = wavs[chunk_begin:chunk_begin + chunk_size]
for wav in wavs:
wav_splits = wav.strip().split()
+
wav_name = wav_splits[0] if len(wav_splits) > 1 else "demo"
wav_path = wav_splits[1] if len(wav_splits) > 1 else wav_splits[0]
-
+ if not len(wav_path.strip())>0:
+ continue
if wav_path.endswith(".pcm"):
with open(wav_path, "rb") as f:
audio_bytes = f.read()
@@ -142,51 +145,53 @@
# send first time
message = json.dumps({"mode": args.mode, "chunk_size": args.chunk_size, "chunk_interval": args.chunk_interval,
"wav_name": wav_name, "is_speaking": True})
- voices.put(message)
+ #voices.put(message)
+ await websocket.send(message)
is_speaking = True
for i in range(chunk_num):
beg = i * stride
data = audio_bytes[beg:beg + stride]
message = data
- voices.put(message)
+ #voices.put(message)
+ await websocket.send(message)
if i == chunk_num - 1:
is_speaking = False
message = json.dumps({"is_speaking": is_speaking})
- voices.put(message)
- # print("data_chunk: ", len(data_chunk))
- # print(voices.qsize())
+ #voices.put(message)
+ await websocket.send(message)
+
sleep_duration = 0.001 if args.send_without_sleep else 60 * args.chunk_size[1] / args.chunk_interval / 1000
await asyncio.sleep(sleep_duration)
+ # once all data has been sent, wait for the queue to drain, then close the websocket
+ while not voices.empty():
+ await asyncio.sleep(1)
+ await asyncio.sleep(3)
+ # in offline mode, wait until the recognition result message has been received
+
+ if args.mode=="offline":
+ global offline_msg_done
+ while not offline_msg_done:
+ await asyncio.sleep(1)
+
+ await websocket.close()
+
+
+
-async def ws_send():
- global voices
- global websocket
- print("started to sending data!")
- while True:
- while not voices.empty():
- data = voices.get()
- voices.task_done()
- try:
- await websocket.send(data)
- except Exception as e:
- print('Exception occurred:', e)
- traceback.print_exc()
- exit(0)
- await asyncio.sleep(0.005)
- await asyncio.sleep(0.005)
-
+
+
async def message(id):
- global websocket
+ global websocket,voices,offline_msg_done
text_print = ""
text_print_2pass_online = ""
text_print_2pass_offline = ""
- while True:
- try:
+ try:
+ while True:
+
meg = await websocket.recv()
meg = json.loads(meg)
wav_name = meg.get("wav_name", "demo")
- # print(wav_name)
text = meg["text"]
if ibest_writer is not None:
ibest_writer["text"][wav_name] = text
@@ -201,6 +206,7 @@
text_print = text_print[-args.words_max_print:]
os.system('clear')
print("\rpid" + str(id) + ": " + text_print)
+ offline_msg_done=True
else:
if meg["mode"] == "2pass-online":
text_print_2pass_online += "{}".format(text)
@@ -213,10 +219,11 @@
os.system('clear')
print("\rpid" + str(id) + ": " + text_print)
- except Exception as e:
+ except Exception as e:
print("Exception:", e)
- traceback.print_exc()
- exit(0)
+ #traceback.print_exc()
+ #await websocket.close()
+
async def print_messge():
@@ -228,11 +235,18 @@
print(meg)
except Exception as e:
print("Exception:", e)
- traceback.print_exc()
+ #traceback.print_exc()
exit(0)
async def ws_client(id, chunk_begin, chunk_size):
- global websocket
+ if args.audio_in is None:
+ chunk_begin=0
+ chunk_size=1
+ global websocket,voices,offline_msg_done
+
+ for i in range(chunk_begin,chunk_begin+chunk_size):
+ offline_msg_done=False
+ voices = Queue()
if args.ssl == 1:
ssl_context = ssl.SSLContext()
ssl_context.check_hostname = False
@@ -242,14 +256,16 @@
uri = "ws://{}:{}".format(args.host, args.port)
ssl_context = None
print("connect to", uri)
- async for websocket in websockets.connect(uri, subprotocols=["binary"], ping_interval=None, ssl=ssl_context):
+ async with websockets.connect(uri, subprotocols=["binary"], ping_interval=None, ssl=ssl_context) as websocket:
if args.audio_in is not None:
- task = asyncio.create_task(record_from_scp(chunk_begin, chunk_size))
+ task = asyncio.create_task(record_from_scp(i, 1))
else:
task = asyncio.create_task(record_microphone())
- task2 = asyncio.create_task(ws_send())
- task3 = asyncio.create_task(message(id))
- await asyncio.gather(task, task2, task3)
+ #task2 = asyncio.create_task(ws_send())
+ task3 = asyncio.create_task(message(str(id)+"_"+str(i))) #processid+fileid
+ await asyncio.gather(task, task3)
+ exit(0)
+
def one_thread(id, chunk_begin, chunk_size):
asyncio.get_event_loop().run_until_complete(ws_client(id, chunk_begin, chunk_size))
--
Gitblit v1.9.1