游雁
2024-02-19 25fbf7110b5fc66d592bd5e94624a0ddb60e50b8
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import os
import json
import torch
import logging
import concurrent.futures
import librosa
import torch.distributed as dist
from typing import Collection
import torch
import torchaudio
from torch import nn
import random
import re
from funasr.tokenizer.cleaner import TextCleaner
from funasr.register import tables
 
 
@tables.register("preprocessor_classes", "SpeechPreprocessSpeedPerturb")
class SpeechPreprocessSpeedPerturb(nn.Module):
    def __init__(self, speed_perturb: list=None, **kwargs):
        super().__init__()
        self.speed_perturb = speed_perturb
        
    def forward(self, waveform, fs, **kwargs):
        if self.speed_perturb is None:
            return waveform
        speed = random.choice(self.speed_perturb)
        if speed != 1.0:
            waveform, _ = torchaudio.sox_effects.apply_effects_tensor(
                torch.tensor(waveform).view(1, -1), fs, [['speed', str(speed)], ['rate', str(fs)]])
            waveform = waveform.view(-1)
            
        return waveform
 
 
@tables.register("preprocessor_classes", "TextPreprocessSegDict")
class TextPreprocessSegDict(nn.Module):
    def __init__(self, seg_dict: str = None,
                 text_cleaner: Collection[str] = None,
                 split_with_space: bool = False,
                 **kwargs):
        super().__init__()
        
        self.seg_dict = None
        if seg_dict is not None:
            self.seg_dict = {}
            with open(seg_dict, "r", encoding="utf8") as f:
                lines = f.readlines()
            for line in lines:
                s = line.strip().split()
                key = s[0]
                value = s[1:]
                self.seg_dict[key] = " ".join(value)
        self.text_cleaner = TextCleaner(text_cleaner)
        self.split_with_space = split_with_space
    
    def forward(self, text, **kwargs):
        if self.seg_dict is not None:
            text = self.text_cleaner(text)
            if self.split_with_space:
                tokens = text.strip().split(" ")
                if self.seg_dict is not None:
                    text = seg_tokenize(tokens, self.seg_dict)
 
        return text
 
def seg_tokenize(txt, seg_dict):
    pattern = re.compile(r'^[\u4E00-\u9FA50-9]+$')
    out_txt = ""
    for word in txt:
        word = word.lower()
        if word in seg_dict:
            out_txt += seg_dict[word] + " "
        else:
            if pattern.match(word):
                for char in word:
                    if char in seg_dict:
                        out_txt += seg_dict[char] + " "
                    else:
                        out_txt += "<unk>" + " "
            else:
                out_txt += "<unk>" + " "
    return out_txt.strip().split()