Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in
Toggle navigation
Menu
Open sidebar
Hotwani
HPC_Vehicle_Classification
Commits
b52cc91f
Commit
b52cc91f
authored
Feb 24, 2021
by
Hotwani
Browse files
Commit for code optimization
parent
6c4a93b8
Pipeline
#1990
failed with stages
Changes
3
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
constants.py
View file @
b52cc91f
from
pandas
import
datetime
from
_datetime
import
datetime
MFCC_
FEATUR
E_START
=
1
MFCC_
FEATUR
E_END
=
21
MFCC_
RANG
E_START
=
1
MFCC_
RANG
E_END
=
21
TRAINING_DATA_DIRECTORY_NAME
=
'DemoTrainingDataset'
TESTING_DATA_DIRECTORY_NAME
=
'T
EST
'
TESTING_DATA_DIRECTORY_NAME
=
'T
est
'
CAR
=
'Car'
...
...
@@ -36,4 +36,6 @@ ACCURACY_METRICS = 'accuracy'
LOG_DIR_PATH
=
"logs/scalars/"
+
datetime
.
now
().
strftime
(
"%Y%m%d-%H%M%S"
)
TRAINED_MODEL
=
'trained_model.h5'
TRAINED_MODEL
=
'Trained_Model/trained_model.h5'
TEST_DATA_SPLIT
=
0.20
main.py
View file @
b52cc91f
...
...
@@ -10,36 +10,43 @@ from sklearn.model_selection import train_test_split
from
sklearn.preprocessing
import
LabelEncoder
,
StandardScaler
from
keras
import
models
from
keras
import
layers
import
matplotlib.pyplot
as
plt
from
sklearn.metrics
import
classification_report
import
constants
import
sys
def
create_csv_header
():
header
=
'filename '
for
i
in
range
(
constants
.
MFCC_FEATURE_START
,
constants
.
MFCC_FEATURE_END
):
header
+=
f
' mfcc
{
i
}
'
header
+=
' label'
header
=
header
.
split
()
file
=
open
(
constants
.
FEATURES_CSV_NAME
,
'w'
,
newline
=
''
)
with
file
:
writer
=
csv
.
writer
(
file
)
writer
.
writerow
(
header
)
if
os
.
path
.
isfile
(
constants
.
TRAINED_MODEL
):
sys
.
exit
(
"Trained model file already exists, "
"remove/move trained_model.h5 to another location and start training again"
)
if
os
.
path
.
isfile
(
constants
.
FEATURES_CSV_NAME
):
sys
.
exit
(
"features.csv already exist, please remove/move the file to another location and run main.py again"
)
else
:
header
=
'filename '
for
i
in
range
(
constants
.
MFCC_RANGE_START
,
constants
.
MFCC_RANGE_END
):
header
+=
f
' mfcc
{
i
}
'
header
+=
' label'
header
=
header
.
split
()
file
=
open
(
constants
.
FEATURES_CSV_NAME
,
'x'
,
newline
=
''
)
with
file
:
writer
=
csv
.
writer
(
file
)
writer
.
writerow
(
header
)
def
extract_features
(
trainingDataDir
,
trainingDataSubDirs
):
create_csv_header
()
# Looping over every file inside the subdirectories for feature extraction
for
trainingDataSubDir
in
trainingDataSubDirs
:
for
fileName
in
os
.
listdir
(
trainingDataDir
/
f
'
{
trainingDataSubDir
}
'
):
if
fileName
.
endswith
(
".wav"
):
audioFile
=
trainingDataDir
/
f
'
{
trainingDataSubDir
}
/
{
fileName
}
'
print
(
"Extracting Features from Directory "
+
trainingDataSubDir
+
" and file "
+
audioFile
.
name
)
y
,
sr
=
librosa
.
load
(
audioFile
,
mono
=
True
)
mfcc
=
librosa
.
feature
.
mfcc
(
y
=
y
,
sr
=
sr
,
n_mfcc
=
(
constants
.
MFCC_FEATURE_END
-
constants
.
MFCC_FEATURE_START
))
to_append
=
f
'
{
audioFile
.
name
}
'
for
g
in
mfcc
:
to_append
+=
f
'
{
np
.
mean
(
g
)
}
'
for
audio_file_name
in
os
.
listdir
(
trainingDataDir
/
f
'
{
trainingDataSubDir
}
'
):
if
audio_file_name
.
endswith
(
".wav"
):
audio_file
=
trainingDataDir
/
f
'
{
trainingDataSubDir
}
/
{
audio_file_name
}
'
print
(
"Extracting Features from Directory "
+
trainingDataSubDir
+
" and file "
+
audio_file
.
name
)
y
,
sr
=
librosa
.
load
(
audio_file
,
mono
=
True
)
mfcc_features
=
librosa
.
feature
.
mfcc
(
y
=
y
,
sr
=
sr
,
n_mfcc
=
(
constants
.
MFCC_RANGE_END
-
constants
.
MFCC_RANGE_START
))
to_append
=
f
'
{
audio_file
.
name
}
'
for
mfcc_segment
in
mfcc_features
:
to_append
+=
f
'
{
np
.
mean
(
mfcc_segment
)
}
'
if
trainingDataSubDir
==
constants
.
CAR
:
to_append
+=
f
'
{
constants
.
LIGHT_WEIGHT
}
'
elif
trainingDataSubDir
==
constants
.
BUS
:
...
...
@@ -58,104 +65,84 @@ def extract_features(trainingDataDir, trainingDataSubDirs):
def
preprocessing_csv_data
():
print
(
"Reading Features... "
)
data
=
pd
.
read_csv
(
constants
.
FEATURES_CSV_NAME
)
data
.
head
()
features_data
=
pd
.
read_csv
(
constants
.
FEATURES_CSV_NAME
)
features_data
.
head
()
# Dropping unnecessary columns (Column Filename is dropped)
data
=
data
.
drop
([
'filename'
],
axis
=
1
)
data
.
head
()
return
data
updated_features_data
=
features_
data
.
drop
([
'filename'
],
axis
=
1
)
updated_features_
data
.
head
()
return
updated_features_
data
def
encode_labels
(
d
ata
):
def
encode_labels
(
processedFeaturesD
ata
):
# Extracting classes/label column as y from csv and converting string labels to numbers using LabelEncoder
audio_l
ist
=
d
ata
.
iloc
[:,
-
1
]
encode
r
=
LabelEncoder
()
target_labels
=
encode
r
.
fit_transform
(
audio_list
)
return
target_labels
,
encode
r
audio_l
abels_list
=
processedFeaturesD
ata
.
iloc
[:,
-
1
]
encode
_object
=
LabelEncoder
()
encoded_
target_
audio_
labels
=
encode
_object
.
fit_transform
(
audio_
labels_
list
)
return
encoded_
target_
audio_
labels
,
encode
_object
def
normalize_data
(
d
ata
):
def
normalize_data
(
processedD
ata
):
# normalizing - Extracting Remaining Columns as X and normalizing them to a common scale
scale
r
=
StandardScaler
()
X
=
scale
r
.
fit_transform
(
np
.
array
(
d
ata
.
iloc
[:,
:
-
1
],
dtype
=
float
))
return
X
scale
_object
=
StandardScaler
()
X
_normalized_features
=
scale
_object
.
fit_transform
(
np
.
array
(
processedD
ata
.
iloc
[:,
:
-
1
],
dtype
=
float
))
return
X
_normalized_features
def
train_test_data_split
(
X
,
y
):
def
train_test_data_split
(
X
Input
,
yLabels
):
# splitting of dataset into train and test dataset
X_train
,
X_test
,
y_train
,
y_test
=
train_test_split
(
X
,
y
,
test_size
=
0.20
)
return
X_train
,
X_test
,
y_train
,
y_test
X_split_train
,
X_split_test
,
y_split_train
,
y_split_test
=
train_test_split
(
XInput
,
yLabels
,
test_size
=
constants
.
TEST_DATA_SPLIT
)
return
X_split_train
,
X_split_test
,
y_split_train
,
y_split_test
def
create_and_compile_model
():
print
(
"Creating a Model"
)
# creating a model
model
=
models
.
Sequential
()
model
.
add
(
layers
.
Dense
(
constants
.
HIDDEN_LAYER_1_DIMENSIONS
,
activation
=
constants
.
ACTIVATION_RELU
,
input_shape
=
(
X
.
shape
[
1
],)))
model
.
add
(
layers
.
Dense
(
constants
.
HIDDEN_LAYER_2_DIMENSIONS
,
activation
=
constants
.
ACTIVATION_RELU
))
model
.
add
(
layers
.
Dense
(
constants
.
HIDDEN_LAYER_3_DIMENSIONS
,
activation
=
constants
.
ACTIVATION_RELU
))
model
.
add
(
layers
.
Dense
(
constants
.
OUTPUT_LAYER_DIMENSIONS
,
activation
=
constants
.
ACTIVATION_SOFTMAX
))
model_instance
=
models
.
Sequential
()
model
_instance
.
add
(
layers
.
Dense
(
constants
.
HIDDEN_LAYER_1_DIMENSIONS
,
activation
=
constants
.
ACTIVATION_RELU
,
input_shape
=
(
X_input_features
.
shape
[
1
],)))
model
_instance
.
add
(
layers
.
Dense
(
constants
.
HIDDEN_LAYER_2_DIMENSIONS
,
activation
=
constants
.
ACTIVATION_RELU
))
model
_instance
.
add
(
layers
.
Dense
(
constants
.
HIDDEN_LAYER_3_DIMENSIONS
,
activation
=
constants
.
ACTIVATION_RELU
))
model
_instance
.
add
(
layers
.
Dense
(
constants
.
OUTPUT_LAYER_DIMENSIONS
,
activation
=
constants
.
ACTIVATION_SOFTMAX
))
print
(
"Compiling a Model"
)
model
.
compile
(
optimizer
=
constants
.
OPTIMIZER_ADAM
,
loss
=
constants
.
LOSS_FUNCTION_SPARSE
,
metrics
=
[
constants
.
ACCURACY_METRICS
])
return
model
model_instance
.
compile
(
optimizer
=
constants
.
OPTIMIZER_ADAM
,
loss
=
constants
.
LOSS_FUNCTION_SPARSE
,
metrics
=
[
constants
.
ACCURACY_METRICS
])
return
model_instance
def
train_and_save_model
(
model
,
X_train
,
y_train
,
X_test
,
y_test
):
logdir
=
constants
.
LOG_DIR_PATH
tensorboard_callback
=
keras
.
callbacks
.
TensorBoard
(
log_dir
=
logdir
)
def
train_and_save_model
(
compiledModel
,
X_train
,
y_train
,
X_test
,
y_test
):
log_directory
=
constants
.
LOG_DIR_PATH
tensorboard_callback
=
keras
.
callbacks
.
TensorBoard
(
log_dir
=
log_directory
)
print
(
"Start Training..."
)
history
=
model
.
fit
(
X_train
,
y_train
,
epochs
=
35
,
validation_data
=
(
X_test
,
y_test
),
callbacks
=
[
tensorboard_callback
])
training_history
=
compiledModel
.
fit
(
X_train
,
y_train
,
epochs
=
35
,
validation_data
=
(
X_test
,
y_test
),
callbacks
=
[
tensorboard_callback
])
# Saving the trained model to avoid re-training
model
.
save
(
constants
.
TRAINED_MODEL
)
return
history
#print(training_history)
compiledModel
.
save
(
constants
.
TRAINED_MODEL
)
return
training_history
def
predict
(
X_test
,
y_test
):
print
(
"Predictions....."
)
predictions
=
np
.
argmax
(
model
.
predict
(
X_test
),
axis
=-
1
)
target_names
=
[
constants
.
LIGHT_WEIGHT
,
constants
.
MEDIUM_WEIGHT
,
constants
.
HEAVY_WEIGHT
,
constants
.
TWO_WHEELED
,
constants
.
RAIL_BOUND
]
print
(
classification_report
(
y_test
,
predictions
,
target_names
=
target_names
))
def
plot_model_accuracy
(
history
):
# Plot graph Model Accuracy
plt
.
plot
(
history
.
history
[
'accuracy'
])
plt
.
plot
(
history
.
history
[
'val_accuracy'
])
plt
.
title
(
'Model Accuracy'
)
plt
.
ylabel
(
'Accuracy'
)
plt
.
xlabel
(
'Epoch'
)
plt
.
legend
([
'Train'
,
'Test'
],
loc
=
'upper left'
)
plt
.
show
()
def
plot_model_loss
(
history
):
# Plot graph Model Loss
plt
.
plot
(
history
.
history
[
'loss'
])
plt
.
plot
(
history
.
history
[
'val_loss'
])
plt
.
title
(
'Model loss'
)
plt
.
ylabel
(
'Loss'
)
plt
.
xlabel
(
'Epoch'
)
plt
.
legend
([
'Train'
,
'Test'
],
loc
=
'upper right'
)
plt
.
show
()
final_predictions
=
np
.
argmax
(
compiled_model
.
predict
(
X_test
),
axis
=-
1
)
target_names
=
[
constants
.
LIGHT_WEIGHT
,
constants
.
MEDIUM_WEIGHT
,
constants
.
HEAVY_WEIGHT
,
constants
.
TWO_WHEELED
,
constants
.
RAIL_BOUND
]
print
(
classification_report
(
y_test
,
final_predictions
,
target_names
=
target_names
))
# Changing Directory to Training Dataset Folder
chdir
(
constants
.
TRAINING_DATA_DIRECTORY_NAME
)
trainingDataDir
=
Path
.
cwd
()
trainingDataSubDirs
=
os
.
listdir
(
trainingDataDir
)
extract_features
(
trainingDataDir
,
trainingDataSubDirs
)
data
=
preprocessing_csv_data
()
target_labels
,
encoder
=
encode_labels
(
data
)
X
=
normalize_data
(
data
)
X_train
,
X_test
,
y_train
,
y_test
=
train_test_data_split
(
X
,
target_labels
)
model
=
create_and_compile_model
()
history
=
train_and_save_model
(
model
,
X_train
,
y_train
,
X_test
,
y_test
)
predict
(
X_test
,
y_test
)
plot_model_accuracy
(
history
)
plot_model_loss
(
history
)
training_data_directory
=
Path
.
cwd
()
training_data_sub_directories
=
os
.
listdir
(
training_data_directory
)
extract_features
(
training_data_directory
,
training_data_sub_directories
)
processed_features_data
=
preprocessing_csv_data
()
target_audio_labels
,
encoder_object
=
encode_labels
(
processed_features_data
)
X_input_features
=
normalize_data
(
processed_features_data
)
X_train_data
,
X_test_data
,
y_train_data
,
y_test_data
=
train_test_data_split
(
X_input_features
,
target_audio_labels
)
compiled_model
=
create_and_compile_model
()
model_training_history
=
train_and_save_model
(
compiled_model
,
X_train_data
,
y_train_data
,
X_test_data
,
y_test_data
)
predict
(
X_test_data
,
y_test_data
)
test.py
View file @
b52cc91f
...
...
@@ -7,31 +7,36 @@ import csv
from
tensorflow
import
keras
from
sklearn.preprocessing
import
LabelEncoder
,
StandardScaler
import
constants
import
sys
def
create_csv_header
():
header
=
''
for
i
in
range
(
constants
.
MFCC_FEATURE_START
,
constants
.
MFCC_FEATURE_END
):
header
+=
f
' mfcc
{
i
}
'
header
=
header
.
split
()
file
=
open
(
constants
.
TEST_CSV_NAME
,
'w'
,
newline
=
''
)
with
file
:
writer
=
csv
.
writer
(
file
)
writer
.
writerow
(
header
)
if
os
.
path
.
isfile
(
constants
.
TEST_CSV_NAME
):
sys
.
exit
(
"test.csv already exist, please remove/move the file to another location and run test.py again"
)
else
:
header
=
''
for
i
in
range
(
constants
.
MFCC_RANGE_START
,
constants
.
MFCC_RANGE_END
):
header
+=
f
' mfcc
{
i
}
'
header
=
header
.
split
()
file
=
open
(
constants
.
TEST_CSV_NAME
,
'x'
,
newline
=
''
)
with
file
:
writer
=
csv
.
writer
(
file
)
writer
.
writerow
(
header
)
def
extract_features
(
workingDir
,
subDirectories
):
create_csv_header
()
for
subDirectory
in
subDirectories
:
if
subDirectory
==
constants
.
TESTING_DATA_DIRECTORY_NAME
:
for
fileName
in
os
.
listdir
(
workingDir
/
f
'
{
subDirectory
}
'
):
if
fileName
.
endswith
(
".wav"
):
audioFile
=
workingDir
/
f
'
{
subDirectory
}
/
{
fileName
}
'
y
,
sr
=
librosa
.
load
(
audioFile
,
mono
=
True
)
mfcc
=
librosa
.
feature
.
mfcc
(
y
=
y
,
sr
=
sr
,
n_mfcc
=
(
constants
.
MFCC_FEATURE_END
-
constants
.
MFCC_FEATURE_START
))
for
test_audio_file_name
in
os
.
listdir
(
workingDir
/
f
'
{
subDirectory
}
'
):
if
test_audio_file_name
.
endswith
(
".wav"
):
test_audio_file
=
workingDir
/
f
'
{
subDirectory
}
/
{
test_audio_file_name
}
'
y
,
sr
=
librosa
.
load
(
test_audio_file
,
mono
=
True
)
mfcc_features
=
librosa
.
feature
.
mfcc
(
y
=
y
,
sr
=
sr
,
n_mfcc
=
(
constants
.
MFCC_RANGE_END
-
constants
.
MFCC_RANGE_START
))
to_append
=
''
for
g
in
mfcc
:
to_append
+=
f
'
{
np
.
mean
(
g
)
}
'
for
mfcc_segment
in
mfcc_features
:
to_append
+=
f
'
{
np
.
mean
(
mfcc_segment
)
}
'
file
=
open
(
constants
.
TEST_CSV_NAME
,
'a'
,
newline
=
''
)
with
file
:
writer
=
csv
.
writer
(
file
)
...
...
@@ -39,30 +44,29 @@ def extract_features(workingDir, subDirectories):
def
preprocessing_csv_data
():
# reading dataset from csv
print
(
"Reading Features... "
)
data
=
pd
.
read_csv
(
constants
.
TEST_CSV_NAME
)
data
.
head
()
return
data
test_features_
data
=
pd
.
read_csv
(
constants
.
TEST_CSV_NAME
)
test_features_
data
.
head
()
return
test_features_
data
def
normalize_data
(
d
ata
):
def
normalize_data
(
processedD
ata
):
# # normalizing - Extracting Remaining Columns as X and normalizing them to a common scale
scaler
=
StandardScaler
()
X
=
scaler
.
fit_transform
(
np
.
array
(
data
.
iloc
[:,
:],
dtype
=
float
))
print
(
X
)
print
(
X
.
shape
)
return
X
scale_object
=
StandardScaler
()
X_test
=
scale_object
.
fit_transform
(
np
.
array
(
processedData
.
iloc
[:,
:],
dtype
=
float
))
return
X_test
WorkingDir
=
Path
.
cwd
()
subDirectories
=
os
.
listdir
(
WorkingDir
)
extract_features
(
WorkingDir
,
subDirectories
)
data
=
preprocessing_csv_data
()
X
=
normalize_data
(
data
)
model
=
keras
.
models
.
load_model
(
'./DemoTrainingDataset/trained_model.h5'
)
model
.
summary
()
predictions
=
np
.
argmax
(
model
.
predict
(
X
),
axis
=-
1
)
working_directory
=
Path
.
cwd
()
sub_directories
=
os
.
listdir
(
working_directory
)
extract_features
(
working_directory
,
sub_directories
)
processed_data
=
preprocessing_csv_data
()
X_test_data
=
normalize_data
(
processed_data
)
if
os
.
path
.
isfile
(
'./DemoTrainingDataset/Trained_Model/trained_model.h5'
):
model
=
keras
.
models
.
load_model
(
'./DemoTrainingDataset/Trained_Model/trained_model.h5'
)
else
:
sys
.
exit
(
"Trained model file does not exists"
)
predictions
=
np
.
argmax
(
model
.
predict
(
X_test_data
),
axis
=-
1
)
encoder
=
LabelEncoder
()
labels
=
[
'Light-Weight'
,
'Medium-Weight'
,
'Heavy-Weight'
,
'Two-Wheeled'
,
'Rail-Bound'
]
encoder
.
fit_transform
(
labels
)
...
...
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment