From ae013cf597db1c523c9fac21b7e83db62304ae2d Mon Sep 17 00:00:00 2001
From: zhaomingwork <61895407+zhaomingwork@users.noreply.github.com>
Date: 星期四, 08 五月 2025 23:52:09 +0800
Subject: [PATCH] fix bug for core dump in http, use libboost as parse (#2509)
---
runtime/http/bin/connection.hpp | 412 ++++++++++++++++--
/dev/null | 234 -----------
runtime/http/bin/asr_sessions.h | 36 +
runtime/http/readme.md | 14
runtime/http/readme_zh.md | 13
runtime/http/bin/util.hpp | 95 ++++
runtime/http/bin/connection.cpp | 408 ++++++++++++------
7 files changed, 751 insertions(+), 461 deletions(-)
diff --git a/runtime/http/bin/asr_sessions.h b/runtime/http/bin/asr_sessions.h
index 0e1c54d..cbbf9c1 100644
--- a/runtime/http/bin/asr_sessions.h
+++ b/runtime/http/bin/asr_sessions.h
@@ -2,13 +2,41 @@
* Copyright FunASR (https://github.com/alibaba-damo-academy/FunASR). All Rights
* Reserved. MIT License (https://opensource.org/licenses/MIT)
*/
-/* 2023-2024 by zhaomingwork@qq.com */
+/* 2023-2025 by zhaomingwork@qq.com */
// FUNASR_MESSAGE define the needed message between funasr engine and http server
#ifndef HTTP_SERVER2_SESSIONS_HPP
#define HTTP_SERVER2_SESSIONS_HPP
#include "funasrruntime.h"
#include "nlohmann/json.hpp"
+#include <iostream>
+#include <thread>
#include <atomic>
+#include <mutex>
+#include <condition_variable>
+
+class Semaphore {
+public:
+ explicit Semaphore(int count = 0) : count_(count) {}
+
+
+ void acquire() {
+ std::unique_lock<std::mutex> lock(mutex_);
+ cv_.wait(lock, [this]() { return count_ > 0; });
+ --count_;
+ }
+
+
+ void release() {
+ std::lock_guard<std::mutex> lock(mutex_);
+ ++count_;
+ cv_.notify_one();
+ }
+
+private:
+ int count_;
+ std::mutex mutex_;
+ std::condition_variable cv_;
+};
typedef struct {
nlohmann::json msg;
std::shared_ptr<std::vector<char>> samples;
@@ -16,5 +44,11 @@
FUNASR_DEC_HANDLE decoder_handle=nullptr;
std::atomic<int> status;
+ //std::counting_semaphore<3> sem(0);
+ Semaphore sem_resultok;
} FUNASR_MESSAGE;
+
+
+
+
#endif // HTTP_SERVER2_REQUEST_PARSER_HPP
\ No newline at end of file
diff --git a/runtime/http/bin/connection.cpp b/runtime/http/bin/connection.cpp
index b75fc77..e2633e3 100644
--- a/runtime/http/bin/connection.cpp
+++ b/runtime/http/bin/connection.cpp
@@ -2,7 +2,7 @@
* Copyright FunASR (https://github.com/alibaba-damo-academy/FunASR). All Rights
* Reserved. MIT License (https://opensource.org/licenses/MIT)
*/
-/* 2023-2024 by zhaomingwork@qq.com */
+/* 2023-2025 by zhaomingwork@qq.com */
//
// connection.cpp
// copy some codes from http://www.boost.org/
@@ -10,28 +10,35 @@
#include <thread>
#include <utility>
+#include "util.hpp"
-namespace http {
-namespace server2 {
-//std::ofstream fwout("out.data", std::ios::binary);
-std::shared_ptr<FUNASR_MESSAGE> &connection::get_data_msg() { return data_msg; }
-connection::connection(asio::ip::tcp::socket socket,
- asio::io_context &io_decoder, int connection_id,
- std::shared_ptr<ModelDecoder> model_decoder)
- : socket_(std::move(socket)),
- io_decoder(io_decoder),
- connection_id(connection_id),
- model_decoder(model_decoder)
+namespace http
{
- s_timer = std::make_shared<asio::steady_timer>(io_decoder);
-}
+ namespace server2
+ {
+
+ std::shared_ptr<FUNASR_MESSAGE> &connection::get_data_msg() { return data_msg; }
+ connection::connection(asio::ip::tcp::socket socket,
+ asio::io_context &io_decoder, int connection_id,
+ std::shared_ptr<ModelDecoder> model_decoder)
+ : socket_(std::move(socket)),
+ io_decoder(io_decoder),
+ connection_id(connection_id),
+ model_decoder(model_decoder)
-void connection::setup_timer() {
- if (data_msg->status == 1) return;
+ {
+ s_timer = std::make_shared<asio::steady_timer>(io_decoder);
+ }
- s_timer->expires_after(std::chrono::seconds(3));
- s_timer->async_wait([=](const asio::error_code &ec) {
+ void connection::setup_timer()
+ {
+ if (data_msg->status == 1)
+ return;
+
+ s_timer->expires_after(std::chrono::seconds(10));
+ s_timer->async_wait([=](const asio::error_code &ec)
+ {
if (!ec) {
std::cout << "time is out!" << std::endl;
if (data_msg->status == 1) return;
@@ -40,157 +47,268 @@
auto wf = std::bind(&connection::write_back, std::ref(*this), "");
// close the connection
strand_->post(wf);
+ } });
}
- });
-}
-void connection::start() {
- std::lock_guard<std::mutex> lock(m_lock); // for threads safty
- try {
-
- data_msg = std::make_shared<FUNASR_MESSAGE>(); // put a new data vector for
- // new connection
- data_msg->samples = std::make_shared<std::vector<char>>();
- //data_msg->samples->reserve(16000*20);
- data_msg->msg = nlohmann::json::parse("{}");
- data_msg->msg["wav_format"] = "pcm";
- data_msg->msg["wav_name"] = "wav-default-id";
- data_msg->msg["itn"] = true;
- data_msg->msg["audio_fs"] = 16000; // default is 16k
- data_msg->msg["access_num"] = 0; // the number of access for this object,
- // when it is 0, we can free it saftly
- data_msg->msg["is_eof"] = false;
- data_msg->status = 0;
+ void connection::start()
+ {
+ std::lock_guard<std::mutex> lock(m_lock); // for threads safty
+ try
+ {
- strand_ = std::make_shared<asio::io_context::strand>(io_decoder);
+ data_msg = std::make_shared<FUNASR_MESSAGE>(); // put a new data vector for
+ // new connection
+ data_msg->samples = std::make_shared<std::vector<char>>();
+ // data_msg->samples->reserve(16000*20);
+ data_msg->msg = nlohmann::json::parse("{}");
+ data_msg->msg["wav_format"] = "pcm";
+ data_msg->msg["wav_name"] = "wav-default-id";
+ data_msg->msg["itn"] = true;
+ data_msg->msg["audio_fs"] = 16000; // default is 16k
+ data_msg->msg["access_num"] = 0; // the number of access for this object,
+ // when it is 0, we can free it saftly
+ data_msg->msg["is_eof"] = false;
+ data_msg->status = 0;
- FUNASR_DEC_HANDLE decoder_handle = FunASRWfstDecoderInit(
- model_decoder->get_asr_handle(), ASR_OFFLINE, global_beam_, lattice_beam_, am_scale_);
+ strand_ = std::make_shared<asio::io_context::strand>(io_decoder);
- data_msg->decoder_handle = decoder_handle;
+ FUNASR_DEC_HANDLE decoder_handle = FunASRWfstDecoderInit(
+ model_decoder->get_asr_handle(), ASR_OFFLINE, global_beam_, lattice_beam_, am_scale_);
- if (data_msg->hotwords_embedding == nullptr) {
- std::unordered_map<std::string, int> merged_hws_map;
- std::string nn_hotwords = "";
+ data_msg->decoder_handle = decoder_handle;
- if (true) {
- std::string json_string = "{}";
- if (!json_string.empty()) {
- nlohmann::json json_fst_hws;
- try {
- json_fst_hws = nlohmann::json::parse(json_string);
- if (json_fst_hws.type() == nlohmann::json::value_t::object) {
- // fst
- try {
- std::unordered_map<std::string, int> client_hws_map =
- json_fst_hws;
- merged_hws_map.insert(client_hws_map.begin(),
- client_hws_map.end());
- } catch (const std::exception &e) {
+ if (data_msg->hotwords_embedding == nullptr)
+ {
+ std::unordered_map<std::string, int> merged_hws_map;
+ std::string nn_hotwords = "";
+
+ if (true)
+ {
+ std::string json_string = "{}";
+ if (!json_string.empty())
+ {
+ nlohmann::json json_fst_hws;
+ try
+ {
+ json_fst_hws = nlohmann::json::parse(json_string);
+ if (json_fst_hws.type() == nlohmann::json::value_t::object)
+ {
+ // fst
+ try
+ {
+ std::unordered_map<std::string, int> client_hws_map =
+ json_fst_hws;
+ merged_hws_map.insert(client_hws_map.begin(),
+ client_hws_map.end());
+ }
+ catch (const std::exception &e)
+ {
+ std::cout << e.what();
+ }
+ }
+ }
+ catch (std::exception const &e)
+ {
std::cout << e.what();
+ // nn
+ std::string client_nn_hws = "{}";
+ nn_hotwords += " " + client_nn_hws;
+ std::cout << "nn hotwords: " << client_nn_hws;
}
}
- } catch (std::exception const &e) {
- std::cout << e.what();
- // nn
- std::string client_nn_hws = "{}";
- nn_hotwords += " " + client_nn_hws;
- std::cout << "nn hotwords: " << client_nn_hws;
}
+ merged_hws_map.insert(hws_map_.begin(), hws_map_.end());
+
+ // fst
+ std::cout << "hotwords: ";
+ for (const auto &pair : merged_hws_map)
+ {
+ nn_hotwords += " " + pair.first;
+ std::cout << pair.first << " : " << pair.second;
+ }
+ FunWfstDecoderLoadHwsRes(data_msg->decoder_handle, fst_inc_wts_,
+ merged_hws_map);
+
+ // nn
+ std::vector<std::vector<float>> new_hotwords_embedding =
+ CompileHotwordEmbedding(model_decoder->get_asr_handle(), nn_hotwords);
+ data_msg->hotwords_embedding =
+ std::make_shared<std::vector<std::vector<float>>>(
+ new_hotwords_embedding);
}
- }
- merged_hws_map.insert(hws_map_.begin(), hws_map_.end());
- // fst
- std::cout << "hotwords: ";
- for (const auto &pair : merged_hws_map) {
- nn_hotwords += " " + pair.first;
- std::cout << pair.first << " : " << pair.second;
+
+ do_read();
}
- FunWfstDecoderLoadHwsRes(data_msg->decoder_handle, fst_inc_wts_,
- merged_hws_map);
-
- // nn
- std::vector<std::vector<float>> new_hotwords_embedding =
- CompileHotwordEmbedding(model_decoder->get_asr_handle(), nn_hotwords);
- data_msg->hotwords_embedding =
- std::make_shared<std::vector<std::vector<float>>>(
- new_hotwords_embedding);
+ catch (const std::exception &e)
+ {
+ std::cout << "error:" << e.what();
+ }
}
- file_parse = std::make_shared<http::server2::file_parser>(data_msg);
- do_read();
- } catch (const std::exception &e) {
- std::cout << "error:" << e.what();
- }
-}
+ void connection::write_back(std::string str)
+ {
+ s_timer->cancel();
+ reply_ = reply::stock_reply(
+ data_msg->msg["asr_result"].dump()); // reply::stock_reply();
+ do_write();
+ }
+ void connection::do_read()
+ {
-void connection::write_back(std::string str) {
-
- s_timer->cancel();
- std::cout << "jsonresult=" << data_msg->msg["asr_result"].dump() << std::endl;
- reply_ = reply::stock_reply(
- data_msg->msg["asr_result"].dump()); // reply::stock_reply();
- do_write();
-}
-void connection::do_read() {
- // status==1 means time out
- if (data_msg->status == 1) return;
-
+ if (data_msg->status == 1)
+ return;
- s_timer->cancel();
- setup_timer();
- auto self(shared_from_this());
- socket_.async_read_some(
- asio::buffer(buffer_),
- [this, self](asio::error_code ec, std::size_t bytes_transferred) {
- if (!ec) {
- auto is = std::begin(buffer_);
- auto ie = std::next(is, bytes_transferred);
+ s_timer->cancel();
+ setup_timer();
+ auto self(shared_from_this());
+ socket_.async_read_some(
+ asio::buffer(buffer_),
+ [this, self](asio::error_code ec, std::size_t bytes_transferred)
+ {
+ if (ec)
+ {
+ handle_error(ec);
+ return;
+ }
- http::server2::file_parser::result_type rtype =
- file_parse->parse_file(is, ie);
- if (rtype == http::server2::file_parser::result_type::ok) {
+ // 灏嗘柊鏁版嵁杩藉姞鍒扮疮绉紦鍐插尯
+ received_data_.append(buffer_.data(), bytes_transferred);
+ switch (state_)
+ {
+ case State::ReadingHeaders:
- //fwout.write(data_msg->samples->data(),data_msg->samples->size());
- //fwout.flush();
- auto wf = std::bind(&connection::write_back, std::ref(*this), "aa");
- auto f = std::bind(&ModelDecoder::do_decoder,
- std::ref(*model_decoder), std::ref(data_msg));
+ if (try_parse_headers())
+ {
+ if (state_ == State::SendingContinue)
+ {
- // for decode task
- strand_->post(f);
- // for close task
- strand_->post(wf);
-
- // std::this_thread::sleep_for(std::chrono::milliseconds(1000*10));
- }
+ handle_100_continue();
+ }
+ else
+ {
- do_read();
- }
- });
-}
+ handle_body();
+ }
+ }
+ else
+ {
-void connection::do_write() {
- auto self(shared_from_this());
- asio::async_write(socket_, reply_.to_buffers(),
- [this, self](asio::error_code ec, std::size_t) {
- if (!ec) {
- // Initiate graceful connection closure.
- asio::error_code ignored_ec;
- socket_.shutdown(asio::ip::tcp::socket::shutdown_both,
- ignored_ec);
- }
+ do_read();
+ }
+ break;
- // No new asynchronous operations are started. This means
- // that all shared_ptr references to the connection object
- // will disappear and the object will be destroyed
- // automatically after this handler returns. The
- // connection class's destructor closes the socket.
- });
-}
+ case State::ReadingBody:
+
+ handle_body();
+ break;
-} // namespace server2
-} // namespace http
+ case State::SendingContinue:
+
+ break; // 绛夊緟100 Continue鍙戦�佸畬鎴�
+ }
+ });
+ }
+
+ std::string connection::parse_attachment_filename(const std::string &header)
+ {
+
+ size_t pos = header.find("Content-Disposition: ");
+ if (pos == std::string::npos)
+ return "";
+
+ pos += 21; // "Content-Disposition: "闀垮害
+ size_t end = header.find("\r\n", pos);
+ if (end == std::string::npos)
+ return "";
+
+ // 璋冪敤瑙f瀽鍑芥暟
+ return parse_attachment_filename_impl(header.substr(pos, end - pos));
+ }
+
+ void connection::handle_body()
+ {
+
+
+ process_multipart_data();
+ if (in_file_part_ == false)
+ {
+ std::cout << "鏂囦欢鑾峰彇缁撴潫" << std::endl;
+ std::cout << "寮�濮嬭В鐮侊紝鏁版嵁澶у皬= " << data_msg->samples->size() << std::endl;
+
+ auto close_thread = std::bind(&connection::write_back, std::ref(*this), "close");
+ auto decoder_thread = std::bind(&ModelDecoder::do_decoder,
+ std::ref(*model_decoder), std::ref(data_msg));
+
+ // for decode task
+ strand_->post(decoder_thread);
+ // for close task
+ strand_->post(close_thread);
+ data_msg->sem_resultok.acquire();
+ std::cout << "瑙g爜绾跨▼鎻愪氦缁撴潫锛侊紒锛侊紒 " << std::endl;
+ }
+ else
+ do_read();
+ return;
+
+ }
+
+ // 杈呭姪鍑芥暟锛氳В鏋� Content-Length
+ size_t connection::parse_content_length(const std::string &header)
+ {
+ size_t pos = header.find("Content-Length: ");
+ if (pos == std::string::npos)
+ return 0;
+
+ pos += 16; // "Content-Length: "闀垮害
+ size_t end = header.find("\r\n", pos);
+ if (end == std::string::npos)
+ return 0;
+
+ try
+ {
+ return std::stoul(header.substr(pos, end - pos));
+ }
+ catch (...)
+ {
+ return 0;
+ }
+ }
+
+ void connection::handle_error(asio::error_code ec)
+ {
+ if (ec == asio::error::eof)
+ {
+ std::cout << "Connection closed gracefully\n";
+ }
+ else
+ {
+ std::cerr << "Error: " << ec.message() << "\n";
+ }
+ }
+ void connection::do_write()
+ {
+ auto self(shared_from_this());
+ asio::async_write(socket_, reply_.to_buffers(),
+ [this, self](asio::error_code ec, std::size_t)
+ {
+ if (!ec)
+ {
+ // Initiate graceful connection closure.
+ asio::error_code ignored_ec;
+ socket_.shutdown(asio::ip::tcp::socket::shutdown_both,
+ ignored_ec);
+ }
+ data_msg->sem_resultok.release();
+ // No new asynchronous operations are started. This means
+ // that all shared_ptr references to the connection object
+ // will disappear and the object will be destroyed
+ // automatically after this handler returns. The
+ // connection class's destructor closes the socket.
+ });
+ }
+
+ } // namespace server2
+} // namespace http
diff --git a/runtime/http/bin/connection.hpp b/runtime/http/bin/connection.hpp
index d198533..0f642f1 100644
--- a/runtime/http/bin/connection.hpp
+++ b/runtime/http/bin/connection.hpp
@@ -2,7 +2,7 @@
* Copyright FunASR (https://github.com/alibaba-damo-academy/FunASR). All Rights
* Reserved. MIT License (https://opensource.org/licenses/MIT)
*/
-/* 2023-2024 by zhaomingwork@qq.com */
+/* 2023-2025 by zhaomingwork@qq.com */
//
// copy some codes from http://www.boost.org/
//
@@ -19,86 +19,376 @@
#include "reply.hpp"
#include <fstream>
-
-#include "file_parse.hpp"
+#include <boost/beast.hpp>
+
#include "model-decoder.h"
-
+
+namespace beast = boost::beast;
+namespace beasthttp = beast::http;
extern std::unordered_map<std::string, int> hws_map_;
extern int fst_inc_wts_;
extern float global_beam_, lattice_beam_, am_scale_;
-namespace http {
-namespace server2 {
+namespace http
+{
+ namespace server2
+ {
-/// Represents a single connection from a client.
-class connection : public std::enable_shared_from_this<connection> {
- public:
- connection(const connection &) = delete;
- connection &operator=(const connection &) = delete;
- ~connection() { std::cout << "one connection is close()" << std::endl; };
+ /// Represents a single connection from a client.
+ class connection : public std::enable_shared_from_this<connection>
+ {
+ public:
+ connection(const connection &) = delete;
+ connection &operator=(const connection &) = delete;
+ ~connection()
+ {
+ std::cout << "one connection is close()" << std::endl;
+ };
- /// Construct a connection with the given socket.
- explicit connection(asio::ip::tcp::socket socket,
- asio::io_context &io_decoder, int connection_id,
- std::shared_ptr<ModelDecoder> model_decoder);
+ /// Construct a connection with the given socket.
+ explicit connection(asio::ip::tcp::socket socket,
+ asio::io_context &io_decoder, int connection_id,
+ std::shared_ptr<ModelDecoder> model_decoder);
+ /// Start the first asynchronous operation for the connection.
+ void start();
+ std::shared_ptr<FUNASR_MESSAGE> &get_data_msg();
+ void write_back(std::string str);
+
+ // 澶勭悊100 Continue閫昏緫
+ void handle_100_continue()
+ {
+ // start_timer(5); // 5绉掕秴鏃�
+
+ auto self = shared_from_this();
+ const std::string response =
+ "HTTP/1.1 100 Continue\r\n"
+ "Connection: keep-alive\r\n\r\n";
+
+ asio::async_write(socket_, asio::buffer(response),
+ [this, self](asio::error_code ec, size_t)
+ {
+ if (ec)
+ return handle_error(ec);
+
+ state_ = State::ReadingHeaders;
+
+ do_read();
+ });
+ }
+
+ // 鍑嗗鏂囦欢瀛樺偍
+ void prepare_body_handling()
+ {
+ if (!filename_.empty())
+ {
+ sanitize_filename(filename_);
+ output_file_.open(filename_, std::ios::binary);
+ if (!output_file_)
+ {
+ std::cerr << "Failed to open: " << filename_ << "\n";
+ socket_.close();
+ }
+ }
+ }
+
+ void finalize_request()
+ {
+ std::cout << "finalize_request" << std::endl;
+ send_final_response();
+ }
+
+ void send_final_response()
+ {
+ const std::string response =
+ "HTTP/1.1 200 OK\r\n"
+ "Content-Length: 0\r\n\r\n";
+ asio::write(socket_, asio::buffer(response));
+ socket_.close();
+ }
+
+ void send_417_expectation_failed()
+ {
+ const std::string response =
+ "HTTP/1.1 417 Expectation Failed\r\n"
+ "Connection: close\r\n\r\n";
+ asio::write(socket_, asio::buffer(response));
+ socket_.close();
+ }
+
+ // 瀹夊叏澶勭悊鏂囦欢鍚�
+ static void sanitize_filename(std::string &name)
+ {
+ std::replace(name.begin(), name.end(), '/', '_');
+ std::replace(name.begin(), name.end(), '\\', '_');
+ name = name.substr(name.find_last_of(":") + 1); // 绉婚櫎娼滃湪璺緞
+ }
+
+ // 鍗忚鐗堟湰瑙f瀽
+ bool parse_http_version(const std::string &headers)
+ {
+ size_t start = headers.find("HTTP/");
+ if (start == std::string::npos)
+ return false;
+
+ start += 5;
+ size_t dot = headers.find('.', start);
+ if (dot == std::string::npos)
+ return false;
+
+ try
+ {
+ http_version_major_ = std::stoi(headers.substr(start, dot - start));
+ http_version_minor_ = std::stoi(headers.substr(dot + 1, 1));
+ return true;
+ }
+ catch (...)
+ {
+ return false;
+ }
+ }
+ // 澶撮儴瑙f瀽
+ bool try_parse_headers()
+ {
+
+ size_t header_end = received_data_.find("\r\n\r\n");
+ if (header_end == std::string::npos)
+ {
+
+ return false;
+ }
+
+ std::string headers = received_data_.substr(0, header_end);
+
+ // 瑙f瀽鍐呭淇℃伅
+ if (content_length_ <= 0)
+ content_length_ = parse_content_length(headers);
+ // 瑙f瀽HTTP鐗堟湰
+ if (!parse_http_version(headers))
+ {
+ return false;
+ }
+
+ // 妫�鏌xpect澶�
+ std::string continue100 = "Expect: 100-continue";
+ size_t pos = headers.find(continue100);
+ expect_100_continue_ = pos != std::string::npos;
+
+ // 妫�鏌ュ崗璁吋瀹规��
+ if (expect_100_continue_)
+ {
+
+ headers.erase(pos, continue100.length());
+
+ received_data_ = headers;
+ state_ = State::SendingContinue;
+ if (http_version_minor_ < 1)
+ send_417_expectation_failed();
+ return true;
+ }
+
+ filename_ = parse_attachment_filename(headers);
+
+ // 鐘舵�佽浆绉�
+ std::string ext = parese_file_ext(filename_);
+
+ if (filename_.find(".wav") != std::string::npos)
+ {
+ std::cout << "set wav_format=pcm, file_name=" << filename_
+ << std::endl;
+ data_msg->msg["wav_format"] = "pcm";
+ }
+ else
+ {
+ std::cout << "set wav_format=" << ext << ", file_name=" << filename_
+ << std::endl;
+ data_msg->msg["wav_format"] = ext;
+ }
+ data_msg->msg["wav_name"] = filename_;
- /// Start the first asynchronous operation for the connection.
- void start();
- std::shared_ptr<FUNASR_MESSAGE> &get_data_msg();
- void write_back(std::string str);
+ state_ = State::ReadingBody;
+ return true;
+ }
- private:
- /// Perform an asynchronous read operation.
- void do_read();
+ void parse_multipart_boundary()
+ {
+ size_t content_type_pos = received_data_.find("Content-Type: multipart/form-data");
+ if (content_type_pos == std::string::npos)
+ return;
- /// Perform an asynchronous write operation.
- void do_write();
+ size_t boundary_pos = received_data_.find("boundary=", content_type_pos);
+ if (boundary_pos == std::string::npos)
+ return;
- void do_decoder();
+ boundary_pos += 9; // "boundary="闀垮害
+ size_t boundary_end = received_data_.find("\r\n", boundary_pos);
+ boundary_ = received_data_.substr(boundary_pos, boundary_end - boundary_pos);
- void setup_timer();
+ // 娓呯悊boundary鐨勫紩鍙�
+ if (boundary_.front() == '"' && boundary_.back() == '"')
+ {
+ boundary_ = boundary_.substr(1, boundary_.size() - 2);
+ }
+ }
+ // multipart 鏁版嵁澶勭悊鏍稿績
+ void process_multipart_data()
+ {
+ if (boundary_.empty())
+ {
+ parse_multipart_boundary();
+ if (boundary_.empty())
+ {
+ std::cerr << "Invalid multipart format\n";
+ return;
+ }
+ }
- /// Socket for the connection.
- asio::ip::tcp::socket socket_;
+ while (true)
+ {
+ if (!in_file_part_)
+ {
+
+ // 鏌ユ壘boundary璧峰
+ size_t boundary_pos = received_data_.find("--" + boundary_);
+ if (boundary_pos == std::string::npos)
+ break;
+
+ // 绉诲姩鍒皃art澶撮儴
+ size_t part_start = received_data_.find("\r\n\r\n", boundary_pos);
+ if (part_start == std::string::npos)
+ break;
+
+ part_start += 4; // 璺宠繃绌鸿
+ parse_part_headers(received_data_.substr(boundary_pos, part_start - boundary_pos));
+
+ received_data_.erase(0, part_start);
+
+ in_file_part_ = true;
+ }
+ else
+ {
+ // 鏌ユ壘boundary缁撴潫
+
+ size_t boundary_end = received_data_.find("\r\n--" + boundary_);
+
+ if (boundary_end == std::string::npos)
+
+ break;
+
+ // 鍐欏叆鍐呭
+
+ std::string tmpstr = received_data_.substr(0, boundary_end);
+ data_msg->samples->insert(data_msg->samples->end(), tmpstr.begin(), tmpstr.end());
+
+ received_data_.erase(0, boundary_end + 2); // 淇濈暀\r\n渚涗笅娆¤В鏋�
+
+ in_file_part_ = false;
+ }
+ }
+ }
+ std::string parese_file_ext(std::string file_name)
+ {
+ int pos = file_name.rfind('.');
+ std::string ext = "";
+ if (pos != std::string::npos)
+ ext = file_name.substr(pos + 1);
+
+ return ext;
+ }
+ // 瑙f瀽part澶撮儴淇℃伅
+ void parse_part_headers(const std::string &headers)
+ {
+ current_part_filename_.clear();
+ expected_part_size_ = 0;
+
+ // 瑙f瀽鏂囦欢鍚�
+ size_t filename_pos = headers.find("filename=\"");
+ if (filename_pos != std::string::npos)
+ {
+ filename_pos += 10;
+ size_t filename_end = headers.find('"', filename_pos);
+ current_part_filename_ = headers.substr(filename_pos, filename_end - filename_pos);
+ sanitize_filename(current_part_filename_);
+ }
+
+ // 瑙f瀽Content-Length
+ size_t cl_pos = headers.find("Content-Length: ");
+ if (cl_pos != std::string::npos)
+ {
+ cl_pos += 15;
+ size_t cl_end = headers.find("\r\n", cl_pos);
+ expected_part_size_ = std::stoull(headers.substr(cl_pos, cl_end - cl_pos));
+ }
+ }
+
+ private:
+ /// Perform an asynchronous read operation.
+ void do_read();
+ void handle_body();
+ std::string parse_attachment_filename(const std::string &header);
+ size_t parse_content_length(const std::string &header);
+
+ void handle_error(asio::error_code ec);
+ /// Perform an asynchronous write operation.
+ void do_write();
+
+ void do_decoder();
+
+ void setup_timer();
+
+ /// Socket for the connection.
+ asio::ip::tcp::socket socket_;
+
+ /// Buffer for incoming data.
+ std::array<char, 8192> buffer_;
+ /// for time out
+ std::shared_ptr<asio::steady_timer> s_timer;
+
+ std::shared_ptr<ModelDecoder> model_decoder;
+
+ int connection_id = 0;
+
+ /// The reply to be sent back to the client.
+ reply reply_;
+
+ asio::io_context &io_decoder;
+
+ std::shared_ptr<FUNASR_MESSAGE> data_msg;
+
+ std::mutex m_lock;
+
+ std::shared_ptr<asio::io_context::strand> strand_;
- /// Buffer for incoming data.
- std::array<char, 8192> buffer_;
- /// for time out
- std::shared_ptr<asio::steady_timer> s_timer;
+ beasthttp::response_parser<beasthttp::string_body> parser_; // 娓愯繘寮忚В鏋愬櫒
-
+ std::string received_data_; // 绱Н鎺ユ敹鐨勬暟鎹�
+ bool header_parsed_ = false; // 澶撮儴瑙f瀽鐘舵�佹爣璁�
+ size_t content_length_ = 0; // Content-Length 鍊�
+ enum class State
+ {
+ ReadingHeaders,
+ SendingContinue,
+ ReadingBody
+ };
+ bool expect_100_continue_ = false;
+ State state_ = State::ReadingHeaders;
+ std::string filename_;
+ std::ofstream output_file_;
+ int http_version_major_ = 1;
+ int http_version_minor_ = 1;
+ std::string boundary_ = "";
+ bool in_file_part_ = false;
+ std::string current_part_filename_;
+ size_t expected_part_size_ = 0;
+ };
- std::shared_ptr<ModelDecoder> model_decoder;
+ typedef std::shared_ptr<connection> connection_ptr;
-
+ } // namespace server2
+} // namespace http
- int connection_id = 0;
-
- /// The reply to be sent back to the client.
- reply reply_;
-
- asio::io_context &io_decoder;
-
-
-
- std::shared_ptr<FUNASR_MESSAGE> data_msg;
-
- std::mutex m_lock;
-
-
- std::shared_ptr<asio::io_context::strand> strand_;
-
- std::shared_ptr<http::server2::file_parser> file_parse;
-};
-
-typedef std::shared_ptr<connection> connection_ptr;
-
-} // namespace server2
-} // namespace http
-
-#endif // HTTP_SERVER2_CONNECTION_HPP
+#endif // HTTP_SERVER2_CONNECTION_HPP
diff --git a/runtime/http/bin/file_parse.cpp b/runtime/http/bin/file_parse.cpp
deleted file mode 100644
index 48c0416..0000000
--- a/runtime/http/bin/file_parse.cpp
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Copyright FunASR (https://github.com/alibaba-damo-academy/FunASR). All Rights
- * Reserved. MIT License (https://opensource.org/licenses/MIT)
- */
-/* 2023-2024 by zhaomingwork@qq.com */
-
-
-#include "file_parse.hpp"
-
-
-
-namespace http {
-namespace server2 {
-
-
-file_parser::file_parser(std::shared_ptr<FUNASR_MESSAGE> data_msg)
-:data_msg(data_msg)
-
-{
- now_state=start;
-}
-
-
-
-
-
-
-} // namespace server2
-} // namespace http
diff --git a/runtime/http/bin/file_parse.hpp b/runtime/http/bin/file_parse.hpp
deleted file mode 100644
index 50bea83..0000000
--- a/runtime/http/bin/file_parse.hpp
+++ /dev/null
@@ -1,234 +0,0 @@
-/**
- * Copyright FunASR (https://github.com/alibaba-damo-academy/FunASR). All Rights
- * Reserved. MIT License (https://opensource.org/licenses/MIT)
- */
-/* 2023-2024 by zhaomingwork@qq.com */
-// ~~~~~~~~~~~~~~~~~~
-
-
-#ifndef HTTP_SERVER2_REQUEST_FILEPARSER_HPP
-#define HTTP_SERVER2_REQUEST_FILEPARSER_HPP
-
-#include <iostream>
-#include <memory>
-#include <tuple>
-
-#include "asr_sessions.h"
-namespace http {
-namespace server2 {
-
-/// Parser for incoming requests.
-class file_parser {
- public:
- /// Construct ready to parse the request method.
-
- explicit file_parser(std::shared_ptr<FUNASR_MESSAGE> data_msg);
-
- /// Result of parse.
- enum result_type { start, in_boundary, data, ok };
-
- template <typename InputIterator>
- void parse_one_line(InputIterator &is, InputIterator &ie, InputIterator &it) {
- if (is != it) {
- is = it;
- }
- if (*it == '\n') {
- is = std::next(is);
- }
-
- it = std::find(is, ie, '\n');
- std::string str(is, it);
-
- }
- std::string trim_name(std::string raw_string) {
- int pos = raw_string.find('\"');
-
- if (pos != std::string::npos) {
- raw_string = raw_string.substr(pos + 1);
- pos = raw_string.find('\"');
- raw_string = raw_string.substr(0, pos);
- }
- return raw_string;
- }
-
- std::string parese_file_ext(std::string file_name) {
- int pos = file_name.rfind('.');
- std::string ext = "";
- if (pos != std::string::npos) ext = file_name.substr(pos + 1);
-
- return ext;
- }
- template <typename InputIterator>
- int parse_data_content(InputIterator is, InputIterator ie, InputIterator it) {
- int len = std::distance(it + 1, ie);
- if (len <= 0) {
- return 0;
- }
- std::string str(it + 1, ie);
-
- // check if at the end, "--boundary--" need +4 for "--"
- if (len == boundary.length() + 4)
-
- {
- std::string str(it + 1, ie);
- // std::cout << "len good=" << str << std::endl;
- if (boundary.length() > 1 && boundary[boundary.length() - 1] == '\n') {
- // remove '\n' in boundary
- boundary = boundary.substr(0, boundary.length() - 2);
- }
- if (boundary.length() > 1 && boundary[boundary.length() - 1] == '\r') {
- // remove '\r' in boundary
- boundary = boundary.substr(0, boundary.length() - 2);
- }
-
- auto found_boundary = str.find(boundary);
-
- if (found_boundary == std::string::npos) {
- std::cout << "not found end boundary!=" << found_boundary << std::endl;
-
-
- return 0;
- }
- // remove the end of data that contains '\n' or '\r'
- int last_sub = 0;
- if (*(it) == '\n') {
- last_sub++;
- }
-
-
- int lasts_len = std::distance(it, ie);
-
- data_msg->samples->erase(data_msg->samples->end() - last_sub - lasts_len,
- data_msg->samples->end());
- std::cout << "one file finished, file size=" << data_msg->samples->size()
- << std::endl;
- return 1;
- }
-
- }
- template <typename InputIterator>
- void parse_boundary_content(InputIterator is, InputIterator ie,
- InputIterator it) {
- parse_one_line(is, ie, it);
- std::string str;
-
- while (it != ie) {
-
- str = std::string(is, it);
-
- auto found_content = str.find("Content-Disposition:");
- auto found_filename = str.find("filename=");
- if (found_content != std::string::npos &&
- found_filename != std::string::npos) {
- std::string file_name =
- str.substr(found_filename + 9, std::string::npos);
- file_name = trim_name(file_name);
-
- std::string ext = parese_file_ext(file_name);
-
- if (file_name.find(".wav") != std::string::npos) {
- std::cout << "set wav_format=pcm, file_name=" << file_name
- << std::endl;
- data_msg->msg["wav_format"] = "pcm";
- } else {
- std::cout << "set wav_format=" << ext << ", file_name=" << file_name
- << std::endl;
- data_msg->msg["wav_format"] = ext;
- }
- data_msg->msg["wav_name"] = file_name;
- now_state = data;
- } else {
- auto found_content = str.find("Content-Disposition:");
- auto found_name = str.find("name=");
- if (found_content != std::string::npos &&
- found_name != std::string::npos) {
- std::string name = str.substr(found_name + 5, std::string::npos);
- name = trim_name(name);
- parse_one_line(is, ie, it);
- if (*it == '\n') it++;
- parse_one_line(is, ie, it);
- str = std::string(is, it);
- std::cout << "para: name=" << name << ",value=" << str << std::endl;
- }
- }
-
- parse_one_line(is, ie, it);
- if (now_state == data && std::distance(is, it) <= 2) {
- break;
- }
-
- }
-
-
- if (now_state == data) {
- if (*it == '\n') it++;
-
- data_msg->samples->insert(data_msg->samples->end(), it,
- it + std::distance(it, ie));
- // it=ie;
- }
- }
- template <typename InputIterator>
- result_type parse_file(InputIterator is, InputIterator ie) {
-
- if (now_state == data) {
- data_msg->samples->insert(data_msg->samples->end(), is, ie);
- }
- auto it = is;
-
- while (it != ie) {
- std::string str(is, it);
-
- parse_one_line(is, ie, it);
- if (now_state == data) {
- // for data end search
-
- int ret = parse_data_content(is, ie, it);
- if (ret == 0) continue;
- return ok;
- } else {
- std::string str(is, it + 1);
-
-
- if (now_state == start) {
-
-
- auto found_boundary = str.find("Content-Length:");
- if (found_boundary != std::string::npos) {
- std::string file_len =
- str.substr(found_boundary + 15, std::string::npos);
-
- data_msg->samples->reserve(std::stoi(file_len));
-
- }
- found_boundary = str.find("boundary=");
- if (found_boundary != std::string::npos) {
- boundary = str.substr(found_boundary + 9, std::string::npos);
- now_state = in_boundary;
- }
- } else if (now_state == in_boundary) {
- // for file header
- auto found_boundary = str.find(boundary);
- if (found_boundary != std::string::npos) {
- parse_boundary_content(is, ie, it);
- }
- }
-
- }
-
-
- }
-
- return now_state;
- }
-
- private:
- std::shared_ptr<FUNASR_MESSAGE> data_msg;
- result_type now_state;
- std::string boundary = "";
-};
-
-} // namespace server2
-} // namespace http
-
-#endif // HTTP_SERVER2_REQUEST_FILEPARSER_HPP
diff --git a/runtime/http/bin/util.hpp b/runtime/http/bin/util.hpp
new file mode 100644
index 0000000..e6d6ed5
--- /dev/null
+++ b/runtime/http/bin/util.hpp
@@ -0,0 +1,95 @@
+#include <string>
+#include <vector>
+#include <sstream>
+#include <cctype>
+#include <cstdio>
+#include <algorithm>
+
+// 杈呭姪鍑芥暟锛氫慨鍓瓧绗︿覆涓ょ鐨勭┖鐧�
+std::string trim(const std::string& s) {
+ auto start = s.find_first_not_of(" \t");
+ if (start == std::string::npos) return "";
+
+ auto end = s.find_last_not_of(" \t");
+ return s.substr(start, end - start + 1);
+}
+
+// 杈呭姪鍑芥暟锛氭鏌ュ瓧绗︿覆鏄惁浠ユ煇涓墠缂�寮�澶�
+bool starts_with(const std::string& s, const std::string& prefix) {
+ return s.size() >= prefix.size() &&
+ s.compare(0, prefix.size(), prefix) == 0;
+}
+
+// 杈呭姪鍑芥暟锛氬垎鍓插瓧绗︿覆
+std::vector<std::string> split(const std::string& s, char delimiter) {
+ std::vector<std::string> tokens;
+ std::string token;
+ std::istringstream token_stream(s);
+ while (std::getline(token_stream, token, delimiter)) {
+ if (!token.empty()) {
+ tokens.push_back(token);
+ }
+ }
+ return tokens;
+}
+
+// URL 瑙g爜锛堢畝鏄撳疄鐜帮級
+std::string url_decode(const std::string& str) {
+ std::string result;
+ for (size_t i = 0; i < str.size(); ++i) {
+ if (str[i] == '%' && i + 2 < str.size()) {
+ int hex_val;
+ if (sscanf(str.substr(i + 1, 2).c_str(), "%x", &hex_val) == 1) {
+ result += static_cast<char>(hex_val);
+ i += 2;
+ } else {
+ result += str[i];
+ }
+ } else if (str[i] == '+') {
+ result += ' ';
+ } else {
+ result += str[i];
+ }
+ }
+ return result;
+}
+
+// 瑙f瀽 RFC 5987 缂栫爜锛堝 filename*=UTF-8''%C2%A3.txt锛�
+std::string decode_rfc5987(const std::string& value) {
+ size_t pos = value.find("''");
+ if (pos != std::string::npos) {
+ std::string encoded = value.substr(pos + 2);
+ return url_decode(encoded);
+ }
+ return value;
+}
+
+// 涓昏В鏋愬嚱鏁�
+std::string parse_attachment_filename_impl(const std::string& content_disp) {
+ std::vector<std::string> parts = split(content_disp, ';');
+ std::string filename;
+
+ for (auto& part : parts) {
+ std::string trimmed = trim(part);
+
+ // 浼樺厛澶勭悊 RFC 5987 缂栫爜鐨� filename*
+ if (starts_with(trimmed, "filename*=")) {
+ std::string value = trimmed.substr(10);
+ if (value.size() >= 2 && value.front() == '"' && value.back() == '"') {
+ value = value.substr(1, value.size() - 2);
+ }
+ return decode_rfc5987(value);
+ }
+
+ // 鍏舵澶勭悊鏅�� filename
+ else if (starts_with(trimmed, "filename=")) {
+ std::string value = trimmed.substr(9);
+ if (value.size() >= 2 && value.front() == '"' && value.back() == '"') {
+ value = value.substr(1, value.size() - 2);
+ }
+ filename = value;
+ }
+ }
+
+ return filename;
+}
\ No newline at end of file
diff --git a/runtime/http/readme.md b/runtime/http/readme.md
index 4e04500..edecba9 100644
--- a/runtime/http/readme.md
+++ b/runtime/http/readme.md
@@ -18,6 +18,9 @@
### Install deps
```shell
+
+# need to install boost lib
+apt install libboost-dev libboost-system-dev #ubuntu
# openblas
sudo apt-get install libopenblas-dev #ubuntu
# sudo yum -y install openblas-devel #centos
@@ -44,15 +47,6 @@
### run
```shell
-./funasr-http-server \
- --lm-dir '' \
- --itn-dir '' \
- --download-model-dir ${download_model_dir} \
- --model-dir ${model_dir} \
- --vad-dir ${vad_dir} \
- --punc-dir ${punc_dir} \
- --decoder-thread-num ${decoder_thread_num} \
- --io-thread-num ${io_thread_num} \
- --port ${port} \
+./funasr-http-server --vad-dir damo/speech_fsmn_vad_zh-cn-16k-common-onnx --model-dir damo/speech_paraformer-large_asr_nat-zh-cn-16k-common-vocab8404-onnx --punc-dir damo/punc_ct-transformer_cn-en-common-vocab471067-large-onnx --itn-dir '' --lm-dir '' --port 10001
```
diff --git a/runtime/http/readme_zh.md b/runtime/http/readme_zh.md
index c0fea8c..6b60f7d 100644
--- a/runtime/http/readme_zh.md
+++ b/runtime/http/readme_zh.md
@@ -18,6 +18,8 @@
### 瀹夎渚濊禆
```shell
+# need to install boost lib
+apt install libboost-dev libboost-system-dev #ubuntu
# openblas
sudo apt-get install libopenblas-dev #ubuntu
# sudo yum -y install openblas-devel #centos
@@ -45,16 +47,7 @@
### 杩愯
```shell
-./funasr-http-server \
- --lm-dir '' \
- --itn-dir '' \
- --download-model-dir ${download_model_dir} \
- --model-dir ${model_dir} \
- --vad-dir ${vad_dir} \
- --punc-dir ${punc_dir} \
- --decoder-thread-num ${decoder_thread_num} \
- --io-thread-num ${io_thread_num} \
- --port ${port} \
+./funasr-http-server --vad-dir damo/speech_fsmn_vad_zh-cn-16k-common-onnx --model-dir damo/speech_paraformer-large_asr_nat-zh-cn-16k-common-vocab8404-onnx --punc-dir damo/punc_ct-transformer_cn-en-common-vocab471067-large-onnx --itn-dir '' --lm-dir '' --port 10001
```
--
Gitblit v1.9.1