From b52cc91fee0a81fd629601e593e5f2e286835670 Mon Sep 17 00:00:00 2001
From: 92homa1mst <92homa1mst@hft-stuttgart.de>
Date: Wed, 24 Feb 2021 21:45:36 +0100
Subject: [PATCH] Refactor training and test scripts

Rename the MFCC constants, move the train/test split ratio and the
trained-model path into constants.py, guard against overwriting an
existing trained model or features CSV, and drop the matplotlib
plotting helpers.
---
 constants.py |  12 ++--
 main.py      | 174 +++++++++++++++++++++++++---------------------------
 test.py      |  71 +++++++++++-----------
 3 files changed, 127 insertions(+), 130 deletions(-)

diff --git a/constants.py b/constants.py
index 011fcc1..2f0b6dc 100644
--- a/constants.py
+++ b/constants.py
@@ -1,10 +1,10 @@
-from pandas import datetime
+from datetime import datetime
 
-MFCC_FEATURE_START = 1
-MFCC_FEATURE_END = 21
+MFCC_RANGE_START = 1
+MFCC_RANGE_END = 21
 
 TRAINING_DATA_DIRECTORY_NAME = 'DemoTrainingDataset'
-TESTING_DATA_DIRECTORY_NAME = 'TEST'
+TESTING_DATA_DIRECTORY_NAME = 'Test'
 
 
 CAR = 'Car'
@@ -36,4 +36,6 @@ ACCURACY_METRICS = 'accuracy'
 
 LOG_DIR_PATH = "logs/scalars/" + datetime.now().strftime("%Y%m%d-%H%M%S")
 
-TRAINED_MODEL = 'trained_model.h5'
+TRAINED_MODEL = 'Trained_Model/trained_model.h5'
+
+TEST_DATA_SPLIT = 0.20
diff --git a/main.py b/main.py
index 124ce5c..defe61c 100644
--- a/main.py
+++ b/main.py
@@ -10,36 +10,45 @@ from sklearn.model_selection import train_test_split
 from sklearn.preprocessing import LabelEncoder, StandardScaler
 from keras import models
 from keras import layers
-import matplotlib.pyplot as plt
 from sklearn.metrics import classification_report
 import constants
+import sys
 
 
 def create_csv_header():
-    header = 'filename '
-    for i in range(constants.MFCC_FEATURE_START, constants.MFCC_FEATURE_END):
-        header += f' mfcc{i}'
-    header += ' label'
-    header = header.split()
-    file = open(constants.FEATURES_CSV_NAME, 'w', newline='')
-    with file:
-        writer = csv.writer(file)
-        writer.writerow(header)
+    if os.path.isfile(constants.TRAINED_MODEL):
+        sys.exit("Trained model file already exists; "
+                 "remove/move Trained_Model/trained_model.h5 to another location and start training again")
+    if os.path.isfile(constants.FEATURES_CSV_NAME):
+        sys.exit("features.csv already exists; please remove/move the file to another location and run main.py again")
+    else:
+        header = 'filename '
+        for i in range(constants.MFCC_RANGE_START, constants.MFCC_RANGE_END):
+            header += f' mfcc{i}'
+        header += ' label'
+        header = header.split()
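+        # 'x' (exclusive create) raises FileExistsError if the CSV appears between the check above and here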
+        file = open(constants.FEATURES_CSV_NAME, 'x', newline='')
+        with file:
+            writer = csv.writer(file)
+            writer.writerow(header)
 
 
 def extract_features(trainingDataDir, trainingDataSubDirs):
     create_csv_header()
     # Looping over every file inside the subdirectories for feature extraction
     for trainingDataSubDir in trainingDataSubDirs:
-        for fileName in os.listdir(trainingDataDir/f'{trainingDataSubDir}'):
-            if fileName.endswith(".wav"):
-                audioFile = trainingDataDir/f'{trainingDataSubDir}/{fileName}'
-                print("Extracting Features from Directory "+trainingDataSubDir+" and file "+audioFile.name)
-                y, sr = librosa.load(audioFile, mono=True)
-                mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=(constants.MFCC_FEATURE_END - constants.MFCC_FEATURE_START))
-                to_append = f'{audioFile.name}'
-                for g in mfcc:
-                    to_append += f' {np.mean(g)}'
+        for audio_file_name in os.listdir(trainingDataDir/f'{trainingDataSubDir}'):
+            if audio_file_name.endswith(".wav"):
+                audio_file = trainingDataDir/f'{trainingDataSubDir}/{audio_file_name}'
+                print("Extracting Features from Directory "+trainingDataSubDir+" and file "+audio_file.name)
+                y, sr = librosa.load(audio_file, mono=True)
+                mfcc_features = librosa.feature.mfcc(y=y, sr=sr,
+                                                     n_mfcc=(constants.MFCC_RANGE_END - constants.MFCC_RANGE_START))
+                to_append = f'{audio_file.name}'
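+                # mfcc_features holds one row per coefficient; average each row over the time frames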
+                for mfcc_segment in mfcc_features:
+                    to_append += f' {np.mean(mfcc_segment)}'
                 if trainingDataSubDir == constants.CAR:
                     to_append += f' {constants.LIGHT_WEIGHT}'
                 elif trainingDataSubDir == constants.BUS:
@@ -58,104 +67,85 @@ def extract_features(trainingDataDir, trainingDataSubDirs):
 
 
 def preprocessing_csv_data():
-    print("Reading Features... ")
-    data = pd.read_csv(constants.FEATURES_CSV_NAME)
-    data.head()
+    features_data = pd.read_csv(constants.FEATURES_CSV_NAME)
+    features_data.head()
     # Dropping unnecessary columns (Column Filename is dropped)
-    data = data.drop(['filename'], axis=1)
-    data.head()
-    return data
+    updated_features_data = features_data.drop(['filename'], axis=1)
+    updated_features_data.head()
+    return updated_features_data
 
 
-def encode_labels(data):
+def encode_labels(processedFeaturesData):
     # Extracting classes/label column as y from csv and converting string labels to numbers using LabelEncoder
-    audio_list = data.iloc[:, -1]
-    encoder = LabelEncoder()
-    target_labels = encoder.fit_transform(audio_list)
-    return target_labels, encoder
+    audio_labels_list = processedFeaturesData.iloc[:, -1]
+    encode_object = LabelEncoder()
+    encoded_target_audio_labels = encode_object.fit_transform(audio_labels_list)
+    return encoded_target_audio_labels, encode_object
 
 
-def normalize_data(data):
+def normalize_data(processedData):
     # normalizing - Extracting Remaining Columns as X and normalizing them to a common scale
-    scaler = StandardScaler()
-    X = scaler.fit_transform(np.array(data.iloc[:, :-1], dtype=float))
-    return X
+    scale_object = StandardScaler()
+    X_normalized_features = scale_object.fit_transform(np.array(processedData.iloc[:, :-1], dtype=float))
+    return X_normalized_features
 
 
-def train_test_data_split(X, y):
+def train_test_data_split(XInput, yLabels):
     # splitting of dataset into train and test dataset
-    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.20)
-    return X_train, X_test, y_train, y_test
+    X_split_train, X_split_test, y_split_train, y_split_test = train_test_split(XInput, yLabels,
+                                                                                test_size=constants.TEST_DATA_SPLIT)
+    return X_split_train, X_split_test, y_split_train, y_split_test
 
 
 def create_and_compile_model():
     print("Creating a Model")
-    # creating a model
-    model = models.Sequential()
-    model.add(layers.Dense(constants.HIDDEN_LAYER_1_DIMENSIONS, activation=constants.ACTIVATION_RELU, input_shape=(X.shape[1],)))
-    model.add(layers.Dense(constants.HIDDEN_LAYER_2_DIMENSIONS, activation=constants.ACTIVATION_RELU))
-    model.add(layers.Dense(constants.HIDDEN_LAYER_3_DIMENSIONS, activation=constants.ACTIVATION_RELU))
-    model.add(layers.Dense(constants.OUTPUT_LAYER_DIMENSIONS, activation=constants.ACTIVATION_SOFTMAX))
+    model_instance = models.Sequential()
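+    # NOTE: input_shape reads the module-level X_input_features assigned at the bottom of this file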
+    model_instance.add(layers.Dense(constants.HIDDEN_LAYER_1_DIMENSIONS, activation=constants.ACTIVATION_RELU,
+                                    input_shape=(X_input_features.shape[1],)))
+    model_instance.add(layers.Dense(constants.HIDDEN_LAYER_2_DIMENSIONS, activation=constants.ACTIVATION_RELU))
+    model_instance.add(layers.Dense(constants.HIDDEN_LAYER_3_DIMENSIONS, activation=constants.ACTIVATION_RELU))
+    model_instance.add(layers.Dense(constants.OUTPUT_LAYER_DIMENSIONS, activation=constants.ACTIVATION_SOFTMAX))
 
     print("Compiling a Model")
-    model.compile(optimizer= constants.OPTIMIZER_ADAM, loss= constants.LOSS_FUNCTION_SPARSE, metrics=[constants.ACCURACY_METRICS])
-    return model
+    model_instance.compile(optimizer=constants.OPTIMIZER_ADAM,
+                           loss=constants.LOSS_FUNCTION_SPARSE,
+                           metrics=[constants.ACCURACY_METRICS])
+    return model_instance
 
 
-def train_and_save_model(model, X_train, y_train, X_test, y_test):
-    logdir = constants.LOG_DIR_PATH
-    tensorboard_callback = keras.callbacks.TensorBoard(log_dir=logdir)
+def train_and_save_model(compiledModel, X_train, y_train, X_test, y_test):
+    log_directory = constants.LOG_DIR_PATH
+    tensorboard_callback = keras.callbacks.TensorBoard(log_dir=log_directory)
+
     print("Start Training...")
-    history = model.fit(X_train, y_train, epochs=35, validation_data=(X_test, y_test), callbacks=[tensorboard_callback])
+    training_history = compiledModel.fit(X_train, y_train, epochs=35,
+                                         validation_data=(X_test, y_test),
+                                         callbacks=[tensorboard_callback])
+
     # Saving the trained model to avoid re-training
-    model.save(constants.TRAINED_MODEL)
-    return history
+    compiledModel.save(constants.TRAINED_MODEL)
+    return training_history
 
 
 def predict(X_test, y_test):
     print("Predictions.....")
-    predictions = np.argmax(model.predict(X_test), axis=-1)
-    target_names = [constants.LIGHT_WEIGHT, constants.MEDIUM_WEIGHT, constants.HEAVY_WEIGHT,constants.TWO_WHEELED, constants.RAIL_BOUND]
-    print(classification_report(y_test, predictions, target_names=target_names))
-
-
-def plot_model_accuracy(history):
-    # Plot graph Model Accuracy
-    plt.plot(history.history['accuracy'])
-    plt.plot(history.history['val_accuracy'])
-    plt.title('Model Accuracy')
-    plt.ylabel('Accuracy')
-    plt.xlabel('Epoch')
-    plt.legend(['Train', 'Test'], loc='upper left')
-    plt.show()
-
-
-def plot_model_loss(history):
-    # Plot graph Model Loss
-    plt.plot(history.history['loss'])
-    plt.plot(history.history['val_loss'])
-    plt.title('Model loss')
-    plt.ylabel('Loss')
-    plt.xlabel('Epoch')
-    plt.legend(['Train', 'Test'], loc='upper right')
-    plt.show()
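+    # NOTE: uses the module-level compiled_model rather than taking the model as a parameter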
+    final_predictions = np.argmax(compiled_model.predict(X_test), axis=-1)
+    target_names = [constants.LIGHT_WEIGHT, constants.MEDIUM_WEIGHT, constants.HEAVY_WEIGHT, constants.TWO_WHEELED,
+                    constants.RAIL_BOUND]
+    print(classification_report(y_test, final_predictions, target_names=target_names))
 
 
 # Changing Directory to Training Dataset Folder
 chdir(constants.TRAINING_DATA_DIRECTORY_NAME)
-trainingDataDir = Path.cwd()
-trainingDataSubDirs = os.listdir(trainingDataDir)
-
-extract_features(trainingDataDir, trainingDataSubDirs)
-data = preprocessing_csv_data()
-target_labels, encoder = encode_labels(data)
-X = normalize_data(data)
-X_train, X_test, y_train, y_test = train_test_data_split(X, target_labels)
-model = create_and_compile_model()
-history = train_and_save_model(model, X_train, y_train, X_test, y_test)
-predict(X_test, y_test)
-plot_model_accuracy(history)
-plot_model_loss(history)
-
-
-
+training_data_directory = Path.cwd()
+training_data_sub_directories = os.listdir(training_data_directory)
+extract_features(training_data_directory, training_data_sub_directories)
+processed_features_data = preprocessing_csv_data()
+target_audio_labels, encoder_object = encode_labels(processed_features_data)
+X_input_features = normalize_data(processed_features_data)
+X_train_data, X_test_data, y_train_data, y_test_data = train_test_data_split(X_input_features, target_audio_labels)
+compiled_model = create_and_compile_model()
+model_training_history = train_and_save_model(compiled_model, X_train_data, y_train_data, X_test_data, y_test_data)
+predict(X_test_data, y_test_data)
diff --git a/test.py b/test.py
index b1b9f26..5fa110e 100644
--- a/test.py
+++ b/test.py
@@ -7,31 +7,36 @@ import csv
 from tensorflow import keras
 from sklearn.preprocessing import LabelEncoder, StandardScaler
 import constants
+import sys
 
 
 def create_csv_header():
-    header=''
-    for i in range(constants.MFCC_FEATURE_START, constants.MFCC_FEATURE_END):
-        header += f' mfcc{i}'
-    header = header.split()
-    file = open(constants.TEST_CSV_NAME, 'w', newline='')
-    with file:
-        writer = csv.writer(file)
-        writer.writerow(header)
+    if os.path.isfile(constants.TEST_CSV_NAME):
+        sys.exit("test.csv already exist, please remove/move the file to another location and run test.py again")
+    else:
+        header = ''
+        for i in range(constants.MFCC_RANGE_START, constants.MFCC_RANGE_END):
+            header += f' mfcc{i}'
+        header = header.split()
+        file = open(constants.TEST_CSV_NAME, 'x', newline='')
+        with file:
+            writer = csv.writer(file)
+            writer.writerow(header)
 
 
 def extract_features(workingDir, subDirectories):
     create_csv_header()
     for subDirectory in subDirectories:
         if subDirectory == constants.TESTING_DATA_DIRECTORY_NAME:
-            for fileName in os.listdir(workingDir/f'{subDirectory}'):
-                if fileName.endswith(".wav"):
-                    audioFile = workingDir / f'{subDirectory}/{fileName}'
-                    y, sr = librosa.load(audioFile, mono=True)
-                    mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=(constants.MFCC_FEATURE_END - constants.MFCC_FEATURE_START))
+            for test_audio_file_name in os.listdir(workingDir/f'{subDirectory}'):
+                if test_audio_file_name.endswith(".wav"):
+                    test_audio_file = workingDir / f'{subDirectory}/{test_audio_file_name}'
+                    y, sr = librosa.load(test_audio_file, mono=True)
+                    mfcc_features = librosa.feature.mfcc(y=y, sr=sr,
+                                                         n_mfcc=(constants.MFCC_RANGE_END - constants.MFCC_RANGE_START))
                     to_append = ''
-                    for g in mfcc:
-                        to_append += f' {np.mean(g)}'
+                    for mfcc_segment in mfcc_features:
+                        to_append += f' {np.mean(mfcc_segment)}'
                     file = open(constants.TEST_CSV_NAME, 'a', newline='')
                     with file:
                         writer = csv.writer(file)
@@ -39,30 +44,30 @@ def extract_features(workingDir, subDirectories):
 
 
 def preprocessing_csv_data():
-    # reading dataset from csv
     print("Reading Features... ")
-    data = pd.read_csv(constants.TEST_CSV_NAME)
-    data.head()
-    return data
+    test_features_data = pd.read_csv(constants.TEST_CSV_NAME)
+    test_features_data.head()
+    return test_features_data
 
 
-def normalize_data(data):
+def normalize_data(processedData):
     # # normalizing - Extracting Remaining Columns as X and normalizing them to a common scale
-    scaler = StandardScaler()
-    X = scaler.fit_transform(np.array(data.iloc[:, :], dtype=float))
-    print(X)
-    print(X.shape)
-    return X
+    scale_object = StandardScaler()
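+    # NOTE: fits a fresh scaler on the test set; reusing the scaler fitted on the training data
+    # would match training-time normalization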
+    X_test = scale_object.fit_transform(np.array(processedData.iloc[:, :], dtype=float))
+    return X_test
 
 
-WorkingDir = Path.cwd()
-subDirectories = os.listdir(WorkingDir)
-extract_features(WorkingDir, subDirectories)
-data = preprocessing_csv_data()
-X = normalize_data(data)
-model = keras.models.load_model('./DemoTrainingDataset/trained_model.h5')
-model.summary()
-predictions = np.argmax(model.predict(X), axis=-1)
+working_directory = Path.cwd()
+sub_directories = os.listdir(working_directory)
+extract_features(working_directory, sub_directories)
+processed_data = preprocessing_csv_data()
+X_test_data = normalize_data(processed_data)
+if os.path.isfile('./DemoTrainingDataset/Trained_Model/trained_model.h5'):
+    model = keras.models.load_model('./DemoTrainingDataset/Trained_Model/trained_model.h5')
+else:
+    sys.exit("Trained model file does not exists")
+predictions = np.argmax(model.predict(X_test_data), axis=-1)
 encoder = LabelEncoder()
 labels = ['Light-Weight', 'Medium-Weight', 'Heavy-Weight', 'Two-Wheeled', 'Rail-Bound']
 encoder.fit_transform(labels)
-- 
GitLab