Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 100 additions & 97 deletions java/vtgate-client/pom.xml
Original file line number Diff line number Diff line change
@@ -1,99 +1,102 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.youtube.vitess</groupId>
<artifactId>vtgate-client</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.youtube.vitess</groupId>
<artifactId>gorpc-client</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.9</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.5.1</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>2.5.1</version>
<!-- Include the test jar to reuse Hadoop testing utils -->
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.17</version>
<configuration>
<argLine>${surefireArgLine}</argLine>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<version>2.13</version>
<configuration>
<argLine>${failsafeArgLine}</argLine>
</configuration>
<executions>
<execution>
<goals>
<goal>integration-test</goal>
<goal>verify</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.youtube.vitess</groupId>
<artifactId>vtgate-client</artifactId>
<version>0.0.1-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.youtube.vitess</groupId>
<artifactId>gorpc-client</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.9</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.5.1</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>2.5.1</version>
<!-- Include the test jar to reuse Hadoop testing utils -->
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.17</version>
<configuration>
<argLine>${surefireArgLine}</argLine>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<version>2.13</version>
<configuration>
<argLine>${failsafeArgLine}</argLine>
</configuration>
<executions>
<execution>
<goals>
<goal>integration-test</goal>
<goal>verify</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
public class SplitQueryRequest {
private String sql;
private String keyspace;
private int splitsPerShard;
private int splitCount;

public SplitQueryRequest(String sql, String keyspace, int splitsPerShard) {
public SplitQueryRequest(String sql, String keyspace, int splitCount) {
this.sql = sql;
this.keyspace = keyspace;
this.splitsPerShard = splitsPerShard;
this.splitCount = splitCount;
}

public String getSql() {
Expand All @@ -19,7 +19,7 @@ public String getKeyspace() {
return keyspace;
}

public int getSplitsPerShard() {
return splitsPerShard;
public int getSplitCount() {
return splitCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,9 @@ public List<Cursor> execute(BatchQuery query) throws DatabaseException, Connecti
* instances. Batch jobs or MapReduce jobs that needs to scan all rows can use these queries to
* parallelize full table scans.
*/
public Map<Query, Long> splitQuery(String keyspace, String sql, int splitsPerShard)
public Map<Query, Long> splitQuery(String keyspace, String sql, int splitCount)
throws ConnectionException, DatabaseException {
SplitQueryRequest req = new SplitQueryRequest(sql, keyspace, splitsPerShard);
SplitQueryRequest req = new SplitQueryRequest(sql, keyspace, splitCount);
SplitQueryResponse response = client.splitQuery(req);
if (response.getError() != null) {
throw new DatabaseException(response.getError());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ public static BSONObject splitQueryRequestToBson(SplitQueryRequest request) {
BSONObject b = new BasicBSONObject();
b.put("Keyspace", request.getKeyspace());
b.put("Query", query);
b.put("SplitsPerShard", request.getSplitsPerShard());
b.put("SplitCount", request.getSplitCount());
return b;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void close() throws ConnectionException {

@Override
public SplitQueryResponse splitQuery(SplitQueryRequest request) throws ConnectionException {
String callMethod = "VTGate.GetMRSplits";
String callMethod = "VTGate.SplitQuery";
Response response = call(callMethod, Bsonify.splitQueryRequestToBson(request));
return Bsonify.bsonToSplitQueryResponse((BSONObject) response.getReply());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.youtube.vitess.vtgate.integration.util.TestEnv;
import com.youtube.vitess.vtgate.integration.util.Util;

import org.apache.commons.codec.binary.Hex;
import org.joda.time.DateTime;
import org.junit.AfterClass;
import org.junit.Assert;
Expand All @@ -33,10 +34,12 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;

@RunWith(JUnit4.class)
public class VtGateIT {
Expand Down Expand Up @@ -312,6 +315,90 @@ public void testBatchExecuteKeyspaceIds() throws Exception {
}


@Test
public void testSplitQuery() throws Exception {
// Insert 20 rows per shard
for (String shardName : testEnv.shardKidMap.keySet()) {
Util.insertRowsInShard(testEnv, shardName, 20);
}
Util.waitForTablet("rdonly", 40, 3, testEnv);
VtGate vtgate = VtGate.connect("localhost:" + testEnv.port, 0);
Map<Query, Long> queries =
vtgate.splitQuery("test_keyspace", "select id,keyspace_id from vtgate_test", 1);
vtgate.close();

// Verify 2 splits, one per shard
Assert.assertEquals(2, queries.size());
Set<String> shardsInSplits = new HashSet<>();
for (Query q : queries.keySet()) {
Assert.assertEquals("select id,keyspace_id from vtgate_test", q.getSql());
Assert.assertEquals("test_keyspace", q.getKeyspace());
Assert.assertEquals("rdonly", q.getTabletType());
Assert.assertEquals(0, q.getBindVars().size());
Assert.assertEquals(null, q.getKeyspaceIds());
String start = Hex.encodeHexString(q.getKeyRanges().get(0).get("Start"));
String end = Hex.encodeHexString(q.getKeyRanges().get(0).get("End"));
shardsInSplits.add(start + "-" + end);
}

// Verify the keyrange queries in splits cover the entire keyspace
Assert.assertTrue(shardsInSplits.containsAll(testEnv.shardKidMap.keySet()));
}

@Test
public void testSplitQueryMultipleSplitsPerShard() throws Exception {
int rowCount = 30;
Util.insertRows(testEnv, 1, 30);
List<String> expectedSqls =
Lists.newArrayList("select id, keyspace_id from vtgate_test where id < 10",
"select id, keyspace_id from vtgate_test where id < 11",
"select id, keyspace_id from vtgate_test where id >= 10 and id < 19",
"select id, keyspace_id from vtgate_test where id >= 11 and id < 19",
"select id, keyspace_id from vtgate_test where id >= 19",
"select id, keyspace_id from vtgate_test where id >= 19");
Util.waitForTablet("rdonly", rowCount, 3, testEnv);
VtGate vtgate = VtGate.connect("localhost:" + testEnv.port, 0);
int splitCount = 6;
Map<Query, Long> queries =
vtgate.splitQuery("test_keyspace", "select id,keyspace_id from vtgate_test", splitCount);
vtgate.close();

// Verify 6 splits, 3 per shard
Assert.assertEquals(splitCount, queries.size());
Set<String> shardsInSplits = new HashSet<>();
for (Query q : queries.keySet()) {
String sql = q.getSql();
Assert.assertTrue(expectedSqls.contains(sql));
expectedSqls.remove(sql);
Assert.assertEquals("test_keyspace", q.getKeyspace());
Assert.assertEquals("rdonly", q.getTabletType());
Assert.assertEquals(0, q.getBindVars().size());
Assert.assertEquals(null, q.getKeyspaceIds());
String start = Hex.encodeHexString(q.getKeyRanges().get(0).get("Start"));
String end = Hex.encodeHexString(q.getKeyRanges().get(0).get("End"));
shardsInSplits.add(start + "-" + end);
}

// Verify the keyrange queries in splits cover the entire keyspace
Assert.assertTrue(shardsInSplits.containsAll(testEnv.shardKidMap.keySet()));
Assert.assertTrue(expectedSqls.size() == 0);
}

@Test
public void testSplitQueryInvalidTable() throws Exception {
VtGate vtgate = VtGate.connect("localhost:" + testEnv.port, 0);
try {
vtgate.splitQuery("test_keyspace", "select id from invalid_table", 1);
Assert.fail("failed to raise connection exception");
} catch (ConnectionException e) {
Assert.assertTrue(
e.getMessage().contains("query validation error: can't find table in schema"));
} finally {
vtgate.close();
}
}


/**
* Create env with two shards each having a master and replica
*/
Expand All @@ -322,7 +409,7 @@ static TestEnv getTestEnv() {
shardKidMap.put("80-",
Lists.newArrayList("9767889778372766922", "9742070682920810358", "10296850775085416642"));
TestEnv env = new TestEnv(shardKidMap, "test_keyspace");
env.addTablet("replica", 1);
env.addTablet("rdonly", 1);
return env;
}
}
Loading