import librosa.feature
import pandas as pd
import numpy as np
from pathlib import Path
from os import chdir
import os
import csv
from tensorflow import keras
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
from keras import models
from keras import layers
from keras import optimizers
import matplotlib.pyplot as plt
from sklearn.metrics import classification_report
import constants
# Fixed number of MFCC time frames per clip (module-level copy).
# NOTE(review): constants.MFCC_MAX_LEN is used for the same purpose elsewhere
# in this file — presumably the same value; confirm and keep the two in sync.
max_len = 216
def create_csv_header():
    """Create the features CSV and write its header row.

    The header consists of ``filename``, one ``mfcc<i>`` column per flattened
    MFCC value, and ``label``.  The file named by
    ``constants.FEATURES_CSV_NAME`` is opened in ``'w'`` mode, so any existing
    content is discarded.
    """
    # Feature rows are flattened using constants.MFCC_MAX_LEN, so the header
    # is sized with the same constant to keep column counts aligned.
    last_feature = (constants.MFCC_FEATURE_END - 1) * constants.MFCC_MAX_LEN
    header = ['filename']
    header.extend(
        f'mfcc{i}'
        for i in range(constants.MFCC_FEATURE_START, last_feature + 1)
    )
    header.append('label')
    with open(constants.FEATURES_CSV_NAME, 'w', newline='') as file:
        csv.writer(file).writerow(header)
def _label_for_directory(directory_name):
    """Return the class label for a training sub-directory, or None if unknown."""
    labels = {
        constants.CAR: constants.LIGHT_WEIGHT,
        constants.BUS: constants.MEDIUM_WEIGHT,
        constants.TRUCK: constants.HEAVY_WEIGHT,
        constants.MOTORCYCLE: constants.TWO_WHEELED,
        constants.TRAM: constants.RAIL_BOUND,
    }
    return labels.get(directory_name)


def _fit_to_length(mfcc, target_len):
    """Zero-pad or trim the time axis (axis 1) of *mfcc* to *target_len* frames."""
    frames = mfcc.shape[1]
    if frames < target_len:
        return np.pad(mfcc, pad_width=((0, 0), (0, target_len - frames)), mode='constant')
    return mfcc[:, :target_len]


def extract_features(trainingDataDir, trainingDataSubDirs):
    """Extract MFCC features from every .wav file and append them to the CSV.

    For each entry of *trainingDataSubDirs* that is a directory below
    *trainingDataDir* and maps to a known class label, every ``.wav`` file is
    loaded, converted to MFCCs, padded/trimmed to ``constants.MFCC_MAX_LEN``
    frames, flattened, and written as one row
    ``[filename, value_1, ..., value_n, label]`` to
    ``constants.FEATURES_CSV_NAME`` (opened in append mode).

    Args:
        trainingDataDir: Path-like root folder of the training data.
        trainingDataSubDirs: Iterable of entry names inside that folder.
            Plain files and directories without a known label are skipped.
    """
    base_dir = Path(trainingDataDir)
    flat_size = constants.MFCC_MAX_LEN * constants.N_MFCC
    # Open the CSV once for the whole run instead of once per audio file.
    with open(constants.FEATURES_CSV_NAME, 'a', newline='') as file:
        writer = csv.writer(file)
        for trainingDataSubDir in trainingDataSubDirs:
            sub_dir = base_dir / f'{trainingDataSubDir}'
            label = _label_for_directory(trainingDataSubDir)
            # Callers may pass a raw directory listing, so skip plain files and
            # directories that do not correspond to a known class; a row
            # without a label would corrupt the CSV.
            if label is None or not sub_dir.is_dir():
                continue
            for fileName in sorted(os.listdir(sub_dir)):
                if not fileName.endswith(".wav"):
                    continue
                audioFile = sub_dir / fileName
                print("Extracting Features from Directory " + trainingDataSubDir + " and file " + fileName)
                y, sr = librosa.load(audioFile, mono=True)
                # Pass the sample rate that librosa.load actually returned;
                # a hard-coded rate would not describe `y` if the loader
                # resampled the signal.
                mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=constants.N_MFCC)
                print("mfcc.shape before cutting/padding: ", mfcc.shape)
                # MFCC extraction yields a slightly different number of time
                # frames per clip (roughly 215-217 for 5-second clips with 20
                # coefficients). To unify the size, shorter matrices are
                # zero-padded and longer ones are trimmed. If the clip length
                # or the number of coefficients changes:
                #   1. check the new frame counts of all files,
                #   2. pick a suitable target length for padding/trimming,
                #   3. adjust constants.MFCC_MAX_LEN accordingly.
                mfcc = _fit_to_length(mfcc, constants.MFCC_MAX_LEN)
                print("mfcc.shape after cutting/padding: ", mfcc.shape)
                # Store every single value in the CSV file as its own column.
                flat = np.reshape(mfcc, flat_size)
                # Build the row as a list so names containing spaces stay in
                # one column.
                writer.writerow([fileName, *(f'{value}' for value in flat), label])
