From 28ccfbfc51068a663a80764e14074df5edf2b5ba Mon Sep 17 00:00:00 2001
From: kongdeqiang <kongdeqiang960204@163.com>
Date: 星期五, 13 三月 2026 17:41:41 +0800
Subject: [PATCH] 提交

---
 funasr/utils/misc.py |   88 ++++++++++++++++++++++++++++++++-----------
 1 files changed, 65 insertions(+), 23 deletions(-)

diff --git a/funasr/utils/misc.py b/funasr/utils/misc.py
index a853d48..eb17f97 100644
--- a/funasr/utils/misc.py
+++ b/funasr/utils/misc.py
@@ -6,49 +6,53 @@
 import numpy as np
 from omegaconf import DictConfig, OmegaConf
 
+
 def statistic_model_parameters(model, prefix=None):
     var_dict = model.state_dict()
     numel = 0
-    for i, key in enumerate(sorted(list([x for x in var_dict.keys() if "num_batches_tracked" not in x]))):
+    for i, key in enumerate(
+        sorted(list([x for x in var_dict.keys() if "num_batches_tracked" not in x]))
+    ):
         if prefix is None or key.startswith(prefix):
             numel += var_dict[key].numel()
     return numel
 
 
 def int2vec(x, vec_dim=8, dtype=np.int32):
-    b = ('{:0' + str(vec_dim) + 'b}').format(x)
+    b = ("{:0" + str(vec_dim) + "b}").format(x)
     # little-endian order: lower bit first
-    return (np.array(list(b)[::-1]) == '1').astype(dtype)
+    return (np.array(list(b)[::-1]) == "1").astype(dtype)
 
 
 def seq2arr(seq, vec_dim=8):
     return np.row_stack([int2vec(int(x), vec_dim) for x in seq])
 
 
-def load_scp_as_dict(scp_path, value_type='str', kv_sep=" "):
-    with io.open(scp_path, 'r', encoding='utf-8') as f:
+def load_scp_as_dict(scp_path, value_type="str", kv_sep=" "):
+    with io.open(scp_path, "r", encoding="utf-8") as f:
         ret_dict = OrderedDict()
         for one_line in f.readlines():
             one_line = one_line.strip()
             pos = one_line.find(kv_sep)
-            key, value = one_line[:pos], one_line[pos + 1:]
-            if value_type == 'list':
-                value = value.split(' ')
+            key, value = one_line[:pos], one_line[pos + 1 :]
+            if value_type == "list":
+                value = value.split(" ")
             ret_dict[key] = value
         return ret_dict
 
 
-def load_scp_as_list(scp_path, value_type='str', kv_sep=" "):
-    with io.open(scp_path, 'r', encoding='utf8') as f:
+def load_scp_as_list(scp_path, value_type="str", kv_sep=" "):
+    with io.open(scp_path, "r", encoding="utf8") as f:
         ret_dict = []
         for one_line in f.readlines():
             one_line = one_line.strip()
             pos = one_line.find(kv_sep)
-            key, value = one_line[:pos], one_line[pos + 1:]
-            if value_type == 'list':
-                value = value.split(' ')
+            key, value = one_line[:pos], one_line[pos + 1 :]
+            if value_type == "list":
+                value = value.split(" ")
             ret_dict.append((key, value))
         return ret_dict
+
 
 def deep_update(original, update):
     for key, value in update.items():
@@ -58,20 +62,58 @@
             deep_update(original[key], value)
         else:
             original[key] = value
-            
-            
+
+
 def prepare_model_dir(**kwargs):
-    
 
     os.makedirs(kwargs.get("output_dir", "./"), exist_ok=True)
-    
+
     yaml_file = os.path.join(kwargs.get("output_dir", "./"), "config.yaml")
     OmegaConf.save(config=kwargs, f=yaml_file)
-    print(kwargs)
+    logging.info(f"kwargs: {kwargs}")
     logging.info("config.yaml is saved to: %s", yaml_file)
 
-    # model_path = kwargs.get("model_path")
-    # if model_path is not None:
-    #     config_json = os.path.join(model_path, "configuration.json")
-    #     if os.path.exists(config_json):
-    #         shutil.copy(config_json, os.path.join(kwargs.get("output_dir", "./"), "configuration.json"))
+    model_path = kwargs.get("model_path", None)
+    if model_path is not None:
+        config_json = os.path.join(model_path, "configuration.json")
+        if os.path.exists(config_json):
+            shutil.copy(
+                config_json, os.path.join(kwargs.get("output_dir", "./"), "configuration.json")
+            )
+
+
+def extract_filename_without_extension(file_path):
+    """
+    浠庣粰瀹氱殑鏂囦欢璺緞涓彁鍙栨枃浠跺悕锛堜笉鍖呭惈璺緞鍜屾墿灞曞悕锛�
+    :param file_path: 瀹屾暣鐨勬枃浠惰矾寰�
+    :return: 鏂囦欢鍚嶏紙涓嶅惈璺緞鍜屾墿灞曞悕锛�
+    """
+    # 棣栧厛锛屼娇鐢╫s.path.basename鑾峰彇璺緞涓殑鏂囦欢鍚嶉儴鍒嗭紙鍚墿灞曞悕锛�
+    filename_with_extension = os.path.basename(file_path)
+    # 鐒跺悗锛屼娇鐢╫s.path.splitext鍒嗙鏂囦欢鍚嶅拰鎵╁睍鍚�
+    filename, extension = os.path.splitext(filename_with_extension)
+    # 杩斿洖涓嶅寘鍚墿灞曞悕鐨勬枃浠跺悕
+    return filename
+
+
+def smart_remove(path):
+    """Intelligently removes files, empty directories, and non-empty directories recursively."""
+    # Check if the provided path exists
+    if not os.path.exists(path):
+        print(f"{path} does not exist.")
+        return
+
+    # If the path is a file, delete it
+    if os.path.isfile(path):
+        os.remove(path)
+        print(f"File {path} has been deleted.")
+    # If the path is a directory
+    elif os.path.isdir(path):
+        try:
+            # Attempt to remove an empty directory
+            os.rmdir(path)
+            print(f"Empty directory {path} has been deleted.")
+        except OSError:
+            # If the directory is not empty, remove it along with all its contents
+            shutil.rmtree(path)
+            print(f"Non-empty directory {path} has been recursively deleted.")

--
Gitblit v1.9.1