Skip to content

Commit fa543df

Browse files
committed
Add SynchronousExecutorService and SynchronousScheduledExecutorService, re #58
1 parent dca19a8 commit fa543df

File tree

2 files changed

+312
-0
lines changed

2 files changed

+312
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
package org.libj.util.concurrent;
2+
3+
import java.util.ArrayList;
4+
import java.util.Collection;
5+
import java.util.Collections;
6+
import java.util.List;
7+
import java.util.concurrent.Callable;
8+
import java.util.concurrent.ExecutionException;
9+
import java.util.concurrent.ExecutorService;
10+
import java.util.concurrent.Future;
11+
import java.util.concurrent.TimeUnit;
12+
import java.util.concurrent.TimeoutException;
13+
14+
import org.libj.util.IdentityHashSet;
15+
16+
public class SynchronousExecutorService implements ExecutorService {
17+
static class SynchronousFuture<T> implements Future<T> {
18+
private final T result;
19+
20+
SynchronousFuture(final T result) {
21+
super();
22+
this.result = result;
23+
}
24+
25+
@Override
26+
public boolean cancel(final boolean mayInterruptIfRunning) {
27+
return false;
28+
}
29+
30+
@Override
31+
public boolean isCancelled() {
32+
return false;
33+
}
34+
35+
@Override
36+
public boolean isDone() {
37+
return true;
38+
}
39+
40+
@Override
41+
public T get() throws InterruptedException, ExecutionException {
42+
return result;
43+
}
44+
45+
@Override
46+
public T get(final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
47+
return result;
48+
}
49+
}
50+
51+
private final IdentityHashSet<Object> tasks = new IdentityHashSet<>();
52+
private boolean closed = false;
53+
54+
@Override
55+
public void execute(final Runnable command) {
56+
if (closed)
57+
return;
58+
59+
tasks.add(command);
60+
try {
61+
command.run();
62+
}
63+
finally {
64+
tasks.remove(command);
65+
}
66+
}
67+
68+
@Override
69+
public void shutdown() {
70+
closed = true;
71+
}
72+
73+
@Override
74+
public List<Runnable> shutdownNow() {
75+
closed = true;
76+
return Collections.EMPTY_LIST;
77+
}
78+
79+
@Override
80+
public boolean isShutdown() {
81+
return closed;
82+
}
83+
84+
@Override
85+
public boolean isTerminated() {
86+
return tasks.size() == 0;
87+
}
88+
89+
@Override
90+
public boolean awaitTermination(final long timeout, final TimeUnit unit) throws InterruptedException {
91+
return false;
92+
}
93+
94+
@Override
95+
public <T> Future<T> submit(final Callable<T> task) {
96+
if (closed)
97+
return null;
98+
99+
tasks.add(task);
100+
try {
101+
return new SynchronousFuture<>(task.call());
102+
}
103+
catch (final Exception e) {
104+
if (e instanceof RuntimeException)
105+
throw (RuntimeException)e;
106+
107+
throw new RuntimeException(e);
108+
}
109+
finally {
110+
tasks.remove(task);
111+
}
112+
}
113+
114+
@Override
115+
public <T> Future<T> submit(final Runnable task, final T result) {
116+
if (closed)
117+
return null;
118+
119+
tasks.add(task);
120+
try {
121+
task.run();
122+
return new SynchronousFuture<>(result);
123+
}
124+
finally {
125+
tasks.remove(task);
126+
}
127+
}
128+
129+
@Override
130+
public Future<?> submit(final Runnable task) {
131+
if (closed)
132+
return null;
133+
134+
tasks.add(task);
135+
try {
136+
task.run();
137+
return new SynchronousFuture<>(null);
138+
}
139+
finally {
140+
tasks.remove(task);
141+
}
142+
}
143+
144+
@Override
145+
public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks) throws InterruptedException {
146+
if (closed)
147+
return null;
148+
149+
final ArrayList<Future<T>> futures = new ArrayList<>();
150+
for (final Callable<T> task : tasks)
151+
futures.add(submit(task));
152+
153+
return futures;
154+
}
155+
156+
@Override
157+
public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks, final long timeout, final TimeUnit unit) throws InterruptedException {
158+
throw new UnsupportedOperationException();
159+
}
160+
161+
@Override
162+
public <T> T invokeAny(final Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
163+
if (closed)
164+
return null;
165+
166+
if (tasks.size() == 0)
167+
return null;
168+
169+
final Callable<T> task = tasks.iterator().next();
170+
this.tasks.add(task);
171+
try {
172+
return task.call();
173+
}
174+
catch (final Exception e) {
175+
if (e instanceof RuntimeException)
176+
throw (RuntimeException)e;
177+
178+
throw new RuntimeException(e);
179+
}
180+
finally {
181+
this.tasks.remove(task);
182+
}
183+
}
184+
185+
@Override
186+
public <T> T invokeAny(final Collection<? extends Callable<T>> tasks, final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
187+
throw new UnsupportedOperationException();
188+
}
189+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
/* Copyright (c) 2024 LibJ
2+
*
3+
* Permission is hereby granted, free of charge, to any person obtaining a copy
4+
* of this software and associated documentation files (the "Software"), to deal
5+
* in the Software without restriction, including without limitation the rights
6+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7+
* copies of the Software, and to permit persons to whom the Software is
8+
* furnished to do so, subject to the following conditions:
9+
*
10+
* The above copyright notice and this permission notice shall be included in
11+
* all copies or substantial portions of the Software.
12+
*
13+
* You should have received a copy of The MIT License (MIT) along with this
14+
* program. If not, see <http://opensource.org/licenses/MIT/>.
15+
*/
16+
17+
package org.libj.util.concurrent;
18+
19+
import java.util.concurrent.Callable;
20+
import java.util.concurrent.Delayed;
21+
import java.util.concurrent.ScheduledExecutorService;
22+
import java.util.concurrent.ScheduledFuture;
23+
import java.util.concurrent.TimeUnit;
24+
25+
/**
26+
* A {@link ScheduledExecutorService} that executes its submitted tasks synchronously in the current thread.
27+
*/
28+
public class SynchronousScheduledExecutorService extends SynchronousExecutorService implements ScheduledExecutorService {
29+
static class SynchronousScheduledFuture<T> extends SynchronousFuture<T> implements ScheduledFuture<T> {
30+
private final long delay;
31+
private final TimeUnit timeUnit;
32+
33+
SynchronousScheduledFuture(final T result, final long delay, final TimeUnit timeUnit) {
34+
super(result);
35+
this.delay = delay;
36+
this.timeUnit = timeUnit;
37+
}
38+
39+
@Override
40+
public int compareTo(final Delayed o) {
41+
int c = TimeUnits.compare(delay, timeUnit, o.getDelay(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
42+
if (c != 0)
43+
return c;
44+
45+
c = TimeUnits.compare(delay, timeUnit, o.getDelay(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
46+
if (c != 0)
47+
return c;
48+
49+
return TimeUnits.compare(delay, timeUnit, o.getDelay(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
50+
}
51+
52+
@Override
53+
public long getDelay(final TimeUnit unit) {
54+
return unit.convert(delay, timeUnit);
55+
}
56+
}
57+
58+
@Override
59+
public ScheduledFuture<?> schedule(final Runnable command, final long delay, final TimeUnit unit) {
60+
try {
61+
Thread.sleep(TimeUnit.MILLISECONDS.convert(delay, unit));
62+
}
63+
catch (final InterruptedException e) {
64+
throw new RuntimeException(e);
65+
}
66+
67+
command.run();
68+
return new SynchronousScheduledFuture<>(null, delay, unit);
69+
}
70+
71+
@Override
72+
public <V> ScheduledFuture<V> schedule(final Callable<V> callable, final long delay, final TimeUnit unit) {
73+
try {
74+
Thread.sleep(TimeUnit.MILLISECONDS.convert(delay, unit));
75+
return new SynchronousScheduledFuture<>(callable.call(), delay, unit);
76+
}
77+
catch (final Exception e) {
78+
if (e instanceof RuntimeException)
79+
throw (RuntimeException)e;
80+
81+
throw new RuntimeException(e);
82+
}
83+
}
84+
85+
@Override
86+
public ScheduledFuture<?> scheduleAtFixedRate(final Runnable command, final long initialDelay, final long period, final TimeUnit unit) {
87+
try {
88+
Thread.sleep(TimeUnit.MILLISECONDS.convert(initialDelay, unit));
89+
final long rateMillis = TimeUnit.MILLISECONDS.convert(period, unit);
90+
for (long ts = System.currentTimeMillis(), sleep;;) {
91+
command.run();
92+
sleep = rateMillis - (System.currentTimeMillis() - ts);
93+
ts = System.currentTimeMillis();
94+
if (sleep > 0)
95+
Thread.sleep(sleep);
96+
}
97+
}
98+
catch (final Exception e) {
99+
if (e instanceof RuntimeException)
100+
throw (RuntimeException)e;
101+
102+
throw new RuntimeException(e);
103+
}
104+
}
105+
106+
@Override
107+
public ScheduledFuture<?> scheduleWithFixedDelay(final Runnable command, final long initialDelay, final long delay, final TimeUnit unit) {
108+
try {
109+
Thread.sleep(TimeUnit.MILLISECONDS.convert(initialDelay, unit));
110+
final long delayMillis = TimeUnit.MILLISECONDS.convert(delay, unit);
111+
while (true) {
112+
Thread.sleep(delayMillis);
113+
command.run();
114+
}
115+
}
116+
catch (final Exception e) {
117+
if (e instanceof RuntimeException)
118+
throw (RuntimeException)e;
119+
120+
throw new RuntimeException(e);
121+
}
122+
}
123+
}

0 commit comments

Comments
 (0)