Skip to content

Commit eafce01

Browse files
committed
feat: Using Duration type where applicable
1 parent 7d7aff0 commit eafce01

18 files changed

+370
-89
lines changed

Diff for: core/riot-core/src/main/java/com/redis/riot/core/AbstractJobCommand.java

+49-45
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package com.redis.riot.core;
22

3-
import java.time.Duration;
43
import java.util.Arrays;
54
import java.util.Collection;
65
import java.util.Iterator;
@@ -67,8 +66,8 @@ public abstract class AbstractJobCommand extends AbstractCallableCommand {
6766
@Option(names = "--job-name", description = "Job name.", paramLabel = "<string>", hidden = true)
6867
private String jobName;
6968

70-
@Option(names = "--repeat-every", description = "After the job completes keep repeating it on a fixed interval (ex 5m, 1h)")
71-
private String repeatEvery;
69+
@Option(names = "--repeat", description = "After the job completes keep repeating it on a fixed interval (ex 5m, 1h)", paramLabel = "<duration>")
70+
private Duration repeatEvery;
7271

7372
@ArgGroup(exclusive = false, heading = "Job options%n")
7473
private StepArgs stepArgs = new StepArgs();
@@ -179,59 +178,64 @@ protected Job job(Collection<Step<?, ?>> steps) {
179178
job.next(step(iterator.next()));
180179
}
181180

182-
if (null != repeatEvery) {
181+
if (repeatEvery != null) {
183182
job.incrementer(new RunIdIncrementer());
184183
job.preventRestart();
185-
String standardDuration = repeatEvery.toLowerCase().replace("m", "M").replace("h", "H");
186-
if (!standardDuration.startsWith("P")) {
187-
standardDuration = "PT" + standardDuration;
188-
}
189-
Duration repeatDuration = Duration.parse(standardDuration);
190-
job.listener(new JobExecutionListener() {
191-
Job lastJob;
192-
193-
@Override
194-
public void afterJob(JobExecution jobExecution) {
195-
if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
196-
if (null != onJobSuccessCallback) {
197-
onJobSuccessCallback.run();
198-
}
199-
200-
log.info("Finished job, will run again in {}", repeatEvery);
201-
try {
202-
Thread.sleep(repeatDuration.toMillis());
203-
if (lastJob == null) {
204-
lastJob = job.build();
205-
}
206-
207-
Job nextJob = jobBuilder().start(step(steps.stream().findFirst().get()))
208-
.incrementer(new RunIdIncrementer()).preventRestart().listener(this).build();
209-
210-
JobParametersBuilder paramsBuilder = new JobParametersBuilder(
211-
jobExecution.getJobParameters(), jobExplorer);
212-
213-
jobLauncher.run(nextJob,
214-
paramsBuilder.addString("runTime", String.valueOf(System.currentTimeMillis()))
215-
.getNextJobParameters(lastJob).toJobParameters());
216-
lastJob = nextJob;
217-
} catch (InterruptedException | JobExecutionAlreadyRunningException | JobRestartException
218-
| JobInstanceAlreadyCompleteException | JobParametersInvalidException e) {
219-
throw new RuntimeException(e);
220-
}
221-
}
222-
JobExecutionListener.super.afterJob(jobExecution);
223-
}
224-
});
184+
job.listener(new RepeatJobExecutionListener(job, steps));
225185
}
226186

227187
return job.build();
228188
}
229189