def preprocessing_csv_data():
    """Load the features CSV and return it without the ``filename`` column."""
    print("Reading Features... ")
    features = pd.read_csv(constants.FEATURES_CSV_NAME)
    # The file name only identifies a sample; it is not a training feature.
    return features.drop(columns=['filename'])
def encode_labels(data):
    """Encode the label column (the last column of *data*) as integers.

    Returns:
        A tuple ``(encoded_labels, encoder)`` where *encoder* is the fitted
        ``LabelEncoder``.
    """
    label_encoder = LabelEncoder()
    label_column = data.iloc[:, -1]
    encoded = label_encoder.fit_transform(label_column)
    return encoded, label_encoder
def normalize_data(data):
    """Standardize all feature columns and reshape them for the CNN.

    Every column except the last one is treated as a feature, scaled with a
    freshly fitted ``StandardScaler`` and reshaped to
    ``(-1, N_MFCC, MFCC_MAX_LEN, CHANNELS)`` using the values from
    ``constants``.
    """
    feature_frame = data.iloc[:, :-1]
    print (feature_frame)
    values = np.array(feature_frame, dtype=float)
    scaled = StandardScaler().fit_transform(values)
    target_shape = (-1, constants.N_MFCC, constants.MFCC_MAX_LEN, constants.CHANNELS)
    return scaled.reshape(target_shape)
def train_test_data_split(X, y, test_size=0.20, random_state=None):
    """Split features and labels into train and test subsets.

    Args:
        X: Feature array.
        y: Label array aligned with *X*.
        test_size: Share of the samples placed in the test subset
            (default 0.20).
        random_state: Optional seed forwarded to ``train_test_split`` to make
            the split reproducible; ``None`` keeps a random split.

    Returns:
        ``(X_train, X_test, y_train, y_test)``.
    """
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=random_state
    )
    # Shape of a single training sample, printed as a sanity check.
    print (X_train[0].shape)
    return X_train, X_test, y_train, y_test
def create_and_compile_model():
    """Build and compile the convolutional classifier.

    The network is three 3x3 ``Conv2D`` layers, a ``Flatten``, a 128-unit
    dense layer and a softmax output with
    ``constants.OUTPUT_LAYER_DIMENSIONS`` units.  It is compiled with RMSprop
    and the loss/metric named in ``constants``.

    Returns:
        The compiled Keras model.
    """
    from keras.layers import Conv2D, Dense, Flatten

    print("Creating a Model")
    input_shape = (constants.N_MFCC, constants.MFCC_MAX_LEN, constants.CHANNELS)
    model = models.Sequential()
    model.add(Conv2D(32, kernel_size=(3, 3), activation="relu", input_shape=input_shape))
    model.add(Conv2D(32, kernel_size=(3, 3), activation="relu"))
    model.add(Conv2D(64, kernel_size=(3, 3), activation="relu"))
    # Collapse the feature maps into one vector per sample. Without this the
    # Dense layers operate on the last axis only and the output stays 4-D,
    # which cannot be matched against one integer label per sample.
    model.add(Flatten())
    model.add(Dense(128, activation="relu"))
    model.add(Dense(constants.OUTPUT_LAYER_DIMENSIONS, activation='softmax'))

    print("Compiling a Model")
    optimizer = keras.optimizers.RMSprop()
    model.compile(
        optimizer=optimizer,
        loss=constants.LOSS_FUNCTION_SPARSE,
        metrics=[constants.ACCURACY_METRICS],
    )
    return model
