Skip to content

Commit 00c5ef6

Browse files
committed
[issue-60] Pravega txn API changes
1 parent a32b23e commit 00c5ef6

File tree

4 files changed

+6
-12
lines changed

4 files changed

+6
-12
lines changed

.gitattributes

+1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
*.java text
88
*.xml text
99
*.sh text
10+
*.properties text
1011

1112
# Files that should keep CRLF line endings
1213
*.bat text eol=crlf

gradle.properties

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ mockitoVersion=1.10.19
1414

1515
# Version and base tags can be overridden at build time.
1616
connectorVersion=0.2.0-SNAPSHOT
17-
pravegaVersion=0.1.0
17+
pravegaVersion=0.2.0-SNAPSHOT
1818

1919
# These properties are only needed for publishing to maven central
2020
# Pravega Signing Key

src/main/java/io/pravega/connectors/flink/FlinkExactlyOncePravegaWriter.java

+4-10
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@ public class FlinkExactlyOncePravegaWriter<T>
7070
private final String streamName;
7171

7272
private final long txnTimeoutMillis;
73-
private final long txnMaxTimeMillis;
7473
private final long txnGracePeriodMillis;
7574

7675
// ----------- runtime fields -----------
@@ -102,7 +101,7 @@ public FlinkExactlyOncePravegaWriter(
102101
final PravegaEventRouter<T> router) {
103102

104103
this(controllerURI, scope, streamName, serializationSchema, router,
105-
DEFAULT_TXN_TIMEOUT_MILLIS, DEFAULT_TXN_TIMEOUT_MILLIS, DEFAULT_TX_SCALE_GRACE_MILLIS);
104+
DEFAULT_TXN_TIMEOUT_MILLIS, DEFAULT_TX_SCALE_GRACE_MILLIS);
106105
}
107106

108107

@@ -115,8 +114,6 @@ public FlinkExactlyOncePravegaWriter(
115114
* @param serializationSchema The implementation for serializing every event into pravega's storage format.
116115
* @param router The implementation to extract the partition key from the event.
117116
* @param txnTimeoutMillis The number of milliseconds after which the transaction will be aborted.
118-
* @param txnMaxTimeMillis The maximum time (in milliseconds) to which transaction timeout may be
119-
* increased via the pingTransaction API.
120117
* @param txnGracePeriodMillis The maximum amount of time, in milliseconds, until which transaction may
121118
* remain active, after a scale operation has been initiated on the underlying stream.
122119
*/
@@ -127,7 +124,6 @@ public FlinkExactlyOncePravegaWriter(
127124
final SerializationSchema<T> serializationSchema,
128125
final PravegaEventRouter<T> router,
129126
final long txnTimeoutMillis,
130-
final long txnMaxTimeMillis,
131127
final long txnGracePeriodMillis) {
132128

133129
Preconditions.checkNotNull(controllerURI, "controllerURI");
@@ -136,7 +132,6 @@ public FlinkExactlyOncePravegaWriter(
136132
Preconditions.checkNotNull(serializationSchema, "serializationSchema");
137133
Preconditions.checkNotNull(router, "router");
138134
Preconditions.checkArgument(txnTimeoutMillis > 0, "txnTimeoutMillis must be > 0");
139-
Preconditions.checkArgument(txnMaxTimeMillis > 0, "txnMaxTimeMillis must be > 0");
140135
Preconditions.checkArgument(txnGracePeriodMillis > 0, "txnGracePeriodMillis must be > 0");
141136

142137
this.controllerURI = controllerURI;
@@ -146,7 +141,6 @@ public FlinkExactlyOncePravegaWriter(
146141
this.eventRouter = router;
147142

148143
this.txnTimeoutMillis = txnTimeoutMillis;
149-
this.txnMaxTimeMillis = txnMaxTimeMillis;
150144
this.txnGracePeriodMillis = txnGracePeriodMillis;
151145
}
152146

@@ -160,13 +154,13 @@ public void open(Configuration parameters) throws Exception {
160154
this.pravegaWriter = clientFactory.createEventWriter(
161155
this.streamName,
162156
serializer,
163-
EventWriterConfig.builder().build());
157+
EventWriterConfig.builder().transactionTimeoutTime(txnTimeoutMillis).transactionTimeoutScaleGracePeriod(txnGracePeriodMillis).build());
164158

165159
log.info("Initialized pravega writer for stream: {}/{} with controller URI: {}", this.scopeName,
166160
this.streamName, this.controllerURI);
167161

168162
// start the transaction that will hold the elements till the first checkpoint
169-
this.currentTxn = this.pravegaWriter.beginTxn(txnTimeoutMillis, txnMaxTimeMillis, txnGracePeriodMillis);
163+
this.currentTxn = this.pravegaWriter.beginTxn();
170164

171165
log.debug("{} - started first transaction '{}'", name(), this.currentTxn.getTxnId());
172166

@@ -227,7 +221,7 @@ public List<UUID> snapshotState(long checkpointId, long checkpointTime) throws E
227221
this.txnsPendingCommit.addLast(new TransactionAndCheckpoint<>(txn, checkpointId));
228222

229223
// start the next transaction for what comes after this checkpoint
230-
this.currentTxn = this.pravegaWriter.beginTxn(txnTimeoutMillis, txnMaxTimeMillis, txnGracePeriodMillis);
224+
this.currentTxn = this.pravegaWriter.beginTxn();
231225

232226
log.debug("{} - started new transaction '{}'", name(), this.currentTxn.getTxnId());
233227
log.debug("{} - storing pending transactions {}", name(), txnsPendingCommit);

src/test/java/io/pravega/connectors/flink/FlinkExactlyOncePravegaWriterTest.java

-1
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,6 @@ private static void runTest(
120120
new IntSerializer(),
121121
new IdentityRouter<>(),
122122
30 * 1000, // 30 secs timeout
123-
30 * 1000,
124123
30 * 1000)
125124
);
126125

0 commit comments

Comments
 (0)