// AsyncImportJob.java
package eu.simstadt.nf4j.async;

import java.util.LinkedList;
import java.util.Objects;
import java.util.Optional;

import eu.simstadt.nf4j.ImportJob;
import eu.simstadt.nf4j.FailedTransmissionException;
import eu.simstadt.nf4j.InvalidJobDescriptorException;
import eu.simstadt.nf4j.JobStatus;

/**
 * Import jobs are requests to store, change or delete CityGML models. Every valid import job has an id and a status.
 * This implementation offers non-blocking asynchronous send and poll operations, so that your main application does
 * not have to wait for the results. You may want to register your main application as a job status listener at this
 * job to get status updates from the asynchronous operations.
 *  
 * @author Marcel Bruse
 */
public class AsyncImportJob extends ImportJob<ImportJobDescription> implements AsyncJob {
	
	/**
23
24
	 * While polling for the current job status, the polling thread will sleep for this amount of seconds
	 * before each status update request.
bruse's avatar
bruse committed
25
	 */
26
	private final int DEFAULT_POLLING_INTERVAL = 5; // seconds
bruse's avatar
bruse committed
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
	
	/** Becomes true as soon as send() has accepted the job; a second send() is rejected afterwards. */
	private boolean jobTransmissionTriggered = false;
	
	/** The status that was signaled to the listeners most recently; null until the first notification. */
	private JobStatus lastPublishedJobStatus;
	
	/** Message handed over with the latest setStatus(JobStatus, Optional) call; empty if there was none. */
	private Optional<String> lastEncounteredProblem = Optional.empty();
	
	/** Thread started by send(); null as long as send() has not been called successfully. */
	private Thread sendThread;
	
	/** Thread started by the most recent poll(int) call; null as long as polling has never been started. */
	private Thread pollThread;
	
	/**
	 * As long as this variable is true, the polling thread will be kept alive. The flag is volatile because
	 * cancel() writes it and keepPolling() reads it without holding the monitor of this job, so the write
	 * would otherwise not be guaranteed to become visible to the reading thread.
	 */
	private volatile boolean keepPolling = true;
	
	/** 
	 * List of all registered job status listeners. Whenever the state of this job changes, these listeners
	 * will get informed. The list instance itself is never replaced.
	 */
	private final LinkedList<JobStatusListener> jobListenerList = new LinkedList<>();
	
	/**
	 * Creates a job from a description that has not been transmitted yet. The new job starts in the
	 * status "local", because it is assumed that it has an unsent description and no job id yet.
	 * 
	 * @param descriptor The description of this job.
	 * @param connector The job will use this connector to synchronize itself with the nF.
	 */
	public AsyncImportJob(ImportJobDescription descriptor, HTTPConnection connector) {
		super(descriptor, connector);
		this.status = JobStatus.LOCAL;
	}
	
	/**
	 * Creates a job for an already existing job id. The new job starts in the status "sent", because it
	 * is assumed that the job is already enqueued at the nF job queue.
	 * 
	 * @param id The job id. If you call updateStatus() and the nF "knows" the job id, then the job status
	 * will be updated. NOTE(review): what happens for an id that is unknown on the server side is not
	 * specified here - confirm against the connector implementation.
	 * @param connector The job will use this connector to synchronize itself with the nF.
	 */
	public AsyncImportJob(int id, HTTPConnection connector) {
		super(id, connector);
		this.status = JobStatus.SENT;
	}
	
	/**
	 * Triggers the asynchronous transmission of this job. After the preconditions have been checked, the
	 * listeners are notified once and a SendImportJobTask is started in its own thread, so this method
	 * returns without waiting for the transmission. According to the original documentation, the
	 * transmission zips up an archive which includes the CityGML file to be imported as well as a nF start
	 * file, both preprocessed according to the attributes of the job description.
	 * 
	 * @throws FailedTransmissionException If send() has already been triggered for this job.
	 * @throws InvalidJobDescriptorException If there is no descriptor or the descriptor is not valid.
	 */
	@Override
	public synchronized void send() throws InvalidJobDescriptorException, FailedTransmissionException {
		if (jobTransmissionTriggered) {
			throw new FailedTransmissionException("Jobs cannot be sent twice!");
		}
		final boolean descriptorUsable = descriptor != null && descriptor.isValid();
		if (!descriptorUsable) {
			throw new InvalidJobDescriptorException();
		}
		jobTransmissionTriggered = true;
		// Publish the current (initial) status before the worker thread gets a chance to change it.
		notifyJobStatusListeners();
		final SendImportJobTask sendTask = new SendImportJobTask(this);
		sendThread = new Thread(sendTask);
		sendThread.start();
	}
	
