speech_asr
2023-04-18 25e28c49b757cd61b63b098bfd67d1b0b1e78e9b
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import logging
import os
 
import numpy as np
import torch
 
from funasr.datasets.small_datasets.dataset import ESPnetDataset
from funasr.datasets.small_datasets.preprocessor import build_preprocess
from funasr.samplers.length_batch_sampler import LengthBatchSampler
 
 
def build_dataloader(args, mode="train"):
    """Build the dataset and length-bucketed batch sampler for one split.

    Args:
        args: experiment config namespace. Reads
            ``{train,valid}_data_path_and_name_and_type``,
            ``{train,valid}_shape_file``, ``frontend_conf``, ``dataset_conf``,
            ``scheduler``, ``max_epoch`` and ``distributed``.
        mode: either ``"train"`` or ``"valid"``.

    Raises:
        NotImplementedError: if ``mode`` is neither ``"train"`` nor ``"valid"``.
        RuntimeError: in distributed mode, if any mini-batch holds fewer
            utterances than the world size (it could not be sharded so that
            every rank receives at least one sample).
    """
    preprocess_fn = build_preprocess(args, train=mode == "train")
    # Target sample rate for the frontend; fall back to 16 kHz when the
    # frontend config is absent or does not specify "fs".
    dest_sample_rate = args.frontend_conf["fs"] if (
            args.frontend_conf is not None and "fs" in args.frontend_conf) else 16000
    if mode == "train":
        data_path_and_name_and_type = args.train_data_path_and_name_and_type
        shape_files = args.train_shape_file
    elif mode == "valid":
        data_path_and_name_and_type = args.valid_data_path_and_name_and_type
        shape_files = args.valid_shape_file
    else:
        raise NotImplementedError(f"mode={mode}")
    dataset = ESPnetDataset(
        data_path_and_name_and_type,
        preprocess=preprocess_fn,
        dest_sample_rate=dest_sample_rate,
    )
    # Optional per-utterance category file expected next to the first data
    # file. Built once instead of twice as in the original.
    # NOTE(review): `.parent` assumes the first path entry is a pathlib.Path,
    # not a plain str — confirm against the argument parser.
    utt2category_path = os.path.join(data_path_and_name_and_type[0][0].parent, "utt2category")
    if os.path.exists(utt2category_path):
        utt2category_file = utt2category_path
    else:
        utt2category_file = None

    dataset_conf = args.dataset_conf
    # BUG FIX: dataset_conf is a dict (see dataset_conf["batch_size"] below),
    # so the original hasattr(dataset_conf, "sort_in_batch") / "sort_batch"
    # checks were always False and any configured sort options were silently
    # ignored. dict.get() honors user-specified values and keeps the same
    # defaults otherwise.
    batch_sampler = LengthBatchSampler(
        batch_bins=dataset_conf["batch_size"],
        shape_files=shape_files,
        sort_in_batch=dataset_conf.get("sort_in_batch", "descending"),
        sort_batch=dataset_conf.get("sort_batch", "ascending"),
        drop_last=False,
        padding=True,
    )

    batches = list(batch_sampler)
    bs_list = [len(batch) for batch in batches]
    logging.info(f"[{mode}] dataset:\n{dataset}")
    logging.info(f"[{mode}] Batch sampler: {batch_sampler}")
    logging.info(
        f"[{mode}] mini-batch sizes summary: N-batch={len(bs_list)}, "
        f"mean={np.mean(bs_list):.1f}, min={np.min(bs_list)}, max={np.max(bs_list)}"
    )

    # The tri_stage LR scheduler needs the total update count ahead of time;
    # derive it from batches-per-epoch * epochs (train split only).
    if args.scheduler == "tri_stage" and mode == "train":
        args.max_update = len(bs_list) * args.max_epoch
        logging.info("Max update: {}".format(args.max_update))

    if args.distributed:
        world_size = torch.distributed.get_world_size()
        rank = torch.distributed.get_rank()
        for batch in batches:
            if len(batch) < world_size:
                raise RuntimeError(
                    f"The batch-size must be equal or more than world_size: "
                    f"{len(batch)} < {world_size}"
                )
        # Shard every batch across ranks by striding: rank r takes elements
        # r, r+world_size, r+2*world_size, ... of each batch.
        batches = [batch[rank::world_size] for batch in batches]