From 28ccfbfc51068a663a80764e14074df5edf2b5ba Mon Sep 17 00:00:00 2001
From: kongdeqiang <kongdeqiang960204@163.com>
Date: 星期五, 13 三月 2026 17:41:41 +0800
Subject: [PATCH] 提交
---
funasr/utils/misc.py | 88 ++++++++++++++++++++++++++++++++-----------
1 files changed, 65 insertions(+), 23 deletions(-)
diff --git a/funasr/utils/misc.py b/funasr/utils/misc.py
index a853d48..eb17f97 100644
--- a/funasr/utils/misc.py
+++ b/funasr/utils/misc.py
@@ -6,49 +6,53 @@
import numpy as np
from omegaconf import DictConfig, OmegaConf
+
def statistic_model_parameters(model, prefix=None):
var_dict = model.state_dict()
numel = 0
- for i, key in enumerate(sorted(list([x for x in var_dict.keys() if "num_batches_tracked" not in x]))):
+ for i, key in enumerate(
+ sorted(list([x for x in var_dict.keys() if "num_batches_tracked" not in x]))
+ ):
if prefix is None or key.startswith(prefix):
numel += var_dict[key].numel()
return numel
def int2vec(x, vec_dim=8, dtype=np.int32):
- b = ('{:0' + str(vec_dim) + 'b}').format(x)
+ b = ("{:0" + str(vec_dim) + "b}").format(x)
# little-endian order: lower bit first
- return (np.array(list(b)[::-1]) == '1').astype(dtype)
+ return (np.array(list(b)[::-1]) == "1").astype(dtype)
def seq2arr(seq, vec_dim=8):
return np.row_stack([int2vec(int(x), vec_dim) for x in seq])
-def load_scp_as_dict(scp_path, value_type='str', kv_sep=" "):
- with io.open(scp_path, 'r', encoding='utf-8') as f:
+def load_scp_as_dict(scp_path, value_type="str", kv_sep=" "):
+ with io.open(scp_path, "r", encoding="utf-8") as f:
ret_dict = OrderedDict()
for one_line in f.readlines():
one_line = one_line.strip()
pos = one_line.find(kv_sep)
- key, value = one_line[:pos], one_line[pos + 1:]
- if value_type == 'list':
- value = value.split(' ')
+ key, value = one_line[:pos], one_line[pos + 1 :]
+ if value_type == "list":
+ value = value.split(" ")
ret_dict[key] = value
return ret_dict
-def load_scp_as_list(scp_path, value_type='str', kv_sep=" "):
- with io.open(scp_path, 'r', encoding='utf8') as f:
+def load_scp_as_list(scp_path, value_type="str", kv_sep=" "):
+ with io.open(scp_path, "r", encoding="utf8") as f:
ret_dict = []
for one_line in f.readlines():
one_line = one_line.strip()
pos = one_line.find(kv_sep)
- key, value = one_line[:pos], one_line[pos + 1:]
- if value_type == 'list':
- value = value.split(' ')
+ key, value = one_line[:pos], one_line[pos + 1 :]
+ if value_type == "list":
+ value = value.split(" ")
ret_dict.append((key, value))
return ret_dict
+
def deep_update(original, update):
for key, value in update.items():
@@ -58,20 +62,58 @@
deep_update(original[key], value)
else:
original[key] = value
-
-
+
+
def prepare_model_dir(**kwargs):
-
os.makedirs(kwargs.get("output_dir", "./"), exist_ok=True)
-
+
yaml_file = os.path.join(kwargs.get("output_dir", "./"), "config.yaml")
OmegaConf.save(config=kwargs, f=yaml_file)
- print(kwargs)
+ logging.info(f"kwargs: {kwargs}")
logging.info("config.yaml is saved to: %s", yaml_file)
- # model_path = kwargs.get("model_path")
- # if model_path is not None:
- # config_json = os.path.join(model_path, "configuration.json")
- # if os.path.exists(config_json):
- # shutil.copy(config_json, os.path.join(kwargs.get("output_dir", "./"), "configuration.json"))
+ model_path = kwargs.get("model_path", None)
+ if model_path is not None:
+ config_json = os.path.join(model_path, "configuration.json")
+ if os.path.exists(config_json):
+ shutil.copy(
+ config_json, os.path.join(kwargs.get("output_dir", "./"), "configuration.json")
+ )
+
+
+def extract_filename_without_extension(file_path):
+ """
+ 浠庣粰瀹氱殑鏂囦欢璺緞涓彁鍙栨枃浠跺悕锛堜笉鍖呭惈璺緞鍜屾墿灞曞悕锛�
+ :param file_path: 瀹屾暣鐨勬枃浠惰矾寰�
+ :return: 鏂囦欢鍚嶏紙涓嶅惈璺緞鍜屾墿灞曞悕锛�
+ """
+ # 棣栧厛锛屼娇鐢╫s.path.basename鑾峰彇璺緞涓殑鏂囦欢鍚嶉儴鍒嗭紙鍚墿灞曞悕锛�
+ filename_with_extension = os.path.basename(file_path)
+ # 鐒跺悗锛屼娇鐢╫s.path.splitext鍒嗙鏂囦欢鍚嶅拰鎵╁睍鍚�
+ filename, extension = os.path.splitext(filename_with_extension)
+ # 杩斿洖涓嶅寘鍚墿灞曞悕鐨勬枃浠跺悕
+ return filename
+
+
+def smart_remove(path):
+ """Intelligently removes files, empty directories, and non-empty directories recursively."""
+ # Check if the provided path exists
+ if not os.path.exists(path):
+ print(f"{path} does not exist.")
+ return
+
+ # If the path is a file, delete it
+ if os.path.isfile(path):
+ os.remove(path)
+ print(f"File {path} has been deleted.")
+ # If the path is a directory
+ elif os.path.isdir(path):
+ try:
+ # Attempt to remove an empty directory
+ os.rmdir(path)
+ print(f"Empty directory {path} has been deleted.")
+ except OSError:
+ # If the directory is not empty, remove it along with all its contents
+ shutil.rmtree(path)
+ print(f"Non-empty directory {path} has been recursively deleted.")
--
Gitblit v1.9.1