package eu.simstadt.nf4j.async;

import java.io.File;
import java.util.LinkedList;
import java.util.Objects;
import java.util.Optional;

import eu.simstadt.nf4j.ExportJob;
import eu.simstadt.nf4j.FailedTransmissionException;
import eu.simstadt.nf4j.InvalidJobDescriptorException;
import eu.simstadt.nf4j.JobStatus;
import eu.simstadt.nf4j.Connector;

/**
 * Export jobs are requests for CityGML models. Every valid export job has an id and a status. This implementation
 * offers non-blocking asynchronous send, poll and download operations, so that your main application does not have
 * to wait for the results. You may want to register your main application as a job status listener with this job
 * to get status updates from the asynchronous operations.
 * 
 * @author Marcel Bruse
 */
public class AsyncExportJob extends ExportJob<ExportJobDescription> implements AsyncJob {
	
	/**
25
26
	 * While polling for the current job status, the polling thread will sleep for this amount of seconds
	 * before each status update request.
bruse's avatar
bruse committed
27
	 */
28
	private final int DEFAULT_POLLING_INTERVAL = 5; // seconds
	
	/**
	 * There can only be one sending thread for each job at a time. Volatile, because cancel() reads this
	 * reference without holding the job's monitor.
	 */
	private volatile Thread sendThread;

	/**
	 * There can only be one polling thread for each job at a time. Volatile, because cancel() reads this
	 * reference without holding the job's monitor.
	 */
	private volatile Thread pollThread;

	/**
	 * There can only be one download thread for each job at a time. Volatile, because cancel() reads this
	 * reference without holding the job's monitor.
	 */
	private volatile Thread downloadThread;

	/**
	 * Once the send() operation has been triggered, this member will be true. No subsequent invocations of
	 * send() will be possible then.
	 */
	private boolean jobTransmissionTriggered = false;

	/**
	 * As long as this variable is true, the polling thread will be kept alive. Volatile, because cancel()
	 * writes it and keepPolling() reads it without synchronization, typically from different threads.
	 */
	private volatile boolean keepPolling = true;

	/**
	 * List of all registered job status listeners. Whenever the state of this job changes, these listeners
	 * will get informed. The list instance itself is never replaced.
	 */
	private final LinkedList<JobStatusListener> jobListenerList = new LinkedList<>();

	/**
	 * This job will be sent and observed asynchronously. Its results will be downloaded asynchronously as well.
	 * If an asynchronous operation breaks, then the last encountered problem will be described here.
	 */
	private Optional<String> lastEncounteredProblem = Optional.empty();

	/**
	 * The last job status which has been sent to all registered job status listeners.
	 */
	private JobStatus lastPublishedJobStatus;

	/**
	 * Once the CityGML file has been downloaded, it should be referenced here. Volatile, because setResult()
	 * and getResult() are not synchronized and may be called from different threads.
	 */
	private volatile File result;
	
	/**
	 * Creates a job that exists only locally so far. The job receives the given description and connector
	 * and starts with the status {@code LOCAL}, on the assumption that the description has not been sent
	 * yet and that no job id has been assigned.
	 *
	 * @param descriptor The description of this job.
	 * @param connector The job will use this connector to synchronize itself with the nF.
	 */
	public AsyncExportJob(ExportJobDescription descriptor, Connector<AsyncImportJob, AsyncExportJob> connector) {
		super(descriptor, connector);
		this.status = JobStatus.LOCAL;
	}

	/**
	 * Creates a job for an already known job id. The job receives the given id and connector and starts
	 * with the status {@code SENT}, on the assumption that the job is already enqueued in the nF job queue.
	 *
	 * @param id The job id. A later call of updateStatus() asks the nF for the job with this id and may adopt
	 * the reported status. NOTE(review): the original description of what happens when the id is unknown on
	 * the server side was truncated; confirm the intended behavior.
	 * @param connector The job will use this connector to synchronize itself with the nF.
	 */
	public AsyncExportJob(int id, Connector<AsyncImportJob, AsyncExportJob> connector) {
		super(id, connector);
		this.status = JobStatus.SENT;
	}
	