190+
private class RepeatJobExecutionListener implements JobExecutionListener {
191+
192+
private final SimpleJobBuilder job;
193+
private final Collection<Step<?, ?>> steps;
194+
private Job lastJob;
195+
196+
public RepeatJobExecutionListener(SimpleJobBuilder job, Collection<Step<?, ?>> steps) {
197+
this.job = job;
198+
this.steps = steps;
199+
}
200+
201+
@Override
202+
public void afterJob(JobExecution jobExecution) {
203+
if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
204+
if (null != onJobSuccessCallback) {
205+
onJobSuccessCallback.run();
206+
}
207+
208+
log.info("Finished job, will run again in {}", repeatEvery);
209+
try {
210+
Thread.sleep(repeatEvery.toMillis());
211+
if (lastJob == null) {
212+
lastJob = job.build();
213+
}
214+
215+
Job nextJob = jobBuilder().start(step(steps.stream().findFirst().get()))
216+
.incrementer(new RunIdIncrementer()).preventRestart().listener(this).build();
217+
218+
JobParametersBuilder paramsBuilder = new JobParametersBuilder(jobExecution.getJobParameters(),
219+
jobExplorer);
220+
221+
jobLauncher.run(nextJob,
222+
paramsBuilder.addString("runTime", String.valueOf(System.currentTimeMillis()))
223+
.getNextJobParameters(lastJob).toJobParameters());
224+
lastJob = nextJob;
225+
} catch (InterruptedException | JobExecutionAlreadyRunningException | JobRestartException
226+
| JobInstanceAlreadyCompleteException | JobParametersInvalidException e) {
227+
throw new RuntimeException(e);
228+
}
229+
}
230+
JobExecutionListener.super.afterJob(jobExecution);
231+
}
232+
}
233+
230234
protected boolean shouldShowProgress() {
231235
return stepArgs.getProgressArgs().getStyle() != ProgressStyle.NONE;
232236
}
233237

234-
protected abstract Job job();
238+
protected abstract Job job() throws RiotExecutionException;
235239

