From 28ccfbfc51068a663a80764e14074df5edf2b5ba Mon Sep 17 00:00:00 2001
From: kongdeqiang <kongdeqiang960204@163.com>
Date: 星期五, 13 三月 2026 17:41:41 +0800
Subject: [PATCH] 提交

---
 funasr/metrics/wer.py |  379 +++++++++++++++++++++++++++++------------------------
 1 files changed, 207 insertions(+), 172 deletions(-)

diff --git a/funasr/metrics/wer.py b/funasr/metrics/wer.py
index b58daab..6bac303 100755
--- a/funasr/metrics/wer.py
+++ b/funasr/metrics/wer.py
@@ -5,186 +5,221 @@
 from omegaconf import DictConfig, OmegaConf, ListConfig
 
 
-def compute_wer(ref_file,
-                hyp_file,
-                cer_file,
-                cn_postprocess=False,
-                ):
-	rst = {
-		'Wrd': 0,
-		'Corr': 0,
-		'Ins': 0,
-		'Del': 0,
-		'Sub': 0,
-		'Snt': 0,
-		'Err': 0.0,
-		'S.Err': 0.0,
-		'wrong_words': 0,
-		'wrong_sentences': 0
-	}
-	
-	hyp_dict = {}
-	ref_dict = {}
-	with open(hyp_file, 'r') as hyp_reader:
-		for line in hyp_reader:
-			key = line.strip().split()[0]
-			value = line.strip().split()[1:]
-			if cn_postprocess:
-				value = " ".join(value)
-				value = value.replace(" ", "")
-				if value[0] == "璇�":
-					value = value[1:]
-				value = [x for x in value]
-			hyp_dict[key] = value
-	with open(ref_file, 'r') as ref_reader:
-		for line in ref_reader:
-			key = line.strip().split()[0]
-			value = line.strip().split()[1:]
-			if cn_postprocess:
-				value = " ".join(value)
-				value = value.replace(" ", "")
-				value = [x for x in value]
-			ref_dict[key] = value
-	
-	cer_detail_writer = open(cer_file, 'w')
-	for hyp_key in hyp_dict:
-		if hyp_key in ref_dict:
-			out_item = compute_wer_by_line(hyp_dict[hyp_key], ref_dict[hyp_key])
-			rst['Wrd'] += out_item['nwords']
-			rst['Corr'] += out_item['cor']
-			rst['wrong_words'] += out_item['wrong']
-			rst['Ins'] += out_item['ins']
-			rst['Del'] += out_item['del']
-			rst['Sub'] += out_item['sub']
-			rst['Snt'] += 1
-			if out_item['wrong'] > 0:
-				rst['wrong_sentences'] += 1
-			cer_detail_writer.write(hyp_key + print_cer_detail(out_item) + '\n')
-			cer_detail_writer.write("ref:" + '\t' + " ".join(list(map(lambda x: x.lower(), ref_dict[hyp_key]))) + '\n')
-			cer_detail_writer.write("hyp:" + '\t' + " ".join(list(map(lambda x: x.lower(), hyp_dict[hyp_key]))) + '\n')
-			cer_detail_writer.flush()
-	
-	if rst['Wrd'] > 0:
-		rst['Err'] = round(rst['wrong_words'] * 100 / rst['Wrd'], 2)
-	if rst['Snt'] > 0:
-		rst['S.Err'] = round(rst['wrong_sentences'] * 100 / rst['Snt'], 2)
-	
-	cer_detail_writer.write('\n')
-	cer_detail_writer.write("%WER " + str(rst['Err']) + " [ " + str(rst['wrong_words']) + " / " + str(rst['Wrd']) +
-	                        ", " + str(rst['Ins']) + " ins, " + str(rst['Del']) + " del, " + str(
-		rst['Sub']) + " sub ]" + '\n')
-	cer_detail_writer.write(
-		"%SER " + str(rst['S.Err']) + " [ " + str(rst['wrong_sentences']) + " / " + str(rst['Snt']) + " ]" + '\n')
-	cer_detail_writer.write("Scored " + str(len(hyp_dict)) + " sentences, " + str(
-		len(hyp_dict) - rst['Snt']) + " not present in hyp." + '\n')
-	
-	cer_detail_writer.close()
+def compute_wer(
+    ref_file,
+    hyp_file,
+    cer_file,
+    cn_postprocess=False,
+):
+    rst = {
+        "Wrd": 0,
+        "Corr": 0,
+        "Ins": 0,
+        "Del": 0,
+        "Sub": 0,
+        "Snt": 0,
+        "Err": 0.0,
+        "S.Err": 0.0,
+        "wrong_words": 0,
+        "wrong_sentences": 0,
+    }
+
+    hyp_dict = {}
+    ref_dict = {}
+    with open(hyp_file, "r") as hyp_reader:
+        for line in hyp_reader:
+            key = line.strip().split()[0]
+            value = line.strip().split()[1:]
+            if cn_postprocess:
+                value = " ".join(value)
+                value = value.replace(" ", "")
+                # if value[0] == "璇�":
+                #     value = value[1:]
+                value = [x for x in value]
+            hyp_dict[key] = value
+    with open(ref_file, "r") as ref_reader:
+        for line in ref_reader:
+            key = line.strip().split()[0]
+            value = line.strip().split()[1:]
+            if cn_postprocess:
+                value = " ".join(value)
+                value = value.replace(" ", "")
+                value = [x for x in value]
+            ref_dict[key] = value
+
+    cer_detail_writer = open(cer_file, "w")
+    for hyp_key in hyp_dict:
+        if hyp_key in ref_dict:
+            out_item = compute_wer_by_line(hyp_dict[hyp_key], ref_dict[hyp_key])
+            rst["Wrd"] += out_item["nwords"]
+            rst["Corr"] += out_item["cor"]
+            rst["wrong_words"] += out_item["wrong"]
+            rst["Ins"] += out_item["ins"]
+            rst["Del"] += out_item["del"]
+            rst["Sub"] += out_item["sub"]
+            rst["Snt"] += 1
+            if out_item["wrong"] > 0:
+                rst["wrong_sentences"] += 1
+            cer_detail_writer.write(hyp_key + print_cer_detail(out_item) + "\n")
+            cer_detail_writer.write(
+                "ref:" + "\t" + " ".join(list(map(lambda x: x.lower(), ref_dict[hyp_key]))) + "\n"
+            )
+            cer_detail_writer.write(
+                "hyp:" + "\t" + " ".join(list(map(lambda x: x.lower(), hyp_dict[hyp_key]))) + "\n"
+            )
+            cer_detail_writer.flush()
+
+    if rst["Wrd"] > 0:
+        rst["Err"] = round(rst["wrong_words"] * 100 / rst["Wrd"], 2)
+    if rst["Snt"] > 0:
+        rst["S.Err"] = round(rst["wrong_sentences"] * 100 / rst["Snt"], 2)
+
+    cer_detail_writer.write("\n")
+    cer_detail_writer.write(
+        "%WER "
+        + str(rst["Err"])
+        + " [ "
+        + str(rst["wrong_words"])
+        + " / "
+        + str(rst["Wrd"])
+        + ", "
+        + str(rst["Ins"])
+        + " ins, "
+        + str(rst["Del"])
+        + " del, "
+        + str(rst["Sub"])
+        + " sub ]"
+        + "\n"
+    )
+    cer_detail_writer.write(
+        "%SER "
+        + str(rst["S.Err"])
+        + " [ "
+        + str(rst["wrong_sentences"])
+        + " / "
+        + str(rst["Snt"])
+        + " ]"
+        + "\n"
+    )
+    cer_detail_writer.write(
+        "Scored "
+        + str(len(hyp_dict))
+        + " sentences, "
+        + str(len(hyp_dict) - rst["Snt"])
+        + " not present in hyp."
+        + "\n"
+    )
+
+    cer_detail_writer.close()
 
 
-def compute_wer_by_line(hyp,
-                        ref):
-	hyp = list(map(lambda x: x.lower(), hyp))
-	ref = list(map(lambda x: x.lower(), ref))
-	
-	len_hyp = len(hyp)
-	len_ref = len(ref)
-	
-	cost_matrix = np.zeros((len_hyp + 1, len_ref + 1), dtype=np.int16)
-	
-	ops_matrix = np.zeros((len_hyp + 1, len_ref + 1), dtype=np.int8)
-	
-	for i in range(len_hyp + 1):
-		cost_matrix[i][0] = i
-	for j in range(len_ref + 1):
-		cost_matrix[0][j] = j
-	
-	for i in range(1, len_hyp + 1):
-		for j in range(1, len_ref + 1):
-			if hyp[i - 1] == ref[j - 1]:
-				cost_matrix[i][j] = cost_matrix[i - 1][j - 1]
-			else:
-				substitution = cost_matrix[i - 1][j - 1] + 1
-				insertion = cost_matrix[i - 1][j] + 1
-				deletion = cost_matrix[i][j - 1] + 1
-				
-				compare_val = [substitution, insertion, deletion]
-				
-				min_val = min(compare_val)
-				operation_idx = compare_val.index(min_val) + 1
-				cost_matrix[i][j] = min_val
-				ops_matrix[i][j] = operation_idx
-	
-	match_idx = []
-	i = len_hyp
-	j = len_ref
-	rst = {
-		'nwords': len_ref,
-		'cor': 0,
-		'wrong': 0,
-		'ins': 0,
-		'del': 0,
-		'sub': 0
-	}
-	while i >= 0 or j >= 0:
-		i_idx = max(0, i)
-		j_idx = max(0, j)
-		
-		if ops_matrix[i_idx][j_idx] == 0:  # correct
-			if i - 1 >= 0 and j - 1 >= 0:
-				match_idx.append((j - 1, i - 1))
-				rst['cor'] += 1
-			
-			i -= 1
-			j -= 1
-		
-		elif ops_matrix[i_idx][j_idx] == 2:  # insert
-			i -= 1
-			rst['ins'] += 1
-		
-		elif ops_matrix[i_idx][j_idx] == 3:  # delete
-			j -= 1
-			rst['del'] += 1
-		
-		elif ops_matrix[i_idx][j_idx] == 1:  # substitute
-			i -= 1
-			j -= 1
-			rst['sub'] += 1
-		
-		if i < 0 and j >= 0:
-			rst['del'] += 1
-		elif j < 0 and i >= 0:
-			rst['ins'] += 1
-	
-	match_idx.reverse()
-	wrong_cnt = cost_matrix[len_hyp][len_ref]
-	rst['wrong'] = wrong_cnt
-	
-	return rst
+def compute_wer_by_line(hyp, ref):
+    hyp = list(map(lambda x: x.lower(), hyp))
+    ref = list(map(lambda x: x.lower(), ref))
+
+    len_hyp = len(hyp)
+    len_ref = len(ref)
+
+    cost_matrix = np.zeros((len_hyp + 1, len_ref + 1), dtype=np.int16)
+
+    ops_matrix = np.zeros((len_hyp + 1, len_ref + 1), dtype=np.int8)
+
+    for i in range(len_hyp + 1):
+        cost_matrix[i][0] = i
+    for j in range(len_ref + 1):
+        cost_matrix[0][j] = j
+
+    for i in range(1, len_hyp + 1):
+        for j in range(1, len_ref + 1):
+            if hyp[i - 1] == ref[j - 1]:
+                cost_matrix[i][j] = cost_matrix[i - 1][j - 1]
+            else:
+                substitution = cost_matrix[i - 1][j - 1] + 1
+                insertion = cost_matrix[i - 1][j] + 1
+                deletion = cost_matrix[i][j - 1] + 1
+
+                compare_val = [substitution, insertion, deletion]
+
+                min_val = min(compare_val)
+                operation_idx = compare_val.index(min_val) + 1
+                cost_matrix[i][j] = min_val
+                ops_matrix[i][j] = operation_idx
+
+    match_idx = []
+    i = len_hyp
+    j = len_ref
+    rst = {"nwords": len_ref, "cor": 0, "wrong": 0, "ins": 0, "del": 0, "sub": 0}
+    while i >= 0 or j >= 0:
+        i_idx = max(0, i)
+        j_idx = max(0, j)
+
+        if ops_matrix[i_idx][j_idx] == 0:  # correct
+            if i - 1 >= 0 and j - 1 >= 0:
+                match_idx.append((j - 1, i - 1))
+                rst["cor"] += 1
+
+            i -= 1
+            j -= 1
+
+        elif ops_matrix[i_idx][j_idx] == 2:  # insert
+            i -= 1
+            rst["ins"] += 1
+
+        elif ops_matrix[i_idx][j_idx] == 3:  # delete
+            j -= 1
+            rst["del"] += 1
+
+        elif ops_matrix[i_idx][j_idx] == 1:  # substitute
+            i -= 1
+            j -= 1
+            rst["sub"] += 1
+
+        if i < 0 and j >= 0:
+            rst["del"] += 1
+        elif j < 0 and i >= 0:
+            rst["ins"] += 1
+
+    match_idx.reverse()
+    wrong_cnt = cost_matrix[len_hyp][len_ref]
+    rst["wrong"] = wrong_cnt
+
+    return rst
 
 
 def print_cer_detail(rst):
