88import java .util .concurrent .CompletionStage ;
99import java .util .concurrent .CountDownLatch ;
1010import java .util .concurrent .TimeUnit ;
11+ import java .util .function .BiFunction ;
1112
1213import org .hibernate .SessionFactory ;
1314import org .hibernate .boot .registry .StandardServiceRegistry ;
1415import org .hibernate .boot .registry .StandardServiceRegistryBuilder ;
1516import org .hibernate .cfg .Configuration ;
17+ import org .hibernate .reactive .annotations .DisabledFor ;
1618import org .hibernate .reactive .provider .ReactiveServiceRegistryBuilder ;
1719import org .hibernate .reactive .stage .Stage ;
1820import org .hibernate .reactive .util .impl .CompletionStages ;
1921import org .hibernate .reactive .vertx .VertxInstance ;
2022
21- import org .junit .jupiter .api .AfterAll ;
22- import org .junit .jupiter .api .BeforeAll ;
23+ import org .junit .jupiter .api .AfterEach ;
24+ import org .junit .jupiter .api .BeforeEach ;
2325import org .junit .jupiter .api .Test ;
2426import org .junit .jupiter .api .TestInstance ;
2527import org .junit .jupiter .api .extension .ExtendWith ;
4143import static org .assertj .core .api .Assertions .fail ;
4244import static org .hibernate .cfg .AvailableSettings .SHOW_SQL ;
4345import static org .hibernate .reactive .BaseReactiveTest .setDefaultProperties ;
46+ import static org .hibernate .reactive .containers .DatabaseConfiguration .DBType .DB2 ;
4447import static org .hibernate .reactive .provider .Settings .POOL_CONNECT_TIMEOUT ;
4548import static org .hibernate .reactive .util .impl .CompletionStages .failedFuture ;
4649import static org .hibernate .reactive .util .impl .CompletionStages .loop ;
@@ -101,8 +104,8 @@ public class MultithreadedInsertionWithLazyConnectionTest {
101104 private static Vertx vertx ;
102105 private static SessionFactory sessionFactory ;
103106
104- @ BeforeAll
105- public static void setupSessionFactory () {
107+ @ BeforeEach
108+ public void setupSessionFactory () {
106109 vertx = Vertx .vertx ( getVertxOptions () );
107110 Configuration configuration = new Configuration ();
108111 setDefaultProperties ( configuration );
@@ -130,8 +133,8 @@ private static VertxOptions getVertxOptions() {
130133 return vertxOptions ;
131134 }
132135
133- @ AfterAll
134- public static void closeSessionFactory () {
136+ @ AfterEach
137+ public void closeSessionFactory () {
135138 stageSessionFactory .close ();
136139 }
137140
@@ -140,8 +143,33 @@ public void testIdentityGenerator(VertxTestContext context) {
140143 final DeploymentOptions deploymentOptions = new DeploymentOptions ();
141144 deploymentOptions .setInstances ( N_THREADS );
142145
146+ // We are not using transactions on purpose here, because this approach will cause a context switch
147+ // and an assertion error if things aren't handled correctly.
148+ // See Hibernate Reactive issue #2768: https://github.com/hibernate/hibernate-reactive/issues/2768
143149 vertx
144- .deployVerticle ( InsertEntitiesVerticle ::new , deploymentOptions )
150+ .deployVerticle ( () -> new InsertEntitiesVerticle ( (s , entity ) -> s
151+ .persist ( entity )
152+ .thenCompose ( v -> s .flush () )
153+ .thenAccept ( v -> s .clear () ) ), deploymentOptions
154+ )
155+ .onSuccess ( res -> {
156+ endLatch .waitForEveryone ();
157+ context .completeNow ();
158+ } )
159+ .onFailure ( context ::failNow )
160+ .eventually ( () -> vertx .close () );
161+ }
162+
163+ @ Test
164+ @ DisabledFor (value = DB2 , reason = "Exception: IllegalStateException: Needed to have 6 in buffer but only had 0" )
165+ public void testIdentityGeneratorWithTransaction (VertxTestContext context ) {
166+ final DeploymentOptions deploymentOptions = new DeploymentOptions ();
167+ deploymentOptions .setInstances ( N_THREADS );
168+ vertx
169+ .deployVerticle (
170+ () -> new InsertEntitiesVerticle ( (s , entity ) -> s
171+ .withTransaction ( t -> s .persist ( entity ) ) ), deploymentOptions
172+ )
145173 .onSuccess ( res -> {
146174 endLatch .waitForEveryone ();
147175 context .completeNow ();
@@ -152,9 +180,12 @@ public void testIdentityGenerator(VertxTestContext context) {
152180
153181 private static class InsertEntitiesVerticle extends AbstractVerticle {
154182
183+ final BiFunction <Stage .Session , EntityWithGeneratedId , CompletionStage <Void >> insertFun ;
184+
155185 int sequentialOperation = 0 ;
156186
157- public InsertEntitiesVerticle () {
187+ public InsertEntitiesVerticle (BiFunction <Stage .Session , EntityWithGeneratedId , CompletionStage <Void >> insertFun ) {
188+ this .insertFun = insertFun ;
158189 }
159190
160191 @ Override
@@ -196,9 +227,8 @@ private CompletionStage<Void> storeEntity(Stage.Session s) {
196227 final int localVerticleOperationSequence = sequentialOperation ++;
197228 final EntityWithGeneratedId entity = new EntityWithGeneratedId ();
198229 entity .name = beforeOperationThread + "__" + localVerticleOperationSequence ;
199-
200- return s
201- .withTransaction ( t -> s .persist ( entity ) )
230+ return insertFun
231+ .apply ( s , entity )
202232 .thenCompose ( v -> beforeOperationThread != Thread .currentThread ()
203233 ? failedFuture ( new IllegalStateException ( "Detected an unexpected switch of carrier threads!" ) )
204234 : voidFuture () );
0 commit comments