kongdeqiang
2026-03-13 28ccfbfc51068a663a80764e14074df5edf2b5ba
funasr/bin/train.py
@@ -1,172 +1,265 @@
import argparse
import logging
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
import os
import sys
from io import BytesIO
from collections.abc import Sequence
import torch
import torch.nn as nn
import hydra
from omegaconf import DictConfig, OmegaConf
from funasr.torch_utils.set_all_random_seed import set_all_random_seed
# from funasr.model_class_factory1 import model_choices
from funasr.modules.lora.utils import mark_only_lora_as_trainable
from funasr.optimizers import optim_choices
from funasr.schedulers import scheduler_choices
from funasr.torch_utils.load_pretrained_model import load_pretrained_model
from funasr.torch_utils.initialize import initialize
from funasr.datasets.data_sampler import BatchSampler
# from funasr.tokenizer.build_tokenizer import build_tokenizer
# from funasr.tokenizer.token_id_converter import TokenIDConverter
from funasr.tokenizer.funtoken import build_tokenizer
from funasr.datasets.dataset_jsonl import AudioDataset
from funasr.utils.trainer import Trainer
# from funasr.utils.load_fr_py import load_class_from_path
from funasr.utils.dynamic_import import dynamic_import
import logging
import time
import argparse
from io import BytesIO
from contextlib import nullcontext
import torch.distributed as dist
from omegaconf import DictConfig, OmegaConf
from torch.cuda.amp import autocast, GradScaler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed.fsdp import FullyShardedDataParallel as FSDP
from funasr.utils.download_from_hub import download_model
from torch.distributed.algorithms.join import Join
from torch.distributed.fsdp.sharded_grad_scaler import ShardedGradScaler
from tensorboardX import SummaryWriter
from funasr.train_utils.average_nbest_models import average_checkpoints
from funasr.register import tables
from funasr.optimizers import optim_classes
from funasr.train_utils.trainer import Trainer
from funasr.schedulers import scheduler_classes
from funasr.train_utils.initialize import initialize
from funasr.download.download_model_from_hub import download_model
from funasr.models.lora.utils import mark_only_lora_as_trainable
from funasr.train_utils.set_all_random_seed import set_all_random_seed
from funasr.train_utils.load_pretrained_model import load_pretrained_model
from funasr.utils.misc import prepare_model_dir
from funasr.train_utils.model_summary import model_summary
from funasr import AutoModel
@hydra.main(config_name=None, version_base=None)
def main_hydra(kwargs: DictConfig):
    """Hydra entry point: normalise the parsed config and hand it to ``main``.

    Args:
        kwargs: Configuration assembled by Hydra. Must contain ``"model"``.
            Optional keys read here: ``"debug"``, ``"model_conf"``, ``"hub"``
            and ``"is_training"``.

    Raises:
        AssertionError: if ``"model"`` is missing from the configuration.
    """
    # Only stop in the debugger when explicitly requested; an unconditional
    # breakpoint would hang every non-interactive / multi-process launch.
    if kwargs.get("debug", False):
        import pdb

        pdb.set_trace()

    assert "model" in kwargs, "`model` must be specified in the training config"

    # Without an inline `model_conf`, `model` is resolved through
    # `download_model`, which returns the updated configuration.
    if "model_conf" not in kwargs:
        logging.info("download models from model hub: {}".format(kwargs.get("hub", "ms")))
        kwargs = download_model(is_training=kwargs.get("is_training", True), **kwargs)

    main(**kwargs)