-	return ("(" + "nwords=" + str(rst['nwords']) + ",cor=" + str(rst['cor'])
-	        + ",ins=" + str(rst['ins']) + ",del=" + str(rst['del']) + ",sub="
-	        + str(rst['sub']) + ") corr:" + '{:.2%}'.format(rst['cor'] / rst['nwords'])
-	        + ",cer:" + '{:.2%}'.format(rst['wrong'] / rst['nwords']))
+    return (
+        "("
+        + "nwords="
+        + str(rst["nwords"])
+        + ",cor="
+        + str(rst["cor"])
+        + ",ins="
+        + str(rst["ins"])
+        + ",del="
+        + str(rst["del"])
+        + ",sub="
+        + str(rst["sub"])
+        + ") corr:"
+        + "{:.2%}".format(rst["cor"] / rst["nwords"])
+        + ",cer:"
+        + "{:.2%}".format(rst["wrong"] / rst["nwords"])
+    )
 
 
 @hydra.main(config_name=None, version_base=None)
 def main_hydra(cfg: DictConfig):
-	ref_file = cfg.get("ref_file", None)
-	hyp_file = cfg.get("hyp_file", None)
-	cer_file = cfg.get("cer_file", None)
-	cn_postprocess = cfg.get("cn_postprocess", False)
-	if ref_file is None or hyp_file is None or cer_file is None:
-		print(
-			"usage : python -m  funasr.metrics.wer ++ref_file=test.ref ++hyp_file=test.hyp ++cer_file=test.wer ++cn_postprocess=false")
-		sys.exit(0)
-	
-	compute_wer(ref_file, hyp_file, cer_file, cn_postprocess)
+    ref_file = cfg.get("ref_file", None)
+    hyp_file = cfg.get("hyp_file", None)
+    cer_file = cfg.get("cer_file", None)
+    cn_postprocess = cfg.get("cn_postprocess", False)
+    if ref_file is None or hyp_file is None or cer_file is None:
+        print(
+            "usage : python -m  funasr.metrics.wer ++ref_file=test.ref ++hyp_file=test.hyp ++cer_file=test.wer ++cn_postprocess=false"
+        )
+        sys.exit(0)
+
+    compute_wer(ref_file, hyp_file, cer_file, cn_postprocess)
 
 
-if __name__ == '__main__':
-	main_hydra()
-
-
-
+if __name__ == "__main__":
+    main_hydra()

--
Gitblit v1.9.1