# dataset
logging.info("Build dataloader")
dataloader_class = tables.dataloader_classes.get(kwargs["dataset_conf"].get("dataloader", "DataloaderMapStyle"))
# the registered dataloader class is expected to return the (train, val) pair directly
dataloader_tr, dataloader_val = dataloader_class(**kwargs)

trainer = Trainer(local_rank=local_rank,
                  use_ddp=use_ddp,
                  use_fsdp=use_fsdp,

except Exception:
    writer = None

if use_ddp or use_fsdp:
    # Join tolerates ranks exhausting their batches at different times
    context = Join([model])
else:
    context = nullcontext()
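
# Why Join is used above: with DDP/FSDP, a rank that exhausts its batches early
# stops participating in gradient collectives and the other ranks hang. Join
# lets finished ranks shadow those collectives. A minimal sketch, assuming an
# initialized process group (function and variable names here are illustrative):
import torch
import torch.distributed as dist
from torch.distributed.algorithms.join import Join
from torch.nn.parallel import DistributedDataParallel as DDP

def train_uneven_sketch(model, batches, device):
    ddp_model = DDP(model.to(device))  # DDP modules are Joinable
    optimizer = torch.optim.SGD(ddp_model.parameters(), lr=0.1)
    with Join([ddp_model]):  # tolerates a different number of batches per rank
        for x, y in batches:
            optimizer.zero_grad()
            loss = torch.nn.functional.mse_loss(ddp_model(x), y)
            loss.backward()  # finished ranks mirror these allreduces
            optimizer.step()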

for epoch in range(trainer.start_epoch, trainer.max_epoch + 1):
    time1 = time.perf_counter()
    with context:
        trainer.train_epoch(
            model=model,
            optim=optim,
    def set_epoch(self, epoch):
        self.epoch = epoch


class CustomDistributedDynamicBatchSampler(DistributedSampler):
    def __init__(self, dataset,
                 batch_size,
                 num_replicas=None,
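
# What a "dynamic" batch sampler does, as a minimal sketch (the packing rule,
# names, and parameters below are assumptions for illustration, not the class
# above): shard indices across ranks, then pack each rank's indices into
# batches until a per-batch length budget is exceeded; set_epoch reseeds the
# shuffle so every epoch sees a different order.
import torch
from torch.utils.data import Sampler

class DynamicBatchSamplerSketch(Sampler):
    def __init__(self, lengths, batch_size_tokens, num_replicas=1, rank=0, seed=0):
        self.lengths = lengths            # per-sample lengths (e.g., audio frames)
        self.budget = batch_size_tokens   # max total length per batch
        self.num_replicas = num_replicas
        self.rank = rank
        self.seed = seed
        self.epoch = 0

    def set_epoch(self, epoch):
        self.epoch = epoch  # changes the shuffle below on every epoch

    def __iter__(self):
        g = torch.Generator()
        g.manual_seed(self.seed + self.epoch)
        order = torch.randperm(len(self.lengths), generator=g).tolist()
        shard = order[self.rank::self.num_replicas]  # this rank's slice
        batch, used = [], 0
        for idx in shard:
            if batch and used + self.lengths[idx] > self.budget:
                yield batch
                batch, used = [], 0
            batch.append(idx)
            used += self.lengths[idx]
        if batch:
            yield batch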

    return dataloader_tr, dataloader_val

@tables.register("dataloader_classes", "DataloaderMapStyle")
class DataloaderMapStyle:
    def __init__(self, frontend=None, tokenizer=None, **kwargs):
        # dataset
        logging.info("Build dataloader")
        dataset_class = tables.dataset_classes.get(kwargs.get("dataset", "AudioDataset"))
        dataset_tr = dataset_class(kwargs.get("train_data_set_list"), frontend=frontend, tokenizer=tokenizer,
                                   is_training=True, **kwargs.get("dataset_conf"))
        dataset_val = dataset_class(kwargs.get("valid_data_set_list"), frontend=frontend, tokenizer=tokenizer,
                                    is_training=False, **kwargs.get("dataset_conf"))

        self.dataset_tr = dataset_tr
        self.dataset_val = dataset_val
        self.kwargs = kwargs

    def build_iter(self, epoch=0):
        # dataloader
        batch_sampler = self.kwargs["dataset_conf"].get("batch_sampler", "BatchSampler")
        batch_sampler_val = None
        if batch_sampler is not None:
            batch_sampler_class = tables.batch_sampler_classes.get(batch_sampler)
            # the batch sampler class returns a dict of DataLoader kwargs (see the **-unpacking below)
            batch_sampler = batch_sampler_class(self.dataset_tr, **self.kwargs.get("dataset_conf"))
            batch_sampler_val = batch_sampler_class(self.dataset_val, is_training=False, **self.kwargs.get("dataset_conf"))

        # reshuffle per epoch; the sampler object lives under the "batch_sampler" key
        batch_sampler["batch_sampler"].set_epoch(epoch)
        batch_sampler_val["batch_sampler"].set_epoch(epoch)
        dataloader_tr = torch.utils.data.DataLoader(self.dataset_tr, collate_fn=self.dataset_tr.collator, **batch_sampler)
        dataloader_val = torch.utils.data.DataLoader(self.dataset_val, collate_fn=self.dataset_val.collator, **batch_sampler_val)

        return dataloader_tr, dataloader_val
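
# Note on the contract assumed by build_iter: since the result is unpacked with
# ** into DataLoader, the registered batch_sampler_class must return a dict of
# DataLoader kwargs with the sampler object under the "batch_sampler" key. A
# minimal sketch of such a factory, reusing DynamicBatchSamplerSketch from the
# sketch above (the length lookup is a placeholder assumption):
def batch_sampler_sketch(dataset, is_training=True, **dataset_conf):
    sampler = DynamicBatchSamplerSketch(
        lengths=[item["length"] for item in dataset],  # hypothetical length field
        batch_size_tokens=dataset_conf.get("batch_size", 6000),
    )
    return {
        "batch_sampler": sampler,  # the entry build_iter calls set_epoch() on
        "num_workers": dataset_conf.get("num_workers", 4),
        "pin_memory": True,
    }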


@tables.register("dataloader_classes", "DataloaderIterable")
def DataloaderIterable(frontend=None, tokenizer=None, **kwargs):
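
# The body of DataloaderIterable is truncated in this excerpt. A minimal sketch
# of an iterable-style builder under the same calling convention (it returns
# the (train, val) pair directly, matching the unpacking at the top of this
# section; the streaming dataset here is an assumption, not the real one):
import torch

def dataloader_iterable_sketch(frontend=None, tokenizer=None, **kwargs):
    conf = kwargs.get("dataset_conf", {})

    class LineStream(torch.utils.data.IterableDataset):
        def __init__(self, path):
            self.path = path
        def __iter__(self):
            # yields raw lines; per-worker/per-rank sharding omitted for brevity
            with open(self.path) as f:
                for line in f:
                    yield line.strip()

    dataloader_tr = torch.utils.data.DataLoader(
        LineStream(kwargs.get("train_data_set_list")),
        batch_size=conf.get("batch_size", 16),
        num_workers=conf.get("num_workers", 4))
    dataloader_val = torch.utils.data.DataLoader(
        LineStream(kwargs.get("valid_data_set_list")),
        batch_size=conf.get("batch_size", 16))
    return dataloader_tr, dataloader_val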

speed_stats = {}
time5 = time.perf_counter()
# shared stop flag: becomes > 0 once any rank exhausts its data
iterator_stop = torch.tensor(0).to(self.device)
dist.barrier()
print(f"before iter, iterator_stop: {iterator_stop}\n")
# DataLoader exposes the sampler passed as batch_sampler; reseed it per epoch
dataloader_train.batch_sampler.set_epoch(epoch)
for batch_idx, batch in enumerate(dataloader_train):
    if self.use_ddp or self.use_fsdp:
        dist.all_reduce(iterator_stop, dist.ReduceOp.SUM)

speed_stats = {}
time5 = time.perf_counter()
iterator_stop = torch.tensor(0).to(self.device)
dist.barrier()
print(f"before iter, iterator_stop: {iterator_stop}\n")
for batch_idx, batch in enumerate(dataloader_val):
    if self.use_ddp or self.use_fsdp:
        dist.all_reduce(iterator_stop, dist.ReduceOp.SUM)
        if epoch >= 1:
            print(f"iterator_stop: {iterator_stop}\n")
        if iterator_stop > 0:
            break  # the reduced flag is identical on all ranks, so they exit together
time1 = time.perf_counter()
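
# The iterator_stop pattern in both loops above, distilled: each rank keeps a
# stop flag and all-reduces it every step, so the moment any rank runs out of
# data every rank sees flag > 0 and leaves the loop on the same iteration,
# instead of hanging in a later collective. A minimal sketch, assuming an
# initialized process group:
import torch
import torch.distributed as dist

def run_synchronized_steps(batches, device, step_fn):
    iterator_stop = torch.tensor(0, device=device)
    it = iter(batches)
    while True:
        try:
            batch = next(it)
        except StopIteration:
            iterator_stop += 1  # this rank is out of data; tell the others
        dist.all_reduce(iterator_stop, op=dist.ReduceOp.SUM)
        if iterator_stop > 0:
            break  # all ranks exit together on the same step
        step_fn(batch)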