Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
Expand Down Expand Up @@ -55,7 +53,6 @@ public void abortTransaction() throws ProducerFencedException {
@Override
public Future<RecordMetadata> send(ProducerRecord<byte[], byte[]> producerRecord) {
try {

BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(producerRecord.value(), null);
HBaseKafkaEvent event = dreader.read(null, decoder);
if (!messages.containsKey(producerRecord.topic())) {
Expand All @@ -79,18 +76,16 @@ public boolean isDone() {
}

@Override
public RecordMetadata get() throws InterruptedException, ExecutionException {
public RecordMetadata get() {
return new RecordMetadata(null, 1, 1, 1, (long)1, 1, 1);
}

@Override
public RecordMetadata get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
public RecordMetadata get(long timeout, TimeUnit unit) {
return null;
}
};
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package org.apache.hadoop.hbase.kafka;

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.SmallTests;
Expand Down Expand Up @@ -53,15 +54,14 @@ public class TestDropRule {
public void testDropies1() {
TopicRoutingRules rules = new TopicRoutingRules();
try {
rules.parseRules(new ByteArrayInputStream(DROP_RULE1.getBytes("UTF-8")));
rules.parseRules(new ByteArrayInputStream(DROP_RULE1.getBytes(StandardCharsets.UTF_8)));
Assert.assertEquals(1, rules.getDropRules().size());
Assert.assertEquals(TableName.valueOf("default:MyTable"),
rules.getDropRules().get(0).getTableName());
Assert.assertEquals(null, rules.getDropRules().get(0).getColumnFamily());
Assert.assertEquals(null, rules.getDropRules().get(0).getQualifier());
Assert.assertNull(rules.getDropRules().get(0).getColumnFamily());
Assert.assertNull(rules.getDropRules().get(0).getQualifier());
Assert.assertEquals(0, rules.getRouteRules().size());
} catch (Exception e) {
e.printStackTrace();
Assert.fail(e.getMessage());
}
}
Expand All @@ -70,16 +70,15 @@ public void testDropies1() {
public void testDropies2() {
TopicRoutingRules rules = new TopicRoutingRules();
try {
rules.parseRules(new ByteArrayInputStream(DROP_RULE2.getBytes("UTF-8")));
rules.parseRules(new ByteArrayInputStream(DROP_RULE2.getBytes(StandardCharsets.UTF_8)));
Assert.assertEquals(1, rules.getDropRules().size());
Assert.assertEquals(TableName.valueOf("default:MyTable"),
rules.getDropRules().get(0).getTableName());
Assert.assertTrue(
Bytes.equals("data".getBytes("UTF-8"), rules.getDropRules().get(0).getColumnFamily()));
Assert.assertEquals(null, rules.getDropRules().get(0).getQualifier());
Assert.assertTrue(Bytes.equals("data".getBytes(StandardCharsets.UTF_8),
rules.getDropRules().get(0).getColumnFamily()));
Assert.assertNull(rules.getDropRules().get(0).getQualifier());
Assert.assertEquals(0, rules.getRouteRules().size());
} catch (Exception e) {
e.printStackTrace();
Assert.fail(e.getMessage());
}
}
Expand All @@ -88,15 +87,14 @@ public void testDropies2() {
public void testDropies3() {
TopicRoutingRules rules = new TopicRoutingRules();
try {
rules.parseRules(new ByteArrayInputStream(DROP_RULE3.getBytes("UTF-8")));
rules.parseRules(new ByteArrayInputStream(DROP_RULE3.getBytes(StandardCharsets.UTF_8)));
Assert.assertEquals(1, rules.getDropRules().size());
Assert.assertEquals(TableName.valueOf("default:MyTable"),
rules.getDropRules().get(0).getTableName());
Assert.assertTrue(
Bytes.equals("data".getBytes("UTF-8"), rules.getDropRules().get(0).getColumnFamily()));
Assert
.assertTrue(Bytes.equals(
"dhold".getBytes("UTF-8"), rules.getDropRules().get(0).getQualifier()));
Assert.assertTrue(Bytes.equals("data".getBytes(StandardCharsets.UTF_8),
rules.getDropRules().get(0).getColumnFamily()));
Assert.assertTrue(Bytes.equals("dhold".getBytes(StandardCharsets.UTF_8),
rules.getDropRules().get(0).getQualifier()));
Assert.assertEquals(0, rules.getRouteRules().size());
} catch (Exception e) {
Assert.fail(e.getMessage());
Expand All @@ -107,30 +105,24 @@ public void testDropies3() {
public void testDropies4() {
TopicRoutingRules rules = new TopicRoutingRules();
try {
rules.parseRules(new ByteArrayInputStream(DROP_RULE4.getBytes("UTF-8")));
rules.parseRules(new ByteArrayInputStream(DROP_RULE4.getBytes(StandardCharsets.UTF_8)));
Assert.assertEquals(1, rules.getDropRules().size());
Assert.assertEquals(TableName.valueOf("default:MyTable"),
rules.getDropRules().get(0).getTableName());
Assert.assertTrue(
Bytes.equals("data".getBytes("UTF-8"), rules.getDropRules().get(0).getColumnFamily()));
Assert.assertTrue(
Bytes.equals("dhold:".getBytes("UTF-8"), rules.getDropRules().get(0).getQualifier()));
Assert.assertTrue(Bytes.equals("data".getBytes(StandardCharsets.UTF_8),
rules.getDropRules().get(0).getColumnFamily()));
Assert.assertTrue(Bytes.equals("dhold:".getBytes(StandardCharsets.UTF_8),
rules.getDropRules().get(0).getQualifier()));
Assert.assertEquals(0, rules.getRouteRules().size());

DropRule drop = rules.getDropRules().get(0);
Assert.assertFalse(
drop.match(TableName.valueOf("default:MyTable"),
"data".getBytes("UTF-8"),
"blah".getBytes("UTF-8")));
Assert.assertFalse(
drop.match(TableName.valueOf("default:MyTable"),
"data".getBytes("UTF-8"),
"dholdme".getBytes("UTF-8")));
Assert.assertTrue(
drop.match(TableName.valueOf("default:MyTable"),
"data".getBytes("UTF-8"),
"dhold:me".getBytes("UTF-8")));

Assert.assertFalse(drop.match(TableName.valueOf("default:MyTable"),
"data".getBytes(StandardCharsets.UTF_8), "blah".getBytes(StandardCharsets.UTF_8)));
Assert.assertFalse(drop.match(TableName.valueOf("default:MyTable"),
"data".getBytes(StandardCharsets.UTF_8), "dholdme".getBytes(StandardCharsets.UTF_8)));
Assert.assertTrue(drop.match(TableName.valueOf("default:MyTable"),
"data".getBytes(StandardCharsets.UTF_8),
"dhold:me".getBytes(StandardCharsets.UTF_8)));
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Expand All @@ -140,28 +132,25 @@ public void testDropies4() {
public void testDropies5() {
TopicRoutingRules rules = new TopicRoutingRules();
try {
rules.parseRules(new ByteArrayInputStream(DROP_RULE5.getBytes("UTF-8")));
rules.parseRules(new ByteArrayInputStream(DROP_RULE5.getBytes(StandardCharsets.UTF_8)));
Assert.assertEquals(1, rules.getDropRules().size());
Assert.assertEquals(TableName.valueOf("default:MyTable"),
rules.getDropRules().get(0).getTableName());
Assert.assertTrue(
Bytes.equals("data".getBytes("UTF-8"), rules.getDropRules().get(0).getColumnFamily()));
Assert.assertTrue(
Bytes.equals("pickme".getBytes("UTF-8"), rules.getDropRules().get(0).getQualifier()));
Assert.assertTrue(Bytes.equals("data".getBytes(StandardCharsets.UTF_8),
rules.getDropRules().get(0).getColumnFamily()));
Assert.assertTrue(Bytes.equals("pickme".getBytes(StandardCharsets.UTF_8),
rules.getDropRules().get(0).getQualifier()));
Assert.assertEquals(0, rules.getRouteRules().size());

DropRule drop = rules.getDropRules().get(0);
Assert.assertFalse(
drop.match(TableName.valueOf("default:MyTable"),
"data".getBytes("UTF-8"),
"blah".getBytes("UTF-8")));
Assert.assertFalse(drop.match(TableName.valueOf("default:MyTable"),
"data".getBytes("UTF-8"),
"blacickme".getBytes("UTF-8")));
"data".getBytes(StandardCharsets.UTF_8), "blah".getBytes(StandardCharsets.UTF_8)));
Assert.assertFalse(drop.match(TableName.valueOf("default:MyTable"),
"data".getBytes(StandardCharsets.UTF_8),
"blacickme".getBytes(StandardCharsets.UTF_8)));
Assert.assertTrue(drop.match(TableName.valueOf("default:MyTable"),
"data".getBytes("UTF-8"),
"hithere.pickme".getBytes("UTF-8")));

"data".getBytes(StandardCharsets.UTF_8),
"hithere.pickme".getBytes(StandardCharsets.UTF_8)));
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Expand All @@ -171,40 +160,36 @@ public void testDropies5() {
public void testDropies6() {
TopicRoutingRules rules = new TopicRoutingRules();
try {
rules.parseRules(new ByteArrayInputStream(DROP_RULE6.getBytes("UTF-8")));
rules.parseRules(new ByteArrayInputStream(DROP_RULE6.getBytes(StandardCharsets.UTF_8)));
Assert.assertEquals(1, rules.getDropRules().size());
Assert.assertEquals(TableName.valueOf("default:MyTable"),
rules.getDropRules().get(0).getTableName());
Assert.assertTrue(
Bytes.equals("data".getBytes("UTF-8"), rules.getDropRules().get(0).getColumnFamily()));
Assert.assertTrue(
Bytes.equals("pickme".getBytes("UTF-8"), rules.getDropRules().get(0).getQualifier()));
rules.getDropRules().get(0).getTableName());
Assert.assertTrue(Bytes.equals("data".getBytes(StandardCharsets.UTF_8),
rules.getDropRules().get(0).getColumnFamily()));
Assert.assertTrue(Bytes.equals("pickme".getBytes(StandardCharsets.UTF_8),
rules.getDropRules().get(0).getQualifier()));
Assert.assertEquals(0, rules.getRouteRules().size());

DropRule drop = rules.getDropRules().get(0);
Assert.assertFalse(
drop.match(TableName.valueOf("default:MyTable"),
"data".getBytes("UTF-8"),
"blah".getBytes("UTF-8")));
Assert.assertFalse(drop.match(TableName.valueOf("default:MyTable"),
"data".getBytes("UTF-8"),
"blacickme".getBytes("UTF-8")));
"data".getBytes(StandardCharsets.UTF_8), "blah".getBytes(StandardCharsets.UTF_8)));
Assert.assertFalse(drop.match(TableName.valueOf("default:MyTable"),
"data".getBytes(StandardCharsets.UTF_8),
"blacickme".getBytes(StandardCharsets.UTF_8)));
Assert.assertTrue(drop.match(TableName.valueOf("default:MyTable"),
"data".getBytes("UTF-8"),
"hithere.pickme".getBytes("UTF-8")));
"data".getBytes(StandardCharsets.UTF_8),
"hithere.pickme".getBytes(StandardCharsets.UTF_8)));
Assert.assertTrue(drop.match(TableName.valueOf("default:MyTable"),
"data".getBytes("UTF-8"),
"pickme.pleaze.do.it".getBytes("UTF-8")));
"data".getBytes(StandardCharsets.UTF_8),
"pickme.pleaze.do.it".getBytes(StandardCharsets.UTF_8)));
Assert.assertFalse(drop.match(TableName.valueOf("default:MyTable"),
"data".getBytes("UTF-8"),
"please.pickme.pleaze".getBytes("UTF-8")));
"data".getBytes(StandardCharsets.UTF_8),
"please.pickme.pleaze".getBytes(StandardCharsets.UTF_8)));
Assert.assertTrue(drop.match(TableName.valueOf("default:MyTable"),
"data".getBytes("UTF-8"),
"pickme.pleaze.pickme".getBytes("UTF-8")));

"data".getBytes(StandardCharsets.UTF_8),
"pickme.pleaze.pickme".getBytes(StandardCharsets.UTF_8)));
} catch (Exception e) {
Assert.fail(e.getMessage());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
package org.apache.hadoop.hbase.kafka;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

Expand All @@ -29,38 +27,17 @@
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;


/**
* Test that mutations are getting published to the topic
*/
@Category(SmallTests.class)
public class TestProcessMutations {
private User user = new User() {
@Override
public String getShortName() {
return "my name";
}

@Override
public <T> T runAs(PrivilegedAction<T> action) {
return null;
}

@Override
public <T> T runAs(PrivilegedExceptionAction<T> action)
throws IOException, InterruptedException {
return null;
}
};

private static final String ROUTE_RULE1 =
"<rules><rule action=\"route\" table=\"MyNamespace:MyTable\" "
+ "topic=\"foo\"/></rules>";
Expand All @@ -72,41 +49,29 @@ public void setup() {
this.myTestingProducer=new ProducerForTesting();
}

@After
public void tearDown() {

}

@Test
public void testSendMessage() {
TopicRoutingRules rules = new TopicRoutingRules();
try {

//Configuration conf, ExecutorService pool, User user,
// TopicRoutingRules routingRules,Producer<byte[],byte[]> producer

rules.parseRules(new ByteArrayInputStream(ROUTE_RULE1.getBytes("UTF-8")));
rules.parseRules(new ByteArrayInputStream(ROUTE_RULE1.getBytes(StandardCharsets.UTF_8)));
Configuration conf = new Configuration();
KafkaBridgeConnection connection =
new KafkaBridgeConnection(conf,rules,myTestingProducer);
KafkaBridgeConnection connection = new KafkaBridgeConnection(conf,rules,myTestingProducer);
long zeTimestamp = System.currentTimeMillis();
Put put = new Put("key1".getBytes("UTF-8"),zeTimestamp);
put.addColumn("FAMILY".getBytes("UTF-8"),
"not foo".getBytes("UTF-8"),
"VALUE should NOT pass".getBytes("UTF-8"));
put.addColumn("FAMILY".getBytes("UTF-8"),
"foo".getBytes("UTF-8"),
"VALUE should pass".getBytes("UTF-8"));
Put put = new Put("key1".getBytes(StandardCharsets.UTF_8),zeTimestamp);
put.addColumn("FAMILY".getBytes(StandardCharsets.UTF_8),
"not foo".getBytes(StandardCharsets.UTF_8),
"VALUE should NOT pass".getBytes(StandardCharsets.UTF_8));
put.addColumn("FAMILY".getBytes(StandardCharsets.UTF_8),
"foo".getBytes(StandardCharsets.UTF_8),
"VALUE should pass".getBytes(StandardCharsets.UTF_8));
Table myTable = connection.getTable(TableName.valueOf("MyNamespace:MyTable"));
List<Row> rows = new ArrayList<>();
rows.add(put);
myTable.batch(rows,new Object[0]);

Assert.assertEquals(false,myTestingProducer.getMessages().isEmpty());

Assert.assertFalse(myTestingProducer.getMessages().isEmpty());
} catch (Exception e){
Assert.fail(e.getMessage());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public void testTopic3() {
Assert.assertTrue(Bytes.equals("dhold".getBytes(StandardCharsets.UTF_8),
rules.getRouteRules().get(0).getQualifier()));
Assert.assertTrue(rules.getRouteRules().get(0).getTopics().contains("foo"));
Assert.assertEquals(rules.getRouteRules().get(0).getTopics().size(), 1);
Assert.assertEquals(1, rules.getRouteRules().get(0).getTopics().size());

Assert.assertEquals(0, rules.getDropRules().size());
} catch (Exception e) {
Expand Down