import librosa.feature import pandas as pd import numpy as np from pathlib import Path from os import chdir import os import csv from tensorflow import keras from sklearn.model_selection import train_test_split from sklearn.preprocessing import LabelEncoder, StandardScaler from keras import models from keras import layers import matplotlib.pyplot as plt from sklearn.metrics import classification_report import constants import json from openpyxl import Workbook import time def create_csv_header(): header = 'filename ' for i in range(constants.MFCC_FEATURE_START, constants.MFCC_FEATURE_END): header += f' mfcc{i}' header += ' label' header = header.split() file = open(constants.FEATURES_CSV_NAME, 'w', newline='') with file: writer = csv.writer(file) writer.writerow(header) def extract_features(trainingDataDir, trainingDataSubDirs): create_csv_header() # Looping over every file inside the subdirectories for feature extraction for trainingDataSubDir in trainingDataSubDirs: for fileName in os.listdir(trainingDataDir/f'{trainingDataSubDir}'): if fileName.endswith(".wav"): audioFile = trainingDataDir/f'{trainingDataSubDir}/{fileName}' print("Extracting Features from Directory "+trainingDataSubDir+" and file "+audioFile.name) y, sr = librosa.load(audioFile, mono=True) mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=(constants.MFCC_FEATURE_END - constants.MFCC_FEATURE_START)) to_append = f'{audioFile.name}' for g in mfcc: to_append += f' {np.mean(g)}' if trainingDataSubDir == constants.CAR: to_append += f' {constants.LIGHT_WEIGHT}' elif trainingDataSubDir == constants.BUS: to_append += f' {constants.MEDIUM_WEIGHT}' elif trainingDataSubDir == constants.TRUCK: to_append += f' {constants.HEAVY_WEIGHT}' elif trainingDataSubDir == constants.MOTORCYCLE: to_append += f' {constants.TWO_WHEELED}' elif trainingDataSubDir == constants.TRAM: to_append += f' {constants.RAIL_BOUND}' file = open(constants.FEATURES_CSV_NAME, 'a', newline='') with file: writer = csv.writer(file) writer.writerow(to_append.split()) def preprocessing_csv_data(): print("Reading Features... ") data = pd.read_csv(constants.FEATURES_CSV_NAME) data.head() # Dropping unnecessary columns (Column Filename is dropped) data = data.drop(['filename'], axis=1) data.head() return data def encode_labels(data): # Extracting classes/label column as y from csv and converting string labels to numbers using LabelEncoder audio_list = data.iloc[:, -1] encoder = LabelEncoder() target_labels = encoder.fit_transform(audio_list) return target_labels, encoder def normalize_data(data): # normalizing - Extracting Remaining Columns as X and normalizing them to a common scale scaler = StandardScaler() X = scaler.fit_transform(np.array(data.iloc[:, :-1], dtype=float)) return X def train_test_data_split(X, y): # splitting of dataset into train and test dataset X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.20) return X_train, X_test, y_train, y_test def create_and_compile_model(X, hidden_layer_dimensions): print("Creating a Model") # creating a model model = models.Sequential() for i, layer_dimension in enumerate(hidden_layer_dimensions): if i == 0: model.add(layers.Dense(layer_dimension, activation=constants.ACTIVATION_RELU, input_shape=(X.shape[1],))) else: model.add(layers.Dense(layer_dimension, activation=constants.ACTIVATION_RELU)) model.add(layers.Dense(constants.OUTPUT_LAYER_DIMENSIONS, activation=constants.ACTIVATION_SOFTMAX)) print("Compiling a Model") model.compile(optimizer= constants.OPTIMIZER_ADAM, loss= constants.LOSS_FUNCTION_SPARSE, metrics=[constants.ACCURACY_METRICS]) return model def train_and_save_model(model, X_train, y_train, X_test, y_test): logdir = constants.LOG_DIR_PATH tensorboard_callback = keras.callbacks.TensorBoard(log_dir=logdir) print("Start Training...") history = model.fit(X_train, y_train, epochs=35, validation_data=(X_test, y_test), callbacks=[tensorboard_callback]) # Saving the trained model to avoid re-training #model.save(constants.TRAINED_MODEL) return history def model_predict(model, X_test, y_test): test_loss, test_acc = model.evaluate(X_test, y_test) print('test_acc: ', test_acc) y_predicted = np.argmax(model.predict(X_test), axis=-1) accuracy = np.mean(y_test == y_predicted) print(accuracy) return accuracy def predict(model, X_test, y_test): print("Predictions.....") predictions = np.argmax(model.predict(X_test), axis=-1) target_names = [constants.LIGHT_WEIGHT, constants.MEDIUM_WEIGHT, constants.HEAVY_WEIGHT,constants.TWO_WHEELED, constants.RAIL_BOUND] print(classification_report(y_test, predictions, target_names=target_names)) def plot_model_accuracy(history): # Plot graph Model Accuracy plt.plot(history.history['accuracy']) plt.plot(history.history['val_accuracy']) plt.title('Model Accuracy') plt.ylabel('Accuracy') plt.xlabel('Epoch') plt.legend(['Train', 'Test'], loc='upper left') plt.show() def plot_model_loss(history): # Plot graph Model Loss plt.plot(history.history['loss']) plt.plot(history.history['val_loss']) plt.title('Model loss') plt.ylabel('Loss') plt.xlabel('Epoch') plt.legend(['Train', 'Test'], loc='upper right') plt.show() def construct_and_apply_network(hidden_layer_dimensions): data = preprocessing_csv_data() target_labels, encoder = encode_labels(data) X = normalize_data(data) X_train, X_test, y_train, y_test = train_test_data_split(X, target_labels) model = create_and_compile_model(X, hidden_layer_dimensions) history = train_and_save_model(model, X_train, y_train, X_test, y_test) history predict(model, X_test, y_test) accuracy = model_predict(model, X_test, y_test) #plot_model_accuracy(history) #plot_model_loss(history) return accuracy def save_mfcc(trainingDataDir, trainingDataSubDirs, dataset_path, json_path, n_mfcc=13, n_fft=2048, hop_length=512): data = { "mapping": [], "mfcc": [] } # Looping over every file inside the subdirectories for feature extraction for trainingDataSubDir in trainingDataSubDirs: for fileName in os.listdir(trainingDataDir / f'{trainingDataSubDir}'): if fileName.endswith(".wav"): audioFile = trainingDataDir / f'{trainingDataSubDir}/{fileName}' print("Extracting Features from Directory " + trainingDataSubDir + " and file " + audioFile.name) y, sr = librosa.load(audioFile, mono=True) mfcc = librosa.feature.mfcc(y=y, sr=sr, n_fft=n_fft, n_mfcc=n_mfcc, hop_length=hop_length) data["mfcc"].append(mfcc.tolist()) to_append = f'{audioFile.name}' for g in mfcc: to_append += f' {np.mean(g)}' if trainingDataSubDir == constants.CAR: data["mapping"].append(constants.CAR) to_append += f' {constants.LIGHT_WEIGHT}' elif trainingDataSubDir == constants.BUS: data["mapping"].append(constants.BUS) to_append += f' {constants.MEDIUM_WEIGHT}' elif trainingDataSubDir == constants.TRUCK: data["mapping"].append(constants.TRUCK) to_append += f' {constants.HEAVY_WEIGHT}' elif trainingDataSubDir == constants.MOTORCYCLE: data["mapping"].append(constants.MOTORCYCLE) to_append += f' {constants.TWO_WHEELED}' elif trainingDataSubDir == constants.TRAM: data["mapping"].append(constants.TRAM) to_append += f' {constants.RAIL_BOUND}' file = open(constants.FEATURES_CSV_NAME, 'a', newline='') with file: writer = csv.writer(file) writer.writerow(to_append.split()) with open(json_path, "w") as fp: json.dump(data, fp, indent=4) if __name__ == "__main__": # Changing Directory to Training Dataset Folder chdir(constants.TRAINING_DATA_DIRECTORY_NAME) trainingDataDir = Path.cwd() trainingDataSubDirs = os.listdir(trainingDataDir) chdir("..") if os.path.isfile(constants.FEATURES_CSV_NAME): print("already exists") else: extract_features(trainingDataDir, trainingDataSubDirs) max_accuracy = 0 neurons_increment_by = 8 start_neuron_value = 8 max_neuron_value = 128 hidden_layers = 5 hidden_layer_dimensions = [] book = Workbook() sheet = book.active # loop_count = int((max_neuron_value / neurons_increment_by) * 4) row_counter = 0 for i in range(hidden_layers): hidden_layer_dimensions.append(0) for j in range(start_neuron_value, (max_neuron_value + 1), neurons_increment_by): row_counter += 1 hidden_layer_dimensions[i] = j start = time.time() new_accuracy = construct_and_apply_network(hidden_layer_dimensions) end = time.time() elapsed_time = end - start sheet.cell(row=(row_counter), column=1).value = hidden_layer_dimensions.__str__() sheet.cell(row=(row_counter), column=2).value = new_accuracy sheet.cell(row=(row_counter), column=3).value = elapsed_time ''' for i in range (loop_count): start = time.time() new_accuracy = construct_and_apply_network(hidden_layer_dimensions) end = time.time() if max_accuracy < new_accuracy: max_accuracy = new_accuracy elapsed_time = end - start print("durchlauf: ", (i+1)) print("\nmax accuracy: ", max_accuracy) print("\nnew accuracy: ", new_accuracy) print("\nlist: ", hidden_layer_dimensions) sheet.cell(row=(i+1), column=1).value = hidden_layer_dimensions.__str__() sheet.cell(row=(i + 1), column=2).value = new_accuracy sheet.cell(row=(i + 1), column=3).value = elapsed_time if neurons_count == max_neuron_value: neurons_count = start_neuron_value hidden_layer_dimensions.append(start_neuron_value) pointer += 1 else: neurons_count += neurons_increment_by hidden_layer_dimensions[pointer] = neurons_count ''' book.save("sample.xlsx")