Shi Xian
2024-01-16 948b68774cebf2b9a2994b7b9b8102f9637a98f3
runtime/websocket/bin/websocket-server-2pass.cpp
@@ -18,6 +18,7 @@
extern std::unordered_map<std::string, int> hws_map_;
extern int fst_inc_wts_;
extern float global_beam_, lattice_beam_, am_scale_;
context_ptr WebSocketServer::on_tls_init(tls_mode mode,
                                         websocketpp::connection_hdl hdl,
@@ -80,6 +81,19 @@
    jsonresult["timestamp"] = tmp_stamp_msg;
  }
  std::string tmp_stamp_sents = FunASRGetStampSents(result);
  if (tmp_stamp_sents != "") {
    try{
      nlohmann::json json_stamp = nlohmann::json::parse(tmp_stamp_sents);
      LOG(INFO) << "offline stamp_sents : " << json_stamp;
      jsonresult["stamp_sents"] = json_stamp;
    }catch (std::exception const &e)
    {
      LOG(ERROR)<< tmp_stamp_sents << e.what();
      jsonresult["stamp_sents"] = "";
    }
  }
  return jsonresult;
}
// feed the buffer to the ASR engine for decoding
@@ -96,10 +110,11 @@
    bool itn,
    int audio_fs,
    std::string wav_format,
    FUNASR_HANDLE& tpass_online_handle) {
    FUNASR_HANDLE& tpass_online_handle,
    FUNASR_DEC_HANDLE& decoder_handle) {
  // lock for each connection
  scoped_lock guard(thread_lock);
  if(!tpass_online_handle){
    scoped_lock guard(thread_lock);
     LOG(INFO) << "tpass_online_handle  is free, return";
     msg["access_num"]=(int)msg["access_num"]-1;
     return;
@@ -125,13 +140,15 @@
                                       subvector.data(), subvector.size(),
                                       punc_cache, false, audio_fs,
                                       wav_format, (ASR_TYPE)asr_mode_,
                                       hotwords_embedding, itn);
                                       hotwords_embedding, itn, decoder_handle);
        } else {
          scoped_lock guard(thread_lock);
          msg["access_num"]=(int)msg["access_num"]-1;
          return;
        }
      } catch (std::exception const& e) {
        scoped_lock guard(thread_lock);
        LOG(ERROR) << e.what();
        msg["access_num"]=(int)msg["access_num"]-1;
        return;
@@ -160,12 +177,14 @@
                                       buffer.data(), buffer.size(), punc_cache,
                                       is_final, audio_fs,
                                       wav_format, (ASR_TYPE)asr_mode_,
                                       hotwords_embedding, itn);
                                       hotwords_embedding, itn, decoder_handle);
        } else {
          scoped_lock guard(thread_lock);
          msg["access_num"]=(int)msg["access_num"]-1;    
          return;
        }
      } catch (std::exception const& e) {
        scoped_lock guard(thread_lock);
        LOG(ERROR) << e.what();
        msg["access_num"]=(int)msg["access_num"]-1;
        return;
@@ -209,6 +228,7 @@
  } catch (std::exception const& e) {
    std::cerr << "Error: " << e.what() << std::endl;
  }
  scoped_lock guard(thread_lock);
  msg["access_num"]=(int)msg["access_num"]-1;
 
}
@@ -227,9 +247,12 @@
    data_msg->msg["wav_name"] = "wav-default-id";
    data_msg->msg["mode"] = "2pass";
    data_msg->msg["itn"] = true;
    data_msg->msg["audio_fs"] = 16000;
    data_msg->msg["audio_fs"] = 16000; // default is 16k
    data_msg->msg["access_num"] = 0; // number of outstanding accesses to this object; when it is 0, we can free it safely
    data_msg->msg["is_eof"]=false; // if this connection is closed
    FUNASR_DEC_HANDLE decoder_handle =
      FunASRWfstDecoderInit(tpass_handle, ASR_TWO_PASS, global_beam_, lattice_beam_, am_scale_);
    data_msg->decoder_handle = decoder_handle;
    data_msg->punc_cache =
        std::make_shared<std::vector<std::vector<std::string>>>(2);
     data_msg->strand_ =   std::make_shared<asio::io_context::strand>(io_decoder_);
@@ -256,6 +279,9 @@
  // finished and avoid accessing a freed tpass_online_handle
  unique_lock guard_decoder(*(data_msg->thread_lock));
  if (data_msg->msg["access_num"]==0 && data_msg->msg["is_eof"]==true) {
    FunWfstDecoderUnloadHwsRes(data_msg->decoder_handle);
    FunASRWfstDecoderUninit(data_msg->decoder_handle);
    data_msg->decoder_handle = nullptr;
    FunTpassOnlineUninit(data_msg->tpass_online_handle);
    data_msg->tpass_online_handle = nullptr;
     data_map.erase(hdl);
@@ -313,7 +339,7 @@
        data_msg->msg["is_eof"]=true;
        guard_decoder.unlock();
        to_remove.push_back(hdl);
        LOG(INFO)<<"connection is closed: "<<e.what();
        LOG(INFO)<<"connection is closed.";
        
      }
      iter++;