	/**
	 * Triggers the transmission of this job. The transmission itself is delegated to a SendExportJobTask
	 * which runs on a newly started thread, so this method returns without waiting for it. Before the
	 * thread is started, the registered job status listeners are notified about the current status.
	 *
	 * @throws FailedTransmissionException If send() has already been triggered for this job.
	 * @throws InvalidJobDescriptorException If the job description is null or reports itself as invalid.
	 */
	@Override
	public synchronized void send() throws FailedTransmissionException, InvalidJobDescriptorException {
		if (jobTransmissionTriggered) {
			throw new FailedTransmissionException("Jobs cannot be sent twice!");
		}
		boolean descriptorUsable = descriptor != null && descriptor.isValid();
		if (!descriptorUsable) {
			throw new InvalidJobDescriptorException();
		}
		jobTransmissionTriggered = true;
		// Publish the current status (LOCAL for a freshly described job) before the transmission starts.
		notifyJobStatusListeners();
		Thread transmitter = new Thread(new SendExportJobTask(this));
		sendThread = transmitter;
		transmitter.start();
	}
	
	/**
	 * Frequently queries the remote status of the nF export job and updates the local status accordingly.
	 * The queries will be performed asynchronously in a separate thread. Job status listener will be notified
	 * upon every new status change.
120
121
122
123
124
125
126
	 * 
	 * Note, there can only be one polling thread at a time. Subsequent calls of poll() will stop the previously
	 * started poll threads.
	 * 
	 * @param interval Amount of seconds to wait before the next status update request will be sent to the
	 * nF server.
	 * 
bruse's avatar
bruse committed
127
128
129
	 * @throws FailedTransmissionException If your job has not been sent yet, then you will get some of this. 
	 */
	@Override
130
	public synchronized void poll(int interval) throws FailedTransmissionException {
bruse's avatar
bruse committed
131
132
133
134
135
136
137
		if (status.compareTo(JobStatus.SENT) < 0) {
			throw new FailedTransmissionException("The job has not been sent to the nF yet!");
		}
		if (Objects.nonNull(pollThread)) {
			pollThread.interrupt();
		}
		keepPolling = true;
138
		pollThread = new Thread(new PollJobStatusTask(this, interval));
bruse's avatar
bruse committed
139
140
141
		pollThread.start();
	}
	
	/**
	 * Convenience method which starts polling with the predefined default interval.
	 *
	 * @throws FailedTransmissionException If the job has not been sent yet.
	 *
	 * @see #poll(int)
	 */
	public synchronized void poll() throws FailedTransmissionException {
		this.poll(DEFAULT_POLLING_INTERVAL);
	}
	
	/**
	 * Asks the nF for the job with the id of this job and adopts the reported status, provided that the
	 * reported status lies beyond SENT. The last encountered problem is cleared whenever a status is adopted.
	 *
	 * The reported status is read exactly once, so that the status which is checked is also the status
	 * which is applied.
	 *
	 * @throws FailedTransmissionException If no connector is present, if the job has not been sent to the nF
	 * yet (its status precedes SENT), or if the connector delivers no job for the id. The request itself may
	 * raise this exception as well, e.g. on connection problems.
	 */
	@Override
	public synchronized void updateStatus() throws FailedTransmissionException {
		if (Objects.isNull(connector)) {
			throw new FailedTransmissionException("No connector set for this job!");
		}
		if (status.compareTo(JobStatus.SENT) < 0) {
			throw new FailedTransmissionException("The job has not been sent to the nF yet!");
		}
		AsyncExportJob remoteJob = ((HTTPConnection) connector).requestExportJob(id);
		// Defensive: it is not visible here whether the connector may return null. Report it through the
		// declared exception instead of letting a NullPointerException escape into the calling task.
		if (Objects.isNull(remoteJob)) {
			throw new FailedTransmissionException("The nF returned no job for id " + id + "!");
		}
		JobStatus remoteStatus = remoteJob.getStatus();
		if (remoteStatus.compareTo(JobStatus.SENT) > 0) {
			setStatus(remoteStatus, Optional.empty());
		}
	}
	
