diff --git a/constants.py b/constants.py
index 011fcc12d306bf2160ca7d3fd347ed89e11461f0..2f0b6dcc72d1f027a0259acc6c67ea950b4373dd 100644
--- a/constants.py
+++ b/constants.py
@@ -1,10 +1,10 @@
-from pandas import datetime
+from datetime import datetime
 
-MFCC_FEATURE_START = 1
-MFCC_FEATURE_END = 21
+MFCC_RANGE_START = 1
+MFCC_RANGE_END = 21
 
 TRAINING_DATA_DIRECTORY_NAME = 'DemoTrainingDataset'
-TESTING_DATA_DIRECTORY_NAME = 'TEST'
+TESTING_DATA_DIRECTORY_NAME = 'Test'
 
 CAR = 'Car'
@@ -36,4 +36,6 @@ ACCURACY_METRICS = 'accuracy'
 
 LOG_DIR_PATH = "logs/scalars/" + datetime.now().strftime("%Y%m%d-%H%M%S")
 
-TRAINED_MODEL = 'trained_model.h5'
+TRAINED_MODEL = 'Trained_Model/trained_model.h5'
+
+TEST_DATA_SPLIT = 0.20
diff --git a/main.py b/main.py
index 124ce5c14c81b23452dcd521fd3dd8d30735b34e..defe61c48e24afd3c3849dc149fdae613af76c96 100644
--- a/main.py
+++ b/main.py
@@ -10,36 +10,43 @@ from sklearn.model_selection import train_test_split
 from sklearn.preprocessing import LabelEncoder, StandardScaler
 from keras import models
 from keras import layers
-import matplotlib.pyplot as plt
 from sklearn.metrics import classification_report
 import constants
+import sys
 
 
 def create_csv_header():
-    header = 'filename '
-    for i in range(constants.MFCC_FEATURE_START, constants.MFCC_FEATURE_END):
-        header += f' mfcc{i}'
-    header += ' label'
-    header = header.split()
-    file = open(constants.FEATURES_CSV_NAME, 'w', newline='')
-    with file:
-        writer = csv.writer(file)
-        writer.writerow(header)
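+    # Guard introduced by this change: refuse to clobber the artifacts of a
+    # previous run; the saved model and features.csv must be moved away explicitly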
+    if os.path.isfile(constants.TRAINED_MODEL):
+        sys.exit("Trained model file already exists, "
+                 "remove/move trained_model.h5 to another location and start training again")
+    if os.path.isfile(constants.FEATURES_CSV_NAME):
+        sys.exit("features.csv already exists, please remove/move the file to another location and run main.py again")
+    else:
+        header = 'filename '
+        for i in range(constants.MFCC_RANGE_START, constants.MFCC_RANGE_END):
+            header += f' mfcc{i}'
+        header += ' label'
+        header = header.split()
+        file = open(constants.FEATURES_CSV_NAME, 'x', newline='')
+        with file:
+            writer = csv.writer(file)
+            writer.writerow(header)
 
 
 def extract_features(trainingDataDir, trainingDataSubDirs):
     create_csv_header()
     # Looping over every file inside the subdirectories for feature extraction
     for trainingDataSubDir in trainingDataSubDirs:
-        for fileName in os.listdir(trainingDataDir/f'{trainingDataSubDir}'):
-            if fileName.endswith(".wav"):
-                audioFile = trainingDataDir/f'{trainingDataSubDir}/{fileName}'
-                print("Extracting Features from Directory "+trainingDataSubDir+" and file "+audioFile.name)
-                y, sr = librosa.load(audioFile, mono=True)
-                mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=(constants.MFCC_FEATURE_END - constants.MFCC_FEATURE_START))
-                to_append = f'{audioFile.name}'
-                for g in mfcc:
-                    to_append += f' {np.mean(g)}'
+        for audio_file_name in os.listdir(trainingDataDir/f'{trainingDataSubDir}'):
+            if audio_file_name.endswith(".wav"):
+                audio_file = trainingDataDir/f'{trainingDataSubDir}/{audio_file_name}'
+                print("Extracting Features from Directory "+trainingDataSubDir+" and file "+audio_file.name)
+                y, sr = librosa.load(audio_file, mono=True)
+                mfcc_features = librosa.feature.mfcc(y=y, sr=sr,
+                                                     n_mfcc=(constants.MFCC_RANGE_END - constants.MFCC_RANGE_START))
+                to_append = f'{audio_file.name}'
+                for mfcc_segment in mfcc_features:
+                    to_append += f' {np.mean(mfcc_segment)}'
                 if trainingDataSubDir == constants.CAR:
                     to_append += f' {constants.LIGHT_WEIGHT}'
                 elif trainingDataSubDir == constants.BUS:
@@ -58,104 +65,84 @@ def extract_features(trainingDataDir, trainingDataSubDirs):
 
 
 def preprocessing_csv_data():
-    print("Reading Features... ")
-    data = pd.read_csv(constants.FEATURES_CSV_NAME)
-    data.head()
+    features_data = pd.read_csv(constants.FEATURES_CSV_NAME)
+    features_data.head()
     # Dropping unnecessary columns (Column Filename is dropped)
-    data = data.drop(['filename'], axis=1)
-    data.head()
-    return data
+    updated_features_data = features_data.drop(['filename'], axis=1)
+    updated_features_data.head()
+    return updated_features_data
 
 
-def encode_labels(data):
+def encode_labels(processedFeaturesData):
     # Extracting classes/label column as y from csv and converting string labels to numbers using LabelEncoder
-    audio_list = data.iloc[:, -1]
-    encoder = LabelEncoder()
-    target_labels = encoder.fit_transform(audio_list)
-    return target_labels, encoder
+    audio_labels_list = processedFeaturesData.iloc[:, -1]
+    encode_object = LabelEncoder()
+    encoded_target_audio_labels = encode_object.fit_transform(audio_labels_list)
+    return encoded_target_audio_labels, encode_object
 
 
-def normalize_data(data):
+def normalize_data(processedData):
     # normalizing - Extracting Remaining Columns as X and normalizing them to a common scale
-    scaler = StandardScaler()
-    X = scaler.fit_transform(np.array(data.iloc[:, :-1], dtype=float))
-    return X
+    scale_object = StandardScaler()
+    X_normalized_features = scale_object.fit_transform(np.array(processedData.iloc[:, :-1], dtype=float))
+    return X_normalized_features
 
 
-def train_test_data_split(X, y):
+def train_test_data_split(XInput, yLabels):
     # splitting of dataset into train and test dataset
-    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.20)
-    return X_train, X_test, y_train, y_test
+    X_split_train, X_split_test, y_split_train, y_split_test = train_test_split(XInput, yLabels,
+                                                                                test_size=constants.TEST_DATA_SPLIT)
+    return X_split_train, X_split_test, y_split_train, y_split_test
 
 
 def create_and_compile_model():
     print("Creating a Model")
-    # creating a model
-    model = models.Sequential()
-    model.add(layers.Dense(constants.HIDDEN_LAYER_1_DIMENSIONS, activation=constants.ACTIVATION_RELU, input_shape=(X.shape[1],)))
-    model.add(layers.Dense(constants.HIDDEN_LAYER_2_DIMENSIONS, activation=constants.ACTIVATION_RELU))
-    model.add(layers.Dense(constants.HIDDEN_LAYER_3_DIMENSIONS, activation=constants.ACTIVATION_RELU))
-    model.add(layers.Dense(constants.OUTPUT_LAYER_DIMENSIONS, activation=constants.ACTIVATION_SOFTMAX))
+    model_instance = models.Sequential()
+    model_instance.add(layers.Dense(constants.HIDDEN_LAYER_1_DIMENSIONS, activation=constants.ACTIVATION_RELU,
+                                    input_shape=(X_input_features.shape[1],)))
+    model_instance.add(layers.Dense(constants.HIDDEN_LAYER_2_DIMENSIONS, activation=constants.ACTIVATION_RELU))
+    model_instance.add(layers.Dense(constants.HIDDEN_LAYER_3_DIMENSIONS, activation=constants.ACTIVATION_RELU))
+    model_instance.add(layers.Dense(constants.OUTPUT_LAYER_DIMENSIONS, activation=constants.ACTIVATION_SOFTMAX))
     print("Compiling a Model")
-    model.compile(optimizer= constants.OPTIMIZER_ADAM, loss= constants.LOSS_FUNCTION_SPARSE, metrics=[constants.ACCURACY_METRICS])
-    return model
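+    # NOTE: assuming LOSS_FUNCTION_SPARSE is sparse categorical cross-entropy,
+    # the integer labels from LabelEncoder are consumed directly (no one-hot step)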
+    model_instance.compile(optimizer=constants.OPTIMIZER_ADAM,
+                           loss=constants.LOSS_FUNCTION_SPARSE,
+                           metrics=[constants.ACCURACY_METRICS])
+    return model_instance
 
 
-def train_and_save_model(model, X_train, y_train, X_test, y_test):
-    logdir = constants.LOG_DIR_PATH
-    tensorboard_callback = keras.callbacks.TensorBoard(log_dir=logdir)
+def train_and_save_model(compiledModel, X_train, y_train, X_test, y_test):
+    log_directory = constants.LOG_DIR_PATH
+    tensorboard_callback = keras.callbacks.TensorBoard(log_dir=log_directory)
+
     print("Start Training...")
-    history = model.fit(X_train, y_train, epochs=35, validation_data=(X_test, y_test), callbacks=[tensorboard_callback])
+    training_history = compiledModel.fit(X_train, y_train, epochs=35,
+                                         validation_data=(X_test, y_test),
+                                         callbacks=[tensorboard_callback])
+
     # Saving the trained model to avoid re-training
-    model.save(constants.TRAINED_MODEL)
-    return history
+    compiledModel.save(constants.TRAINED_MODEL)
+    return training_history
 
 
 def predict(X_test, y_test):
     print("Predictions.....")
-    predictions = np.argmax(model.predict(X_test), axis=-1)
-    target_names = [constants.LIGHT_WEIGHT, constants.MEDIUM_WEIGHT, constants.HEAVY_WEIGHT,constants.TWO_WHEELED, constants.RAIL_BOUND]
-    print(classification_report(y_test, predictions, target_names=target_names))
-
-
-def plot_model_accuracy(history):
-    # Plot graph Model Accuracy
-    plt.plot(history.history['accuracy'])
-    plt.plot(history.history['val_accuracy'])
-    plt.title('Model Accuracy')
-    plt.ylabel('Accuracy')
-    plt.xlabel('Epoch')
-    plt.legend(['Train', 'Test'], loc='upper left')
-    plt.show()
-
-
-def plot_model_loss(history):
-    # Plot graph Model Loss
-    plt.plot(history.history['loss'])
-    plt.plot(history.history['val_loss'])
-    plt.title('Model loss')
-    plt.ylabel('Loss')
-    plt.xlabel('Epoch')
-    plt.legend(['Train', 'Test'], loc='upper right')
-    plt.show()
+    final_predictions = np.argmax(compiled_model.predict(X_test), axis=-1)
+    target_names = [constants.LIGHT_WEIGHT, constants.MEDIUM_WEIGHT, constants.HEAVY_WEIGHT, constants.TWO_WHEELED,
+                    constants.RAIL_BOUND]
+    print(classification_report(y_test, final_predictions, target_names=target_names))
 
 
 # Changing Directory to Training Dataset Folder
 chdir(constants.TRAINING_DATA_DIRECTORY_NAME)
-trainingDataDir = Path.cwd()
-trainingDataSubDirs = os.listdir(trainingDataDir)
-
-extract_features(trainingDataDir, trainingDataSubDirs)
-data = preprocessing_csv_data()
-target_labels, encoder = encode_labels(data)
-X = normalize_data(data)
-X_train, X_test, y_train, y_test = train_test_data_split(X, target_labels)
-model = create_and_compile_model()
-history = train_and_save_model(model, X_train, y_train, X_test, y_test)
-predict(X_test, y_test)
-plot_model_accuracy(history)
-plot_model_loss(history)
-
-
-
+training_data_directory = Path.cwd()
+training_data_sub_directories = os.listdir(training_data_directory)
+extract_features(training_data_directory, training_data_sub_directories)
+processed_features_data = preprocessing_csv_data()
+target_audio_labels, encoder_object = encode_labels(processed_features_data)
+X_input_features = normalize_data(processed_features_data)
+X_train_data, X_test_data, y_train_data, y_test_data = train_test_data_split(X_input_features, target_audio_labels)
+compiled_model = create_and_compile_model()
+model_training_history = train_and_save_model(compiled_model, X_train_data, y_train_data, X_test_data, y_test_data)
+predict(X_test_data, y_test_data)
diff --git a/test.py b/test.py
index b1b9f26c9f5e82c8831390154035aaed6eac84f1..5fa110e351be09166db357c0d0fb1b8b4565b783 100644
--- a/test.py
+++ b/test.py
@@ -7,31 +7,36 @@ import csv
 from tensorflow import keras
 from sklearn.preprocessing import LabelEncoder, StandardScaler
 import constants
+import sys
 
 
 def create_csv_header():
-    header=''
-    for i in range(constants.MFCC_FEATURE_START, constants.MFCC_FEATURE_END):
-        header += f' mfcc{i}'
-    header = header.split()
-    file = open(constants.TEST_CSV_NAME, 'w', newline='')
-    with file:
-        writer = csv.writer(file)
-        writer.writerow(header)
+    if os.path.isfile(constants.TEST_CSV_NAME):
+        sys.exit("test.csv already exists, please remove/move the file to another location and run test.py again")
+    else:
+        header = ''
+        for i in range(constants.MFCC_RANGE_START, constants.MFCC_RANGE_END):
+            header += f' mfcc{i}'
+        header = header.split()
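+        # 'x' (exclusive creation) raises FileExistsError rather than silently
+        # truncating a test.csv created after the isfile() check above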
+        file = open(constants.TEST_CSV_NAME, 'x', newline='')
+        with file:
+            writer = csv.writer(file)
+            writer.writerow(header)
 
 
 def extract_features(workingDir, subDirectories):
     create_csv_header()
     for subDirectory in subDirectories:
         if subDirectory == constants.TESTING_DATA_DIRECTORY_NAME:
-            for fileName in os.listdir(workingDir/f'{subDirectory}'):
-                if fileName.endswith(".wav"):
-                    audioFile = workingDir / f'{subDirectory}/{fileName}'
-                    y, sr = librosa.load(audioFile, mono=True)
-                    mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=(constants.MFCC_FEATURE_END - constants.MFCC_FEATURE_START))
+            for test_audio_file_name in os.listdir(workingDir/f'{subDirectory}'):
+                if test_audio_file_name.endswith(".wav"):
+                    test_audio_file = workingDir / f'{subDirectory}/{test_audio_file_name}'
+                    y, sr = librosa.load(test_audio_file, mono=True)
+                    mfcc_features = librosa.feature.mfcc(y=y, sr=sr,
+                                                         n_mfcc=(constants.MFCC_RANGE_END - constants.MFCC_RANGE_START))
                     to_append = ''
-                    for g in mfcc:
-                        to_append += f' {np.mean(g)}'
+                    for mfcc_segment in mfcc_features:
+                        to_append += f' {np.mean(mfcc_segment)}'
                     file = open(constants.TEST_CSV_NAME, 'a', newline='')
                     with file:
                         writer = csv.writer(file)
@@ -39,30 +44,29 @@ def extract_features(workingDir, subDirectories):
 
 
 def preprocessing_csv_data():
-    # reading dataset from csv
     print("Reading Features... ")
-    data = pd.read_csv(constants.TEST_CSV_NAME)
-    data.head()
-    return data
+    test_features_data = pd.read_csv(constants.TEST_CSV_NAME)
+    test_features_data.head()
+    return test_features_data
 
 
-def normalize_data(data):
+def normalize_data(processedData):
     # # normalizing - Extracting Remaining Columns as X and normalizing them to a common scale
-    scaler = StandardScaler()
-    X = scaler.fit_transform(np.array(data.iloc[:, :], dtype=float))
-    print(X)
-    print(X.shape)
-    return X
+    scale_object = StandardScaler()
+    X_test = scale_object.fit_transform(np.array(processedData.iloc[:, :], dtype=float))
+    return X_test
 
 
-WorkingDir = Path.cwd()
-subDirectories = os.listdir(WorkingDir)
-extract_features(WorkingDir, subDirectories)
-data = preprocessing_csv_data()
-X = normalize_data(data)
-model = keras.models.load_model('./DemoTrainingDataset/trained_model.h5')
-model.summary()
-predictions = np.argmax(model.predict(X), axis=-1)
+working_directory = Path.cwd()
+sub_directories = os.listdir(working_directory)
+extract_features(working_directory, sub_directories)
+processed_data = preprocessing_csv_data()
+X_test_data = normalize_data(processed_data)
+if os.path.isfile('./DemoTrainingDataset/Trained_Model/trained_model.h5'):
+    model = keras.models.load_model('./DemoTrainingDataset/Trained_Model/trained_model.h5')
+else:
+    sys.exit("Trained model file does not exist")
+predictions = np.argmax(model.predict(X_test_data), axis=-1)
 encoder = LabelEncoder()
 labels = ['Light-Weight', 'Medium-Weight', 'Heavy-Weight', 'Two-Wheeled', 'Rail-Bound']
 encoder.fit_transform(labels)
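+# NOTE: LabelEncoder sorts class strings, so this hard-coded list reproduces the
+# training-time encoding only while it matches the label constants used in main.py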