From 05d4176e88bdde0ce615cb22daf7af725a496020 Mon Sep 17 00:00:00 2001
From: speech_asr <wangjiaming.wjm@alibaba-inc.com>
Date: 星期二, 18 四月 2023 19:28:33 +0800
Subject: [PATCH] update

---
 funasr/datasets/small_datasets/build_dataloader.py      |   26 ++++
 funasr/datasets/small_datasets/sequence_iter_factory.py |  189 +++++++++++++++++++++++++++++++
 funasr/utils/build_dataloader.py                        |    5 
 funasr/datasets/small_datasets/collate_fn.py            |   93 +++++++++++++++
 4 files changed, 309 insertions(+), 4 deletions(-)

diff --git a/funasr/datasets/small_datasets/build_loader.py b/funasr/datasets/small_datasets/build_dataloader.py
similarity index 78%
rename from funasr/datasets/small_datasets/build_loader.py
rename to funasr/datasets/small_datasets/build_dataloader.py
index a7181a4..8b2db47 100644
--- a/funasr/datasets/small_datasets/build_loader.py
+++ b/funasr/datasets/small_datasets/build_dataloader.py
@@ -1,16 +1,26 @@
 import logging
-import os
 
 import numpy as np
 import torch
 
+from funasr.datasets.small_datasets.collate_fn import CommonCollateFn
 from funasr.datasets.small_datasets.dataset import ESPnetDataset
-from funasr.datasets.small_datasets.preprocessor import build_preprocess
 from funasr.datasets.small_datasets.length_batch_sampler import LengthBatchSampler
+from funasr.datasets.small_datasets.preprocessor import build_preprocess
+from funasr.datasets.small_datasets.sequence_iter_factory import SequenceIterFactory
 
 
 def build_dataloader(args, mode="train"):
+    # preprocess
     preprocess_fn = build_preprocess(args, train=mode == "train")
+
+    # collate
+    if args.task_name in ["punc", "lm"]:
+        collate_fn = CommonCollateFn(int_pad_value=0)
+    else:
+        collate_fn = CommonCollateFn(float_pad_value=0.0, int_pad_value=-1)
+
+    # dataset
     dest_sample_rate = args.frontend_conf["fs"] if (
             args.frontend_conf is not None and "fs" in args.frontend_conf) else 16000
     if mode == "train":
@@ -27,6 +37,7 @@
         dest_sample_rate=dest_sample_rate,
     )
 
+    # sampler
     dataset_conf = args.dataset_conf
     batch_sampler = LengthBatchSampler(
         batch_bins=dataset_conf["batch_size"],
@@ -60,3 +71,14 @@
                     f"{len(batch)} < {world_size}"
                 )
         batches = [batch[rank::world_size] for batch in batches]
