游雁
2024-03-19 d29f201e3201bde6a984e436888a2aae877e449f
funasr/bin/train_llm.py
@@ -6,17 +6,22 @@
import torch
import hydra
import logging
import time
import argparse
from io import BytesIO
import torch.distributed as dist
from collections.abc import Sequence
from omegaconf import DictConfig, OmegaConf
from torch.cuda.amp import autocast, GradScaler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed.fsdp import FullyShardedDataParallel as FSDP
from torch.distributed.fsdp.sharded_grad_scaler import ShardedGradScaler
from funasr.train_utils.average_nbest_models import average_checkpoints
from funasr.register import tables
from funasr.optimizers import optim_classes
from funasr.train_utils.trainer import Trainer
from funasr.train_utils.trainer_llm import Trainer
from funasr.schedulers import scheduler_classes
from funasr.train_utils.initialize import initialize
from funasr.download.download_from_hub import download_model
@@ -26,7 +31,7 @@
# from funasr.tokenizer.build_tokenizer import build_tokenizer
# from funasr.tokenizer.token_id_converter import TokenIDConverter
# from funasr.tokenizer.funtoken import build_tokenizer
from funasr import AutoModel
@hydra.main(config_name=None, version_base=None)
def main_hydra(kwargs: DictConfig):
@@ -60,6 +65,11 @@
    if use_ddp or use_fsdp:
        dist.init_process_group(backend=kwargs.get("backend", "nccl"), init_method='env://')
        torch.cuda.set_device(local_rank)
    device = kwargs.get("device", "cuda")
    kwargs["device"] = "cpu"
    model = AutoModel(**kwargs)
    
    # save config.yaml
    if (use_ddp or use_fsdp) and dist.get_rank() == 0 or not (use_ddp or use_fsdp) and local_rank == 0:
@@ -67,54 +77,14 @@
        yaml_file = os.path.join(kwargs.get("output_dir", "./"), "config.yaml")
        OmegaConf.save(config=kwargs, f=yaml_file)
        logging.info("config.yaml is saved to: %s", yaml_file)
    tokenizer = kwargs.get("tokenizer", None)
    if tokenizer is not None:
        tokenizer_class = tables.tokenizer_classes.get(tokenizer)
        tokenizer = tokenizer_class(**kwargs["tokenizer_conf"])
        kwargs["tokenizer"] = tokenizer
    
    # build frontend if frontend is none None
    frontend = kwargs.get("frontend", None)
    if frontend is not None:
        frontend_class = tables.frontend_classes.get(frontend)
        frontend = frontend_class(**kwargs["frontend_conf"])
        kwargs["frontend"] = frontend
        kwargs["input_size"] = frontend.output_size()
    # build model
    model_class = tables.model_classes.get(kwargs["model"])
    vocab_size = len(tokenizer.token_list) if hasattr(tokenizer, "token_list") else None
    vocab_size = len(tokenizer.get_vocab()) if hasattr(tokenizer, "get_vocab") else vocab_size
    model = model_class(**kwargs, **kwargs["model_conf"], vocab_size=vocab_size)
    # init_param
    init_param = kwargs.get("init_param", None)
    if init_param is not None:
        if not isinstance(init_param, (list, tuple)):
            init_param = (init_param,)
        logging.info("init_param is not None: %s", init_param)
        for p in init_param:
            if os.path.exists(p):
                logging.info(f"Loading pretrained params from {p}")
                load_pretrained_model(
                    model=model,
                    path=p,
                    ignore_init_mismatch=kwargs.get("ignore_init_mismatch", True),
                    oss_bucket=kwargs.get("oss_bucket", None),
                    scope_map=kwargs.get("scope_map", []),
                    excludes=kwargs.get("excludes", None),
                )
            else:
                logging.info(f"Checkpoint does not exist, init randomly: {p}")
    elif kwargs.get("init", None):
        initialize(model, kwargs.get("init", "kaiming_normal"))
    else:
        print("No initialize method")
    # parse kwargs
    kwargs = model.kwargs
    kwargs["device"] = device
    tokenizer = kwargs["tokenizer"]
    frontend = kwargs["frontend"]
    model = model.model
    del kwargs["model"]
    # freeze_param
    freeze_param = kwargs.get("freeze_param", None)
