package eu.simstadt.nf4j.async;

import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;

import eu.simstadt.nf4j.FailedTransmissionException;
import eu.simstadt.nf4j.ImportJob;
import eu.simstadt.nf4j.InvalidJobDescriptorException;
import eu.simstadt.nf4j.JobStatus;

/**
 * Import jobs are requests to store, change or delete CityGML models. Every valid import job has an id and a
 * status. This implementation offers non-blocking asynchronous send and poll operations, so that your main
 * application does not have to wait for the results. You may want to register your main application as a job
 * status listener at this job to get status updates from the asynchronous operations.
 *
 * <p>Sending and polling each run in their own thread. Status changes are published to the registered
 * listeners while the monitor of this job is held, so listeners should return quickly.
 *
 * @author Marcel Bruse
 */
public class AsyncImportJob extends ImportJob implements AsyncJob {

    /**
     * While polling for the current job status, the polling thread will sleep for this amount of seconds
     * before each status update request.
     */
    private static final int DEFAULT_POLLING_INTERVAL = 5; // seconds

    /** Set by the first successful call of {@link #send()}; prevents a job from being sent twice. */
    private boolean jobTransmissionTriggered = false;

    /** The status which has been signaled to the listeners most recently, or null if none has been signaled. */
    private JobStatus lastPublishedJobStatus;

    /** The message which has been passed along with the most recent status change, if any. */
    private Optional<String> lastEncounteredProblem = Optional.empty();

    // The thread references and the polling flag are written by one thread and read by another one without
    // a common lock (see cancel() and keepPolling()), hence they are volatile.
    private volatile Thread sendThread;

    private volatile Thread pollThread;

    /** As long as this variable is true, the polling thread will be kept alive. */
    private volatile boolean keepPolling = true;

    /**
     * List of all registered job status listeners. Whenever the state of this job changes, these listeners
     * will get informed. The list is copy-on-write, so that listeners may be registered or unregistered at
     * any time, even from within a notification callback.
     */
    private final List<JobStatusListener> jobListenerList = new CopyOnWriteArrayList<>();

    /**
     * This constructor forces the job to have a description and a connector instance. Every job which
     * is created by this constructor will have the status "local", because it is assumed that it has an unsent
     * description and no job id yet.
     *
     * @param descriptor The description of this job.
     * @param connector The job will use this connector to synchronize itself with the nF.
     */
    public AsyncImportJob(ImportJobDescription descriptor, HTTPConnection connector) {
        super(descriptor, connector);
        status = JobStatus.LOCAL;
    }

    /**
     * This constructor forces the job to have an id and a connector instance. Every job which is created by
     * this constructor will have the status "sent", because it is assumed that the job is already enqueued at
     * the nF job queue.
     *
     * @param id The job id. {@link #updateStatus()} uses this id to request the current state of the job
     *           from the nF.
     * @param connector The job will use this connector to synchronize itself with the nF.
     */
    public AsyncImportJob(int id, HTTPConnection connector) {
        super(id, connector);
        status = JobStatus.SENT;
    }

    /**
     * Validates the job description and then starts a separate thread which runs a {@code SendImportJobTask}
     * for this job. The method returns as soon as the thread has been started. Before the thread is started,
     * the listeners are notified about the current status of the job.
     *
     * @throws FailedTransmissionException If the transmission of this job has been triggered before.
     * @throws InvalidJobDescriptorException If the job has no descriptor or if the descriptor is invalid.
     */
    @Override
    public synchronized void send() throws InvalidJobDescriptorException, FailedTransmissionException {
        if (jobTransmissionTriggered) {
            throw new FailedTransmissionException("Jobs cannot be sent twice!");
        }
        if (Objects.isNull(descriptor) || !descriptor.isValid()) {
            throw new InvalidJobDescriptorException();
        }
        jobTransmissionTriggered = true;
        notifyJobStatusListeners(); // Force the job to signal the LOCAL status
        sendThread = new Thread(new SendImportJobTask(this));
        sendThread.start();
    }

    /**
     * Frequently queries the remote status of the nF import job and updates the local status accordingly.
     * The queries will be performed asynchronously in a separate thread. Job status listeners will be
     * notified upon every new status change. If a polling thread has been started before, it will be
     * interrupted and replaced by a new one.
     *
     * @param interval Amount of seconds to wait before the next status update request will be sent to the
     *                 nF server.
     *
     * @throws FailedTransmissionException If your job has not been sent yet, then you will get some of this.
     */
    @Override
    public synchronized void poll(int interval) throws FailedTransmissionException {
        // Every status which is ordered before SENT means that the nF cannot know this job yet.
        if (status.compareTo(JobStatus.SENT) < 0) {
            throw new FailedTransmissionException("The job has not been sent to the nF yet!");
        }
        Thread previousPollThread = pollThread;
        if (Objects.nonNull(previousPollThread)) {
            previousPollThread.interrupt();
        }
        keepPolling = true;
        pollThread = new Thread(new PollJobStatusTask(this, interval));
        pollThread.start();
    }

    /**
     * Convenience method for polling with a predefined default interval.
     *
     * @throws FailedTransmissionException If your job has not been sent yet, then you will get some of this.
     *
     * @see #poll(int)
     */
    public synchronized void poll() throws FailedTransmissionException {
        poll(DEFAULT_POLLING_INTERVAL);
    }

