10
10
import io .vertx .core .Vertx ;
11
11
import io .vertx .core .buffer .Buffer ;
12
12
import io .vertx .core .eventbus .Message ;
13
- import io .vertx .core .http .*;
13
+ import io .vertx .core .http .HttpClient ;
14
+ import io .vertx .core .http .HttpClientRequest ;
15
+ import io .vertx .core .http .HttpClientResponse ;
16
+ import io .vertx .core .http .HttpMethod ;
17
+ import io .vertx .core .http .HttpServerRequest ;
18
+ import io .vertx .core .http .HttpServerResponse ;
14
19
import io .vertx .core .http .impl .headers .HeadersMultiMap ;
15
20
import io .vertx .core .json .DecodeException ;
16
21
import io .vertx .core .json .JsonArray ;
26
31
import org .swisspush .gateleen .core .logging .LoggableResource ;
27
32
import org .swisspush .gateleen .core .logging .RequestLogger ;
28
33
import org .swisspush .gateleen .core .storage .ResourceStorage ;
29
- import org .swisspush .gateleen .core .util .*;
30
- import org .swisspush .gateleen .hook .queueingstrategy .*;
34
+ import org .swisspush .gateleen .core .util .CollectionContentComparator ;
35
+ import org .swisspush .gateleen .core .util .HttpHeaderUtil ;
36
+ import org .swisspush .gateleen .core .util .HttpRequestHeader ;
37
+ import org .swisspush .gateleen .core .util .HttpServerRequestUtil ;
38
+ import org .swisspush .gateleen .core .util .ResourcesUtils ;
39
+ import org .swisspush .gateleen .core .util .StatusCode ;
40
+ import org .swisspush .gateleen .hook .queueingstrategy .DefaultQueueingStrategy ;
41
+ import org .swisspush .gateleen .hook .queueingstrategy .DiscardPayloadQueueingStrategy ;
42
+ import org .swisspush .gateleen .hook .queueingstrategy .QueueingStrategy ;
43
+ import org .swisspush .gateleen .hook .queueingstrategy .QueueingStrategyFactory ;
44
+ import org .swisspush .gateleen .hook .queueingstrategy .ReducedPropagationQueueingStrategy ;
31
45
import org .swisspush .gateleen .hook .reducedpropagation .ReducedPropagationManager ;
32
46
import org .swisspush .gateleen .logging .LogAppenderRepository ;
33
47
import org .swisspush .gateleen .logging .LoggingResourceManager ;
36
50
import org .swisspush .gateleen .queue .queuing .QueueClient ;
37
51
import org .swisspush .gateleen .queue .queuing .QueueProcessor ;
38
52
import org .swisspush .gateleen .queue .queuing .RequestQueue ;
53
+ import org .swisspush .gateleen .queue .queuing .splitter .NoOpQueueSplitter ;
54
+ import org .swisspush .gateleen .queue .queuing .splitter .QueueSplitter ;
39
55
import org .swisspush .gateleen .routing .Router ;
40
56
import org .swisspush .gateleen .routing .Rule ;
41
57
import org .swisspush .gateleen .routing .RuleFactory ;
42
58
import org .swisspush .gateleen .validation .RegexpValidator ;
43
59
import org .swisspush .gateleen .validation .ValidationException ;
44
60
45
- import java .util .*;
61
+ import javax .annotation .Nonnull ;
62
+ import javax .annotation .Nullable ;
63
+ import java .util .ArrayList ;
64
+ import java .util .Comparator ;
65
+ import java .util .Iterator ;
66
+ import java .util .List ;
67
+ import java .util .Map ;
68
+ import java .util .Objects ;
69
+ import java .util .Optional ;
70
+ import java .util .Set ;
46
71
import java .util .concurrent .atomic .AtomicInteger ;
47
72
import java .util .function .Consumer ;
48
73
import java .util .regex .Pattern ;
@@ -126,6 +151,8 @@ public class HookHandler implements LoggableResource {
126
151
private final JsonSchema jsonSchemaHook ;
127
152
private int routeMultiplier ;
128
153
154
+ private final QueueSplitter queueSplitter ;
155
+
129
156
130
157
/**
131
158
* Creates a new HookHandler.
@@ -189,19 +216,28 @@ public HookHandler(Vertx vertx, HttpClient selfClient, final ResourceStorage sto
189
216
public HookHandler (Vertx vertx , HttpClient selfClient , final ResourceStorage storage ,
190
217
LoggingResourceManager loggingResourceManager , LogAppenderRepository logAppenderRepository , MonitoringHandler monitoringHandler ,
191
218
String userProfilePath , String hookRootUri , RequestQueue requestQueue , boolean listableRoutes ,
192
- ReducedPropagationManager reducedPropagationManager ) {
219
+ @ Nullable ReducedPropagationManager reducedPropagationManager ) {
193
220
this (vertx , selfClient , storage , loggingResourceManager , logAppenderRepository , monitoringHandler , userProfilePath , hookRootUri ,
194
221
requestQueue , listableRoutes , reducedPropagationManager , null , storage );
195
222
}
196
223
197
- public HookHandler (Vertx vertx , HttpClient selfClient , final ResourceStorage storage ,
224
+ public HookHandler (Vertx vertx , HttpClient selfClient , final ResourceStorage userProfileStorage ,
198
225
LoggingResourceManager loggingResourceManager , LogAppenderRepository logAppenderRepository , MonitoringHandler monitoringHandler ,
199
226
String userProfilePath , String hookRootUri , RequestQueue requestQueue , boolean listableRoutes ,
200
- ReducedPropagationManager reducedPropagationManager , Handler doneHandler , ResourceStorage hookStorage ) {
201
- this (vertx , selfClient , storage , loggingResourceManager , logAppenderRepository , monitoringHandler , userProfilePath , hookRootUri ,
227
+ ReducedPropagationManager reducedPropagationManager , @ Nullable Handler doneHandler , ResourceStorage hookStorage ) {
228
+ this (vertx , selfClient , userProfileStorage , loggingResourceManager , logAppenderRepository , monitoringHandler , userProfilePath , hookRootUri ,
202
229
requestQueue , listableRoutes , reducedPropagationManager , doneHandler , hookStorage , Router .DEFAULT_ROUTER_MULTIPLIER );
203
230
}
204
231
232
+ public HookHandler (Vertx vertx , HttpClient selfClient , final ResourceStorage userProfileStorage ,
233
+ LoggingResourceManager loggingResourceManager , LogAppenderRepository logAppenderRepository , MonitoringHandler monitoringHandler ,
234
+ String userProfilePath , String hookRootUri , RequestQueue requestQueue , boolean listableRoutes ,
235
+ ReducedPropagationManager reducedPropagationManager , @ Nullable Handler doneHandler , ResourceStorage hookStorage ,
236
+ int routeMultiplier ) {
237
+ this (vertx , selfClient , userProfileStorage , loggingResourceManager , logAppenderRepository , monitoringHandler , userProfilePath , hookRootUri ,
238
+ requestQueue , listableRoutes , reducedPropagationManager , doneHandler , hookStorage , routeMultiplier , new NoOpQueueSplitter ());
239
+ }
240
+
205
241
/**
206
242
* Creates a new HookHandler.
207
243
*
@@ -220,12 +256,14 @@ public HookHandler(Vertx vertx, HttpClient selfClient, final ResourceStorage sto
220
256
* @param routeMultiplier the multiplier that is applied to routes, this is typically the number of nodes in
221
257
* a cluster multiplied by the number of router instances within a node. Or in other words
222
258
* the number of {@link Router} instances within a cluster
259
+ * @param queueSplitter Configured QueueSplitter or NoOpQueueSplitter which dispatches to subqueues for
260
+ * parallel operation.
223
261
*/
224
262
public HookHandler (Vertx vertx , HttpClient selfClient , final ResourceStorage userProfileStorage ,
225
263
LoggingResourceManager loggingResourceManager , LogAppenderRepository logAppenderRepository , MonitoringHandler monitoringHandler ,
226
264
String userProfilePath , String hookRootUri , RequestQueue requestQueue , boolean listableRoutes ,
227
- ReducedPropagationManager reducedPropagationManager , Handler doneHandler , ResourceStorage hookStorage ,
228
- int routeMultiplier ) {
265
+ ReducedPropagationManager reducedPropagationManager , @ Nullable Handler doneHandler , ResourceStorage hookStorage ,
266
+ int routeMultiplier , @ Nonnull QueueSplitter queueSplitter ) {
229
267
log .debug ("Creating HookHandler ..." );
230
268
this .vertx = vertx ;
231
269
this .selfClient = selfClient ;
@@ -244,7 +282,7 @@ public HookHandler(Vertx vertx, HttpClient selfClient, final ResourceStorage use
244
282
this .doneHandler = doneHandler ;
245
283
this .hookStorage = hookStorage ;
246
284
this .routeMultiplier = routeMultiplier ;
247
-
285
+ this . queueSplitter = queueSplitter ;
248
286
String hookSchema = ResourcesUtils .loadResource ("gateleen_hooking_schema_hook" , true );
249
287
jsonSchemaHook = JsonSchemaFactory .getInstance ().getSchema (hookSchema );
250
288
}
@@ -605,11 +643,11 @@ private boolean createListingIfRequested(final HttpServerRequest request) {
605
643
606
644
request .response ().headers ().remove (HOOK_ROUTES_LISTED );
607
645
608
- // if everything is fine, we add the listed collections to the given array
609
- if (response .statusCode () == StatusCode .OK .getStatusCode ()) {
610
- if (log .isTraceEnabled ()) {
611
- log .trace ("createListingIfRequested > use existing array" );
612
- }
646
+ // if everything is fine, we add the listed collections to the given array
647
+ if (response .statusCode () == StatusCode .OK .getStatusCode ()) {
648
+ if (log .isTraceEnabled ()) {
649
+ log .trace ("createListingIfRequested > use existing array" );
650
+ }
613
651
614
652
response .handler (data -> {
615
653
JsonObject responseObject = new JsonObject (data .toString ());
@@ -731,7 +769,7 @@ private void installBodyHandler(final RoutingContext ctx, final List<Listener> l
731
769
/**
732
770
* Calls the passed listeners and passes the given handler to the enqueued listener requests.
733
771
*
734
- * @param ctx original request context
772
+ * @param ctx original request context
735
773
* @param buffer buffer
736
774
* @param filteredListeners all listeners which should be called
737
775
* @param handler the handler, which should handle the requests
@@ -783,11 +821,13 @@ private void callListener(RoutingContext ctx, final Buffer buffer, final List<Li
783
821
// if there is an x-queue header (after applying the header manipulator chain!),
784
822
// then directly enqueue to this queue - else enqueue to a queue named alike this listener hook
785
823
String queue = queueHeaders .get (X_QUEUE );
824
+
786
825
if (queue == null ) {
787
826
queue = LISTENER_QUEUE_PREFIX + "-" + listener .getListenerId (); // default queue name for this listener hook
788
827
} else {
789
828
queueHeaders .remove (X_QUEUE ); // remove the "x-queue" header - otherwise we take a second turn through the queue
790
829
}
830
+ queue = queueSplitter .convertToSubQueue (queue , request );
791
831
792
832
QueueingStrategy queueingStrategy = listener .getHook ().getQueueingStrategy ();
793
833
@@ -827,7 +867,7 @@ private void callListener(RoutingContext ctx, final Buffer buffer, final List<Li
827
867
* The handler calls all listener (after), so this requests happen AFTER the original
828
868
* request is performed.
829
869
*
830
- * @param ctx original request context
870
+ * @param ctx original request context
831
871
* @param buffer buffer
832
872
* @param afterListener list of listeners which should be called after the original request
833
873
* @return the after handler
@@ -842,7 +882,7 @@ private Handler<Void> installAfterHandler(final RoutingContext ctx, final Buffer
842
882
* The request happens BEFORE the original request is
843
883
* performed.
844
884
*
845
- * @param ctx original request context
885
+ * @param ctx original request context
846
886
* @param buffer buffer
847
887
* @param beforeListener list of listeners which should be called before the original request
848
888
* @param afterHandler the handler for listeners which have to be called after the original request
@@ -1586,17 +1626,17 @@ private void registerRoute(Buffer buffer) {
1586
1626
private boolean mustCreateNewRouteForHook (Route existingRoute , HttpHook newHook ) {
1587
1627
HttpHook oldHook = existingRoute .getHook ();
1588
1628
boolean same ;
1589
- same = Objects .equals (oldHook .getDestination () , newHook .getDestination ());
1590
- same &= Objects .equals (oldHook .getMethods (), newHook .getMethods ());
1591
- same &= Objects .equals (oldHook .getTranslateStatus (), newHook .getTranslateStatus ());
1592
- same &= oldHook .isCollection () == newHook .isCollection () ;
1593
- same &= oldHook .isFullUrl () == newHook .isFullUrl () ;
1594
- same &= oldHook .isListable () == newHook .isListable () ;
1595
- same &= oldHook .isCollection () == newHook .isCollection () ;
1596
- same &= oldHook .isCollection () == newHook .isCollection () ;
1597
- same &= Objects .equals (oldHook .getConnectionPoolSize () , newHook .getConnectionPoolSize ());
1598
- same &= Objects .equals (oldHook .getMaxWaitQueueSize () , newHook .getMaxWaitQueueSize ());
1599
- same &= Objects .equals (oldHook .getTimeout (), newHook .getTimeout ());
1629
+ same = Objects .equals (oldHook .getDestination (), newHook .getDestination ());
1630
+ same &= Objects .equals (oldHook .getMethods (), newHook .getMethods ());
1631
+ same &= Objects .equals (oldHook .getTranslateStatus (), newHook .getTranslateStatus ());
1632
+ same &= oldHook .isCollection () == newHook .isCollection () ;
1633
+ same &= oldHook .isFullUrl () == newHook .isFullUrl () ;
1634
+ same &= oldHook .isListable () == newHook .isListable () ;
1635
+ same &= oldHook .isCollection () == newHook .isCollection () ;
1636
+ same &= oldHook .isCollection () == newHook .isCollection () ;
1637
+ same &= Objects .equals (oldHook .getConnectionPoolSize (), newHook .getConnectionPoolSize ());
1638
+ same &= Objects .equals (oldHook .getMaxWaitQueueSize (), newHook .getMaxWaitQueueSize ());
1639
+ same &= Objects .equals (oldHook .getTimeout (), newHook .getTimeout ());
1600
1640
same &= headersFilterPatternEquals (oldHook .getHeadersFilterPattern (), newHook .getHeadersFilterPattern ());
1601
1641
1602
1642
// queueingStrategy, filter, queueExpireAfter and hookTriggerType are not relevant for Route-Hooks
@@ -1606,7 +1646,7 @@ private boolean mustCreateNewRouteForHook(Route existingRoute, HttpHook newHook)
1606
1646
}
1607
1647
1608
1648
private boolean headersFilterPatternEquals (Pattern headersFilterPatternLeft , Pattern headersFilterPatternRight ) {
1609
- if (headersFilterPatternLeft != null && headersFilterPatternRight != null ){
1649
+ if (headersFilterPatternLeft != null && headersFilterPatternRight != null ) {
1610
1650
return Objects .equals (headersFilterPatternLeft .pattern (), headersFilterPatternRight .pattern ());
1611
1651
}
1612
1652
0 commit comments