@@ -336,6 +362,10 @@
  auto it_data = data_map.find(hdl);
  if (it_data != data_map.end()) {
    msg_data = it_data->second;
    if(msg_data->msg["is_eof"]){
      lock.unlock();
      return;
    }
  } else {
    lock.unlock();
    return;
@@ -363,7 +393,9 @@
      }catch (std::exception const &e)
      {
        LOG(ERROR)<<e.what();
        break;
        msg_data->msg["is_eof"]=true;
        guard_decoder.unlock();
        return;
      }
      if (jsonresult.contains("wav_name")) {
@@ -377,7 +409,7 @@
      }
      // hotwords: fst/nn
      if(msg_data->hotwords_embedding == NULL){
      if(msg_data->hotwords_embedding == nullptr){
        std::unordered_map<std::string, int> merged_hws_map;
        std::string nn_hotwords = "";
@@ -387,25 +419,22 @@
            nlohmann::json json_fst_hws;
            try{
              json_fst_hws = nlohmann::json::parse(json_string);
              if(json_fst_hws.type() == nlohmann::json::value_t::object){
                // fst
                try{
                  std::unordered_map<std::string, int> client_hws_map = json_fst_hws;
                  merged_hws_map.insert(client_hws_map.begin(), client_hws_map.end());
                } catch (const std::exception& e) {
                  LOG(INFO) << e.what();
                }
              }
            } catch (std::exception const &e)
            {
              LOG(ERROR)<<e.what();
              break;
            }
            if(json_fst_hws.type() == nlohmann::json::value_t::object){
              // fst
              try{
                std::unordered_map<std::string, int> client_hws_map = json_fst_hws;
                merged_hws_map.insert(client_hws_map.begin(), client_hws_map.end());
              } catch (const std::exception& e) {
                LOG(INFO) << e.what();
              }
            }else{
              // nn
              std::string client_nn_hws = jsonresult["hotwords"];
              nn_hotwords += " " + client_nn_hws;
              LOG(INFO) << "nn hotwords: " << client_nn_hws;
              // LOG(INFO) << "nn hotwords: " << client_nn_hws;
            }
          }
        }
@@ -417,7 +446,7 @@
            nn_hotwords += " " + pair.first;
            LOG(INFO) << pair.first << " : " << pair.second;
        }
        // FunWfstDecoderLoadHwsRes(msg_data->decoder_handle, fst_inc_wts_, merged_hws_map);
        FunWfstDecoderLoadHwsRes(msg_data->decoder_handle, fst_inc_wts_, merged_hws_map);
        // nn
        std::vector<std::vector<float>> new_hotwords_embedding = CompileHotwordEmbedding(tpass_handle, nn_hotwords, ASR_TWO_PASS);
@@ -429,7 +458,7 @@
        msg_data->msg["audio_fs"] = jsonresult["audio_fs"];
      }
      if (jsonresult.contains("chunk_size")) {
        if (msg_data->tpass_online_handle == NULL) {
        if (msg_data->tpass_online_handle == nullptr) {
          std::vector<int> chunk_size_vec =
              jsonresult["chunk_size"].get<std::vector<int>>();
          // check chunk_size_vec
@@ -448,8 +477,10 @@
      }
      LOG(INFO) << "jsonresult=" << jsonresult
                << ", msg_data->msg=" << msg_data->msg;
      if (jsonresult["is_speaking"] == false ||
          jsonresult["is_finished"] == true) {
      if ((jsonresult["is_speaking"] == false ||
          jsonresult["is_finished"] == true) &&
          msg_data->msg["is_eof"] != true &&
          msg_data->hotwords_embedding != nullptr) {
        LOG(INFO) << "client done";
        // if this is the final message, post the sample_data to the decoder
@@ -467,7 +498,8 @@
                        msg_data->msg["itn"],
                        msg_data->msg["audio_fs"],
                        msg_data->msg["wav_format"],
                        std::ref(msg_data->tpass_online_handle)));
                        std::ref(msg_data->tpass_online_handle),
                        std::ref(msg_data->decoder_handle)));
            msg_data->msg["access_num"]=(int)(msg_data->msg["access_num"])+1;
        }
        catch (std::exception const &e)
@@ -500,21 +532,24 @@
          try{
            // post to decode
            std::vector<std::vector<float>> hotwords_embedding_(*(msg_data->hotwords_embedding));
            msg_data->strand_->post(
                      std::bind(&WebSocketServer::do_decoder, this,
                                std::move(subvector), std::move(hdl),
                                std::ref(msg_data->msg),
                                std::ref(*(punc_cache_p.get())),
                                std::move(hotwords_embedding_),
                                std::ref(*thread_lock_p), std::move(false),
                                msg_data->msg["wav_name"],
                                msg_data->msg["mode"],
                                msg_data->msg["itn"],
                                msg_data->msg["audio_fs"],
                                msg_data->msg["wav_format"],
                                std::ref(msg_data->tpass_online_handle)));
            msg_data->msg["access_num"]=(int)(msg_data->msg["access_num"])+1;
            if (msg_data->msg["is_eof"] != true && msg_data->hotwords_embedding != nullptr) {
              std::vector<std::vector<float>> hotwords_embedding_(*(msg_data->hotwords_embedding));
              msg_data->strand_->post(
                        std::bind(&WebSocketServer::do_decoder, this,
                                  std::move(subvector), std::move(hdl),
                                  std::ref(msg_data->msg),
                                  std::ref(*(punc_cache_p.get())),
                                  std::move(hotwords_embedding_),
                                  std::ref(*thread_lock_p), std::move(false),
                                  msg_data->msg["wav_name"],
                                  msg_data->msg["mode"],
                                  msg_data->msg["itn"],
                                  msg_data->msg["audio_fs"],
                                  msg_data->msg["wav_format"],
                                  std::ref(msg_data->tpass_online_handle),
                                  std::ref(msg_data->decoder_handle)));
              msg_data->msg["access_num"]=(int)(msg_data->msg["access_num"])+1;
            }
          }
          catch (std::exception const &e)
          {