Commit 3e877ca0 authored by Naundorf

Update KI-KNN

parent 034883b5
Pipeline #6554 failed with stage in 2 minutes and 18 seconds
import librosa.feature
import pandas as pd
import numpy as np
from pathlib import Path
...@@ -14,114 +13,26 @@ from keras import optimizers
import matplotlib.pyplot as plt
from sklearn.metrics import classification_report
import constants
from datetime import datetime  # pandas no longer re-exports datetime; import it from the standard library
# maximum number of MFCC time frames kept per clip (must match constants.MFCC_MAX_LEN)
max_len = 216
def create_csv_header():
    header = 'filename'
    for i in range(constants.MFCC_FEATURE_START, ((constants.MFCC_FEATURE_END - 1) * max_len) + 1):
        header += f' mfcc{i}'
    header += ' label'
    header = header.split()
    with open(constants.FEATURES_CSV_NAME, 'w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(header)
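# Illustrative layout (assumed values, not in the original file): with
# N_MFCC = 20 and max_len = 216 the CSV header is 'filename', followed by
# 20 * 216 = 4320 'mfcc<i>' feature columns, and a final 'label' column,
# one row per audio clip.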
def extract_features(trainingDataDir, trainingDataSubDirs):
    create_csv_header()
    # Loop over every file inside the subdirectories for feature extraction
    for trainingDataSubDir in trainingDataSubDirs:
        for fileName in os.listdir(trainingDataDir/f'{trainingDataSubDir}'):
            if fileName.endswith(".wav"):
                audioFile = trainingDataDir/f'{trainingDataSubDir}/{fileName}'
                print("Extracting Features from Directory " + trainingDataSubDir + " and file " + audioFile.name)
                y, sr = librosa.load(audioFile, mono=True)
                # use the sample rate returned by librosa.load instead of a
                # hard-coded 44100, which would mis-scale the mel filter bank
                mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=constants.N_MFCC)
                to_append = f'{audioFile.name}'
                print("mfcc.shape before cutting/padding: ", mfcc.shape)
                # MFCC extraction yields a slightly different number of time frames per clip:
                # for 5-second clips and 20 MFCC coefficients the frame count varies between 215 and 217.
                # To unify the size, clips with fewer than 216 frames are zero-padded,
                # and clips with more than 216 frames are trimmed.
                # IMPORTANT:
                # if the clip duration or the number of MFCC coefficients (currently 20) changes,
                # MFCC_MAX_LEN must be adjusted:
                # 1. check the new frame counts of all files
                # 2. find a suitable common length for padding/trimming
                # 3. adjust the MFCC_MAX_LEN variable in constants.py
                if constants.MFCC_MAX_LEN > mfcc.shape[1]:
                    pad_width = constants.MFCC_MAX_LEN - mfcc.shape[1]
                    mfcc = np.pad(mfcc, pad_width=((0, 0), (0, pad_width)), mode='constant')
                elif constants.MFCC_MAX_LEN < mfcc.shape[1]:
                    mfcc = mfcc[:, :constants.MFCC_MAX_LEN]
                print("mfcc.shape after cutting/padding: ", mfcc.shape)
                # store every single value in the CSV file
                mfcc = np.reshape(mfcc, constants.MFCC_MAX_LEN * constants.N_MFCC)
                #print("new total size: ", mfcc.shape)
                for g in mfcc:
                    to_append += f' {g}'
                if trainingDataSubDir == constants.CAR:
                    to_append += f' {constants.LIGHT_WEIGHT}'
                elif trainingDataSubDir == constants.BUS:
                    to_append += f' {constants.MEDIUM_WEIGHT}'
                elif trainingDataSubDir == constants.TRUCK:
                    to_append += f' {constants.HEAVY_WEIGHT}'
                elif trainingDataSubDir == constants.MOTORCYCLE:
                    to_append += f' {constants.TWO_WHEELED}'
                elif trainingDataSubDir == constants.TRAM:
                    to_append += f' {constants.RAIL_BOUND}'
                with open(constants.FEATURES_CSV_NAME, 'a', newline='') as file:
                    writer = csv.writer(file)
                    writer.writerow(to_append.split())
def preprocessing_csv_data():
    print("Reading Features... ")
    data = pd.read_csv(constants.FEATURES_CSV_NAME)
    # Drop the filename column; it carries no feature information
    data = data.drop(['filename'], axis=1)
    return data
def encode_labels(data):
    # Extract the label column as y and convert the string labels to integers using LabelEncoder
    audio_list = data.iloc[:, -1]
    encoder = LabelEncoder()
    target_labels = encoder.fit_transform(audio_list)
    return target_labels, encoder
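# Illustrative example (assumed values): LabelEncoder assigns integer codes in
# sorted order of the distinct label strings, so fit_transform(['light',
# 'heavy', 'light']) returns [1, 0, 1]; the fitted encoder is returned so the
# codes can later be reversed with encoder.inverse_transform().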
def normalize_data(data):
    # Extract the remaining columns as X and normalize them to a common scale
    scaler = StandardScaler()
    print(data.iloc[:, :-1])
    X = scaler.fit_transform(np.array(data.iloc[:, :-1], dtype=float))
    X = X.reshape(-1, constants.N_MFCC, constants.MFCC_MAX_LEN, constants.CHANNELS)
    return X
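# Shape sketch (assuming N_MFCC = 20, MFCC_MAX_LEN = 216, CHANNELS = 1): each
# CSV row of 20 * 216 = 4320 standardized values becomes a (20, 216, 1) array,
# i.e. a one-channel "image" suitable for the Conv2D stack built below.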
def train_test_data_split(X, y):
    # splitting of dataset into train and test dataset (80 % / 20 %)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.20)
    print(X_train[0].shape)
    return X_train, X_test, y_train, y_test
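# Note (assumption, not in the original code): train_test_split shuffles the
# rows by default; additionally passing stratify=y would preserve the class
# distribution in both splits, which helps with unbalanced vehicle classes.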
# creating a model
def create_and_compile_model():
    print("Creating a Model")
    from keras.models import Sequential
    from keras.layers import Conv2D, Dense, MaxPooling2D, Dropout, Flatten, BatchNormalization
    model = Sequential()
    model.add(Conv2D(32, kernel_size=(3, 3), activation="relu", input_shape=(constants.N_MFCC, constants.MFCC_MAX_LEN, constants.CHANNELS)))
    model.add(Conv2D(32, kernel_size=(3, 3), activation="relu"))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Conv2D(64, kernel_size=(3, 3), activation="relu"))
...@@ -132,28 +43,26 @@ def create_and_compile_model():
    model.add(Dense(constants.OUTPUT_LAYER_DIMENSIONS, activation='softmax'))
    print("Compiling a Model")
    optimizer = keras.optimizers.RMSprop()
    model.compile(optimizer=optimizer, loss=constants.LOSS_FUNCTION_SPARSE, metrics=[constants.ACCURACY_METRICS])
    print(model.summary())
    return model
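# Note (assumption): LOSS_FUNCTION_SPARSE is presumably
# 'sparse_categorical_crossentropy', which matches the integer class codes
# produced by encode_labels(); one-hot encoded targets would require the
# non-sparse 'categorical_crossentropy' instead.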
def train_and_save_model(model, X_train, y_train, X_test, y_test):
    logdir = constants.LOG_DIR_PATH
    tensorboard_callback = keras.callbacks.TensorBoard(log_dir=logdir)
    print("Start Training...")
    history = model.fit(X_train, y_train, batch_size=32, epochs=35, validation_data=(X_test, y_test), callbacks=[tensorboard_callback])
    # Saving the trained model to avoid re-training
    model.save(constants.TRAINED_MODEL)
    return history
def predict(model, X_test, y_test):
    print("Predictions.....")
    # argmax over the softmax outputs yields the predicted class index per sample
    predictions = np.argmax(model.predict(X_test), axis=-1)
    target_names = [constants.LIGHT_WEIGHT, constants.MEDIUM_WEIGHT, constants.HEAVY_WEIGHT, constants.TWO_WHEELED, constants.RAIL_BOUND]
    print(classification_report(y_test, predictions, target_names=target_names))
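# Usage sketch (hypothetical, not part of the original file): the integer
# predictions can be mapped back to string labels with the encoder returned
# by encode_labels(), e.g.:
#     _, encoder = encode_labels(data)
#     print(encoder.inverse_transform(predictions[:5]))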
def plot_model_accuracy(history):
    # Plot graph: Model Accuracy
    plt.plot(history.history['accuracy'])
...@@ -164,7 +73,6 @@ def plot_model_accuracy(history):
    plt.legend(['Train', 'Test'], loc='upper left')
    plt.show()
def plot_model_loss(history):
    # Plot graph: Model Loss
    plt.plot(history.history['loss'])
...@@ -175,7 +83,6 @@ def plot_model_loss(history):
    plt.legend(['Train', 'Test'], loc='upper right')
    plt.show()
# Changing Directory to Training Dataset Folder
chdir(constants.TRAINING_DATA_DIRECTORY_NAME)
trainingDataDir = Path.cwd()
...