Skip to content

Commit 73570c4

Browse files
authored
Merge branch 'master' into 1571-bring-dt-closer
2 parents ecade94 + 6e9ee5a commit 73570c4

File tree

12 files changed

+455
-7
lines changed

12 files changed

+455
-7
lines changed
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Copyright 2025 The Dapr Authors
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package io.dapr.it.testcontainers.pubsub.outbox;
15+
16+
import io.dapr.client.DaprClient;
17+
import io.dapr.client.domain.ExecuteStateTransactionRequest;
18+
import io.dapr.client.domain.State;
19+
import io.dapr.client.domain.TransactionalStateOperation;
20+
import io.dapr.it.testcontainers.DaprClientFactory;
21+
import io.dapr.testcontainers.Component;
22+
import io.dapr.testcontainers.DaprContainer;
23+
import io.dapr.testcontainers.DaprLogLevel;
24+
import org.assertj.core.api.Assertions;
25+
import org.awaitility.Awaitility;
26+
import org.junit.jupiter.api.BeforeEach;
27+
import org.junit.jupiter.api.Tag;
28+
import org.junit.jupiter.api.Test;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
31+
import org.springframework.boot.test.context.SpringBootTest;
32+
import org.springframework.test.context.DynamicPropertyRegistry;
33+
import org.springframework.test.context.DynamicPropertySource;
34+
import org.testcontainers.containers.Network;
35+
import org.testcontainers.containers.wait.strategy.Wait;
36+
import org.testcontainers.junit.jupiter.Container;
37+
import org.testcontainers.junit.jupiter.Testcontainers;
38+
39+
import java.time.Duration;
40+
import java.util.Collections;
41+
import java.util.List;
42+
import java.util.Map;
43+
import java.util.Random;
44+
45+
import static io.dapr.it.testcontainers.ContainerConstants.DAPR_RUNTIME_IMAGE_TAG;
46+
47+
@SpringBootTest(
48+
webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT,
49+
classes = {
50+
TestPubsubOutboxApplication.class
51+
}
52+
)
53+
@Testcontainers
54+
@Tag("testcontainers")
55+
public class DaprPubSubOutboxIT {
56+
57+
private static final Logger LOG = LoggerFactory.getLogger(DaprPubSubOutboxIT.class);
58+
private static final Network DAPR_NETWORK = Network.newNetwork();
59+
private static final Random RANDOM = new Random();
60+
private static final int PORT = RANDOM.nextInt(1000) + 8000;
61+
private static final String APP_FOUND_MESSAGE_PATTERN = ".*application discovered on port.*";
62+
63+
private static final String PUBSUB_APP_ID = "pubsub-dapr-app";
64+
private static final String PUBSUB_NAME = "pubsub";
65+
66+
// topics
67+
private static final String TOPIC_PRODUCT_CREATED = "product.created";
68+
private static final String STATE_STORE_NAME = "kvstore";
69+
70+
@Container
71+
private static final DaprContainer DAPR_CONTAINER = new DaprContainer(DAPR_RUNTIME_IMAGE_TAG)
72+
.withAppName(PUBSUB_APP_ID)
73+
.withNetwork(DAPR_NETWORK)
74+
.withComponent(new Component(STATE_STORE_NAME, "state.in-memory", "v1", Map.of(
75+
"outboxPublishPubsub", PUBSUB_NAME,
76+
"outboxPublishTopic", TOPIC_PRODUCT_CREATED
77+
)))
78+
.withComponent(new Component(PUBSUB_NAME, "pubsub.in-memory", "v1", Collections.emptyMap()))
79+
.withDaprLogLevel(DaprLogLevel.DEBUG)
80+
.withLogConsumer(outputFrame -> LOG.info(outputFrame.getUtf8String()))
81+
.withAppChannelAddress("host.testcontainers.internal")
82+
.withAppPort(PORT);
83+
84+
/**
85+
* Expose the Dapr ports to the host.
86+
*
87+
* @param registry the dynamic property registry
88+
*/
89+
@DynamicPropertySource
90+
static void daprProperties(DynamicPropertyRegistry registry) {
91+
registry.add("dapr.http.endpoint", DAPR_CONTAINER::getHttpEndpoint);
92+
registry.add("dapr.grpc.endpoint", DAPR_CONTAINER::getGrpcEndpoint);
93+
registry.add("server.port", () -> PORT);
94+
}
95+
96+
97+
@BeforeEach
98+
public void setUp() {
99+
org.testcontainers.Testcontainers.exposeHostPorts(PORT);
100+
}
101+
102+
103+
@Test
104+
public void shouldPublishUsingOutbox() throws Exception {
105+
Wait.forLogMessage(APP_FOUND_MESSAGE_PATTERN, 1).waitUntilReady(DAPR_CONTAINER);
106+
107+
try (DaprClient client = DaprClientFactory.createDaprClientBuilder(DAPR_CONTAINER).build()) {
108+
109+
ExecuteStateTransactionRequest transactionRequest = new ExecuteStateTransactionRequest(STATE_STORE_NAME);
110+
111+
Product pencil = new Product("Pencil", 1.50);
112+
State<Product> state = new State<>(
113+
pencil.getId(), pencil, null
114+
);
115+
116+
TransactionalStateOperation<Product> operation = new TransactionalStateOperation<>(
117+
TransactionalStateOperation.OperationType.UPSERT, state
118+
);
119+
120+
transactionRequest.setOperations(List.of(operation));
121+
122+
client.executeStateTransaction(transactionRequest).block();
123+
124+
Awaitility.await().atMost(Duration.ofSeconds(10))
125+
.ignoreExceptions()
126+
.untilAsserted(() -> Assertions.assertThat(ProductWebhookController.EVENT_LIST).isNotEmpty());
127+
}
128+
}
129+
130+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright 2025 The Dapr Authors
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
package io.dapr.it.testcontainers.pubsub.outbox;
14+
15+
import java.util.UUID;
16+
17+
public class Product {
18+
private String id;
19+
private String name;
20+
private double price;
21+
22+
public Product() {
23+
}
24+
25+
public Product(String name, double price) {
26+
this.id = UUID.randomUUID().toString();
27+
this.name = name;
28+
this.price = price;
29+
}
30+
31+
public String getId() {
32+
return id;
33+
}
34+
35+
public void setId(String id) {
36+
this.id = id;
37+
}
38+
39+
public String getName() {
40+
return name;
41+
}
42+
43+
public void setName(String name) {
44+
this.name = name;
45+
}
46+
47+
public double getPrice() {
48+
return price;
49+
}
50+
51+
public void setPrice(double price) {
52+
this.price = price;
53+
}
54+
55+
@Override
56+
public String toString() {
57+
return "Product{" +
58+
"id='" + id + '\'' +
59+
", name='" + name + '\'' +
60+
", price=" + price +
61+
'}';
62+
}
63+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright 2025 The Dapr Authors
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
package io.dapr.it.testcontainers.pubsub.outbox;
14+
15+
import io.dapr.Topic;
16+
import io.dapr.client.domain.CloudEvent;
17+
import org.springframework.web.bind.annotation.PostMapping;
18+
import org.springframework.web.bind.annotation.RequestBody;
19+
import org.springframework.web.bind.annotation.RequestMapping;
20+
import org.springframework.web.bind.annotation.RestController;
21+
22+
import java.util.List;
23+
import java.util.concurrent.CopyOnWriteArrayList;
24+
25+
@RestController
26+
@RequestMapping("/webhooks/products")
27+
public class ProductWebhookController {
28+
29+
public static final List<CloudEvent<Product>> EVENT_LIST = new CopyOnWriteArrayList<>();
30+
31+
@PostMapping("/created")
32+
@Topic(name = "product.created", pubsubName = "pubsub")
33+
public void handleEvent(@RequestBody CloudEvent cloudEvent) {
34+
System.out.println("Received product.created event: " + cloudEvent.getData());
35+
EVENT_LIST.add(cloudEvent);
36+
}
37+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright 2025 The Dapr Authors
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
package io.dapr.it.testcontainers.pubsub.outbox;
14+
15+
import org.springframework.boot.SpringApplication;
16+
import org.springframework.boot.autoconfigure.SpringBootApplication;
17+
18+
@SpringBootApplication
19+
public class TestPubsubOutboxApplication {
20+
public static void main(String[] args) {
21+
SpringApplication.run(TestPubsubOutboxApplication.class, args);
22+
}
23+
}

sdk/src/main/java/io/dapr/client/AbstractDaprClient.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import java.util.ArrayList;
5252
import java.util.Arrays;
5353
import java.util.Collections;
54+
import java.util.HashMap;
5455
import java.util.List;
5556
import java.util.Map;
5657
import java.util.stream.Collectors;
@@ -509,6 +510,27 @@ public Mono<Void> saveState(String storeName, String key, String etag, Object va
509510
return this.saveBulkState(storeName, Collections.singletonList(state));
510511
}
511512

513+
/**
514+
* {@inheritDoc}
515+
*/
516+
@Override
517+
public Mono<Void> saveState(String storeName, String key, String etag, Object value, Map<String, String> meta,
518+
StateOptions options) {
519+
Map<String, String> metaCopy = null;
520+
if (meta == null) {
521+
metaCopy = new HashMap<>();
522+
} else {
523+
metaCopy = new HashMap<>(meta);
524+
}
525+
526+
if (value != null) {
527+
metaCopy.putIfAbsent("contentType", stateSerializer.getContentType());
528+
}
529+
530+
State<?> state = new State<>(key, value, etag, metaCopy, options);
531+
return this.saveBulkState(storeName, Collections.singletonList(state));
532+
}
533+
512534
/**
513535
* {@inheritDoc}
514536
*/

sdk/src/main/java/io/dapr/client/DaprClient.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -498,6 +498,21 @@ Mono<Void> executeStateTransaction(String storeName,
498498
*/
499499
Mono<Void> saveState(String storeName, String key, String etag, Object value, StateOptions options);
500500

501+
502+
/**
503+
* Save/Update a state.
504+
*
505+
* @param storeName The name of the state store.
506+
* @param key The key of the state.
507+
* @param etag The etag to be used.
508+
* @param value The value of the state.
509+
* @param meta The metadata to be set to the state.
510+
* @param options The Options to use for each state.
511+
* @return a Mono plan of type Void.
512+
*/
513+
Mono<Void> saveState(String storeName, String key, String etag, Object value, Map<String, String> meta,
514+
StateOptions options);
515+
501516
/**
502517
* Delete a state.
503518
*

sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1241,6 +1241,64 @@ public void saveStateNoOptionsTest() {
12411241
result.block();
12421242
}
12431243

1244+
@Test
1245+
public void saveStateWithMetaTest() {
1246+
String key = "key1";
1247+
String etag = "ETag1";
1248+
String value = "State value";
1249+
Map<String, String> metadata = new HashMap<>();
1250+
metadata.put("custom", "customValue");
1251+
ArgumentCaptor<DaprProtos.SaveStateRequest> argument = ArgumentCaptor.forClass(DaprProtos.SaveStateRequest.class);
1252+
doAnswer((Answer<Void>) invocation -> {
1253+
StreamObserver<Empty> observer = (StreamObserver<Empty>) invocation.getArguments()[1];
1254+
observer.onNext(Empty.getDefaultInstance());
1255+
observer.onCompleted();
1256+
return null;
1257+
}).when(daprStub).saveState(argument.capture(), any());
1258+
1259+
1260+
Mono<Void> result = client.saveState(STATE_STORE_NAME, key, etag, value, metadata,null);
1261+
result.block();
1262+
assertEquals("customValue", argument.getValue().getStates(0).getMetadata().get("custom"));
1263+
}
1264+
1265+
@Test
1266+
public void saveStateWithMetaContentTypeTest() {
1267+
String key = "key1";
1268+
String etag = "ETag1";
1269+
String value = "State value";
1270+
Map<String, String> metadata = new HashMap<>();
1271+
ArgumentCaptor<DaprProtos.SaveStateRequest> argument = ArgumentCaptor.forClass(DaprProtos.SaveStateRequest.class);
1272+
doAnswer((Answer<Void>) invocation -> {
1273+
StreamObserver<Empty> observer = (StreamObserver<Empty>) invocation.getArguments()[1];
1274+
observer.onNext(Empty.getDefaultInstance());
1275+
observer.onCompleted();
1276+
return null;
1277+
}).when(daprStub).saveState(argument.capture(), any());
1278+
1279+
1280+
Mono<Void> result = client.saveState(STATE_STORE_NAME, key, etag, value, metadata,null);
1281+
result.block();
1282+
assertEquals("application/json", argument.getValue().getStates(0).getMetadata().get("contentType"));
1283+
}
1284+
1285+
@Test
1286+
public void saveStateWithMetaEmptyTest() {
1287+
String key = "key1";
1288+
String etag = "ETag1";
1289+
ArgumentCaptor<DaprProtos.SaveStateRequest> argument = ArgumentCaptor.forClass(DaprProtos.SaveStateRequest.class);
1290+
doAnswer((Answer<Void>) invocation -> {
1291+
StreamObserver<Empty> observer = (StreamObserver<Empty>) invocation.getArguments()[1];
1292+
observer.onNext(Empty.getDefaultInstance());
1293+
observer.onCompleted();
1294+
return null;
1295+
}).when(daprStub).saveState(argument.capture(), any());
1296+
1297+
Mono<Void> result = client.saveState(STATE_STORE_NAME, key, etag, null, null,null);
1298+
result.block();
1299+
assertTrue(argument.getValue().getStates(0).getMetadata().keySet().isEmpty());
1300+
}
1301+
12441302
@Test
12451303
public void saveStateTest() {
12461304
String key = "key1";

0 commit comments

Comments
 (0)