rebear077
2024-08-14 167cea2074a9ab2b697fc3b43ed63babe276217f
Go_ws_client客户端V1 (#2011)

提供了一个Golang语言版本的FunASR Client,与run_server.sh对接
8个文件已添加
560 ■■■■■ 已修改文件
runtime/golang/websocket/go.mod 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
runtime/golang/websocket/go.sum 24 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
runtime/golang/websocket/go_ws_client.go 377 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
runtime/golang/websocket/readme.assets/image-20240814151836940.png 补丁 | 查看 | 原始文档 | blame | 历史
runtime/golang/websocket/readme.assets/image-20240814152105574.png 补丁 | 查看 | 原始文档 | blame | 历史
runtime/golang/websocket/readme.assets/image-20240814155214256.png 补丁 | 查看 | 原始文档 | blame | 历史
runtime/golang/websocket/readme.md 125 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
runtime/golang/websocket/wavhandler.py 27 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
runtime/golang/websocket/go.mod
New file
@@ -0,0 +1,7 @@
module gowsclient
go 1.18
require (
    github.com/gorilla/websocket v1.5.3
)
runtime/golang/websocket/go.sum
New file
@@ -0,0 +1,24 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-audio/audio v1.0.0 h1:zS9vebldgbQqktK4H0lUqWrG8P0NxCJVqcj7ZpNnwd4=
github.com/go-audio/audio v1.0.0/go.mod h1:6uAu0+H2lHkwdGsAY+j2wHPNPpPoeg5AaEFh9FlA+Zs=
github.com/go-audio/riff v1.0.0 h1:d8iCGbDvox9BfLagY94fBynxSPHO80LmZCaOsmKxokA=
github.com/go-audio/riff v1.0.0/go.mod h1:l3cQwc85y79NQFCRB7TiPoNiaijp6q8Z0Uv38rVG498=
github.com/go-audio/wav v1.1.0 h1:jQgLtbqBzY7G+BM8fXF7AHUk1uHUviWS4X39d5rsL2g=
github.com/go-audio/wav v1.1.0/go.mod h1:mpe9qfwbScEbkd8uybLuIpTgHyrISw/OTuvjUW2iGtE=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/youpy/go-riff v0.1.0 h1:vZO/37nI4tIET8tQI0Qn0Y79qQh99aEpponTPiPut7k=
github.com/youpy/go-riff v0.1.0/go.mod h1:83nxdDV4Z9RzrTut9losK7ve4hUnxUR8ASSz4BsKXwQ=
github.com/youpy/go-wav v0.3.2 h1:NLM8L/7yZ0Bntadw/0h95OyUsen+DQIVf9gay+SUsMU=
github.com/youpy/go-wav v0.3.2/go.mod h1:0FCieAXAeSdcxFfwLpRuEo0PFmAoc+8NU34h7TUvk50=
github.com/zaf/g711 v0.0.0-20190814101024-76a4a538f52b h1:QqixIpc5WFIqTLxB3Hq8qs0qImAgBdq0p6rq2Qdl634=
github.com/zaf/g711 v0.0.0-20190814101024-76a4a538f52b/go.mod h1:T2h1zV50R/q0CVYnsQOQ6L7P4a2ZxH47ixWcMXFGyx8=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
runtime/golang/websocket/go_ws_client.go
New file
@@ -0,0 +1,377 @@
package main
import (
    "bufio"
    "bytes"
    "encoding/base64"
    "encoding/json"
    "fmt"
    "log"
    "net/url"
    "os"
    "os/exec"
    "strconv"
    "strings"
    "sync"
    "time"
    "github.com/gorilla/websocket"
)
// args holds the client configuration. main assigns every field directly
// (there is no command-line or config-file parsing) before any worker starts.
var args struct {
    // audio_in is the path of the audio file to recognise; recordFromScp
    // only accepts paths ending in ".wav".
    audio_in       string
    // thread_num is the number of worker goroutines main launches.
    thread_num     int
    // host and port form the "host:port" part of the ws:// URL dialled by
    // wsClient.
    host           string
    port           string
    // output_dir, when non-empty, is the directory in which message appends
    // recognition results to a text.<id> file.
    output_dir     string
    // hotword, when non-empty, is the path of a hotword file read by
    // recordFromScp: one "<word> <integer weight>" pair per line.
    hotword        string
    // audio_fs is the initial sample rate; recordFromScp replaces it with the
    // rate reported by wavhandler.py before sending it as "audio_fs".
    audio_fs       int
    // use_itn set to 0 sends "itn": false in the first message; any other
    // value sends true.
    use_itn        int
    // mode is sent as "mode"; the client only handles "offline".
    mode           string
    // chunk_size is sent as "chunk_size" and passed to wavhandler.py, which
    // uses element [1] to compute the byte stride of each audio chunk.
    chunk_size     []int
    // chunk_interval is sent as "chunk_interval" and passed to wavhandler.py
    // for the same stride computation.
    chunk_interval int
}
// websocketConn is the connection currently in use. wsClient assigns it after
// dialling; recordFromScp writes to it and message reads from it.
// NOTE(review): it is a single package-level variable, so running more than
// one worker at a time makes them share one connection.
var websocketConn *websocket.Conn
// offline_msg_done is set to true by message once an offline result has been
// received, polled by recordFromScp before it closes the connection, and reset
// to false by wsClient before each file.
// NOTE(review): it is a plain bool read and written from different goroutines
// with no lock or atomic visible in this file.
var offline_msg_done = false
// Message is the JSON object that message decodes from each frame read off
// the WebSocket connection.
type Message struct {
    // WavName is the audio name echoed by the server; message falls back to
    // "demo" when it is empty.
    WavName   string `json:"wav_name"`
    // Text is the recognised text.
    Text      string `json:"text"`
    // TimeStamp is optional timing information, decoded as a string.
    // NOTE(review): presumably the server sends this as a JSON string; a
    // non-string value would make the whole frame fail to unmarshal — confirm.
    TimeStamp string `json:"timestamp"`
    // Mode is the recognition mode reported by the server; message only
    // processes frames whose mode is "offline".
    Mode      string `json:"mode"`
}
// AudioData is the JSON object that wavhandler.py prints on stdout and that
// recordFromScp decodes.
type AudioData struct {
    // SampleRate is the frame rate read from the WAV header.
    SampleRate int    `json:"sample_rate"`
    // Stride is the number of bytes sent per binary WebSocket message.
    Stride     int    `json:"stride"`
    // ChunkNum is the number of chunks the audio is split into.
    ChunkNum   int    `json:"chunk_num"`
    // AudioBytes is the raw audio frame data, Base64-encoded.
    AudioBytes string `json:"audio_bytes"`
}
// IntSlicetoString renders nums as decimal integers separated by commas,
// e.g. []int{5, 10, 5} becomes "5,10,5". A nil or empty slice yields "".
func IntSlicetoString(nums []int) string {
    var sb strings.Builder
    for idx, n := range nums {
        // Separator goes before every element except the first.
        if idx > 0 {
            sb.WriteByte(',')
        }
        sb.WriteString(strconv.Itoa(n))
    }
    return sb.String()
}
func recordFromScp(chunk_begin, chunk_size int) {
    wavs := []string{args.audio_in}
    sample_rate := args.audio_fs
    wav_format := "pcm"
    use_itn := true
    wav_name := "demo"
    wav_path := wavs[0]
    var audio_bytes []byte
    var stride, chunk_num int
    fst_dict := make(map[string]int)
    hotword_msg := ""
    if args.hotword != "" {
        file, err := os.Open(args.hotword)
        if err != nil {
            log.Fatalf("failed to open file: %v", err)
        }
        defer file.Close()
        scanner := bufio.NewScanner(file)
        for scanner.Scan() {
            words := strings.Fields(scanner.Text())
            if len(words) < 2 {
                fmt.Println("Please checkout format of hotwords")
                continue
            }
            weight, err := strconv.Atoi(words[1])
            if err != nil {
                fmt.Println("The weight of hotwords must be Int!")
            }
            fst_dict[words[0]] = weight
        }
        if err := scanner.Err(); err != nil {
            log.Fatalf("error reading file: %v", err)
        }
        bytes, _ := json.Marshal(fst_dict)
        hotword_msg = string(bytes)
        fmt.Println("HotWord: ", hotword_msg)
    }
    if args.use_itn == 0 {
        use_itn = false
    }
    if chunk_size > 0 {
        wavs = wavs[chunk_begin : chunk_begin+chunk_size]
    }
    if strings.HasSuffix(wav_path, ".wav") {
        cmd := exec.Command("python", "wavhandler.py", wav_path, IntSlicetoString(args.chunk_size), strconv.Itoa(args.chunk_interval))
        var out bytes.Buffer
        cmd.Stdout = &out
        err := cmd.Run()
        if err != nil {
            fmt.Println("Error running Python script:", err)
            return
        }
        var audioData AudioData
        err = json.Unmarshal(out.Bytes(), &audioData)
        if err != nil {
            fmt.Println("Error parsing JSON:", err)
            return
        }
        stride = audioData.Stride
        chunk_num = audioData.ChunkNum
        sample_rate = audioData.SampleRate
        audio_bytes, err = base64.StdEncoding.DecodeString(audioData.AudioBytes)
        if err != nil {
            fmt.Println("Error decoding Base64:", err)
            return
        }
    } else {
        fmt.Println("Currently, only the WAV format is supported")
        return
    }
    first_message := make(map[string]interface{})
    first_message["mode"] = args.mode
    first_message["chunk_size"] = args.chunk_size
    first_message["chunk_interval"] = args.chunk_interval
    first_message["audio_fs"] = sample_rate
    first_message["wav_name"] = wav_name
    first_message["wav_format"] = wav_format
    first_message["is_speaking"] = true
    first_message["hotwords"] = hotword_msg
    first_message["itn"] = use_itn
    bytes, _ := json.Marshal(first_message)
    message := string(bytes)
    // fmt.Println(audio_bytes)
    // fmt.Println(stride)
    // fmt.Println(chunk_num)
    // fmt.Println(message)
    err := websocketConn.WriteMessage(websocket.TextMessage, []byte(message))
    if err != nil {
        log.Println("Failed to send the message:", err)
        return
    }
    is_speaking := true
    for i := 0; i < chunk_num; i++ {
        beg := i * stride
        var data []byte
        if i == chunk_num-1 {
            data = audio_bytes[beg:]
        } else {
            data = audio_bytes[beg : beg+stride]
        }
        err = websocketConn.WriteMessage(websocket.BinaryMessage, data)
        if err != nil {
            fmt.Println("Failed to send audio data:", err)
            return
        }
        if i == chunk_num-1 {
            is_speaking = false
            endMsg := map[string]bool{"is_speaking": is_speaking}
            endMsgBytes, err := json.Marshal(endMsg)
            if err != nil {
                fmt.Println("JSON serialization failed:", err)
                return
            }
            err = websocketConn.WriteMessage(websocket.TextMessage, endMsgBytes)
            if err != nil {
                fmt.Println("Failed to send the termination message:", err)
                return
            }
        }
        var sleepDuration time.Duration
        if args.mode == "offline" {
            sleepDuration = time.Millisecond
        } else {
            fmt.Println("timesleep:  Currently, only offline mode is supported.")
            // sleepDuration = time.Duration(60*float64(args.chunk_size[1])/float64(args.chunk_interval)) * time.Millisecond
            return
        }
        time.Sleep(sleepDuration)
    }
    if args.mode != "offline" {
        fmt.Println("Currently, only offline mode is supported.")
        return
    }
    if args.mode == "offline" {
        for !offline_msg_done {
            time.Sleep(1 * time.Second)
        }
    }
    websocketConn.Close()
}
func message(id string) {
    text_print := ""
    var ibestWriter *os.File
    var err error
    if args.output_dir != "" {
        filePath := fmt.Sprintf("%s/text.%d", args.output_dir, id)
        ibestWriter, err = os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
        if err != nil {
            log.Fatalf("failed to open file: %v", err)
        }
    } else {
        ibestWriter = nil
    }
    for {
        _, message, err := websocketConn.ReadMessage()
        if err != nil {
            log.Println("read:", err)
            break
        }
        var meg Message
        var wav_name string
        timestamp := ""
        err = json.Unmarshal(message, &meg)
        if err != nil {
            log.Println("unmarshal:", err)
            continue
        }
        if meg.WavName != "" {
            wav_name = meg.WavName
        } else {
            wav_name = "demo"
        }
        text := meg.Text
        if meg.TimeStamp != "" {
            timestamp = meg.TimeStamp
        }
        if ibestWriter != nil {
            var text_write_line string
            if timestamp != "" {
                text_write_line = fmt.Sprintf("%s\t%s\t%s\n", wav_name, text, timestamp)
            } else {
                text_write_line = fmt.Sprintf("%s\t%s\n", wav_name, text)
            }
            _, err = ibestWriter.WriteString(text_write_line)
            if err != nil {
                log.Fatalf("Failed to write to file: %v", err)
            }
        }
        if meg.Mode != "offline" {
            fmt.Println("Currently, only offline mode is supported.")
            return
        }
        if meg.Mode == "offline" {
            if timestamp != "" {
                text_print += fmt.Sprintf("%s timestamp: %s", text, timestamp)
            } else {
                text_print += fmt.Sprintf("%s ", text)
            }
            fmt.Println("\rpid" + id + ": " + wav_name + ": " + text_print)
            offline_msg_done = true
        }
    }
}
func wsClient(id, chunk_begin, chunk_size int, done chan bool) {
    for i := chunk_begin; i < chunk_begin+chunk_size; i++ {
        offline_msg_done = false
        u := url.URL{Scheme: "ws", Host: fmt.Sprintf("%s:%s", args.host, args.port), Path: "/"}
        fmt.Printf("Thread %d: Connecting to %s\n", id, u.String())
        var err error
        websocketConn, _, err = websocket.DefaultDialer.Dial(u.String(), nil)
        if err != nil {
            log.Fatal("dial:", err)
        }
        defer websocketConn.Close()
        var wg sync.WaitGroup
        wg.Add(2)
        go func() {
            defer wg.Done()
            recordFromScp(i, 1)
        }()
        go func() {
            defer wg.Done()
            id_str := strconv.Itoa(id)
            i_str := strconv.Itoa(i)
            message(id_str + "_" + i_str)
        }()
        wg.Wait()
    }
    done <- true
}
func oneThread(id, chunk_begin, chunk_size int, wg *sync.WaitGroup) {
    defer wg.Done()
    done := make(chan bool)
    go wsClient(id, chunk_begin, chunk_size, done)
    select {
    case <-done:
        fmt.Printf("Thread %d: Task completed\n", id)
    }
}
func main() {
    args.audio_in = "../audio/asr_example.wav"
    args.thread_num = 1
    args.host = "127.0.0.1"
    args.port = "10095"
    args.output_dir = "/workspace/models/Outputs"
    args.hotword = "/workspace/models/hotword.txt"
    args.chunk_size = []int{5, 10, 5}
    args.chunk_interval = 10
    args.mode = "offline"
    args.audio_fs = 16000
    args.use_itn = 1
    var chunk_size, remain_wavs int
    wavs := []string{args.audio_in}
    total_len := len(wavs)
    if total_len >= args.thread_num {
        chunk_size = total_len / args.thread_num
        remain_wavs = total_len - chunk_size*args.thread_num
    } else {
        chunk_size = 1
        remain_wavs = 0
    }
    var wg sync.WaitGroup
    chunk_begin := 0
    for i := 0; i < args.thread_num; i++ {
        wg.Add(1)
        now_chunk_size := chunk_size
        if remain_wavs > 0 {
            now_chunk_size = chunk_size + 1
            remain_wavs = remain_wavs - 1
        }
        go oneThread(i, chunk_begin, now_chunk_size, &wg)
        chunk_begin = chunk_begin + now_chunk_size
    }
    wg.Wait()
}
runtime/golang/websocket/readme.assets/image-20240814151836940.png
runtime/golang/websocket/readme.assets/image-20240814152105574.png
runtime/golang/websocket/readme.assets/image-20240814155214256.png
runtime/golang/websocket/readme.md
New file
@@ -0,0 +1,125 @@
# Golang版本的Websocket客户端-V1.0
本客户端基于Golang编程实现,对接cpp websocket后端(run_server.sh)。为Golang开发者使用FunASR服务提供一点参考。
本客户端开发项目当前仅支持wav格式音频文件的离线模式,对接run_server.sh(关闭ssl)。
## 一 启动命令
### 1.1 服务端启动
run_server.sh的启动命令:
```sh
bash run_server.sh --certfile 0
```
其他参数采取run_server.sh脚本中的默认值。
run_server.sh脚本的详细内容见[官方github](https://github.com/modelscope/FunASR/blob/main/runtime/run_server.sh),或本文的[附录](#附录)部分。
### 1.2 客户端启动
```sh
go run go_ws_client.go
```
命令参数目前是直接赋值给了代码中的变量,在客户端启动时会在main函数读取。
后续可以考虑改为配置文件或命令启动中的参数。
```go
func main() {
    args.audio_in = "../audio/asr_example.wav"
    args.thread_num = 1
    args.host = "127.0.0.1"
    args.port = "10095"
    args.output_dir = "/workspace/models/Outputs"
    args.hotword = "/workspace/models/hotword.txt"
    args.chunk_size = []int{5, 10, 5}
    args.chunk_interval = 10
    args.mode = "offline"
    args.audio_fs = 16000
    args.use_itn = 1
    /*
        more code
    */
}
```
## 二 实现效果
### 2.1 服务端接收情况
![image-20240814152105574](readme.assets/image-20240814152105574.png)
### 2.2 客户端返回结果
![image-20240814151836940](readme.assets/image-20240814151836940.png)
## 三 未来展望
Golang语言由于自身优秀的并发支持能力,在处理高并发和大规模数据时表现优异,常用于高性能的后端服务和网络应用程序开发,适用于并发和并行处理需求较高的系统。
官方提供的cpp后端支持模型处理的多线程。
因此若有高并发的语音文件识别系统开发需求,需要使用FunASR的run_server服务,可以考虑将Golang作为开发语言,开发客户端与run_server之间的服务中台系统。
![image-20240814155214256](readme.assets/image-20240814155214256.png)
## 附录
本Demo中的run_server.sh脚本
```sh
download_model_dir="/workspace/models"
model_dir="damo/speech_paraformer-large-vad-punc_asr_nat-zh-cn-16k-common-vocab8404-onnx"
vad_dir="damo/speech_fsmn_vad_zh-cn-16k-common-onnx"
punc_dir="damo/punc_ct-transformer_cn-en-common-vocab471067-large-onnx"
itn_dir="thuduj12/fst_itn_zh"
lm_dir="damo/speech_ngram_lm_zh-cn-ai-wesp-fst"
port=10095
certfile="$(pwd)/ssl_key/server.crt"
keyfile="$(pwd)/ssl_key/server.key"
hotword="$(pwd)/websocket/hotwords.txt"
# set decoder_thread_num
decoder_thread_num=$(cat /proc/cpuinfo | grep "processor"|wc -l) || { echo "Get cpuinfo failed. Set decoder_thread_num = 32"; decoder_thread_num=32; }
multiple_io=16
io_thread_num=$(( (decoder_thread_num + multiple_io - 1) / multiple_io ))
model_thread_num=1
cmd_path=/workspace/FunASR/runtime/websocket/build/bin
cmd=funasr-wss-server
. ./tools/utils/parse_options.sh || exit 1;
if [ -z "$certfile" ] || [ "$certfile" = "0" ]; then
  certfile=""
  keyfile=""
fi
cd $cmd_path
$cmd_path/${cmd}  \
  --download-model-dir "${download_model_dir}" \
  --model-dir "${model_dir}" \
  --vad-dir "${vad_dir}" \
  --punc-dir "${punc_dir}" \
  --itn-dir "${itn_dir}" \
  --lm-dir "${lm_dir}" \
  --decoder-thread-num ${decoder_thread_num} \
  --model-thread-num ${model_thread_num} \
  --io-thread-num  ${io_thread_num} \
  --port ${port} \
  --certfile  "${certfile}" \
  --keyfile "${keyfile}" \
  --hotword "${hotword}" &
server_cmd="{\"server\":[{\"exec\":\"${cmd_path}/${cmd}\",\"--download-model-dir\":\"${download_model_dir}\",\"--model-dir\":\"${model_dir}\",\"--vad-dir\":\"${vad_dir}\",\"--punc-dir\":\"${punc_dir}\",\"--itn-dir\":\"${itn_dir}\",\"--lm-dir\":\"${lm_dir}\",\"--decoder-thread-num\":\"${decoder_thread_num}\",\"--model-thread-num\":\"${model_thread_num}\",\"--io-thread-num\":\"${io_thread_num}\",\"--port\":\"${port}\",\"--certfile\":\"${certfile}\",\"--keyfile\":\"${keyfile}\",\"--hotword\":\"${hotword}\"}]}"
mkdir -p /workspace/.config
echo $server_cmd > /workspace/.config/server_config
```
runtime/golang/websocket/wavhandler.py
New file
@@ -0,0 +1,27 @@
import sys
import wave
import json
import base64
def build_audio_payload(wav_path, chunk_size, chunk_interval):
    """Read a WAV file and describe how to split its audio into chunks.

    Args:
        wav_path: path of the WAV file to read.
        chunk_size: sequence of ints; element [1] is used in the stride
            computation.
        chunk_interval: positive int used in the stride computation.

    Returns:
        A dict with the keys ``sample_rate`` (frame rate from the WAV header),
        ``stride`` (bytes per chunk), ``chunk_num`` (number of chunks needed to
        cover the audio) and ``audio_bytes`` (the raw frame data,
        Base64-encoded as text).

    Raises:
        ValueError: if ``chunk_size`` has fewer than two entries, if
            ``chunk_interval`` is not positive, or if the resulting stride is
            not positive.
    """
    if len(chunk_size) < 2:
        raise ValueError("chunk_size must contain at least two values")
    if chunk_interval <= 0:
        raise ValueError("chunk_interval must be positive")

    with wave.open(wav_path, "rb") as wav_file:
        sample_rate = wav_file.getframerate()
        audio_bytes = wav_file.readframes(wav_file.getnframes())

    # NOTE(review): the trailing factor 2 presumably stands for 2 bytes per
    # sample (16-bit mono audio) — confirm before feeding other formats.
    stride = int(60 * chunk_size[1] / chunk_interval / 1000 * sample_rate * 2)
    if stride <= 0:
        # A zero stride would otherwise surface as a bare ZeroDivisionError.
        raise ValueError("computed stride must be positive, got %d" % stride)
    # Ceiling division: the final chunk may be shorter than stride.
    chunk_num = (len(audio_bytes) - 1) // stride + 1

    return {
        "sample_rate": sample_rate,
        "stride": stride,
        "chunk_num": chunk_num,
        "audio_bytes": base64.b64encode(audio_bytes).decode('utf-8')
    }


if __name__ == "__main__":
    # Usage: wavhandler.py <wav_path> <chunk_size as comma-separated ints> <chunk_interval>
    if len(sys.argv) < 4:
        sys.exit("usage: wavhandler.py <wav_path> <chunk_size_csv> <chunk_interval>")
    wav_path = sys.argv[1]
    chunk_size = [int(x) for x in sys.argv[2].split(",")]
    chunk_interval = int(sys.argv[3])
    # The caller parses stdout as a single JSON object.
    print(json.dumps(build_audio_payload(wav_path, chunk_size, chunk_interval)))