| | |
| | | import numpy as np |
| | | from omegaconf import DictConfig, OmegaConf |
| | | |
| | | |
| | | def statistic_model_parameters(model, prefix=None): |
| | | var_dict = model.state_dict() |
| | | numel = 0 |
| | | for i, key in enumerate(sorted(list([x for x in var_dict.keys() if "num_batches_tracked" not in x]))): |
| | | for i, key in enumerate( |
| | | sorted(list([x for x in var_dict.keys() if "num_batches_tracked" not in x])) |
| | | ): |
| | | if prefix is None or key.startswith(prefix): |
| | | numel += var_dict[key].numel() |
| | | return numel |
| | | |
| | | |
| | | def int2vec(x, vec_dim=8, dtype=np.int32): |
| | | b = ('{:0' + str(vec_dim) + 'b}').format(x) |
| | | b = ("{:0" + str(vec_dim) + "b}").format(x) |
| | | # little-endian order: lower bit first |
| | | return (np.array(list(b)[::-1]) == '1').astype(dtype) |
| | | return (np.array(list(b)[::-1]) == "1").astype(dtype) |
| | | |
| | | |
| | | def seq2arr(seq, vec_dim=8): |
| | | return np.row_stack([int2vec(int(x), vec_dim) for x in seq]) |
| | | |
| | | |
| | | def load_scp_as_dict(scp_path, value_type='str', kv_sep=" "): |
| | | with io.open(scp_path, 'r', encoding='utf-8') as f: |
| | | def load_scp_as_dict(scp_path, value_type="str", kv_sep=" "): |
| | | with io.open(scp_path, "r", encoding="utf-8") as f: |
| | | ret_dict = OrderedDict() |
| | | for one_line in f.readlines(): |
| | | one_line = one_line.strip() |
| | | pos = one_line.find(kv_sep) |
| | | key, value = one_line[:pos], one_line[pos + 1:] |
| | | if value_type == 'list': |
| | | value = value.split(' ') |
| | | key, value = one_line[:pos], one_line[pos + 1 :] |
| | | if value_type == "list": |
| | | value = value.split(" ") |
| | | ret_dict[key] = value |
| | | return ret_dict |
| | | |
| | | |
| | | def load_scp_as_list(scp_path, value_type='str', kv_sep=" "): |
| | | with io.open(scp_path, 'r', encoding='utf8') as f: |
| | | def load_scp_as_list(scp_path, value_type="str", kv_sep=" "): |
| | | with io.open(scp_path, "r", encoding="utf8") as f: |
| | | ret_dict = [] |
| | | for one_line in f.readlines(): |
| | | one_line = one_line.strip() |
| | | pos = one_line.find(kv_sep) |
| | | key, value = one_line[:pos], one_line[pos + 1:] |
| | | if value_type == 'list': |
| | | value = value.split(' ') |
| | | key, value = one_line[:pos], one_line[pos + 1 :] |
| | | if value_type == "list": |
| | | value = value.split(" ") |
| | | ret_dict.append((key, value)) |
| | | return ret_dict |
| | | |
| | | |
| | | def deep_update(original, update): |
| | | for key, value in update.items(): |
| | |
| | | deep_update(original[key], value) |
| | | else: |
| | | original[key] = value |
| | | |
| | | |
| | | |
| | | |
| | | def prepare_model_dir(**kwargs): |
| | | |
| | | |
| | | os.makedirs(kwargs.get("output_dir", "./"), exist_ok=True) |
| | | |
| | | |
| | | yaml_file = os.path.join(kwargs.get("output_dir", "./"), "config.yaml") |
| | | OmegaConf.save(config=kwargs, f=yaml_file) |
| | | print(kwargs) |
| | | logging.info(f"kwargs: {kwargs}") |
| | | logging.info("config.yaml is saved to: %s", yaml_file) |
| | | |
| | | # model_path = kwargs.get("model_path") |
| | | # if model_path is not None: |
| | | # config_json = os.path.join(model_path, "configuration.json") |
| | | # if os.path.exists(config_json): |
| | | # shutil.copy(config_json, os.path.join(kwargs.get("output_dir", "./"), "configuration.json")) |
| | | model_path = kwargs.get("model_path", None) |
| | | if model_path is not None: |
| | | config_json = os.path.join(model_path, "configuration.json") |
| | | if os.path.exists(config_json): |
| | | shutil.copy( |
| | | config_json, os.path.join(kwargs.get("output_dir", "./"), "configuration.json") |
| | | ) |
| | | |
| | | |
| | | def extract_filename_without_extension(file_path): |
| | | """ |
| | | 从给定的文件路径中提取文件名(不包含路径和扩展名) |
| | | :param file_path: 完整的文件路径 |
| | | :return: 文件名(不含路径和扩展名) |
| | | """ |
| | | # 首先,使用os.path.basename获取路径中的文件名部分(含扩展名) |
| | | filename_with_extension = os.path.basename(file_path) |
| | | # 然后,使用os.path.splitext分离文件名和扩展名 |
| | | filename, extension = os.path.splitext(filename_with_extension) |
| | | # 返回不包含扩展名的文件名 |
| | | return filename |
| | | |
| | | |
| | | def smart_remove(path): |
| | | """Intelligently removes files, empty directories, and non-empty directories recursively.""" |
| | | # Check if the provided path exists |
| | | if not os.path.exists(path): |
| | | print(f"{path} does not exist.") |
| | | return |
| | | |
| | | # If the path is a file, delete it |
| | | if os.path.isfile(path): |
| | | os.remove(path) |
| | | print(f"File {path} has been deleted.") |
| | | # If the path is a directory |
| | | elif os.path.isdir(path): |
| | | try: |
| | | # Attempt to remove an empty directory |
| | | os.rmdir(path) |
| | | print(f"Empty directory {path} has been deleted.") |
| | | except OSError: |
| | | # If the directory is not empty, remove it along with all its contents |
| | | shutil.rmtree(path) |
| | | print(f"Non-empty directory {path} has been recursively deleted.") |