From 8d18f05078daa353f6d04356bd23cc9a32d88d9c Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Tue, 7 Jun 2022 18:29:21 -0700 Subject: [PATCH 1/5] HBASE-27088 IntegrationLoadTestCommonCrawl async load improvements - Use an async client and work stealing executor for parallelism during loads. - Remove the verification read retries, these are not that effective during replication lag anyway. - Increase max task attempts because S3 might throttle. - Implement a side task that exercises Increments by extracting urls from content and updating a cf that tracks referrer counts. These are not validated at this time. It could be possible to log the increments, sum them with a reducer, and then verify the total, but this is left as a future exercise. --- .../test/IntegrationTestLoadCommonCrawl.java | 395 +++++++++++------- 1 file changed, 238 insertions(+), 157 deletions(-) diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java index d57cd8198e68..6ab61180558b 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.test; import java.io.BufferedReader; +import java.io.ByteArrayOutputStream; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -26,10 +27,21 @@ import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.HashSet; +import java.util.IdentityHashMap; import java.util.List; +import java.util.NoSuchElementException; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.zip.GZIPInputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; @@ -45,14 +57,17 @@ import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.BufferedMutator; +import org.apache.hadoop.hbase.client.AsyncConnection; +import org.apache.hadoop.hbase.client.AsyncTable; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ScanResultConsumer; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; @@ -141,16 +156,16 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase { protected static byte[] CONTENT_FAMILY_NAME = Bytes.toBytes("c"); protected static byte[] INFO_FAMILY_NAME = Bytes.toBytes("i"); + protected static byte[] URL_FAMILY_NAME = 
Bytes.toBytes("u"); + protected static byte[] SEP = Bytes.toBytes(":"); protected static byte[] CONTENT_QUALIFIER = HConstants.EMPTY_BYTE_ARRAY; protected static byte[] CONTENT_LENGTH_QUALIFIER = Bytes.toBytes("l"); protected static byte[] CONTENT_TYPE_QUALIFIER = Bytes.toBytes("t"); protected static byte[] CRC_QUALIFIER = Bytes.toBytes("c"); protected static byte[] DATE_QUALIFIER = Bytes.toBytes("d"); protected static byte[] IP_ADDRESS_QUALIFIER = Bytes.toBytes("a"); - protected static byte[] RECORD_ID_QUALIFIER = Bytes.toBytes("r"); protected static byte[] TARGET_URI_QUALIFIER = Bytes.toBytes("u"); - - private static final int VERIFICATION_READ_RETRIES = 10; + protected static byte[] REF_QUALIFIER = Bytes.toBytes("ref"); public static enum Counts { REFERENCED, @@ -241,6 +256,7 @@ protected Set getColumnFamilies() { Set families = new HashSet<>(); families.add(Bytes.toString(CONTENT_FAMILY_NAME)); families.add(Bytes.toString(INFO_FAMILY_NAME)); + families.add(Bytes.toString(URL_FAMILY_NAME)); return families; } @@ -292,10 +308,18 @@ public HBaseKeyWritable(byte[] row, byte[] family, byte[] qualifier, long ts) { qualifier != null ? qualifier.length : 0, ts); } + public HBaseKeyWritable(byte[] row, byte[] family, byte[] qualifier) { + this(row, family, qualifier, Long.MAX_VALUE); + } + public HBaseKeyWritable(byte[] row, byte[] family, long ts) { this(row, family, HConstants.EMPTY_BYTE_ARRAY, ts); } + public HBaseKeyWritable(byte[] row, byte[] family) { + this(row, family, Long.MAX_VALUE); + } + public HBaseKeyWritable(Cell cell) { this(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(), @@ -421,17 +445,24 @@ void createSchema(final TableName tableName) throws IOException { Admin admin = conn.getAdmin()) { if (!admin.tableExists(tableName)) { - ColumnFamilyDescriptorBuilder contentFamilyBuilder = ColumnFamilyDescriptorBuilder - .newBuilder(CONTENT_FAMILY_NAME).setDataBlockEncoding(DataBlockEncoding.NONE) - .setBloomFilterType(BloomType.ROW).setMaxVersions(1000).setBlocksize(256 * 1024); + ColumnFamilyDescriptorBuilder contentFamilyBuilder = + ColumnFamilyDescriptorBuilder.newBuilder(CONTENT_FAMILY_NAME).setMaxVersions(1000) + .setDataBlockEncoding(DataBlockEncoding.NONE).setBloomFilterType(BloomType.ROW); + + ColumnFamilyDescriptorBuilder infoFamilyBuilder = + ColumnFamilyDescriptorBuilder.newBuilder(INFO_FAMILY_NAME).setMaxVersions(1000) + .setDataBlockEncoding(DataBlockEncoding.ROW_INDEX_V1) + .setBloomFilterType(BloomType.ROWCOL).setBlocksize(8 * 1024); - ColumnFamilyDescriptorBuilder infoFamilyBuilder = ColumnFamilyDescriptorBuilder - .newBuilder(INFO_FAMILY_NAME).setDataBlockEncoding(DataBlockEncoding.NONE) - .setBloomFilterType(BloomType.ROWCOL).setMaxVersions(1000).setBlocksize(8 * 1024); + ColumnFamilyDescriptorBuilder urlFamilyBuilder = + ColumnFamilyDescriptorBuilder.newBuilder(URL_FAMILY_NAME).setMaxVersions(1000) + .setDataBlockEncoding(DataBlockEncoding.ROW_INDEX_V1) + .setBloomFilterType(BloomType.ROWCOL).setBlocksize(8 * 1024); Set families = new HashSet<>(); families.add(contentFamilyBuilder.build()); families.add(infoFamilyBuilder.build()); + families.add(urlFamilyBuilder.build()); TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName).setColumnFamilies(families).build(); @@ -507,6 +538,9 @@ int run(final Path warcFileInput, final Path outputDir) job.setOutputKeyClass(HBaseKeyWritable.class); job.setOutputValueClass(BytesWritable.class); 
TableMapReduceUtil.addDependencyJars(job); + // Increase max attempts because S3 might throttle aggressively and ultimately fail a task + job.getConfiguration().setInt("mapred.map.max.attempts", 100); + job.getConfiguration().setInt("mapreduce.map.maxattempts", 100); boolean success = job.waitForCompletion(true); if (!success) { @@ -539,24 +573,51 @@ public static void main(String[] args) throws Exception { public static class LoaderMapper extends Mapper { - protected Configuration conf; - protected Connection conn; - protected BufferedMutator mutator; + protected AsyncConnection conn; + protected AsyncTable table; + protected ExecutorService executor; + // Track futures to drain in cleanup() + protected Set> futures = + Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<>())); @Override protected void setup(final Context context) throws IOException, InterruptedException { - conf = context.getConfiguration(); - conn = ConnectionFactory.createConnection(conf); - mutator = conn.getBufferedMutator(getTablename(conf)); + Configuration conf = context.getConfiguration(); + executor = Executors.newWorkStealingPool(Runtime.getRuntime().availableProcessors()); + try { + conn = ConnectionFactory.createAsyncConnection(conf).get(); + table = conn.getTable(getTablename(conf), executor); + } catch (ExecutionException e) { + throw new IOException(e); + } } @Override protected void cleanup(final Context context) throws IOException, InterruptedException { - try { - mutator.close(); - } catch (Exception e) { - LOG.warn("Exception closing Table", e); + // Drain futures. Every future has a chained stage that will remove the element. Treat the + // set of futures as a queue and take one element at a time. + while (true) { + CompletableFuture future; + synchronized (futures) { + try { + // This is the "take" + future = futures.iterator().next(); + } catch (NoSuchElementException e) { + break; + } + } + try { + future.get(); + } catch (ExecutionException e) { + LOG.warn(e.getMessage()); + } } + // Shut down the executor + executor.shutdown(); + if (!executor.awaitTermination(1, TimeUnit.MINUTES)) { + LOG.warn("Pool did not shut down cleanly"); + } + // Close the connection try { conn.close(); } catch (Exception e) { @@ -573,10 +634,7 @@ protected void map(final LongWritable key, final WARCWritable value, final Conte if (warcHeader.getRecordType().equals("response") && targetURI != null) { final String contentType = warcHeader.getField("WARC-Identified-Payload-Type"); if (contentType != null) { - LOG.info("Processing uri=\"" + targetURI + "\", id=" + recordID); - // Make row key - byte[] rowKey; try { rowKey = rowKeyFromTargetURI(targetURI); @@ -590,22 +648,21 @@ protected void map(final LongWritable key, final WARCWritable value, final Conte } // Get the content and calculate the CRC64 - final byte[] content = value.getRecord().getContent(); final CRC64 crc = new CRC64(); crc.update(content); final long crc64 = crc.getValue(); + LOG.info("{}: content {} bytes, crc64={}", targetURI, content.length, + Bytes.toHex(Bytes.toBytes(crc64))); // Store to HBase - - final long ts = getCurrentTime(); + final long ts = getSequence(); final Put put = new Put(rowKey); put.addColumn(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER, ts, content); put.addColumn(INFO_FAMILY_NAME, CONTENT_LENGTH_QUALIFIER, ts, Bytes.toBytes(content.length)); put.addColumn(INFO_FAMILY_NAME, CONTENT_TYPE_QUALIFIER, ts, Bytes.toBytes(contentType)); put.addColumn(INFO_FAMILY_NAME, CRC_QUALIFIER, ts, Bytes.toBytes(crc64)); - 
put.addColumn(INFO_FAMILY_NAME, RECORD_ID_QUALIFIER, ts, Bytes.toBytes(recordID)); put.addColumn(INFO_FAMILY_NAME, TARGET_URI_QUALIFIER, ts, Bytes.toBytes(targetURI)); put.addColumn(INFO_FAMILY_NAME, DATE_QUALIFIER, ts, Bytes.toBytes(warcHeader.getDateString())); @@ -613,20 +670,19 @@ protected void map(final LongWritable key, final WARCWritable value, final Conte if (ipAddr != null) { put.addColumn(INFO_FAMILY_NAME, IP_ADDRESS_QUALIFIER, ts, Bytes.toBytes(ipAddr)); } - mutator.mutate(put); + final CompletableFuture putFuture = table.put(put); + futures.add(putFuture); + putFuture.thenRun(() -> futures.remove(putFuture)); // Write records out for later verification, one per HBase field except for the // content record, which will be verified by CRC64. - - output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, CRC_QUALIFIER, ts), - new BytesWritable(Bytes.toBytes(crc64))); output.write( new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, CONTENT_LENGTH_QUALIFIER, ts), new BytesWritable(Bytes.toBytes(content.length))); output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, CONTENT_TYPE_QUALIFIER, ts), new BytesWritable(Bytes.toBytes(contentType))); - output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, RECORD_ID_QUALIFIER, ts), - new BytesWritable(Bytes.toBytes(recordID))); + output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, CRC_QUALIFIER, ts), + new BytesWritable(Bytes.toBytes(crc64))); output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, TARGET_URI_QUALIFIER, ts), new BytesWritable(Bytes.toBytes(targetURI))); output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, DATE_QUALIFIER, ts), @@ -635,54 +691,29 @@ protected void map(final LongWritable key, final WARCWritable value, final Conte output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, IP_ADDRESS_QUALIFIER, ts), new BytesWritable(Bytes.toBytes(ipAddr))); } - } - } - } - private byte[] rowKeyFromTargetURI(final String targetUri) - throws URISyntaxException, IllegalArgumentException { - final URI uri = new URI(targetUri); - // Ignore the scheme - // Reverse the components of the hostname - String reversedHost; - if (uri.getHost() != null) { - final StringBuilder sb = new StringBuilder(); - final String[] hostComponents = uri.getHost().split("\\."); - for (int i = hostComponents.length - 1; i >= 0; i--) { - sb.append(hostComponents[i]); - if (i != 0) { - sb.append('.'); + // The URLs cf is not tracked for correctness. For now it is used only to exercise + // Increments, to drive some read load during ingest. They can be verified with a + // reducer to sum increments per row and then compare the final count to the table + // data. This is left as a future exercise. 
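+ // Illustrative example (hypothetical URLs, not validated by this test): if this
+ // row is "com.example|/a" and its content links to https://example.org/b, we
+ // increment row "org.example|/b", column u:ref:com.example|/a, by 1, so each
+ // URL row accumulates one counter per distinct referring row key.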
+ final byte[] refQual = Bytes.add(REF_QUALIFIER, SEP, rowKey); + for (String refUri : extractUrls(content)) { + try { + byte[] urlRowKey = rowKeyFromTargetURI(refUri); + LOG.debug(" -> {}", refUri); + final Increment increment = new Increment(urlRowKey); + increment.setTimestamp(ts); + increment.addColumn(URL_FAMILY_NAME, refQual, 1); + final CompletableFuture incrFuture = table.increment(increment); + futures.add(incrFuture); + incrFuture.thenRun(() -> futures.remove(incrFuture)); + } catch (IllegalArgumentException | URISyntaxException e) { + LOG.debug("Could not make a row key for URI " + refUri + ", ignoring", e); + } } } - reversedHost = sb.toString(); - } else { - throw new IllegalArgumentException("URI is missing host component"); - } - final StringBuilder sb = new StringBuilder(); - sb.append(reversedHost); - if (uri.getPort() >= 0) { - sb.append(':'); - sb.append(uri.getPort()); } - if (uri.getPath() != null) { - sb.append('/'); - sb.append(uri.getPath()); - } - if (uri.getQuery() != null) { - sb.append('?'); - sb.append(uri.getQuery()); - } - if (uri.getFragment() != null) { - sb.append('#'); - sb.append(uri.getFragment()); - } - if (sb.length() > HConstants.MAX_ROW_LENGTH) { - throw new IllegalArgumentException("Key would be too large (length=" + sb.length() - + ", limit=" + HConstants.MAX_ROW_LENGTH); - } - return Bytes.toBytes(sb.toString()); } - } } @@ -745,22 +776,25 @@ public static void main(String[] args) throws Exception { public static class VerifyMapper extends Mapper { - private Connection conn; - private Table table; + protected Connection conn; + protected Table table; @Override protected void setup(final Context context) throws IOException, InterruptedException { - conn = ConnectionFactory.createConnection(context.getConfiguration()); - table = conn.getTable(getTablename(conn.getConfiguration())); + Configuration conf = context.getConfiguration(); + conn = ConnectionFactory.createConnection(conf); + table = conn.getTable(getTablename(conf)); } @Override protected void cleanup(final Context context) throws IOException, InterruptedException { + // Close the table try { table.close(); } catch (Exception e) { - LOG.warn("Exception closing Table", e); + LOG.warn("Exception closing table", e); } + // Close the connection try { conn.close(); } catch (Exception e) { @@ -778,95 +812,142 @@ protected void map(final HBaseKeyWritable key, final BytesWritable value, Bytes.copy(key.getQualifierArray(), key.getQualifierOffset(), key.getQualifierLength()); final long ts = key.getTimestamp(); - int retries = VERIFICATION_READ_RETRIES; - while (true) { - - if (Bytes.equals(INFO_FAMILY_NAME, family) && Bytes.equals(CRC_QUALIFIER, qualifier)) { - - final long expectedCRC64 = Bytes.toLong(value.getBytes(), 0, value.getLength()); - final Result result = - table.get(new Get(row).addColumn(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER) - .addColumn(INFO_FAMILY_NAME, CRC_QUALIFIER).setTimestamp(ts)); - final byte[] content = result.getValue(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER); - if (content == null) { - if (retries-- > 0) { - continue; - } - LOG.error("Row " + Bytes.toStringBinary(row) + ": missing content"); - output.getCounter(Counts.UNREFERENCED).increment(1); - return; - } else { - final CRC64 crc = new CRC64(); - crc.update(content); - if (crc.getValue() != expectedCRC64) { - LOG.error("Row " + Bytes.toStringBinary(row) + ": corrupt content"); - output.getCounter(Counts.CORRUPT).increment(1); - return; - } - } - final byte[] crc = result.getValue(INFO_FAMILY_NAME, CRC_QUALIFIER); - if 
(crc == null) { - if (retries-- > 0) { - continue; - } - LOG.error("Row " + Bytes.toStringBinary(row) + ": missing i:c"); - output.getCounter(Counts.UNREFERENCED).increment(1); - return; - } - if (Bytes.toLong(crc) != expectedCRC64) { - if (retries-- > 0) { - continue; - } - LOG.error("Row " + Bytes.toStringBinary(row) + ": i:c mismatch"); - output.getCounter(Counts.CORRUPT).increment(1); - return; - } - + if (Bytes.equals(INFO_FAMILY_NAME, family) && Bytes.equals(CRC_QUALIFIER, qualifier)) { + final long expectedCRC64 = Bytes.toLong(value.getBytes(), 0, value.getLength()); + final Get get = new Get(row).setTimestamp(ts).addFamily(CONTENT_FAMILY_NAME) + .addFamily(INFO_FAMILY_NAME); + Result r; + try { + r = table.get(get); + } catch (Exception e) { + LOG.error("Row " + Bytes.toStringBinary(row) + ": exception", e); + output.getCounter(Counts.UNREFERENCED).increment(1); + return; + } + final byte[] crcBytes = r.getValue(INFO_FAMILY_NAME, CRC_QUALIFIER); + if (crcBytes == null) { + LOG.error("Row " + Bytes.toStringBinary(row) + ": missing i:c"); + output.getCounter(Counts.UNREFERENCED).increment(1); + return; + } + if (Bytes.toLong(crcBytes) != expectedCRC64) { + LOG.error("Row " + Bytes.toStringBinary(row) + ": i:c mismatch"); + output.getCounter(Counts.CORRUPT).increment(1); + return; + } + // If we fell through to here all verification checks have succeeded for the info + // record. + output.getCounter(Counts.REFERENCED).increment(1); + final byte[] content = r.getValue(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER); + if (content == null) { + LOG.error("Row " + Bytes.toStringBinary(row) + ": missing content"); + output.getCounter(Counts.UNREFERENCED).increment(1); + return; } else { - - final Result result = - table.get(new Get(row).addColumn(family, qualifier).setTimestamp(ts)); - final byte[] bytes = result.getValue(family, qualifier); - if (bytes == null) { - if (retries-- > 0) { - continue; - } - LOG.error("Row " + Bytes.toStringBinary(row) + ": missing " - + Bytes.toStringBinary(family) + ":" + Bytes.toStringBinary(qualifier)); - output.getCounter(Counts.UNREFERENCED).increment(1); - return; - } - if (!Bytes.equals(bytes, 0, bytes.length, value.getBytes(), 0, value.getLength())) { - if (retries-- > 0) { - continue; - } - LOG.error("Row " + Bytes.toStringBinary(row) + ": " + Bytes.toStringBinary(family) - + ":" + Bytes.toStringBinary(qualifier) + " mismatch"); + final CRC64 crc = new CRC64(); + crc.update(content); + if (crc.getValue() != expectedCRC64) { + LOG.error("Row " + Bytes.toStringBinary(row) + ": corrupt content"); output.getCounter(Counts.CORRUPT).increment(1); return; } - } - - // If we fell through to here all verification checks have succeeded, potentially after - // retries, and we must exit the while loop. + // If we fell through to here all verification checks have succeeded for the content + // record. 
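+ // Note that a single CRC input record increments REFERENCED twice when all
+ // checks pass: once above for the verified i:c value, and once here for the
+ // verified content.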
+ output.getCounter(Counts.REFERENCED).increment(1); + } else { + final Get get = new Get(row).setTimestamp(ts).addColumn(family, qualifier); + Result r; + try { + r = table.get(get); + } catch (Exception e) { + LOG.error("Row " + Bytes.toStringBinary(row) + ": exception", e); + output.getCounter(Counts.UNREFERENCED).increment(1); + return; + } + final byte[] bytes = r.getValue(family, qualifier); + if (bytes == null) { + LOG.error("Row " + Bytes.toStringBinary(row) + ": missing " + + Bytes.toStringBinary(family) + ":" + Bytes.toStringBinary(qualifier)); + output.getCounter(Counts.UNREFERENCED).increment(1); + return; + } + if (!Bytes.equals(bytes, 0, bytes.length, value.getBytes(), 0, value.getLength())) { + LOG.error("Row " + Bytes.toStringBinary(row) + ": " + Bytes.toStringBinary(family) + ":" + + Bytes.toStringBinary(qualifier) + " mismatch"); + output.getCounter(Counts.CORRUPT).increment(1); + return; + } + // If we fell through to here all verification checks have succeeded for the info + // record. output.getCounter(Counts.REFERENCED).increment(1); - break; - } } } } private static final AtomicLong counter = new AtomicLong(); + private static final int shift = 8; + + private static long getSequence() { + long t = EnvironmentEdgeManager.currentTime(); + t <<= shift; + t |= (counter.getAndIncrement() % (1 << shift)); + return t; + } + + private static byte[] rowKeyFromTargetURI(final String targetUri) + throws IOException, URISyntaxException, IllegalArgumentException { + final URI uri = new URI(targetUri); + // Ignore the scheme + // Reverse the components of the hostname + String reversedHost; + if (uri.getHost() != null) { + final StringBuilder sb = new StringBuilder(); + final String[] hostComponents = uri.getHost().split("\\."); + for (int i = hostComponents.length - 1; i >= 0; i--) { + sb.append(hostComponents[i]); + if (i != 0) { + sb.append('.'); + } + } + reversedHost = sb.toString(); + } else { + throw new IllegalArgumentException("URI is missing host component"); + } + final ByteArrayOutputStream os = new ByteArrayOutputStream(); + os.write(reversedHost.getBytes(StandardCharsets.UTF_8)); + if (uri.getPort() >= 0) { + os.write(String.format(":%d", uri.getPort()).getBytes(StandardCharsets.UTF_8)); + } + os.write((byte) '|'); + if (uri.getPath() != null) { + os.write(uri.getPath().getBytes(StandardCharsets.UTF_8)); + } + if (uri.getQuery() != null) { + os.write(String.format("?%s", uri.getQuery()).getBytes(StandardCharsets.UTF_8)); + } + if (uri.getFragment() != null) { + os.write(String.format("#%s", uri.getFragment()).getBytes(StandardCharsets.UTF_8)); + } + if (os.size() > HConstants.MAX_ROW_LENGTH) { + throw new IllegalArgumentException( + "Key would be too large (length=" + os.size() + ", limit=" + HConstants.MAX_ROW_LENGTH); + } + return os.toByteArray(); + } - private static long getCurrentTime() { - // Typical hybrid logical clock scheme. - // Take the current time, shift by 16 bits and zero those bits, and replace those bits - // with the low 16 bits of the atomic counter. Mask off the high bit too because timestamps - // cannot be negative. 
- return ((EnvironmentEdgeManager.currentTime() << 16) & 0x7fff_ffff_ffff_0000L) - | (counter.getAndIncrement() & 0xffffL); + static final Pattern URL_PATTERN = Pattern.compile( + "\\b((https?|ftp|file)://|(www|ftp)\\.)" + "[\\-A-Z0-9+&@#/%?=~_|$!:,\\.;]*[A-Z0-9+&@#/%=~_|$]", + Pattern.CASE_INSENSITIVE); + + private static Collection extractUrls(byte[] content) { + final Set list = new HashSet<>(); // uniques + final Matcher m = URL_PATTERN.matcher(new String(content, StandardCharsets.UTF_8)); + while (m.find()) { + list.add(m.group()); + } + return list; } } From e3a5a67f7468a9d71da78b9f2e1a9e1f6a8330e6 Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Tue, 7 Jun 2022 18:29:21 -0700 Subject: [PATCH 2/5] Make increments optional --- .../test/IntegrationTestLoadCommonCrawl.java | 47 ++++++++++++------- 1 file changed, 29 insertions(+), 18 deletions(-) diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java index 6ab61180558b..33e3b07407eb 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java @@ -129,6 +129,10 @@ * not specify compression (by default) there is roughly a 10x expansion. Loading the full crawl * archive results in a table approximately 640 TB in size. *

+ * The loader can optionally drive read load during ingest by incrementing counters for each URL
+ * discovered in content. Add -DIntegrationTestLoadCommonCrawl.increments=true to the
+ * command line to enable.
+ * <p>

* You can also split the Loader and Verify stages:
* <p>

* Load with:

./bin/hbase @@ -154,6 +158,9 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase { protected static String TABLE_NAME_KEY = "IntegrationTestLoadCommonCrawl.table"; protected static String DEFAULT_TABLE_NAME = "IntegrationTestLoadCommonCrawl"; + protected static String INCREMENTS_NAME_KEY = "IntegrationTestLoadCommonCrawl.increments"; + protected static boolean DEFAULT_INCREMENTS = false; + protected static byte[] CONTENT_FAMILY_NAME = Bytes.toBytes("c"); protected static byte[] INFO_FAMILY_NAME = Bytes.toBytes("i"); protected static byte[] URL_FAMILY_NAME = Bytes.toBytes("u"); @@ -579,11 +586,13 @@ public static class LoaderMapper // Track futures to drain in cleanup() protected Set> futures = Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<>())); + protected boolean doIncrements; @Override protected void setup(final Context context) throws IOException, InterruptedException { - Configuration conf = context.getConfiguration(); executor = Executors.newWorkStealingPool(Runtime.getRuntime().availableProcessors()); + Configuration conf = context.getConfiguration(); + doIncrements = conf.getBoolean(INCREMENTS_NAME_KEY, DEFAULT_INCREMENTS); try { conn = ConnectionFactory.createAsyncConnection(conf).get(); table = conn.getTable(getTablename(conf), executor); @@ -692,23 +701,25 @@ protected void map(final LongWritable key, final WARCWritable value, final Conte new BytesWritable(Bytes.toBytes(ipAddr))); } - // The URLs cf is not tracked for correctness. For now it is used only to exercise - // Increments, to drive some read load during ingest. They can be verified with a - // reducer to sum increments per row and then compare the final count to the table - // data. This is left as a future exercise. - final byte[] refQual = Bytes.add(REF_QUALIFIER, SEP, rowKey); - for (String refUri : extractUrls(content)) { - try { - byte[] urlRowKey = rowKeyFromTargetURI(refUri); - LOG.debug(" -> {}", refUri); - final Increment increment = new Increment(urlRowKey); - increment.setTimestamp(ts); - increment.addColumn(URL_FAMILY_NAME, refQual, 1); - final CompletableFuture incrFuture = table.increment(increment); - futures.add(incrFuture); - incrFuture.thenRun(() -> futures.remove(incrFuture)); - } catch (IllegalArgumentException | URISyntaxException e) { - LOG.debug("Could not make a row key for URI " + refUri + ", ignoring", e); + if (doIncrements) { + // The URLs cf is not tracked for correctness. For now it is used only to exercise + // Increments, to drive some read load during ingest. They can be verified with a + // reducer to sum increments per row and then compare the final count to the table + // data. This is left as a future exercise. 
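+ // A sketch of that future exercise (assumed here, not implemented): inside the
+ // URL loop below, also emit one record per increment, e.g.
+ //   output.write(new HBaseKeyWritable(urlRowKey, URL_FAMILY_NAME, refQual),
+ //     new BytesWritable(Bytes.toBytes(1L)));
+ // then sum the emitted values per key in a reducer and compare each sum against
+ // the counter value read back from the u cf.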
+ final byte[] refQual = Bytes.add(REF_QUALIFIER, SEP, rowKey); + for (String refUri : extractUrls(content)) { + try { + byte[] urlRowKey = rowKeyFromTargetURI(refUri); + LOG.debug(" -> {}", refUri); + final Increment increment = new Increment(urlRowKey); + increment.setTimestamp(ts); + increment.addColumn(URL_FAMILY_NAME, refQual, 1); + final CompletableFuture incrFuture = table.increment(increment); + futures.add(incrFuture); + incrFuture.thenRun(() -> futures.remove(incrFuture)); + } catch (IllegalArgumentException | URISyntaxException e) { + LOG.debug("Could not make a row key for URI " + refUri + ", ignoring", e); + } } } } From aa2805b58466f1e44e93a0558ad9b9136f455221 Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Tue, 7 Jun 2022 18:29:21 -0700 Subject: [PATCH 3/5] Sum RPC time for writes (loader) and reads (verifier) and mutation bytes submitted. Expose as job counters. --- .../test/IntegrationTestLoadCommonCrawl.java | 43 +++++++++++++++++-- 1 file changed, 39 insertions(+), 4 deletions(-) diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java index 33e3b07407eb..fd67c72bb1c5 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java @@ -177,7 +177,9 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase { public static enum Counts { REFERENCED, UNREFERENCED, - CORRUPT + CORRUPT, + RPC_BYTES_WRITTEN, + RPC_TIME_MS, } protected Path warcFileInputDir = null; @@ -553,6 +555,15 @@ int run(final Path warcFileInput, final Path outputDir) if (!success) { LOG.error("Failure during job " + job.getJobID()); } + + final Counters counters = job.getCounters(); + for (Counts c : Counts.values()) { + long value = counters.findCounter(c).getValue(); + if (value != 0) { + LOG.info(c + ": " + value); + } + } + return success ? 0 : 1; } @@ -679,9 +690,16 @@ protected void map(final LongWritable key, final WARCWritable value, final Conte if (ipAddr != null) { put.addColumn(INFO_FAMILY_NAME, IP_ADDRESS_QUALIFIER, ts, Bytes.toBytes(ipAddr)); } + final long putStartTime = System.currentTimeMillis(); final CompletableFuture putFuture = table.put(put); futures.add(putFuture); - putFuture.thenRun(() -> futures.remove(putFuture)); + putFuture.thenAccept((v) -> { + output.getCounter(Counts.RPC_TIME_MS) + .increment(System.currentTimeMillis() - putStartTime); + output.getCounter(Counts.RPC_BYTES_WRITTEN).increment(put.heapSize()); + }).thenRun(() -> { + futures.remove(putFuture); + }); // Write records out for later verification, one per HBase field except for the // content record, which will be verified by CRC64. 
@@ -714,9 +732,16 @@ protected void map(final LongWritable key, final WARCWritable value, final Conte final Increment increment = new Increment(urlRowKey); increment.setTimestamp(ts); increment.addColumn(URL_FAMILY_NAME, refQual, 1); + final long incrStartTime = System.currentTimeMillis(); final CompletableFuture incrFuture = table.increment(increment); futures.add(incrFuture); - incrFuture.thenRun(() -> futures.remove(incrFuture)); + incrFuture.thenAccept((r) -> { + output.getCounter(Counts.RPC_TIME_MS) + .increment(System.currentTimeMillis() - incrStartTime); + output.getCounter(Counts.RPC_BYTES_WRITTEN).increment(increment.heapSize()); + }).thenRun(() -> { + futures.remove(putFuture); + }); } catch (IllegalArgumentException | URISyntaxException e) { LOG.debug("Could not make a row key for URI " + refUri + ", ignoring", e); } @@ -751,13 +776,18 @@ int run(final Path inputDir) throws IOException, ClassNotFoundException, Interru job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(NullWritable.class); TableMapReduceUtil.addDependencyJars(job); + boolean success = job.waitForCompletion(true); if (!success) { LOG.error("Failure during job " + job.getJobID()); } + final Counters counters = job.getCounters(); for (Counts c : Counts.values()) { - LOG.info(c + ": " + counters.findCounter(c).getValue()); + long value = counters.findCounter(c).getValue(); + if (value != 0) { + LOG.info(c + ": " + value); + } } if (counters.findCounter(Counts.UNREFERENCED).getValue() > 0) { LOG.error("Nonzero UNREFERENCED count from job " + job.getJobID()); @@ -767,6 +797,7 @@ int run(final Path inputDir) throws IOException, ClassNotFoundException, Interru LOG.error("Nonzero CORRUPT count from job " + job.getJobID()); success = false; } + return success ? 0 : 1; } @@ -827,9 +858,11 @@ protected void map(final HBaseKeyWritable key, final BytesWritable value, final long expectedCRC64 = Bytes.toLong(value.getBytes(), 0, value.getLength()); final Get get = new Get(row).setTimestamp(ts).addFamily(CONTENT_FAMILY_NAME) .addFamily(INFO_FAMILY_NAME); + final long startTime = System.currentTimeMillis(); Result r; try { r = table.get(get); + output.getCounter(Counts.RPC_TIME_MS).increment(System.currentTimeMillis() - startTime); } catch (Exception e) { LOG.error("Row " + Bytes.toStringBinary(row) + ": exception", e); output.getCounter(Counts.UNREFERENCED).increment(1); @@ -867,10 +900,12 @@ protected void map(final HBaseKeyWritable key, final BytesWritable value, // record. 
output.getCounter(Counts.REFERENCED).increment(1); } else { + final long startTime = System.currentTimeMillis(); final Get get = new Get(row).setTimestamp(ts).addColumn(family, qualifier); Result r; try { r = table.get(get); + output.getCounter(Counts.RPC_TIME_MS).increment(System.currentTimeMillis() - startTime); } catch (Exception e) { LOG.error("Row " + Bytes.toStringBinary(row) + ": exception", e); output.getCounter(Counts.UNREFERENCED).increment(1); From 1fafc8872e8fccf447781c20ce56be7fd5b2ed71 Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Tue, 7 Jun 2022 18:29:21 -0700 Subject: [PATCH 4/5] Fix an issue with completion chaining --- .../test/IntegrationTestLoadCommonCrawl.java | 59 +++++++------------ 1 file changed, 22 insertions(+), 37 deletions(-) diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java index fd67c72bb1c5..ed0a5cf61a49 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java @@ -28,11 +28,8 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashSet; -import java.util.IdentityHashMap; import java.util.List; -import java.util.NoSuchElementException; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -594,9 +591,7 @@ public static class LoaderMapper protected AsyncConnection conn; protected AsyncTable table; protected ExecutorService executor; - // Track futures to drain in cleanup() - protected Set> futures = - Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<>())); + protected AtomicLong inflight = new AtomicLong(); protected boolean doIncrements; @Override @@ -614,24 +609,12 @@ protected void setup(final Context context) throws IOException, InterruptedExcep @Override protected void cleanup(final Context context) throws IOException, InterruptedException { - // Drain futures. Every future has a chained stage that will remove the element. Treat the - // set of futures as a queue and take one element at a time. 
- while (true) { - CompletableFuture future; - synchronized (futures) { - try { - // This is the "take" - future = futures.iterator().next(); - } catch (NoSuchElementException e) { - break; - } - } - try { - future.get(); - } catch (ExecutionException e) { - LOG.warn(e.getMessage()); - } + + while (inflight.get() != 0) { + LOG.info("Operations in flight, waiting"); + Thread.sleep(1000); } + // Shut down the executor executor.shutdown(); if (!executor.awaitTermination(1, TimeUnit.MINUTES)) { @@ -690,15 +673,16 @@ protected void map(final LongWritable key, final WARCWritable value, final Conte if (ipAddr != null) { put.addColumn(INFO_FAMILY_NAME, IP_ADDRESS_QUALIFIER, ts, Bytes.toBytes(ipAddr)); } + inflight.incrementAndGet(); final long putStartTime = System.currentTimeMillis(); final CompletableFuture putFuture = table.put(put); - futures.add(putFuture); - putFuture.thenAccept((v) -> { - output.getCounter(Counts.RPC_TIME_MS) - .increment(System.currentTimeMillis() - putStartTime); - output.getCounter(Counts.RPC_BYTES_WRITTEN).increment(put.heapSize()); - }).thenRun(() -> { - futures.remove(putFuture); + putFuture.thenRun(() -> { + inflight.decrementAndGet(); + if (!putFuture.isCompletedExceptionally()) { + output.getCounter(Counts.RPC_TIME_MS) + .increment(System.currentTimeMillis() - putStartTime); + output.getCounter(Counts.RPC_BYTES_WRITTEN).increment(put.heapSize()); + } }); // Write records out for later verification, one per HBase field except for the @@ -732,15 +716,16 @@ protected void map(final LongWritable key, final WARCWritable value, final Conte final Increment increment = new Increment(urlRowKey); increment.setTimestamp(ts); increment.addColumn(URL_FAMILY_NAME, refQual, 1); + inflight.incrementAndGet(); final long incrStartTime = System.currentTimeMillis(); final CompletableFuture incrFuture = table.increment(increment); - futures.add(incrFuture); - incrFuture.thenAccept((r) -> { - output.getCounter(Counts.RPC_TIME_MS) - .increment(System.currentTimeMillis() - incrStartTime); - output.getCounter(Counts.RPC_BYTES_WRITTEN).increment(increment.heapSize()); - }).thenRun(() -> { - futures.remove(putFuture); + incrFuture.thenRun(() -> { + inflight.decrementAndGet(); + if (!incrFuture.isCompletedExceptionally()) { + output.getCounter(Counts.RPC_TIME_MS) + .increment(System.currentTimeMillis() - incrStartTime); + output.getCounter(Counts.RPC_BYTES_WRITTEN).increment(increment.heapSize()); + } }); } catch (IllegalArgumentException | URISyntaxException e) { LOG.debug("Could not make a row key for URI " + refUri + ", ignoring", e); From f4afd115a0ba04adc9fa707cd55307942bc5656a Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Tue, 7 Jun 2022 18:29:21 -0700 Subject: [PATCH 5/5] Pause loading if too many operations are in flight --- .../test/IntegrationTestLoadCommonCrawl.java | 55 ++++++++++++------- 1 file changed, 34 insertions(+), 21 deletions(-) diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java index ed0a5cf61a49..bdb1c719af28 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java @@ -152,24 +152,27 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase { private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestLoadCommonCrawl.class); - protected static String 
TABLE_NAME_KEY = "IntegrationTestLoadCommonCrawl.table"; - protected static String DEFAULT_TABLE_NAME = "IntegrationTestLoadCommonCrawl"; - - protected static String INCREMENTS_NAME_KEY = "IntegrationTestLoadCommonCrawl.increments"; - protected static boolean DEFAULT_INCREMENTS = false; - - protected static byte[] CONTENT_FAMILY_NAME = Bytes.toBytes("c"); - protected static byte[] INFO_FAMILY_NAME = Bytes.toBytes("i"); - protected static byte[] URL_FAMILY_NAME = Bytes.toBytes("u"); - protected static byte[] SEP = Bytes.toBytes(":"); - protected static byte[] CONTENT_QUALIFIER = HConstants.EMPTY_BYTE_ARRAY; - protected static byte[] CONTENT_LENGTH_QUALIFIER = Bytes.toBytes("l"); - protected static byte[] CONTENT_TYPE_QUALIFIER = Bytes.toBytes("t"); - protected static byte[] CRC_QUALIFIER = Bytes.toBytes("c"); - protected static byte[] DATE_QUALIFIER = Bytes.toBytes("d"); - protected static byte[] IP_ADDRESS_QUALIFIER = Bytes.toBytes("a"); - protected static byte[] TARGET_URI_QUALIFIER = Bytes.toBytes("u"); - protected static byte[] REF_QUALIFIER = Bytes.toBytes("ref"); + static final String TABLE_NAME_KEY = "IntegrationTestLoadCommonCrawl.table"; + static final String DEFAULT_TABLE_NAME = "IntegrationTestLoadCommonCrawl"; + + static final String INCREMENTS_NAME_KEY = "IntegrationTestLoadCommonCrawl.increments"; + static final boolean DEFAULT_INCREMENTS = false; + + static final int MAX_INFLIGHT = 1000; + static final int INFLIGHT_PAUSE_MS = 100; + + static final byte[] CONTENT_FAMILY_NAME = Bytes.toBytes("c"); + static final byte[] INFO_FAMILY_NAME = Bytes.toBytes("i"); + static final byte[] URL_FAMILY_NAME = Bytes.toBytes("u"); + static final byte[] SEP = Bytes.toBytes(":"); + static final byte[] CONTENT_QUALIFIER = HConstants.EMPTY_BYTE_ARRAY; + static final byte[] CONTENT_LENGTH_QUALIFIER = Bytes.toBytes("l"); + static final byte[] CONTENT_TYPE_QUALIFIER = Bytes.toBytes("t"); + static final byte[] CRC_QUALIFIER = Bytes.toBytes("c"); + static final byte[] DATE_QUALIFIER = Bytes.toBytes("d"); + static final byte[] IP_ADDRESS_QUALIFIER = Bytes.toBytes("a"); + static final byte[] TARGET_URI_QUALIFIER = Bytes.toBytes("u"); + static final byte[] REF_QUALIFIER = Bytes.toBytes("ref"); public static enum Counts { REFERENCED, @@ -612,7 +615,7 @@ protected void cleanup(final Context context) throws IOException, InterruptedExc while (inflight.get() != 0) { LOG.info("Operations in flight, waiting"); - Thread.sleep(1000); + Thread.sleep(INFLIGHT_PAUSE_MS); } // Shut down the executor @@ -673,7 +676,12 @@ protected void map(final LongWritable key, final WARCWritable value, final Conte if (ipAddr != null) { put.addColumn(INFO_FAMILY_NAME, IP_ADDRESS_QUALIFIER, ts, Bytes.toBytes(ipAddr)); } - inflight.incrementAndGet(); + long pending = inflight.incrementAndGet(); + while (pending > MAX_INFLIGHT) { + LOG.info("Too many operations in flight, waiting"); + Thread.sleep(INFLIGHT_PAUSE_MS); + pending = inflight.get(); + } final long putStartTime = System.currentTimeMillis(); final CompletableFuture putFuture = table.put(put); putFuture.thenRun(() -> { @@ -716,7 +724,12 @@ protected void map(final LongWritable key, final WARCWritable value, final Conte final Increment increment = new Increment(urlRowKey); increment.setTimestamp(ts); increment.addColumn(URL_FAMILY_NAME, refQual, 1); - inflight.incrementAndGet(); + pending = inflight.incrementAndGet(); + while (pending > MAX_INFLIGHT) { + LOG.info("Too many operations in flight, waiting"); + Thread.sleep(INFLIGHT_PAUSE_MS); + pending = inflight.get(); + } final 
long incrStartTime = System.currentTimeMillis(); final CompletableFuture incrFuture = table.increment(increment); incrFuture.thenRun(() -> {