run_LR_SBERT.py 7.9 KB
Newer Older
Siddharth Thorat's avatar
Siddharth Thorat committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
import os
import sys
import time
import numpy as np
import pandas as pd

# UP
import pickle
import argparse

from sklearn import metrics
from sentence_transformers import models, SentenceTransformer
from sklearn.linear_model import LogisticRegression, Perceptron
from sklearn.metrics import f1_score
from sklearn.metrics import classification_report
from sklearn.model_selection import cross_validate, cross_val_predict

# Module metadata: author, script version and date, plus the PyPI page of the
# sentence-transformers package this script is built on.
__author__ = "Yunus Eryilmaz"
__version__ = "1.0"
__date__ = "21.07.2021"
__source__ = "https://pypi.org/project/sentence-transformers/0.3.0/"


# Keys of the classification_report dict that are summary rows rather than
# per-class rows.
_SUMMARY_ROWS = ("accuracy", "micro avg", "macro avg", "weighted avg", "samples avg")


def _build_arg_parser(location):
    """Create the command line parser; defaults live below *location*."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--data",
        default=os.path.join(location, "Skript", "outputs", "test.tsv"),
        type=str,
        required=False,
        help="The input data file for the task.",
    )
    parser.add_argument(
        "--output_dir",
        default=os.path.join(location, "Skript", "outputs"),
        type=str,
        required=False,
        help="The output directory where predictions will be written.",
    )
    parser.add_argument(
        "--model_dir",
        default=os.path.join(location, "Skript", "german", "models"),
        type=str,
        required=False,
        help="The directory where the ML models are stored.",
    )
    return parser


def _load_sentence_encoder():
    """Build the mean-pooled multilingual MiniLM sentence encoder."""
    # Use BERT for mapping tokens to embeddings
    word_embedding_model = models.Transformer('sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2')
    # Mean pooling turns the token embeddings into one fixed-size sentence vector.
    pooling_model = models.Pooling(word_embedding_model.get_word_embedding_dimension(),
                                   pooling_mode_mean_tokens=True,
                                   pooling_mode_cls_token=False,
                                   pooling_mode_max_tokens=False)
    return SentenceTransformer(modules=[word_embedding_model, pooling_model])


def _pair_features(sentence_embeddings1, sentence_embeddings2):
    """Combine two embedding batches element-wise as |u - v| + u * v."""
    # NOTE(review): the inputs are produced with convert_to_tensor=True, so
    # NumPy is handed torch tensors here; presumably this relies on them being
    # CPU tensors - confirm before running on a GPU machine.
    return abs(np.subtract(sentence_embeddings1, sentence_embeddings2)) + np.multiply(
        sentence_embeddings1, sentence_embeddings2)


def _is_missing_grade(grade):
    """Return True for an absent observed grade (NaN or any casing of 'NONE')."""
    if pd.isna(grade):
        return True
    return str(grade).strip().upper() == "NONE"


def _format_classification_report(observed, suggested):
    """Return the tab-separated classification report, or None if no row is graded.

    Rows whose observed grade is missing are left out of the evaluation. The
    report lists one line per class, then the accuracy and weighted-average
    lines, in the same layout the script has always written.
    """
    graded = [(obs, sugg) for obs, sugg in zip(observed, suggested)
              if not _is_missing_grade(obs)]
    if not graded:
        return None

    y_true = [obs for obs, _ in graded]
    y_pred = [sugg for _, sugg in graded]
    # output_dict=True avoids parsing the text report by fixed line numbers,
    # which only lines up when exactly two classes are present.
    report = classification_report(y_true, y_pred, output_dict=True)

    def metric_row(name, scores):
        return "\t".join([
            name,
            "{:.2f}".format(scores["precision"]),
            "{:.2f}".format(scores["recall"]),
            "{:.2f}".format(scores["f1-score"]),
            str(int(scores["support"])),
        ])

    lines = ["\t precision \t recall \t f1-score \t support"]
    lines.extend(metric_row(label, scores)
                 for label, scores in report.items()
                 if label not in _SUMMARY_ROWS)
    if "accuracy" in report:
        # Three tabs place the single accuracy value under the f1-score column.
        lines.append("accuracy\t\t\t{:.2f}\t{}".format(
            report["accuracy"], int(report["weighted avg"]["support"])))
    lines.append(metric_row("weighted avg", report["weighted avg"]))
    return "\n".join(lines)


def _write_predictions(path, dft, suggested, report_text):
    """Write one prediction line per input row, followed by the report section."""
    # NOTE(review): the file is opened with the platform default encoding, as
    # before; confirm this matches what the consumer of predictions.txt expects.
    with open(path, "w") as writer:
        writer.write("question\treferenceAnswer\tstudentAnswer\tsuggested_grade\tobserved_grade\n")
        # Columns are addressed by position (0-3), exactly as the original did.
        for row, hrpred in zip(dft.itertuples(index=False, name=None), suggested):
            writer.write(
                "\t".join([str(row[0]), str(row[1]), str(row[2]), hrpred, str(row[3])])
                + "\n"
            )

        if report_text is None:
            writer.write("\nClassification Report cannot pe printed as observed grade column is empty or filled "
                         "with 'NONE' or 'none' values\n")
        else:
            writer.write("\nClassification Report:\n")
            writer.write(report_text)


def main():
    """Grade student answers against reference answers and write predictions.txt.

    Reads a tab-separated file (``--data``) with the columns ``referenceAnswer``,
    ``studentAnswer`` and ``observed grade``, embeds both answer columns with a
    sentence-transformers model, combines each pair into one feature vector and
    classifies it with the pickled model ``clf_BERT.pickle`` from ``--model_dir``.
    The per-row predictions, plus a classification report when observed grades
    are available, are written to ``predictions.txt`` in ``--output_dir``.

    Raises:
        OSError: if the data file, the model file or the output file cannot be opened.
        KeyError: if a required column is missing from the data file.
    """
    # Where are we? Inside a frozen bundle, sys._MEIPASS is used as the base directory.
    location = "."
    if getattr(sys, 'frozen', False):
        location = sys._MEIPASS

    args = _build_arg_parser(location).parse_args()

    # NOTE(review): opened with the platform default encoding, as before;
    # confirm the encoding of the TSV input.
    with open(os.path.join(location, args.data)) as ft:
        dft = pd.read_csv(ft, delimiter='\t')

    # Sentences we want sentence embeddings for
    reference_answers = dft['referenceAnswer'].values.tolist()
    student_answers = dft['studentAnswer'].values.tolist()

    model = _load_sentence_encoder()
    reference_embeddings = model.encode(reference_answers, convert_to_tensor=True, show_progress_bar=False)
    student_embeddings = model.encode(student_answers, convert_to_tensor=True, show_progress_bar=False)
    features = _pair_features(reference_embeddings, student_embeddings)

    # SECURITY: unpickling executes arbitrary code - only point --model_dir at
    # trusted model files.
    with open(os.path.join(args.model_dir, "clf_BERT.pickle"), "rb") as model_file:
        clf_log = pickle.load(model_file)

    predictions = clf_log.predict(features)
    sugg_grade = ['correct' if pred == 1 else 'incorrect' for pred in predictions]

    obs_grade = dft['observed grade'].tolist()
    report_text = _format_classification_report(obs_grade, sugg_grade)

    _write_predictions(os.path.join(args.output_dir, "predictions.txt"), dft, sugg_grade, report_text)


# Run the grading pipeline only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()