Commit eda8bc7f authored by alessio.quaresima's avatar alessio.quaresima

SpikeTimit + TIMIT python binding for spectra (librosa)

parent be0511f1
DOC
build
__pycache__
@@ -19,11 +19,12 @@ n_speakers = 1
repetitions = 75 # amount of times you present the network with each unique stimulus.
silence_time = 0.15 # in seconds
n_features = 10 # number of features combined from input frequencies
words = ["that", "she"]
## Select a subset of the whole dataset.
# In this case I select all the female speakers from
# regional accent 1 that use at least one of the target words in their sentences.
## I declared these functions because I don't know how to use the DataFrames query syntax properly *_*...
in_words(df_words) = isempty(intersect(Set(df_words),Set(words)))
in_words(df_words) = !isempty(intersect(Set(df_words),Set(words)))
in_dialect(df_dialect) = df_dialect ∈ target_dialects
in_gender(df_gender) = occursin(df_gender, target_gender)
@@ -31,8 +32,9 @@ in_gender(df_gender) = occursin(df_gender, target_gender)
speaker = @where(train,in_dialect.(:dialect), in_gender.(:gender), in_words.(:words))
## Select the inputs
durations, spikes, labels = SpikeTimit.select_inputs(df=single_speaker_train, words=words, samples = samples, n_feat = n_features);
durations, spikes, labels = SpikeTimit.select_inputs(df=speaker, words=words, samples = samples, n_feat = n_features);
##
## Mix them, if you like you can mix differently. Look at the function, it's simple!
all_ft, all_n, words_t, phones_t = SpikeTimit.mix_inputs(;durations=durations, spikes=spikes, labels=labels, repetitions=repetitions, silence_time)
SpikeTimit.convert_to_dt(words_t, 0.1)
@@ -47,6 +49,6 @@ ph_savepoints, ll = SpikeTimit.get_savepoints(trans= phones_t, n_measure=10)
## Comparing the last firing time, the duration of all words and the
## intervals of the words and phonemes we expect that it's well done!
all_ft[end]
repetitions*(sum(durations)+(silence_time*(length(durations))))-silence_time
input_length = repetitions*(sum(durations)+(silence_time*(length(durations))))-silence_time
words_t.intervals[end]
phones_t.steps[end]
using PyCall
using DataFrames
using DataFramesMeta
using Pandas
cd(@__DIR__)
py"""
import sys
import os
sys.path.insert(0, os.getcwd())
print(sys.path)
"""
TIMIT = pyimport("TIMIT_loader")
pyimport("importlib")."reload"(TIMIT)
path = "/home/cocconat/Documents/Research/phd_project/speech/litwin-kumar_model_thesis/Spike TIMIT"
dataset = TIMIT.create_dataset(joinpath(path,"train"))
spkrinfo, spkrsent = TIMIT.create_spkrdata(path)
# dataset |> Pandas.DataFrame |> DataFrames.DataFrame
##
include("src/SpikeTimit.jl")
#Create the path strings leading to folders in the data set
test_path = joinpath(path, "test");
train_path = joinpath(path, "train");
dict_path = joinpath(path, "DOC/TIMITDIC.TXT");
train = SpikeTimit.create_dataset(;dir= train_path)
test = SpikeTimit.create_dataset(;dir= test_path)
dict = SpikeTimit.create_dictionary(file=dict_path)
##
words = ["that"]
target_dialects = [1]
target_gender = "f" # "fm" "m"
in_words(df_words) = !isempty(intersect(Set(df_words),Set(words)))
in_dialect(df_dialect) = df_dialect ∈ target_dialects
in_gender(df_gender) = occursin(df_gender, target_gender)
# this is a DataFrameMeta macro
speaker = @where(train,in_dialect.(:dialect), in_gender.(:gender), in_words.(:words))
speaker.words
words = TIMIT.get_spectra(speaker |> Pandas.DataFrame, target_words=["that"])
##
##
words[1].phones[1].db
##
function py2j_words(words)
jwords = []
for word in words
phs = []
for ph in word.phones
push!(phs,SpikeTimit.Phone(ph.ph, ph.t0, ph.t1, Array{Float64}(ph.db), Matrix{Float64}(ph.osc)))
end
push!(jwords,SpikeTimit.Word(word.word, phs, word.duration, word.t0, word.t1))
end
return jwords
end
py2j_words(words)
from .timit_parser import *
# New recordings are sampled at 22050 Hz, but the transcript files count samples at 16 kHz, hence the conversion factor is 22050/16000.
import librosa
import librosa.display
import sklearn
import numpy as np
import os
def read_file(root,name):
return read_wav(root,name)
def read_wav(root,name):
"""
Read the wav file and return:
name::string
wav::np.array
length::duration
sr::sampling rate
"""
tmp_path = os.path.join(root, name)
tmp_wav, sr = librosa.load(tmp_path)
tmp_wav = normalize_vow(tmp_wav)
t=np.arange(len(tmp_wav))/sr
return name[:-4], tmp_wav, t, sr
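# Usage sketch (the file names below are hypothetical; assumes a wav file exists at root/name):
# name, wav, t, sr = read_wav("recordings", "example.wav")
# name is the file stem, wav the normalized waveform, t the time axis in seconds, sr the sample rate.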
def normalize(x, axis=0,amp_range = (-1,1)):
norm = sklearn.preprocessing.minmax_scale(x, feature_range=amp_range,axis=axis)
norm = norm - np.mean(norm[0:8820]) # i.e. the first 0.4 seconds when sampling at 22050 Hz
norm /= max(abs(norm))
return norm
# It's fundamentally the same as above, but it also trims the electronically generated noise from my bad recordings.
def normalize_vow(x, axis=0,amp_range = (-1,1)):
norm = normalize(x, axis, amp_range)
return librosa.effects.trim(norm,top_db=10,frame_length=len(norm), hop_length=64)[0]
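# Usage sketch (assumes `wav` is a 1-D numpy array as returned by librosa.load):
# clean = normalize_vow(wav)   # rescale to (-1, 1), subtract the baseline, trim low-energy edges
# raw = normalize(wav)         # same rescaling and baseline correction, without the trimming step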
import pandas as pd
import os
import librosa
import pathlib
from .load_wav import normalize
import numpy as np
from nltk.tokenize import word_tokenize
RATE_FACTOR = 1.378125
SR = 22050
TRANSCRIPT_SR = 16000
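# Sanity check (sketch): RATE_FACTOR is just the ratio between the audio sample rate and the
# transcript sample rate, 22050 / 16000 = 1.378125, so transcript sample indices are rescaled
# to audio sample indices by multiplying with RATE_FACTOR.
assert abs(RATE_FACTOR - SR / TRANSCRIPT_SR) < 1e-9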
vowels = [
"iy",
"ih",
"eh",
"ey",
"ae",
"aa",
"aw",
"ay",
"ah",
"ao",
"oy",
"ow",
"uh",
"uw",
"ux",
"er",
"ax",
"ix",
"axr",
"ax-h"]
def create_spkrdata(path):
spkrsen = pd.read_fwf(os.path.join(path,"DOC/SPKRSENT.TXT"),comment=";")
spkrinfo = pd.read_fwf(os.path.join(path,"DOC/SPKRINFO.TXT"),comment=";")
columns_sent =["ID","SA","SA",'SX', 'SX', 'SX', 'SX', 'SX',"SI","SI","SI"]
columns_info = ['ID',
'Sex',
'DR',
'Use',
'RecDate',
'BirthDate',
'Ht',
'Race',
'Edu',
'Comments']
spkrsen.columns = columns_sent
spkrinfo.columns = columns_info
spkrinfo.ID = spkrinfo.ID.str.lower()
spkrinfo.Sex = spkrinfo.Sex.str.lower()
spkrinfo["index"]= spkrinfo.apply(lambda row: row["Sex"]+row["ID"], axis=1)
spkrinfo.set_index("index", inplace=True)
return spkrinfo, spkrsen
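# Usage sketch (assumes `path` points at a TIMIT root containing DOC/SPKRSENT.TXT and
# DOC/SPKRINFO.TXT; the speaker ID below is hypothetical):
# spkrinfo, spkrsen = create_spkrdata("/path/to/TIMIT")
# spkrinfo.loc["fabc0"]   # rows are indexed by gender letter + lowercase speaker ID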
########################
#Create the dataset
########################
def count_dataset(path, dtype="train"):
for (dirname, dirpath, files) in os.walk(path):
for _file in files:
if _file.endswith("TXT"):
S_id = _file.split(".")[0][2:]
S_type = _file.split(".")[0][:2]
if S_type =="SX":
yield 1
def get_dialect(path):
return int(path.split("/")[-3][-1])
# | > x->parse(Int, filter(startswith("dr"), x)[1][end])
def yield_dataset(path):
"""
Get all the sentences in the dataset, by name order
"""
print("Import dataset from: {}".format(path))
for (dirname, dirpath,files) in os.walk(path):
for _file in files:
if _file.endswith("txt"):
S_id = _file.split(".")[0]
Speaker_ID = dirname.split("/")[-1]
path = os.path.join(dirname, _file.split(".")[0])
sentence = read_utterance(path+".txt")
words = word_tokenize(sentence.lower())
gender = Speaker_ID[0]
a, b = get_word_times(path)
words = [[w,t[0],t[1]] for w, t in zip(a,b)]
sentence = a
a,b =get_phones_times(path)
phones = [[w,t[0],t[1]] for w, t in zip(a,b)]
yield Speaker_ID, S_id, get_dialect(path), gender, path, words, phones, sentence
#%%
def create_dataset(path):
rows=[]
for sent in yield_dataset(path):
rows.append(sent)
return pd.DataFrame(rows, columns=["speaker","senID", "dialect", "gender", "path", "words", "phones" , "sentence"])
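# Usage sketch (assumes `path` points at the train or test folder of the Spike TIMIT dataset):
# train = create_dataset("/path/to/train")
# train.columns -> speaker, senID, dialect, gender, path, words, phones, sentence
# find_words(["that", "she"], train)   # rows whose sentence contains at least one of the words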
class Word():
word = ""
t0 = 0.
t1 = 0.
phones = []
duration = 0.
def __init__(self, word, times, phs):
self.word=word
self.phones=phs
self.duration = times[1] - times[0]
self.t0=times[0]
self.t1=times[1]
class Phone():
ph = ""
t0 = 0.
t1 = 0.
osc = None
db = None
def __init__(self, ph, osc, db, times):
self.ph=ph
self.db=db
self.osc=osc
self.t0=times[0]
self.t1=times[1]
#####################################################
def clean_phone(input):
phs = []
for ph in input:
ph = ph.split()[-1].replace("\n","")
if ph in vowels:
ph = "_"
phs.append(ph)
phs= " ".join(phs)
return phs
def get_phone(path, start, end):
path_wav = path + ".wav"
phn = path + ".phn"
phone = np.genfromtxt(phn, dtype=None, delimiter =" ")
selected = phone[start:end]
wav, sr = librosa.load(path_wav)
wav = normalize(wav)
scale_t=len(wav)/sr/phone[-1][1]
tr0 = int(selected[0][0]*RATE_FACTOR)
trf = int(selected[-1][1]*RATE_FACTOR)
audio = wav[tr0: trf]
t=np.arange(len(audio))/sr +tr0/sr
return audio, t, (tr0/sr, trf/sr), selected
def read_utterance(path):
utterance = open(path).read().replace(".","")
return " ".join(utterance.split()[2:])
## Find word
def find_word(word, dataset):
return dataset[[word in sent for sent in dataset.sentence]]
def find_words(words, dataset):
return dataset[[any([word in sent for word in words]) for sent in dataset.sentence]]
def get_word_times(path):
words = []
times = []
for word in open(path + ".wrd").read().strip().split("\n"):
t0, t1, w = word.split()
t0, t1 = float(t0), float(t1)
words.append(w)
times.append([t0, t1])
return words, times
def find_phone(my_ph, dataset):
def get_phones(path):
phone = []
for word in open(path + ".phn").read().strip().split("\n"):
t0, t1, w = word.split()
phone.append(w)
return phone
return dataset[dataset.apply(lambda row: my_ph in get_phones(row.path), axis=1)]
def get_phones_times(path):
phones = []
times = []
for phone in open(path + ".phn").read().strip().split("\n"):
t0, t1, w = phone.split()
t0, t1 = float(t0), float(t1)
phones.append(w)
times.append([t0, t1])
return phones, times
BAE = {"b" : int(np.ceil(1 / np.log2(238.3 / 200.3))), "hop":16, "fmin":200, "n_bins":20} # 4
def happens_within(subinterval, interval):
if subinterval[0] >= interval[0] and subinterval[1] <= interval[1]:
return True
else:
return False
#
BAE = {"b": int(np.ceil(1 / np.log2(238.3 / 200.3))), "hop": 16, "fmin": 200, "n_bins": 20} # 4
def get_spectra(dataframe, target_words=[], cqt_p=BAE):
def scale_times(times, scaling):
t0, t1 = times
return round(t0 * scaling), int(t1 * scaling)
words_list = []
paths = dataframe.path
if isinstance(dataframe.path,str):
paths = [dataframe.path]
print(paths)
for my_path in paths:
oscillogram, sr = librosa.load(my_path + ".wav")
_words, _word_times = get_word_times(my_path)
phone, ph_times = get_phones_times(my_path)
final_time = ph_times[-1][-1]
## Use the BAE encoding
cqt = librosa.cqt(oscillogram, sr=sr, hop_length=cqt_p["hop"], fmin=cqt_p["fmin"], n_bins=cqt_p["n_bins"],
bins_per_octave=cqt_p["b"])
db = librosa.amplitude_to_db(abs(cqt))
## Use the oscillogram as the correct time length
duration = len(oscillogram) / sr
osc_sr = len(oscillogram) / duration
db_sr = cqt.shape[1] / duration
print(final_time/duration, TRANSCRIPT_SR)
# %%
words, word_times = [], []
if target_words:
for w,t in zip(_words, _word_times):
if w in target_words:
words.append(w)
word_times.append(t)
else:
words = _words
word_times = _word_times
ph_times = np.array(ph_times) /TRANSCRIPT_SR
word_times = np.array(word_times) / TRANSCRIPT_SR
for (word, interval) in zip(words, word_times):
phs = []
for (ph, ph_interval) in zip(phone, ph_times):
if happens_within(ph_interval, interval):
t0_db, t1_db = scale_times(ph_interval, db_sr)
t0_osc, t1_osc = scale_times(ph_interval, osc_sr)
phs.append(Phone(ph, oscillogram[t0_osc: t1_osc - 1],db[:, t0_db:t1_db - 1],ph_interval))
words_list.append(Word(word, interval, phs))
return words_list
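# Usage sketch (mirrors the Julia call in the driver script; assumes `dataset` was built with
# create_dataset and contains at least one sentence with the word "that"):
# rows = find_words(["that"], dataset)
# words = get_spectra(rows, target_words=["that"])
# words[0].word, words[0].phones[0].db.shape   # word label and CQT spectrogram of its first phone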
# def word_spectrum(word, path, cqt_p=BAE):
# def scale_times(times, scaling):
# t0, t1 = times
# return round(t0 * scaling), int(t1 * scaling)
#
# cqt_p = {"b": int(np.ceil(1 / np.log2(238.3 / 200.3))), "hop": 16, "fmin": 200, "n_bins": 20} # 4
#
# words= []
# words_ph = []
# accepted_intervals=[]
#
# oscillogram, sr = librosa.load(path + ".wav")
# words, word_times = get_word_times(path)
# phone, ph_times, final_time = get_phones_times(path)
#
# ## Use the BAE encoding
# cqt = librosa.cqt(oscillogram, sr=sr, hop_length=cqt_p["hop"], fmin=cqt_p["fmin"], n_bins=cqt_p["n_bins"],
# bins_per_octave=cqt_p["b"])
# db = librosa.amplitude_to_db(abs(cqt))
#
# ## Use the oscillogram as the correct time length
# duration = len(oscillogram) / sr
# osc_sr = len(oscillogram) / duration
# db_sr = cqt.shape[1] / duration
#
# ph_times = np.array(ph_times) * duration / final_time
# word_times = np.array(word_times) * duration / final_time
# # %%
#
# for (word, times) in zip(words, word_times):
# t0_db, t1_db = scale_times(times, db_sr)
# t0_osc, t1_osc = scale_times(times, osc_sr)
# words.append(Word(word, db[:, t0_db:t1_db - 1],oscillogram[t0_osc: t1_osc - 1],times))
# accepted_times.append(times)
#
# for intervals in accepted_intervals:
# phs = []
# for (ph, times) in zip(phone, ph_times):
# if
# t0_db, t1_db = scale_times(times, db_sr)
# t0_osc, t1_osc = scale_times(times, osc_sr)
# phs.append(Phone(ph, db[:, t0_db:t1_db - 1],oscillogram[t0_osc: t1_osc - 1],times))
# return words
#
# def get_word_spectra(my_word, dataset):
# samples = []
# for path in find_word(my_word,dataset).path:
# words, words_db, phone, ph_db = chunked_spectrum(path)
# matches = [n for n,match in enumerate(words) if my_word in match]
# for n in matches:
# samples.append(words_db[n])
# return samples
#
# def get_ph_spectra(my_word, dataset):
# samples = []
# for path in find_word(my_word,dataset).Path:
# words, words_db,_,_, phone, ph_db,_,_= chunked_spectrum(path)
# matches = [n for n,match in enumerate(words) if my_word in match]
# for n in matches:
# samples.append(words_db[n])
# return samples
# def get_sentence_paths(sentence_number):
# """
# Get all the paths to any occurrence of the sentence.
#
# Return:
# sentences:: [path1, path2, ..., pathn]
# """
# paths = []
# for (id, sen_type) in get_speakers_id(sentence_number):
# use = spkrinfo.loc[spkrinfo["ID"] == id]["Use"].values[0]
# dr = spkrinfo.loc[spkrinfo["ID"] == id]["DR"].values[0]
# sex = spkrinfo.loc[spkrinfo["ID"] == id]["Sex"].values[0]
# use = "TRAIN" if use=="TRN" else "TEST"
# id = sex+id
# paths.append(os.path.join(DIR,os.path.join(use,"DR"+str(dr),id,sen_type)))
# return paths
# def get_speakers_id(sentence_number):
# """
# Return the ID of all the speakers that pronounce that sentence
# """
# ids = spkrsen.loc[spkrsen.eq(sentence_number).any(1)]["ID"].values
# sentences = []
# for id in ids:
# x = spkrsen.set_index("ID").loc[id]
# s= x.loc[x == sentence_number].index.values[0]
# sentences.append((id,s))
# return sentences
# def first_speaker_id(sentence_number):
# return get_speakers_id(sentence_number)[1]
import os
import pathlib
DIR = os.path.join(pathlib.Path(__file__).parent.parent.absolute(),"TIMIT")
# from timit_parser import *
# from __init__ import RATE_FACTOR
import librosa
import torch
import torch.nn as nn
import torchaudio
RATE = 1.3799112426035502 ## This ratio is the 0.5 quantile of the rate distribution; the distribution
## is computed from the length of the audio signal and the length of the transcript (see rate_measure below).
# SAMPLE_RATE = 22050 # the wav files are sampled at this rate
train_audio_transform = nn.Sequential(
torchaudio.transforms.MelSpectrogram(sample_rate=22050, n_mels=128),
)
"""
Create a hierarchy of files with single word
"""
def get_dataset(dtype="train", sent_type="SX"):
"""
Get all the standard (SX) sentences in the dataset, ordered by name
"""
if dtype=="train":
path = os.path.join(DIR,"TRAIN")
if dtype=="test":
path = os.path.join(DIR,"TEST")
for (dirname, dirpath,files) in os.walk(path):
for _file in files:
if _file.endswith("TXT"):
S_id = _file.split(".")[0][2:]
S_type = _file.split(".")[0][:2]
if S_type ==sent_type:
data = os.path.join(dirname, _file.split(".")[0])
oscillogram, sr = librosa.load(data+".WAV")
sentence = read_utterance(data+".TXT")
yield data
# yield oscillogram, sr, sentence, S_id, S_type
generate_data = get_dataset()
def split_words(sentence_path):
def get_word(word):
start, end, word = word.split()
word = word.replace("\n","")
return (int(start)*RATE, int(end)*RATE, word)
words = []
with open(sentence_path+".WRD","r") as fp:
for word in fp.readlines():
words.append(get_word(word))
waveform, sr = librosa.load(sentence_path + ".WAV")
spec = train_audio_transform(torch.from_numpy(waveform)).squeeze(0).transpose(0, 1)
return spec
## To avoid creating artifacts in the data we compute the spectrogram first and then chunk over it; chunking the waveform first would leave high-frequency noise at the end of each word chunk and require windowing each segment (e.g. with a Hamming window).
# chunks = []
# for word in words:
# oscillogram[]
# with open(sentence_path+".PHN","r") as fp:
# print(fp.readlines())
# return words, oscillogram
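# Sketch of the spectrogram-first chunking described above, once `spec` has been computed (see the
# example right below). Assumes hop_length=200 (the torchaudio MelSpectrogram default) and that
# `words` holds (start, end, word) triples in 22050 Hz sample indices as produced by get_word:
# hop_length = 200
# word_chunks = [spec[int(start) // hop_length : int(end) // hop_length] for start, end, w in words]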
# Example (sketch): take one sentence path from the generator and compute its spectrogram
sentence = next(generate_data)
spec = split_words(sentence)
def rate_measure(sentence_path):
oscillogram, sr = librosa.load(sentence_path + ".WAV")
with open(sentence_path+".PHN","r") as fp:
end_time = fp.readlines()[-1].split()[1]
return len(oscillogram)/int(end_time)
rates = []
for sentence in generate_data:
rates.append(rate_measure((sentence)))
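# Sanity check sketch: the 0.5 quantile of `rates` should be close to the hard-coded RATE above
# (~1.3799), since RATE was derived as the median of exactly this distribution.
# import numpy as np; print(np.quantile(rates, 0.5))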
import numpy as np
# class WORD(np.Array):
import torch
specs = []
generate_data = get_dataset()  # restart the generator; it was exhausted by the rates loop above
for z in range(50):
sentence = next(generate_data)
waveform, sr = librosa.load(sentence + ".WAV")
spec = train_audio_transform(torch.from_numpy(waveform)).squeeze(0).transpose(0, 1)
specs.append(spec)
print(spec.size())