Skip to content

Commit

Permalink
Merge pull request #166 from redis/riot-callbacks-and-schedule
Browse files Browse the repository at this point in the history
WIP: Initial changes to support on success callbacks and repeat on intervals
  • Loading branch information
jruaux authored Jan 16, 2025
2 parents 085cc0c + 4107d4f commit 70cddc4
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 11 deletions.
8 changes: 2 additions & 6 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,14 @@ permissions:
jobs:
build:
name: Build
strategy:
fail-fast: false
matrix:
os: [ubuntu-latest, macos-latest, windows-latest]
runs-on: ${{ matrix.os }}
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4

- name: Setup Java
uses: actions/setup-java@v4
with:
java-version: 17
java-version: 21
distribution: zulu
cache: gradle

Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
4.1.11-SNAPSHOT
4.2.0-SNAPSHOT
1 change: 0 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ allprojects {
// maven {
// url 'https://s01.oss.sonatype.org/content/repositories/snapshots/'
//}
// gradlePluginPortal()
}

tasks.withType(GenerateModuleMetadata) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,33 @@
package com.redis.riot.core;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;

import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.ItemWriteListener;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionException;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.job.builder.SimpleJobBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.launch.support.TaskExecutorJobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder;
import org.springframework.batch.core.step.builder.SimpleStepBuilder;
import org.springframework.batch.core.step.builder.StepBuilder;
Expand Down Expand Up @@ -57,13 +67,19 @@ public abstract class AbstractJobCommand extends AbstractCallableCommand {
@Option(names = "--job-name", description = "Job name.", paramLabel = "<string>", hidden = true)
private String jobName;

@Option(names = "--repeat-every", description = "After the job completes keep repeating it on a fixed interval (ex 5m, 1h)")
private String repeatEvery;

@ArgGroup(exclusive = false, heading = "Job options%n")
private StepArgs stepArgs = new StepArgs();

private String jobRepositoryName = DEFAULT_JOB_REPOSITORY_NAME;
private JobRepository jobRepository;
private PlatformTransactionManager transactionManager;
private JobLauncher jobLauncher;
private JobExplorer jobExplorer;

protected Runnable onJobSuccessCallback;

@Override
protected void initialize() throws RiotInitializationException {
Expand All @@ -88,6 +104,14 @@ protected void initialize() throws RiotInitializationException {
throw new RiotInitializationException("Could not create job launcher", e);
}
}
if (jobExplorer == null) {
try {
jobExplorer = JobUtils.jobExplorerFactoryBean(jobRepositoryName).getObject();
} catch (Exception e) {
log.warn("Error getting jobExplorer", e);
throw new RiotInitializationException("Could not create job explorer", e);
}
}
}

private JobLauncher jobLauncher() throws Exception {
Expand Down Expand Up @@ -154,6 +178,52 @@ protected Job job(Collection<Step<?, ?>> steps) {
while (iterator.hasNext()) {
job.next(step(iterator.next()));
}

if (null != repeatEvery) {
job.incrementer(new RunIdIncrementer());
job.preventRestart();
String standardDuration = repeatEvery.toLowerCase().replace("m", "M").replace("h", "H");
if (!standardDuration.startsWith("P")) {
standardDuration = "PT" + standardDuration;
}
Duration repeatDuration = Duration.parse(standardDuration);
job.listener(new JobExecutionListener() {
Job lastJob;

@Override
public void afterJob(JobExecution jobExecution) {
if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
if (null != onJobSuccessCallback) {
onJobSuccessCallback.run();
}

log.info("Finished job, will run again in {}", repeatEvery);
try {
Thread.sleep(repeatDuration.toMillis());
if (lastJob == null) {
lastJob = job.build();
}

Job nextJob = jobBuilder().start(step(steps.stream().findFirst().get()))
.incrementer(new RunIdIncrementer()).preventRestart().listener(this).build();

JobParametersBuilder paramsBuilder = new JobParametersBuilder(
jobExecution.getJobParameters(), jobExplorer);

jobLauncher.run(nextJob,
paramsBuilder.addString("runTime", String.valueOf(System.currentTimeMillis()))
.getNextJobParameters(lastJob).toJobParameters());
lastJob = nextJob;
} catch (InterruptedException | JobExecutionAlreadyRunningException | JobRestartException
| JobInstanceAlreadyCompleteException | JobParametersInvalidException e) {
throw new RuntimeException(e);
}
}
JobExecutionListener.super.afterJob(jobExecution);
}
});
}

return job.build();
}

Expand Down Expand Up @@ -326,4 +396,11 @@ public void setJobLauncher(JobLauncher jobLauncher) {
this.jobLauncher = jobLauncher;
}

public JobExplorer getJobExplorer() {
return jobExplorer;
}

public void setJobExplorer(JobExplorer jobExplorer) {
this.jobExplorer = jobExplorer;
}
}
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ latencyUtilsVersion = 2.0.3
picocliVersion = 4.7.6
progressbarVersion = 0.10.1
protobufVersion = 4.29.1
springBatchRedisVersion = 4.5.4
springBatchRedisVersion = 4.5.5
testcontainersRedisVersion = 2.2.2

org.gradle.daemon = true
Expand Down
4 changes: 2 additions & 2 deletions plugins/riot/src/main/java/com/redis/riot/DatabaseImport.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class DatabaseImport extends AbstractRedisImportCommand {
private DataSourceArgs dataSourceArgs = new DataSourceArgs();

@Parameters(arity = "1", description = "SQL SELECT statement", paramLabel = "SQL")
private String sql;
protected String sql;

@ArgGroup(exclusive = false)
private DatabaseReaderArgs readerArgs = new DatabaseReaderArgs();
Expand All @@ -31,7 +31,7 @@ protected Job job() {
return job(step(reader()));
}

private JdbcCursorItemReader<Map<String, Object>> reader() {
protected JdbcCursorItemReader<Map<String, Object>> reader() {
Assert.hasLength(sql, "No SQL statement specified");
log.info("Creating data source with {}", dataSourceArgs);
DataSource dataSource = dataSourceArgs.dataSource();
Expand Down

0 comments on commit 70cddc4

Please sign in to comment.