diff --git a/KI-KNN b/KI-KNN
new file mode 100644
index 0000000000000000000000000000000000000000..dbacbaf6358e2a1a933c93c8770ba5da260f5b1e
--- /dev/null
+++ b/KI-KNN
@@ -0,0 +1,201 @@
+import os
+import csv
+from os import chdir
+from pathlib import Path
+
+import librosa
+import librosa.feature
+import matplotlib.pyplot as plt
+import numpy as np
+import pandas as pd
+from sklearn.metrics import classification_report
+from sklearn.model_selection import train_test_split
+from sklearn.preprocessing import LabelEncoder, StandardScaler
+from tensorflow import keras
+from tensorflow.keras import layers, models
+
+import constants
+
+
+def create_csv_header():
+    # One column for the file name, one column per flattened MFCC value, one label column.
+    header = ['filename']
+    for i in range(constants.MFCC_FEATURE_START, ((constants.MFCC_FEATURE_END - 1) * constants.MFCC_MAX_LEN) + 1):
+        header.append(f'mfcc{i}')
+    header.append('label')
+    with open(constants.FEATURES_CSV_NAME, 'w', newline='') as file:
+        writer = csv.writer(file)
+        writer.writerow(header)
+
+
+def extract_features(trainingDataDir, trainingDataSubDirs):
+    create_csv_header()
+    # Loop over every file inside the subdirectories and extract its MFCC features.
+    for trainingDataSubDir in trainingDataSubDirs:
+        for fileName in os.listdir(trainingDataDir / trainingDataSubDir):
+            if not fileName.endswith(".wav"):
+                continue
+            audioFile = trainingDataDir / trainingDataSubDir / fileName
+            print("Extracting features from directory " + trainingDataSubDir + " and file " + audioFile.name)
+            y, sr = librosa.load(audioFile, mono=True)
+            mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=constants.N_MFCC)
+
+            print("mfcc.shape before cutting/padding:", mfcc.shape)
+            # The MFCC extraction yields a slightly different number of time frames per file.
+            # For 5 second clips and 20 MFCC coefficients the frame count varies between roughly 215 and 217.
+            # To unify the size, files with fewer than MFCC_MAX_LEN frames are zero-padded and
+            # files with more frames are trimmed.
+            # IMPORTANT:
+            # If the clip length or the number of MFCC coefficients (currently 20) changes,
+            # MFCC_MAX_LEN has to be adjusted as well:
+            #   1. check the new frame counts of all files,
+            #   2. pick a suitable target length for padding/trimming,
+            #   3. adjust MFCC_MAX_LEN in constants.py.
+            if constants.MFCC_MAX_LEN > mfcc.shape[1]:
+                pad_width = constants.MFCC_MAX_LEN - mfcc.shape[1]
+                mfcc = np.pad(mfcc, pad_width=((0, 0), (0, pad_width)), mode='constant')
+            elif constants.MFCC_MAX_LEN < mfcc.shape[1]:
+                mfcc = mfcc[:, :constants.MFCC_MAX_LEN]
+            print("mfcc.shape after cutting/padding:", mfcc.shape)
+
+            # Flatten the MFCC matrix so that every single value becomes one CSV column.
+            mfcc = np.reshape(mfcc, constants.MFCC_MAX_LEN * constants.N_MFCC)
+            row = [audioFile.name]
+            row.extend(mfcc)
+            if trainingDataSubDir == constants.CAR:
+                row.append(constants.LIGHT_WEIGHT)
+            elif trainingDataSubDir == constants.BUS:
+                row.append(constants.MEDIUM_WEIGHT)
+            elif trainingDataSubDir == constants.TRUCK:
+                row.append(constants.HEAVY_WEIGHT)
+            elif trainingDataSubDir == constants.MOTORCYCLE:
+                row.append(constants.TWO_WHEELED)
+            elif trainingDataSubDir == constants.TRAM:
+                row.append(constants.RAIL_BOUND)
+
+            with open(constants.FEATURES_CSV_NAME, 'a', newline='') as file:
+                writer = csv.writer(file)
+                writer.writerow(row)
+
+
+def preprocessing_csv_data():
+    print("Reading features...")
+    data = pd.read_csv(constants.FEATURES_CSV_NAME)
+    # The filename column is not a feature, so it is dropped.
+    data = data.drop(['filename'], axis=1)
+    return data
+
+
+def encode_labels(data):
+    # Extract the label column as y and convert the string labels to integers using LabelEncoder.
+    audio_list = data.iloc[:, -1]
+    encoder = LabelEncoder()
+    target_labels = encoder.fit_transform(audio_list)
+    return target_labels, encoder
+
+
+def normalize_data(data):
+    # Extract the remaining columns as X and normalize them to a common scale.
+    scaler = StandardScaler()
+    X = scaler.fit_transform(np.array(data.iloc[:, :-1], dtype=float))
+    # Reshape the flat feature vectors back into single-channel MFCC "images" for the CNN.
+    X = X.reshape(-1, constants.N_MFCC, constants.MFCC_MAX_LEN, constants.CHANNELS)
+    return X
+
+
+def train_test_data_split(X, y):
+    # Split the dataset into train and test sets (80/20).
+    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.20)
+    print(X_train[0].shape)
+    return X_train, X_test, y_train, y_test
+
+
+def create_and_compile_model():
+    print("Creating a model")
+    model = models.Sequential()
+    model.add(layers.Conv2D(32, kernel_size=(3, 3), activation="relu",
+                            input_shape=(constants.N_MFCC, constants.MFCC_MAX_LEN, constants.CHANNELS)))
+    model.add(layers.Conv2D(32, kernel_size=(3, 3), activation="relu"))
+    model.add(layers.MaxPooling2D(pool_size=(2, 2)))
+    model.add(layers.Conv2D(64, kernel_size=(3, 3), activation="relu"))
+    model.add(layers.MaxPooling2D(pool_size=(2, 2)))
+    model.add(layers.Dropout(0.5))
+    model.add(layers.Flatten())
+    model.add(layers.Dense(128, activation="relu"))
+    model.add(layers.Dense(constants.OUTPUT_LAYER_DIMENSIONS, activation='softmax'))
+    print("Compiling the model")
+    optimizer = keras.optimizers.RMSprop()
+    model.compile(optimizer=optimizer, loss=constants.LOSS_FUNCTION_SPARSE, metrics=[constants.ACCURACY_METRICS])
+    print(model.summary())
+    return model
+
+
+def train_and_save_model(model, X_train, y_train, X_test, y_test):
+    logdir = constants.LOG_DIR_PATH
+    tensorboard_callback = keras.callbacks.TensorBoard(log_dir=logdir)
+    print("Start training...")
+    history = model.fit(X_train, y_train, batch_size=32, epochs=35,
+                        validation_data=(X_test, y_test), callbacks=[tensorboard_callback])
+    # Save the trained model to avoid re-training.
+    model.save(constants.TRAINED_MODEL)
+    return history
+
+
+def predict(model, encoder, X_test, y_test):
+    print("Predictions...")
+    predictions = np.argmax(model.predict(X_test), axis=-1)
+    # Use the encoder's class order so that the report names line up with the numeric labels.
+    target_names = [str(label) for label in encoder.classes_]
+    print(classification_report(y_test, predictions, target_names=target_names))
+
+
+def plot_model_accuracy(history):
+    # Plot the model accuracy over the training epochs.
+    plt.plot(history.history['accuracy'])
+    plt.plot(history.history['val_accuracy'])
+    plt.title('Model Accuracy')
+    plt.ylabel('Accuracy')
+    plt.xlabel('Epoch')
+    plt.legend(['Train', 'Test'], loc='upper left')
+    plt.show()
+
+
+def plot_model_loss(history):
+    # Plot the model loss over the training epochs.
+    plt.plot(history.history['loss'])
+    plt.plot(history.history['val_loss'])
+    plt.title('Model Loss')
+    plt.ylabel('Loss')
+    plt.xlabel('Epoch')
+    plt.legend(['Train', 'Test'], loc='upper right')
+    plt.show()
+
+
+# Change into the training dataset folder to collect its subdirectories (one per vehicle class).
+chdir(constants.TRAINING_DATA_DIRECTORY_NAME)
+trainingDataDir = Path.cwd()
+trainingDataSubDirs = os.listdir(trainingDataDir)
+chdir("..")
+
+# Feature extraction is skipped if the CSV from a previous run already exists.
+if os.path.isfile(constants.FEATURES_CSV_NAME):
+    print(f"{constants.FEATURES_CSV_NAME} already exists, skipping extraction")
+else:
+    extract_features(trainingDataDir, trainingDataSubDirs)
+
+data = preprocessing_csv_data()
+target_labels, encoder = encode_labels(data)
+X = normalize_data(data)
+X_train, X_test, y_train, y_test = train_test_data_split(X, target_labels)
+model = create_and_compile_model()
+history = train_and_save_model(model, X_train, y_train, X_test, y_test)
+predict(model, encoder, X_test, y_test)
+plot_model_accuracy(history)
+plot_model_loss(history)
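Note on the `constants` module: the script above imports a project-local `constants` module that is not part of this diff. The sketch below shows what that module is assumed to provide, purely as a reading aid. The attribute names are taken from the references in the script; the concrete values (directory names, label strings, file paths) are illustrative guesses and have to match the real constants.py in the repository.

# constants.py (assumed layout; every value below is an illustrative guess)
TRAINING_DATA_DIRECTORY_NAME = "trainingData"  # folder with one subdirectory per vehicle class
FEATURES_CSV_NAME = "features.csv"             # extracted-feature CSV the script checks for
LOG_DIR_PATH = "logs"                          # TensorBoard log directory
TRAINED_MODEL = "trained_model.h5"             # output path for model.save()

# Subdirectory names of the training data (assumed to equal the folder names on disk).
CAR = "car"
BUS = "bus"
TRUCK = "truck"
MOTORCYCLE = "motorcycle"
TRAM = "tram"

# Class labels written into the label column of the CSV.
LIGHT_WEIGHT = "light_weight"
MEDIUM_WEIGHT = "medium_weight"
HEAVY_WEIGHT = "heavy_weight"
TWO_WHEELED = "two_wheeled"
RAIL_BOUND = "rail_bound"

# MFCC geometry; 20 coefficients and 216 frames are the values implied by the comments in the script.
N_MFCC = 20
MFCC_MAX_LEN = 216
MFCC_FEATURE_START = 1          # assumed so that the CSV header gets exactly
MFCC_FEATURE_END = N_MFCC + 1   # N_MFCC * MFCC_MAX_LEN feature columns
CHANNELS = 1                    # single-channel MFCC "image" for the Conv2D input

# Model configuration.
OUTPUT_LAYER_DIMENSIONS = 5     # five vehicle classes
LOSS_FUNCTION_SPARSE = "sparse_categorical_crossentropy"   # integer labels from LabelEncoder
ACCURACY_METRICS = "accuracy"

With values along these lines, the header written by create_csv_header() has exactly N_MFCC * MFCC_MAX_LEN feature columns, which matches the length of the flattened MFCC vector appended for each audio file.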