def main(**kwargs):
    """Run the full training job described by ``kwargs``.

    Steps, in order:
      1. Seed RNGs and configure cuDNN / TF32 backends.
      2. Initialise ``torch.distributed`` when DDP (``WORLD_SIZE`` > 1) or
         FSDP (``use_fsdp``) is requested.
      3. Build the model through ``AutoModel`` on CPU, then continue with the
         kwargs object it exposes (``model.kwargs``).
      4. Optionally freeze parameters, wrap the model (DDP / FSDP / plain
         ``.to(device)``), and build optimizer, scheduler and dataloader.
      5. Resume from a checkpoint if the trainer finds one, then loop over
         epochs and data splits, validating and checkpointing per epoch.
      6. Average the n-best checkpoints on rank 0 and close the trainer.

    Args:
        **kwargs: Flat training configuration. Keys read directly here include
            ``seed``, ``cudnn_enabled``, ``cudnn_benchmark``,
            ``cudnn_deterministic``, ``enable_tf32``, ``use_fsdp``,
            ``backend``, ``device``, ``freeze_param``, ``optim``,
            ``optim_conf``, ``scheduler``, ``scheduler_conf``,
            ``dataset_conf``, ``train_conf`` and ``output_dir``.

    Raises:
        AssertionError: if ``optim`` / ``scheduler`` is not a registered name.
    """
    # set random seed
    set_all_random_seed(kwargs.get("seed", 0))
    torch.backends.cudnn.enabled = kwargs.get("cudnn_enabled", torch.backends.cudnn.enabled)
    torch.backends.cudnn.benchmark = kwargs.get("cudnn_benchmark", torch.backends.cudnn.benchmark)
    torch.backends.cudnn.deterministic = kwargs.get("cudnn_deterministic", True)
    # open tf32
    torch.backends.cuda.matmul.allow_tf32 = kwargs.get("enable_tf32", True)

    local_rank = int(os.environ.get("LOCAL_RANK", 0))
    if local_rank == 0:
        tables.print()

    # Check if we are using DDP or FSDP
    use_ddp = "WORLD_SIZE" in os.environ and int(os.environ["WORLD_SIZE"]) > 1
    use_fsdp = kwargs.get("use_fsdp", False)
    distributed = use_ddp or use_fsdp
    if distributed:
        dist.init_process_group(backend=kwargs.get("backend", "nccl"), init_method="env://")
        torch.cuda.set_device(local_rank)

    logging.info("Build model, frontend, tokenizer")
    # Build on CPU first; the requested device is restored below, after the
    # model has been constructed.
    device = kwargs.get("device", "cuda")
    kwargs["device"] = "cpu"
    model = AutoModel(**kwargs)

    # save config.yaml (done once: global rank 0 when distributed, otherwise
    # local rank 0)
    is_main_process = dist.get_rank() == 0 if distributed else local_rank == 0
    if is_main_process:
        prepare_model_dir(**kwargs)

    # parse kwargs: from here on use the kwargs object exposed by AutoModel.
    # Its "tokenizer"/"frontend" entries (presumably populated by AutoModel -
    # TODO confirm) are forwarded to the dataloader through **kwargs below.
    kwargs = model.kwargs
    kwargs["device"] = device
    model = model.model
    del kwargs["model"]

    # freeze_param
    freeze_param = kwargs.get("freeze_param", None)
    if freeze_param is not None:
        if "," in freeze_param:
            # SECURITY NOTE(review): eval() executes arbitrary code taken from
            # the config; consider ast.literal_eval if configs are untrusted.
            freeze_param = eval(freeze_param)
        if not isinstance(freeze_param, (list, tuple)):
            freeze_param = (freeze_param,)
        logging.info("freeze_param is not None: %s", freeze_param)
        for prefix in freeze_param:
            for name, param in model.named_parameters():
                # Match the parameter itself or anything below that module.
                if name.startswith(prefix + ".") or name == prefix:
                    logging.info(f"Setting {name}.requires_grad = False")
                    param.requires_grad = False

    if local_rank == 0:
        logging.info(f"{model_summary(model)}")

    if use_ddp:
        model = model.cuda(local_rank)
        model = DDP(
            model,
            device_ids=[local_rank],
            find_unused_parameters=kwargs.get("train_conf", {}).get(
                "find_unused_parameters", False
            ),
        )
    elif use_fsdp:

        def custom_auto_wrap_policy(
            module: nn.Module,
            recurse: bool,
            nonwrapped_numel: int,
            # Additional custom arguments
            min_num_params: int = int(1e8),
        ) -> bool:
            # Wrap a module only if it is large enough and all of its
            # parameters share the same requires_grad setting.
            is_large = nonwrapped_numel >= min_num_params
            requires_grad_uniform = len({p.requires_grad for p in module.parameters()}) == 1
            return is_large and requires_grad_uniform

        # NOTE(review): the previous revision also built an unused
        # functools.partial with min_num_params=int(1e5) but still passed the
        # bare policy (1e8 default) to FSDP. The effective call is preserved
        # here; confirm which threshold is actually intended.
        torch.cuda.set_device(local_rank)
        model = FSDP(
            model,
            auto_wrap_policy=custom_auto_wrap_policy,
            mixed_precision=None,
            device_id=torch.cuda.current_device(),
        )
    else:
        model = model.to(device=kwargs.get("device", "cuda"))

    # Record the device the parameters actually live on after wrapping.
    kwargs["device"] = next(model.parameters()).device

    # optim
    logging.info("Build optim")
    optim = kwargs.get("optim", "adam")
    assert optim in optim_classes
    optim_class = optim_classes.get(optim)
    optim = optim_class(model.parameters(), **kwargs.get("optim_conf"))

    # scheduler
    logging.info("Build scheduler")
    scheduler = kwargs.get("scheduler", "warmuplr")
    assert scheduler in scheduler_classes
    scheduler_class = scheduler_classes.get(scheduler)
    scheduler = scheduler_class(optim, **kwargs.get("scheduler_conf"))

    # dataset
    logging.info("Build dataloader")
    dataloader_class = tables.dataloader_classes.get(
        kwargs["dataset_conf"].get("dataloader", "DataloaderMapStyle")
    )
    dataloader = dataloader_class(**kwargs)

    output_dir = kwargs.get("output_dir", "./exp")
    trainer = Trainer(
        local_rank=local_rank,
        use_ddp=use_ddp,
        use_fsdp=use_fsdp,
        device=kwargs["device"],
        output_dir=output_dir,
        **kwargs.get("train_conf"),
    )

    # Gradient scaler: none without fp16; the sharded variant under FSDP.
    scaler = GradScaler(enabled=trainer.use_fp16) if trainer.use_fp16 else None
    scaler = ShardedGradScaler(enabled=trainer.use_fp16) if trainer.use_fsdp else scaler

    trainer.resume_checkpoint(
        model=model,
        optim=optim,
        scheduler=scheduler,
        scaler=scaler,
    )

    # Use the same default as the Trainer so a missing `output_dir` does not
    # turn into os.path.join(None, ...).
    tensorboard_dir = os.path.join(output_dir, "tensorboard")
    os.makedirs(tensorboard_dir, exist_ok=True)
    try:
        writer = SummaryWriter(tensorboard_dir)  # if trainer.rank == 0 else None
    except Exception:
        # Best effort: training continues without TensorBoard, but the reason
        # is logged instead of being silently swallowed.
        logging.warning("Could not create SummaryWriter at %s", tensorboard_dir, exc_info=True)
        writer = None

    dataloader_tr, dataloader_val = None, None
    for epoch in range(trainer.start_epoch, trainer.max_epoch):
        time1 = time.perf_counter()

        for data_split_i in range(trainer.start_data_split_i, dataloader.data_split_num):
            time_slice_i = time.perf_counter()

            dataloader_tr, dataloader_val = dataloader.build_iter(
                epoch, data_split_i=data_split_i, start_step=trainer.start_step
            )

            trainer.train_epoch(
                model=model,
                optim=optim,
                scheduler=scheduler,
                scaler=scaler,
                dataloader_train=dataloader_tr,
                dataloader_val=dataloader_val,
                epoch=epoch,
                writer=writer,
                data_split_i=data_split_i,
                data_split_num=dataloader.data_split_num,
                start_step=trainer.start_step,
            )
            # A resumed step offset only applies to the first split processed.
            trainer.start_step = 0

            # Release cached CUDA memory between data splits.
            model_device = next(model.parameters()).device
            if model_device.type == "cuda":
                with torch.cuda.device(model_device):
                    torch.cuda.empty_cache()

            time_escaped = (time.perf_counter() - time_slice_i) / 3600.0
            logging.info(
                f"rank: {local_rank}, "
                f"time_escaped_epoch: {time_escaped:.3f} hours, "
                f"estimated to finish {dataloader.data_split_num} data_slices, remaining: {dataloader.data_split_num-data_split_i} slices, {(dataloader.data_split_num-data_split_i)*time_escaped:.3f} hours, "
                f"epoch: {trainer.max_epoch - epoch} epochs, {((trainer.max_epoch - epoch - 1)*dataloader.data_split_num + dataloader.data_split_num-data_split_i)*time_escaped:.3f} hours\n"
            )

        # A resumed split offset only applies to the first epoch processed.
        trainer.start_data_split_i = 0
        trainer.validate_epoch(
            model=model, dataloader_val=dataloader_val, epoch=epoch + 1, writer=writer
        )
        scheduler.step()
        trainer.step_in_epoch = 0
        trainer.save_checkpoint(
            epoch + 1, model=model, optim=optim, scheduler=scheduler, scaler=scaler
        )

        time2 = time.perf_counter()
        time_escaped = (time2 - time1) / 3600.0
        logging.info(
            f"rank: {local_rank}, "
            f"time_escaped_epoch: {time_escaped:.3f} hours, "
            f"estimated to finish {trainer.max_epoch} "
            f"epoch: {(trainer.max_epoch - epoch) * time_escaped:.3f} hours\n"
        )
        # Reset the running averages before the next epoch.
        trainer.train_acc_avg = 0.0
        trainer.train_loss_avg = 0.0

    if trainer.rank == 0:
        average_checkpoints(trainer.output_dir, trainer.avg_nbest_model)

    trainer.close()
# Script entry point: Hydra parses the command line and invokes main_hydra once.
if __name__ == "__main__":
    main_hydra()