From 1ce704d8c09bd4d4c7e5ab087f951f31fad9fca6 Mon Sep 17 00:00:00 2001
From: nichongjia-2007 <nichongjia@gmail.com>
Date: 星期五, 07 七月 2023 15:47:19 +0800
Subject: [PATCH] Merge branch 'main' of https://github.com/alibaba-damo-academy/FunASR

---
 funasr/runtime/python/websocket/funasr_wss_client.py |   93 +++++++++++++++++++++++-----------------------
 1 files changed, 47 insertions(+), 46 deletions(-)

diff --git a/funasr/runtime/python/websocket/wss_client_asr.py b/funasr/runtime/python/websocket/funasr_wss_client.py
similarity index 82%
rename from funasr/runtime/python/websocket/wss_client_asr.py
rename to funasr/runtime/python/websocket/funasr_wss_client.py
index 45b8ac4..72121f7 100644
--- a/funasr/runtime/python/websocket/wss_client_asr.py
+++ b/funasr/runtime/python/websocket/funasr_wss_client.py
@@ -8,11 +8,10 @@
 import json
 import traceback
 from multiprocessing import Process
-from funasr.fileio.datadir_writer import DatadirWriter
+# from funasr.fileio.datadir_writer import DatadirWriter
 
 import logging
 
-SUPPORT_AUDIO_TYPE_SETS = ['.wav', '.pcm']
 logging.basicConfig(level=logging.ERROR)
 
 parser = argparse.ArgumentParser()
@@ -42,10 +41,10 @@
                     action="store_true",
                     default=True,
                     help="if audio_in is set, send_without_sleep")
