diff --git a/autosyl/LyricsAlignment/LICENSE b/autosyl/LyricsAlignment/LICENSE new file mode 100644 index 0000000000000000000000000000000000000000..52ee0f284c95ab6f0b82c36a5e155845cb0a20df --- /dev/null +++ b/autosyl/LyricsAlignment/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2022 Jiawen Huang + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/autosyl/LyricsAlignment/checkpoints/checkpoint_BDR b/autosyl/LyricsAlignment/checkpoints/checkpoint_BDR new file mode 100644 index 0000000000000000000000000000000000000000..d94b94d9bf90621d49d3be8feed1c0f3194611d0 Binary files /dev/null and b/autosyl/LyricsAlignment/checkpoints/checkpoint_BDR differ diff --git a/autosyl/LyricsAlignment/checkpoints/checkpoint_Baseline b/autosyl/LyricsAlignment/checkpoints/checkpoint_Baseline new file mode 100644 index 0000000000000000000000000000000000000000..f547d3e3b5968012e39411ce08552f989a4fa74f Binary files /dev/null and b/autosyl/LyricsAlignment/checkpoints/checkpoint_Baseline differ diff --git a/autosyl/LyricsAlignment/checkpoints/checkpoint_MTL b/autosyl/LyricsAlignment/checkpoints/checkpoint_MTL new file mode 100644 index 0000000000000000000000000000000000000000..5fc7a6ec9ebae369229d215811451f3aa1d71b08 Binary files /dev/null and b/autosyl/LyricsAlignment/checkpoints/checkpoint_MTL differ diff --git a/autosyl/LyricsAlignment/model.py b/autosyl/LyricsAlignment/model.py new file mode 100644 index 0000000000000000000000000000000000000000..33669d6823ea84684f5888c73505f643230070e8 --- /dev/null +++ b/autosyl/LyricsAlignment/model.py @@ -0,0 +1,236 @@ +import torch +import torch.nn as nn +import torch.nn.functional as F +import torchaudio +import warnings + +from utils import notes_to_pc + +# following FFT parameters are designed for a 22.5k sampling rate +sr = 22050 +n_fft = 512 +resolution = 256/22050*3 + +with warnings.catch_warnings(): + warnings.simplefilter("ignore") + train_audio_transforms = nn.Sequential( + torchaudio.transforms.MelSpectrogram(sample_rate=sr, n_mels=128, n_fft=n_fft), + ) + +def data_processing(data): + spectrograms = [] + phones = [] + pcs = [] + input_lengths = [] + phone_lengths = [] + for (waveform, _, _, phone, notes) in data: + waveform = torch.Tensor(waveform) + # convert to Mel + spec = train_audio_transforms(waveform).squeeze(0).transpose(0, 1) # time x n_mels + spectrograms.append(spec) + + # get phoneme list (mapped to integers) + phone = torch.Tensor(phone) + phones.append(phone) + + # get the pitch contour + # the number 3 here and below is due the the maxpooling along the frequency axis + pc = notes_to_pc(notes, resolution, spec.shape[0] // 3) + pcs.append(pc) + + input_lengths.append(spec.shape[0]//3) + phone_lengths.append(len(phone)) + + spectrograms = nn.utils.rnn.pad_sequence(spectrograms, batch_first=True).unsqueeze(1).transpose(2, 3) + phones = nn.utils.rnn.pad_sequence(phones, batch_first=True) + + return spectrograms, phones, input_lengths, phone_lengths, torch.LongTensor(pcs) + +class CNNLayerNorm(nn.Module): + '''Layer normalization built for cnns input''' + + def __init__(self, n_feats): + super(CNNLayerNorm, self).__init__() + self.layer_norm = nn.LayerNorm(n_feats) + + def forward(self, x): + # x (batch, channel, feature, time) + x = x.transpose(2, 3).contiguous() # (batch, channel, time, feature) + x = self.layer_norm(x) + return x.transpose(2, 3).contiguous() # (batch, channel, feature, time) + + +class ResidualCNN(nn.Module): + '''Residual CNN inspired by https://arxiv.org/pdf/1603.05027.pdf + except with layer norm instead of batch norm + ''' + + def __init__(self, in_channels, out_channels, kernel, stride, dropout, n_feats): + super(ResidualCNN, self).__init__() + + self.cnn1 = nn.Conv2d(in_channels, out_channels, kernel, stride, padding=kernel // 2) + self.cnn2 = nn.Conv2d(out_channels, out_channels, kernel, stride, padding=kernel // 2) + self.dropout1 = nn.Dropout(dropout) + self.dropout2 = nn.Dropout(dropout) + self.layer_norm1 = CNNLayerNorm(n_feats) + self.layer_norm2 = CNNLayerNorm(n_feats) + + def forward(self, x): + residual = x # (batch, channel, feature, time) + x = self.layer_norm1(x) + x = F.gelu(x) + x = self.dropout1(x) + x = self.cnn1(x) + x = self.layer_norm2(x) + x = F.gelu(x) + x = self.dropout2(x) + x = self.cnn2(x) + x += residual + return x # (batch, channel, feature, time) + + +class BidirectionalLSTM(nn.Module): + + def __init__(self, rnn_dim, hidden_size, dropout, batch_first): + super(BidirectionalLSTM, self).__init__() + + self.BiLSTM = nn.LSTM( + input_size=rnn_dim, hidden_size=hidden_size, + num_layers=1, batch_first=batch_first, bidirectional=True) + self.dropout = nn.Dropout(dropout) + + def forward(self, x): + x, _ = self.BiLSTM(x) + x = self.dropout(x) + return x + +class AcousticModel(nn.Module): + ''' + The acoustic model: baseline and MTL share the same class, + the only difference is the target dimension of the last fc layer + ''' + + def __init__(self, n_cnn_layers, rnn_dim, n_class, n_feats, stride=1, dropout=0.1): + super(AcousticModel, self).__init__() + + self.n_class = n_class + if isinstance(n_class, int): + target_dim = n_class + else: + target_dim = n_class[0] * n_class[1] + + self.cnn_layers = nn.Sequential( + nn.Conv2d(1, n_feats, 3, stride=stride, padding=3 // 2), + nn.ReLU() + ) + + self.rescnn_layers = nn.Sequential(*[ + ResidualCNN(n_feats, n_feats, kernel=3, stride=1, dropout=dropout, n_feats=128) + for _ in range(n_cnn_layers) + ]) + + self.maxpooling = nn.MaxPool2d(kernel_size=(2, 3)) + self.fully_connected = nn.Linear(n_feats * 64, rnn_dim) + + self.bilstm = nn.Sequential( + BidirectionalLSTM(rnn_dim=rnn_dim, hidden_size=rnn_dim, dropout=dropout, batch_first=True), + BidirectionalLSTM(rnn_dim=rnn_dim * 2, hidden_size=rnn_dim, dropout=dropout, batch_first=False), + BidirectionalLSTM(rnn_dim=rnn_dim * 2, hidden_size=rnn_dim, dropout=dropout, batch_first=False) + ) + + self.classifier = nn.Sequential( + nn.Linear(rnn_dim * 2, target_dim) + ) + + def forward(self, x): + x = self.cnn_layers(x) + x = self.rescnn_layers(x) + x = self.maxpooling(x) + + sizes = x.size() + x = x.view(sizes[0], sizes[1] * sizes[2], sizes[3]) # (batch, feature, time) + x = x.transpose(1, 2) # (batch, time, feature) + x = self.fully_connected(x) + + x = self.bilstm(x) + x = self.classifier(x) + + if isinstance(self.n_class, tuple): + x = x.view(sizes[0], sizes[3], self.n_class[0], self.n_class[1]) + + return x + +class MultiTaskLossWrapper(nn.Module): + def __init__(self): + super(MultiTaskLossWrapper, self).__init__() + + self.criterion_lyrics = nn.CTCLoss(blank=40, zero_infinity=True) + self.criterion_melody = nn.CrossEntropyLoss() + + def forward(self, mat3d, lyrics_gt, melody_gt): + + n_batch, n_frame, n_ch, n_p = mat3d.shape # (batch, time, phone, pitch) + + y_lyrics = torch.sum(mat3d, dim=3) # (batch, time, n_ch) + y_melody = torch.sum(mat3d, dim=2) # (batch, time, n_p) + + y_lyrics = F.log_softmax(y_lyrics, dim=2) + y_lyrics = y_lyrics.transpose(0, 1) # (time, batch, n_ch) reshape for CTC + labels, input_lengths, label_lengths = lyrics_gt + loss_lyrics = self.criterion_lyrics(y_lyrics, labels, input_lengths, label_lengths) + + y_melody = y_melody.transpose(1, 2) # (batch, n_p, time) + loss_melody = self.criterion_melody(y_melody, melody_gt) + + return loss_lyrics, loss_melody + + +class BoundaryDetection(nn.Module): + + def __init__(self, n_cnn_layers, rnn_dim, n_class, n_feats, stride=1, dropout=0.1): + super(BoundaryDetection, self).__init__() + + self.n_class = n_class + + # n residual cnn layers with filter size of 32 + self.cnn_layers = nn.Sequential( + nn.Conv2d(1, n_feats, 3, stride=stride, padding=3 // 2), + nn.ReLU() + ) + + self.rescnn_layers = nn.Sequential(*[ + ResidualCNN(n_feats, n_feats, kernel=3, stride=1, dropout=dropout, n_feats=128) + for _ in range(n_cnn_layers) + ]) + + self.maxpooling = nn.MaxPool2d(kernel_size=(2, 3)) + self.fully_connected = nn.Linear(n_feats * 64, rnn_dim) # add a linear layer + + self.bilstm_layers = nn.Sequential( + BidirectionalLSTM(rnn_dim=rnn_dim, hidden_size=rnn_dim, dropout=dropout, batch_first=True), + BidirectionalLSTM(rnn_dim=rnn_dim * 2, hidden_size=rnn_dim, dropout=dropout, batch_first=False), + BidirectionalLSTM(rnn_dim=rnn_dim * 2, hidden_size=rnn_dim, dropout=dropout, batch_first=False) + ) + + self.classifier = nn.Sequential( + nn.Linear(rnn_dim * 2, n_class) # birnn returns rnn_dim*2 + ) + + def forward(self, x): + x = self.cnn_layers(x) + x = self.rescnn_layers(x) + x = self.maxpooling(x) + + sizes = x.size() + x = x.view(sizes[0], sizes[1] * sizes[2], sizes[3]) # (batch, feature, time) + x = x.transpose(1, 2) # (batch, time, feature) + x = self.fully_connected(x) + + x = self.bilstm_layers(x) + + x = self.classifier(x) + x = x.view(sizes[0], sizes[3], self.n_class) + + x = torch.sigmoid(x) + + return x \ No newline at end of file diff --git a/autosyl/LyricsAlignment/utils.py b/autosyl/LyricsAlignment/utils.py new file mode 100644 index 0000000000000000000000000000000000000000..f8274d0544610604b3c7d1d9f12658d0d88a8bfc --- /dev/null +++ b/autosyl/LyricsAlignment/utils.py @@ -0,0 +1,476 @@ +import os +import soundfile +import torch +import numpy as np +import librosa +import string +import warnings +from g2p_en import G2p + +g2p = G2p() + +phone_dict = ['AA', 'AE', 'AH', 'AO', 'AW', 'AY', 'B', 'CH', 'D', 'DH', 'EH', 'ER', 'EY', 'F', 'G', 'HH', 'IH', 'IY', + 'JH', 'K', 'L', 'M', 'N', 'NG', 'OW', 'OY', 'P', 'R', 'S', 'SH', 'T', 'TH', 'UH', 'UW', 'V', 'W', 'Y', + 'Z', 'ZH', ' '] +phone2int = {phone_dict[i]: i for i in range(len(phone_dict))} + +def my_collate(batch): + audio, targets, seqs = zip(*batch) + audio = np.array(audio) + targets = list(targets) + seqs = list(seqs) + return audio, targets, seqs + +def worker_init_fn(worker_id): + np.random.seed(np.random.get_state()[1][0] + worker_id) + +def find_separated_vocal(fileid): + + pass + +def load(path, sr=22050, mono=True, offset=0., duration=None): + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + y, curr_sr = librosa.load(path, sr=sr, mono=mono, res_type='kaiser_fast', offset=offset, duration=duration) + + if len(y.shape) == 1: + y = y[np.newaxis, :] # (channel, sample) + + return y, curr_sr + +def load_lyrics(lyrics_file): + from string import ascii_lowercase + d = {ascii_lowercase[i]: i for i in range(26)} + d["'"] = 26 + d[" "] = 27 + d["~"] = 28 + + # process raw + with open(lyrics_file + '.raw.txt', 'r') as f: + raw_lines = f.read().splitlines() + raw_lines = ["".join([c for c in line.lower() if c in d.keys()]).strip() for line in raw_lines] + raw_lines = [" ".join(line.split()) for line in raw_lines if len(line) > 0] + # concat + full_lyrics = " ".join(raw_lines) + + # split to words + with open(lyrics_file + '.words.txt', 'r') as f: + words_lines = f.read().splitlines() + idx = [] + last_end = 0 + for i in range(len(words_lines)): + word = words_lines[i] + try: + assert (word[0] in ascii_lowercase) + except: + # print(word) + pass + new_word = "".join([c for c in word.lower() if c in d.keys()]) + offset = full_lyrics[last_end:].find(new_word) + assert (offset >= 0) + assert (new_word == full_lyrics[last_end + offset:last_end + offset + len(new_word)]) + idx.append([last_end + offset, last_end + offset + len(new_word)]) + last_end += offset + len(new_word) + + # beginning of a line + idx_line = [] + last_end = 0 + for i in range(len(raw_lines)): + line = raw_lines[i] + offset = full_lyrics[last_end:].find(line) + assert (offset >= 0) + assert (line == full_lyrics[last_end + offset:last_end + offset + len(line)]) + idx_line.append([last_end + offset, last_end + offset + len(line)]) + last_end += offset + len(line) + + return full_lyrics, words_lines, idx, idx_line, raw_lines + +def write_wav(path, audio, sr): + soundfile.write(path, audio.T, sr, "PCM_16") + +def gen_phone_gt(words, raw_lines): + + # helper function + def getsubidx(x, y): # find y in x + l1, l2 = len(x), len(y) + for i in range(l1 - l2 + 1): + if x[i:i + l2] == y: + return i + words_p = [] + lyrics_p = [] + for word in words: + out = g2p(word) + out = [phone if phone[-1] not in string.digits else phone[:-1] for phone in out] + words_p.append(out) + if len(lyrics_p) > 0: + lyrics_p.append(' ') + lyrics_p += out + + len_words_p = [len(phones) for phones in words_p] + idx_in_full_p = [] + s1 = 0 + s2 = s1 + for l in len_words_p: + s2 = s1 + l + idx_in_full_p.append([s1, s2]) + s1 = s2 + 1 + + # beginning of a line + idx_line_p = [] + last_end = 0 + for i in range(len(raw_lines)): + line = [] + line_phone = [g2p(word) for word in raw_lines[i].split()] + for l in line_phone: + line += l + [' '] + line = line[:-1] + line = [phone if phone[-1] not in string.digits else phone[:-1] for phone in line] + offset = getsubidx(lyrics_p[last_end:], line) + assert (offset >= 0) + assert (line == lyrics_p[last_end + offset:last_end + offset + len(line)]) + idx_line_p.append([last_end + offset, last_end + offset + len(line)]) + last_end += offset + len(line) + + return lyrics_p, words_p, idx_in_full_p, idx_line_p + +class DataParallel(torch.nn.DataParallel): + def __init__(self, module, device_ids=None, output_device=None, dim=0): + super(DataParallel, self).__init__(module, device_ids, output_device, dim) + + def __getattr__(self, name): + try: + return super().__getattr__(name) + except AttributeError: + return getattr(self.module, name) + +def save_model(model, optimizer, state, path): + if isinstance(model, torch.nn.DataParallel): + model = model.module # save state dict of wrapped module + if len(os.path.dirname(path)) > 0 and not os.path.exists(os.path.dirname(path)): + os.makedirs(os.path.dirname(path)) + torch.save({ + 'model_state_dict': model.state_dict(), + 'optimizer_state_dict': optimizer.state_dict(), + 'state': state, + }, path) + +def load_model(model, path, cuda): + if isinstance(model, torch.nn.DataParallel): + model = model.module # load state dict of wrapped module + if cuda: + checkpoint = torch.load(path) + else: + checkpoint = torch.load(path, map_location='cpu') + model.load_state_dict(checkpoint['model_state_dict']) + + if 'state' in checkpoint: + state = checkpoint['state'] + else: + state = {"step": 0, + "worse_epochs": 0, + "epochs": checkpoint['epoch'], + "best_loss": np.Inf} + + return state + +def seed_torch(seed=0): + # random.seed(seed) + np.random.seed(seed) + torch.manual_seed(seed) + torch.cuda.manual_seed(seed) + +def move_data_to_device(x, device): + if 'float' in str(x.dtype): + x = torch.Tensor(x) + elif 'int' in str(x.dtype): + x = torch.LongTensor(x) + else: + return x + + return x.to(device) + +def alignment(song_pred, lyrics, idx): + audio_length, num_class = song_pred.shape + lyrics_int = phone2seq(lyrics) + lyrics_length = len(lyrics_int) + + s = np.zeros((audio_length, 2*lyrics_length+1)) - np.Inf + opt = np.zeros((audio_length, 2*lyrics_length+1)) + + blank = 40 + + # init + s[0][0] = song_pred[0][blank] + # insert eps + for i in np.arange(1, audio_length): + s[i][0] = s[i-1][0] + song_pred[i][blank] + + for j in np.arange(lyrics_length): + if j == 0: + s[j+1][2*j+1] = s[j][2*j] + song_pred[j+1][lyrics_int[j]] + opt[j+1][2*j+1] = 1 # 45 degree + else: + s[j+1][2*j+1] = s[j][2*j-1] + song_pred[j+1][lyrics_int[j]] + opt[j+1][2*j+1] = 2 # 28 degree + + s[j+2][2*j+2] = s[j+1][2*j+1] + song_pred[j+2][blank] + opt[j+2][2*j+2] = 1 # 45 degree + + + for audio_pos in np.arange(2, audio_length): + + for ch_pos in np.arange(1, 2*lyrics_length+1): + + if ch_pos % 2 == 1 and (ch_pos+1)/2 >= audio_pos: + break + if ch_pos % 2 == 0 and ch_pos/2 + 1 >= audio_pos: + break + + if ch_pos % 2 == 1: # ch + ch_idx = int((ch_pos-1)/2) + # cur ch -> ch + a = s[audio_pos-1][ch_pos] + song_pred[audio_pos][lyrics_int[ch_idx]] + # last ch -> ch + b = s[audio_pos-1][ch_pos-2] + song_pred[audio_pos][lyrics_int[ch_idx]] + # eps -> ch + c = s[audio_pos-1][ch_pos-1] + song_pred[audio_pos][lyrics_int[ch_idx]] + if a > b and a > c: + s[audio_pos][ch_pos] = a + opt[audio_pos][ch_pos] = 0 + elif b >= a and b >= c: + s[audio_pos][ch_pos] = b + opt[audio_pos][ch_pos] = 2 + else: + s[audio_pos][ch_pos] = c + opt[audio_pos][ch_pos] = 1 + + if ch_pos % 2 == 0: # eps + # cur ch -> ch + a = s[audio_pos-1][ch_pos] + song_pred[audio_pos][blank] + # eps -> ch + c = s[audio_pos-1][ch_pos-1] + song_pred[audio_pos][blank] + if a > c: + s[audio_pos][ch_pos] = a + opt[audio_pos][ch_pos] = 0 + else: + s[audio_pos][ch_pos] = c + opt[audio_pos][ch_pos] = 1 + + score = s[audio_length-1][2*lyrics_length] + + # retrive optimal path + path = [] + x = audio_length-1 + y = 2*lyrics_length + path.append([x, y]) + while x > 0 or y > 0: + if opt[x][y] == 1: + x -= 1 + y -= 1 + elif opt[x][y] == 2: + x -= 1 + y -= 2 + else: + x -= 1 + path.append([x, y]) + + path = list(reversed(path)) + word_align = [] + path_i = 0 + + word_i = 0 + while word_i < len(idx): + # e.g. "happy day" + # find the first time "h" appears + if path[path_i][1] == 2*idx[word_i][0]+1: + st = path[path_i][0] + # find the first time " " appears after "h" + while path_i < len(path)-1 and (path[path_i][1] != 2*idx[word_i][1]+1): + path_i += 1 + ed = path[path_i][0] + # append + word_align.append([st, ed]) + # move to next word + word_i += 1 + else: + # move to next audio frame + path_i += 1 + + return word_align, score + +def alignment_bdr(song_pred, lyrics, idx, bdr_pred, line_start): + audio_length, num_class = song_pred.shape + lyrics_int = phone2seq(lyrics) + lyrics_length = len(lyrics_int) + + s = np.zeros((audio_length, 2*lyrics_length+1)) - np.Inf + opt = np.zeros((audio_length, 2*lyrics_length+1)) + + blank = 40 + + # init + s[0][0] = song_pred[0][blank] + # insert eps + for i in np.arange(1, audio_length): + s[i][0] = s[i-1][0] + song_pred[i][blank] + + for j in np.arange(lyrics_length): + if j == 0: + s[j+1][2*j+1] = s[j][2*j] + song_pred[j+1][lyrics_int[j]] + opt[j+1][2*j+1] = 1 # 45 degree + else: + s[j+1][2*j+1] = s[j][2*j-1] + song_pred[j+1][lyrics_int[j]] + opt[j+1][2*j+1] = 2 # 28 degree + if j in line_start: + s[j + 1][2 * j + 1] += bdr_pred[j+1] + + s[j+2][2*j+2] = s[j+1][2*j+1] + song_pred[j+2][blank] + opt[j+2][2*j+2] = 1 # 45 degree + + for audio_pos in np.arange(2, audio_length): + + for ch_pos in np.arange(1, 2*lyrics_length+1): + + if ch_pos % 2 == 1 and (ch_pos+1)/2 >= audio_pos: + break + if ch_pos % 2 == 0 and ch_pos/2 + 1 >= audio_pos: + break + + if ch_pos % 2 == 1: # ch + ch_idx = int((ch_pos-1)/2) + # cur ch -> ch + a = s[audio_pos-1][ch_pos] + song_pred[audio_pos][lyrics_int[ch_idx]] + # last ch -> ch + b = s[audio_pos-1][ch_pos-2] + song_pred[audio_pos][lyrics_int[ch_idx]] + # eps -> ch + c = s[audio_pos-1][ch_pos-1] + song_pred[audio_pos][lyrics_int[ch_idx]] + if a > b and a > c: + s[audio_pos][ch_pos] = a + opt[audio_pos][ch_pos] = 0 + elif b >= a and b >= c: + s[audio_pos][ch_pos] = b + opt[audio_pos][ch_pos] = 2 + else: + s[audio_pos][ch_pos] = c + opt[audio_pos][ch_pos] = 1 + + if ch_idx in line_start: + s[audio_pos][ch_pos] += bdr_pred[audio_pos] + + if ch_pos % 2 == 0: # eps + # cur ch -> ch + a = s[audio_pos-1][ch_pos] + song_pred[audio_pos][blank] + # eps -> ch + c = s[audio_pos-1][ch_pos-1] + song_pred[audio_pos][blank] + if a > c: + s[audio_pos][ch_pos] = a + opt[audio_pos][ch_pos] = 0 + else: + s[audio_pos][ch_pos] = c + opt[audio_pos][ch_pos] = 1 + + score = s[audio_length-1][2*lyrics_length] + + # retrive optimal path + path = [] + x = audio_length-1 + y = 2*lyrics_length + path.append([x, y]) + while x > 0 or y > 0: + if opt[x][y] == 1: + x -= 1 + y -= 1 + elif opt[x][y] == 2: + x -= 1 + y -= 2 + else: + x -= 1 + path.append([x, y]) + + path = list(reversed(path)) + word_align = [] + path_i = 0 + + word_i = 0 + while word_i < len(idx): + # e.g. "happy day" + # find the first time "h" appears + if path[path_i][1] == 2*idx[word_i][0]+1: + st = path[path_i][0] + # find the first time " " appears after "h" + while path_i < len(path)-1 and (path[path_i][1] != 2*idx[word_i][1]+1): + path_i += 1 + ed = path[path_i][0] + # append + word_align.append([st, ed]) + # move to next word + word_i += 1 + else: + # move to next audio frame + path_i += 1 + + return word_align, score + +def phone2seq(text): + seq = [] + for c in text: + if c in phone_dict: + idx = phone2int[c] + else: + # print(c) # unknown + idx = 40 + seq.append(idx) + return np.array(seq) + +def ToolFreq2Midi(fInHz, fA4InHz=440): + ''' + source: https://www.audiocontentanalysis.org/code/helper-functions/frequency-to-midi-pitch-conversion-2/ + ''' + def convert_freq2midi_scalar(f, fA4InHz): + + if f <= 0: + return 0 + else: + return (69 + 12 * np.log2(f / fA4InHz)) + + fInHz = np.asarray(fInHz) + if fInHz.ndim == 0: + return convert_freq2midi_scalar(fInHz, fA4InHz) + + midi = np.zeros(fInHz.shape) + for k, f in enumerate(fInHz): + midi[k] = convert_freq2midi_scalar(f, fA4InHz) + + return (midi) + +def notes_to_pc(notes, resolution, total_length): + + pc = np.full(shape=(total_length,), fill_value=46, dtype=np.short) + + for i in np.arange(len(notes[0])): + pitch = notes[0][i] + if pitch == -100: + pc[0:total_length] = pitch + else: + times = np.floor(notes[1][i] / resolution) + st = int(np.max([0, times[0]])) + ed = int(np.min([total_length, times[1]])) + pc[st:ed] = pitch + + return pc + +def voc_to_contour(times, resolution, total_length, smoothing=False): + + contour = np.full(shape=(total_length,), fill_value=0, dtype=np.short) + + for i in np.arange(len(times)): + time = np.floor(times[i] / resolution) + st = int(np.max([0, time[0]])) + ed = int(np.min([total_length, time[1]])) + contour[st:ed] = 1 + + # TODO: add smoothing option + if smoothing: + pass + + return contour \ No newline at end of file diff --git a/autosyl/LyricsAlignment/wrapper.py b/autosyl/LyricsAlignment/wrapper.py new file mode 100644 index 0000000000000000000000000000000000000000..1727822c1806726dfa5bf08fc97829f3e8ea6cd8 --- /dev/null +++ b/autosyl/LyricsAlignment/wrapper.py @@ -0,0 +1,175 @@ +import warnings, librosa +import numpy as np +from time import time +import torch +import torch.nn as nn +import torch.nn.functional as F + +import utils +from model import train_audio_transforms, AcousticModel, BoundaryDetection + +np.random.seed(7) + +def preprocess_from_file(audio_file, lyrics_file, word_file=None): + y, sr = preprocess_audio(audio_file) + + words, lyrics_p, idx_word_p, idx_line_p = preprocess_lyrics(lyrics_file, word_file) + + return y, words, lyrics_p, idx_word_p, idx_line_p + +def align(audio, words, lyrics_p, idx_word_p, idx_line_p, method="Baseline", cuda=True): + + # start timer + t = time() + + # constants + resolution = 256 / 22050 * 3 + alpha = 0.8 + + # decode method + if "BDR" in method: + model_type = method[:-4] + bdr_flag = True + else: + model_type = method + bdr_flag = False + print("Model: {} BDR?: {}".format(model_type, bdr_flag)) + + # prepare acoustic model params + if model_type == "Baseline": + n_class = 41 + elif model_type == "MTL": + n_class = (41, 47) + else: + ValueError("Invalid model type.") + + hparams = { + "n_cnn_layers": 1, + "n_rnn_layers": 3, + "rnn_dim": 256, + "n_class": n_class, + "n_feats": 32, + "stride": 1, + "dropout": 0.1 + } + + device = 'cuda' if (cuda and torch.cuda.is_available()) else 'cpu' + + ac_model = AcousticModel( + hparams['n_cnn_layers'], hparams['rnn_dim'], hparams['n_class'], \ + hparams['n_feats'], hparams['stride'], hparams['dropout'] + ).to(device) + + print("Loading acoustic model from checkpoint...") + state = utils.load_model(ac_model, "./checkpoints/checkpoint_{}".format(model_type), cuda=(device=="gpu")) + ac_model.eval() + + print("Computing phoneme posteriorgram...") + + # reshape input, prepare mel + x = audio.reshape(1, 1, -1) + x = utils.move_data_to_device(x, device) + x = x.squeeze(0) + x = x.squeeze(1) + x = train_audio_transforms.to(device)(x) + x = nn.utils.rnn.pad_sequence(x, batch_first=True).unsqueeze(1) + + # predict + all_outputs = ac_model(x) + if model_type == "MTL": + all_outputs = torch.sum(all_outputs, dim=3) + + all_outputs = F.log_softmax(all_outputs, dim=2) + + batch_num, output_length, num_classes = all_outputs.shape + song_pred = all_outputs.data.cpu().numpy().reshape(-1, num_classes) # total_length, num_classes + total_length = int(audio.shape[1] / 22050 // resolution) + song_pred = song_pred[:total_length, :] + + # smoothing + P_noise = np.random.uniform(low=1e-11, high=1e-10, size=song_pred.shape) + song_pred = np.log(np.exp(song_pred) + P_noise) + + if bdr_flag: + # boundary model: fixed + bdr_hparams = { + "n_cnn_layers": 1, + "rnn_dim": 32, # a smaller rnn dim than acoustic model + "n_class": 1, # binary classification + "n_feats": 32, + "stride": 1, + "dropout": 0.1, + } + + bdr_model = BoundaryDetection( + bdr_hparams['n_cnn_layers'], bdr_hparams['rnn_dim'], bdr_hparams['n_class'], + bdr_hparams['n_feats'], bdr_hparams['stride'], bdr_hparams['dropout'] + ).to(device) + print("Loading BDR model from checkpoint...") + state = utils.load_model(bdr_model, "./checkpoints/checkpoint_BDR", cuda=(device == "gpu")) + bdr_model.eval() + + print("Computing boundary probability curve...") + # get boundary prob curve + bdr_outputs = bdr_model(x).data.cpu().numpy().reshape(-1) + # apply log + bdr_outputs = np.log(bdr_outputs) * alpha + + line_start = [d[0] for d in idx_line_p] + + # start alignment + print("Aligning...It might take a few minutes...") + word_align, score = utils.alignment_bdr(song_pred, lyrics_p, idx_word_p, bdr_outputs, line_start) + else: + # start alignment + print("Aligning...It might take a few minutes...") + word_align, score = utils.alignment(song_pred, lyrics_p, idx_word_p) + + t = time() - t + print("Alignment Score:\t{}\tTime:\t{}".format(score, t)) + + return word_align, words + +def preprocess_audio(audio_file, sr=22050): + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + y, curr_sr = librosa.load(audio_file, sr=sr, mono=True, res_type='kaiser_fast') + + if len(y.shape) == 1: + y = y[np.newaxis, :] # (channel, sample) + + return y, curr_sr + +def preprocess_lyrics(lyrics_file, word_file=None): + from string import ascii_lowercase + d = {ascii_lowercase[i]: i for i in range(26)} + d["'"] = 26 + d[" "] = 27 + d["~"] = 28 + + # process raw + with open(lyrics_file, 'r') as f: + raw_lines = f.read().splitlines() + + raw_lines = ["".join([c for c in line.lower() if c in d.keys()]).strip() for line in raw_lines] + raw_lines = [" ".join(line.split()) for line in raw_lines if len(line) > 0] + # concat + full_lyrics = " ".join(raw_lines) + + if word_file: + with open(word_file) as f: + words_lines = f.read().splitlines() + else: + words_lines = full_lyrics.split() + + lyrics_p, words_p, idx_word_p, idx_line_p = utils.gen_phone_gt(words_lines, raw_lines) + + return words_lines, lyrics_p, idx_word_p, idx_line_p + +def write_csv(pred_file, word_align, words): + resolution = 256 / 22050 * 3 + + with open(pred_file, 'w') as f: + for j in range(len(word_align)): + word_time = word_align[j] + f.write("{},{},{}\n".format(word_time[0] * resolution, word_time[1] * resolution, words[j])) diff --git a/requirements.txt b/requirements.txt index 037cec811b5956b459fe862eae3b0b82dfc61d45..8b063f5e47c50d3149df2cc4787c7f127c576cce 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,4 +10,15 @@ scipy cython mido git+https://github.com/CPJKU/madmom.git -praat-parselmouth \ No newline at end of file +praat-parselmouth +future +musdb +museval +h5py +tqdm +torch>=1.8.0 +torchaudio +tensorboard +sortedcontainers +g2p_en +resampy \ No newline at end of file