diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java index a55d8a0a4217..63509126022b 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java @@ -209,7 +209,7 @@ * *
See {@link Read#withSessionServiceFactory(SessionServiceFactory)} for session authentication. * The connector provides implementation of the {@link SessionServiceFactory} using the Basic - * Authentication: {@link org.apache.beam.sdk.io.solace.broker.BasicAuthJcsmpSessionService}. + * Authentication: {@link org.apache.beam.sdk.io.solace.broker.BasicAuthJcsmpSessionServiceFactory}. * *
For the authentication to the SEMP API ({@link Read#withSempClientFactory(SempClientFactory)})
* the connector provides {@link org.apache.beam.sdk.io.solace.broker.BasicAuthSempClientFactory} to
@@ -639,9 +639,8 @@ public Read An existing implementation of the SempClientFactory includes {@link
- * org.apache.beam.sdk.io.solace.broker.BasicAuthJcsmpSessionService} which implements the Basic
- * Authentication to Solace. *
+ * The {@link BasicAuthJcsmpSessionServiceFactory} is an existing implementation of the
+ * {@link SessionServiceFactory} which implements the Basic Authentication to Solace.
*
* To use it, specify the credentials with the builder methods. *
*
diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java
index 199dcccee854..5c3952e6b30f 100644
--- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java
+++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java
@@ -20,13 +20,14 @@
import static org.apache.beam.sdk.io.solace.broker.SessionService.DEFAULT_VPN_NAME;
import com.google.auto.value.AutoValue;
+import com.solacesystems.jcsmp.JCSMPProperties;
/**
- * A factory for creating {@link BasicAuthJcsmpSessionService} instances. Extends {@link
+ * A factory for creating {@link JcsmpSessionService} instances. Extends {@link
* SessionServiceFactory}.
*
- * This factory provides a way to create {@link BasicAuthJcsmpSessionService} instances with
- * authenticate to Solace with Basic Authentication.
+ * This factory provides a way to create {@link JcsmpSessionService} that use Basic
+ * Authentication.
*/
@AutoValue
public abstract class BasicAuthJcsmpSessionServiceFactory extends SessionServiceFactory {
@@ -69,15 +70,14 @@ public abstract static class Builder {
@Override
public SessionService create() {
- BasicAuthJcsmpSessionService.Builder builder = BasicAuthJcsmpSessionService.builder();
- if (queue != null) {
- builder = builder.queueName(queue.getName());
- }
- return builder
- .host(host())
- .username(username())
- .password(password())
- .vpnName(vpnName())
- .build();
+ JCSMPProperties jcsmpProperties = new JCSMPProperties();
+ jcsmpProperties.setProperty(JCSMPProperties.VPN_NAME, vpnName());
+ jcsmpProperties.setProperty(
+ JCSMPProperties.AUTHENTICATION_SCHEME, JCSMPProperties.AUTHENTICATION_SCHEME_BASIC);
+ jcsmpProperties.setProperty(JCSMPProperties.USERNAME, username());
+ jcsmpProperties.setProperty(JCSMPProperties.PASSWORD, password());
+ jcsmpProperties.setProperty(JCSMPProperties.HOST, host());
+
+ return JcsmpSessionService.create(jcsmpProperties, getQueue());
}
}
diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/JcsmpSessionService.java
similarity index 74%
rename from sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java
rename to sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/JcsmpSessionService.java
index d4c9a3ec6210..818368a92b9f 100644
--- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java
+++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/JcsmpSessionService.java
@@ -43,45 +43,16 @@
/**
* A class that manages a connection to a Solace broker using basic authentication.
*
- * This class provides a way to connect to a Solace broker and receive messages from a queue. The
- * connection is established using basic authentication.
+ * This class provides a way to connect to a Solace broker and receive messages from a queue.
*/
@AutoValue
-public abstract class BasicAuthJcsmpSessionService extends SessionService {
+public abstract class JcsmpSessionService extends SessionService {
- /** The name of the queue to receive messages from. */
- public abstract @Nullable String queueName();
+ /** JCSMP properties used to establish the connection. */
+ abstract JCSMPProperties jcsmpProperties();
- /** The host name or IP address of the Solace broker. Format: Host[:Port] */
- public abstract String host();
-
- /** The username to use for authentication. */
- public abstract String username();
-
- /** The password to use for authentication. */
- public abstract String password();
-
- /** The name of the VPN to connect to. */
- public abstract String vpnName();
-
- public static Builder builder() {
- return new AutoValue_BasicAuthJcsmpSessionService.Builder().vpnName(DEFAULT_VPN_NAME);
- }
-
- @AutoValue.Builder
- public abstract static class Builder {
- public abstract Builder queueName(@Nullable String queueName);
-
- public abstract Builder host(String host);
-
- public abstract Builder username(String username);
-
- public abstract Builder password(String password);
-
- public abstract Builder vpnName(String vpnName);
-
- public abstract BasicAuthJcsmpSessionService build();
- }
+ /** The Queue to receive messages from. */
+ abstract @Nullable Queue queue();
@Nullable private transient JCSMPSession jcsmpSession;
@Nullable private transient MessageReceiver messageReceiver;
@@ -90,9 +61,19 @@ public abstract static class Builder {
new ConcurrentLinkedQueue<>();
private final RetryCallableManager retryCallableManager = RetryCallableManager.create();
+ public static JcsmpSessionService create(JCSMPProperties jcsmpProperties, @Nullable Queue queue) {
+ return new AutoValue_JcsmpSessionService(jcsmpProperties, queue);
+ }
+
+ @Override
+ public JCSMPProperties getSessionProperties() {
+ return jcsmpProperties();
+ }
+
@Override
public void connect() {
- retryCallableManager.retryCallable(this::connectSession, ImmutableSet.of(JCSMPException.class));
+ retryCallableManager.retryCallable(
+ this::connectReadSession, ImmutableSet.of(JCSMPException.class));
}
@Override
@@ -158,10 +139,7 @@ private MessageProducer createXMLMessageProducer(SubmissionMode submissionMode)
}
private MessageReceiver createFlowReceiver() throws JCSMPException, IOException {
-
- Queue queue =
- JCSMPFactory.onlyInstance()
- .createQueue(checkStateNotNull(queueName(), "SolaceIO.Read: Queue is not set."));
+ Queue queue = checkStateNotNull(queue(), "SolaceIO.Read: Queue is not set.");
ConsumerFlowProperties flowProperties = new ConsumerFlowProperties();
flowProperties.setEndpoint(queue);
@@ -190,9 +168,9 @@ private static FlowReceiver createFlowReceiver(
return jcsmpSession.createFlow(null, flowProperties, endpointProperties);
}
- private int connectSession() throws JCSMPException {
+ private int connectReadSession() throws JCSMPException {
if (jcsmpSession == null) {
- jcsmpSession = createSessionObject();
+ jcsmpSession = createReadSessionObject();
}
jcsmpSession.connect();
return 0;
@@ -206,25 +184,12 @@ private int connectWriteSession(SubmissionMode mode) throws JCSMPException {
return 0;
}
- private JCSMPSession createSessionObject() throws InvalidPropertiesException {
- JCSMPProperties properties = initializeSessionProperties(new JCSMPProperties());
- return JCSMPFactory.onlyInstance().createSession(properties);
+ private JCSMPSession createReadSessionObject() throws InvalidPropertiesException {
+ return JCSMPFactory.onlyInstance().createSession(jcsmpProperties());
}
private JCSMPSession createWriteSessionObject(SubmissionMode mode)
throws InvalidPropertiesException {
return JCSMPFactory.onlyInstance().createSession(initializeWriteSessionProperties(mode));
}
-
- @Override
- public JCSMPProperties initializeSessionProperties(JCSMPProperties baseProps) {
- baseProps.setProperty(JCSMPProperties.VPN_NAME, vpnName());
-
- baseProps.setProperty(
- JCSMPProperties.AUTHENTICATION_SCHEME, JCSMPProperties.AUTHENTICATION_SCHEME_BASIC);
- baseProps.setProperty(JCSMPProperties.USERNAME, username());
- baseProps.setProperty(JCSMPProperties.PASSWORD, password());
- baseProps.setProperty(JCSMPProperties.HOST, host());
- return baseProps;
- }
}
diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java
index 6dcd0b652616..5342f86c13e1 100644
--- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java
+++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java
@@ -32,8 +32,8 @@
* messaging system. It allows for establishing a connection, creating a message-receiver object,
* checking if the connection is closed or not, and gracefully closing the session.
*
- * Override this class and the method {@link #initializeSessionProperties(JCSMPProperties)} with
- * your specific properties, including all those related to authentication.
+ * Override this class and the method {@link #getSessionProperties()} with your specific
+ * properties, including all those related to authentication.
*
* The connector will call the method only once per session created, so you can perform
* relatively heavy operations in that method (e.g. connect to a store or vault to retrieve
@@ -70,8 +70,7 @@
* The connector ensures that no two threads will be calling that method at the same time, so you
* don't have to take any specific precautions to avoid race conditions.
*
- * For basic authentication, use {@link BasicAuthJcsmpSessionService} and {@link
- * BasicAuthJcsmpSessionServiceFactory}.
+ * For basic authentication, use {@link BasicAuthJcsmpSessionServiceFactory}.
*
* For other situations, you need to extend this class and implement the `equals` method, so two
* instances of your class can be compared by value. We recommend using AutoValue for that. For
@@ -143,11 +142,7 @@ public abstract class SessionService implements Serializable {
/**
* Override this method and provide your specific properties, including all those related to
- * authentication, and possibly others too. The {@code}baseProperties{@code} parameter sets the
- * Solace VPN to "default" if none is specified.
- *
- * You should add your properties to the parameter {@code}baseProperties{@code}, and return the
- * result.
+ * authentication, and possibly others too.
*
* The method will be used whenever the session needs to be created or refreshed. If you are
* setting credentials with expiration, just make sure that the latest available credentials (e.g.
@@ -160,7 +155,7 @@ public abstract class SessionService implements Serializable {
* href="https://docs.solace.com/API-Developer-Online-Ref-Documentation/java/com/solacesystems/jcsmp/JCSMPProperties.html">https://docs.solace.com/API-Developer-Online-Ref-Documentation/java/com/solacesystems/jcsmp/JCSMPProperties.html
*
*/
- public abstract JCSMPProperties initializeSessionProperties(JCSMPProperties baseProperties);
+ public abstract JCSMPProperties getSessionProperties();
/**
* You need to override this method to be able to compare these objects by value. We recommend
@@ -187,22 +182,10 @@ public abstract class SessionService implements Serializable {
* token), this method will be called again.
*/
public final JCSMPProperties initializeWriteSessionProperties(SolaceIO.SubmissionMode mode) {
- JCSMPProperties jcsmpProperties = initializeSessionProperties(getDefaultProperties());
+ JCSMPProperties jcsmpProperties = getSessionProperties();
return overrideConnectorProperties(jcsmpProperties, mode);
}
- private static JCSMPProperties getDefaultProperties() {
- JCSMPProperties props = new JCSMPProperties();
- props.setProperty(JCSMPProperties.VPN_NAME, DEFAULT_VPN_NAME);
- // Outgoing messages will have a sender timestamp field populated
- props.setProperty(JCSMPProperties.GENERATE_SEND_TIMESTAMPS, true);
- // Make XMLProducer safe to access from several threads. This is the default value, setting
- // it just in case.
- props.setProperty(JCSMPProperties.PUB_MULTI_THREAD, true);
-
- return props;
- }
-
/**
* This method overrides some properties for the broker session to prevent misconfiguration,
* taking into account how the write connector works.
@@ -210,6 +193,19 @@ private static JCSMPProperties getDefaultProperties() {
private static JCSMPProperties overrideConnectorProperties(
JCSMPProperties props, SolaceIO.SubmissionMode mode) {
+ if (props.getProperty(JCSMPProperties.VPN_NAME) == null) {
+ props.setProperty(JCSMPProperties.VPN_NAME, DEFAULT_VPN_NAME);
+ }
+ if (props.getProperty(JCSMPProperties.VPN_NAME) == null) {
+ // Outgoing messages will have a sender timestamp field populated
+ props.setProperty(JCSMPProperties.GENERATE_SEND_TIMESTAMPS, true);
+ }
+ if (props.getProperty(JCSMPProperties.VPN_NAME) == null) {
+ // Make XMLProducer safe to access from several threads. This is the default value, setting
+ // it just in case.
+ props.setProperty(JCSMPProperties.PUB_MULTI_THREAD, true);
+ }
+
// PUB_ACK_WINDOW_SIZE heavily affects performance when publishing persistent
// messages. It can be a value between 1 and 255. This is the batch size for the ack
// received from Solace. A value of 1 will have the lowest latency, but a very low
diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java
index bd1f3c23694d..fe49e3493657 100644
--- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java
+++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java
@@ -20,6 +20,7 @@
import com.solacesystems.jcsmp.Queue;
import java.io.Serializable;
import org.apache.beam.sdk.io.solace.SolaceIO.SubmissionMode;
+import org.apache.beam.sdk.io.solace.data.Solace;
import org.checkerframework.checker.nullness.qual.Nullable;
/**
@@ -61,7 +62,7 @@ public abstract class SessionServiceFactory implements Serializable {
* This could be used to associate the created SessionService with a specific queue for message
* handling.
*/
- @Nullable Queue queue;
+ private @Nullable Queue queue;
/**
* The write submission mode. This is set when the writers are created. This property is used only
@@ -75,6 +76,33 @@ public abstract class SessionServiceFactory implements Serializable {
*/
public abstract SessionService create();
+ /**
+ * This method is called in the {@link
+ * org.apache.beam.sdk.io.solace.SolaceIO.Read#expand(org.apache.beam.sdk.values.PBegin)} method
+ * to set the Queue reference based on {@link
+ * org.apache.beam.sdk.io.solace.SolaceIO.Read#from(Solace.Queue)} or {@link
+ * org.apache.beam.sdk.io.solace.SolaceIO.Read#from(Solace.Topic)}. The queue can be retrieved in
+ * the classes that inherit {@link SessionServiceFactory} with the getter method {@link
+ * SessionServiceFactory#getQueue()}
+ */
+ public final void setQueue(Queue queue) {
+ this.queue = queue;
+ }
+
+ /**
+ * Getter for the queue. This is nullable, because at the construction time this reference is
+ * null. Once the pipeline is compiled and the {@link
+ * org.apache.beam.sdk.io.solace.SolaceIO.Read#expand(org.apache.beam.sdk.values.PBegin)} method
+ * is called, this reference is valid.
+ *
+ * @return a reference to the queue which is set with the {@link
+ * org.apache.beam.sdk.io.solace.SolaceIO.Read#from(Solace.Queue)} or {@link
+ * org.apache.beam.sdk.io.solace.SolaceIO.Read#from(Solace.Topic)}
+ */
+ public final @Nullable Queue getQueue() {
+ return queue;
+ }
+
/**
* You need to override this method to be able to compare these objects by value. We recommend
* using AutoValue for that.
@@ -89,15 +117,6 @@ public abstract class SessionServiceFactory implements Serializable {
@Override
public abstract int hashCode();
- /**
- * This method is called in the {@link
- * org.apache.beam.sdk.io.solace.SolaceIO.Read#expand(org.apache.beam.sdk.values.PBegin)} method
- * to set the Queue reference.
- */
- public void setQueue(Queue queue) {
- this.queue = queue;
- }
-
/**
* Called by the write connector to set the submission mode used to create the message producers.
*/
diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java
index 7631d32f63cc..f6bb67419541 100644
--- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java
+++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java
@@ -61,7 +61,7 @@ public void connect() {
}
@Override
- public JCSMPProperties initializeSessionProperties(JCSMPProperties baseProperties) {
+ public JCSMPProperties getSessionProperties() {
throw new UnsupportedOperationException(exceptionMessage);
}
}
diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java
index 6d28bcefc84c..e888c62c8522 100644
--- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java
+++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java
@@ -102,11 +102,11 @@ public Queue