zhifu gao
2024-03-27 2297d7515afbf7a081132f11cfc9e225d1c784e3
Dev gzf new (#1553)

* train

* train

* train

* train

* train

* train

* train

* train
7个文件已修改
98 ■■■■■ 已修改文件
docs/tutorial/README_zh.md 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
examples/README_zh.md 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
examples/industrial_data_pretraining/paraformer-zh-spk/README_zh.md 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
examples/industrial_data_pretraining/paraformer/README_zh.md 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
examples/industrial_data_pretraining/paraformer_streaming/README_zh.md 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
funasr/datasets/audio_datasets/samplers.py 70 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
funasr/train_utils/trainer.py 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
docs/tutorial/README_zh.md
@@ -268,7 +268,7 @@
export CUDA_VISIBLE_DEVICES="0,1"
gpu_num=$(echo $CUDA_VISIBLE_DEVICES | awk -F "," '{print NF}')
torchrun --nnodes 2 --node_rank 0 --nproc_per_node ${gpu_num} --master_addr=192.168.1.1 --master_port=12345 \
torchrun --nnodes 2 --node_rank 0 --nproc_per_node ${gpu_num} --master_addr 192.168.1.1 --master_port 12345 \
../../../funasr/bin/train.py ${train_args}
```
在从节点上(假设IP为192.168.1.2),你需要确保MASTER_ADDR和MASTER_PORT环境变量与主节点设置的一致,并运行同样的命令:
@@ -276,7 +276,7 @@
export CUDA_VISIBLE_DEVICES="0,1"
gpu_num=$(echo $CUDA_VISIBLE_DEVICES | awk -F "," '{print NF}')
torchrun --nnodes 2 --node_rank 1 --nproc_per_node ${gpu_num} --master_addr=192.168.1.1 --master_port=12345 \
torchrun --nnodes 2 --node_rank 1 --nproc_per_node ${gpu_num} --master_addr 192.168.1.1 --master_port 12345 \
../../../funasr/bin/train.py ${train_args}
```
examples/README_zh.md
@@ -268,7 +268,7 @@
export CUDA_VISIBLE_DEVICES="0,1"
gpu_num=$(echo $CUDA_VISIBLE_DEVICES | awk -F "," '{print NF}')
torchrun --nnodes 2 --node_rank 0 --nproc_per_node ${gpu_num} --master_addr=192.168.1.1 --master_port=12345 \
torchrun --nnodes 2 --node_rank 0 --nproc_per_node ${gpu_num} --master_addr 192.168.1.1 --master_port 12345 \
../../../funasr/bin/train.py ${train_args}
```
在从节点上(假设IP为192.168.1.2),你需要确保MASTER_ADDR和MASTER_PORT环境变量与主节点设置的一致,并运行同样的命令:
@@ -276,7 +276,7 @@
export CUDA_VISIBLE_DEVICES="0,1"
gpu_num=$(echo $CUDA_VISIBLE_DEVICES | awk -F "," '{print NF}')
torchrun --nnodes 2 --node_rank 1 --nproc_per_node ${gpu_num} --master_addr=192.168.1.1 --master_port=12345 \
torchrun --nnodes 2 --node_rank 1 --nproc_per_node ${gpu_num} --master_addr 192.168.1.1 --master_port 12345 \
../../../funasr/bin/train.py ${train_args}
```
examples/industrial_data_pretraining/paraformer-zh-spk/README_zh.md
@@ -268,7 +268,7 @@
export CUDA_VISIBLE_DEVICES="0,1"
gpu_num=$(echo $CUDA_VISIBLE_DEVICES | awk -F "," '{print NF}')
torchrun --nnodes 2 --node_rank 0 --nproc_per_node ${gpu_num} --master_addr=192.168.1.1 --master_port=12345 \
torchrun --nnodes 2 --node_rank 0 --nproc_per_node ${gpu_num} --master_addr 192.168.1.1 --master_port 12345 \
../../../funasr/bin/train.py ${train_args}
```
在从节点上(假设IP为192.168.1.2),你需要确保MASTER_ADDR和MASTER_PORT环境变量与主节点设置的一致,并运行同样的命令:
@@ -276,7 +276,7 @@
export CUDA_VISIBLE_DEVICES="0,1"
gpu_num=$(echo $CUDA_VISIBLE_DEVICES | awk -F "," '{print NF}')
torchrun --nnodes 2 --node_rank 1 --nproc_per_node ${gpu_num} --master_addr=192.168.1.1 --master_port=12345 \
torchrun --nnodes 2 --node_rank 1 --nproc_per_node ${gpu_num} --master_addr 192.168.1.1 --master_port 12345 \
../../../funasr/bin/train.py ${train_args}
```
examples/industrial_data_pretraining/paraformer/README_zh.md
@@ -268,7 +268,7 @@
export CUDA_VISIBLE_DEVICES="0,1"
gpu_num=$(echo $CUDA_VISIBLE_DEVICES | awk -F "," '{print NF}')
torchrun --nnodes 2 --node_rank 0 --nproc_per_node ${gpu_num} --master_addr=192.168.1.1 --master_port=12345 \
torchrun --nnodes 2 --node_rank 0 --nproc_per_node ${gpu_num} --master_addr 192.168.1.1 --master_port 12345 \
../../../funasr/bin/train.py ${train_args}
```
在从节点上(假设IP为192.168.1.2),你需要确保MASTER_ADDR和MASTER_PORT环境变量与主节点设置的一致,并运行同样的命令:
@@ -276,7 +276,7 @@
export CUDA_VISIBLE_DEVICES="0,1"
gpu_num=$(echo $CUDA_VISIBLE_DEVICES | awk -F "," '{print NF}')
torchrun --nnodes 2 --node_rank 1 --nproc_per_node ${gpu_num} --master_addr=192.168.1.1 --master_port=12345 \
torchrun --nnodes 2 --node_rank 1 --nproc_per_node ${gpu_num} --master_addr 192.168.1.1 --master_port 12345 \
../../../funasr/bin/train.py ${train_args}
```
examples/industrial_data_pretraining/paraformer_streaming/README_zh.md
@@ -268,7 +268,7 @@
export CUDA_VISIBLE_DEVICES="0,1"
gpu_num=$(echo $CUDA_VISIBLE_DEVICES | awk -F "," '{print NF}')
torchrun --nnodes 2 --node_rank 0 --nproc_per_node ${gpu_num} --master_addr=192.168.1.1 --master_port=12345 \
torchrun --nnodes 2 --node_rank 0 --nproc_per_node ${gpu_num} --master_addr 192.168.1.1 --master_port 12345 \
../../../funasr/bin/train.py ${train_args}
```
在从节点上(假设IP为192.168.1.2),你需要确保MASTER_ADDR和MASTER_PORT环境变量与主节点设置的一致,并运行同样的命令:
@@ -276,7 +276,7 @@
export CUDA_VISIBLE_DEVICES="0,1"
gpu_num=$(echo $CUDA_VISIBLE_DEVICES | awk -F "," '{print NF}')
torchrun --nnodes 2 --node_rank 1 --nproc_per_node ${gpu_num} --master_addr=192.168.1.1 --master_port=12345 \
torchrun --nnodes 2 --node_rank 1 --nproc_per_node ${gpu_num} --master_addr 192.168.1.1 --master_port 12345 \
../../../funasr/bin/train.py ${train_args}
```
funasr/datasets/audio_datasets/samplers.py
@@ -23,11 +23,11 @@
        batch_sampler = CustomDistributedBatchSampler(dataset, **kwargs)
        
    else:
        # if kwargs.get("sort_size", -1) > 0:
        #     batch_sampler = CustomDistributedBufferDynamicBatchSampler(dataset, **kwargs)
        # else:
        #     batch_sampler = CustomDistributedDynamicBatchSampler(dataset, **kwargs)
        batch_sampler = CustomDistributedDynamicBatchSampler(dataset, **kwargs)
        if kwargs.get("sort_size", -1) > 0:
            batch_sampler = CustomDistributedBufferDynamicBatchSampler(dataset, **kwargs)
        else:
            batch_sampler = CustomDistributedDynamicBatchSampler(dataset, **kwargs)
        # batch_sampler = CustomDistributedDynamicBatchSampler(dataset, **kwargs)
        
    dataloader_args["batch_sampler"] = batch_sampler
    dataloader_args["num_workers"] = kwargs.get("num_workers", 4)
@@ -244,6 +244,8 @@
        self.total_size = len(self.dataset)
        # self.num_samples = int(math.ceil(self.total_size / self.num_replicas))
        self.epoch = 0
        self.max_token_length = kwargs.get("max_token_length", 2048)
        self.length_scale_source = kwargs.get("length_scale_source", 1.0)
    
    def __iter__(self):
        if self.shuffle:
@@ -262,6 +264,8 @@
        
        for idx in indices:
            sample_length = self.dataset.get_source_len(idx)
            if sample_length > self.max_token_length:
                continue
            potential_batch_length = (max_len_in_batch if sample_length < max_len_in_batch else sample_length) * (
                    len(batch) + 1)
            
@@ -269,12 +273,12 @@
                batch.append(idx)
                if sample_length > max_len_in_batch:
                    max_len_in_batch = sample_length
                    current_batch_length = max_len_in_batch * len(batch)
                    # current_batch_length = max_len_in_batch * len(batch)
            else:
                batches.append(batch)
                batch = [idx]
                max_len_in_batch = sample_length
                current_batch_length = max_len_in_batch
                # current_batch_length = max_len_in_batch
        
        # Add the last batch if it's not empty and we're not dropping it
        if batch and (not self.drop_last or len(batch) * max_len_in_batch == self.batch_size):
@@ -293,6 +297,7 @@
class CustomDistributedBufferDynamicBatchSampler(DistributedSampler):
    def __init__(self, dataset,
                 batch_size,
                 batch_type="token",
                 num_replicas=None,
                 rank=None,
                 shuffle=True,
@@ -312,6 +317,7 @@
        self.num_replicas = num_replicas
        self.dataset = dataset
        self.batch_size = batch_size
        self.batch_type = batch_type
        self.is_training = is_training
        self.shuffle = shuffle and is_training
        self.drop_last = drop_last
@@ -319,42 +325,54 @@
        self.total_size = len(self.dataset)
        # self.num_samples = int(math.ceil(self.total_size / self.num_replicas))
        self.epoch = 0
        self.sort_size = sort_size
        self.sort_size = sort_size * num_replicas
        self.max_token_length = kwargs.get("max_token_length", 2048)
        self.length_scale_source = kwargs.get("length_scale_source", 1.0)
    def __iter__(self):
        if self.shuffle:
            g = torch.Generator()
            g.manual_seed(self.epoch)
            indices = torch.randperm(self.total_size, generator=g).tolist()
            indices = torch.randperm(len(self.dataset), generator=g).tolist()
        else:
            indices = list(range(self.total_size))
        # Distribute indices among replicas
        indices = indices[self.rank:self.total_size:self.num_replicas]
            indices = list(range(len(self.dataset)))
        # Sort indices into buffers
        sorted_buffers = [sorted(indices[i:i + self.sort_size], key=lambda idx: self.dataset.get_source_len(idx)) for i in range(0, len(indices), self.sort_size)]
        batches = []
        for buffer in sorted_buffers:
        # Create sorted buffers and form batches
        buffer_batches = []
        for i in range(0, len(indices), self.sort_size):
            buffer = sorted(indices[i:i + self.sort_size], key=lambda idx: self.dataset.get_source_len(idx))
            batch = []
            max_len_in_batch = 0
            for idx in buffer:
                sample_length = self.dataset.get_source_len(idx)
                original_sample_length = self.dataset.get_source_len(idx)
                if original_sample_length > self.max_sample_length:
                    continue
                sample_length = 1 if self.batch_type == "example" else original_sample_length
                potential_batch_length = max(max_len_in_batch, sample_length) * (len(batch) + 1)
                if potential_batch_length <= self.batch_size:
                    batch.append(idx)
                    max_len_in_batch = max(max_len_in_batch, sample_length)
                else:
                    batches.append(batch)
                    buffer_batches.append(batch)
                    batch = [idx]
                    max_len_in_batch = sample_length
            # Add the last batch if it's not empty and we're not dropping it
            if batch and (not self.drop_last or len(batch) * max_len_in_batch == self.batch_size):
                batches.append(batch)
            if batch:
                buffer_batches.append(batch)
        return iter(batches)
        # Ensure each rank gets the same number of batches, duplicate data if needed
        batches_per_rank = math.ceil(len(buffer_batches) / self.num_replicas)
        total_batches_needed = batches_per_rank * self.num_replicas
        buffer_batches.extend(buffer_batches[:total_batches_needed - len(buffer_batches)])
        # Evenly distribute batches from buffer_batches to each rank
        rank_batches = [[] for _ in range(self.num_replicas)]
        for i, batch in enumerate(buffer_batches):
            rank_batches[i % self.num_replicas].append(batch)
        # Assign all batches for the current rank directly
        final_batches = rank_batches[self.rank]
        return iter(final_batches)
    
    def __len__(self):
funasr/train_utils/trainer.py
@@ -161,17 +161,17 @@
                    self.best_step_or_epoch = ckpt_name
                    best_ckpt = Path(os.path.join(self.output_dir, f'model.pt.best'))
                    torch.save(state, best_ckpt)
                    logging.info(f"Update best acc: {self.val_acc_step_or_eoch[self.best_step_or_epoch]}, {best_ckpt}")
                    logging.info(f"Update best acc: {self.val_acc_step_or_eoch[self.best_step_or_epoch]:.4f}, {best_ckpt}")
                else:
                    logging.info(f"No improvement in acc: {self.val_acc_step_or_eoch[ckpt_name]} < {self.val_acc_step_or_eoch[self.best_step_or_epoch]}")
                    logging.info(f"No improvement in acc: {self.val_acc_step_or_eoch[ckpt_name]:.4f} < {self.val_acc_step_or_eoch[self.best_step_or_epoch]:.4f}")
            elif self.avg_keep_nbest_models_type == "loss":
                if self.val_loss_step_or_eoch[ckpt_name] <= self.val_loss_step_or_eoch[self.best_step_or_epoch]:
                    self.best_step_or_epoch = ckpt_name
                    best_ckpt = Path(os.path.join(self.output_dir, f'model.pt.best'))
                    torch.save(state, best_ckpt)
                    logging.info(f"Update best loss: {self.val_loss_step_or_eoch[self.best_step_or_epoch]}, {best_ckpt}")
                    logging.info(f"Update best loss: {self.val_loss_step_or_eoch[self.best_step_or_epoch]:.4f}, {best_ckpt}")
                else:
                    logging.info(f"No improvement in loss: {self.val_loss_step_or_eoch[ckpt_name]} > {self.val_loss_step_or_eoch[self.best_step_or_epoch]}")
                    logging.info(f"No improvement in loss: {self.val_loss_step_or_eoch[ckpt_name]:.4f} > {self.val_loss_step_or_eoch[self.best_step_or_epoch]:.4f}")
            else:
                print("Undo")
            self.saved_ckpts[ckpt_name] = getattr(self, f"val_{self.avg_keep_nbest_models_type}_step_or_eoch")[ckpt_name]