import csv
import os
import sys
from os import chdir
from pathlib import Path

import librosa
import librosa.feature
import numpy as np
import pandas as pd
from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
from tensorflow import keras
from tensorflow.keras import layers, models

import constants


def create_csv_header():
    if os.path.isfile(constants.TRAINED_MODEL):
        sys.exit("Trained model file already exists, "
                 "remove/move trained_model.h5 to another location and start training again")
    if os.path.isfile(constants.FEATURES_CSV_NAME):
        sys.exit("features.csv already exists, please remove/move the file to another location and run main.py again")
    # Header layout: filename, one column per MFCC coefficient, class label
    header = ['filename']
    header += [f'mfcc{i}' for i in range(constants.MFCC_RANGE_START, constants.MFCC_RANGE_END)]
    header.append('label')
    with open(constants.FEATURES_CSV_NAME, 'x', newline='') as csv_file:
        csv.writer(csv_file).writerow(header)


def extract_features(training_data_dir, training_data_sub_dirs):
    create_csv_header()
    # Each subdirectory (vehicle type) maps to one class label
    labels_by_sub_dir = {
        constants.CAR: constants.LIGHT_WEIGHT,
        constants.BUS: constants.MEDIUM_WEIGHT,
        constants.TRUCK: constants.HEAVY_WEIGHT,
        constants.MOTORCYCLE: constants.TWO_WHEELED,
        constants.TRAM: constants.RAIL_BOUND,
    }
    # Loop over every .wav file inside the subdirectories for feature extraction
    for sub_dir in training_data_sub_dirs:
        label = labels_by_sub_dir.get(sub_dir)
        if label is None:
            continue  # skip entries that are not known vehicle-class folders
        for audio_file_name in os.listdir(training_data_dir / sub_dir):
            if not audio_file_name.endswith(".wav"):
                continue
            audio_file = training_data_dir / sub_dir / audio_file_name
            print(f"Extracting features from directory {sub_dir} and file {audio_file.name}")
            y, sr = librosa.load(audio_file, mono=True)
            mfcc_features = librosa.feature.mfcc(
                y=y, sr=sr,
                n_mfcc=constants.MFCC_RANGE_END - constants.MFCC_RANGE_START)
            # One row per file: filename, mean of each MFCC band over time, label.
            # Building the row as a list (rather than splitting a space-joined
            # string) keeps filenames containing spaces intact.
            row = [audio_file.name]
            row += [np.mean(mfcc_segment) for mfcc_segment in mfcc_features]
            row.append(label)
            with open(constants.FEATURES_CSV_NAME, 'a', newline='') as csv_file:
                csv.writer(csv_file).writerow(row)


def preprocessing_csv_data():
    features_data = pd.read_csv(constants.FEATURES_CSV_NAME)
    # The filename column is bookkeeping only and is dropped before training
    return features_data.drop(['filename'], axis=1)


def encode_labels(processed_features_data):
    # The last column holds the string class labels; LabelEncoder converts them
    # to integers, assigned in alphabetical order of the class names
    audio_labels = processed_features_data.iloc[:, -1]
    encoder = LabelEncoder()
    encoded_target_audio_labels = encoder.fit_transform(audio_labels)
    return encoded_target_audio_labels, encoder


def normalize_data(processed_data):
    # The remaining columns are the MFCC features (X); scale them to zero mean
    # and unit variance so no single coefficient dominates training. Note that
    # fitting the scaler on the full dataset before the split leaks test-set
    # statistics into training; fitting on the train split only would be stricter.
    scaler = StandardScaler()
    return scaler.fit_transform(np.array(processed_data.iloc[:, :-1], dtype=float))


def train_test_data_split(x_input, y_labels):
    # Hold out constants.TEST_DATA_SPLIT of the dataset for evaluation
    return train_test_split(x_input, y_labels, test_size=constants.TEST_DATA_SPLIT)
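# Note: with imbalanced classes it can help to pass the labels to
# train_test_split's stratify parameter so each class keeps the same proportion
# in the train and test sets. A minimal sketch (one extra keyword argument on
# the same call as above):
#
#   train_test_split(x_input, y_labels,
#                    test_size=constants.TEST_DATA_SPLIT,
#                    stratify=y_labels)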
def create_and_compile_model(input_dimensions):
    print("Creating a model")
    model_instance = models.Sequential()
    # Three fully connected hidden layers with ReLU activations, followed by a
    # softmax output layer over the vehicle classes
    model_instance.add(layers.Dense(constants.HIDDEN_LAYER_1_DIMENSIONS,
                                    activation=constants.ACTIVATION_RELU,
                                    input_shape=(input_dimensions,)))
    model_instance.add(layers.Dense(constants.HIDDEN_LAYER_2_DIMENSIONS, activation=constants.ACTIVATION_RELU))
    model_instance.add(layers.Dense(constants.HIDDEN_LAYER_3_DIMENSIONS, activation=constants.ACTIVATION_RELU))
    model_instance.add(layers.Dense(constants.OUTPUT_LAYER_DIMENSIONS, activation=constants.ACTIVATION_SOFTMAX))
    print("Compiling the model")
    model_instance.compile(optimizer=constants.OPTIMIZER_ADAM,
                           loss=constants.LOSS_FUNCTION_SPARSE,
                           metrics=[constants.ACCURACY_METRICS])
    return model_instance


def train_and_save_model(compiled_model, X_train, y_train, X_test, y_test):
    # Write training metrics for inspection with TensorBoard
    tensorboard_callback = keras.callbacks.TensorBoard(log_dir=constants.LOG_DIR_PATH)
    print("Start training...")
    training_history = compiled_model.fit(X_train, y_train, epochs=35,
                                          validation_data=(X_test, y_test),
                                          callbacks=[tensorboard_callback])
    # Save the trained model so it can be reused without re-training
    compiled_model.save(constants.TRAINED_MODEL)
    return training_history


def predict(model, X_test, y_test, encoder):
    print("Predictions...")
    final_predictions = np.argmax(model.predict(X_test), axis=-1)
    # Derive the display names from the fitted encoder so they line up with the
    # integer labels; a hard-coded list would have to match LabelEncoder's
    # alphabetical ordering exactly
    target_names = [str(class_name) for class_name in encoder.classes_]
    print(classification_report(y_test, final_predictions, target_names=target_names))


if __name__ == '__main__':
    # Change into the training dataset folder; each subdirectory holds the
    # recordings of one vehicle class
    chdir(constants.TRAINING_DATA_DIRECTORY_NAME)
    training_data_directory = Path.cwd()
    training_data_sub_directories = os.listdir(training_data_directory)
    extract_features(training_data_directory, training_data_sub_directories)
    processed_features_data = preprocessing_csv_data()
    target_audio_labels, encoder_object = encode_labels(processed_features_data)
    X_input_features = normalize_data(processed_features_data)
    X_train_data, X_test_data, y_train_data, y_test_data = train_test_data_split(
        X_input_features, target_audio_labels)
    compiled_model = create_and_compile_model(X_input_features.shape[1])
    model_training_history = train_and_save_model(
        compiled_model, X_train_data, y_train_data, X_test_data, y_test_data)
    predict(compiled_model, X_test_data, y_test_data, encoder_object)
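# A minimal sketch of reusing the saved model later without re-training
# (assumes new clips go through the same MFCC extraction and scaling as above;
# `new_features` is a hypothetical, already-normalized feature matrix):
#
#   loaded_model = keras.models.load_model(constants.TRAINED_MODEL)
#   predicted_classes = np.argmax(loaded_model.predict(new_features), axis=-1)
#   predicted_labels = encoder_object.inverse_transform(predicted_classes)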