| funasr/runtime/python/websocket/wss_client_asr.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| funasr/runtime/websocket/CMakeLists.txt | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| funasr/runtime/websocket/funasr-ws-client.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| funasr/runtime/websocket/funasr-ws-server.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| funasr/runtime/websocket/readme.md | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| funasr/runtime/websocket/websocket-server.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| funasr/runtime/websocket/websocket-server.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| funasr/runtime/websocket/websocketclient.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| tests/test_asr_inference_pipeline.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| tests/test_asr_vad_punc_inference_pipeline.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
funasr/runtime/python/websocket/wss_client_asr.py
@@ -1,7 +1,7 @@ # -*- encoding: utf-8 -*- import os import time import websockets,ssl import websockets, ssl import asyncio # import threading import argparse @@ -12,6 +12,7 @@ import logging SUPPORT_AUDIO_TYPE_SETS = ['.wav', '.pcm'] logging.basicConfig(level=logging.ERROR) parser = argparse.ArgumentParser() @@ -53,7 +54,7 @@ type=str, default=None, help="output_dir") parser.add_argument("--ssl", type=int, default=1, @@ -68,22 +69,25 @@ print(args) # voices = asyncio.Queue() from queue import Queue voices = Queue() voices = Queue() offline_msg_done=False ibest_writer = None if args.output_dir is not None: writer = DatadirWriter(args.output_dir) ibest_writer = writer[f"1best_recog"] async def record_microphone(): is_finished = False import pyaudio #print("2") global voices # print("2") global voices FORMAT = pyaudio.paInt16 CHANNELS = 1 RATE = 16000 chunk_size = 60*args.chunk_size[1]/args.chunk_interval chunk_size = 60 * args.chunk_size[1] / args.chunk_interval CHUNK = int(RATE / 1000 * chunk_size) p = pyaudio.PyAudio() @@ -94,19 +98,16 @@ input=True, frames_per_buffer=CHUNK) message = json.dumps({"mode": args.mode, "chunk_size": args.chunk_size, "chunk_interval": args.chunk_interval, "wav_name": "microphone", "is_speaking": True}) message = json.dumps({"mode": args.mode, "chunk_size": args.chunk_size, "chunk_interval": args.chunk_interval, "wav_name": "microphone", "is_speaking": True}) voices.put(message) while True: data = stream.read(CHUNK) message = data message = data voices.put(message) await asyncio.sleep(0.005) async def record_from_scp(chunk_begin,chunk_size): import wave async def record_from_scp(chunk_begin, chunk_size): global voices is_finished = False if args.audio_in.endswith(".scp"): @@ -114,91 +115,98 @@ wavs = f_scp.readlines() else: wavs = [args.audio_in] if chunk_size>0: wavs=wavs[chunk_begin:chunk_begin+chunk_size] if chunk_size > 0: wavs = wavs[chunk_begin:chunk_begin + chunk_size] for wav in wavs: wav_splits = wav.strip().split() wav_name = wav_splits[0] if len(wav_splits) > 1 else "demo" wav_path = wav_splits[1] if len(wav_splits) > 1 else wav_splits[0] # bytes_f = open(wav_path, "rb") # bytes_data = bytes_f.read() with wave.open(wav_path, "rb") as wav_file: params = wav_file.getparams() # header_length = wav_file.getheaders()[0][1] # wav_file.setpos(header_length) frames = wav_file.readframes(wav_file.getnframes()) if not len(wav_path.strip())>0: continue if wav_path.endswith(".pcm"): with open(wav_path, "rb") as f: audio_bytes = f.read() elif wav_path.endswith(".wav"): import wave with wave.open(wav_path, "rb") as wav_file: params = wav_file.getparams() frames = wav_file.readframes(wav_file.getnframes()) audio_bytes = bytes(frames) else: raise NotImplementedError( f'Not supported audio type') audio_bytes = bytes(frames) # stride = int(args.chunk_size/1000*16000*2) stride = int(60*args.chunk_size[1]/args.chunk_interval/1000*16000*2) chunk_num = (len(audio_bytes)-1)//stride + 1 stride = int(60 * args.chunk_size[1] / args.chunk_interval / 1000 * 16000 * 2) chunk_num = (len(audio_bytes) - 1) // stride + 1 # print(stride) # send first time message = json.dumps({"mode": args.mode, "chunk_size": args.chunk_size, "chunk_interval": args.chunk_interval, "wav_name": wav_name,"is_speaking": True}) voices.put(message) message = json.dumps({"mode": args.mode, "chunk_size": args.chunk_size, "chunk_interval": args.chunk_interval, "wav_name": wav_name, "is_speaking": True}) #voices.put(message) await websocket.send(message) is_speaking = True for i in range(chunk_num): beg = i*stride data = audio_bytes[beg:beg+stride] message = data voices.put(message) if i == chunk_num-1: beg = i * stride data = audio_bytes[beg:beg + stride] message = data #voices.put(message) await websocket.send(message) if i == chunk_num - 1: is_speaking = False message = json.dumps({"is_speaking": is_speaking}) voices.put(message) # print("data_chunk: ", len(data_chunk)) # print(voices.qsize()) sleep_duration = 0.001 if args.send_without_sleep else 60*args.chunk_size[1]/args.chunk_interval/1000 #voices.put(message) await websocket.send(message) sleep_duration = 0.001 if args.send_without_sleep else 60 * args.chunk_size[1] / args.chunk_interval / 1000 await asyncio.sleep(sleep_duration) # when all data sent, we need to close websocket while not voices.empty(): await asyncio.sleep(1) await asyncio.sleep(3) # offline model need to wait for message recved if args.mode=="offline": global offline_msg_done while not offline_msg_done: await asyncio.sleep(1) await websocket.close() async def ws_send(): global voices global websocket print("started to sending data!") while True: while not voices.empty(): data = voices.get() voices.task_done() try: await websocket.send(data) except Exception as e: print('Exception occurred:', e) traceback.print_exc() exit(0) await asyncio.sleep(0.005) await asyncio.sleep(0.005) async def message(id): global websocket global websocket,voices,offline_msg_done text_print = "" text_print_2pass_online = "" text_print_2pass_offline = "" while True: try: try: while True: meg = await websocket.recv() meg = json.loads(meg) wav_name = meg.get("wav_name", "demo") # print(wav_name) text = meg["text"] if ibest_writer is not None: ibest_writer["text"][wav_name] = text if meg["mode"] == "online": text_print += "{}".format(text) text_print = text_print[-args.words_max_print:] os.system('clear') print("\rpid"+str(id)+": "+text_print) print("\rpid" + str(id) + ": " + text_print) elif meg["mode"] == "offline": text_print += "{}".format(text) text_print = text_print[-args.words_max_print:] os.system('clear') print("\rpid"+str(id)+": "+text_print) print("\rpid" + str(id) + ": " + text_print) offline_msg_done=True else: if meg["mode"] == "2pass-online": text_print_2pass_online += "{}".format(text) @@ -211,10 +219,12 @@ os.system('clear') print("\rpid" + str(id) + ": " + text_print) except Exception as e: except Exception as e: print("Exception:", e) traceback.print_exc() exit(0) #traceback.print_exc() #await websocket.close() async def print_messge(): global websocket @@ -225,72 +235,87 @@ print(meg) except Exception as e: print("Exception:", e) traceback.print_exc() #traceback.print_exc() exit(0) async def ws_client(id,chunk_begin,chunk_size): global websocket if args.ssl==1: ssl_context = ssl.SSLContext() ssl_context.check_hostname = False ssl_context.verify_mode = ssl.CERT_NONE uri = "wss://{}:{}".format(args.host, args.port) async def ws_client(id, chunk_begin, chunk_size): if args.audio_in is None: chunk_begin=0 chunk_size=1 global websocket,voices,offline_msg_done for i in range(chunk_begin,chunk_begin+chunk_size): offline_msg_done=False voices = Queue() if args.ssl == 1: ssl_context = ssl.SSLContext() ssl_context.check_hostname = False ssl_context.verify_mode = ssl.CERT_NONE uri = "wss://{}:{}".format(args.host, args.port) else: uri = "ws://{}:{}".format(args.host, args.port) ssl_context=None print("connect to",uri) async for websocket in websockets.connect(uri, subprotocols=["binary"], ping_interval=None,ssl=ssl_context): uri = "ws://{}:{}".format(args.host, args.port) ssl_context = None print("connect to", uri) async with websockets.connect(uri, subprotocols=["binary"], ping_interval=None, ssl=ssl_context) as websocket: if args.audio_in is not None: task = asyncio.create_task(record_from_scp(chunk_begin,chunk_size)) task = asyncio.create_task(record_from_scp(i, 1)) else: task = asyncio.create_task(record_microphone()) task2 = asyncio.create_task(ws_send()) task3 = asyncio.create_task(message(id)) await asyncio.gather(task, task2, task3) #task2 = asyncio.create_task(ws_send()) task3 = asyncio.create_task(message(str(id)+"_"+str(i))) #processid+fileid await asyncio.gather(task, task3) exit(0) def one_thread(id,chunk_begin,chunk_size): asyncio.get_event_loop().run_until_complete(ws_client(id,chunk_begin,chunk_size)) asyncio.get_event_loop().run_forever() def one_thread(id, chunk_begin, chunk_size): asyncio.get_event_loop().run_until_complete(ws_client(id, chunk_begin, chunk_size)) asyncio.get_event_loop().run_forever() if __name__ == '__main__': # for microphone if args.audio_in is None: p = Process(target=one_thread,args=(0, 0, 0)) p.start() p.join() print('end') else: # calculate the number of wavs for each preocess if args.audio_in.endswith(".scp"): f_scp = open(args.audio_in) wavs = f_scp.readlines() else: wavs = [args.audio_in] total_len=len(wavs) if total_len>=args.test_thread_num: chunk_size=int((total_len)/args.test_thread_num) remain_wavs=total_len-chunk_size*args.test_thread_num else: chunk_size=1 remain_wavs=0 # for microphone if args.audio_in is None: p = Process(target=one_thread, args=(0, 0, 0)) p.start() p.join() print('end') else: # calculate the number of wavs for each preocess if args.audio_in.endswith(".scp"): f_scp = open(args.audio_in) wavs = f_scp.readlines() else: wavs = [args.audio_in] for wav in wavs: wav_splits = wav.strip().split() wav_name = wav_splits[0] if len(wav_splits) > 1 else "demo" wav_path = wav_splits[1] if len(wav_splits) > 1 else wav_splits[0] audio_type = os.path.splitext(wav_path)[-1].lower() if audio_type not in SUPPORT_AUDIO_TYPE_SETS: raise NotImplementedError( f'Not supported audio type: {audio_type}') process_list = [] chunk_begin=0 for i in range(args.test_thread_num): now_chunk_size= chunk_size if remain_wavs>0: now_chunk_size=chunk_size+1 remain_wavs=remain_wavs-1 # process i handle wavs at chunk_begin and size of now_chunk_size p = Process(target=one_thread,args=(i,chunk_begin,now_chunk_size)) chunk_begin=chunk_begin+now_chunk_size p.start() process_list.append(p) total_len = len(wavs) if total_len >= args.test_thread_num: chunk_size = int(total_len / args.test_thread_num) remain_wavs = total_len - chunk_size * args.test_thread_num else: chunk_size = 1 remain_wavs = 0 for i in process_list: p.join() process_list = [] chunk_begin = 0 for i in range(args.test_thread_num): now_chunk_size = chunk_size if remain_wavs > 0: now_chunk_size = chunk_size + 1 remain_wavs = remain_wavs - 1 # process i handle wavs at chunk_begin and size of now_chunk_size p = Process(target=one_thread, args=(i, chunk_begin, now_chunk_size)) chunk_begin = chunk_begin + now_chunk_size p.start() process_list.append(p) print('end') for i in process_list: p.join() print('end') funasr/runtime/websocket/CMakeLists.txt
@@ -6,12 +6,10 @@ set(CMAKE_POSITION_INDEPENDENT_CODE ON) set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin) option(ENABLE_WEBSOCKET "Whether to build websocket server" ON) if(ENABLE_WEBSOCKET) # cmake_policy(SET CMP0135 NEW) include(FetchContent) FetchContent_Declare(websocketpp GIT_REPOSITORY https://github.com/zaphoyd/websocketpp.git @@ -22,7 +20,6 @@ FetchContent_MakeAvailable(websocketpp) include_directories(${PROJECT_SOURCE_DIR}/third_party/websocket) FetchContent_Declare(asio URL https://github.com/chriskohlhoff/asio/archive/refs/tags/asio-1-24-0.tar.gz SOURCE_DIR ${PROJECT_SOURCE_DIR}/third_party/asio @@ -38,8 +35,6 @@ FetchContent_MakeAvailable(json) include_directories(${PROJECT_SOURCE_DIR}/third_party/json/include) endif() @@ -61,8 +56,8 @@ # install openssl first apt-get install libssl-dev find_package(OpenSSL REQUIRED) add_executable(websocketmain "websocketmain.cpp" "websocketsrv.cpp") add_executable(websocketclient "websocketclient.cpp") add_executable(funasr-ws-server "funasr-ws-server.cpp" "websocket-server.cpp") add_executable(funasr-ws-client "funasr-ws-client.cpp") target_link_libraries(websocketclient PUBLIC funasr ssl crypto) target_link_libraries(websocketmain PUBLIC funasr ssl crypto) target_link_libraries(funasr-ws-client PUBLIC funasr ssl crypto) target_link_libraries(funasr-ws-server PUBLIC funasr ssl crypto) funasr/runtime/websocket/funasr-ws-client.cpp
New file @@ -0,0 +1,366 @@ /** * Copyright FunASR (https://github.com/alibaba-damo-academy/FunASR). All Rights * Reserved. MIT License (https://opensource.org/licenses/MIT) */ /* 2022-2023 by zhaomingwork */ // client for websocket, support multiple threads // ./funasr-ws-client --server-ip <string> // --port <string> // --wav-path <string> // [--thread-num <int>] // [--is-ssl <int>] [--] // [--version] [-h] // example: // ./funasr-ws-client --server-ip 127.0.0.1 --port 8889 --wav-path test.wav --thread-num 1 --is-ssl 0 #define ASIO_STANDALONE 1 #include <websocketpp/client.hpp> #include <websocketpp/common/thread.hpp> #include <websocketpp/config/asio_client.hpp> #include <fstream> #include <atomic> #include <glog/logging.h> #include "audio.h" #include "nlohmann/json.hpp" #include "tclap/CmdLine.h" /** * Define a semi-cross platform helper method that waits/sleeps for a bit. */ void WaitABit() { #ifdef WIN32 Sleep(1000); #else sleep(1); #endif } std::atomic<int> wav_index(0); bool IsTargetFile(const std::string& filename, const std::string target) { std::size_t pos = filename.find_last_of("."); if (pos == std::string::npos) { return false; } std::string extension = filename.substr(pos + 1); return (extension == target); } typedef websocketpp::config::asio_client::message_type::ptr message_ptr; typedef websocketpp::lib::shared_ptr<websocketpp::lib::asio::ssl::context> context_ptr; using websocketpp::lib::bind; using websocketpp::lib::placeholders::_1; using websocketpp::lib::placeholders::_2; context_ptr OnTlsInit(websocketpp::connection_hdl) { context_ptr ctx = websocketpp::lib::make_shared<asio::ssl::context>( asio::ssl::context::sslv23); try { ctx->set_options( asio::ssl::context::default_workarounds | asio::ssl::context::no_sslv2 | asio::ssl::context::no_sslv3 | asio::ssl::context::single_dh_use); } catch (std::exception& e) { LOG(ERROR) << e.what(); } return ctx; } // template for tls or not config template <typename T> class WebsocketClient { public: // typedef websocketpp::client<T> client; // typedef websocketpp::client<websocketpp::config::asio_tls_client> // wss_client; typedef websocketpp::lib::lock_guard<websocketpp::lib::mutex> scoped_lock; WebsocketClient(int is_ssl) : m_open(false), m_done(false) { // set up access channels to only log interesting things m_client.clear_access_channels(websocketpp::log::alevel::all); m_client.set_access_channels(websocketpp::log::alevel::connect); m_client.set_access_channels(websocketpp::log::alevel::disconnect); m_client.set_access_channels(websocketpp::log::alevel::app); // Initialize the Asio transport policy m_client.init_asio(); // Bind the handlers we are using using websocketpp::lib::bind; using websocketpp::lib::placeholders::_1; m_client.set_open_handler(bind(&WebsocketClient::on_open, this, _1)); m_client.set_close_handler(bind(&WebsocketClient::on_close, this, _1)); // m_client.set_close_handler(bind(&WebsocketClient::on_close, this, _1)); m_client.set_message_handler( [this](websocketpp::connection_hdl hdl, message_ptr msg) { on_message(hdl, msg); }); m_client.set_fail_handler(bind(&WebsocketClient::on_fail, this, _1)); m_client.clear_access_channels(websocketpp::log::alevel::all); } void on_message(websocketpp::connection_hdl hdl, message_ptr msg) { const std::string& payload = msg->get_payload(); switch (msg->get_opcode()) { case websocketpp::frame::opcode::text: total_num=total_num+1; LOG(INFO)<<total_num<<",on_message = " << payload; if((total_num+1)==wav_index) { websocketpp::lib::error_code ec; m_client.close(m_hdl, websocketpp::close::status::going_away, "", ec); if (ec){ LOG(ERROR)<< "Error closing connection " << ec.message(); } } } } // This method will block until the connection is complete void run(const std::string& uri, const std::vector<string>& wav_list, const std::vector<string>& wav_ids) { // Create a new connection to the given URI websocketpp::lib::error_code ec; typename websocketpp::client<T>::connection_ptr con = m_client.get_connection(uri, ec); if (ec) { m_client.get_alog().write(websocketpp::log::alevel::app, "Get Connection Error: " + ec.message()); return; } // Grab a handle for this connection so we can talk to it in a thread // safe manor after the event loop starts. m_hdl = con->get_handle(); // Queue the connection. No DNS queries or network connections will be // made until the io_service event loop is run. m_client.connect(con); // Create a thread to run the ASIO io_service event loop websocketpp::lib::thread asio_thread(&websocketpp::client<T>::run, &m_client); while(true){ int i = wav_index.fetch_add(1); if (i >= wav_list.size()) { break; } send_wav_data(wav_list[i], wav_ids[i]); } WaitABit(); asio_thread.join(); } // The open handler will signal that we are ready to start sending data void on_open(websocketpp::connection_hdl) { m_client.get_alog().write(websocketpp::log::alevel::app, "Connection opened, starting data!"); scoped_lock guard(m_lock); m_open = true; } // The close handler will signal that we should stop sending data void on_close(websocketpp::connection_hdl) { m_client.get_alog().write(websocketpp::log::alevel::app, "Connection closed, stopping data!"); scoped_lock guard(m_lock); m_done = true; } // The fail handler will signal that we should stop sending data void on_fail(websocketpp::connection_hdl) { m_client.get_alog().write(websocketpp::log::alevel::app, "Connection failed, stopping data!"); scoped_lock guard(m_lock); m_done = true; } // send wav to server void send_wav_data(string wav_path, string wav_id) { uint64_t count = 0; std::stringstream val; funasr::Audio audio(1); int32_t sampling_rate = 16000; if(IsTargetFile(wav_path.c_str(), "wav")){ int32_t sampling_rate = -1; if(!audio.LoadWav(wav_path.c_str(), &sampling_rate)) return ; }else if(IsTargetFile(wav_path.c_str(), "pcm")){ if (!audio.LoadPcmwav(wav_path.c_str(), &sampling_rate)) return ; }else{ printf("Wrong wav extension"); exit(-1); } float* buff; int len; int flag = 0; bool wait = false; while (1) { { scoped_lock guard(m_lock); // If the connection has been closed, stop generating data if (m_done) { break; } // If the connection hasn't been opened yet wait a bit and retry if (!m_open) { wait = true; } else { break; } } if (wait) { LOG(INFO) << "wait.." << m_open; WaitABit(); continue; } } websocketpp::lib::error_code ec; nlohmann::json jsonbegin; nlohmann::json chunk_size = nlohmann::json::array(); chunk_size.push_back(5); chunk_size.push_back(0); chunk_size.push_back(5); jsonbegin["chunk_size"] = chunk_size; jsonbegin["chunk_interval"] = 10; jsonbegin["wav_name"] = wav_id; jsonbegin["is_speaking"] = true; m_client.send(m_hdl, jsonbegin.dump(), websocketpp::frame::opcode::text, ec); // fetch wav data use asr engine api while (audio.Fetch(buff, len, flag) > 0) { short iArray[len]; // convert float -1,1 to short -32768,32767 for (size_t i = 0; i < len; ++i) { iArray[i] = (short)(buff[i] * 32767); } // send data to server m_client.send(m_hdl, iArray, len * sizeof(short), websocketpp::frame::opcode::binary, ec); LOG(INFO) << "sended data len=" << len * sizeof(short); // The most likely error that we will get is that the connection is // not in the right state. Usually this means we tried to send a // message to a connection that was closed or in the process of // closing. While many errors here can be easily recovered from, // in this simple example, we'll stop the data loop. if (ec) { m_client.get_alog().write(websocketpp::log::alevel::app, "Send Error: " + ec.message()); break; } // WaitABit(); } nlohmann::json jsonresult; jsonresult["is_speaking"] = false; m_client.send(m_hdl, jsonresult.dump(), websocketpp::frame::opcode::text, ec); // WaitABit(); } websocketpp::client<T> m_client; private: websocketpp::connection_hdl m_hdl; websocketpp::lib::mutex m_lock; bool m_open; bool m_done; int total_num=0; }; int main(int argc, char* argv[]) { google::InitGoogleLogging(argv[0]); FLAGS_logtostderr = true; TCLAP::CmdLine cmd("funasr-ws-client", ' ', "1.0"); TCLAP::ValueArg<std::string> server_ip_("", "server-ip", "server-ip", true, "127.0.0.1", "string"); TCLAP::ValueArg<std::string> port_("", "port", "port", true, "8889", "string"); TCLAP::ValueArg<std::string> wav_path_("", "wav-path", "the input could be: wav_path, e.g.: asr_example.wav; pcm_path, e.g.: asr_example.pcm; wav.scp, kaldi style wav list (wav_id \t wav_path)", true, "", "string"); TCLAP::ValueArg<int> thread_num_("", "thread-num", "thread-num", false, 1, "int"); TCLAP::ValueArg<int> is_ssl_( "", "is-ssl", "is-ssl is 1 means use wss connection, or use ws connection", false, 0, "int"); cmd.add(server_ip_); cmd.add(port_); cmd.add(wav_path_); cmd.add(thread_num_); cmd.add(is_ssl_); cmd.parse(argc, argv); std::string server_ip = server_ip_.getValue(); std::string port = port_.getValue(); std::string wav_path = wav_path_.getValue(); int threads_num = thread_num_.getValue(); int is_ssl = is_ssl_.getValue(); std::vector<websocketpp::lib::thread> client_threads; std::string uri = ""; if (is_ssl == 1) { uri = "wss://" + server_ip + ":" + port; } else { uri = "ws://" + server_ip + ":" + port; } // read wav_path std::vector<string> wav_list; std::vector<string> wav_ids; string default_id = "wav_default_id"; if(IsTargetFile(wav_path, "wav") || IsTargetFile(wav_path, "pcm")){ wav_list.emplace_back(wav_path); wav_ids.emplace_back(default_id); } else if(IsTargetFile(wav_path, "scp")){ ifstream in(wav_path); if (!in.is_open()) { printf("Failed to open scp file"); return 0; } string line; while(getline(in, line)) { istringstream iss(line); string column1, column2; iss >> column1 >> column2; wav_list.emplace_back(column2); wav_ids.emplace_back(column1); } in.close(); }else{ printf("Please check the wav extension!"); exit(-1); } for (size_t i = 0; i < threads_num; i++) { client_threads.emplace_back([uri, wav_list, wav_ids, is_ssl]() { if (is_ssl == 1) { WebsocketClient<websocketpp::config::asio_tls_client> c(is_ssl); c.m_client.set_tls_init_handler(bind(&OnTlsInit, ::_1)); c.run(uri, wav_list, wav_ids); } else { WebsocketClient<websocketpp::config::asio_client> c(is_ssl); c.run(uri, wav_list, wav_ids); } }); } for (auto& t : client_threads) { t.join(); } } funasr/runtime/websocket/funasr-ws-server.cpp
File was renamed from funasr/runtime/websocket/websocketmain.cpp @@ -5,12 +5,12 @@ /* 2022-2023 by zhaomingwork */ // io server // Usage:websocketmain [--model_thread_num <int>] [--decoder_thread_num <int>] // Usage:funasr-ws-server [--model_thread_num <int>] [--decoder_thread_num <int>] // [--io_thread_num <int>] [--port <int>] [--listen_ip // <string>] [--punc-quant <string>] [--punc-dir <string>] // [--vad-quant <string>] [--vad-dir <string>] [--quantize // <string>] --model-dir <string> [--] [--version] [-h] #include "websocketsrv.h" #include "websocket-server.h" using namespace std; void GetValue(TCLAP::ValueArg<std::string>& value_arg, string key, @@ -25,7 +25,7 @@ google::InitGoogleLogging(argv[0]); FLAGS_logtostderr = true; TCLAP::CmdLine cmd("websocketmain", ' ', "1.0"); TCLAP::CmdLine cmd("funasr-ws-server", ' ', "1.0"); TCLAP::ValueArg<std::string> model_dir( "", MODEL_DIR, "the asr model path, which contains model.onnx, config.yaml, am.mvn", funasr/runtime/websocket/readme.md
@@ -51,7 +51,7 @@ ```shell cd bin ./websocketmain [--model_thread_num <int>] [--decoder_thread_num <int>] ./funasr-ws-server [--model_thread_num <int>] [--decoder_thread_num <int>] [--io_thread_num <int>] [--port <int>] [--listen_ip <string>] [--punc-quant <string>] [--punc-dir <string>] [--vad-quant <string>] [--vad-dir <string>] [--quantize @@ -88,19 +88,38 @@ If use vad, please add: --vad-dir <string> If use punc, please add: --punc-dir <string> example: websocketmain --model-dir /FunASR/funasr/runtime/onnxruntime/export/damo/speech_paraformer-large_asr_nat-zh-cn-16k-common-vocab8404-pytorch funasr-ws-server --model-dir /FunASR/funasr/runtime/onnxruntime/export/damo/speech_paraformer-large_asr_nat-zh-cn-16k-common-vocab8404-pytorch ``` ## Run websocket client test ```shell Usage: ./websocketclient server_ip port wav_path threads_num is_ssl ./funasr-ws-client --server-ip <string> --port <string> --wav-path <string> [--thread-num <int>] [--is-ssl <int>] [--] [--version] [-h] is_ssl is 1 means use wss connection, or use ws connection Where: --server-ip <string> (required) server-ip --port <string> (required) port --wav-path <string> (required) the input could be: wav_path, e.g.: asr_example.wav; pcm_path, e.g.: asr_example.pcm; wav.scp, kaldi style wav list (wav_id \t wav_path) --thread-num <int> thread-num --is-ssl <int> is-ssl is 1 means use wss connection, or use ws connection example: websocketclient 127.0.0.1 8889 funasr/runtime/websocket/test.pcm.wav 64 0 ./funasr-ws-client --server-ip 127.0.0.1 --port 8889 --wav-path test.wav --thread-num 1 --is-ssl 0 result json, example like: {"mode":"offline","text":"欢迎大家来体验达摩院推出的语音识别模型","wav_name":"wav2"} funasr/runtime/websocket/websocket-server.cpp
File was renamed from funasr/runtime/websocket/websocketsrv.cpp @@ -10,7 +10,7 @@ // pools, one for handle network data and one for asr decoder. // now only support offline engine. #include "websocketsrv.h" #include "websocket-server.h" #include <thread> #include <utility> @@ -22,12 +22,11 @@ std::string& s_keyfile) { namespace asio = websocketpp::lib::asio; std::cout << "on_tls_init called with hdl: " << hdl.lock().get() << std::endl; std::cout << "using TLS mode: " LOG(INFO) << "on_tls_init called with hdl: " << hdl.lock().get(); LOG(INFO) << "using TLS mode: " << (mode == MOZILLA_MODERN ? "Mozilla Modern" : "Mozilla Intermediate") << std::endl; : "Mozilla Intermediate"); context_ptr ctx = websocketpp::lib::make_shared<asio::ssl::context>( asio::ssl::context::sslv23); @@ -49,7 +48,7 @@ ctx->use_private_key_file(s_keyfile, asio::ssl::context::pem); } catch (std::exception& e) { std::cout << "Exception: " << e.what() << std::endl; LOG(INFO) << "Exception: " << e.what(); } return ctx; } @@ -86,8 +85,7 @@ ec); } std::cout << "buffer.size=" << buffer.size() << ",result json=" << jsonresult.dump() << std::endl; LOG(INFO) << "buffer.size=" << buffer.size() << ",result json=" << jsonresult.dump(); if (!isonline) { // close the client if it is not online asr // server_->close(hdl, websocketpp::close::status::normal, "DONE", ec); @@ -110,14 +108,14 @@ data_msg->samples = std::make_shared<std::vector<char>>(); data_msg->msg = nlohmann::json::parse("{}"); data_map.emplace(hdl, data_msg); std::cout << "on_open, active connections: " << data_map.size() << std::endl; LOG(INFO) << "on_open, active connections: " << data_map.size(); } void WebSocketServer::on_close(websocketpp::connection_hdl hdl) { scoped_lock guard(m_lock); data_map.erase(hdl); // remove data vector when connection is closed std::cout << "on_close, active connections: " << data_map.size() << std::endl; LOG(INFO) << "on_close, active connections: " << data_map.size(); } // remove closed connection @@ -143,7 +141,7 @@ } for (auto hdl : to_remove) { data_map.erase(hdl); std::cout << "remove one connection " << std::endl; LOG(INFO)<< "remove one connection "; } } void WebSocketServer::on_message(websocketpp::connection_hdl hdl, @@ -161,7 +159,7 @@ lock.unlock(); if (sample_data_p == nullptr) { std::cout << "error when fetch sample data vector" << std::endl; LOG(INFO) << "error when fetch sample data vector"; return; } @@ -176,7 +174,7 @@ if (jsonresult["is_speaking"] == false || jsonresult["is_finished"] == true) { std::cout << "client done" << std::endl; LOG(INFO) << "client done"; if (isonline) { // do_close(ws); @@ -225,9 +223,9 @@ // init model with api asr_hanlde = FunOfflineInit(model_path, thread_num); std::cout << "model ready" << std::endl; LOG(INFO) << "model successfully inited"; } catch (const std::exception& e) { std::cout << e.what() << std::endl; LOG(INFO) << e.what(); } } funasr/runtime/websocket/websocket-server.h
File was renamed from funasr/runtime/websocket/websocketsrv.h @@ -10,8 +10,8 @@ // pools, one for handle network data and one for asr decoder. // now only support offline engine. #ifndef WEBSOCKETSRV_SERVER_H_ #define WEBSOCKETSRV_SERVER_H_ #ifndef WEBSOCKET_SERVER_H_ #define WEBSOCKET_SERVER_H_ #include <iostream> #include <map> @@ -134,4 +134,4 @@ websocketpp::lib::mutex m_lock; // mutex for sample_map }; #endif // WEBSOCKETSRV_SERVER_H_ #endif // WEBSOCKET_SERVER_H_ funasr/runtime/websocket/websocketclient.cpp
File was deleted tests/test_asr_inference_pipeline.py
@@ -87,6 +87,7 @@ rec_result = inference_pipeline( audio_in='https://isv-data.oss-cn-hangzhou.aliyuncs.com/ics/MaaS/ASR/test_audio/asr_example_hotword.wav') logger.info("asr inference result: {0}".format(rec_result)) assert rec_result["text"] == "国务院发展研究中心市场经济研究所副所长邓郁松认为" def test_paraformer_large_aishell1(self): inference_pipeline = pipeline( @@ -95,6 +96,7 @@ rec_result = inference_pipeline( audio_in='https://isv-data.oss-cn-hangzhou.aliyuncs.com/ics/MaaS/ASR/test_audio/asr_example_zh.wav') logger.info("asr inference result: {0}".format(rec_result)) assert rec_result["text"] == "欢迎大家来体验达摩院推出的语音识别模型" def test_paraformer_large_aishell2(self): inference_pipeline = pipeline( @@ -103,6 +105,7 @@ rec_result = inference_pipeline( audio_in='https://isv-data.oss-cn-hangzhou.aliyuncs.com/ics/MaaS/ASR/test_audio/asr_example_zh.wav') logger.info("asr inference result: {0}".format(rec_result)) assert rec_result["text"] == "欢迎大家来体验达摩院推出的语音识别模型" def test_paraformer_large_common(self): inference_pipeline = pipeline( @@ -111,6 +114,7 @@ rec_result = inference_pipeline( audio_in='https://isv-data.oss-cn-hangzhou.aliyuncs.com/ics/MaaS/ASR/test_audio/asr_example_zh.wav') logger.info("asr inference result: {0}".format(rec_result)) assert rec_result["text"] == "欢迎大家来体验达摩院推出的语音识别模型" def test_paraformer_large_online_common(self): inference_pipeline = pipeline( @@ -119,6 +123,7 @@ rec_result = inference_pipeline( audio_in='https://isv-data.oss-cn-hangzhou.aliyuncs.com/ics/MaaS/ASR/test_audio/asr_example_zh.wav') logger.info("asr inference result: {0}".format(rec_result)) assert rec_result["text"] == "欢迎大 家来 体验达 摩院推 出的 语音识 别模 型" def test_paraformer_online_common(self): inference_pipeline = pipeline( @@ -127,6 +132,7 @@ rec_result = inference_pipeline( audio_in='https://isv-data.oss-cn-hangzhou.aliyuncs.com/ics/MaaS/ASR/test_audio/asr_example_zh.wav') logger.info("asr inference result: {0}".format(rec_result)) assert rec_result["text"] == "欢迎 大家来 体验达 摩院推 出的 语音识 别模 型" def test_paraformer_tiny_commandword(self): inference_pipeline = pipeline( tests/test_asr_vad_punc_inference_pipeline.py
@@ -26,6 +26,7 @@ rec_result = inference_pipeline( audio_in='https://isv-data.oss-cn-hangzhou.aliyuncs.com/ics/MaaS/ASR/test_audio/asr_example_zh.wav') logger.info("asr_vad_punc inference result: {0}".format(rec_result)) assert rec_result["text"] == "欢迎大家来体验达摩院推出的语音识别模型。" if __name__ == '__main__':