[#25887] fix(JmsIO): issue with multiple connection open #25887 #25945
@Abacn I pushed this PR to fix the issue and add more integration tests. We are working on a connection pool to determine the connection that we need. Something like this:

    private static class JmsConnectionPool<T> implements Serializable {
      private static final long serialVersionUID = 1L;
      private static final int DEFAULT_MAX_POOL_SIZE = 10;
      private static final int DEFAULT_INITIAL_POOL_SIZE = 20;

      private JmsIO.Write<T> spec;
      private final int maxPoolSize;
      private final int initialPoolSize;
      private List<JmsConnection<T>> jmsConnectionPool;
      private List<JmsConnection<T>> usedJmsConnections = new ArrayList<>();

      JmsConnectionPool(JmsIO.Write<T> spec, List<JmsConnection<T>> jmsConnectionPool) {
        this.spec = spec;
        this.jmsConnectionPool = jmsConnectionPool;
        this.maxPoolSize = Optional.ofNullable(spec.getMaxPoolSize()).orElse(DEFAULT_MAX_POOL_SIZE);
        this.initialPoolSize = Optional.ofNullable(spec.getInitialPoolSize()).orElse(DEFAULT_INITIAL_POOL_SIZE);
      }

      static <T> JmsConnectionPool<T> create(JmsIO.Write<T> spec) {
        int initialPoolSize = Optional.ofNullable(spec.getInitialPoolSize()).orElse(DEFAULT_INITIAL_POOL_SIZE);
        List<JmsConnection<T>> jmsConnectionPool = new ArrayList<>(initialPoolSize);
        for (int i = 0; i < initialPoolSize; i++) {
          jmsConnectionPool.add(new JmsConnection<>(spec));
        }
        return new JmsConnectionPool<>(spec, jmsConnectionPool);
      }

      JmsConnection<T> getConnection() throws JmsIOException {
        if (jmsConnectionPool.isEmpty()) {
          if (usedJmsConnections.size() < maxPoolSize) {
            jmsConnectionPool.add(new JmsConnection<>(spec));
          } else {
            throw new JmsIOException("Maximum pool connection size has been reached");
          }
        }
        JmsConnection<T> jmsConnection = jmsConnectionPool
            .remove(jmsConnectionPool.size() - 1);
        usedJmsConnections.add(jmsConnection);
        return jmsConnection;
      }

      public boolean releaseConnection(JmsConnection<T> jmsConnection) {
        jmsConnectionPool.add(jmsConnection);
        return usedJmsConnections.remove(jmsConnection);
      }

      public boolean closeConnection(JmsConnection<T> jmsConnection) {
        jmsConnection.close();
        jmsConnectionPool.remove(jmsConnection);
        return usedJmsConnections.remove(jmsConnection);
      }

      public void shutdown() throws JMSException {
        usedJmsConnections.forEach(this::releaseConnection);
        for (JmsConnection<T> jmsConnection : jmsConnectionPool) {
          jmsConnection.close();
        }
        jmsConnectionPool.clear();
      }
    }

The function
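Two details in the pool snippet above look worth double-checking before it lands: the default initial size (20) exceeds the default maximum (10), and `shutdown()` calls `releaseConnection` from inside `usedJmsConnections.forEach(...)`, which removes from the list being iterated and would typically throw `ConcurrentModificationException`. The following is only a minimal sketch of how a bounded pool could avoid both. `PooledConnection`, `FakeConnection`, and `BoundedPool` are hypothetical names used for illustration, not part of JmsIO.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

/** Hypothetical stand-in for the JmsConnection<T> wrapper; only close() matters here. */
interface PooledConnection {
  void close();
}

/** Trivial fake used to exercise the pool without a broker. */
final class FakeConnection implements PooledConnection {
  boolean closed;

  @Override
  public void close() {
    closed = true;
  }
}

/** Bounded pool sketch: at most maxPoolSize connections exist at any time. */
final class BoundedPool<C extends PooledConnection> {
  private final Supplier<C> factory;
  private final int maxPoolSize;
  private final Deque<C> idle = new ArrayDeque<>();
  private final Set<C> inUse = new HashSet<>();

  BoundedPool(Supplier<C> factory, int initialPoolSize, int maxPoolSize) {
    // Reject the inconsistent configuration instead of silently exceeding the cap.
    if (initialPoolSize < 0 || initialPoolSize > maxPoolSize) {
      throw new IllegalArgumentException("initialPoolSize must be in [0, maxPoolSize]");
    }
    this.factory = factory;
    this.maxPoolSize = maxPoolSize;
    for (int i = 0; i < initialPoolSize; i++) {
      idle.push(factory.get());
    }
  }

  synchronized C acquire() {
    if (idle.isEmpty()) {
      // No idle connection left, so every existing connection is in use.
      if (inUse.size() >= maxPoolSize) {
        throw new IllegalStateException("Maximum pool connection size has been reached");
      }
      idle.push(factory.get());
    }
    C connection = idle.pop();
    inUse.add(connection);
    return connection;
  }

  /** Returns false (and changes nothing) if the connection was not checked out. */
  synchronized boolean release(C connection) {
    if (!inUse.remove(connection)) {
      return false;
    }
    idle.push(connection);
    return true;
  }

  synchronized void shutdown() {
    // Copy before clearing so no collection is modified while it is iterated.
    List<C> all = new ArrayList<>(idle);
    all.addAll(inUse);
    idle.clear();
    inUse.clear();
    for (C connection : all) {
      connection.close();
    }
  }
}
```

The `synchronized` methods are a conservative choice for the sketch; whether the real pool is shared across threads depends on where it would live in the DoFn.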
Codecov Report
@@ Coverage Diff @@
## master #25945 +/- ##
==========================================
- Coverage 71.42% 71.37% -0.06%
==========================================
Files 782 783 +1
Lines 102856 102946 +90
==========================================
+ Hits 73470 73473 +3
- Misses 27910 27997 +87
Partials 1476 1476
See 12 files with indirect coverage changes.
@Abacn I had an issue with source compilation. The client that I used, qpid-jms-client, is compiled with JDK 11, but the tests run on JDK 8. I downgraded it to version 0.61.0, which is compiled with JDK 8, in commit 73bb7f8.
The forward fix is heavy. To avoid introducing new risk, is it possible to start with a lightweight fix? For example, does closing only the producer and session at finishBundle (while keeping the connection open) mitigate the pressure on the server? Per the documentation, "A connection could represent an open TCP/IP socket between a client and a provider service daemon." So port use should be on a per-connection basis. A connection pool is a feature request and can be considered later.
Also, the integration test should go into JmsIOIT and have its own Gradle task. The guideline on the Beam website may help with designing and implementing integration tests: https://beam.apache.org/documentation/io/io-standards/#integration-tests
And about creating the connection, should we keep it at setup or at startBundle?
Yes, it would be nice to have.
Create it at setup and destroy it in teardown, as in the original implementation. Then only one port will be used per worker thread.
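To make the lifecycle suggested above concrete, here is a rough sketch in which the connection is opened once at setup and closed at teardown, while the session and producer are opened and closed per bundle. `FakeBroker` and `WriterLifecycleSketch` are hypothetical names, and the counters stand in for real JMS objects. This is an illustration of the idea, not the code in this PR.

```java
/** Hypothetical broker stub that only counts what a writer opens and closes. */
final class FakeBroker {
  int connectionsOpened;
  int openConnections;
  int sessionsOpened;
  int openSessions;
}

/** Mirrors the DoFn lifecycle: setup -> (startBundle -> finishBundle)* -> teardown. */
final class WriterLifecycleSketch {
  private final FakeBroker broker;
  private boolean connected;
  private boolean sessionOpen;

  WriterLifecycleSketch(FakeBroker broker) {
    this.broker = broker;
  }

  /** One connection (one socket/port) per DoFn instance. */
  void setup() {
    if (!connected) {
      broker.connectionsOpened++;
      broker.openConnections++;
      connected = true;
    }
  }

  /** Session and producer are cheap relative to a connection; open them per bundle. */
  void startBundle() {
    if (!connected) {
      throw new IllegalStateException("setup() must run before startBundle()");
    }
    if (!sessionOpen) {
      broker.sessionsOpened++;
      broker.openSessions++;
      sessionOpen = true;
    }
  }

  /** Close the per-bundle resources but keep the connection for the next bundle. */
  void finishBundle() {
    if (sessionOpen) {
      broker.openSessions--;
      sessionOpen = false;
    }
  }

  /** Release everything, including a session left open by a failed bundle. */
  void teardown() {
    finishBundle();
    if (connected) {
      broker.openConnections--;
      connected = false;
    }
  }
}
```

With this shape, processing any number of bundles on one instance opens exactly one connection, which is the property the port-usage concern is about.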
Reminder, please take a look at this pr: @kennknowles @damccorm @ahmedabu98 |
- Issue related to multiple connection being open for each bundle - Add integration test using jms-qpid for JmsIO Fixes #25887 Co-Authored-By: Amrane Ait Zeouay <[email protected]>
Thanks. The changes in the main scope look good to me. I left a couple of comments on the tests. If we want the integration test to run on CI, we'll need to add the task here:

    def additionalTasks = [
      jms: [
        ':sdks:java:io:jms:integrationTest',
      ],
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/CommonJms.java
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
…ng it #25887 Fixes #25887 Co-Authored-By: Amrane Ait Zeouay <[email protected]>
Thanks, I just have a few comments about cleanup. I think we can still get it in by the 2.47.0 cut, which is tomorrow. Have you tested in your environment that this PR resolves the port-occupancy issue?
Verified integration test locally:
Test | Duration | Result
-- | -- | --
testPublishingThenReadingAll[with client class class org.apache.activemq.ActiveMQConnectionFactory] | 33.851s | passed
testPublishingThenReadingAll[with client class class org.apache.qpid.jms.JmsConnectionFactory] | 34.059s | passed
@@ -1026,7 +1027,7 @@ public void start() throws JMSException {
}
You may replace the direct assignment of producer with startProducer(), to make it clear that the producer is opened in a single code path.
Also, I see that "isProducerNeedsToBeCreated" is removed in several places and connect() is only called in the DoFn's setup. Can we get rid of this flag now?
We added the flag so that, if a connection fails, a new connection is created based on the exceptionListener. Do you think it would be better to check whether the producer is null?

    if (producer == null) {
      // open connection
      connection.setExceptionListener(exception -> {
        connectionCounter.inc();
        // if there is an issue with the connection, close the session, connection & producer so they can be recreated
        close();
      });
      // create new producer
    }
Both are fine; if the flag is used, then it should be consistent. For now, if closeProducer() is called, the producer is closed and set to null, but isProducerNeedsToBeCreated is still false.
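One way to avoid the flag and the producer drifting apart is to treat `producer == null` as the only piece of state and route every open and close through a single pair of methods. The sketch below assumes a hypothetical `Producer` interface and `ProducerHolderSketch` class; it is not the actual JmsIO code, just an illustration of the single-code-path idea.

```java
import java.util.function.Supplier;

final class ProducerHolderSketch {
  /** Hypothetical stand-in for a JMS MessageProducer; only close() is modeled. */
  interface Producer {
    void close();
  }

  private final Supplier<Producer> producerFactory;
  private Producer producer; // null means "needs to be created"; no separate flag

  ProducerHolderSketch(Supplier<Producer> producerFactory) {
    this.producerFactory = producerFactory;
  }

  /** The only place a producer is created; safe to call repeatedly. */
  void startProducer() {
    if (producer == null) {
      producer = producerFactory.get();
    }
  }

  /** The only place a producer is closed; resets state so startProducer() recreates it. */
  void closeProducer() {
    if (producer != null) {
      producer.close();
      producer = null;
    }
  }

  boolean isProducerOpen() {
    return producer != null;
  }

  /** Full shutdown reuses closeProducer() instead of duplicating its logic. */
  void close() {
    closeProducer();
    // Closing the session and connection would follow here in a real implementation.
  }
}
```

Because the null check is the single source of truth, there is no state in which the producer has been closed but the "needs to be created" indicator still says otherwise.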
void close() {
  try {
    if (producer != null) {
You may call closeProducer() here, so that the producer is opened and closed by a single code path.
When will version 2.47.0 be published? That way I will know whether I have time to test in my environment. The issue with the new connection has been fixed.
The release cut is tomorrow, Apr 5th. The release date is targeted for early May.
I'm not really confident about it. I didn't test the republish functionality, which is a major thing for us. The problem I can see is that when a connection fails, the DoFn thread retries the bundle X times over a duration Y, but the connection is never recreated. I can suggest two possible ways to handle it:

1 - Inside the exception listener:

    connection.setExceptionListener(exception -> {
      failedConnectionCounter.inc();
      // Make sure to free the failed connection
      close();
      // Recreate a new connection with a new producer
      connect();
    });

2 - Inside the loop:

    connection.setExceptionListener(exception -> {
      failedConnectionCounter.inc();
      // Make sure to free the failed connection
      close();
    });
    ...
    void connect() {
      if (producer == null) {
        ....
      }
    }
    ...
    void publishMessage() {
      while (true) {
        // Recreate a connection if the producer is null
        connect();
        ...
      }
    }

What do you think about this?
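As a rough illustration of the second option, the sketch below reconnects lazily inside the publish loop. Since a JMS exception listener may be invoked on a provider thread, the listener here only flips an `AtomicBoolean` and leaves all connection handling to the publishing thread; that is an assumption made for the sketch, not something this PR does. `Sender`, `SenderFactory`, and `ReconnectingPublisherSketch` are hypothetical names, and the backoff delay between attempts is omitted for brevity.

```java
import java.util.concurrent.atomic.AtomicBoolean;

final class ReconnectingPublisherSketch {
  /** Hypothetical stand-in for session + producer; throws RuntimeException on failure. */
  interface Sender {
    void send(String message);
  }

  /** Hypothetical factory that opens a fresh connection and returns a sender for it. */
  interface SenderFactory {
    Sender connect();
  }

  private final SenderFactory factory;
  private final int maxAttempts;
  private final AtomicBoolean broken = new AtomicBoolean(false);
  private Sender sender;
  int connectCount; // exposed only so the behavior can be observed

  ReconnectingPublisherSketch(SenderFactory factory, int maxAttempts) {
    this.factory = factory;
    this.maxAttempts = maxAttempts;
  }

  /** What the exception listener would call: mark the connection, touch nothing else. */
  void onConnectionException() {
    broken.set(true);
  }

  private void connectIfNeeded() {
    // Read and clear the flag in one step so a failure report is handled exactly once.
    boolean wasBroken = broken.getAndSet(false);
    if (sender == null || wasBroken) {
      sender = factory.connect();
      connectCount++;
    }
  }

  /** Bounded retry loop; gives up by throwing so the caller can fail the bundle. */
  void publish(String message) {
    RuntimeException last = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        connectIfNeeded();
        sender.send(message);
        return;
      } catch (RuntimeException e) {
        last = e;
        sender = null; // force a fresh connection on the next attempt
      }
    }
    throw new IllegalStateException("Giving up after " + maxAttempts + " attempts", last);
  }
}
```

Keeping the listener this small sidesteps the question of two threads closing and reopening the same connection concurrently, at the cost of noticing a dead connection only on the next publish.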
Generally, the backoff retry handles intermittent errors (ones that an in-place retry can recover from). If your concern is a persistent error (a dead connection), it should already be handled by the runner. That is, when the exception propagates, the runner creates a new work item, which includes initializing a new DoFn instance, to retry the bundle. Handling all errors within the DoFn works in theory, but it is not encouraged because it adds complexity to the implementation. If republish needs more testing, can we aim for now to fix the original issue (high port occupancy), which was a regression in 2.46.0?
Regarding the comment in #25945: is there a potential race condition when operating on the connection inside the connection's exception listener?
This looks good at first glance. Though I still suggest improving one thing at a time, to get at least a stable version first. Also, if we call close and connect inside
…25887 Fixes #25887 Co-Authored-By: Amrane Ait Zeouay <[email protected]>
I will try to see which solution performs better. Should we add it together with the connection pool? Just FYI, when there is an exception while publishing, the bundle is not republished because the message is sent to an output. That was the previous implementation, and I wanted to keep backward compatibility. So if we remove that, we can just throw an exception and let Dataflow retry the bundle without the retry policy.
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOIT.java
Fixes #25887 Co-Authored-By: Amrane Ait Zeouay <[email protected]>
Tests passed, though this is not reflected in the GitHub UI. Merging for now.
Fixes #25887
Related to PR#25886
There are two points handled in this PR: