From 4e2fe544ae37174a3e09dfcdbbdae5abfe711e53 Mon Sep 17 00:00:00 2001
From: 游雁 <zhifu.gzf@alibaba-inc.com>
Date: 星期三, 05 七月 2023 16:57:21 +0800
Subject: [PATCH] funasr sdk

---
 funasr/datasets/collate_fn.py |   55 ++++++++++++++++++++++++++++++++++++++++++++++++++-----
 1 files changed, 50 insertions(+), 5 deletions(-)

diff --git a/funasr/datasets/collate_fn.py b/funasr/datasets/collate_fn.py
index d52032f..cbc1f0b 100644
--- a/funasr/datasets/collate_fn.py
+++ b/funasr/datasets/collate_fn.py
@@ -6,8 +6,6 @@
 
 import numpy as np
 import torch
-from typeguard import check_argument_types
-from typeguard import check_return_type
 
 from funasr.modules.nets_utils import pad_list
 
@@ -22,7 +20,6 @@
             not_sequence: Collection[str] = (),
             max_sample_size=None
     ):
-        assert check_argument_types()
         self.float_pad_value = float_pad_value
         self.int_pad_value = int_pad_value
         self.not_sequence = set(not_sequence)
@@ -53,7 +50,6 @@
 ) -> Tuple[List[str], Dict[str, torch.Tensor]]:
     """Concatenate ndarray-list to an array and convert to torch.Tensor.
     """
-    assert check_argument_types()
     uttids = [u for u, _ in data]
     data = [d for _, d in data]
 
@@ -79,5 +75,54 @@
             output[key + "_lengths"] = lens
 
     output = (uttids, output)
-    assert check_return_type(output)
+    return output
+
+def crop_to_max_size(feature, target_size):
+    size = len(feature)
+    diff = size - target_size
+    if diff <= 0:
+        return feature
+
+    start = np.random.randint(0, diff + 1)
+    end = size - diff + start
+    return feature[start:end]
+
+
+def clipping_collate_fn(
+        data: Collection[Tuple[str, Dict[str, np.ndarray]]],
+        max_sample_size=None,
+        not_sequence: Collection[str] = (),
+) -> Tuple[List[str], Dict[str, torch.Tensor]]:
+    # mainly for pre-training
+    uttids = [u for u, _ in data]
+    data = [d for _, d in data]
+
+    assert all(set(data[0]) == set(d) for d in data), "dict-keys mismatching"
+    assert all(
+        not k.endswith("_lengths") for k in data[0]
+    ), f"*_lengths is reserved: {list(data[0])}"
+
+    output = {}
+    for key in data[0]:
+        array_list = [d[key] for d in data]
+        tensor_list = [torch.from_numpy(a) for a in array_list]
+        sizes = [len(s) for s in tensor_list]
+        if max_sample_size is None:
+            target_size = min(sizes)
+        else:
+            target_size = min(min(sizes), max_sample_size)
+        tensor = tensor_list[0].new_zeros(len(tensor_list), target_size, tensor_list[0].shape[1])
+        for i, (source, size) in enumerate(zip(tensor_list, sizes)):
+            diff = size - target_size
+            if diff == 0:
+                tensor[i] = source
+            else:
+                tensor[i] = crop_to_max_size(source, target_size)
+        output[key] = tensor
+
+        if key not in not_sequence:
+            lens = torch.tensor([source.shape[0] for source in tensor], dtype=torch.long)
+            output[key + "_lengths"] = lens
+
+    output = (uttids, output)
     return output
\ No newline at end of file

--
Gitblit v1.9.1