	/**
	 * Frequently queries the remote status of the nF export job and updates the local status accordingly.
	 * The queries will be performed asynchronously in a separate thread. Job status listener will be notified
	 * upon every new status change.
	 *  
97
98
99
	 * @param interval Amount of seconds to wait before the next status update request will be sent to the
	 * nF server.
	 *  
bruse's avatar
bruse committed
100
101
102
	 * @throws FailedTransmissionException If your job has not been sent yet, then you will get some of this. 
	 */
	@Override
103
	public synchronized void poll(int interval) throws FailedTransmissionException {
bruse's avatar
bruse committed
104
105
106
107
108
109
110
		if (status.compareTo(JobStatus.SENT) < 0) {
			throw new FailedTransmissionException("The job has not been sent to the nF yet!");
		}
		if (Objects.nonNull(pollThread)) {
			pollThread.interrupt();
		}
		keepPolling = true;
111
		pollThread = new Thread(new PollJobStatusTask(this, interval));
bruse's avatar
bruse committed
112
113
114
		pollThread.start();
	}
	
115
116
117
118
119
120
121
122
123
	/**
	 * Convenience method for polling with a predefined default interval.
	 * 
	 * @see poll(int)
	 */
	public synchronized void poll() throws FailedTransmissionException {
		poll(DEFAULT_POLLING_INTERVAL);
	}
	
bruse's avatar
bruse committed
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
	/**
	 * Asks the nF for the current state of this job and takes over the reported status, provided that
	 * the reported status lies beyond "sent". Without a connector no request can be made and a
	 * FailedTransmissionException is thrown instead.
	 * 
	 * @throws FailedTransmissionException If no connector is set, or if the request to the nF fails. 
	 */
	@Override
	public synchronized void updateStatus() throws FailedTransmissionException {
		if (connector == null) {
			throw new FailedTransmissionException();
		}
		final AsyncImportJob remoteJob = ((HTTPConnection) connector).requestImportJob(id);
		final boolean progressedBeyondSent = remoteJob.getStatus().compareTo(JobStatus.SENT) > 0;
		if (progressedBeyondSent) {
			// No problem message accompanies a regular status refresh.
			setStatus(remoteJob.getStatus(), Optional.empty());
		}
	}

	/**
	 * Translates a nF status code into the corresponding job status of this library and applies it to
	 * this job. nF status codes arrive in HTTP responses and differ from the internal job status codes;
	 * read the nF documentation for more information. Codes without a mapping result in the status
	 * UNKOWN.
	 *  
	 * @param statusCode The nF status code for this job.
	 */
	@Override
	public synchronized void setStatusForCode(int statusCode) {
		final JobStatus mappedStatus;
		switch (statusCode) {
		case 0:
			mappedStatus = JobStatus.PENDING;
			break;
		case 10:
			mappedStatus = JobStatus.RUNNING;
			break;
		case 20:
			mappedStatus = JobStatus.ERROR;
			break;
		case 25:
			mappedStatus = JobStatus.WARNING;
			break;
		case 30:
			mappedStatus = JobStatus.FINISHED;
			break;
		case 40:
			mappedStatus = JobStatus.APPROVE;
			break;
		case 45:
			mappedStatus = JobStatus.REJECT;
			break;
		case 50:
			mappedStatus = JobStatus.APPROVE_RUNNING;
			break;
		case 55:
			mappedStatus = JobStatus.REJECT_RUNNING;
			break;
		case 60:
			mappedStatus = JobStatus.APPROVE_REJECT_ERROR;
			break;
		case 70:
			mappedStatus = JobStatus.APPROVE_REJECT_OK;
			break;
		case 80:
			mappedStatus = JobStatus.IMPORTED_WARNING;
			break;
		default:
			mappedStatus = JobStatus.UNKOWN;
			break;
		}
		setStatus(mappedStatus);
	}
	