	/**
	 * Delegates to updateStatus(). According to the original documentation, this entry point exists for the
	 * asynchronous PollJobStatusTask class. NOTE(review): updateStatus() is declared public in this class;
	 * presumably it is protected in a supertype, confirm.
	 *
	 * @throws FailedTransmissionException If updateStatus() fails.
	 */
	@Override
	public void triggerStatusUpdate() throws FailedTransmissionException {
		this.updateStatus();
	}

	/**
	 * Translates the given nF status code into a job status and sets it via setStatus(JobStatus).
	 * The mapping is: 0 = PENDING, 10 = RUNNING, 20 = FAILED, 30 = FINISHED. Every other code results
	 * in the status UNKOWN.
	 *
	 * @param statusCode The nF status code for this job.
	 */
	@Override
	public synchronized void setStatusForCode(int statusCode) {
		final JobStatus mappedStatus;
		if (statusCode == 0) {
			mappedStatus = JobStatus.PENDING;
		} else if (statusCode == 10) {
			mappedStatus = JobStatus.RUNNING;
		} else if (statusCode == 20) {
			mappedStatus = JobStatus.FAILED;
		} else if (statusCode == 30) {
			mappedStatus = JobStatus.FINISHED;
		} else {
			mappedStatus = JobStatus.UNKOWN;
		}
		setStatus(mappedStatus);
	}
	
	/**
	 * @return Returns true, if the status of this job is either FINISHED or DOWNLOAD. False, otherwise.
	 */
	@Override
	public boolean hasFinished() {
		if (status == JobStatus.FINISHED) {
			return true;
		}
		return status == JobStatus.DOWNLOAD;
	}
	
	/**
	 * @return Returns true, if the status of this job is FAILED. You may want to look up the "last
	 * encountered problem" string in that case.
	 */
	@Override
	public boolean hasFailed() {
		return JobStatus.FAILED == status;
	}
	
	/**
	 * Registers a job status listener.
	 *
	 * This method is synchronized on the job, because notifyJobStatusListeners() walks through the listeners
	 * while holding the same monitor. Without it, a registration from another thread could modify the list
	 * in the middle of a notification round. A null listener is ignored, since it would only cause a
	 * NullPointerException during the next notification.
	 *
	 * @param jobListener The job status listener to be registered. This listener will receive updates about every
	 * progressing change of the job status. Meaning, the change to a particular status will only be signaled once
	 * to the listener.
	 */
	@Override
	public synchronized void addJobStatusListener(JobStatusListener jobListener) {
		if (Objects.isNull(jobListener)) {
			return;
		}
		jobListenerList.add(jobListener);
	}
	
	/**
	 * Unregisters a job status listener.
	 *
	 * This method is synchronized on the job, because notifyJobStatusListeners() walks through the listeners
	 * while holding the same monitor. Without it, a removal from another thread could modify the list in the
	 * middle of a notification round.
	 *
	 * @param jobListener The job status listener to be unregistered. Unknown listeners are ignored.
	 */
	@Override
	public synchronized void removeJobStatusListener(JobStatusListener jobListener) {
		jobListenerList.remove(jobListener);
	}
	
	/**
	 * Informs all registered job status listeners about the current status of this job.
	 * Listeners will only be notified if no status has been published yet or if the current status lies
	 * beyond the last published status. A status which has been signaled already will not be signaled again.
	 *
	 * The listeners are called on a snapshot of the listener list. Therefore a listener may register or
	 * unregister listeners from within its callback without provoking a ConcurrentModificationException.
	 * Such changes take effect with the next notification round.
	 */
	@Override
	public synchronized void notifyJobStatusListeners() {
		if (Objects.isNull(lastPublishedJobStatus) || status.compareTo(lastPublishedJobStatus) > 0) {
			JobStatusEvent event = new JobStatusEvent(status, this, lastEncounteredProblem);
			JobStatusListener[] snapshot = jobListenerList.toArray(new JobStatusListener[0]);
			for (JobStatusListener listener : snapshot) {
				listener.jobStatusChanged(event);
			}
			lastPublishedJobStatus = status;
		}
	}
	