def train_and_save_model(model, X_train, y_train, X_test, y_test,
                         batch_size=32, epochs=35, save_path=None):
    """Train *model*, log to TensorBoard and optionally save it.

    Args:
        model: Compiled Keras model.
        X_train, y_train: Training features and labels.
        X_test, y_test: Held-out data, passed to ``fit`` as validation data.
        batch_size: Mini-batch size (default 32).
        epochs: Number of training epochs (default 35).
        save_path: If given, the trained model is saved there to avoid
            re-training; ``None`` (default) skips saving.

    Returns:
        The history object returned by ``model.fit``.
    """
    logdir = constants.LOG_DIR_PATH
    tensorboard_callback = keras.callbacks.TensorBoard(log_dir=logdir)
    print("Start Training...")
    history = model.fit(
        X_train,
        y_train,
        batch_size=batch_size,
        epochs=epochs,
        validation_data=(X_test, y_test),
        callbacks=[tensorboard_callback],
    )
    # Saving the trained model to avoid re-training.
    # NOTE(review): no target path is defined in this file, so saving only
    # happens when the caller supplies one.
    if save_path is not None:
        model.save(save_path)
    return history
def predict(X_test, y_test, trained_model=None, label_encoder=None):
    """Print a classification report for the test data.

    Args:
        X_test: Test features.
        y_test: Integer-encoded true labels.
        trained_model: Model to evaluate; defaults to the module-level
            ``model`` created by the training script.
        label_encoder: Fitted ``LabelEncoder`` that produced *y_test*;
            defaults to the module-level ``encoder``.
    """
    if trained_model is None:
        trained_model = model
    if label_encoder is None:
        label_encoder = encoder
    predictions = np.argmax(trained_model.predict(X_test), axis=-1)
    # Take the display names from the encoder so that name i always belongs to
    # encoded label i; a hand-written list only lines up if it happens to be
    # in the encoder's (sorted) class order.
    target_names = [str(name) for name in label_encoder.classes_]
    # Passing the label ids explicitly keeps the report valid even when a
    # class is missing from the test split.
    labels = list(range(len(target_names)))
    print(classification_report(y_test, predictions, labels=labels, target_names=target_names))
def plot_model_accuracy(history):
    """Plot training and validation accuracy per epoch on a new figure.

    Args:
        history: Object with a ``history`` dict of per-epoch metrics —
            presumably the value returned by ``model.fit``; it must contain
            an accuracy series and its ``val_`` counterpart.

    The figure is only drawn; call ``plt.show()`` or ``plt.savefig()``
    afterwards to display or store it.
    """
    metrics = history.history
    # Keras stores the series under the metric name; some versions use the
    # short form 'acc' instead of 'accuracy'.
    key = 'accuracy' if 'accuracy' in metrics else 'acc'
    # Plot graph Model Accuracy
    plt.figure()
    plt.plot(metrics[key])
    plt.plot(metrics[f'val_{key}'])
    plt.title('Model Accuracy')
    plt.ylabel('Accuracy')
    plt.xlabel('Epoch')
    # Legend order matches the plot order above: training first, then validation.
    plt.legend(['Train', 'Test'], loc='upper left')
def plot_model_loss(history):
    """Plot training and validation loss per epoch on a new figure.

    Args:
        history: Object with a ``history`` dict of per-epoch metrics —
            presumably the value returned by ``model.fit``; it must contain
            the ``loss`` and ``val_loss`` series.

    The figure is only drawn; call ``plt.show()`` or ``plt.savefig()``
    afterwards to display or store it.
    """
    metrics = history.history
    # Plot graph Model Loss
    plt.figure()
    plt.plot(metrics['loss'])
    plt.plot(metrics['val_loss'])
    plt.title('Model loss')
    plt.ylabel('Loss')
    plt.xlabel('Epoch')
    # Legend order matches the plot order above: training first, then validation.
    plt.legend(['Train', 'Test'], loc='upper right')
# Use the current working directory as the training dataset folder; its
# entries are handed to extract_features, which reads the .wav files inside
# each sub-directory.
trainingDataDir = Path.cwd()
trainingDataSubDirs = os.listdir(trainingDataDir)

# Feature extraction is expensive, so it only runs when no features CSV exists.
if os.path.isfile(constants.FEATURES_CSV_NAME):
    print("features.csv already exists, skip extraction")
else:
    # Start a fresh CSV with its header row before appending the feature rows.
    create_csv_header()
    extract_features(trainingDataDir, trainingDataSubDirs)

data = preprocessing_csv_data()
target_labels, encoder = encode_labels(data)
X = normalize_data(data)
X_train, X_test, y_train, y_test = train_test_data_split(X, target_labels)
# `model` stays a module-level name because predict() reads it as a global.
model = create_and_compile_model()
history = train_and_save_model(model, X_train, y_train, X_test, y_test)
predict(X_test, y_test)
