Skip to content

Commit

Permalink
Io jms fix ack message checkpoint (#22932)
Browse files Browse the repository at this point in the history
* [BEAM-11828] => Fix read message queue implementation

* [BEAM-11828] => New implementation to fix acknowledgment

* [BEAM-11828] => Some refactoring (remove drainMessage method)

* Update CHANGES.md

* Pull request review:
- Upgrade equals and hashcode method in JmsCheckpointMark
- Add a serciveExecutor.schedule method in order to close the JMS session after a tiemout and discard all the related checkpointd

* Pull request review:
- Get back to the initial implementation of JmsCheckpointMark
- Add the discard attribute and discard() method to JmsCheckpointMark

* Adding some unit tests to new JmsCHeckpointMark discard method

* Code review: discard checkpoint and clear messages at beginning of finalizeCheckpoint method

Co-authored-by: Lukasz Cwik <[email protected]>

* Throw an IllegalStateException when adding message when checkpoint is discarded

* Change closeTimeout from long to Duration

* CHeck that closeTimeout is non negative

* Add private package fields to perform testd

* Code review: update comment

Co-authored-by: Lukasz Cwik <[email protected]>

* Code review: add comment

Co-authored-by: Lukasz Cwik <[email protected]>

* Code review: remove empty space

Co-authored-by: Lukasz Cwik <[email protected]>

* Code review: use ExecutorOptions to get an instance of ShceduleExecutorService

* Code review: avoid Thread.sleep with ExecutorService.awaitTermination (don't know if it is better)

* Apply suggestions from code review

* Apply suggestions from code review

* Apply suggestions from code review

* Update sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java

Co-authored-by: Lukasz Cwik <[email protected]>
  • Loading branch information
rvballada and lukecwik authored Oct 19, 2022
1 parent 171d3e3 commit 10977bf
Show file tree
Hide file tree
Showing 4 changed files with 229 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
## Bugfixes
* Fixed JmsIO acknowledgment issue (https://github.com/apache/beam/issues/20814)
* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
## Known Issues
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.jms.Message;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;
import org.slf4j.Logger;
Expand All @@ -41,13 +42,20 @@ class JmsCheckpointMark implements UnboundedSource.CheckpointMark, Serializable
private Instant oldestMessageTimestamp = Instant.now();
private transient List<Message> messages = new ArrayList<>();

private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@VisibleForTesting transient boolean discarded = false;

@VisibleForTesting final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

JmsCheckpointMark() {}

void add(Message message) throws Exception {
lock.writeLock().lock();
try {
if (discarded) {
throw new IllegalStateException(
String.format(
"Attempting to add message %s to checkpoint that is discarded.", message));
}
Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp());
if (currentMessageTimestamp.isBefore(oldestMessageTimestamp)) {
oldestMessageTimestamp = currentMessageTimestamp;
Expand All @@ -67,6 +75,15 @@ Instant getOldestMessageTimestamp() {
}
}

void discard() {
lock.writeLock().lock();
try {
this.discarded = true;
} finally {
lock.writeLock().unlock();
}
}

/**
* Acknowledge all outstanding message. Since we believe that messages will be delivered in
* timestamp order, and acknowledged messages will not be retried, the newest message in this
Expand All @@ -76,6 +93,10 @@ Instant getOldestMessageTimestamp() {
public void finalizeCheckpoint() {
lock.writeLock().lock();
try {
if (discarded) {
messages.clear();
return;
}
for (Message message : messages) {
try {
message.acknowledge();
Expand All @@ -98,6 +119,7 @@ private void readObject(java.io.ObjectInputStream stream)
throws IOException, ClassNotFoundException {
stream.defaultReadObject();
messages = new ArrayList<>();
discarded = false;
}

@Override
Expand Down
111 changes: 94 additions & 17 deletions sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
Expand All @@ -45,6 +47,7 @@
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
import org.apache.beam.sdk.options.ExecutorOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
Expand Down Expand Up @@ -123,11 +126,13 @@
public class JmsIO {

private static final Logger LOG = LoggerFactory.getLogger(JmsIO.class);
private static final Duration DEFAULT_CLOSE_TIMEOUT = Duration.millis(60000L);

public static Read<JmsRecord> read() {
return new AutoValue_JmsIO_Read.Builder<JmsRecord>()
.setMaxNumRecords(Long.MAX_VALUE)
.setCoder(SerializableCoder.of(JmsRecord.class))
.setCloseTimeout(DEFAULT_CLOSE_TIMEOUT)
.setMessageMapper(
(MessageMapper<JmsRecord>)
new MessageMapper<JmsRecord>() {
Expand Down Expand Up @@ -162,7 +167,10 @@ public JmsRecord mapMessage(Message message) throws Exception {
}

public static <T> Read<T> readMessage() {
return new AutoValue_JmsIO_Read.Builder<T>().setMaxNumRecords(Long.MAX_VALUE).build();
return new AutoValue_JmsIO_Read.Builder<T>()
.setMaxNumRecords(Long.MAX_VALUE)
.setCloseTimeout(DEFAULT_CLOSE_TIMEOUT)
.build();
}

public static <EventT> Write<EventT> write() {
Expand Down Expand Up @@ -206,6 +214,8 @@ public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>>

abstract @Nullable AutoScaler getAutoScaler();

abstract Duration getCloseTimeout();

abstract Builder<T> builder();

@AutoValue.Builder
Expand All @@ -230,6 +240,8 @@ abstract static class Builder<T> {

abstract Builder<T> setAutoScaler(AutoScaler autoScaler);

abstract Builder<T> setCloseTimeout(Duration closeTimeout);

abstract Read<T> build();
}

Expand Down Expand Up @@ -364,6 +376,18 @@ public Read<T> withAutoScaler(AutoScaler autoScaler) {
return builder().setAutoScaler(autoScaler).build();
}

/**
* Sets the amount of time to wait for callbacks from the runner stating that the output has
* been durably persisted before closing the connection to the JMS broker. Any callbacks that do
* not occur will cause unacknowledged messages to be returned to the JMS broker and redelivered
* to other clients.
*/
public Read<T> withCloseTimeout(Duration closeTimeout) {
checkArgument(closeTimeout != null, "closeTimeout can not be null");
checkArgument(closeTimeout.getMillis() >= 0, "Close timeout must be non-negative.");
return builder().setCloseTimeout(closeTimeout).build();
}

@Override
public PCollection<T> expand(PBegin input) {
checkArgument(getConnectionFactory() != null, "withConnectionFactory() is required");
Expand Down Expand Up @@ -446,7 +470,7 @@ public List<UnboundedJmsSource<T>> split(int desiredNumSplits, PipelineOptions o
@Override
public UnboundedJmsReader<T> createReader(
PipelineOptions options, JmsCheckpointMark checkpointMark) {
return new UnboundedJmsReader<T>(this, checkpointMark);
return new UnboundedJmsReader<T>(this, options);
}

@Override
Expand All @@ -471,15 +495,13 @@ static class UnboundedJmsReader<T> extends UnboundedReader<T> {

private T currentMessage;
private Instant currentTimestamp;
private PipelineOptions options;

public UnboundedJmsReader(UnboundedJmsSource<T> source, JmsCheckpointMark checkpointMark) {
public UnboundedJmsReader(UnboundedJmsSource<T> source, PipelineOptions options) {
this.source = source;
if (checkpointMark != null) {
this.checkpointMark = checkpointMark;
} else {
this.checkpointMark = new JmsCheckpointMark();
}
this.checkpointMark = new JmsCheckpointMark();
this.currentMessage = null;
this.options = options;
}

@Override
Expand Down Expand Up @@ -582,29 +604,84 @@ public long getTotalBacklogBytes() {
}

@Override
public void close() throws IOException {
public void close() {
doClose();
}

@SuppressWarnings("FutureReturnValueIgnored")
private void doClose() {

try {
if (consumer != null) {
consumer.close();
consumer = null;
closeAutoscaler();
closeConsumer();
ScheduledExecutorService executorService =
options.as(ExecutorOptions.class).getScheduledExecutorService();
executorService.schedule(
() -> {
LOG.debug(
"Closing session and connection after delay {}", source.spec.getCloseTimeout());
// Discard the checkpoints and set the reader as inactive
checkpointMark.discard();
closeSession();
closeConnection();
},
source.spec.getCloseTimeout().getMillis(),
TimeUnit.MILLISECONDS);

} catch (Exception e) {
LOG.error("Error closing reader", e);
}
}

private void closeConnection() {
try {
if (connection != null) {
connection.stop();
connection.close();
connection = null;
}
} catch (Exception e) {
LOG.error("Error closing connection", e);
}
}

private void closeSession() {
try {
if (session != null) {
session.close();
session = null;
}
if (connection != null) {
connection.stop();
connection.close();
connection = null;
} catch (Exception e) {
LOG.error("Error closing session" + e.getMessage(), e);
}
}

private void closeConsumer() {
try {
if (consumer != null) {
consumer.close();
consumer = null;
}
} catch (Exception e) {
LOG.error("Error closing consumer", e);
}
}

private void closeAutoscaler() {
try {
if (autoScaler != null) {
autoScaler.stop();
autoScaler = null;
}
} catch (Exception e) {
throw new IOException(e);
LOG.error("Error closing autoscaler", e);
}
}

@Override
protected void finalize() {
doClose();
}
}

/**
Expand Down
Loading

0 comments on commit 10977bf

Please sign in to comment.