	/**
	 * Requests the end of all ongoing send, poll and download operations: the polling flag is cleared and
	 * every thread which has been started by this job gets interrupted.
	 */
	@Override
	public void cancel() {
		keepPolling = false;
		Thread[] workers = { sendThread, pollThread, downloadThread };
		for (Thread worker : workers) {
			if (worker != null) {
				worker.interrupt();
			}
		}
	}
	
	/**
	 * Tells the polling thread whether it should carry on. The flag is set by poll(int) and cleared
	 * by cancel().
	 *
	 * @return Returns true, if polling should go on. Otherwise, false.
	 */
	@Override
	public boolean keepPolling() {
		return this.keepPolling;
	}
	
	/**
	 * Starts downloading the export job result on a new thread which runs a DownloadTask. Afterwards, you may
	 * want to obtain a handle to the downloaded CityGML file with getResult(). Presumably the DownloadTask
	 * sets the result and the final job status; that code is not part of this class.
	 *
	 * There can only be one download thread for each job at a time. If a previously started download thread
	 * is still alive, this call does nothing instead of starting a second, parallel download.
	 *
	 * @throws FailedTransmissionException If the job has not been finished yet.
	 */
	public synchronized void downloadResult() throws FailedTransmissionException {
		if (!hasFinished()) {
			throw new FailedTransmissionException("Job has not been finished!");
		}
		if (Objects.nonNull(downloadThread) && downloadThread.isAlive()) {
			return; // A download is already in progress.
		}
		Thread downloader = new Thread(new DownloadTask(this));
		downloadThread = downloader;
		downloader.start();
	}
	
	/**
	 * Stores the handle of the downloaded CityGML file. According to the original documentation, the
	 * download task calls this method as soon as the file has been downloaded.
	 *
	 * @param result The file handle of the downloaded CityGML file.
	 */
	protected void setResult(File result) {
		this.result = result;
	}
	
	/**
	 * Returns the downloaded CityGML file of this export job. This requires that the job has been finished
	 * and that a result file has been set.
	 *
	 * @return Returns a file handle to the downloaded CityGML file.
	 *
	 * @throws FailedTransmissionException If the job has not been finished yet or if no result has been
	 * downloaded so far.
	 */
	@Override
	public File getResult() throws FailedTransmissionException {
		boolean finished = hasFinished();
		if (!finished) {
			throw new FailedTransmissionException("Job has not been finished!");
		}
		File downloaded = result;
		if (downloaded == null) {
			throw new FailedTransmissionException("Job result has not been downloaded!");
		}
		return downloaded;
	}
	
	/**
	 * The asynchronous send, poll and download tasks cannot throw exceptions. If something goes wrong during the
	 * send, poll or download operation, then you may want to submit at least a textual description of the problem
	 * here. The description is handed over to the registered job status listeners with the next published status
	 * event. You can use the convenience method setStatus(jobStatus, message) to do both at the same time.
	 *
	 * @param errorMessage The description of the encountered problem. This could be the exception message or a more
	 * user friendly message. A null argument is treated like an empty Optional, so that the stored value is never
	 * null.
	 */
	protected synchronized void setLastEncounteredProblem(Optional<String> errorMessage) {
		if (Objects.isNull(errorMessage)) {
			this.lastEncounteredProblem = Optional.empty();
		} else {
			this.lastEncounteredProblem = errorMessage;
		}
	}
	
	/**
	 * A convenience method to set a new job status and a status message at the same time. Afterwards the
	 * registered job status listeners are notified. Status messages are passed by asynchronous tasks instead of
	 * exceptions, because those tasks cannot throw exceptions.
	 *
	 * @param jobStatus The new status of this job.
	 * @param message A status message. This message may describe a problem which occurred during an asynchronous
	 * task. A null argument is treated like an empty Optional, so that listeners never receive a null message
	 * container.
	 */
	protected synchronized void setStatus(JobStatus jobStatus, Optional<String> message) {
		super.setStatus(jobStatus);
		if (Objects.isNull(message)) {
			lastEncounteredProblem = Optional.empty();
		} else {
			lastEncounteredProblem = message;
		}
		notifyJobStatusListeners();
	}
	
}