@@ -76,20 +76,25 @@ public void testBuildConnectionProfile() {
7676 builder .addConnections (1 , TransportRequestOptions .Type .BULK );
7777 builder .addConnections (2 , TransportRequestOptions .Type .STATE , TransportRequestOptions .Type .RECOVERY );
7878 builder .addConnections (3 , TransportRequestOptions .Type .PING );
79+
7980 IllegalStateException illegalStateException = expectThrows (IllegalStateException .class , builder ::build );
80- assertEquals ("not all types are added for this connection profile - missing types: [REG]" , illegalStateException .getMessage ());
81+ assertEquals (
82+ "not all types are added for this connection profile - missing types: [REG, STREAM]" ,
83+ illegalStateException .getMessage ()
84+ );
8185
8286 IllegalArgumentException illegalArgumentException = expectThrows (
8387 IllegalArgumentException .class ,
8488 () -> builder .addConnections (4 , TransportRequestOptions .Type .REG , TransportRequestOptions .Type .PING )
8589 );
8690 assertEquals ("type [PING] is already registered" , illegalArgumentException .getMessage ());
8791 builder .addConnections (4 , TransportRequestOptions .Type .REG );
92+ builder .addConnections (1 , TransportRequestOptions .Type .STREAM );
8893 ConnectionProfile build = builder .build ();
8994 if (randomBoolean ()) {
9095 build = new ConnectionProfile .Builder (build ).build ();
9196 }
92- assertEquals (10 , build .getNumConnections ());
97+ assertEquals (11 , build .getNumConnections ());
9398 if (setConnectTimeout ) {
9499 assertEquals (connectTimeout , build .getConnectTimeout ());
95100 } else {
@@ -114,12 +119,12 @@ public void testBuildConnectionProfile() {
114119 assertNull (build .getPingInterval ());
115120 }
116121
117- List <Integer > list = new ArrayList <>(10 );
118- for (int i = 0 ; i < 10 ; i ++) {
122+ List <Integer > list = new ArrayList <>(11 );
123+ for (int i = 0 ; i < 11 ; i ++) {
119124 list .add (i );
120125 }
121126 final int numIters = randomIntBetween (5 , 10 );
122- assertEquals (4 , build .getHandles ().size ());
127+ assertEquals (5 , build .getHandles ().size ());
123128 assertEquals (0 , build .getHandles ().get (0 ).offset );
124129 assertEquals (1 , build .getHandles ().get (0 ).length );
125130 assertEquals (EnumSet .of (TransportRequestOptions .Type .BULK ), build .getHandles ().get (0 ).getTypes ());
@@ -155,11 +160,20 @@ public void testBuildConnectionProfile() {
155160 assertThat (channel , Matchers .anyOf (Matchers .is (6 ), Matchers .is (7 ), Matchers .is (8 ), Matchers .is (9 )));
156161 }
157162
163+ assertEquals (10 , build .getHandles ().get (4 ).offset );
164+ assertEquals (1 , build .getHandles ().get (4 ).length );
165+ assertEquals (EnumSet .of (TransportRequestOptions .Type .STREAM ), build .getHandles ().get (4 ).getTypes ());
166+ channel = build .getHandles ().get (4 ).getChannel (list );
167+ for (int i = 0 ; i < numIters ; i ++) {
168+ assertEquals (10 , channel .intValue ());
169+ }
170+
158171 assertEquals (3 , build .getNumConnectionsPerType (TransportRequestOptions .Type .PING ));
159172 assertEquals (4 , build .getNumConnectionsPerType (TransportRequestOptions .Type .REG ));
160173 assertEquals (2 , build .getNumConnectionsPerType (TransportRequestOptions .Type .STATE ));
161174 assertEquals (2 , build .getNumConnectionsPerType (TransportRequestOptions .Type .RECOVERY ));
162175 assertEquals (1 , build .getNumConnectionsPerType (TransportRequestOptions .Type .BULK ));
176+ assertEquals (1 , build .getNumConnectionsPerType (TransportRequestOptions .Type .STREAM ));
163177 }
164178
165179 public void testNoChannels () {
@@ -169,7 +183,8 @@ public void testNoChannels() {
169183 TransportRequestOptions .Type .BULK ,
170184 TransportRequestOptions .Type .STATE ,
171185 TransportRequestOptions .Type .RECOVERY ,
172- TransportRequestOptions .Type .REG
186+ TransportRequestOptions .Type .REG ,
187+ TransportRequestOptions .Type .STREAM
173188 );
174189 builder .addConnections (0 , TransportRequestOptions .Type .PING );
175190 ConnectionProfile build = builder .build ();
@@ -188,6 +203,7 @@ public void testConnectionProfileResolve() {
188203 builder .addConnections (randomIntBetween (0 , 5 ), TransportRequestOptions .Type .REG );
189204 builder .addConnections (randomIntBetween (0 , 5 ), TransportRequestOptions .Type .STATE );
190205 builder .addConnections (randomIntBetween (0 , 5 ), TransportRequestOptions .Type .PING );
206+ builder .addConnections (randomIntBetween (0 , 5 ), TransportRequestOptions .Type .STREAM );
191207
192208 final boolean connectionTimeoutSet = randomBoolean ();
193209 if (connectionTimeoutSet ) {
@@ -235,6 +251,7 @@ public void testDefaultConnectionProfile() {
235251 assertEquals (1 , profile .getNumConnectionsPerType (TransportRequestOptions .Type .STATE ));
236252 assertEquals (2 , profile .getNumConnectionsPerType (TransportRequestOptions .Type .RECOVERY ));
237253 assertEquals (3 , profile .getNumConnectionsPerType (TransportRequestOptions .Type .BULK ));
254+ assertEquals (0 , profile .getNumConnectionsPerType (TransportRequestOptions .Type .STREAM ));
238255 assertEquals (TransportSettings .CONNECT_TIMEOUT .get (Settings .EMPTY ), profile .getConnectTimeout ());
239256 assertEquals (TransportSettings .CONNECT_TIMEOUT .get (Settings .EMPTY ), profile .getHandshakeTimeout ());
240257 assertEquals (TransportSettings .TRANSPORT_COMPRESS .get (Settings .EMPTY ), profile .getCompressionEnabled ());
@@ -247,6 +264,7 @@ public void testDefaultConnectionProfile() {
247264 assertEquals (0 , profile .getNumConnectionsPerType (TransportRequestOptions .Type .STATE ));
248265 assertEquals (2 , profile .getNumConnectionsPerType (TransportRequestOptions .Type .RECOVERY ));
249266 assertEquals (3 , profile .getNumConnectionsPerType (TransportRequestOptions .Type .BULK ));
267+ assertEquals (0 , profile .getNumConnectionsPerType (TransportRequestOptions .Type .STREAM ));
250268
251269 profile = ConnectionProfile .buildDefaultConnectionProfile (nonDataNode ());
252270 assertEquals (11 , profile .getNumConnections ());
@@ -255,6 +273,7 @@ public void testDefaultConnectionProfile() {
255273 assertEquals (1 , profile .getNumConnectionsPerType (TransportRequestOptions .Type .STATE ));
256274 assertEquals (0 , profile .getNumConnectionsPerType (TransportRequestOptions .Type .RECOVERY ));
257275 assertEquals (3 , profile .getNumConnectionsPerType (TransportRequestOptions .Type .BULK ));
276+ assertEquals (0 , profile .getNumConnectionsPerType (TransportRequestOptions .Type .STREAM ));
258277
259278 profile = ConnectionProfile .buildDefaultConnectionProfile (
260279 removeRoles (
@@ -267,5 +286,6 @@ public void testDefaultConnectionProfile() {
267286 assertEquals (0 , profile .getNumConnectionsPerType (TransportRequestOptions .Type .STATE ));
268287 assertEquals (0 , profile .getNumConnectionsPerType (TransportRequestOptions .Type .RECOVERY ));
269288 assertEquals (3 , profile .getNumConnectionsPerType (TransportRequestOptions .Type .BULK ));
289+ assertEquals (0 , profile .getNumConnectionsPerType (TransportRequestOptions .Type .STREAM ));
270290 }
271291}
0 commit comments