    /**
     * Connects to the nF and refreshes the status of this job. If there is no nF connector set,
     * this operation will throw a FailedTransmissionException. The local status will only be replaced, if
     * the remote status is ordered after the status "sent".
     *
     * @throws FailedTransmissionException If there is no connector or if the connection to the nF is broken
     *                                     you will get some of this.
     */
    @Override
    public synchronized void updateStatus() throws FailedTransmissionException {
        if (Objects.isNull(connector)) {
            throw new FailedTransmissionException("No nF connector has been set for this job!");
        }
        AsyncImportJob job = ((HTTPConnection) connector).requestImportJob(id);
        JobStatus newStatus = job.getStatus();
        if (newStatus.compareTo(JobStatus.SENT) > 0) {
            setStatus(newStatus, Optional.empty());
        }
    }

    /**
     * Sets the status of this job depending on the given nF status code. nF status codes will be sent
     * to you in HTTP responses. Note, the nF status codes differ from the internal job status codes of
     * this library. Read the nF documentation for more information.
     *
     * @param statusCode The nF status code for this job. Codes which are not known to this library will
     *                   result in the status {@code UNKOWN}.
     */
    @Override
    public synchronized void setStatusForCode(int statusCode) {
        setStatus(statusForCode(statusCode));
    }

    /**
     * Translates a nF status code into the corresponding job status of this library.
     *
     * @param statusCode The nF status code.
     * @return The matching job status, or {@code UNKOWN} if there is no mapping for the given code.
     */
    private static JobStatus statusForCode(int statusCode) {
        switch (statusCode) {
            case 0:
                return JobStatus.PENDING;
            case 10:
                return JobStatus.RUNNING;
            case 20:
                return JobStatus.ERROR;
            case 25:
                return JobStatus.WARNING;
            case 30:
                return JobStatus.FINISHED;
            case 40:
                return JobStatus.APPROVE;
            case 45:
                return JobStatus.REJECT;
            case 50:
                return JobStatus.APPROVE_RUNNING;
            case 55:
                return JobStatus.REJECT_RUNNING;
            case 60:
                return JobStatus.APPROVE_REJECT_ERROR;
            case 70:
                return JobStatus.APPROVE_REJECT_OK;
            case 80:
                return JobStatus.IMPORTED_WARNING;
            default:
                return JobStatus.UNKOWN;
        }
    }

    /**
     * Registers a job status listener.
     *
     * @param jobListener The job status listener to be registered. This listener will receive updates about
     *                    every progressing change of the job status. Meaning, the change to a particular
     *                    status will only be signaled once to the listener.
     */
    @Override
    public void addJobStatusListener(JobStatusListener jobListener) {
        jobListenerList.add(jobListener);
    }

    /**
     * Unregisters a job status listener.
     *
     * @param jobListener The job status listener to be unregistered.
     */
    @Override
    public void removeJobStatusListener(JobStatusListener jobListener) {
        jobListenerList.remove(jobListener);
    }

    /**
     * Once the status of this job changes, all registered job status listeners will be notified.
     * Listeners will only be notified of the status updates where the status of the job progresses and they
     * will only be notified once about every singular status. If this status has been signaled already, then
     * the listeners will not be notified again.
     */
    @Override
    public synchronized void notifyJobStatusListeners() {
        boolean statusHasProgressed =
                Objects.isNull(lastPublishedJobStatus) || status.compareTo(lastPublishedJobStatus) > 0;
        if (!statusHasProgressed) {
            return;
        }
        JobStatusEvent event = new JobStatusEvent(status, this, lastEncounteredProblem);
        // Iterating a copy-on-write list works on a snapshot, so listeners may unregister themselves here.
        for (JobStatusListener listener : jobListenerList) {
            listener.jobStatusChanged(event);
        }
        lastPublishedJobStatus = status;
    }

    /**
     * A convenience method to set a new job status and a status message at the same time. Status messages
     * will be passed by asynchronous tasks instead of exceptions, because they cannot throw exceptions.
     * The listeners will be notified afterwards.
     *
     * @param jobStatus The new status of this job.
     * @param message A status message. This message may describe a problem which occurred during an
     *                asynchronous task.
     */
    protected synchronized void setStatus(JobStatus jobStatus, Optional<String> message) {
        super.setStatus(jobStatus);
        lastEncounteredProblem = message;
        notifyJobStatusListeners();
    }

    /**
     * Cancels all ongoing send and poll operations as soon as possible. The polling flag is cleared and both
     * worker threads are interrupted, if they exist. This method deliberately does not acquire the monitor
     * of this job, so that it cannot be blocked by a status update which is in progress.
     */
    @Override
    public void cancel() {
        keepPolling = false;
        Thread sender = sendThread;
        if (Objects.nonNull(sender)) {
            sender.interrupt();
        }
        Thread poller = pollThread;
        if (Objects.nonNull(poller)) {
            poller.interrupt();
        }
    }

    /**
     * @return Returns true, if the polling thread should go on with its polling job. Otherwise, false.
     */
    @Override
    public boolean keepPolling() {
        return keepPolling;
    }

    /**
     * @return Returns true, if the status of this job is "finished". False, otherwise.
     */
    @Override
    public boolean hasFinished() {
        return status == JobStatus.FINISHED;
    }

    /**
     * @return Returns true, if the status of this job is "failed" or "error". You may want to look up the
     *         "last encountered problem" string.
     */
    @Override
    public boolean hasFailed() {
        return status == JobStatus.FAILED || status == JobStatus.ERROR;
    }

    /**
     * Calls updateStatus() for you. This method is used by the asynchronous PollJobStatusTask class.
     *
     * @throws FailedTransmissionException If the status update fails, see {@link #updateStatus()}.
     */
    @Override
    public void triggerStatusUpdate() throws FailedTransmissionException {
        updateStatus();
    }
}