#!/usr/bin/env python3

# Copyright 2019 Hitachi, Ltd. (author: Yusuke Fujita)
# Licensed under the MIT license.
#
# This script generates simulated multi-talker mixtures for diarization.
#
# Usage:
#   common/make_mixture.py \
#       mixture.scp \
#       data/mixture \
#       wav/mixture
|
import argparse
|
import os
|
from funasr.modules.eend_ola.utils import kaldi_data
|
import numpy as np
|
import math
|
import soundfile as sf
|
import json
|
|
parser = argparse.ArgumentParser()
|
parser.add_argument('script',
|
help='list of json')
|
parser.add_argument('out_data_dir',
|
help='output data dir of mixture')
|
parser.add_argument('out_wav_dir',
|
help='output mixture wav files are stored here')
|
parser.add_argument('--rate', type=int, default=16000,
|
help='sampling rate')
|
args = parser.parse_args()
|
|
# open output data files
|
segments_f = open(args.out_data_dir + '/segments', 'w')
|
utt2spk_f = open(args.out_data_dir + '/utt2spk', 'w')
|
wav_scp_f = open(args.out_data_dir + '/wav.scp', 'w')
|
|
# "-R" forces the default random seed for reproducibility
|
resample_cmd = "sox -R -t wav - -t wav - rate {}".format(args.rate)
|
|
for line in open(args.script):
|
recid, jsonstr = line.strip().split(None, 1)
|
indata = json.loads(jsonstr)
|
wavfn = indata['recid']
|
# recid now include out_wav_dir
|
recid = os.path.join(args.out_wav_dir, wavfn).replace('/','_')
|
noise = indata['noise']
|
noise_snr = indata['snr']
|
mixture = []
|
for speaker in indata['speakers']:
|
spkid = speaker['spkid']
|
utts = speaker['utts']
|
intervals = speaker['intervals']
|
rir = speaker['rir']
|
data = []
|
pos = 0
|
for interval, utt in zip(intervals, utts):
|
# append silence interval data
|
silence = np.zeros(int(interval * args.rate))
|
data.append(silence)
|
# utterance is reverberated using room impulse response
|
preprocess = "wav-reverberate --print-args=false " \
|
" --impulse-response={} - -".format(rir)
|
if isinstance(utt, list):
|
rec, st, et = utt
|
st = np.rint(st * args.rate).astype(int)
|
et = np.rint(et * args.rate).astype(int)
|
else:
|
rec = utt
|
st = 0
|
et = None
|
if rir is not None:
|
wav_rxfilename = kaldi_data.process_wav(rec, preprocess)
|
else:
|
wav_rxfilename = rec
|
wav_rxfilename = kaldi_data.process_wav(
|
wav_rxfilename, resample_cmd)
|
speech, _ = kaldi_data.load_wav(wav_rxfilename, st, et)
|
data.append(speech)
|
# calculate start/end position in samples
|
startpos = pos + len(silence)
|
endpos = startpos + len(speech)
|
# write segments and utt2spk
|
uttid = '{}_{}_{:07d}_{:07d}'.format(
|
spkid, recid, int(startpos / args.rate * 100),
|
int(endpos / args.rate * 100))
|
print(uttid, recid,
|
startpos / args.rate, endpos / args.rate, file=segments_f)
|
print(uttid, spkid, file=utt2spk_f)
|
# update position for next utterance
|
pos = endpos
|
data = np.concatenate(data)
|
mixture.append(data)
|
|
# fitting to the maximum-length speaker data, then mix all speakers
|
maxlen = max(len(x) for x in mixture)
|
mixture = [np.pad(x, (0, maxlen - len(x)), 'constant') for x in mixture]
|
mixture = np.sum(mixture, axis=0)
|
# noise is repeated or cutted for fitting to the mixture data length
|
noise_resampled = kaldi_data.process_wav(noise, resample_cmd)
|
noise_data, _ = kaldi_data.load_wav(noise_resampled)
|
if maxlen > len(noise_data):
|
noise_data = np.pad(noise_data, (0, maxlen - len(noise_data)), 'wrap')
|
else:
|
noise_data = noise_data[:maxlen]
|
# noise power is scaled according to selected SNR, then mixed
|
signal_power = np.sum(mixture**2) / len(mixture)
|
noise_power = np.sum(noise_data**2) / len(noise_data)
|
scale = math.sqrt(
|
math.pow(10, - noise_snr / 10) * signal_power / noise_power)
|
mixture += noise_data * scale
|
# output the wav file and write wav.scp
|
outfname = '{}.wav'.format(wavfn)
|
outpath = os.path.join(args.out_wav_dir, outfname)
|
sf.write(outpath, mixture, args.rate)
|
print(recid, os.path.abspath(outpath), file=wav_scp_f)
|
|
wav_scp_f.close()
|
segments_f.close()
|
utt2spk_f.close()
|