+
+    # dataloader
+    return SequenceIterFactory(
+        dataset=dataset,
+        batches=batches,
+        seed=args.seed,
+        shuffle=mode == "train",
+        num_workers=args.num_workers,
+        collate_fn=collate_fn,
+        pin_memory=args.ngpu > 0,
+    )
diff --git a/funasr/datasets/small_datasets/collate_fn.py b/funasr/datasets/small_datasets/collate_fn.py
new file mode 100644
index 0000000..573f581
--- /dev/null
+++ b/funasr/datasets/small_datasets/collate_fn.py
@@ -0,0 +1,93 @@
+from typing import Collection
+from typing import Dict
+from typing import List
+from typing import Tuple
+from typing import Union
+
+import numpy as np
+import torch
+from typeguard import check_argument_types
+from typeguard import check_return_type
+
+from funasr.modules.nets_utils import pad_list
+
+
+class CommonCollateFn:
+    """Functor class of common_collate_fn()"""
+
+    def __init__(
+            self,
+            float_pad_value: Union[float, int] = 0.0,
+            int_pad_value: int = -32768,
+            not_sequence: Collection[str] = (),
+            max_sample_size=None
+    ):
+        assert check_argument_types()
+        self.float_pad_value = float_pad_value
+        self.int_pad_value = int_pad_value
+        self.not_sequence = set(not_sequence)
+        self.max_sample_size = max_sample_size
+
+    def __repr__(self):
+        return (
+            f"{self.__class__}(float_pad_value={self.float_pad_value}, "
+            f"int_pad_value={self.float_pad_value})"
+        )
+
+    def __call__(
+            self, data: Collection[Tuple[str, Dict[str, np.ndarray]]]
+    ) -> Tuple[List[str], Dict[str, torch.Tensor]]:
+        return common_collate_fn(
+            data,
+            float_pad_value=self.float_pad_value,
+            int_pad_value=self.int_pad_value,
+            not_sequence=self.not_sequence,
+        )
+
+
+def common_collate_fn(
+        data: Collection[Tuple[str, Dict[str, np.ndarray]]],
+        float_pad_value: Union[float, int] = 0.0,
+        int_pad_value: int = -32768,
+        not_sequence: Collection[str] = (),
+) -> Tuple[List[str], Dict[str, torch.Tensor]]:
+    """Concatenate ndarray-list to an array and convert to torch.Tensor.
+    """
+    assert check_argument_types()
+    uttids = [u for u, _ in data]
+    data = [d for _, d in data]
+
+    assert all(set(data[0]) == set(d) for d in data), "dict-keys mismatching"
+    assert all(
+        not k.endswith("_lengths") for k in data[0]
+    ), f"*_lengths is reserved: {list(data[0])}"
+
+    output = {}
+    for key in data[0]:
+        if data[0][key].dtype.kind == "i":
+            pad_value = int_pad_value
+        else:
+            pad_value = float_pad_value
+
+        array_list = [d[key] for d in data]
+        tensor_list = [torch.from_numpy(a) for a in array_list]
+        tensor = pad_list(tensor_list, pad_value)
+        output[key] = tensor
+
+        if key not in not_sequence:
+            lens = torch.tensor([d[key].shape[0] for d in data], dtype=torch.long)
+            output[key + "_lengths"] = lens
+
+    output = (uttids, output)
+    assert check_return_type(output)
+    return output
+
+def crop_to_max_size(feature, target_size):
+    size = len(feature)
+    diff = size - target_size
+    if diff <= 0:
+        return feature
+
+    start = np.random.randint(0, diff + 1)
+    end = size - diff + start
+    return feature[start:end]
\ No newline at end of file
diff --git a/funasr/datasets/small_datasets/sequence_iter_factory.py b/funasr/datasets/small_datasets/sequence_iter_factory.py
new file mode 100644
index 0000000..c69b2ba
--- /dev/null
+++ b/funasr/datasets/small_datasets/sequence_iter_factory.py
@@ -0,0 +1,189 @@
+import logging
+
+import numpy as np
+import torch
+from torch.utils.data import DataLoader
+
+from funasr.datasets.small_datasets.collate_fn import CommonCollateFn
+from funasr.datasets.small_datasets.dataset import ESPnetDataset
+from funasr.datasets.small_datasets.length_batch_sampler import LengthBatchSampler
+from funasr.datasets.small_datasets.preprocessor import build_preprocess
+from funasr.iterators.abs_iter_factory import AbsIterFactory
+from funasr.samplers.abs_sampler import AbsSampler
+
+
+class RawSampler(AbsSampler):
+    def __init__(self, batches):
+        self.batches = batches
+
+    def __len__(self):
+        return len(self.batches)
+
+    def __iter__(self):
+        return iter(self.batches)
+
+    def generate(self, seed):
+        return list(self.batches)
+
+
+class SequenceIterFactory(AbsIterFactory):
+    """Build iterator for each epoch.
+
+
+    """
+
+    def __init__(self, args, mode="train"):
+
+        # preprocess
+        preprocess_fn = build_preprocess(args, train=mode == "train")
+
+        # collate
+        if args.task_name in ["punc", "lm"]:
+            collate_fn = CommonCollateFn(int_pad_value=0)
+        else:
+            collate_fn = CommonCollateFn(float_pad_value=0.0, int_pad_value=-1)
+
+        # dataset
+        dest_sample_rate = args.frontend_conf["fs"] if (
+                args.frontend_conf is not None and "fs" in args.frontend_conf) else 16000
+        if mode == "train":
+            data_path_and_name_and_type = args.train_data_path_and_name_and_type
+            shape_files = args.train_shape_file
+        elif mode == "valid":
+            data_path_and_name_and_type = args.valid_data_path_and_name_and_type
+            shape_files = args.valid_shape_file
+        else:
+            raise NotImplementedError(f"mode={mode}")
+        dataset = ESPnetDataset(
+            data_path_and_name_and_type,
+            preprocess=preprocess_fn,
+            dest_sample_rate=dest_sample_rate,
+        )
+
+        # sampler
+        dataset_conf = args.dataset_conf
+        batch_sampler = LengthBatchSampler(
+            batch_bins=dataset_conf["batch_size"],
+            shape_files=shape_files,
+            sort_in_batch=dataset_conf["sort_in_batch"] if hasattr(dataset_conf, "sort_in_batch") else "descending",
+            sort_batch=dataset_conf["sort_batch"] if hasattr(dataset_conf, "sort_batch") else "ascending",
+            drop_last=False,
+            padding=True,
+        )
+
+        batches = list(batch_sampler)
+        bs_list = [len(batch) for batch in batches]
+        logging.info(f"[{mode}] dataset:\n{dataset}")
+        logging.info(f"[{mode}] Batch sampler: {batch_sampler}")
+        logging.info(
+            f"[{mode}] mini-batch sizes summary: N-batch={len(bs_list)}, "
+            f"mean={np.mean(bs_list):.1f}, min={np.min(bs_list)}, max={np.max(bs_list)}"
+        )
+
+        if args.scheduler == "tri_stage" and mode == "train":
+            args.max_update = len(bs_list) * args.max_epoch
+            logging.info("Max update: {}".format(args.max_update))
+
+        if args.distributed:
+            world_size = torch.distributed.get_world_size()
+            rank = torch.distributed.get_rank()
+            for batch in batches:
+                if len(batch) < world_size:
+                    raise RuntimeError(
+                        f"The batch-size must be equal or more than world_size: "
+                        f"{len(batch)} < {world_size}"
+                    )
+            batches = [batch[rank::world_size] for batch in batches]
+
+        if not isinstance(batches, AbsSampler):
+            self.sampler = RawSampler(batches)
+        else:
+            self.sampler = batches
+
+        self.dataset = dataset
+        self.num_iters_per_epoch = None
+        self.shuffle = mode == "train"
+        self.seed = args.seed
+        self.num_workers = args.num_workers
+        self.collate_fn = collate_fn
+        self.pin_memory = args.ngpu > 0
+
+    def build_iter(self, epoch: int, shuffle: bool = None) -> DataLoader:
+        if shuffle is None:
+            shuffle = self.shuffle
+
+        if self.num_iters_per_epoch is not None:
+            N = len(self.sampler)
+            # If corpus size is larger than the num_per_epoch
+            if self.num_iters_per_epoch < N:
+                N = len(self.sampler)
+                real_epoch, offset = divmod(self.num_iters_per_epoch * epoch, N)
+
+                if offset >= self.num_iters_per_epoch:
+                    current_batches = self.sampler.generate(real_epoch + self.seed)
+                    if shuffle:
+                        np.random.RandomState(real_epoch + self.seed).shuffle(
+                            current_batches
+                        )
+                    batches = current_batches[
+                              offset - self.num_iters_per_epoch: offset
+                              ]
+                else:
+                    prev_batches = self.sampler.generate(real_epoch - 1 + self.seed)
+                    current_batches = self.sampler.generate(real_epoch + self.seed)
+                    if shuffle:
+                        np.random.RandomState(real_epoch - 1 + self.seed).shuffle(
+                            prev_batches
+                        )
+                        np.random.RandomState(real_epoch + self.seed).shuffle(
+                            current_batches
+                        )
+                    batches = (
+                            prev_batches[offset - self.num_iters_per_epoch:]
+                            + current_batches[:offset]
+                    )
+
+            # If corpus size is less than the num_per_epoch
+            else:
+                _epoch, _cursor = divmod(self.num_iters_per_epoch * (epoch - 1), N)
+                _remain = self.num_iters_per_epoch
+                batches = []
+                current_batches = self.sampler.generate(_epoch + self.seed)
+                if shuffle:
+                    np.random.RandomState(_epoch + self.seed).shuffle(current_batches)
+                while _remain > 0:
+
+                    _batches = current_batches[_cursor: _cursor + _remain]
+                    batches += _batches
+                    if _cursor + _remain >= N:
+                        _epoch += 1
+                        _cursor = 0
+                        current_batches = self.sampler.generate(_epoch + self.seed)
+                        if shuffle:
+                            np.random.RandomState(_epoch + self.seed).shuffle(
+                                current_batches
+                            )
+                    else:
+                        _cursor = _cursor + _remain
+                    _remain -= len(_batches)
+
+                assert len(batches) == self.num_iters_per_epoch
+
+        else:
+            batches = self.sampler.generate(epoch + self.seed)
+            if shuffle:
+                np.random.RandomState(epoch + self.seed).shuffle(batches)
+
+        # For backward compatibility for pytorch DataLoader
+        if self.collate_fn is not None:
+            kwargs = dict(collate_fn=self.collate_fn)
+        else:
+            kwargs = {}
+
+        return DataLoader(
+            dataset=self.dataset,
+            batch_sampler=batches,
+            num_workers=self.num_workers,
+            pin_memory=self.pin_memory,
+            **kwargs,
+        )
diff --git a/funasr/utils/build_dataloader.py b/funasr/utils/build_dataloader.py
index 59b19ba..b0836fe 100644
--- a/funasr/utils/build_dataloader.py
+++ b/funasr/utils/build_dataloader.py
@@ -1,9 +1,10 @@
 from funasr.datasets.large_datasets.build_dataloader import LargeDataLoader
-
+from funasr.datasets.small_datasets.build_dataloader import build_dataloader
 
 def build_dataloader(args):
     if args.dataset_type == "small":
-        pass
+        train_iter_factory = LargeDataLoader(args, mode="train")
+        valid_iter_factory = LargeDataLoader(args, mode="valid")
     elif args.dataset_type == "large":
         train_iter_factory = LargeDataLoader(args, mode="train")
         valid_iter_factory = LargeDataLoader(args,  mode="valid")

--
Gitblit v1.9.1