@@ -138,7 +108,8 @@
        model = FSDP(model).cuda(local_rank)
    else:
        model = model.to(device=kwargs.get("device", "cuda"))
    kwargs["device"] = next(model.parameters()).device
        
    # optim
    optim = kwargs.get("optim", "adam")
@@ -165,34 +136,68 @@
        batch_sampler_class = tables.batch_sampler_classes.get(batch_sampler)
        batch_sampler = batch_sampler_class(dataset_tr, **kwargs.get("dataset_conf"))
        batch_sampler_val = batch_sampler_class(dataset_val, is_training=False, **kwargs.get("dataset_conf"))
    dataloader_tr = torch.utils.data.DataLoader(dataset_tr,
                                                collate_fn=dataset_tr.collator,
                                                batch_sampler=batch_sampler,
                                                num_workers=kwargs.get("dataset_conf").get("num_workers", 4),
                                                pin_memory=True)
    dataloader_tr = torch.utils.data.DataLoader(dataset_tr, collate_fn=dataset_tr.collator, **batch_sampler)
    dataloader_val = torch.utils.data.DataLoader(dataset_val, collate_fn=dataset_val.collator, **batch_sampler_val)
    trainer = Trainer(local_rank=local_rank,
                      use_ddp=use_ddp,
                      resume=kwargs.get("resume", True),
                      device=kwargs["device"],
                      **kwargs.get("train_conf"),
                      )
    scaler = GradScaler(enabled=trainer.use_fp16) if trainer.use_fp16 else None
    scaler = ShardedGradScaler(enabled=trainer.use_fp16) if trainer.use_fsdp else scaler
    trainer.resume_checkpoint(model=model, optim=optim, scheduler=scheduler, scaler=scaler)
    tensorboard_dir = os.path.join(kwargs.get("output_dir"), "tensorboard")
    os.makedirs(tensorboard_dir, exist_ok=True)
    try:
        from tensorboardX import SummaryWriter
        writer = SummaryWriter(tensorboard_dir) if trainer.rank == 0 else None
    except:
        writer = None
    
    dataloader_val = torch.utils.data.DataLoader(dataset_val,
                                                collate_fn=dataset_val.collator,
                                                batch_sampler=batch_sampler_val,
                                                num_workers=kwargs.get("dataset_conf").get("num_workers", 4),
                                                pin_memory=True)
    trainer = Trainer(
        model=model,
        optim=optim,
        scheduler=scheduler,
        dataloader_train=dataloader_tr,
        dataloader_val=dataloader_val,
        local_rank=local_rank,
        use_ddp=use_ddp,
        use_fsdp=use_fsdp,
        output_dir=kwargs.get("output_dir", "./exp"),
        resume=kwargs.get("resume", True),
        **kwargs.get("train_conf"),
    )
    trainer.run()
    if use_ddp or use_fsdp:
        torch.distributed.destroy_process_group()
    for epoch in range(trainer.start_epoch, trainer.max_epoch + 1):
        time1 = time.perf_counter()
        trainer.train_epoch(
                            model=model,
                            optim=optim,
                            scheduler=scheduler,
                            scaler=scaler,
                            dataloader_train=dataloader_tr,
                            dataloader_val=dataloader_val,
                            epoch=epoch,
                            writer=writer
                            )
        trainer.validate_epoch(
            model=model,
            dataloader_val=dataloader_val,
            epoch=epoch,
            writer=writer
        )
        trainer.save_checkpoint(epoch, model=model, optim=optim, scheduler=scheduler, scaler=scaler)
        scheduler.step()
        time2 = time.perf_counter()
        time_escaped = (time2 - time1) / 3600.0
        logging.info(
            f"\nrank: {local_rank}, "
            f"time_escaped_epoch: {time_escaped:.3f} hours, "
            f"estimated to finish {trainer.max_epoch} "
            f"epoch: {(trainer.max_epoch - epoch) * time_escaped:.3f} hours\n")
    if trainer.rank == 0:
        average_checkpoints(trainer.output_dir, trainer.avg_nbest_model)
    trainer.close()