import logging
import math

import numpy as np
import torch
import torch.nn as nn

from funasr.models.encoder.abs_encoder import AbsEncoder
from funasr.models.transformer.layer_norm import LayerNorm
from funasr.models.transformer.utils.repeat import repeat


class EncoderLayer(nn.Module):
    def __init__(
        self,
        input_units,
        num_units,
        kernel_size=3,
        activation="tanh",
        stride=1,
        include_batch_norm=False,
        residual=False,
    ):
        super().__init__()
        # Pad so the convolution preserves the frame count at stride 1,
        # e.g. kernel_size=3, stride=1 gives (left, right) = (1, 1).
        left_padding = math.ceil((kernel_size - stride) / 2)
        right_padding = kernel_size - stride - left_padding
        self.conv_padding = nn.ConstantPad1d((left_padding, right_padding), 0.0)
        self.conv1d = nn.Conv1d(
            input_units,
            num_units,
            kernel_size,
            stride,
        )
        self.activation = self.get_activation(activation)
        if include_batch_norm:
            # assumed from context: the elided body normalizes the conv output
            # channels
            self.bn = nn.BatchNorm1d(num_units)

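# Shape sketch for EncoderLayer (a hypothetical call: forward() is elided in
# this excerpt, so only the submodules defined above are used). With
# kernel_size=3 and stride=1 the padding computed above is (1, 1), so the conv
# keeps the time dimension:
#
#   layer = EncoderLayer(input_units=80, num_units=256)
#   x = torch.randn(4, 80, 100)                  # (batch, channels, time)
#   y = layer.conv1d(layer.conv_padding(x))      # -> (4, 256, 100)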
| | | """ |
| | | |
    def __init__(
        self,
        num_layers,
        input_units,
        num_units,
        kernel_size=3,
        dropout_rate=0.3,
        position_encoder=None,
        activation="tanh",
        auxiliary_states=True,
        out_units=None,
        out_norm=False,
        out_residual=False,
        include_batchnorm=False,
        regularization_weight=0.0,
        stride=1,
        tf2torch_tensor_name_prefix_torch: str = "speaker_encoder",
        tf2torch_tensor_name_prefix_tf: str = "EAND/speaker_encoder",
    ):
        super().__init__()
        self._output_size = num_units

                activation,
                self.stride[lnum],
                include_batchnorm,
                residual=True if lnum > 0 else False,
            ),
        )

        if self.out_units is not None:
            # opening line assumed (elided in this excerpt): the tf2torch map
            # below addresses conv_out.weight / conv_out.bias directly, i.e. a
            # plain Conv1d projection to out_units
            self.conv_out = nn.Conv1d(
                num_units,
                out_units,
                kernel_size,
            )

        if self.out_norm:
            self.after_norm = LayerNorm(out_units)

        outputs = outputs + inputs

        return outputs, ilens, None

    def gen_tf2torch_map_dict(self):
        tensor_name_prefix_torch = self.tf2torch_tensor_name_prefix_torch
        tensor_name_prefix_tf = self.tf2torch_tensor_name_prefix_tf
        map_dict_local = {
            # torch: conv1d.weight in "out_channel in_channel kernel_size"
            # tf   : conv1d.weight in "kernel_size in_channel out_channel"
            # torch: linear.weight in "out_channel in_channel"
            # tf   : dense.weight  in "in_channel out_channel"
            "{}.cnn_a.0.conv1d.weight".format(tensor_name_prefix_torch): {
                "name": "{}/cnn_a/conv1d/kernel".format(tensor_name_prefix_tf),
                "squeeze": None,
                "transpose": (2, 1, 0),
            },
            "{}.cnn_a.0.conv1d.bias".format(tensor_name_prefix_torch): {
                "name": "{}/cnn_a/conv1d/bias".format(tensor_name_prefix_tf),
                "squeeze": None,
                "transpose": None,
            },
            # "layeridx" is a placeholder replaced by the real layer index in
            # convert_tf2torch below
            "{}.cnn_a.layeridx.conv1d.weight".format(tensor_name_prefix_torch): {
                "name": "{}/cnn_a/conv1d_layeridx/kernel".format(tensor_name_prefix_tf),
                "squeeze": None,
                "transpose": (2, 1, 0),
            },
            "{}.cnn_a.layeridx.conv1d.bias".format(tensor_name_prefix_torch): {
                "name": "{}/cnn_a/conv1d_layeridx/bias".format(tensor_name_prefix_tf),
                "squeeze": None,
                "transpose": None,
            },
        }
        if self.out_units is not None:
            # add the output projection layer
            map_dict_local.update({
                "{}.conv_out.weight".format(tensor_name_prefix_torch): {
                    "name": "{}/cnn_a/conv1d_{}/kernel".format(tensor_name_prefix_tf, self.num_layers),
                    "squeeze": None,
                    "transpose": (2, 1, 0),
                },  # tf: (1, 256, 256) -> torch: (256, 256, 1)
                "{}.conv_out.bias".format(tensor_name_prefix_torch): {
                    "name": "{}/cnn_a/conv1d_{}/bias".format(tensor_name_prefix_tf, self.num_layers),
                    "squeeze": None,
                    "transpose": None,
                },  # tf: (256,) -> torch: (256,)
            })

        return map_dict_local

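    # Worked layout example for the transpose=(2, 1, 0) entries above (shapes
    # are illustrative, not taken from a real checkpoint): a tf Conv1d kernel
    # stored as (kernel_size=3, in=256, out=512) becomes a torch Conv1d weight
    # of shape (out=512, in=256, kernel_size=3):
    #
    #   kernel_tf = np.zeros((3, 256, 512))
    #   kernel_torch = np.transpose(kernel_tf, (2, 1, 0))   # (512, 256, 3)
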
    def convert_tf2torch(self,
                         var_dict_tf,
                         var_dict_torch,
                         ):
        map_dict = self.gen_tf2torch_map_dict()

        var_dict_torch_update = dict()
        for name in sorted(var_dict_torch.keys(), reverse=False):
            if name.startswith(self.tf2torch_tensor_name_prefix_torch):
                # process special (first and last) layers
                if name in map_dict:
                    name_tf = map_dict[name]["name"]
                    data_tf = var_dict_tf[name_tf]
                    if map_dict[name]["squeeze"] is not None:
                        data_tf = np.squeeze(data_tf, axis=map_dict[name]["squeeze"])
                    if map_dict[name]["transpose"] is not None:
                        data_tf = np.transpose(data_tf, map_dict[name]["transpose"])
                    data_tf = torch.from_numpy(data_tf).type(torch.float32).to("cpu")
                    assert var_dict_torch[name].size() == data_tf.size(), \
                        "{}, {}, {} != {}".format(name, name_tf,
                                                  var_dict_torch[name].size(), data_tf.size())
                    var_dict_torch_update[name] = data_tf
                    logging.info("torch tensor: {}, {}, loading from tf tensor: {}, {}".format(
                        name, data_tf.size(), name_tf, var_dict_tf[name_tf].shape
                    ))
                # process general layers
                else:
                    # self.tf2torch_tensor_name_prefix_torch may itself contain
                    # ".", so replace it first, then recover the layer index
                    names = name.replace(self.tf2torch_tensor_name_prefix_torch, "todo").split('.')
                    layeridx = int(names[2])
                    name_q = name.replace(".{}.".format(layeridx), ".layeridx.")
                    if name_q in map_dict.keys():
                        name_v = map_dict[name_q]["name"]
                        name_tf = name_v.replace("layeridx", "{}".format(layeridx))
                        data_tf = var_dict_tf[name_tf]
                        if map_dict[name_q]["squeeze"] is not None:
                            data_tf = np.squeeze(data_tf, axis=map_dict[name_q]["squeeze"])
                        if map_dict[name_q]["transpose"] is not None:
                            data_tf = np.transpose(data_tf, map_dict[name_q]["transpose"])
                        data_tf = torch.from_numpy(data_tf).type(torch.float32).to("cpu")
                        assert var_dict_torch[name].size() == data_tf.size(), \
                            "{}, {}, {} != {}".format(name, name_tf,
                                                      var_dict_torch[name].size(), data_tf.size())
                        var_dict_torch_update[name] = data_tf
                        logging.info("torch tensor: {}, {}, loading from tf tensor: {}, {}".format(
                            name, data_tf.size(), name_tf, var_dict_tf[name_tf].shape
                        ))
                    else:
                        logging.warning("{} is missed from tf checkpoint".format(name))

        return var_dict_torch_update
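
# Minimal usage sketch (an assumed workflow, not part of the original module):
# read a TF checkpoint into numpy arrays with tf.train.load_checkpoint, convert
# them against the torch state dict, and load the result. The checkpoint path
# and constructor arguments below are hypothetical.
#
#   import tensorflow as tf
#
#   reader = tf.train.load_checkpoint("/path/to/tf_checkpoint")
#   var_dict_tf = {name: reader.get_tensor(name)
#                  for name in reader.get_variable_to_shape_map()}
#   encoder = ConvEncoder(num_layers=4, input_units=80, num_units=256)
#   var_dict_torch = dict(encoder.state_dict())
#   var_dict_torch.update(encoder.convert_tf2torch(var_dict_tf, var_dict_torch))
#   encoder.load_state_dict(var_dict_torch)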