	/**
	 * Registers a job status listener. The method is synchronized on this job, because the listener list
	 * is iterated by the synchronized notifyJobStatusListeners() and must not be modified by another
	 * thread in the meantime.
	 * 
	 * @param jobListener The job status listener to be registered. This listener will receive updates about every
	 * progressing change of the job status. Meaning, the change to a particular status will only be signaled once
	 * to the listener.
	 */
	@Override
	public synchronized void addJobStatusListener(JobStatusListener jobListener) {
		jobListenerList.add(jobListener);
	}
	
	/**
	 * Unregisters a job status listener. The method is synchronized on this job, because the listener list
	 * is iterated by the synchronized notifyJobStatusListeners() and must not be modified by another
	 * thread in the meantime.
	 * 
	 * @param jobListener The job status listener to be unregistered.
	 */
	@Override
	public synchronized void removeJobStatusListener(JobStatusListener jobListener) {
		jobListenerList.remove(jobListener);
	}
	
	/**
	 * Publishes the current status to all registered job status listeners, unless it has been published
	 * before. A status counts as new if nothing has been published so far or if it ranks higher than the
	 * status published last; in every other case the call returns without notifying anybody. Hence,
	 * listeners only see progressing status changes and see each of them once.
	 */
	@Override
	public synchronized void notifyJobStatusListeners() {
		final boolean nothingNewToPublish = lastPublishedJobStatus != null
				&& status.compareTo(lastPublishedJobStatus) <= 0;
		if (nothingNewToPublish) {
			return;
		}
		final JobStatusEvent statusEvent = new JobStatusEvent(status, this, lastEncounteredProblem);
		jobListenerList.forEach(registered -> registered.jobStatusChanged(statusEvent));
		lastPublishedJobStatus = status;
	}
	
	/**
	 * Sets a new job status together with a status message and notifies the job status listeners
	 * afterwards. Asynchronous tasks hand over their problems as messages through this method, because
	 * they cannot throw exceptions to the caller.
	 * 
	 * @param jobStatus The new status of this job.
	 * @param message A status message. This message may describe a problem which occurred during an asynchronous task.
	 */
	protected synchronized void setStatus(JobStatus jobStatus, Optional<String> message) {
		// The status is stored first, then the message, so that the notification carries both.
		super.setStatus(jobStatus);
		this.lastEncounteredProblem = message;
		this.notifyJobStatusListeners();
	}
	
	/**
	 * Requests the end of all ongoing send and poll operations: the polling flag is switched off and
	 * the send thread as well as the poll thread are interrupted, as far as they have been created.
	 */
	@Override
	public void cancel() {
		keepPolling = false;
		if (sendThread != null) {
			sendThread.interrupt();
		}
		if (pollThread != null) {
			pollThread.interrupt();
		}
	}
	
	/**
	 * Tells the polling thread whether it is supposed to continue.
	 * 
	 * @return True, as long as polling has not been cancelled. False, after cancel() has been called
	 * and polling has not been restarted since.
	 */
	@Override
	public boolean keepPolling() {
		return this.keepPolling;
	}
	
	/**
	 * Checks whether this job has reached the status "finished".
	 * 
	 * @return True, if the current status is JobStatus.FINISHED. False, otherwise.
	 */
	@Override
	public boolean hasFinished() {
		return JobStatus.FINISHED == status;
	}
	
	/**
	 * Checks whether this job ended up in one of the failure states.
	 * 
	 * @return True, if the current status is JobStatus.FAILED or JobStatus.ERROR. You may want to look up
	 * the "last encountered problem" string in that case. False, otherwise.
	 */
	@Override
	public boolean hasFailed() {
		final boolean failed = status == JobStatus.FAILED;
		final boolean erroneous = status == JobStatus.ERROR;
		return failed || erroneous;
	}
	
	/**
	 * Delegates to updateStatus(). According to the original documentation this entry point exists for
	 * the asynchronous PollJobStatusTask class.
	 * 
	 * @throws FailedTransmissionException If updateStatus() fails.
	 */
	@Override
	public void triggerStatusUpdate() throws FailedTransmissionException {
		this.updateStatus();
	}
	
}