package eu.simstadt.nf4j.async; import java.io.File; import java.util.LinkedList; import java.util.Objects; import java.util.Optional; import eu.simstadt.nf4j.Connector; import eu.simstadt.nf4j.ExportJob; import eu.simstadt.nf4j.FailedTransmissionException; import eu.simstadt.nf4j.InvalidJobDescriptorException; import eu.simstadt.nf4j.JobStatus; /** * Export jobs are requests for CityGML models. Every valid export job has an id and a status. This implementation * offers non-blocking asynchronous send, poll and download operations, so that your main application has not to wait * for the results. You may want to register your main application as a job status listeners at this job to get status * updates from the asynchronous operations. * * @author Marcel Bruse */ public class AsyncExportJob extends ExportJob implements AsyncJob { /** * While polling for the current job status, the polling thread will sleep for this amount of seconds before each * status update request. */ private final int DEFAULT_POLLING_INTERVAL = 5; // seconds /** There can only be one sending thread for each job at a time. */ private Thread sendThread; /** There can only be one polling thread for each job at a time. */ private Thread pollThread; /** There can only be one download thread for each job at a time. */ private Thread downloadThread; /** * Once the send() operation has been triggered, this member will be true. No subsequent invocations of send() will * be possible then. */ private boolean jobTransmissionTriggered = false; /** As long as this variable is true, the polling thread will be kept alive. */ private boolean keepPolling = true; /** * List of all registered job status listeners. Whenever the state of this job changes, these listeners will get * informed. */ private LinkedList jobListenerList = new LinkedList<>(); /** * This job will be send and observed asynchronously. Its results will be downloaded asynchronously also. If an * asynchronous operation breaks, then the last encountered problem will be described here. */ private Optional lastEncounteredProblem = Optional.empty(); /** * The last job status which has been sent to all registered job status listeners. */ private JobStatus lastPublishedJobStatus; /** Once the CityGML file has been download, it should be referenced here. */ private File result; /** * This constructor forces the job to have a description and a connector instance. Every job which is created by this * constructor will have the status "local", because it is assumed that it has an unsent description and no job id * yet. * * @param connector The job will use this connector to synchronize itself with the nF. * @param descriptor The description of this job. */ public AsyncExportJob(ExportJobDescription descriptor, Connector connector) { super(descriptor, connector); status = JobStatus.LOCAL; } /** * This constructor forces the job to have a id and a connector instance. Every job which is created by this * constructor will have the status "sent", because it is assumed that the job is already enqueued at the nF job * queue. * * @param id The job id. If you call updateStatus() and the nF "knows" the job id, then the job status will be * updated. If you call updateStatus() and the job id is "unkown" on the server side, then * @param connector The job will use this connector to synchronize itself with the nF. */ public AsyncExportJob(int id, Connector connector) { super(id, connector); status = JobStatus.SENT; } /** * Builds an XML job file from the job description and sends it to the nF server which has been configured in your * connector instance. This will be done asynchronously within a SendExportJobTask. * * @throws FailedTransmissionException If this method has been called before, then you will receive this. * @throws InvalidJobDescriptorException If the job description is invalid or null, then you will receive this. */ @Override public synchronized void send() throws FailedTransmissionException, InvalidJobDescriptorException { if (jobTransmissionTriggered) { throw new FailedTransmissionException("Jobs cannot be sent twice!"); } if (Objects.isNull(descriptor) || !descriptor.isValid()) { throw new InvalidJobDescriptorException(); } jobTransmissionTriggered = true; notifyJobStatusListeners(); // Force the job to signal the LOCAL status sendThread = new Thread(new SendExportJobTask(this)); sendThread.start(); } /** * Frequently queries the status of the remote nF export job and updates the local status accordingly. The queries * will be performed asynchronously in a separate thread. Job status listeners will be notified upon every new status * change. * * Note, there can only be one polling thread at a time. Subsequent calls of poll() will stop the previously started * poll threads. * * @param interval Amount of seconds to wait before the next status update request will be sent to the nF server. * * @throws FailedTransmissionException If your job has not been sent yet, then you will get some of this. */ @Override public synchronized void poll(int interval) throws FailedTransmissionException { if (status.compareTo(JobStatus.SENT) < 0) { throw new FailedTransmissionException("The job has not been sent to the nF yet!"); } if (Objects.nonNull(pollThread)) { pollThread.interrupt(); } keepPolling = true; pollThread = new Thread(new PollJobStatusTask(this, interval)); pollThread.start(); } /** * Convenience method for polling with a predefined default interval. * * Frequently queries the status of the remote nF export job and updates the local status accordingly. The queries * will be performed asynchronously in a separate thread. Job status listeners will be notified upon every new status * change. * * Note, there can only be one polling thread at a time. Subsequent calls of poll() will stop the previously started * poll threads. * * @param interval Amount of seconds to wait before the next status update request will be sent to the nF server. * @throws FailedTransmissionException If your job has not been sent yet, then you will get some of this. * @see poll(int) */ public synchronized void poll() throws FailedTransmissionException { poll(DEFAULT_POLLING_INTERVAL); } /** * Connects to the nF and refreshes the status of this job. If there is no nF connector set, this operation will * throw a FailedTransmissionException. * * @throws FailedTransmissionException You will receive this exception if no connector is present, the connection to * the nF is broken, the job has not been sent to the nF yet, or another update request is ongoing. In the * two latter cases, job will either have the status "LOCAL" or "WAITING". */ @Override public synchronized void updateStatus() throws FailedTransmissionException { if (Objects.isNull(connector)) { throw new FailedTransmissionException("No connector set for this job!"); } if (status.compareTo(JobStatus.SENT) < 0) { throw new FailedTransmissionException("The job has not been sent to the nF yet!"); } AsyncExportJob job = ((HTTPConnection) connector).requestExportJob(id); JobStatus newStatus = job.getStatus(); if (newStatus.compareTo(JobStatus.SENT) > 0) { setStatus(job.getStatus(), Optional.empty()); } } /** * Calls updateStatus() for you, since updateStatus() is a protected method. This method is used by the asynchronous * PollJobStatusTask class. */ @Override public void triggerStatusUpdate() throws FailedTransmissionException { updateStatus(); } /** * Sets the status of this job depending on the given nF status code. nF status codes will be sent to you in http * responses. * * @param statusCode The nF status code for this job. */ @Override public synchronized void setStatusForCode(int statusCode) { switch (statusCode) { case 0: setStatus(JobStatus.PENDING); break; case 10: setStatus(JobStatus.RUNNING); break; case 20: setStatus(JobStatus.FAILED); break; case 30: setStatus(JobStatus.FINISHED); break; default: setStatus(JobStatus.UNKNOWN); } } /** * @return Returns true, if the job is definitely done. This is also the case, if the resulting CityGML file has been * download. False, otherwise. */ @Override public boolean hasFinished() { return status == JobStatus.FINISHED || status == JobStatus.DOWNLOAD; } /** * @return Returns true, if the job has been failed. You may want to look up the "last encountered problem" string. */ @Override public boolean hasFailed() { return status == JobStatus.FAILED; } /** * Registers a job status listener. * * @param jobListener The job status listener to be registered. This listener will receive updates about every * progressing change of the job status. Meaning, the change to a particular status will only be signaled * once to the listener. */ @Override public void addJobStatusListener(JobStatusListener jobListener) { jobListenerList.add(jobListener); } /** * Unregisters a job status listener. * * @param jobListener The job status listener to be unregistered. */ @Override public void removeJobStatusListener(JobStatusListener jobListener) { jobListenerList.remove(jobListener); } /** * Once the status of this job changes, all registered job status listeners will be notified. Listeners will only be * notified of the status updates where the status of the job progresses and they will only be notified once about * every singular status. If this status has been signaled already, then the listeners will not be notified again. */ @Override public synchronized void notifyJobStatusListeners() { if (Objects.isNull(lastPublishedJobStatus) || status.compareTo(lastPublishedJobStatus) > 0) { JobStatusEvent event = new JobStatusEvent(status, this, lastEncounteredProblem); for (JobStatusListener listener : jobListenerList) { listener.jobStatusChanged(event); } lastPublishedJobStatus = status; } } /** * Cancels all ongoing send, poll and download operations as soon as possible. */ @Override public void cancel() { keepPolling = false; if (Objects.nonNull(sendThread)) { sendThread.interrupt(); } if (Objects.nonNull(pollThread)) { pollThread.interrupt(); } if (Objects.nonNull(downloadThread)) { downloadThread.interrupt(); } } /** * @return Returns true, if the polling thread should go on with its polling job. Otherwise, false. */ @Override public boolean keepPolling() { return keepPolling; } /** * Starts downloading the export job result, if there is any. As soon as the download has been finished, the job * status will be set to DOWNLOADED. All registered job status listeners will get notified about the it. Afterwards, * you may want to obtain a handle to the download CityGML file with getResult(). * * @throws FailedTransmissionException If the job has not been finished yet, then you will get some of this. */ public void downloadResult() throws FailedTransmissionException { if (!hasFinished()) { throw new FailedTransmissionException("Job has not been finished!"); } downloadThread = new Thread(new DownloadTask(this)); downloadThread.start(); } /** * This method is used by the download task to set the CityGML file handle as soon as the file has been download. * * @param result The file handle of the download CityGML file. */ protected void setResult(File result) { this.result = result; } /** * This method will return the download CityGML file for this export job, but only if the export job has actually * been finished before. * * @return Returns a file handle to the download CityGML file. * * @throws FailedTransmissionException If the job result has not been download yet, then you will get some of this. */ @Override public File getResult() throws FailedTransmissionException { if (!hasFinished()) { throw new FailedTransmissionException("Job has not been finished!"); } if (Objects.isNull(result)) { throw new FailedTransmissionException("Job result has not been downloaded!"); } return result; } /** * The asynchronous send, poll and download tasks cannot throw exceptions. If something goes wrong during the save, * poll or download operation, then you may want to submit at lease a textual description of the problem here. This * message will be sent to all registered job status listeners on the next status update. This is why you can use the * convenience method setStatus(jobStatus, message), to do both at the same time. * * @param errorMessage The description of the encountered problem. This could be the exception message or a more user * friendly message. */ protected synchronized void setLastEncounteredProblem(Optional errorMessage) { this.lastEncounteredProblem = errorMessage; } /** * A convenience method to set a new job status and a status message at the same time. Status messages will be passed * by asynchronous tasks instead of exception, because they cannot throw exceptions. * * @param jobStatus The new status of this job. * @param message A status message. This message may describe a problem which occurred during an asynchronous task. */ protected synchronized void setStatus(JobStatus jobStatus, Optional message) { super.setStatus(jobStatus); lastEncounteredProblem = message; notifyJobStatusListeners(); } }