From ae013cf597db1c523c9fac21b7e83db62304ae2d Mon Sep 17 00:00:00 2001
From: zhaomingwork <61895407+zhaomingwork@users.noreply.github.com>
Date: 星期四, 08 五月 2025 23:52:09 +0800
Subject: [PATCH] fix bug for core dump in http, use libboost as parse (#2509)
---
runtime/http/bin/connection.cpp | 408 +++++++++++++++++++++++++++++++++++++--------------------
1 files changed, 263 insertions(+), 145 deletions(-)
diff --git a/runtime/http/bin/connection.cpp b/runtime/http/bin/connection.cpp
index b75fc77..e2633e3 100644
--- a/runtime/http/bin/connection.cpp
+++ b/runtime/http/bin/connection.cpp
@@ -2,7 +2,7 @@
* Copyright FunASR (https://github.com/alibaba-damo-academy/FunASR). All Rights
* Reserved. MIT License (https://opensource.org/licenses/MIT)
*/
-/* 2023-2024 by zhaomingwork@qq.com */
+/* 2023-2025 by zhaomingwork@qq.com */
//
// connection.cpp
// copy some codes from http://www.boost.org/
@@ -10,28 +10,35 @@
#include <thread>
#include <utility>
+#include "util.hpp"
-namespace http {
-namespace server2 {
-//std::ofstream fwout("out.data", std::ios::binary);
-std::shared_ptr<FUNASR_MESSAGE> &connection::get_data_msg() { return data_msg; }
-connection::connection(asio::ip::tcp::socket socket,
- asio::io_context &io_decoder, int connection_id,
- std::shared_ptr<ModelDecoder> model_decoder)
- : socket_(std::move(socket)),
- io_decoder(io_decoder),
- connection_id(connection_id),
- model_decoder(model_decoder)
+namespace http
{
- s_timer = std::make_shared<asio::steady_timer>(io_decoder);
-}
+ namespace server2
+ {
+
+ std::shared_ptr<FUNASR_MESSAGE> &connection::get_data_msg() { return data_msg; }
+ connection::connection(asio::ip::tcp::socket socket,
+ asio::io_context &io_decoder, int connection_id,
+ std::shared_ptr<ModelDecoder> model_decoder)
+ : socket_(std::move(socket)),
+ io_decoder(io_decoder),
+ connection_id(connection_id),
+ model_decoder(model_decoder)
-void connection::setup_timer() {
- if (data_msg->status == 1) return;
+ {
+ s_timer = std::make_shared<asio::steady_timer>(io_decoder);
+ }
- s_timer->expires_after(std::chrono::seconds(3));
- s_timer->async_wait([=](const asio::error_code &ec) {
+ void connection::setup_timer()
+ {
+ if (data_msg->status == 1)
+ return;
+
+ s_timer->expires_after(std::chrono::seconds(10));
+ s_timer->async_wait([=](const asio::error_code &ec)
+ {
if (!ec) {
std::cout << "time is out!" << std::endl;
if (data_msg->status == 1) return;
@@ -40,157 +47,268 @@
auto wf = std::bind(&connection::write_back, std::ref(*this), "");
// close the connection
strand_->post(wf);
+ } });
}
- });
-}
-void connection::start() {
- std::lock_guard<std::mutex> lock(m_lock); // for threads safty
- try {
-
- data_msg = std::make_shared<FUNASR_MESSAGE>(); // put a new data vector for
- // new connection
- data_msg->samples = std::make_shared<std::vector<char>>();
- //data_msg->samples->reserve(16000*20);
- data_msg->msg = nlohmann::json::parse("{}");
- data_msg->msg["wav_format"] = "pcm";
- data_msg->msg["wav_name"] = "wav-default-id";
- data_msg->msg["itn"] = true;
- data_msg->msg["audio_fs"] = 16000; // default is 16k
- data_msg->msg["access_num"] = 0; // the number of access for this object,
- // when it is 0, we can free it saftly
- data_msg->msg["is_eof"] = false;
- data_msg->status = 0;
+ void connection::start()
+ {
+ std::lock_guard<std::mutex> lock(m_lock); // for threads safty
+ try
+ {
- strand_ = std::make_shared<asio::io_context::strand>(io_decoder);
+ data_msg = std::make_shared<FUNASR_MESSAGE>(); // put a new data vector for
+ // new connection
+ data_msg->samples = std::make_shared<std::vector<char>>();
+ // data_msg->samples->reserve(16000*20);
+ data_msg->msg = nlohmann::json::parse("{}");
+ data_msg->msg["wav_format"] = "pcm";
+ data_msg->msg["wav_name"] = "wav-default-id";
+ data_msg->msg["itn"] = true;
+ data_msg->msg["audio_fs"] = 16000; // default is 16k
+ data_msg->msg["access_num"] = 0; // the number of access for this object,
+ // when it is 0, we can free it saftly
+ data_msg->msg["is_eof"] = false;
+ data_msg->status = 0;
- FUNASR_DEC_HANDLE decoder_handle = FunASRWfstDecoderInit(
- model_decoder->get_asr_handle(), ASR_OFFLINE, global_beam_, lattice_beam_, am_scale_);
+ strand_ = std::make_shared<asio::io_context::strand>(io_decoder);
- data_msg->decoder_handle = decoder_handle;
+ FUNASR_DEC_HANDLE decoder_handle = FunASRWfstDecoderInit(
+ model_decoder->get_asr_handle(), ASR_OFFLINE, global_beam_, lattice_beam_, am_scale_);
- if (data_msg->hotwords_embedding == nullptr) {
- std::unordered_map<std::string, int> merged_hws_map;
- std::string nn_hotwords = "";
+ data_msg->decoder_handle = decoder_handle;
- if (true) {
- std::string json_string = "{}";
- if (!json_string.empty()) {
- nlohmann::json json_fst_hws;
- try {
- json_fst_hws = nlohmann::json::parse(json_string);
- if (json_fst_hws.type() == nlohmann::json::value_t::object) {
- // fst
- try {
- std::unordered_map<std::string, int> client_hws_map =
- json_fst_hws;
- merged_hws_map.insert(client_hws_map.begin(),
- client_hws_map.end());
- } catch (const std::exception &e) {
+ if (data_msg->hotwords_embedding == nullptr)
+ {
+ std::unordered_map<std::string, int> merged_hws_map;
+ std::string nn_hotwords = "";
+
+ if (true)
+ {
+ std::string json_string = "{}";
+ if (!json_string.empty())
+ {
+ nlohmann::json json_fst_hws;
+ try
+ {
+ json_fst_hws = nlohmann::json::parse(json_string);
+ if (json_fst_hws.type() == nlohmann::json::value_t::object)
+ {
+ // fst
+ try
+ {
+ std::unordered_map<std::string, int> client_hws_map =
+ json_fst_hws;
+ merged_hws_map.insert(client_hws_map.begin(),
+ client_hws_map.end());
+ }
+ catch (const std::exception &e)
+ {
+ std::cout << e.what();
+ }
+ }
+ }
+ catch (std::exception const &e)
+ {
std::cout << e.what();
+ // nn
+ std::string client_nn_hws = "{}";
+ nn_hotwords += " " + client_nn_hws;
+ std::cout << "nn hotwords: " << client_nn_hws;
}
}
- } catch (std::exception const &e) {
- std::cout << e.what();
- // nn
- std::string client_nn_hws = "{}";
- nn_hotwords += " " + client_nn_hws;
- std::cout << "nn hotwords: " << client_nn_hws;
}
+ merged_hws_map.insert(hws_map_.begin(), hws_map_.end());
+
+ // fst
+ std::cout << "hotwords: ";
+ for (const auto &pair : merged_hws_map)
+ {
+ nn_hotwords += " " + pair.first;
+ std::cout << pair.first << " : " << pair.second;
+ }
+ FunWfstDecoderLoadHwsRes(data_msg->decoder_handle, fst_inc_wts_,
+ merged_hws_map);
+
+ // nn
+ std::vector<std::vector<float>> new_hotwords_embedding =
+ CompileHotwordEmbedding(model_decoder->get_asr_handle(), nn_hotwords);
+ data_msg->hotwords_embedding =
+ std::make_shared<std::vector<std::vector<float>>>(
+ new_hotwords_embedding);
}
- }
- merged_hws_map.insert(hws_map_.begin(), hws_map_.end());
- // fst
- std::cout << "hotwords: ";
- for (const auto &pair : merged_hws_map) {
- nn_hotwords += " " + pair.first;
- std::cout << pair.first << " : " << pair.second;
+
+ do_read();
}
- FunWfstDecoderLoadHwsRes(data_msg->decoder_handle, fst_inc_wts_,
- merged_hws_map);
-
- // nn
- std::vector<std::vector<float>> new_hotwords_embedding =
- CompileHotwordEmbedding(model_decoder->get_asr_handle(), nn_hotwords);
- data_msg->hotwords_embedding =
- std::make_shared<std::vector<std::vector<float>>>(
- new_hotwords_embedding);
+ catch (const std::exception &e)
+ {
+ std::cout << "error:" << e.what();
+ }
}
- file_parse = std::make_shared<http::server2::file_parser>(data_msg);
- do_read();
- } catch (const std::exception &e) {
- std::cout << "error:" << e.what();
- }
-}
+ void connection::write_back(std::string str)
+ {
+ s_timer->cancel();
+ reply_ = reply::stock_reply(
+ data_msg->msg["asr_result"].dump()); // reply::stock_reply();
+ do_write();
+ }
+ void connection::do_read()
+ {
-void connection::write_back(std::string str) {
-
- s_timer->cancel();
- std::cout << "jsonresult=" << data_msg->msg["asr_result"].dump() << std::endl;
- reply_ = reply::stock_reply(
- data_msg->msg["asr_result"].dump()); // reply::stock_reply();
- do_write();
-}
-void connection::do_read() {
- // status==1 means time out
- if (data_msg->status == 1) return;
-
+ if (data_msg->status == 1)
+ return;
- s_timer->cancel();
- setup_timer();
- auto self(shared_from_this());
- socket_.async_read_some(
- asio::buffer(buffer_),
- [this, self](asio::error_code ec, std::size_t bytes_transferred) {
- if (!ec) {
- auto is = std::begin(buffer_);
- auto ie = std::next(is, bytes_transferred);
+ s_timer->cancel();
+ setup_timer();
+ auto self(shared_from_this());
+ socket_.async_read_some(
+ asio::buffer(buffer_),
+ [this, self](asio::error_code ec, std::size_t bytes_transferred)
+ {
+ if (ec)
+ {
+ handle_error(ec);
+ return;
+ }
- http::server2::file_parser::result_type rtype =
- file_parse->parse_file(is, ie);
- if (rtype == http::server2::file_parser::result_type::ok) {
+ // 灏嗘柊鏁版嵁杩藉姞鍒扮疮绉紦鍐插尯
+ received_data_.append(buffer_.data(), bytes_transferred);
+ switch (state_)
+ {
+ case State::ReadingHeaders:
- //fwout.write(data_msg->samples->data(),data_msg->samples->size());
- //fwout.flush();
- auto wf = std::bind(&connection::write_back, std::ref(*this), "aa");
- auto f = std::bind(&ModelDecoder::do_decoder,
- std::ref(*model_decoder), std::ref(data_msg));
+ if (try_parse_headers())
+ {
+ if (state_ == State::SendingContinue)
+ {
- // for decode task
- strand_->post(f);
- // for close task
- strand_->post(wf);
-
- // std::this_thread::sleep_for(std::chrono::milliseconds(1000*10));
- }
+ handle_100_continue();
+ }
+ else
+ {
- do_read();
- }
- });
-}
+ handle_body();
+ }
+ }
+ else
+ {
-void connection::do_write() {
- auto self(shared_from_this());
- asio::async_write(socket_, reply_.to_buffers(),
- [this, self](asio::error_code ec, std::size_t) {
- if (!ec) {
- // Initiate graceful connection closure.
- asio::error_code ignored_ec;
- socket_.shutdown(asio::ip::tcp::socket::shutdown_both,
- ignored_ec);
- }
+ do_read();
+ }
+ break;
- // No new asynchronous operations are started. This means
- // that all shared_ptr references to the connection object
- // will disappear and the object will be destroyed
- // automatically after this handler returns. The
- // connection class's destructor closes the socket.
- });
-}
+ case State::ReadingBody:
+
+ handle_body();
+ break;
-} // namespace server2
-} // namespace http
+ case State::SendingContinue:
+
+ break; // 绛夊緟100 Continue鍙戦�佸畬鎴�
+ }
+ });
+ }
+
+ std::string connection::parse_attachment_filename(const std::string &header)
+ {
+
+ size_t pos = header.find("Content-Disposition: ");
+ if (pos == std::string::npos)
+ return "";
+
+ pos += 21; // "Content-Disposition: "闀垮害
+ size_t end = header.find("\r\n", pos);
+ if (end == std::string::npos)
+ return "";
+
+ // 璋冪敤瑙f瀽鍑芥暟
+ return parse_attachment_filename_impl(header.substr(pos, end - pos));
+ }
+
+ void connection::handle_body()
+ {
+
+
+ process_multipart_data();
+ if (in_file_part_ == false)
+ {
+ std::cout << "鏂囦欢鑾峰彇缁撴潫" << std::endl;
+ std::cout << "寮�濮嬭В鐮侊紝鏁版嵁澶у皬= " << data_msg->samples->size() << std::endl;
+
+ auto close_thread = std::bind(&connection::write_back, std::ref(*this), "close");
+ auto decoder_thread = std::bind(&ModelDecoder::do_decoder,
+ std::ref(*model_decoder), std::ref(data_msg));
+
+ // for decode task
+ strand_->post(decoder_thread);
+ // for close task
+ strand_->post(close_thread);
+ data_msg->sem_resultok.acquire();
+ std::cout << "瑙g爜绾跨▼鎻愪氦缁撴潫锛侊紒锛侊紒 " << std::endl;
+ }
+ else
+ do_read();
+ return;
+
+ }
+
+ // 杈呭姪鍑芥暟锛氳В鏋� Content-Length
+ size_t connection::parse_content_length(const std::string &header)
+ {
+ size_t pos = header.find("Content-Length: ");
+ if (pos == std::string::npos)
+ return 0;
+
+ pos += 16; // "Content-Length: "闀垮害
+ size_t end = header.find("\r\n", pos);
+ if (end == std::string::npos)
+ return 0;
+
+ try
+ {
+ return std::stoul(header.substr(pos, end - pos));
+ }
+ catch (...)
+ {
+ return 0;
+ }
+ }
+
+ void connection::handle_error(asio::error_code ec)
+ {
+ if (ec == asio::error::eof)
+ {
+ std::cout << "Connection closed gracefully\n";
+ }
+ else
+ {
+ std::cerr << "Error: " << ec.message() << "\n";
+ }
+ }
+ void connection::do_write()
+ {
+ auto self(shared_from_this());
+ asio::async_write(socket_, reply_.to_buffers(),
+ [this, self](asio::error_code ec, std::size_t)
+ {
+ if (!ec)
+ {
+ // Initiate graceful connection closure.
+ asio::error_code ignored_ec;
+ socket_.shutdown(asio::ip::tcp::socket::shutdown_both,
+ ignored_ec);
+ }
+ data_msg->sem_resultok.release();
+ // No new asynchronous operations are started. This means
+ // that all shared_ptr references to the connection object
+ // will disappear and the object will be destroyed
+ // automatically after this handler returns. The
+ // connection class's destructor closes the socket.
+ });
+ }
+
+ } // namespace server2
+} // namespace http
--
Gitblit v1.9.1