diff --git a/src/main/java/edu/harvard/iq/dataverse/globus/GlobusServiceBean.java b/src/main/java/edu/harvard/iq/dataverse/globus/GlobusServiceBean.java
index 53de77f2fc8..5f85aa0cd80 100644
--- a/src/main/java/edu/harvard/iq/dataverse/globus/GlobusServiceBean.java
+++ b/src/main/java/edu/harvard/iq/dataverse/globus/GlobusServiceBean.java
@@ -154,7 +154,7 @@ private String getRuleId(GlobusEndpoint endpoint, String principal, String permi
* @param dataset - the dataset associated with the rule
* @param globusLogger - a separate logger instance, may be null
*/
- public void deletePermission(String ruleId, Dataset dataset, Logger globusLogger) {
+ private void deletePermission(String ruleId, Dataset dataset, Logger globusLogger) {
globusLogger.fine("Start deleting rule " + ruleId + " for dataset " + dataset.getId());
if (ruleId.length() > 0) {
if (dataset != null) {
@@ -172,7 +172,7 @@ public void deletePermission(String ruleId, Dataset dataset, Logger globusLogger
globusLogger.info("Access rule " + ruleId + " was deleted successfully");
}
} catch (MalformedURLException ex) {
- logger.log(Level.WARNING,
+ globusLogger.log(Level.WARNING,
"Failed to delete access rule " + ruleId + " on endpoint " + endpoint.getId(), ex);
}
}
@@ -444,7 +444,6 @@ private void monitorTemporaryPermissions(String ruleId, long datasetId) {
* files are created in general, some calls may use the
* class logger)
* @return
- * @throws MalformedURLException
*/
public GlobusTaskState getTask(String accessToken, String taskId, Logger globusLogger) {
@@ -730,8 +729,13 @@ public void globusUpload(JsonObject jsonData, Dataset dataset, String httpReques
String logTimestamp = logFormatter.format(startDate);
Logger globusLogger = Logger.getLogger(
- "edu.harvard.iq.dataverse.upload.client.DatasetServiceBean." + "GlobusUpload" + logTimestamp);
- String logFileName = System.getProperty("com.sun.aas.instanceRoot") + File.separator + "logs" + File.separator + "globusUpload_" + dataset.getId() + "_" + logTimestamp
+ "edu.harvard.iq.dataverse.globus.GlobusServiceBean." + "Globus"
+ + GlobusTaskInProgress.TaskType.UPLOAD + logTimestamp);
+
+ String logFileName = System.getProperty("com.sun.aas.instanceRoot")
+ + File.separator + "logs"
+ + File.separator + "globus" + GlobusTaskInProgress.TaskType.UPLOAD + "_"
+ + logTimestamp + "_" + dataset.getId()
+ ".log";
FileHandler fileHandler;
@@ -1160,12 +1164,18 @@ private void processUploadedFiles(JsonArray filesJsonArray, Dataset dataset, Aut
public void globusDownload(JsonObject jsonObject, Dataset dataset, User authUser) throws MalformedURLException {
Date startDate = new Date();
-
+
+ // @todo the logger initialization method will be moved into the GlobusUtil
+ // eventually, for both this and the monitoring service to use
String logTimestamp = logFormatter.format(startDate);
Logger globusLogger = Logger.getLogger(
- "edu.harvard.iq.dataverse.upload.client.DatasetServiceBean." + "GlobusDownload" + logTimestamp);
+ "edu.harvard.iq.dataverse.globus.GlobusServiceBean." + "Globus"
+ + GlobusTaskInProgress.TaskType.DOWNLOAD + logTimestamp);
- String logFileName = System.getProperty("com.sun.aas.instanceRoot") + File.separator + "logs" + File.separator + "globusDownload_id_" + dataset.getId() + "_" + logTimestamp
+ String logFileName = System.getProperty("com.sun.aas.instanceRoot")
+ + File.separator + "logs"
+ + File.separator + "globus" + GlobusTaskInProgress.TaskType.DOWNLOAD + "_"
+ + logTimestamp + "_" + dataset.getId()
+ ".log";
FileHandler fileHandler;
boolean fileHandlerSuceeded;
@@ -1194,8 +1204,8 @@ public void globusDownload(JsonObject jsonObject, Dataset dataset, User authUser
// If the rules_cache times out, the permission will be deleted. Presumably that
// doesn't affect a
// globus task status check
- GlobusTaskState task = getTask(endpoint.getClientToken(), taskIdentifier, globusLogger);
- String ruleId = getRuleId(endpoint, task.getOwner_id(), "r");
+ GlobusTaskState taskState = getTask(endpoint.getClientToken(), taskIdentifier, globusLogger);
+ String ruleId = getRuleId(endpoint, taskState.getOwner_id(), "r");
if (ruleId != null) {
logger.fine("Found rule: " + ruleId);
Long datasetId = rulesCache.getIfPresent(ruleId);
@@ -1227,51 +1237,25 @@ public void globusDownload(JsonObject jsonObject, Dataset dataset, User authUser
GlobusTaskInProgress.TaskType.DOWNLOAD,
dataset,
endpoint.getClientToken(),
- authUser instanceof AuthenticatedUser ? authUser : null,
+ authUser instanceof AuthenticatedUser ? (AuthenticatedUser)authUser : null,
ruleId,
new Timestamp(startDate.getTime()));
em.persist(taskInProgress);
- if (fileHandler != null) {
- fileHandler.close();
- }
+ if (fileHandler != null) { fileHandler.close(); }
// return and forget
return;
}
- task = globusStatusCheck(endpoint, taskIdentifier, globusLogger);
- // @todo null check?
- String taskStatus = GlobusUtil.getTaskStatus(task);
-
- // Transfer is done (success or failure) so delete the rule
- if (ruleId != null) {
- logger.fine("Deleting: rule: " + ruleId);
- deletePermission(ruleId, dataset, globusLogger);
- }
-
- if (taskStatus.startsWith("FAILED") || taskStatus.startsWith("INACTIVE")) {
- String comment = "Reason : " + taskStatus.split("#")[1] + "<br> Short Description : "
- + taskStatus.split("#")[2];
- if (authUser != null && authUser instanceof AuthenticatedUser) {
- userNotificationService.sendNotification((AuthenticatedUser) authUser, new Timestamp(new Date().getTime()),
- UserNotification.Type.GLOBUSDOWNLOADCOMPLETEDWITHERRORS, dataset.getId(), comment, true);
- }
-
- globusLogger.info("Globus task failed during download process: "+comment);
- } else if (authUser != null && authUser instanceof AuthenticatedUser) {
-
- boolean taskSkippedFiles = (task.getSkip_source_errors() == null) ? false : task.getSkip_source_errors();
- if (!taskSkippedFiles) {
- userNotificationService.sendNotification((AuthenticatedUser) authUser,
- new Timestamp(new Date().getTime()), UserNotification.Type.GLOBUSDOWNLOADCOMPLETED,
- dataset.getId());
- } else {
- userNotificationService.sendNotification((AuthenticatedUser) authUser,
- new Timestamp(new Date().getTime()), UserNotification.Type.GLOBUSDOWNLOADCOMPLETEDWITHERRORS,
- dataset.getId(), "");
- }
- }
+ // Check again:
+ taskState = globusStatusCheck(endpoint, taskIdentifier, globusLogger);
+
+ processCompletedDownloadTask(taskState,
+ authUser instanceof AuthenticatedUser ? (AuthenticatedUser)authUser : null,
+ dataset,
+ ruleId,
+ globusLogger);
}
Executor executor = Executors.newFixedThreadPool(10);
@@ -1540,6 +1524,10 @@ public List findAllOngoingTasks() {
return em.createQuery("select object(o) from GlobusTaskInProgress as o order by o.startTime", GlobusTaskInProgress.class).getResultList();
}
+ public List findAllOngoingTasks(GlobusTaskInProgress.TaskType taskType) {
+ return em.createQuery("select object(o) from GlobusTaskInProgress as o where o.taskType=:taskType order by o.startTime", GlobusTaskInProgress.class).setParameter("taskType", taskType).getResultList();
+ }
+
public void deleteTask(GlobusTaskInProgress task) {
GlobusTaskInProgress mergedTask = em.merge(task);
em.remove(mergedTask);
@@ -1549,14 +1537,10 @@ public List findExternalUploadsByTaskId(String tas
return em.createNamedQuery("ExternalFileUploadInProgress.findByTaskId").setParameter("taskId", taskId).getResultList();
}
- public void processCompletedTask(GlobusTaskInProgress globusTask, boolean taskSuccess, String taskStatus, Logger taskLogger) {
+ public void processCompletedTask(GlobusTaskInProgress globusTask, GlobusTaskState taskState, boolean taskSuccess, String taskStatus, Logger taskLogger) {
String ruleId = globusTask.getRuleId();
Dataset dataset = globusTask.getDataset();
AuthenticatedUser authUser = globusTask.getLocalUser();
- if (authUser == null) {
- // @todo log error message; do nothing
- return;
- }
if (GlobusTaskInProgress.TaskType.UPLOAD.equals(globusTask.getTaskType())) {
List fileUploadsInProgress = findExternalUploadsByTaskId(globusTask.getTaskId());
@@ -1578,10 +1562,67 @@ public void processCompletedTask(GlobusTaskInProgress globusTask, boolean taskSu
JsonArray filesJsonArray = filesJsonArrayBuilder.build();
processCompletedUploadTask(dataset, filesJsonArray, authUser, ruleId, taskLogger, taskSuccess, taskStatus);
+ } else if (GlobusTaskInProgress.TaskType.DOWNLOAD.equals(globusTask.getTaskType())) {
+
+ processCompletedDownloadTask(taskState, authUser, dataset, ruleId, taskLogger);
+
} else {
- // @todo eventually, extend this async. framework to handle Glonus downloads as well
+ logger.warning("Unknown or null TaskType passed to processCompletedTask()");
+ }
+
+ }
+
+ private void processCompletedDownloadTask(GlobusTaskState taskState,
+ AuthenticatedUser authUser,
+ Dataset dataset,
+ String ruleId,
+ Logger taskLogger) {
+ // The only thing to do on completion of a remote download
+ // transfer is to delete the permission ACL that Dataverse
+ // had negotiated for the user before the task was initialized:
+
+ if (ruleId != null) {
+ deletePermission(ruleId, dataset, taskLogger);
}
+ String taskStatus = GlobusUtil.getTaskStatus(taskState);
+
+ // ... plus log the outcome and send any notifications:
+ if (taskStatus.startsWith("FAILED") || taskStatus.startsWith("INACTIVE")) {
+ // Outright, unambiguous failure:
+ String comment = "Reason : " + taskStatus.split("#")[1] + "<br> Short Description : "
+ + taskStatus.split("#")[2];
+ taskLogger.info("Globus task failed during download process: " + comment);
+
+ sendNotification(authUser, UserNotification.Type.GLOBUSDOWNLOADCOMPLETEDWITHERRORS, dataset.getId(), comment);
+
+ } else {
+ // Success, total or partial
+ boolean taskSkippedFiles = (taskState == null || taskState.getSkip_source_errors() == null) ? false : taskState.getSkip_source_errors();
+
+ if (!taskSkippedFiles) {
+ taskLogger.info("Globus task completed successfully");
+
+ sendNotification(authUser, UserNotification.Type.GLOBUSDOWNLOADCOMPLETED, dataset.getId(), "");
+ } else {
+ taskLogger.info("Globus task completed with partial success (skip source errors)");
+
+ sendNotification(authUser, UserNotification.Type.GLOBUSDOWNLOADCOMPLETEDWITHERRORS, dataset.getId(), "");
+ }
+ }
+ }
+
+ private void sendNotification(AuthenticatedUser authUser,
+ UserNotification.Type type,
+ Long datasetId,
+ String comment) {
+ if (authUser != null) {
+ userNotificationService.sendNotification(authUser,
+ new Timestamp(new Date().getTime()),
+ type,
+ datasetId,
+ comment);
+ }
}
public void deleteExternalUploadRecords(String taskId) {
diff --git a/src/main/java/edu/harvard/iq/dataverse/globus/GlobusTaskState.java b/src/main/java/edu/harvard/iq/dataverse/globus/GlobusTaskState.java
index b5db20d46c1..2a53df74ee4 100644
--- a/src/main/java/edu/harvard/iq/dataverse/globus/GlobusTaskState.java
+++ b/src/main/java/edu/harvard/iq/dataverse/globus/GlobusTaskState.java
@@ -4,6 +4,7 @@
* This class is used to store the state of an ongoing Globus task (transfer)
* as reported by the Globus task API.
*/
+
public class GlobusTaskState {
private String DATA_TYPE;
diff --git a/src/main/java/edu/harvard/iq/dataverse/globus/GlobusUtil.java b/src/main/java/edu/harvard/iq/dataverse/globus/GlobusUtil.java
index 3254114ca49..fd36e2a27bc 100644
--- a/src/main/java/edu/harvard/iq/dataverse/globus/GlobusUtil.java
+++ b/src/main/java/edu/harvard/iq/dataverse/globus/GlobusUtil.java
@@ -39,6 +39,8 @@ public static boolean isTaskCompleted(GlobusTaskState task) {
// TODO: "nice_status": "CONNECTION_FAILED" *may* mean
// that it's a Globus issue on the endnode side, that is
// in fact recoverable; should we add it to the list here?
+ // @todo: I'm tempted to just take "ACTIVE" for face value,
+ // and assume that it's still ongoing.
if (task.getNice_status().equalsIgnoreCase("ok")
|| task.getNice_status().equalsIgnoreCase("queued")) {
return false;
@@ -61,6 +63,9 @@ public static boolean isTaskSucceeded(GlobusTaskState task) {
// has not completed *successfully*.
return false;
}
+ // @todo: should we be more careful here, and actually check for
+ // status.equalsI("SUCCEEDED") etc. before assuming the task
+ // did in fact succeed?
return true;
}
}
@@ -89,6 +94,7 @@ public static String getTaskStatus(GlobusTaskState task) {
status = "FAILED";
}
} else {
+ // @todo are we sure?
status = "FAILED";
}
return status;
diff --git a/src/main/java/edu/harvard/iq/dataverse/globus/TaskMonitoringServiceBean.java b/src/main/java/edu/harvard/iq/dataverse/globus/TaskMonitoringServiceBean.java
index fdb2b222804..60e24d62702 100644
--- a/src/main/java/edu/harvard/iq/dataverse/globus/TaskMonitoringServiceBean.java
+++ b/src/main/java/edu/harvard/iq/dataverse/globus/TaskMonitoringServiceBean.java
@@ -53,21 +53,30 @@ public void init() {
logger.info("Starting Globus task monitoring service");
int pollingInterval = SystemConfig.getIntLimitFromStringOrDefault(
settingsSvc.getValueForKey(SettingsServiceBean.Key.GlobusPollingInterval), 600);
- this.scheduler.scheduleWithFixedDelay(this::checkOngoingTasks,
+ this.scheduler.scheduleWithFixedDelay(this::checkOngoingUploadTasks,
0, pollingInterval,
TimeUnit.SECONDS);
+
+ // A separate monitoring service for ongoing download tasks:
+ this.scheduler.scheduleWithFixedDelay(this::checkOngoingDownloadTasks,
+ 0, pollingInterval,
+ TimeUnit.SECONDS);
+
} else {
logger.info("Skipping Globus task monitor initialization");
}
+
+
}
/**
* This method will be executed on a timer-like schedule, continuously
- * monitoring all the ongoing external Globus tasks (transfers).
+ * monitoring all the ongoing external Globus tasks (transfers TO remote
+ * Globus endnodes).
*/
- public void checkOngoingTasks() {
- logger.fine("Performing a scheduled external Globus task check");
- List tasks = globusService.findAllOngoingTasks();
+ public void checkOngoingUploadTasks() {
+ logger.fine("Performing a scheduled external Globus UPLOAD task check");
+ List tasks = globusService.findAllOngoingTasks(GlobusTaskInProgress.TaskType.UPLOAD);
tasks.forEach(t -> {
FileHandler taskLogHandler = getTaskLogHandler(t);
@@ -76,7 +85,7 @@ public void checkOngoingTasks() {
GlobusTaskState retrieved = globusService.getTask(t.getGlobusToken(), t.getTaskId(), taskLogger);
if (GlobusUtil.isTaskCompleted(retrieved)) {
// Do our thing, finalize adding the files to the dataset
- globusService.processCompletedTask(t, GlobusUtil.isTaskSucceeded(retrieved), GlobusUtil.getTaskStatus(retrieved), taskLogger);
+ globusService.processCompletedTask(t, retrieved, GlobusUtil.isTaskSucceeded(retrieved), GlobusUtil.getTaskStatus(retrieved), taskLogger);
// Whether it finished successfully, or failed in the process,
// there's no need to keep monitoring this task, so we can
// delete it.
@@ -92,6 +101,36 @@ public void checkOngoingTasks() {
});
}
+ /**
+ * This method will be executed on a timer-like schedule, continuously
+ * monitoring all the ongoing external Globus download tasks (transfers by
+ * Dataverse users FROM remote, Dataverse-managed Globus endnodes).
+ */
+ public void checkOngoingDownloadTasks() {
+ logger.fine("Performing a scheduled external Globus DOWNLOAD task check");
+ List tasks = globusService.findAllOngoingTasks(GlobusTaskInProgress.TaskType.DOWNLOAD);
+
+ tasks.forEach(t -> {
+ FileHandler taskLogHandler = getTaskLogHandler(t);
+ Logger taskLogger = getTaskLogger(t, taskLogHandler);
+
+ GlobusTaskState retrieved = globusService.getTask(t.getGlobusToken(), t.getTaskId(), taskLogger);
+ if (GlobusUtil.isTaskCompleted(retrieved)) {
+ globusService.processCompletedTask(t, retrieved, GlobusUtil.isTaskSucceeded(retrieved), GlobusUtil.getTaskStatus(retrieved), taskLogger);
+ // Whether it finished successfully or failed, the task can now
+ // be deleted.
+ globusService.deleteTask(t);
+ }
+
+ if (taskLogHandler != null) {
+ // @todo it should be prudent to cache these loggers and handlers
+ // between monitoring runs (should be fairly easy to do)
+ taskLogHandler.close();
+ }
+ });
+ }
+
private FileHandler getTaskLogHandler(GlobusTaskInProgress task) {
if (task == null) {
return null;
@@ -100,7 +139,10 @@ private FileHandler getTaskLogHandler(GlobusTaskInProgress task) {
Date startDate = new Date(task.getStartTime().getTime());
String logTimeStamp = logFormatter.format(startDate);
- String logFileName = System.getProperty("com.sun.aas.instanceRoot") + File.separator + "logs" + File.separator + "globusUpload_" + task.getDataset().getId() + "_" + logTimeStamp
+ String logFileName = System.getProperty("com.sun.aas.instanceRoot")
+ + File.separator + "logs"
+ + File.separator + "globus" + task.getTaskType() + "_"
+ + logTimeStamp + "_" + task.getDataset().getId()
+ ".log";
FileHandler fileHandler;
try {
@@ -120,7 +162,8 @@ private Logger getTaskLogger(GlobusTaskInProgress task, FileHandler logFileHandl
String logTimeStamp = logFormatter.format(startDate);
Logger taskLogger = Logger.getLogger(
- "edu.harvard.iq.dataverse.upload.client.DatasetServiceBean." + "GlobusUpload" + logTimeStamp);
+ "edu.harvard.iq.dataverse.globus.GlobusServiceBean." + "Globus"
+ + task.getTaskType() + logTimeStamp);
taskLogger.setUseParentHandlers(false);
taskLogger.addHandler(logFileHandler);