-parser.add_argument("--test_thread_num",
+parser.add_argument("--thread_num",
                     type=int,
                     default=1,
-                    help="test_thread_num")
+                    help="thread_num")
 parser.add_argument("--words_max_print",
                     type=int,
                     default=10000,
@@ -72,11 +71,13 @@
 
 voices = Queue()
 offline_msg_done=False
- 
-ibest_writer = None
+
 if args.output_dir is not None:
-    writer = DatadirWriter(args.output_dir)
-    ibest_writer = writer[f"1best_recog"]
+    # if os.path.exists(args.output_dir):
+    #     os.remove(args.output_dir)
+        
+    if not os.path.exists(args.output_dir):
+        os.makedirs(args.output_dir)
 
 
 async def record_microphone():
@@ -100,11 +101,13 @@
 
     message = json.dumps({"mode": args.mode, "chunk_size": args.chunk_size, "chunk_interval": args.chunk_interval,
                           "wav_name": "microphone", "is_speaking": True})
-    voices.put(message)
+    #voices.put(message)
+    await websocket.send(message)
     while True:
         data = stream.read(CHUNK)
         message = data
-        voices.put(message)
+        #voices.put(message)
+        await websocket.send(message)
         await asyncio.sleep(0.005)
 
 async def record_from_scp(chunk_begin, chunk_size):
@@ -134,8 +137,17 @@
                 frames = wav_file.readframes(wav_file.getnframes())
                 audio_bytes = bytes(frames)
         else:
-            raise NotImplementedError(
-                f'Not supported audio type')
+            import ffmpeg
+            try:
+                # This launches a subprocess to decode audio while down-mixing and resampling as necessary.
+                # Requires the ffmpeg CLI and `ffmpeg-python` package to be installed.
+                audio_bytes, _ = (
+                    ffmpeg.input(wav_path, threads=0)
+                    .output("-", format="s16le", acodec="pcm_s16le", ac=1, ar=16000)
+                    .run(cmd=["ffmpeg", "-nostdin"], capture_stdout=True, capture_stderr=True)
+                )
+            except ffmpeg.Error as e:
+                raise RuntimeError(f"Failed to load audio: {e.stderr.decode()}") from e
 
         # stride = int(args.chunk_size/1000*16000*2)
         stride = int(60 * args.chunk_size[1] / args.chunk_interval / 1000 * 16000 * 2)
@@ -164,10 +176,9 @@
             sleep_duration = 0.001 if args.mode == "offline" else 60 * args.chunk_size[1] / args.chunk_interval / 1000
             
             await asyncio.sleep(sleep_duration)
-    # when all data sent, we need to close websocket
-    while not voices.empty():
-         await asyncio.sleep(1)
-    await asyncio.sleep(3)
+    
+    if not args.mode=="offline":
+        await asyncio.sleep(2)
     # offline model need to wait for message recved
     
     if args.mode=="offline":
@@ -176,17 +187,18 @@
          await asyncio.sleep(1)
     
     await websocket.close()
-     
- 
- 
 
- 
-             
+
+          
 async def message(id):
     global websocket,voices,offline_msg_done
     text_print = ""
     text_print_2pass_online = ""
     text_print_2pass_offline = ""
+    if args.output_dir is not None:
+        ibest_writer = open(os.path.join(args.output_dir, "text.{}".format(id)), "a", encoding="utf-8")
+    else:
+        ibest_writer = None
     try:
        while True:
         
@@ -194,9 +206,11 @@
             meg = json.loads(meg)
             wav_name = meg.get("wav_name", "demo")
             text = meg["text"]
-            if ibest_writer is not None:
-                ibest_writer["text"][wav_name] = text
 
+            if ibest_writer is not None:
+                text_write_line = "{}\t{}\n".format(wav_name, text)
+                ibest_writer.write(text_write_line)
+                
             if meg["mode"] == "online":
                 text_print += "{}".format(text)
                 text_print = text_print[-args.words_max_print:]
@@ -204,10 +218,10 @@
                 print("\rpid" + str(id) + ": " + text_print)
             elif meg["mode"] == "offline":
                 text_print += "{}".format(text)
-                text_print = text_print[-args.words_max_print:]
-                os.system('clear')
-                print("\rpid" + str(id) + ": " + text_print)
-                offline_msg_done=True
+                # text_print = text_print[-args.words_max_print:]
+                # os.system('clear')
+                print("\rpid" + str(id) + ": " + wav_name + ": " + text_print)
+                offline_msg_done = True
             else:
                 if meg["mode"] == "2pass-online":
                     text_print_2pass_online += "{}".format(text)
@@ -219,6 +233,7 @@
                 text_print = text_print[-args.words_max_print:]
                 os.system('clear')
                 print("\rpid" + str(id) + ": " + text_print)
+                offline_msg_done=True
 
     except Exception as e:
             print("Exception:", e)
@@ -227,17 +242,6 @@
  
 
 
-async def print_messge():
-    global websocket
-    while True:
-        try:
-            meg = await websocket.recv()
-            meg = json.loads(meg)
-            print(meg)
-        except Exception as e:
-            print("Exception:", e)
-            #traceback.print_exc()
-            exit(0)
 
 async def ws_client(id, chunk_begin, chunk_size):
   if args.audio_in is None:
@@ -262,7 +266,6 @@
             task = asyncio.create_task(record_from_scp(i, 1))
         else:
             task = asyncio.create_task(record_microphone())
-        #task2 = asyncio.create_task(ws_send())
         task3 = asyncio.create_task(message(str(id)+"_"+str(i))) #processid+fileid
         await asyncio.gather(task, task3)
   exit(0)
@@ -291,21 +294,19 @@
             wav_name = wav_splits[0] if len(wav_splits) > 1 else "demo"
             wav_path = wav_splits[1] if len(wav_splits) > 1 else wav_splits[0]
             audio_type = os.path.splitext(wav_path)[-1].lower()
-            if audio_type not in SUPPORT_AUDIO_TYPE_SETS:
-                raise NotImplementedError(
-                    f'Not supported audio type: {audio_type}')
+
 
         total_len = len(wavs)
-        if total_len >= args.test_thread_num:
-            chunk_size = int(total_len / args.test_thread_num)
-            remain_wavs = total_len - chunk_size * args.test_thread_num
+        if total_len >= args.thread_num:
+            chunk_size = int(total_len / args.thread_num)
+            remain_wavs = total_len - chunk_size * args.thread_num
         else:
             chunk_size = 1
             remain_wavs = 0
 
         process_list = []
         chunk_begin = 0
-        for i in range(args.test_thread_num):
+        for i in range(args.thread_num):
             now_chunk_size = chunk_size
             if remain_wavs > 0:
                 now_chunk_size = chunk_size + 1

--
Gitblit v1.9.1