236240
private <I, O> TaskletStep step(Step<I, O> step) {
237241
log.info("Creating {}", step);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,255 @@
1+
package com.redis.riot.core;
2+
3+
import java.util.Locale;
4+
import java.util.concurrent.TimeUnit;
5+
import java.util.regex.Matcher;
6+
import java.util.regex.Pattern;
7+
8+
import org.springframework.util.Assert;
9+
10+
import static java.lang.Math.floor;
11+
import static java.lang.String.format;
12+
import static java.util.Objects.requireNonNull;
13+
import static java.util.concurrent.TimeUnit.DAYS;
14+
import static java.util.concurrent.TimeUnit.HOURS;
15+
import static java.util.concurrent.TimeUnit.MICROSECONDS;
16+
import static java.util.concurrent.TimeUnit.MILLISECONDS;
17+
import static java.util.concurrent.TimeUnit.MINUTES;
18+
import static java.util.concurrent.TimeUnit.NANOSECONDS;
19+
import static java.util.concurrent.TimeUnit.SECONDS;
20+
21+
public final class Duration implements Comparable<Duration> {
22+
23+
private static final Pattern PATTERN = Pattern.compile("^\\s*(\\d+(?:\\.\\d+)?)\\s*([a-zA-Z]+)\\s*$");
24+
25+
// We iterate over the TIME_UNITS constant in convertToMostSuccinctTimeUnit()
26+
// instead of TimeUnit.values() as the latter results in non-trivial amount of
27+
// memory
28+
// allocation when that method is called in a tight loop. The reason is that the
29+
// values()
30+
// call allocates a new array at each call.
31+
private static final TimeUnit[] TIME_UNITS = TimeUnit.values();
32+
33+
public static final Duration ZERO = new Duration(0, SECONDS);
34+
35+
public static Duration nanosSince(long start) {
36+
return succinctNanos(System.nanoTime() - start);
37+
}
38+
39+
public static Duration succinctNanos(long nanos) {
40+
return succinctDuration(nanos, NANOSECONDS);
41+
}
42+
43+
public static Duration succinctDuration(double value, TimeUnit unit) {
44+
if (value == 0) {
45+
return ZERO;
46+
}
47+
return new Duration(value, unit).convertToMostSuccinctTimeUnit();
48+
}
49+
50+
private final double value;
51+
private final TimeUnit unit;
52+
53+
public Duration(double value, TimeUnit unit) {
54+
if (Double.isInfinite(value)) {
55+
throw new IllegalArgumentException("value is infinite: " + value);
56+
}
57+
Assert.isTrue(!Double.isNaN(value), "value is not a number");
58+
if (value < 0) {
59+
throw new IllegalArgumentException("value is negative: " + value);
60+
}
61+
requireNonNull(unit, "unit is null");
62+
63+
this.value = value;
64+
this.unit = unit;
65+
}
66+
67+
public long toMillis() {
68+
return roundTo(MILLISECONDS);
69+
}
70+
71+
public double getValue() {
72+
return value;
73+
}
74+
75+
public TimeUnit getUnit() {
76+
return unit;
77+
}
78+
79+
public double getValue(TimeUnit timeUnit) {
80+
requireNonNull(timeUnit, "timeUnit is null");
81+
return value * (millisPerTimeUnit(this.unit) * 1.0 / millisPerTimeUnit(timeUnit));
82+
}
83+
84+
public long roundTo(TimeUnit timeUnit) {
85+
requireNonNull(timeUnit, "timeUnit is null");
86+
double rounded = Math.floor(getValue(timeUnit) + 0.5d);
87+
if (rounded > Long.MAX_VALUE) {
88+
throw new IllegalArgumentException(
89+
format("value %s %s is too large to be represented in requested unit %s as a long", value, unit,
90+
timeUnit));
91+
}
92+
return (long) rounded;
93+
}
94+
95+
public Duration convertTo(TimeUnit timeUnit) {
96+
requireNonNull(timeUnit, "timeUnit is null");
97+
return new Duration(getValue(timeUnit), timeUnit);
98+
}
99+
100+
public Duration convertToMostSuccinctTimeUnit() {
101+
TimeUnit unitToUse = NANOSECONDS;
102+
for (TimeUnit unitToTest : TIME_UNITS) {
103+
// since time units are powers of ten, we can get rounding errors here, so fuzzy
104+
// match
105+
if (getValue(unitToTest) > 0.9999) {
106+
unitToUse = unitToTest;
107+
} else {
108+
break;
109+
}
110+
}
111+
return convertTo(unitToUse);
112+
}
113+
114+
public java.time.Duration toJavaTime() {
115+
long seconds;
116+
long nanoAdjustment;
117+
long secondsPerUnit = SECONDS.convert(1, unit);
118+
long nanosPerUnit = NANOSECONDS.convert(1, unit);
119+
if (secondsPerUnit > 1) {
120+
seconds = (long) floor(value * secondsPerUnit);
121+
nanoAdjustment = (long) floor((value - (double) seconds / secondsPerUnit) * nanosPerUnit);
122+
} else {
123+
long unitsPerSecond = unit.convert(1, SECONDS);
124+
seconds = (long) floor(value / unitsPerSecond);
125+
nanoAdjustment = (long) floor((value - (double) seconds * unitsPerSecond) * nanosPerUnit);
126+
}
127+
128+
if (seconds == Long.MAX_VALUE) {
129+
nanoAdjustment = 0;
130+
}
131+
return java.time.Duration.ofSeconds(seconds, nanoAdjustment);
132+
}
133+
134+
@Override
135+
public String toString() {
136+
return toString(unit);
137+
}
138+
139+
public String toString(TimeUnit timeUnit) {
140+
requireNonNull(timeUnit, "timeUnit is null");
141+
double magnitude = getValue(timeUnit);
142+
String timeUnitAbbreviation = timeUnitToString(timeUnit);
143+
return format(Locale.ENGLISH, "%.2f%s", magnitude, timeUnitAbbreviation);
144+
}
145+
146+
public static Duration parse(String string) throws IllegalArgumentException {
147+
requireNonNull(string, "duration is null");
148+
Assert.isTrue(!string.isEmpty(), "duration is empty");
149+
150+
Matcher matcher = PATTERN.matcher(string);
151+
if (!matcher.matches()) {
152+
throw new IllegalArgumentException("duration is not a valid data duration string: " + string);
153+
}
154+
155+
double value = Double.parseDouble(matcher.group(1));
156+
String unitString = matcher.group(2);
157+
158+
TimeUnit timeUnit = valueOfTimeUnit(unitString);
159+
return new Duration(value, timeUnit);
160+
}
161+
162+
@Override
163+
public int compareTo(Duration o) {
164+
return Double.compare(getValue(MILLISECONDS), o.getValue(MILLISECONDS));
165+
}
166+
167+
public boolean isZero() {
168+
return equals(ZERO);
169+
}
170+
171+
@Override
172+
public boolean equals(Object o) {
173+
if (this == o) {
174+
return true;
175+
}
176+
if (o == null || getClass() != o.getClass()) {
177+
return false;
178+
}
179+
180+
Duration duration = (Duration) o;
181+
182+
return compareTo(duration) == 0;
183+
}
184+
185+
@Override
186+
public int hashCode() {
187+
double value = getValue(MILLISECONDS);
188+
return Double.hashCode(value);
189+
}
190+
191+
public static TimeUnit valueOfTimeUnit(String timeUnitString) {
192+
requireNonNull(timeUnitString, "timeUnitString is null");
193+
switch (timeUnitString) {
194+
case "ns":
195+
return NANOSECONDS;
196+
case "us":
197+
return MICROSECONDS;
198+
case "ms":
199+
return MILLISECONDS;
200+
case "s":
201+
return SECONDS;
202+
case "m":
203+
return MINUTES;
204+
case "h":
205+
return HOURS;
206+
case "d":
207+
return DAYS;
208+
default:
209+
throw new IllegalArgumentException("Unknown time unit: " + timeUnitString);
210+
}
211+
}
212+
213+
public static String timeUnitToString(TimeUnit timeUnit) {
214+
requireNonNull(timeUnit, "timeUnit is null");
215+
switch (timeUnit) {
216+
case NANOSECONDS:
217+
return "ns";
218+
case MICROSECONDS:
219+
return "us";
220+
case MILLISECONDS:
221+
return "ms";
222+
case SECONDS:
223+
return "s";
224+
case MINUTES:
225+
return "m";
226+
case HOURS:
227+
return "h";
228+
case DAYS:
229+
return "d";
230+
default:
231+
throw new IllegalArgumentException("Unsupported time unit " + timeUnit);
232+
}
233+
}
234+
235+
private static double millisPerTimeUnit(TimeUnit timeUnit) {
236+
switch (timeUnit) {
237+
case NANOSECONDS:
238+
return 1.0 / 1000000.0;
239+
case MICROSECONDS:
240+
return 1.0 / 1000.0;
241+
case MILLISECONDS:
242+
return 1;
243+
case SECONDS:
244+
return 1000;
245+
case MINUTES:
246+
return 1000 * 60;
247+
case HOURS:
248+
return 1000 * 60 * 60;
249+
case DAYS:
250+
return 1000 * 60 * 60 * 24;
251+
default:
252+
throw new IllegalArgumentException("Unsupported time unit " + timeUnit);
253+
}
254+
}
255+
}

Diff for: core/riot-core/src/main/java/com/redis/riot/core/MainCommand.java

+1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ public int run(String... args) {
3131
commandLine.setExecutionExceptionHandler(new PrintExceptionMessageHandler());
3232
commandLine.registerConverter(DataSize.class, DataSize::parse);
3333
commandLine.registerConverter(Expression.class, Expression::parse);
34+
commandLine.registerConverter(Duration.class, Duration::parse);
3435
commandLine.registerConverter(TemplateExpression.class, Expression::parseTemplate);
3536
commandLine.setExecutionStrategy(LoggingMixin.executionStrategy(commandLine.getExecutionStrategy()));
3637
return commandLine.execute(args);

0 commit comments

Comments
 (0)