import librosa.feature import pandas as pd import numpy as np from pathlib import Path from os import chdir import os import csv from tensorflow import keras from sklearn.model_selection import train_test_split from sklearn.preprocessing import LabelEncoder, StandardScaler from keras import models from keras import layers from keras import optimizers import matplotlib.pyplot as plt from sklearn.metrics import classification_report import constants max_len = 216 def create_csv_header(): header = 'filename ' for i in range(constants.MFCC_FEATURE_START, ((constants.MFCC_FEATURE_END - 1) * max_len) + 1): header += f' mfcc{i}' header += ' label' header = header.split() file = open(constants.FEATURES_CSV_NAME, 'w', newline='') with file: writer = csv.writer(file) writer.writerow(header) def extract_features(trainingDataDir, trainingDataSubDirs): create_csv_header() # Looping over every file inside the subdirectories for feature extraction for trainingDataSubDir in trainingDataSubDirs: for fileName in os.listdir(trainingDataDir/f'{trainingDataSubDir}'): if fileName.endswith(".wav"): audioFile = trainingDataDir/f'{trainingDataSubDir}/{fileName}' print("Extracting Features from Directory "+trainingDataSubDir+" and file "+audioFile.name) y, sr = librosa.load(audioFile, mono=True) mfcc = librosa.feature.mfcc(y=y, sr=44100, n_mfcc=constants.N_MFCC) to_append = f'{audioFile.name}' print ("mfcc.shape before cutting/padding: ", mfcc.shape) #the mfcc extraction results in slighty different time frames per mfcc segment #in the case of 5seconds and 20 mfcc segments there are som time informations between 215 and 217 #in order to unify the size the ones that stores under 216 timeinformation there will be some padding #for the ones that stores over 216 time information there will be a trim operation #IMPORTANT: #if the time of the audiofiles or the mfcc extraction number changes (current value is 20) #there needs to be an adjustment to the MAX_LEN parameter #1. check the new size of all the files #2. find a suitable average for padding or cutting #3. adjust the MAX_LEN variable in the constants.py file if (constants.MFCC_MAX_LEN > mfcc.shape[1]): pad_width = max_len - mfcc.shape[1] mfcc = np.pad(mfcc, pad_width=((0, 0), (0, pad_width)), mode='constant') elif constants.MFCC_MAX_LEN < mfcc.shape[1]: mfcc = mfcc[:, :constants.MFCC_MAX_LEN] print ("mfcc.shape after cutting/padding: ", mfcc.shape) #store every single value in the csv file mfcc = np.reshape(mfcc, constants.MFCC_MAX_LEN * constants.N_MFCC) #print ("new total size: ", mfcc.shape) for g in mfcc: to_append += f' {g}' if trainingDataSubDir == constants.CAR: to_append += f' {constants.LIGHT_WEIGHT}' elif trainingDataSubDir == constants.BUS: to_append += f' {constants.MEDIUM_WEIGHT}' elif trainingDataSubDir == constants.TRUCK: to_append += f' {constants.HEAVY_WEIGHT}' elif trainingDataSubDir == constants.MOTORCYCLE: to_append += f' {constants.TWO_WHEELED}' elif trainingDataSubDir == constants.TRAM: to_append += f' {constants.RAIL_BOUND}' file = open(constants.FEATURES_CSV_NAME, 'a', newline='') with file: writer = csv.writer(file) writer.writerow(to_append.split()) def preprocessing_csv_data(): print("Reading Features... ") data = pd.read_csv(constants.FEATURES_CSV_NAME) data.head() # Dropping unnecessary columns (Column Filename is dropped) data = data.drop(['filename'], axis=1) data.head() return data def encode_labels(data): # Extracting classes/label column as y from csv and converting string labels to numbers using LabelEncoder audio_list = data.iloc[:, -1] encoder = LabelEncoder() target_labels = encoder.fit_transform(audio_list) return target_labels, encoder def normalize_data(data): # normalizing - Extracting Remaining Columns as X and normalizing them to a common scale scaler = StandardScaler() print (data.iloc[:, :-1]) X = scaler.fit_transform(np.array(data.iloc[:, :-1], dtype=float)) X = X.reshape(-1, constants.N_MFCC, constants.MFCC_MAX_LEN, constants.CHANNELS) return X def train_test_data_split(X, y): # splitting of dataset into train and test dataset X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.20) print (X_train[0].shape) return X_train, X_test, y_train, y_test def create_and_compile_model(): print("Creating a Model") # creating a model from keras.models import Sequential from keras.layers import Conv2D, Dense, MaxPooling2D, Dropout, Flatten, BatchNormalization model = models.Sequential() model.add(Conv2D(32, kernel_size=(3, 3), activation="relu", input_shape=(constants.N_MFCC, constants.MFCC_MAX_LEN, constants.CHANNELS))) model.add(Conv2D(32, kernel_size=(3, 3), activation="relu")) model.add(MaxPooling2D(pool_size=(2,2))) model.add(Conv2D(64, kernel_size=(3, 3), activation="relu")) model.add(MaxPooling2D(pool_size=(2,2))) model.add(Dropout(0.5)) model.add(Flatten()) model.add(Dense(128, activation="relu")) model.add(Dense(constants.OUTPUT_LAYER_DIMENSIONS, activation='softmax')) print("Compiling a Model") optimizer = keras.optimizers.RMSprop() model.compile(optimizer=optimizer, loss=constants.LOSS_FUNCTION_SPARSE, metrics=[constants.ACCURACY_METRICS]) print(model.summary()) return model def train_and_save_model(model, X_train, y_train, X_test, y_test): logdir = constants.LOG_DIR_PATH tensorboard_callback = keras.callbacks.TensorBoard(log_dir=logdir) print("Start Training...") history = model.fit(X_train, y_train, batch_size=32, epochs=35, validation_data=(X_test, y_test), callbacks=[tensorboard_callback]) # Saving the trained model to avoid re-training model.save(constants.TRAINED_MODEL) return history def predict(X_test, y_test): print("Predictions.....") predictions = np.argmax(model.predict(X_test), axis=-1) target_names = [constants.LIGHT_WEIGHT, constants.MEDIUM_WEIGHT, constants.HEAVY_WEIGHT,constants.TWO_WHEELED, constants.RAIL_BOUND] print(classification_report(y_test, predictions, target_names=target_names)) def plot_model_accuracy(history): # Plot graph Model Accuracy plt.plot(history.history['accuracy']) plt.plot(history.history['val_accuracy']) plt.title('Model Accuracy') plt.ylabel('Accuracy') plt.xlabel('Epoch') plt.legend(['Train', 'Test'], loc='upper left') plt.show() def plot_model_loss(history): # Plot graph Model Loss plt.plot(history.history['loss']) plt.plot(history.history['val_loss']) plt.title('Model loss') plt.ylabel('Loss') plt.xlabel('Epoch') plt.legend(['Train', 'Test'], loc='upper right') plt.show() # Changing Directory to Training Dataset Folder chdir(constants.TRAINING_DATA_DIRECTORY_NAME) trainingDataDir = Path.cwd() trainingDataSubDirs = os.listdir(trainingDataDir) chdir("..") if os.path.isfile(constants.FEATURES_CSV_NAME): print("features.csv already exists, skip extraction") else: extract_features(trainingDataDir, trainingDataSubDirs) #extract_features(trainingDataDir, trainingDataSubDirs) data = preprocessing_csv_data() target_labels, encoder = encode_labels(data) X = normalize_data(data) X_train, X_test, y_train, y_test = train_test_data_split(X, target_labels) model = create_and_compile_model() history = train_and_save_model(model, X_train, y_train, X_test, y_test) predict(X_test, y_test) plot_model_accuracy(history) plot_model_loss(history)