diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java index 062bec67b884..ff791a9390a8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java @@ -20,15 +20,14 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; -import java.util.Date; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.PrivateCellUtil; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.PrivateCellUtil; import org.apache.hadoop.hbase.regionserver.CellSink; import org.apache.hadoop.hbase.regionserver.HMobStore; import org.apache.hadoop.hbase.regionserver.HStore; @@ -84,17 +83,21 @@ public InternalScanner createScanner(ScanInfo scanInfo, List s public StoreFileWriter createWriter(InternalScanner scanner, org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd, boolean shouldDropBehind) throws IOException { - // make this writer with tags always because of possible new cells with tags. + // make this writer with tags always because of possible new cells + // with tags. return store.createWriterInTmp(fd.maxKeyCount, compactionCompression, true, true, true, - shouldDropBehind); + shouldDropBehind); } }; public DefaultMobStoreCompactor(Configuration conf, HStore store) { super(conf, store); - // The mob cells reside in the mob-enabled column family which is held by HMobStore. - // During the compaction, the compactor reads the cells from the mob files and - // probably creates new mob files. All of these operations are included in HMobStore, + // The mob cells reside in the mob-enabled column family which is held by + // HMobStore. + // During the compaction, the compactor reads the cells from the mob + // files and + // probably creates new mob files. All of these operations are included + // in HMobStore, // so we need to cast the Store to HMobStore. if (!(store instanceof HMobStore)) { throw new IllegalArgumentException("The store " + store + " is not a HMobStore"); @@ -104,8 +107,8 @@ public DefaultMobStoreCompactor(Configuration conf, HStore store) { } @Override - public List compact(CompactionRequestImpl request, ThroughputController throughputController, - User user) throws IOException { + public List compact(CompactionRequestImpl request, + ThroughputController throughputController, User user) throws IOException { return compact(request, scannerFactory, writerFactory, throughputController, user); } @@ -114,22 +117,27 @@ public List compact(CompactionRequestImpl request, ThroughputController th * This is for when the mob threshold size has changed or if the mob * column family mode has been toggled via an alter table statement. * Compacts the files by the following rules. - * 1. If the Put cell has a mob reference tag, the cell's value is the path of the mob file. + * 1. If the Put cell has a mob reference tag, the cell's value is the path + * of the mob file. *
   * <ol>
   * <li>
-  * If the value size of a cell is larger than the threshold, this cell is regarded as a mob,
+  * If the value size of a cell is larger than the threshold, this cell is
+  * regarded as a mob,
   * directly copy the (with mob tag) cell into the new store file.
   * </li>
   * <li>
-  * Otherwise, retrieve the mob cell from the mob file, and writes a copy of the cell into
+  * Otherwise, retrieve the mob cell from the mob file, and write a copy of
+  * the cell into
   * the new store file.
   * </li>
   * </ol>
   * 2. If the Put cell doesn't have a reference tag.
   * <ol>
   * <li>
-  * If the value size of a cell is larger than the threshold, this cell is regarded as a mob,
-  * write this cell to a mob file, and write the path of this mob file to the store file.
+  * If the value size of a cell is larger than the threshold, this cell is
+  * regarded as a mob,
+  * write this cell to a mob file, and write the path of this mob file to
+  * the store file.
   * </li>
   * <li>
   * Otherwise, directly write this cell into the store file.
   * </li>
   * </ol>
@@ -138,29 +146,38 @@ public List<Path> compact(CompactionRequestImpl request, ThroughputController th
   * 3. Decide how to write a Delete cell.
   * <ol>
   * <li>
-  * If a Delete cell does not have a mob reference tag which means this delete marker have not
-  * been written to the mob del file, write this cell to the mob del file, and write this cell
+  * If a Delete cell does not have a mob reference tag, which means this
+  * delete marker has not
+  * been written to the mob del file, write this cell to the mob del file,
+  * and write this cell
   * with a ref tag to a store file.
   * </li>
   * <li>
   * Otherwise, directly write it to a store file.
   * </li>
   * </ol>
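// A condensed illustration (not part of this patch) of how performCompaction() below applies the
// three rules above; it mirrors the surrounding code but omits validity checks, counters and
// error handling.
if (major && CellUtil.isDelete(c)) {
  if (MobUtils.isMobReferenceCell(c) || delFileWriter == null) {
    writer.append(c);                                    // rule 3: already ref-tagged, keep in store file
  } else {
    writer.append(MobUtils.createMobRefDeleteMarker(c)); // rule 3: tag it for the store file
    delFileWriter.append(c);                             // and record the marker in the del file
  }
} else if (mobFileWriter == null || c.getTypeByte() != KeyValue.Type.Put.getCode()) {
  writer.append(c);                                      // no mob writer, or not a Put: pass through
} else if (MobUtils.isMobReferenceCell(c)) {
  if (MobUtils.getMobValueLength(c) > mobSizeThreshold) {
    writer.append(c);                                    // rule 1: still a mob, keep the reference
  } else {
    writer.append(mobStore.resolve(c, false));           // rule 1: below threshold, inline the value
  }
} else if (c.getValueLength() <= mobSizeThreshold) {
  writer.append(c);                                      // rule 2: small value, store file only
} else {
  mobFileWriter.append(c);                               // rule 2: large value goes to the mob file
  writer.append(MobUtils.createMobRefCell(c, fileName, mobStore.getRefCellTags()));
}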
    - * After the major compaction on the normal hfiles, we have a guarantee that we have purged all - * deleted or old version mob refs, and the delete markers are written to a del file with the - * suffix _del. Because of this, it is safe to use the del file in the mob compaction. - * The mob compaction doesn't take place in the normal hfiles, it occurs directly in the - * mob files. When the small mob files are merged into bigger ones, the del file is added into + * After the major compaction on the normal hfiles, we have a guarantee + * that we have purged all + * deleted or old version mob refs, and the delete markers are written to a + * del file with the + * suffix _del. Because of this, it is safe to use the del file in the mob + * compaction. + * The mob compaction doesn't take place in the normal hfiles, it occurs + * directly in the + * mob files. When the small mob files are merged into bigger ones, the del + * file is added into * the scanner to filter the deleted cells. * @param fd File details * @param scanner Where to read from. * @param writer Where to write to. * @param smallestReadPoint Smallest read point. - * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint + * @param cleanSeqId When true, remove seqId(used to be mvcc) value which + * is <= smallestReadPoint * @param throughputController The compaction throughput controller. * @param major Is a major compaction. * @param numofFilesToCompact the number of files to compact - * @return Whether compaction ended; false if it was interrupted for any reason. + * @return Whether compaction ended; false if it was interrupted for any + * reason. */ @Override protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer, @@ -191,27 +208,31 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); throughputController.start(compactionName); - KeyValueScanner kvs = (scanner instanceof KeyValueScanner)? (KeyValueScanner)scanner : null; - long shippedCallSizeLimit = (long) numofFilesToCompact * this.store.getColumnFamilyDescriptor().getBlocksize(); + KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null; + long shippedCallSizeLimit = + (long) numofFilesToCompact * this.store.getColumnFamilyDescriptor().getBlocksize(); try { try { - // If the mob file writer could not be created, directly write the cell to the store file. - mobFileWriter = mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount, - compactionCompression, store.getRegionInfo().getStartKey(), true); + // If the mob file writer could not be created, directly write the + // cell to the store file. 
+ mobFileWriter = mobStore + .createWriterInTmp(fd.latestPutTs, fd.maxKeyCount, compactionCompression, + store.getRegionInfo().getStartKey(), true); fileName = Bytes.toBytes(mobFileWriter.getPath().getName()); } catch (IOException e) { LOG.warn("Failed to create mob writer, " - + "we will continue the compaction by writing MOB cells directly in store files", e); + + "we will continue the compaction by writing MOB cells " + "directly in store " + + "files", e); } if (major) { try { - delFileWriter = mobStore.createDelFileWriterInTmp(new Date(fd.latestPutTs), - fd.maxKeyCount, compactionCompression, store.getRegionInfo().getStartKey()); + delFileWriter = mobStore + .createDelFileWriterInTmp(fd.latestPutTs, fd.maxKeyCount, compactionCompression, + store.getRegionInfo().getStartKey()); } catch (IOException e) { LOG.warn( - "Failed to create del writer, " - + "we will continue the compaction by writing delete markers directly in store files", - e); + "Failed to create del writer, " + "we will continue the compaction by writing delete " + + "markers directly in store files", e); } } do { @@ -232,19 +253,24 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel deleteMarkersCount++; } } else if (mobFileWriter == null || c.getTypeByte() != KeyValue.Type.Put.getCode()) { - // If the mob file writer is null or the kv type is not put, directly write the cell + // If the mob file writer is null or the kv type is not put, + // directly write the cell // to the store file. writer.append(c); } else if (MobUtils.isMobReferenceCell(c)) { if (MobUtils.hasValidMobRefCellValue(c)) { int size = MobUtils.getMobValueLength(c); if (size > mobSizeThreshold) { - // If the value size is larger than the threshold, it's regarded as a mob. Since - // its value is already in the mob file, directly write this cell to the store file + // If the value size is larger than the threshold, it's + // regarded as a mob. Since + // its value is already in the mob file, directly write this + // cell to the store file writer.append(c); } else { - // If the value is not larger than the threshold, it's not regarded a mob. Retrieve - // the mob cell from the mob file, and write it back to the store file. + // If the value is not larger than the threshold, it's not + // regarded a mob. Retrieve + // the mob cell from the mob file, and write it back to the + // store file. Cell mobCell = mobStore.resolve(c, false); if (mobCell.getValueLength() != 0) { // put the mob data back to the store file @@ -253,29 +279,35 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel cellsCountCompactedFromMob++; cellsSizeCompactedFromMob += mobCell.getValueLength(); } else { - // If the value of a file is empty, there might be issues when retrieving, - // directly write the cell to the store file, and leave it to be handled by the + // If the value of a file is empty, there might be issues + // when retrieving, + // directly write the cell to the store file, and leave it + // to be handled by the // next compaction. 
writer.append(c); } } } else { - LOG.warn("The value format of the KeyValue " + c - + " is wrong, its length is less than " + Bytes.SIZEOF_INT); + LOG.warn( + "The value format of the KeyValue " + c + " is wrong, its length is less than " + + Bytes.SIZEOF_INT); writer.append(c); } } else if (c.getValueLength() <= mobSizeThreshold) { - //If value size of a cell is not larger than the threshold, directly write to store file + //If value size of a cell is not larger than the threshold, + // directly write to store file writer.append(c); } else { - // If the value size of a cell is larger than the threshold, it's regarded as a mob, - // write this cell to a mob file, and write the path to the store file. + // If the value size of a cell is larger than the threshold, it's + // regarded as a mob, + // write this cell to a mob file, and write the path to the store + // file. mobCells++; // append the original keyValue in the mob file. mobFileWriter.append(c); - Cell reference = MobUtils.createMobRefCell(c, fileName, - this.mobStore.getRefCellTags()); - // write the cell whose value is the path of a mob file to the store file. + Cell reference = MobUtils.createMobRefCell(c, fileName, this.mobStore.getRefCellTags()); + // write the cell whose value is the path of a mob file to the + // store file. writer.append(reference); cellsCountCompactedToMob++; cellsSizeCompactedToMob += c.getValueLength(); @@ -300,7 +332,7 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel } } if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) { - ((ShipperListener)writer).beforeShipped(); + ((ShipperListener) writer).beforeShipped(); kvs.shipped(); bytesWrittenProgressForShippedCall = 0; } @@ -309,13 +341,10 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel // logging at DEBUG level if (LOG.isDebugEnabled()) { if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) { - LOG.debug("Compaction progress: " - + compactionName - + " " - + progress - + String.format(", rate=%.2f kB/sec", (bytesWrittenProgressForLog / 1024.0) - / ((now - lastMillis) / 1000.0)) + ", throughputController is " - + throughputController); + LOG.debug("Compaction progress: " + compactionName + " " + progress + String + .format(", rate=%.2f kB/sec", + (bytesWrittenProgressForLog / 1024.0) / ((now - lastMillis) / 1000.0)) + + ", throughputController is " + throughputController); lastMillis = now; bytesWrittenProgressForLog = 0; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java index 3de7992cb121..a18fd759feb3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; -import java.util.Date; import java.util.List; import org.apache.hadoop.conf.Configuration; @@ -48,15 +47,20 @@ /** * An implementation of the StoreFlusher. It extends the DefaultStoreFlusher. - * If the store is not a mob store, the flusher flushes the MemStore the same with + * If the store is not a mob store, the flusher flushes the MemStore the same + * with * DefaultStoreFlusher, - * If the store is a mob store, the flusher flushes the MemStore into two places. 
+  * If the store is a mob store, the flusher flushes the MemStore into two
+  * places.
   * One is the store files of HBase, the other is the mob files.
      - *
   * <ol>
-  * <li>Cells that are not PUT type or have the delete mark will be directly flushed to HBase.</li>
+  * <li>Cells that are not PUT type or have the delete mark will be directly
+  * flushed to HBase.</li>
   * <li>If the size of a cell value is larger than a threshold, it'll be flushed
-  * to a mob file, another cell with the path of this file will be flushed to HBase.</li>
-  * <li>If the size of a cell value is smaller than or equal with a threshold, it'll be flushed to
+  * to a mob file, another cell with the path of this file will be flushed to
+  * HBase.</li>
+  * <li>If the size of a cell value is smaller than or equal to a threshold,
+  * it'll be flushed to
   * HBase directly.</li>
   * </ol>
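// A minimal sketch (not part of this patch) of the per-cell decision that performMobFlush()
// makes further down; throughput control and the flush counters are omitted.
if (c.getValueLength() <= mobCellValueSizeThreshold || MobUtils.isMobReferenceCell(c)
    || c.getTypeByte() != KeyValue.Type.Put.getCode()) {
  writer.append(c);           // small value, existing mob reference, or non-Put: HBase only
} else {
  mobFileWriter.append(c);    // the real cell goes to the mob file,
  // while a reference cell (same key, value carries the mob file name) goes to HBase
  writer.append(MobUtils.createMobRefCell(c, fileName, mobStore.getRefCellTags()));
}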
    * @@ -76,8 +80,8 @@ public DefaultMobStoreFlusher(Configuration conf, HStore store) throws IOExcepti throw new IllegalArgumentException("The store " + store + " is not a HMobStore"); } mobCellValueSizeThreshold = store.getColumnFamilyDescriptor().getMobThreshold(); - this.targetPath = MobUtils.getMobFamilyPath(conf, store.getTableName(), - store.getColumnFamilyName()); + this.targetPath = + MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName()); if (!this.store.getFileSystem().exists(targetPath)) { this.store.getFileSystem().mkdirs(targetPath); } @@ -86,15 +90,20 @@ public DefaultMobStoreFlusher(Configuration conf, HStore store) throws IOExcepti /** * Flushes the snapshot of the MemStore. - * If this store is not a mob store, flush the cells in the snapshot to store files of HBase. - * If the store is a mob one, the flusher flushes the MemStore into two places. + * If this store is not a mob store, flush the cells in the snapshot to + * store files of HBase. + * If the store is a mob one, the flusher flushes the MemStore into two + * places. * One is the store files of HBase, the other is the mob files. *
      - *
   * <ol>
-  * <li>Cells that are not PUT type or have the delete mark will be directly flushed to
+  * <li>Cells that are not PUT type or have the delete mark will be directly
+  * flushed to
   * HBase.</li>
   * <li>If the size of a cell value is larger than a threshold, it'll be
-  * flushed to a mob file, another cell with the path of this file will be flushed to HBase.</li>
-  * <li>If the size of a cell value is smaller than or equal with a threshold, it'll be flushed to
+  * flushed to a mob file, another cell with the path of this file will be
+  * flushed to HBase.</li>
+  * <li>If the size of a cell value is smaller than or equal to a threshold,
+  * it'll be flushed to
   * HBase directly.</li>
   * </ol>
    */ @@ -111,16 +120,20 @@ public List flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId, InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint, tracker); StoreFileWriter writer; try { - // TODO: We can fail in the below block before we complete adding this flush to - // list of store files. Add cleanup of anything put on filesystem if we fail. + // TODO: We can fail in the below block before we complete adding this + // flush to + // list of store files. Add cleanup of anything put on filesystem if we + // fail. synchronized (flushLock) { status.setStatus("Flushing " + store + ": creating writer"); // Write the map out to the disk - writer = store.createWriterInTmp(cellsCount, store.getColumnFamilyDescriptor().getCompressionType(), - false, true, true, false); + writer = store + .createWriterInTmp(cellsCount, store.getColumnFamilyDescriptor().getCompressionType(), + false, true, true, false); IOException e = null; try { - // It's a mob store, flush the cells in a mob way. This is the difference of flushing + // It's a mob store, flush the cells in a mob way. This is the + // difference of flushing // between a normal and a mob store. performMobFlush(snapshot, cacheFlushId, scanner, writer, status, throughputController); } catch (IOException ioe) { @@ -139,28 +152,33 @@ public List flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId, scanner.close(); } LOG.info("Mob store is flushed, sequenceid=" + cacheFlushId + ", memsize=" - + StringUtils.TraditionalBinaryPrefix.long2String(snapshot.getDataSize(), "", 1) + - ", hasBloomFilter=" + writer.hasGeneralBloom() + - ", into tmp file " + writer.getPath()); + + StringUtils.TraditionalBinaryPrefix.long2String(snapshot.getDataSize(), "", 1) + + ", hasBloomFilter=" + writer.hasGeneralBloom() + ", into tmp file " + writer.getPath()); result.add(writer.getPath()); return result; } /** * Flushes the cells in the mob store. - *
-  * <ol>In the mob store, the cells with PUT type might have or have no mob tags.
-  * <li>If a cell does not have a mob tag, flushing the cell to different files depends
-  * on the value length. If the length is larger than a threshold, it's flushed to a
-  * mob file and the mob file is flushed to a store file in HBase. Otherwise, directly
+  * <ol>In the mob store, the cells with PUT type might have or have no mob
+  * tags.
+  * <li>If a cell does not have a mob tag, flushing the cell to different
+  * files depends
+  * on the value length. If the length is larger than a threshold, it's
+  * flushed to a
+  * mob file and the mob file is flushed to a store file in HBase.
+  * Otherwise, directly
   * flush the cell to a store file in HBase.</li>
-  * <li>If a cell have a mob tag, its value is a mob file name, directly flush it
+  * <li>If a cell has a mob tag, its value is a mob file name, directly
+  * flush it
   * to a store file in HBase.</li>
   * </ol>
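// For orientation, the reference cell written here carries "real value length + mob file name"
// as its value, as documented for hasValidMobRefCellValue()/getMobValueLength() later in this
// patch. A hypothetical decode of such a cell (refCell is an illustrative variable, not patch code):
byte[] v = CellUtil.cloneValue(refCell);
int realValueLength = Bytes.toInt(v, 0);            // the first Bytes.SIZEOF_INT bytes
String mobFileName = Bytes.toString(v, Bytes.SIZEOF_INT, v.length - Bytes.SIZEOF_INT);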
      * @param snapshot Memstore snapshot. * @param cacheFlushId Log cache flush sequence number. * @param scanner The scanner of memstore snapshot. * @param writer The store file writer. - * @param status Task that represents the flush operation and may be updated with status. + * @param status Task that represents the flush operation and may be + * updated with status. * @param throughputController A controller to avoid flush too fast. * @throws IOException */ @@ -168,13 +186,14 @@ protected void performMobFlush(MemStoreSnapshot snapshot, long cacheFlushId, InternalScanner scanner, StoreFileWriter writer, MonitoredTask status, ThroughputController throughputController) throws IOException { StoreFileWriter mobFileWriter = null; - int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX, - HConstants.COMPACTION_KV_MAX_DEFAULT); + int compactionKVMax = + conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT); long mobCount = 0; long mobSize = 0; long time = snapshot.getTimeRangeTracker().getMax(); - mobFileWriter = mobStore.createWriterInTmp(new Date(time), snapshot.getCellsCount(), - store.getColumnFamilyDescriptor().getCompressionType(), store.getRegionInfo().getStartKey(), false); + mobFileWriter = mobStore.createWriterInTmp(time, snapshot.getCellsCount(), + store.getColumnFamilyDescriptor().getCompressionType(), store.getRegionInfo().getStartKey(), + false); // the target path is {tableName}/.mob/{cfName}/mobFiles // the relative path is mobFiles byte[] fileName = Bytes.toBytes(mobFileWriter.getPath().getName()); @@ -183,7 +202,8 @@ protected void performMobFlush(MemStoreSnapshot snapshot, long cacheFlushId, List cells = new ArrayList<>(); boolean hasMore; String flushName = ThroughputControlUtil.getNameForThrottling(store, "flush"); - boolean control = throughputController != null && !store.getRegionInfo().getTable().isSystemTable(); + boolean control = + throughputController != null && !store.getRegionInfo().getTable().isSystemTable(); if (control) { throughputController.start(flushName); } @@ -193,8 +213,10 @@ protected void performMobFlush(MemStoreSnapshot snapshot, long cacheFlushId, hasMore = scanner.next(cells, scannerContext); if (!cells.isEmpty()) { for (Cell c : cells) { - // If we know that this KV is going to be included always, then let us - // set its memstoreTS to 0. This will help us save space when writing to + // If we know that this KV is going to be included always, then + // let us + // set its memstoreTS to 0. This will help us save space when + // writing to // disk. if (c.getValueLength() <= mobCellValueSizeThreshold || MobUtils.isMobReferenceCell(c) || c.getTypeByte() != KeyValue.Type.Put.getCode()) { @@ -207,8 +229,8 @@ protected void performMobFlush(MemStoreSnapshot snapshot, long cacheFlushId, // append the tags to the KeyValue. 
// The key is same, the value is the filename of the mob file - Cell reference = MobUtils.createMobRefCell(c, fileName, - this.mobStore.getRefCellTags()); + Cell reference = + MobUtils.createMobRefCell(c, fileName, this.mobStore.getRefCellTags()); writer.append(reference); } if (control) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java index 304a62e8901b..424c74a97df0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java @@ -1,5 +1,4 @@ /** - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -21,11 +20,16 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.text.ParseException; -import java.text.SimpleDateFormat; +import java.time.DayOfWeek; +import java.time.Instant; +import java.time.LocalDate; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.time.temporal.ChronoUnit; +import java.time.temporal.TemporalAdjusters; import java.util.ArrayList; -import java.util.Calendar; +import java.util.Arrays; import java.util.Collection; -import java.util.Date; import java.util.List; import java.util.Optional; import java.util.UUID; @@ -90,15 +94,11 @@ public final class MobUtils { private final static long WEEKLY_THRESHOLD_MULTIPLIER = 7; private final static long MONTHLY_THRESHOLD_MULTIPLIER = 4 * WEEKLY_THRESHOLD_MULTIPLIER; - private static final ThreadLocal LOCAL_FORMAT = - new ThreadLocal() { - @Override - protected SimpleDateFormat initialValue() { - return new SimpleDateFormat("yyyyMMdd"); - } - }; + private static final DateTimeFormatter DATE_TIME_FORMATTER = + DateTimeFormatter.ofPattern("yyyyMMdd"); private static final byte[] REF_DELETE_MARKER_TAG_BYTES; + static { List tags = new ArrayList<>(); tags.add(MobConstants.MOB_REF_TAG); @@ -113,79 +113,71 @@ private MobUtils() { /** * Formats a date to a string. + * * @param date The date. * @return The string format of the date, it's yyyymmdd. */ - public static String formatDate(Date date) { - return LOCAL_FORMAT.get().format(date); + public static String formatDate(LocalDate date) { + return DATE_TIME_FORMATTER.format(date).replace("+", ""); + } + + public static String formatDate(long millisecond) { + return formatDate(toLocalDate(millisecond)); + } + + private static LocalDate toLocalDate(long millisecond) { + return Instant.ofEpochMilli(millisecond).atZone(ZoneId.systemDefault()).toLocalDate(); + } + + private static long toMillisecond(LocalDate localDate) { + return localDate.atStartOfDay().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); } /** * Parses the string to a date. + * * @param dateString The string format of a date, it's yyyymmdd. - * @return A date. + * @return A LocalDate. 
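// A usage sketch of the new java.time helpers above (not patch code): a cell timestamp in epoch
// millis becomes the yyyyMMdd string used in mob file names and parses back to a LocalDate; note
// that toLocalDate()/formatDate(long) use the JVM's default time zone.
long ts = 1526812800000L;                    // 2018-05-20T00:00:00Z
String dateStr = MobUtils.formatDate(ts);    // e.g. "20180520" (zone dependent)
LocalDate day = MobUtils.parseDate(dateStr); // parseDate declares a checked ParseException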
* @throws ParseException */ - public static Date parseDate(String dateString) throws ParseException { - return LOCAL_FORMAT.get().parse(dateString); + public static LocalDate parseDate(String dateString) throws ParseException { + return LocalDate.parse(dateString, DATE_TIME_FORMATTER); } /** * Get the first day of the input date's month - * @param calendar Calendar object + * * @param date The date to find out its first day of that month * @return The first day in the month */ - public static Date getFirstDayOfMonth(final Calendar calendar, final Date date) { - - calendar.setTime(date); - calendar.set(Calendar.HOUR_OF_DAY, 0); - calendar.set(Calendar.MINUTE, 0); - calendar.set(Calendar.SECOND, 0); - calendar.set(Calendar.MILLISECOND, 0); - calendar.set(Calendar.DAY_OF_MONTH, 1); - - Date firstDayInMonth = calendar.getTime(); - return firstDayInMonth; + public static LocalDate getFirstDayOfMonth(LocalDate date) { + return date.with(TemporalAdjusters.firstDayOfMonth()); } /** * Get the first day of the input date's week - * @param calendar Calendar object + * FIXME: Monday may NOT be the first day of the week in some countries of the world. + * * @param date The date to find out its first day of that week * @return The first day in the week */ - public static Date getFirstDayOfWeek(final Calendar calendar, final Date date) { - - calendar.setTime(date); - calendar.set(Calendar.HOUR_OF_DAY, 0); - calendar.set(Calendar.MINUTE, 0); - calendar.set(Calendar.SECOND, 0); - calendar.set(Calendar.MILLISECOND, 0); - calendar.setFirstDayOfWeek(Calendar.MONDAY); - calendar.set(Calendar.DAY_OF_WEEK, Calendar.MONDAY); - - Date firstDayInWeek = calendar.getTime(); - return firstDayInWeek; + public static LocalDate getFirstDayOfWeek(LocalDate date) { + return date.with(DayOfWeek.MONDAY); } /** * Whether the current cell is a mob reference cell. + * * @param cell The current cell. * @return True if the cell has a mob reference tag, false if it doesn't. */ public static boolean isMobReferenceCell(Cell cell) { - if (cell.getTagsLength() > 0) { - Optional tag = PrivateCellUtil.getTag(cell, TagType.MOB_REFERENCE_TAG_TYPE); - if (tag.isPresent()) { - return true; - } - } - return false; + return PrivateCellUtil.getTag(cell, TagType.MOB_REFERENCE_TAG_TYPE).isPresent(); } /** * Gets the table name tag. + * * @param cell The current cell. * @return The table name tag. */ @@ -201,18 +193,12 @@ public static Tag getTableNameTag(Cell cell) { /** * Whether the tag list has a mob reference tag. + * * @param tags The tag list. * @return True if the list has a mob reference tag, false if it doesn't. */ public static boolean hasMobReferenceTag(List tags) { - if (!tags.isEmpty()) { - for (Tag tag : tags) { - if (tag.getType() == TagType.MOB_REFERENCE_TAG_TYPE) { - return true; - } - } - } - return false; + return tags.stream().anyMatch(tag -> TagType.MOB_REFERENCE_TAG_TYPE == tag.getType()); } /** @@ -221,58 +207,52 @@ public static boolean hasMobReferenceTag(List tags) { * For a mob cell, in a normal scan the scanners retrieves the mob cell from the mob file. * In a raw scan, the scanner directly returns cell in HBase without retrieve the one in * the mob file. + * * @param scan The current scan. * @return True if it's a raw scan. 
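// A usage sketch (not patch code) of how the raw-scan switch is driven: it is a plain boolean
// attribute on the Scan, set the same way setCacheMobBlocks() below sets MOB_CACHE_BLOCKS.
Scan scan = new Scan();
scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(true));
boolean raw = MobUtils.isRawMobScan(scan);   // true: mob reference cells are returned unresolved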
*/ public static boolean isRawMobScan(Scan scan) { byte[] raw = scan.getAttribute(MobConstants.MOB_SCAN_RAW); - try { - return raw != null && Bytes.toBoolean(raw); - } catch (IllegalArgumentException e) { - return false; - } + return toBoolean(raw); } /** * Indicates whether it's a reference only scan. * The information is set in the attribute "hbase.mob.scan.ref.only" of scan. * If it's a ref only scan, only the cells with ref tag are returned. + * * @param scan The current scan. * @return True if it's a ref only scan. */ public static boolean isRefOnlyScan(Scan scan) { byte[] refOnly = scan.getAttribute(MobConstants.MOB_SCAN_REF_ONLY); - try { - return refOnly != null && Bytes.toBoolean(refOnly); - } catch (IllegalArgumentException e) { - return false; - } + return toBoolean(refOnly); } /** * Indicates whether the scan contains the information of caching blocks. * The information is set in the attribute "hbase.mob.cache.blocks" of scan. + * * @param scan The current scan. * @return True when the Scan attribute specifies to cache the MOB blocks. */ public static boolean isCacheMobBlocks(Scan scan) { byte[] cache = scan.getAttribute(MobConstants.MOB_CACHE_BLOCKS); - try { - return cache != null && Bytes.toBoolean(cache); - } catch (IllegalArgumentException e) { - return false; - } + return toBoolean(cache); + } + + private static boolean toBoolean(byte[] byteArray) { + return byteArray != null && byteArray.length == 1 && byteArray[0] != (byte) 0; } /** * Sets the attribute of caching blocks in the scan. * - * @param scan - * The current scan. - * @param cacheBlocks - * True, set the attribute of caching blocks into the scan, the scanner with this scan - * caches blocks. - * False, the scanner doesn't cache blocks for this scan. + * @param scan The current scan. + * @param cacheBlocks True, set the attribute of caching blocks into the scan, the scanner with + * this scan + * caches blocks. + * False, the scanner doesn't cache blocks for this scan. */ public static void setCacheMobBlocks(Scan scan, boolean cacheBlocks) { scan.setAttribute(MobConstants.MOB_CACHE_BLOCKS, Bytes.toBytes(cacheBlocks)); @@ -282,12 +262,13 @@ public static void setCacheMobBlocks(Scan scan, boolean cacheBlocks) { * Cleans the expired mob files. * Cleans the files whose creation date is older than (current - columnFamily.ttl), and * the minVersions of that column family is 0. - * @param fs The current file system. - * @param conf The current configuration. - * @param tableName The current table name. + * + * @param fs The current file system. + * @param conf The current configuration. + * @param tableName The current table name. * @param columnDescriptor The descriptor of the current column family. - * @param cacheConfig The cacheConfig that disables the block cache. - * @param current The current time. + * @param cacheConfig The cacheConfig that disables the block cache. + * @param current The current time. 
* @throws IOException */ public static void cleanExpiredMobFiles(FileSystem fs, Configuration conf, TableName tableName, @@ -299,16 +280,7 @@ public static void cleanExpiredMobFiles(FileSystem fs, Configuration conf, Table return; } - Calendar calendar = Calendar.getInstance(); - calendar.setTimeInMillis(current - timeToLive * 1000); - calendar.set(Calendar.HOUR_OF_DAY, 0); - calendar.set(Calendar.MINUTE, 0); - calendar.set(Calendar.SECOND, 0); - - Date expireDate = calendar.getTime(); - - LOG.info("MOB HFiles older than " + expireDate.toGMTString() + " will be deleted!"); - + LOG.info("MOB HFiles before than {} will be deleted!", formatDate(current - timeToLive * 1000)); FileStatus[] stats = null; Path mobTableDir = FSUtils.getTableDir(getMobHome(conf), tableName); Path path = getMobFamilyPath(conf, tableName, columnDescriptor.getNameAsString()); @@ -331,12 +303,12 @@ public static void cleanExpiredMobFiles(FileSystem fs, Configuration conf, Table fileName = hfileLink.getOriginPath().getName(); } - Date fileDate = parseDate(MobFileName.getDateFromName(fileName)); - + LocalDate fileDate = parseDate(MobFileName.getDateFromName(fileName)); + long fileTimestamp = toMillisecond(fileDate); if (LOG.isDebugEnabled()) { LOG.debug("Checking file " + fileName); } - if (fileDate.getTime() < expireDate.getTime()) { + if (fileTimestamp + timeToLive * 1000 < current) { if (LOG.isDebugEnabled()) { LOG.debug(fileName + " is an expired file"); } @@ -349,8 +321,7 @@ public static void cleanExpiredMobFiles(FileSystem fs, Configuration conf, Table } if (!filesToClean.isEmpty()) { try { - removeMobFiles(conf, fs, tableName, mobTableDir, columnDescriptor.getName(), - filesToClean); + removeMobFiles(conf, fs, tableName, mobTableDir, columnDescriptor.getName(), filesToClean); deletedFileCount = filesToClean.size(); } catch (IOException e) { LOG.error("Failed to delete the mob files " + filesToClean, e); @@ -362,6 +333,7 @@ public static void cleanExpiredMobFiles(FileSystem fs, Configuration conf, Table /** * Gets the root dir of the mob files. * It's {HBASE_DIR}/mobdir. + * * @param conf The current configuration. * @return the root dir of the mob file. */ @@ -373,6 +345,7 @@ public static Path getMobHome(Configuration conf) { /** * Gets the root dir of the mob files under the qualified HBase root dir. * It's {rootDir}/mobdir. + * * @param rootDir The qualified path of HBase root directory. * @return The root dir of the mob file. */ @@ -382,6 +355,7 @@ public static Path getMobHome(Path rootDir) { /** * Gets the qualified root dir of the mob files. + * * @param conf The current configuration. * @return The qualified root dir. * @throws IOException @@ -396,7 +370,8 @@ public static Path getQualifiedMobRootDir(Configuration conf) throws IOException /** * Gets the table dir of the mob files under the qualified HBase root dir. * It's {rootDir}/mobdir/data/${namespace}/${tableName} - * @param rootDir The qualified path of HBase root directory. + * + * @param rootDir The qualified path of HBase root directory. * @param tableName The name of table. * @return The table dir of the mob file. */ @@ -407,7 +382,8 @@ public static Path getMobTableDir(Path rootDir, TableName tableName) { /** * Gets the region dir of the mob files. * It's {HBASE_DIR}/mobdir/data/{namespace}/{tableName}/{regionEncodedName}. - * @param conf The current configuration. + * + * @param conf The current configuration. * @param tableName The current table name. * @return The region dir of the mob files. 
*/ @@ -418,7 +394,8 @@ public static Path getMobRegionPath(Configuration conf, TableName tableName) { /** * Gets the region dir of the mob files under the specified root dir. * It's {rootDir}/mobdir/data/{namespace}/{tableName}/{regionEncodedName}. - * @param rootDir The qualified path of HBase root directory. + * + * @param rootDir The qualified path of HBase root directory. * @param tableName The current table name. * @return The region dir of the mob files. */ @@ -431,8 +408,9 @@ public static Path getMobRegionPath(Path rootDir, TableName tableName) { /** * Gets the family dir of the mob files. * It's {HBASE_DIR}/mobdir/{namespace}/{tableName}/{regionEncodedName}/{columnFamilyName}. - * @param conf The current configuration. - * @param tableName The current table name. + * + * @param conf The current configuration. + * @param tableName The current table name. * @param familyName The current family name. * @return The family dir of the mob files. */ @@ -443,6 +421,7 @@ public static Path getMobFamilyPath(Configuration conf, TableName tableName, Str /** * Gets the family dir of the mob files. * It's {HBASE_DIR}/mobdir/{namespace}/{tableName}/{regionEncodedName}/{columnFamilyName}. + * * @param regionPath The path of mob region which is a dummy one. * @param familyName The current family name. * @return The family dir of the mob files. @@ -455,31 +434,30 @@ public static Path getMobFamilyPath(Path regionPath, String familyName) { * Gets the RegionInfo of the mob files. * This is a dummy region. The mob files are not saved in a region in HBase. * This is only used in mob snapshot. It's internally used only. + * * @param tableName * @return A dummy mob region info. */ public static RegionInfo getMobRegionInfo(TableName tableName) { - return RegionInfoBuilder.newBuilder(tableName) - .setStartKey(MobConstants.MOB_REGION_NAME_BYTES) - .setEndKey(HConstants.EMPTY_END_ROW) - .setSplit(false) - .setRegionId(0) - .build(); + return RegionInfoBuilder.newBuilder(tableName).setStartKey(MobConstants.MOB_REGION_NAME_BYTES) + .setEndKey(HConstants.EMPTY_END_ROW).setSplit(false).setRegionId(0).build(); } /** * Gets whether the current RegionInfo is a mob one. + * * @param regionInfo The current RegionInfo. * @return If true, the current RegionInfo is a mob one. */ public static boolean isMobRegionInfo(RegionInfo regionInfo) { - return regionInfo == null ? false : getMobRegionInfo(regionInfo.getTable()).getEncodedName() + return regionInfo != null && getMobRegionInfo(regionInfo.getTable()).getEncodedName() .equals(regionInfo.getEncodedName()); } /** * Gets whether the current region name follows the pattern of a mob region name. - * @param tableName The current table name. + * + * @param tableName The current table name. * @param regionName The current region name. * @return True if the current region name follows the pattern of a mob region name. */ @@ -489,7 +467,8 @@ public static boolean isMobRegionName(TableName tableName, byte[] regionName) { /** * Gets the working directory of the mob compaction. - * @param root The root directory of the mob compaction. + * + * @param root The root directory of the mob compaction. * @param jobName The current job name. * @return The directory of the mob compaction for the current job. */ @@ -499,27 +478,29 @@ public static Path getCompactionWorkingPath(Path root, String jobName) { /** * Archives the mob files. - * @param conf The current configuration. - * @param fs The current file system. - * @param tableName The table name. 
- * @param tableDir The table directory. - * @param family The name of the column family. + * + * @param conf The current configuration. + * @param fs The current file system. + * @param tableName The table name. + * @param tableDir The table directory. + * @param family The name of the column family. * @param storeFiles The files to be deleted. * @throws IOException */ public static void removeMobFiles(Configuration conf, FileSystem fs, TableName tableName, Path tableDir, byte[] family, Collection storeFiles) throws IOException { - HFileArchiver.archiveStoreFiles(conf, fs, getMobRegionInfo(tableName), tableDir, family, - storeFiles); + HFileArchiver + .archiveStoreFiles(conf, fs, getMobRegionInfo(tableName), tableDir, family, storeFiles); } /** * Creates a mob reference KeyValue. * The value of the mob reference KeyValue is mobCellValueSize + mobFileName. - * @param cell The original Cell. - * @param fileName The mob file name where the mob reference KeyValue is written. + * + * @param cell The original Cell. + * @param fileName The mob file name where the mob reference KeyValue is written. * @param tableNameTag The tag of the current table name. It's very important in - * cloning the snapshot. + * cloning the snapshot. * @return The mob reference KeyValue. */ public static Cell createMobRefCell(Cell cell, byte[] fileName, Tag tableNameTag) { @@ -544,93 +525,93 @@ public static Cell createMobRefCell(Cell cell, byte[] fileName, byte[] refCellTa /** * Creates a writer for the mob file in temp directory. - * @param conf The current configuration. - * @param fs The current file system. - * @param family The descriptor of the current column family. - * @param date The date string, its format is yyyymmmdd. - * @param basePath The basic path for a temp directory. - * @param maxKeyCount The key count. - * @param compression The compression algorithm. - * @param startKey The hex string of the start key. - * @param cacheConfig The current cache config. + * + * @param conf The current configuration. + * @param fs The current file system. + * @param family The descriptor of the current column family. + * @param date The date string, its format is yyyymmmdd. + * @param basePath The basic path for a temp directory. + * @param maxKeyCount The key count. + * @param compression The compression algorithm. + * @param startKey The hex string of the start key. + * @param cacheConfig The current cache config. * @param cryptoContext The encryption context. - * @param isCompaction If the writer is used in compaction. + * @param isCompaction If the writer is used in compaction. * @return The writer for the mob file. * @throws IOException */ public static StoreFileWriter createWriter(Configuration conf, FileSystem fs, ColumnFamilyDescriptor family, String date, Path basePath, long maxKeyCount, Compression.Algorithm compression, String startKey, CacheConfig cacheConfig, - Encryption.Context cryptoContext, boolean isCompaction) - throws IOException { - MobFileName mobFileName = MobFileName.create(startKey, date, - UUID.randomUUID().toString().replaceAll("-", "")); + Encryption.Context cryptoContext, boolean isCompaction) throws IOException { + MobFileName mobFileName = + MobFileName.create(startKey, date, UUID.randomUUID().toString().replaceAll("-", "")); return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression, cacheConfig, cryptoContext, isCompaction); } /** * Creates a writer for the ref file in temp directory. - * @param conf The current configuration. 
- * @param fs The current file system. - * @param family The descriptor of the current column family. - * @param basePath The basic path for a temp directory. - * @param maxKeyCount The key count. - * @param cacheConfig The current cache config. + * + * @param conf The current configuration. + * @param fs The current file system. + * @param family The descriptor of the current column family. + * @param basePath The basic path for a temp directory. + * @param maxKeyCount The key count. + * @param cacheConfig The current cache config. * @param cryptoContext The encryption context. - * @param isCompaction If the writer is used in compaction. + * @param isCompaction If the writer is used in compaction. * @return The writer for the mob file. * @throws IOException */ public static StoreFileWriter createRefFileWriter(Configuration conf, FileSystem fs, - ColumnFamilyDescriptor family, Path basePath, long maxKeyCount, CacheConfig cacheConfig, - Encryption.Context cryptoContext, boolean isCompaction) - throws IOException { + ColumnFamilyDescriptor family, Path basePath, long maxKeyCount, CacheConfig cacheConfig, + Encryption.Context cryptoContext, boolean isCompaction) throws IOException { return createWriter(conf, fs, family, - new Path(basePath, UUID.randomUUID().toString().replaceAll("-", "")), maxKeyCount, - family.getCompactionCompressionType(), cacheConfig, cryptoContext, - HStore.getChecksumType(conf), HStore.getBytesPerChecksum(conf), family.getBlocksize(), - family.getBloomFilterType(), isCompaction); + new Path(basePath, UUID.randomUUID().toString().replaceAll("-", "")), maxKeyCount, + family.getCompactionCompressionType(), cacheConfig, cryptoContext, + HStore.getChecksumType(conf), HStore.getBytesPerChecksum(conf), family.getBlocksize(), + family.getBloomFilterType(), isCompaction); } /** * Creates a writer for the mob file in temp directory. - * @param conf The current configuration. - * @param fs The current file system. - * @param family The descriptor of the current column family. - * @param date The date string, its format is yyyymmmdd. - * @param basePath The basic path for a temp directory. - * @param maxKeyCount The key count. - * @param compression The compression algorithm. - * @param startKey The start key. - * @param cacheConfig The current cache config. + * + * @param conf The current configuration. + * @param fs The current file system. + * @param family The descriptor of the current column family. + * @param date The date string, its format is yyyyMMdd. + * @param basePath The basic path for a temp directory. + * @param maxKeyCount The key count. + * @param compression The compression algorithm. + * @param startKey The start key. + * @param cacheConfig The current cache config. * @param cryptoContext The encryption context. - * @param isCompaction If the writer is used in compaction. + * @param isCompaction If the writer is used in compaction. * @return The writer for the mob file. 
- * @throws IOException */ public static StoreFileWriter createWriter(Configuration conf, FileSystem fs, ColumnFamilyDescriptor family, String date, Path basePath, long maxKeyCount, Compression.Algorithm compression, byte[] startKey, CacheConfig cacheConfig, - Encryption.Context cryptoContext, boolean isCompaction) - throws IOException { - MobFileName mobFileName = MobFileName.create(startKey, date, - UUID.randomUUID().toString().replaceAll("-", "")); + Encryption.Context cryptoContext, boolean isCompaction) throws IOException { + MobFileName mobFileName = + MobFileName.create(startKey, date, UUID.randomUUID().toString().replaceAll("-", "")); return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression, - cacheConfig, cryptoContext, isCompaction); + cacheConfig, cryptoContext, isCompaction); } /** * Creates a writer for the del file in temp directory. - * @param conf The current configuration. - * @param fs The current file system. - * @param family The descriptor of the current column family. - * @param date The date string, its format is yyyymmmdd. - * @param basePath The basic path for a temp directory. - * @param maxKeyCount The key count. - * @param compression The compression algorithm. - * @param startKey The start key. - * @param cacheConfig The current cache config. + * + * @param conf The current configuration. + * @param fs The current file system. + * @param family The descriptor of the current column family. + * @param date The date string, its format is yyyymmmdd. + * @param basePath The basic path for a temp directory. + * @param maxKeyCount The key count. + * @param compression The compression algorithm. + * @param startKey The start key. + * @param cacheConfig The current cache config. * @param cryptoContext The encryption context. * @return The writer for the del file. * @throws IOException @@ -638,64 +619,61 @@ public static StoreFileWriter createWriter(Configuration conf, FileSystem fs, public static StoreFileWriter createDelFileWriter(Configuration conf, FileSystem fs, ColumnFamilyDescriptor family, String date, Path basePath, long maxKeyCount, Compression.Algorithm compression, byte[] startKey, CacheConfig cacheConfig, - Encryption.Context cryptoContext) - throws IOException { - String suffix = UUID - .randomUUID().toString().replaceAll("-", "") + "_del"; + Encryption.Context cryptoContext) throws IOException { + String suffix = UUID.randomUUID().toString().replaceAll("-", "") + "_del"; MobFileName mobFileName = MobFileName.create(startKey, date, suffix); return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression, - cacheConfig, cryptoContext, true); + cacheConfig, cryptoContext, true); } /** * Creates a writer for the mob file in temp directory. - * @param conf The current configuration. - * @param fs The current file system. - * @param family The descriptor of the current column family. - * @param mobFileName The mob file name. - * @param basePath The basic path for a temp directory. - * @param maxKeyCount The key count. - * @param compression The compression algorithm. - * @param cacheConfig The current cache config. + * + * @param conf The current configuration. + * @param fs The current file system. + * @param family The descriptor of the current column family. + * @param mobFileName The mob file name. + * @param basePath The basic path for a temp directory. + * @param maxKeyCount The key count. + * @param compression The compression algorithm. + * @param cacheConfig The current cache config. 
* @param cryptoContext The encryption context. - * @param isCompaction If the writer is used in compaction. + * @param isCompaction If the writer is used in compaction. * @return The writer for the mob file. * @throws IOException */ public static StoreFileWriter createWriter(Configuration conf, FileSystem fs, - ColumnFamilyDescriptor family, MobFileName mobFileName, Path basePath, long maxKeyCount, + ColumnFamilyDescriptor family, MobFileName mobFileName, Path basePath, long maxKeyCount, Compression.Algorithm compression, CacheConfig cacheConfig, Encryption.Context cryptoContext, - boolean isCompaction) - throws IOException { - return createWriter(conf, fs, family, - new Path(basePath, mobFileName.getFileName()), maxKeyCount, compression, cacheConfig, - cryptoContext, HStore.getChecksumType(conf), HStore.getBytesPerChecksum(conf), - family.getBlocksize(), BloomType.NONE, isCompaction); + boolean isCompaction) throws IOException { + return createWriter(conf, fs, family, new Path(basePath, mobFileName.getFileName()), + maxKeyCount, compression, cacheConfig, cryptoContext, HStore.getChecksumType(conf), + HStore.getBytesPerChecksum(conf), family.getBlocksize(), BloomType.NONE, isCompaction); } /** * Creates a writer for the mob file in temp directory. - * @param conf The current configuration. - * @param fs The current file system. - * @param family The descriptor of the current column family. - * @param path The path for a temp directory. - * @param maxKeyCount The key count. - * @param compression The compression algorithm. - * @param cacheConfig The current cache config. - * @param cryptoContext The encryption context. - * @param checksumType The checksum type. + * + * @param conf The current configuration. + * @param fs The current file system. + * @param family The descriptor of the current column family. + * @param path The path for a temp directory. + * @param maxKeyCount The key count. + * @param compression The compression algorithm. + * @param cacheConfig The current cache config. + * @param cryptoContext The encryption context. + * @param checksumType The checksum type. * @param bytesPerChecksum The bytes per checksum. - * @param blocksize The HFile block size. - * @param bloomType The bloom filter type. - * @param isCompaction If the writer is used in compaction. + * @param blocksize The HFile block size. + * @param bloomType The bloom filter type. + * @param isCompaction If the writer is used in compaction. * @return The writer for the mob file. 
* @throws IOException */ public static StoreFileWriter createWriter(Configuration conf, FileSystem fs, - ColumnFamilyDescriptor family, Path path, long maxKeyCount, - Compression.Algorithm compression, CacheConfig cacheConfig, Encryption.Context cryptoContext, - ChecksumType checksumType, int bytesPerChecksum, int blocksize, BloomType bloomType, - boolean isCompaction) + ColumnFamilyDescriptor family, Path path, long maxKeyCount, Compression.Algorithm compression, + CacheConfig cacheConfig, Encryption.Context cryptoContext, ChecksumType checksumType, + int bytesPerChecksum, int blocksize, BloomType bloomType, boolean isCompaction) throws IOException { if (compression == null) { compression = HFile.DEFAULT_COMPRESSION_ALGORITHM; @@ -707,18 +685,16 @@ public static StoreFileWriter createWriter(Configuration conf, FileSystem fs, } else { writerCacheConf = cacheConfig; } - HFileContext hFileContext = new HFileContextBuilder().withCompression(compression) - .withIncludesMvcc(true).withIncludesTags(true) - .withCompressTags(family.isCompressTags()) - .withChecksumType(checksumType) - .withBytesPerCheckSum(bytesPerChecksum) - .withBlockSize(blocksize) - .withHBaseCheckSum(true).withDataBlockEncoding(family.getDataBlockEncoding()) - .withEncryptionContext(cryptoContext) - .withCreateTime(EnvironmentEdgeManager.currentTime()).build(); - - StoreFileWriter w = new StoreFileWriter.Builder(conf, writerCacheConf, fs) - .withFilePath(path) + HFileContext hFileContext = + new HFileContextBuilder().withCompression(compression).withIncludesMvcc(true) + .withIncludesTags(true).withCompressTags(family.isCompressTags()) + .withChecksumType(checksumType).withBytesPerCheckSum(bytesPerChecksum) + .withBlockSize(blocksize).withHBaseCheckSum(true) + .withDataBlockEncoding(family.getDataBlockEncoding()) + .withEncryptionContext(cryptoContext) + .withCreateTime(EnvironmentEdgeManager.currentTime()).build(); + + StoreFileWriter w = new StoreFileWriter.Builder(conf, writerCacheConf, fs).withFilePath(path) .withComparator(CellComparator.getInstance()).withBloomType(bloomType) .withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build(); return w; @@ -726,10 +702,11 @@ public static StoreFileWriter createWriter(Configuration conf, FileSystem fs, /** * Commits the mob file. - * @param conf The current configuration. - * @param fs The current file system. - * @param sourceFile The path where the mob file is saved. - * @param targetPath The directory path where the source file is renamed to. + * + * @param conf The current configuration. + * @param fs The current file system. + * @param sourceFile The path where the mob file is saved. + * @param targetPath The directory path where the source file is renamed to. * @param cacheConfig The current cache config. * @return The target file path the source file is renamed to. * @throws IOException @@ -755,9 +732,10 @@ public static Path commitFile(Configuration conf, FileSystem fs, final Path sour /** * Validates a mob file by opening and closing it. - * @param conf The current configuration. - * @param fs The current file system. - * @param path The path where the mob file is saved. + * + * @param conf The current configuration. + * @param fs The current file system. + * @param path The path where the mob file is saved. * @param cacheConfig The current cache config. 
*/ private static void validateMobFile(Configuration conf, FileSystem fs, Path path, @@ -782,6 +760,7 @@ private static void validateMobFile(Configuration conf, FileSystem fs, Path path * The value of a mob ref cell consists of two parts, real mob value length and mob file name. * The real mob value length takes 4 bytes. * The remaining part is the mob file name. + * * @param cell The mob ref cell. * @return True if the cell has a valid value. */ @@ -795,6 +774,7 @@ public static boolean hasValidMobRefCellValue(Cell cell) { * The value of a mob ref cell consists of two parts, real mob value length and mob file name. * The real mob value length takes 4 bytes. * The remaining part is the mob file name. + * * @param cell The mob ref cell. * @return The real mob value length. */ @@ -808,6 +788,7 @@ public static int getMobValueLength(Cell cell) { * The value of a mob ref cell consists of two parts, real mob value length and mob file name. * The real mob value length takes 4 bytes. * The remaining part is the mob file name. + * * @param cell The mob ref cell. * @return The mob file name. */ @@ -819,6 +800,7 @@ public static String getMobFileName(Cell cell) { /** * Gets the table name used in the table lock. * The table lock name is a dummy one, it's not a table name. It's tableName + ".mobLock". + * * @param tn The table name. * @return The table name used in table lock. */ @@ -829,24 +811,26 @@ public static TableName getTableLockName(TableName tn) { /** * Performs the mob compaction. - * @param conf the Configuration - * @param fs the file system + * + * @param conf the Configuration + * @param fs the file system * @param tableName the table the compact - * @param hcd the column descriptor - * @param pool the thread pool - * @param allFiles Whether add all mob files into the compaction. + * @param hcd the column descriptor + * @param pool the thread pool + * @param allFiles Whether add all mob files into the compaction. */ public static void doMobCompaction(Configuration conf, FileSystem fs, TableName tableName, - ColumnFamilyDescriptor hcd, ExecutorService pool, boolean allFiles, LockManager.MasterLock lock) - throws IOException { - String className = conf.get(MobConstants.MOB_COMPACTOR_CLASS_KEY, - PartitionedMobCompactor.class.getName()); + ColumnFamilyDescriptor hcd, ExecutorService pool, boolean allFiles, + LockManager.MasterLock lock) throws IOException { + String className = + conf.get(MobConstants.MOB_COMPACTOR_CLASS_KEY, PartitionedMobCompactor.class.getName()); // instantiate the mob compactor. MobCompactor compactor = null; try { - compactor = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] { - Configuration.class, FileSystem.class, TableName.class, ColumnFamilyDescriptor.class, - ExecutorService.class }, new Object[] { conf, fs, tableName, hcd, pool }); + compactor = ReflectionUtils.instantiateWithCustomCtor(className, + new Class[] { Configuration.class, FileSystem.class, TableName.class, + ColumnFamilyDescriptor.class, ExecutorService.class }, + new Object[] { conf, fs, tableName, hcd, pool }); } catch (Exception e) { throw new IOException("Unable to load configured mob file compactor '" + className + "'", e); } @@ -866,6 +850,7 @@ public static void doMobCompaction(Configuration conf, FileSystem fs, TableName /** * Creates a thread pool. + * * @param conf the Configuration * @return A thread pool. 
*/ @@ -877,54 +862,47 @@ public static ExecutorService createMobCompactorThreadPool(Configuration conf) { } final SynchronousQueue queue = new SynchronousQueue<>(); ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, 60, TimeUnit.SECONDS, queue, - Threads.newDaemonThreadFactory("MobCompactor"), new RejectedExecutionHandler() { - @Override - public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { - try { - // waiting for a thread to pick up instead of throwing exceptions. - queue.put(r); - } catch (InterruptedException e) { - throw new RejectedExecutionException(e); - } + Threads.newDaemonThreadFactory("MobCompactor"), new RejectedExecutionHandler() { + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + try { + // waiting for a thread to pick up instead of throwing exceptions. + queue.put(r); + } catch (InterruptedException e) { + throw new RejectedExecutionException(e); } - }); + } + }); ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true); return pool; } /** * Checks whether this table has mob-enabled columns. + * * @param htd The current table descriptor. * @return Whether this table has mob-enabled columns. */ public static boolean hasMobColumns(TableDescriptor htd) { - ColumnFamilyDescriptor[] hcds = htd.getColumnFamilies(); - for (ColumnFamilyDescriptor hcd : hcds) { - if (hcd.isMobEnabled()) { - return true; - } - } - return false; + return Arrays.stream(htd.getColumnFamilies()).anyMatch(ColumnFamilyDescriptor::isMobEnabled); } /** * Indicates whether return null value when the mob file is missing or corrupt. * The information is set in the attribute "empty.value.on.mobcell.miss" of scan. + * * @param scan The current scan. * @return True if the readEmptyValueOnMobCellMiss is enabled. */ public static boolean isReadEmptyValueOnMobCellMiss(Scan scan) { byte[] readEmptyValueOnMobCellMiss = - scan.getAttribute(MobConstants.EMPTY_VALUE_ON_MOBCELL_MISS); - try { - return readEmptyValueOnMobCellMiss != null && Bytes.toBoolean(readEmptyValueOnMobCellMiss); - } catch (IllegalArgumentException e) { - return false; - } + scan.getAttribute(MobConstants.EMPTY_VALUE_ON_MOBCELL_MISS); + return toBoolean(readEmptyValueOnMobCellMiss); } /** * Creates a mob ref delete marker. + * * @param cell The current delete marker. * @return A delete marker with the ref tag. */ @@ -934,12 +912,14 @@ public static Cell createMobRefDeleteMarker(Cell cell) { /** * Checks if the mob file is expired. - * @param column The descriptor of the current column family. - * @param current The current time. + * + * @param column The descriptor of the current column family. + * @param current The current time in millisecond. * @param fileDate The date string parsed from the mob file name. * @return True if the mob file is expired. 
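The rewritten isMobFileExpired that follows, together with fillPartitionId and the compactor changes further down, leans on LocalDate-based helpers (parseDate(String), formatDate(long), formatDate(LocalDate), toLocalDate(long)) whose definitions sit outside the hunks shown here. A minimal sketch of what they could look like, assuming the date embedded in mob file names stays in yyyyMMdd form; the names come from the call sites, the bodies are an assumption rather than the committed MobUtils code:

import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

final class MobDateHelpersSketch {
  // yyyyMMdd, e.g. "20180125"; assumed to match the existing mob file name format.
  private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.BASIC_ISO_DATE;

  static String formatDate(LocalDate date) {
    return date.format(DATE_FORMAT);
  }

  static String formatDate(long epochMillis) {
    return formatDate(toLocalDate(epochMillis));
  }

  static LocalDate parseDate(String dateString) {
    // Throws DateTimeParseException, a RuntimeException, which is why callers in this
    // patch now catch Exception instead of java.text.ParseException.
    return LocalDate.parse(dateString, DATE_FORMAT);
  }

  static LocalDate toLocalDate(long epochMillis) {
    return Instant.ofEpochMilli(epochMillis).atZone(ZoneId.systemDefault()).toLocalDate();
  }
}

One caveat when working with LocalDate here: ChronoUnit.SECONDS is not a unit LocalDate.plus supports (it throws UnsupportedTemporalTypeException), so a TTL expressed in seconds needs to be converted to whole days, or applied on a LocalDateTime, before it is compared against a LocalDate.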
*/ - public static boolean isMobFileExpired(ColumnFamilyDescriptor column, long current, String fileDate) { + public static boolean isMobFileExpired(ColumnFamilyDescriptor column, long current, + String fileDate) { if (column.getMinVersions() > 0) { return false; } @@ -947,35 +927,29 @@ public static boolean isMobFileExpired(ColumnFamilyDescriptor column, long curre if (Integer.MAX_VALUE == timeToLive) { return false; } - - Date expireDate = new Date(current - timeToLive * 1000); - expireDate = new Date(expireDate.getYear(), expireDate.getMonth(), expireDate.getDate()); + LocalDate currentDate = toLocalDate(current); try { - Date date = parseDate(fileDate); - if (date.getTime() < expireDate.getTime()) { - return true; - } - } catch (ParseException e) { + return parseDate(fileDate).plus(timeToLive, ChronoUnit.SECONDS).isBefore(currentDate); + } catch (Exception e) { LOG.warn("Failed to parse the date " + fileDate, e); return false; } - return false; } /** * fill out partition id based on compaction policy and date, threshold... - * @param id Partition id to be filled out + * + * @param id Partition id to be filled out * @param firstDayOfCurrentMonth The first day in the current month - * @param firstDayOfCurrentWeek The first day in the current week - * @param dateStr Date string from the mob file - * @param policy Mob compaction policy - * @param calendar Calendar object - * @param threshold Mob compaciton threshold configured + * @param firstDayOfCurrentWeek The first day in the current week + * @param dateStr Date string from the mob file + * @param policy Mob compaction policy + * @param threshold Mob compaciton threshold configured * @return true if the file needs to be excluded from compaction */ public static boolean fillPartitionId(final CompactionPartitionId id, - final Date firstDayOfCurrentMonth, final Date firstDayOfCurrentWeek, final String dateStr, - final MobCompactPartitionPolicy policy, final Calendar calendar, final long threshold) { + LocalDate firstDayOfCurrentMonth, LocalDate firstDayOfCurrentWeek, final String dateStr, + final MobCompactPartitionPolicy policy, final long threshold) { boolean skipCompcation = false; id.setThreshold(threshold); @@ -985,10 +959,10 @@ public static boolean fillPartitionId(final CompactionPartitionId id, } long finalThreshold; - Date date; + LocalDate date; try { - date = MobUtils.parseDate(dateStr); - } catch (ParseException e) { + date = parseDate(dateStr); + } catch (Exception e) { LOG.warn("Failed to parse date " + dateStr, e); id.setDate(dateStr); return true; @@ -1006,7 +980,7 @@ public static boolean fillPartitionId(final CompactionPartitionId id, * 1). 
apply threshold */ if (policy == MobCompactPartitionPolicy.MONTHLY) { - if (date.before(firstDayOfCurrentMonth)) { + if (date.isBefore(firstDayOfCurrentMonth)) { // Check overflow if (threshold < (Long.MAX_VALUE / MONTHLY_THRESHOLD_MULTIPLIER)) { finalThreshold = MONTHLY_THRESHOLD_MULTIPLIER * threshold; @@ -1016,15 +990,15 @@ public static boolean fillPartitionId(final CompactionPartitionId id, id.setThreshold(finalThreshold); // set to the date for the first day of that month - id.setDate(MobUtils.formatDate(MobUtils.getFirstDayOfMonth(calendar, date))); + id.setDate(MobUtils.formatDate(MobUtils.getFirstDayOfMonth(date))); return skipCompcation; } } - if ((policy == MobCompactPartitionPolicy.MONTHLY) || - (policy == MobCompactPartitionPolicy.WEEKLY)) { + if ((policy == MobCompactPartitionPolicy.MONTHLY) || (policy + == MobCompactPartitionPolicy.WEEKLY)) { // Check if it needs to apply weekly multiplier - if (date.before(firstDayOfCurrentWeek)) { + if (date.isBefore(firstDayOfCurrentWeek)) { // Check overflow if (threshold < (Long.MAX_VALUE / WEEKLY_THRESHOLD_MULTIPLIER)) { finalThreshold = WEEKLY_THRESHOLD_MULTIPLIER * threshold; @@ -1033,7 +1007,7 @@ public static boolean fillPartitionId(final CompactionPartitionId id, } id.setThreshold(finalThreshold); - id.setDate(MobUtils.formatDate(MobUtils.getFirstDayOfWeek(calendar, date))); + id.setDate(MobUtils.formatDate(MobUtils.getFirstDayOfWeek(date))); return skipCompcation; } else if (policy == MobCompactPartitionPolicy.MONTHLY) { skipCompcation = true; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java index 9f1ab965a18b..4cc9c4c0c6ac 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java @@ -24,12 +24,11 @@ import java.io.FileNotFoundException; import java.io.IOException; +import java.time.LocalDate; import java.util.ArrayList; -import java.util.Calendar; import java.util.Collection; import java.util.Collections; import java.util.Comparator; -import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -82,11 +81,11 @@ import org.apache.hadoop.hbase.regionserver.StoreFileWriter; import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.hadoop.hbase.security.EncryptionUtil; -import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -100,7 +99,9 @@ public class PartitionedMobCompactor extends MobCompactor { private static final Logger LOG = LoggerFactory.getLogger(PartitionedMobCompactor.class); protected long mergeableSize; protected int delFileMaxCount; - /** The number of files compacted in a batch */ + /** + * The number of files compacted in a batch + */ protected int compactionBatchSize; protected int compactionKVMax; @@ -111,20 +112,20 @@ public class PartitionedMobCompactor extends MobCompactor { private Encryption.Context cryptoContext = 
Encryption.Context.NONE; public PartitionedMobCompactor(Configuration conf, FileSystem fs, TableName tableName, - ColumnFamilyDescriptor column, ExecutorService pool) throws IOException { + ColumnFamilyDescriptor column, ExecutorService pool) throws IOException { super(conf, fs, tableName, column, pool); mergeableSize = conf.getLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, - MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD); - delFileMaxCount = conf.getInt(MobConstants.MOB_DELFILE_MAX_COUNT, - MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT); + MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD); + delFileMaxCount = + conf.getInt(MobConstants.MOB_DELFILE_MAX_COUNT, MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT); // default is 100 compactionBatchSize = conf.getInt(MobConstants.MOB_COMPACTION_BATCH_SIZE, - MobConstants.DEFAULT_MOB_COMPACTION_BATCH_SIZE); + MobConstants.DEFAULT_MOB_COMPACTION_BATCH_SIZE); tempPath = new Path(MobUtils.getMobHome(conf), MobConstants.TEMP_DIR_NAME); - bulkloadPath = new Path(tempPath, new Path(MobConstants.BULKLOAD_DIR_NAME, new Path( - tableName.getNamespaceAsString(), tableName.getQualifierAsString()))); - compactionKVMax = this.conf.getInt(HConstants.COMPACTION_KV_MAX, - HConstants.COMPACTION_KV_MAX_DEFAULT); + bulkloadPath = new Path(tempPath, new Path(MobConstants.BULKLOAD_DIR_NAME, + new Path(tableName.getNamespaceAsString(), tableName.getQualifierAsString()))); + compactionKVMax = + this.conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT); Configuration copyOfConf = new Configuration(conf); copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f); compactionCacheConfig = new CacheConfig(copyOfConf); @@ -153,16 +154,18 @@ public List compact(List files, boolean allFiles) throws IOExc /** * Selects the compacted mob/del files. * Iterates the candidates to find out all the del files and small mob files. + * * @param candidates All the candidates. - * @param allFiles Whether add all mob files into the compaction. + * @param allFiles Whether add all mob files into the compaction. * @return A compaction request. 
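The select() method in the next hunk, like fillPartitionId above, now derives month and week boundaries directly from a LocalDate instead of threading a Calendar through every call. The one-argument getFirstDayOfMonth/getFirstDayOfWeek overloads are defined elsewhere in the patch; a plausible shape, under the assumption that the week is still treated as starting on Sunday the way the old Calendar-based code did in its default locale:

import java.time.DayOfWeek;
import java.time.LocalDate;
import java.time.temporal.TemporalAdjusters;

final class MobPartitionBoundariesSketch {
  static LocalDate getFirstDayOfMonth(LocalDate date) {
    return date.with(TemporalAdjusters.firstDayOfMonth());
  }

  // Sunday as the first day of the week is an assumption carried over from
  // java.util.Calendar's default behaviour.
  static LocalDate getFirstDayOfWeek(LocalDate date) {
    return date.with(TemporalAdjusters.previousOrSame(DayOfWeek.SUNDAY));
  }
}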
* @throws IOException if IO failure is encountered */ - protected PartitionedMobCompactionRequest select(List candidates, - boolean allFiles) throws IOException { + protected PartitionedMobCompactionRequest select(List candidates, boolean allFiles) + throws IOException { final Map filesToCompact = new HashMap<>(); final CompactionPartitionId id = new CompactionPartitionId(); - final NavigableMap delFilesToCompact = new TreeMap<>(); + final NavigableMap delFilesToCompact = + new TreeMap<>(); final CompactionDelPartitionId delId = new CompactionDelPartitionId(); final ArrayList allDelPartitions = new ArrayList<>(); int selectedFileCount = 0; @@ -170,16 +173,15 @@ protected PartitionedMobCompactionRequest select(List candidates, int totalDelFiles = 0; MobCompactPartitionPolicy policy = column.getMobCompactPartitionPolicy(); - Calendar calendar = Calendar.getInstance(); - Date currentDate = new Date(); - Date firstDayOfCurrentMonth = null; - Date firstDayOfCurrentWeek = null; + LocalDate today = LocalDate.now(); + LocalDate firstDayOfCurrentMonth = null; + LocalDate firstDayOfCurrentWeek = null; if (policy == MobCompactPartitionPolicy.MONTHLY) { - firstDayOfCurrentMonth = MobUtils.getFirstDayOfMonth(calendar, currentDate); - firstDayOfCurrentWeek = MobUtils.getFirstDayOfWeek(calendar, currentDate); + firstDayOfCurrentMonth = MobUtils.getFirstDayOfMonth(today); + firstDayOfCurrentWeek = MobUtils.getFirstDayOfWeek(today); } else if (policy == MobCompactPartitionPolicy.WEEKLY) { - firstDayOfCurrentWeek = MobUtils.getFirstDayOfWeek(calendar, currentDate); + firstDayOfCurrentWeek = MobUtils.getFirstDayOfWeek(today); } // We check if there is any del files so the logic can be optimized for the following processing @@ -239,13 +241,13 @@ protected PartitionedMobCompactionRequest select(List candidates, delFilesToCompact.put(newDelId, delPartition); } delPartition.addDelFile(file); - totalDelFiles ++; + totalDelFiles++; } else { String fileName = linkedFile.getPath().getName(); String date = MobFileName.getDateFromName(fileName); boolean skipCompaction = MobUtils .fillPartitionId(id, firstDayOfCurrentMonth, firstDayOfCurrentWeek, date, policy, - calendar, mergeableSize); + mergeableSize); if (allFiles || (!skipCompaction && (linkedFile.getLen() < id.getThreshold()))) { // add all files if allFiles is true, // otherwise add the small files to the merge pool @@ -282,7 +284,8 @@ protected PartitionedMobCompactionRequest select(List candidates, /* * Merge del files so there are only non-overlapped del file lists */ - for(Map.Entry entry : delFilesToCompact.entrySet()) { + for (Map.Entry entry : delFilesToCompact + .entrySet()) { if (allDelPartitions.size() > 0) { // check if the current key range overlaps the previous one CompactionDelPartition prev = allDelPartitions.get(allDelPartitions.size() - 1); @@ -299,15 +302,15 @@ protected PartitionedMobCompactionRequest select(List candidates, } } - PartitionedMobCompactionRequest request = new PartitionedMobCompactionRequest( - filesToCompact.values(), allDelPartitions); + PartitionedMobCompactionRequest request = + new PartitionedMobCompactionRequest(filesToCompact.values(), allDelPartitions); if (candidates.size() == (totalDelFiles + selectedFileCount + irrelevantFileCount)) { // all the files are selected request.setCompactionType(CompactionType.ALL_FILES); } LOG.info("The compaction type is " + request.getCompactionType() + ", the request has " - + totalDelFiles + " del files, " + selectedFileCount + " selected files, and " - + irrelevantFileCount + " 
irrelevant files"); + + totalDelFiles + " del files, " + selectedFileCount + " selected files, and " + + irrelevantFileCount + " irrelevant files"); return request; } @@ -318,12 +321,13 @@ protected PartitionedMobCompactionRequest select(List candidates, *
    1. Compacts the del files.
    2. Compacts the selected small mob files and all the del files.
    3. If all the candidates are selected, delete the del files.
    + * * @param request The compaction request. * @return The paths of new mob files generated in the compaction. * @throws IOException if IO failure is encountered */ protected List performCompaction(PartitionedMobCompactionRequest request) - throws IOException { + throws IOException { // merge the del files, it is per del partition for (CompactionDelPartition delPartition : request.getDelPartitions()) { @@ -359,13 +363,12 @@ protected List performCompaction(PartitionedMobCompactionRequest request) // archive the del files if all the mob files are selected. if (request.type == CompactionType.ALL_FILES && !request.getDelPartitions().isEmpty()) { - LOG.info( - "After a mob compaction with all files selected, archiving the del files "); + LOG.info("After a mob compaction with all files selected, archiving the del files "); for (CompactionDelPartition delPartition : request.getDelPartitions()) { LOG.info(Objects.toString(delPartition.listDelFiles())); try { MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), - delPartition.getStoreFiles()); + delPartition.getStoreFiles()); } catch (IOException e) { LOG.error("Failed to archive the del files " + delPartition.getStoreFiles(), e); } @@ -421,7 +424,8 @@ List getListOfDelFilesForPartition(final CompactionPartition partiti return result; } else { // Check another case which has no overlap - if (Bytes.compareTo(partition.getEndKey(), delPartitions.get(start).getId().getStartKey()) < 0) { + if (Bytes.compareTo(partition.getEndKey(), delPartitions.get(start).getId().getStartKey()) + < 0) { return result; } } @@ -438,14 +442,15 @@ List getListOfDelFilesForPartition(final CompactionPartition partiti return result; } else { --end; - if (Bytes.compareTo(partition.getStartKey(), delPartitions.get(end).getId().getEndKey()) > 0) { + if (Bytes.compareTo(partition.getStartKey(), delPartitions.get(end).getId().getEndKey()) + > 0) { return result; } } } for (int i = start; i <= end; ++i) { - result.addAll(delPartitions.get(i).getStoreFiles()); + result.addAll(delPartitions.get(i).getStoreFiles()); } return result; @@ -453,6 +458,7 @@ List getListOfDelFilesForPartition(final CompactionPartition partiti /** * Compacts the selected small mob files and all the del files. + * * @param request The compaction request. * @return The paths of new mob files after compactions. * @throws IOException if IO failure is encountered @@ -477,8 +483,8 @@ protected List compactMobFiles(final PartitionedMobCompactionRequest reque // Search the delPartitions and collect all the delFiles for the partition // One optimization can do is that if there is no del file, we do not need to // come up with startKey/endKey. - List delFiles = getListOfDelFilesForPartition(partition, - request.getDelPartitions()); + List delFiles = + getListOfDelFilesForPartition(partition, request.getDelPartitions()); results.put(partition.getPartitionId(), pool.submit(new Callable>() { @Override @@ -515,21 +521,20 @@ public List call() throws Exception { /** * Compacts a partition of selected small mob files and all the del files. - * @param request The compaction request. - * @param partition A compaction partition. - * @param delFiles The del files. + * + * @param request The compaction request. + * @param partition A compaction partition. + * @param delFiles The del files. * @param connection The connection to use. - * @param table The current table. + * @param table The current table. * @return The paths of new mob files after compactions. 
* @throws IOException if IO failure is encountered */ private List compactMobFilePartition(PartitionedMobCompactionRequest request, - CompactionPartition partition, - List delFiles, - Connection connection, - Table table) throws IOException { + CompactionPartition partition, List delFiles, Connection connection, Table table) + throws IOException { if (MobUtils.isMobFileExpired(column, EnvironmentEdgeManager.currentTime(), - partition.getPartitionId().getDate())) { + partition.getPartitionId().getDate())) { // If the files in the partition are expired, do not compact them and directly // return an empty list. return Collections.emptyList(); @@ -556,24 +561,27 @@ private List compactMobFilePartition(PartitionedMobCompactionRequest reque // add the selected mob files and del files into filesToCompact List filesToCompact = new ArrayList<>(); for (int i = offset; i < batch + offset; i++) { - HStoreFile sf = new HStoreFile(fs, files.get(i).getPath(), conf, compactionCacheConfig, - BloomType.NONE, true); + HStoreFile sf = + new HStoreFile(fs, files.get(i).getPath(), conf, compactionCacheConfig, BloomType.NONE, + true); filesToCompact.add(sf); } filesToCompact.addAll(delFiles); // compact the mob files in a batch. compactMobFilesInBatch(request, partition, connection, table, filesToCompact, batch, - bulkloadPathOfPartition, bulkloadColumnPath, newFiles); + bulkloadPathOfPartition, bulkloadColumnPath, newFiles); // move to the next batch. offset += batch; } - LOG.info("Compaction is finished. The number of mob files is changed from " + files.size() - + " to " + newFiles.size()); + LOG.info( + "Compaction is finished. The number of mob files is changed from " + files.size() + " to " + + newFiles.size()); return newFiles; } /** * Closes the readers of store files. + * * @param storeFiles The store files to be closed. */ private void closeStoreFileReaders(List storeFiles) { @@ -588,26 +596,24 @@ private void closeStoreFileReaders(List storeFiles) { /** * Compacts a partition of selected small mob files and all the del files in a batch. - * @param request The compaction request. - * @param partition A compaction partition. - * @param connection To use for transport - * @param table The current table. - * @param filesToCompact The files to be compacted. - * @param batch The number of mob files to be compacted in a batch. + * + * @param request The compaction request. + * @param partition A compaction partition. + * @param connection To use for transport + * @param table The current table. + * @param filesToCompact The files to be compacted. + * @param batch The number of mob files to be compacted in a batch. * @param bulkloadPathOfPartition The directory where the bulkload column of the current - * partition is saved. - * @param bulkloadColumnPath The directory where the bulkload files of current partition - * are saved. - * @param newFiles The paths of new mob files after compactions. + * partition is saved. + * @param bulkloadColumnPath The directory where the bulkload files of current partition + * are saved. + * @param newFiles The paths of new mob files after compactions. 
* @throws IOException if IO failure is encountered */ private void compactMobFilesInBatch(PartitionedMobCompactionRequest request, - CompactionPartition partition, - Connection connection, Table table, - List filesToCompact, int batch, - Path bulkloadPathOfPartition, Path bulkloadColumnPath, - List newFiles) - throws IOException { + CompactionPartition partition, Connection connection, Table table, + List filesToCompact, int batch, Path bulkloadPathOfPartition, + Path bulkloadColumnPath, List newFiles) throws IOException { // open scanner to the selected mob files and del files. StoreScanner scanner = createScanner(filesToCompact, ScanType.COMPACT_DROP_DELETES); // the mob files to be compacted, not include the del files. @@ -622,7 +628,7 @@ private void compactMobFilesInBatch(PartitionedMobCompactionRequest request, boolean cleanupTmpMobFile = false; boolean cleanupBulkloadDirOfPartition = false; boolean cleanupCommittedMobFile = false; - boolean closeReaders= true; + boolean closeReaders = true; try { try { @@ -684,7 +690,8 @@ private void compactMobFilesInBatch(PartitionedMobCompactionRequest request, try { closeStoreFileReaders(mobFilesToCompact); closeReaders = false; - MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), mobFilesToCompact); + MobUtils + .removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), mobFilesToCompact); } catch (IOException e) { LOG.error("Failed to archive the files " + mobFilesToCompact, e); } @@ -710,14 +717,15 @@ private void compactMobFilesInBatch(PartitionedMobCompactionRequest request, /** * Compacts the del files in batches which avoids opening too many files. - * @param request The compaction request. + * + * @param request The compaction request. * @param delFilePaths Del file paths to compact * @return The paths of new del files after merging or the original files if no merging - * is necessary. + * is necessary. * @throws IOException if IO failure is encountered */ protected List compactDelFiles(PartitionedMobCompactionRequest request, - List delFilePaths) throws IOException { + List delFilePaths) throws IOException { if (delFilePaths.size() <= delFileMaxCount) { return delFilePaths; } @@ -738,8 +746,9 @@ protected List compactDelFiles(PartitionedMobCompactionRequest request, continue; } for (int i = offset; i < batch + offset; i++) { - batchedDelFiles.add(new HStoreFile(fs, delFilePaths.get(i), conf, compactionCacheConfig, - BloomType.NONE, true)); + batchedDelFiles.add( + new HStoreFile(fs, delFilePaths.get(i), conf, compactionCacheConfig, BloomType.NONE, + true)); } // compact the del files in a batch. paths.add(compactDelFilesInBatch(request, batchedDelFiles)); @@ -751,27 +760,28 @@ protected List compactDelFiles(PartitionedMobCompactionRequest request, /** * Compacts the del file in a batch. - * @param request The compaction request. + * + * @param request The compaction request. * @param delFiles The del files. * @return The path of new del file after merging. * @throws IOException if IO failure is encountered */ private Path compactDelFilesInBatch(PartitionedMobCompactionRequest request, - List delFiles) throws IOException { + List delFiles) throws IOException { // create a scanner for the del files. 
StoreScanner scanner = createScanner(delFiles, ScanType.COMPACT_RETAIN_DELETES); StoreFileWriter writer = null; Path filePath = null; try { - writer = MobUtils.createDelFileWriter(conf, fs, column, - MobUtils.formatDate(new Date(request.selectionTime)), tempPath, Long.MAX_VALUE, - column.getCompactionCompressionType(), HConstants.EMPTY_START_ROW, compactionCacheConfig, - cryptoContext); + writer = MobUtils + .createDelFileWriter(conf, fs, column, MobUtils.formatDate(request.selectionTime), + tempPath, Long.MAX_VALUE, column.getCompactionCompressionType(), + HConstants.EMPTY_START_ROW, compactionCacheConfig, cryptoContext); filePath = writer.getPath(); List cells = new ArrayList<>(); boolean hasMore; ScannerContext scannerContext = - ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); + ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); do { hasMore = scanner.next(cells, scannerContext); for (Cell cell : cells) { @@ -802,15 +812,17 @@ private Path compactDelFilesInBatch(PartitionedMobCompactionRequest request, /** * Creates a store scanner. + * * @param filesToCompact The files to be compacted. - * @param scanType The scan type. + * @param scanType The scan type. * @return The store scanner. * @throws IOException if IO failure is encountered */ private StoreScanner createScanner(List filesToCompact, ScanType scanType) throws IOException { - List scanners = StoreFileScanner.getScannersForStoreFiles(filesToCompact, - false, true, false, false, HConstants.LATEST_TIMESTAMP); + List scanners = StoreFileScanner + .getScannersForStoreFiles(filesToCompact, false, true, false, false, + HConstants.LATEST_TIMESTAMP); long ttl = HStore.determineTTLFromFamily(column); ScanInfo scanInfo = new ScanInfo(conf, column, ttl, 0, CellComparator.getInstance()); return new StoreScanner(scanInfo, scanType, scanners); @@ -819,15 +831,14 @@ private StoreScanner createScanner(List filesToCompact, ScanType sca /** * Bulkloads the current file. * - * @param connection to use to get admin/RegionLocator - * @param table The current table. + * @param connection to use to get admin/RegionLocator + * @param table The current table. * @param bulkloadDirectory The path of bulkload directory. - * @param fileName The current file name. + * @param fileName The current file name. * @throws IOException if IO failure is encountered */ private void bulkloadRefFile(Connection connection, Table table, Path bulkloadDirectory, - String fileName) - throws IOException { + String fileName) throws IOException { // bulkload the ref file try { LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf); @@ -840,13 +851,14 @@ private void bulkloadRefFile(Connection connection, Table table, Path bulkloadDi /** * Closes the mob file writer. - * @param writer The mob file writer. - * @param maxSeqId Maximum sequence id. + * + * @param writer The mob file writer. + * @param maxSeqId Maximum sequence id. * @param mobCellsCount The number of mob cells. * @throws IOException if IO failure is encountered */ private void closeMobFileWriter(StoreFileWriter writer, long maxSeqId, long mobCellsCount) - throws IOException { + throws IOException { if (writer != null) { writer.appendMetadata(maxSeqId, false, mobCellsCount); try { @@ -859,13 +871,14 @@ private void closeMobFileWriter(StoreFileWriter writer, long maxSeqId, long mobC /** * Closes the ref file writer. - * @param writer The ref file writer. - * @param maxSeqId Maximum sequence id. + * + * @param writer The ref file writer. 
+ * @param maxSeqId Maximum sequence id. * @param bulkloadTime The timestamp at which the bulk load file is created. * @throws IOException if IO failure is encountered */ private void closeRefFileWriter(StoreFileWriter writer, long maxSeqId, long bulkloadTime) - throws IOException { + throws IOException { if (writer != null) { writer.appendMetadata(maxSeqId, false); writer.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(bulkloadTime)); @@ -880,6 +893,7 @@ private void closeRefFileWriter(StoreFileWriter writer, long maxSeqId, long bulk /** * Gets the max seqId and number of cells of the store files. + * * @param storeFiles The store files. * @return The pair of the max seqId and number of cells of the store files. * @throws IOException if IO failure is encountered @@ -901,6 +915,7 @@ private Pair getFileInfo(List storeFiles) throws IOExcep /** * Deletes a file. + * * @param path The path of the file to be deleted. */ private void deletePath(Path path) { @@ -924,7 +939,7 @@ private FileStatus getLinkedFileStatus(HFileLink link) throws IOException { if (file != null) { return file; } - } catch (FileNotFoundException e) { + } catch (FileNotFoundException e) { } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java index 632642ffb3d3..f44427e2477e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java @@ -20,7 +20,6 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; -import java.util.Date; import java.util.List; import java.util.Map; import java.util.NavigableSet; @@ -109,18 +108,19 @@ public HMobStore(final HRegion region, final ColumnFamilyDescriptor family, this.family = family; this.mobFileCache = region.getMobFileCache(); this.homePath = MobUtils.getMobHome(conf); - this.mobFamilyPath = MobUtils.getMobFamilyPath(conf, this.getTableName(), - family.getNameAsString()); + this.mobFamilyPath = + MobUtils.getMobFamilyPath(conf, this.getTableName(), family.getNameAsString()); List locations = new ArrayList<>(2); locations.add(mobFamilyPath); TableName tn = region.getTableDescriptor().getTableName(); - locations.add(HFileArchiveUtil.getStoreArchivePath(conf, tn, MobUtils.getMobRegionInfo(tn) - .getEncodedName(), family.getNameAsString())); + locations.add(HFileArchiveUtil + .getStoreArchivePath(conf, tn, MobUtils.getMobRegionInfo(tn).getEncodedName(), + family.getNameAsString())); map.put(Bytes.toString(tn.getName()), locations); List tags = new ArrayList<>(2); tags.add(MobConstants.MOB_REF_TAG); - Tag tableNameTag = new ArrayBackedTag(TagType.MOB_TABLE_NAME_TAG_TYPE, - getTableName().getName()); + Tag tableNameTag = + new ArrayBackedTag(TagType.MOB_TABLE_NAME_TAG_TYPE, getTableName().getName()); tags.add(tableNameTag); this.refCellTags = TagUtil.fromList(tags); } @@ -148,8 +148,9 @@ protected KeyValueScanner createScanner(Scan scan, ScanInfo scanInfo, scan.setFilter(refOnlyFilter); } } - return scan.isReversed() ? new ReversedMobStoreScanner(this, scanInfo, scan, targetCols, readPt) - : new MobStoreScanner(this, scanInfo, scan, targetCols, readPt); + return scan.isReversed() ? 
+ new ReversedMobStoreScanner(this, scanInfo, scan, targetCols, readPt) : + new MobStoreScanner(this, scanInfo, scan, targetCols, readPt); } /** @@ -165,6 +166,7 @@ protected KeyValueScanner createScanner(Scan scan, ScanInfo scanInfo, /** * Gets the temp directory. + * * @return The temp directory. */ private Path getTempDir() { @@ -173,87 +175,89 @@ private Path getTempDir() { /** * Creates the writer for the mob file in temp directory. - * @param date The latest date of written cells. - * @param maxKeyCount The key count. - * @param compression The compression algorithm. - * @param startKey The start key. - * @param isCompaction If the writer is used in compaction. + * + * @param latestWriteTimestamp The latest unix timestamp of written cells by millisecond. + * @param maxKeyCount The key count. + * @param compression The compression algorithm. + * @param startKey The start key. + * @param isCompaction If the writer is used in compaction. * @return The writer for the mob file. * @throws IOException */ - public StoreFileWriter createWriterInTmp(Date date, long maxKeyCount, - Compression.Algorithm compression, byte[] startKey, - boolean isCompaction) throws IOException { + public StoreFileWriter createWriterInTmp(long latestWriteTimestamp, long maxKeyCount, + Compression.Algorithm compression, byte[] startKey, boolean isCompaction) throws IOException { if (startKey == null) { startKey = HConstants.EMPTY_START_ROW; } Path path = getTempDir(); - return createWriterInTmp(MobUtils.formatDate(date), path, maxKeyCount, compression, startKey, - isCompaction); + return createWriterInTmp(MobUtils.formatDate(latestWriteTimestamp), path, maxKeyCount, + compression, startKey, isCompaction); } /** * Creates the writer for the del file in temp directory. * The del file keeps tracking the delete markers. Its name has a suffix _del, * the format is [0-9a-f]+(_del)?. - * @param date The latest date of written cells. - * @param maxKeyCount The key count. - * @param compression The compression algorithm. - * @param startKey The start key. + * + * @param latestWriteTimestamp The latest timestamp of written cells by millisecond. + * @param maxKeyCount The key count. + * @param compression The compression algorithm. + * @param startKey The start key. * @return The writer for the del file. * @throws IOException */ - public StoreFileWriter createDelFileWriterInTmp(Date date, long maxKeyCount, + public StoreFileWriter createDelFileWriterInTmp(long latestWriteTimestamp, long maxKeyCount, Compression.Algorithm compression, byte[] startKey) throws IOException { if (startKey == null) { startKey = HConstants.EMPTY_START_ROW; } Path path = getTempDir(); - String suffix = UUID - .randomUUID().toString().replaceAll("-", "") + "_del"; - MobFileName mobFileName = MobFileName.create(startKey, MobUtils.formatDate(date), suffix); + String suffix = UUID.randomUUID().toString().replaceAll("-", "") + "_del"; + MobFileName mobFileName = + MobFileName.create(startKey, MobUtils.formatDate(latestWriteTimestamp), suffix); return createWriterInTmp(mobFileName, path, maxKeyCount, compression, true); } /** * Creates the writer for the mob file in temp directory. - * @param date The date string, its format is yyyymmmdd. - * @param basePath The basic path for a temp directory. - * @param maxKeyCount The key count. - * @param compression The compression algorithm. - * @param startKey The start key. + * + * @param date The date string, its format is yyyymmmdd. + * @param basePath The basic path for a temp directory. 
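With the java.util.Date parameter gone, callers of createWriterInTmp and createDelFileWriterInTmp hand over an epoch-millis timestamp directly, exactly as the updated TestMobFileCache below does. A short usage sketch; mobStore, maxKeyCount, compression and startKey stand in for whatever the caller already has in scope:

// Before: mobStore.createWriterInTmp(new Date(), maxKeyCount, compression, startKey, false);
long latestWriteTimestamp = EnvironmentEdgeManager.currentTime();
StoreFileWriter mobWriter =
    mobStore.createWriterInTmp(latestWriteTimestamp, maxKeyCount, compression, startKey, false);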
+ * @param maxKeyCount The key count. + * @param compression The compression algorithm. + * @param startKey The start key. * @param isCompaction If the writer is used in compaction. * @return The writer for the mob file. * @throws IOException */ public StoreFileWriter createWriterInTmp(String date, Path basePath, long maxKeyCount, - Compression.Algorithm compression, byte[] startKey, - boolean isCompaction) throws IOException { - MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID() - .toString().replaceAll("-", "")); + Compression.Algorithm compression, byte[] startKey, boolean isCompaction) throws IOException { + MobFileName mobFileName = + MobFileName.create(startKey, date, UUID.randomUUID().toString().replaceAll("-", "")); return createWriterInTmp(mobFileName, basePath, maxKeyCount, compression, isCompaction); } /** * Creates the writer for the mob file in temp directory. - * @param mobFileName The mob file name. - * @param basePath The basic path for a temp directory. - * @param maxKeyCount The key count. - * @param compression The compression algorithm. + * + * @param mobFileName The mob file name. + * @param basePath The basic path for a temp directory. + * @param maxKeyCount The key count. + * @param compression The compression algorithm. * @param isCompaction If the writer is used in compaction. * @return The writer for the mob file. * @throws IOException */ - public StoreFileWriter createWriterInTmp(MobFileName mobFileName, Path basePath, - long maxKeyCount, Compression.Algorithm compression, - boolean isCompaction) throws IOException { + public StoreFileWriter createWriterInTmp(MobFileName mobFileName, Path basePath, long maxKeyCount, + Compression.Algorithm compression, boolean isCompaction) throws IOException { return MobUtils.createWriter(conf, region.getFilesystem(), family, - new Path(basePath, mobFileName.getFileName()), maxKeyCount, compression, cacheConf, - cryptoContext, checksumType, bytesPerChecksum, blocksize, BloomType.NONE, isCompaction); + new Path(basePath, mobFileName.getFileName()), maxKeyCount, compression, cacheConf, + cryptoContext, checksumType, bytesPerChecksum, blocksize, BloomType.NONE, isCompaction); } /** * Commits the mob file. + * * @param sourceFile The source file. * @param targetPath The directory path where the source file is renamed to. * @throws IOException @@ -283,8 +287,8 @@ public void commitFile(final Path sourceFile, Path targetPath) throws IOExceptio private void validateMobFile(Path path) throws IOException { HStoreFile storeFile = null; try { - storeFile = new HStoreFile(region.getFilesystem(), path, conf, this.cacheConf, - BloomType.NONE, isPrimaryReplicaStore()); + storeFile = new HStoreFile(region.getFilesystem(), path, conf, this.cacheConf, BloomType.NONE, + isPrimaryReplicaStore()); storeFile.initReader(); } catch (IOException e) { LOG.error("Fail to open mob file[" + path + "], keep it in temp directory.", e); @@ -299,7 +303,8 @@ private void validateMobFile(Path path) throws IOException { /** * Reads the cell from the mob file, and the read point does not count. * This is used for DefaultMobStoreCompactor where we can read empty value for the missing cell. - * @param reference The cell found in the HBase, its value is a path to a mob file. + * + * @param reference The cell found in the HBase, its value is a path to a mob file. * @param cacheBlocks Whether the scanner should cache blocks. * @return The cell found in the mob file. 
* @throws IOException @@ -310,16 +315,18 @@ public Cell resolve(Cell reference, boolean cacheBlocks) throws IOException { /** * Reads the cell from the mob file. - * @param reference The cell found in the HBase, its value is a path to a mob file. - * @param cacheBlocks Whether the scanner should cache blocks. - * @param readPt the read point. + * + * @param reference The cell found in the HBase, its value is a path to a mob + * file. + * @param cacheBlocks Whether the scanner should cache blocks. + * @param readPt the read point. * @param readEmptyValueOnMobCellMiss Whether return null value when the mob file is - * missing or corrupt. + * missing or corrupt. * @return The cell found in the mob file. * @throws IOException */ public Cell resolve(Cell reference, boolean cacheBlocks, long readPt, - boolean readEmptyValueOnMobCellMiss) throws IOException { + boolean readEmptyValueOnMobCellMiss) throws IOException { Cell result = null; if (MobUtils.hasValidMobRefCellValue(reference)) { String fileName = MobUtils.getMobFileName(reference); @@ -335,8 +342,9 @@ public Cell resolve(Cell reference, boolean cacheBlocks, long readPt, locations = new ArrayList<>(2); TableName tn = TableName.valueOf(tableNameString); locations.add(MobUtils.getMobFamilyPath(conf, tn, family.getNameAsString())); - locations.add(HFileArchiveUtil.getStoreArchivePath(conf, tn, MobUtils - .getMobRegionInfo(tn).getEncodedName(), family.getNameAsString())); + locations.add(HFileArchiveUtil + .getStoreArchivePath(conf, tn, MobUtils.getMobRegionInfo(tn).getEncodedName(), + family.getNameAsString())); map.put(tableNameString, locations); } } finally { @@ -344,24 +352,21 @@ public Cell resolve(Cell reference, boolean cacheBlocks, long readPt, } } result = readCell(locations, fileName, reference, cacheBlocks, readPt, - readEmptyValueOnMobCellMiss); + readEmptyValueOnMobCellMiss); } } if (result == null) { LOG.warn("The Cell result is null, assemble a new Cell with the same row,family," + "qualifier,timestamp,type and tags but with an empty value to return."); result = ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY) - .setRow(reference.getRowArray(), reference.getRowOffset(), reference.getRowLength()) - .setFamily(reference.getFamilyArray(), reference.getFamilyOffset(), - reference.getFamilyLength()) - .setQualifier(reference.getQualifierArray(), - reference.getQualifierOffset(), reference.getQualifierLength()) - .setTimestamp(reference.getTimestamp()) - .setType(reference.getTypeByte()) - .setValue(HConstants.EMPTY_BYTE_ARRAY) - .setTags(reference.getTagsArray(), reference.getTagsOffset(), - reference.getTagsLength()) - .build(); + .setRow(reference.getRowArray(), reference.getRowOffset(), reference.getRowLength()) + .setFamily(reference.getFamilyArray(), reference.getFamilyOffset(), + reference.getFamilyLength()) + .setQualifier(reference.getQualifierArray(), reference.getQualifierOffset(), + reference.getQualifierLength()).setTimestamp(reference.getTimestamp()) + .setType(reference.getTypeByte()).setValue(HConstants.EMPTY_BYTE_ARRAY) + .setTags(reference.getTagsArray(), reference.getTagsOffset(), reference.getTagsLength()) + .build(); } return result; } @@ -372,18 +377,19 @@ public Cell resolve(Cell reference, boolean cacheBlocks, long readPt, * 1. The working directory. * 2. The archive directory. * Reads the cell from the files located in both of the above directories. - * @param locations The possible locations where the mob files are saved. - * @param fileName The file to be read. 
- * @param search The cell to be searched. - * @param cacheMobBlocks Whether the scanner should cache blocks. - * @param readPt the read point. + * + * @param locations The possible locations where the mob files are saved. + * @param fileName The file to be read. + * @param search The cell to be searched. + * @param cacheMobBlocks Whether the scanner should cache blocks. + * @param readPt the read point. * @param readEmptyValueOnMobCellMiss Whether return null value when the mob file is - * missing or corrupt. + * missing or corrupt. * @return The found cell. Null if there's no such a cell. * @throws IOException */ private Cell readCell(List locations, String fileName, Cell search, boolean cacheMobBlocks, - long readPt, boolean readEmptyValueOnMobCellMiss) throws IOException { + long readPt, boolean readEmptyValueOnMobCellMiss) throws IOException { FileSystem fs = getFileSystem(); Throwable throwable = null; for (Path location : locations) { @@ -391,13 +397,14 @@ private Cell readCell(List locations, String fileName, Cell search, boolea Path path = new Path(location, fileName); try { file = mobFileCache.openFile(fs, path, cacheConf); - return readPt != -1 ? file.readCell(search, cacheMobBlocks, readPt) : file.readCell(search, - cacheMobBlocks); + return readPt != -1 ? + file.readCell(search, cacheMobBlocks, readPt) : + file.readCell(search, cacheMobBlocks); } catch (IOException e) { mobFileCache.evictFile(fileName); throwable = e; - if ((e instanceof FileNotFoundException) || - (e.getCause() instanceof FileNotFoundException)) { + if ((e instanceof FileNotFoundException) || (e + .getCause() instanceof FileNotFoundException)) { LOG.debug("Fail to read the cell, the mob file " + path + " doesn't exist", e); } else if (e instanceof CorruptHFileException) { LOG.error("The mob file " + path + " is corrupt", e); @@ -420,11 +427,11 @@ private Cell readCell(List locations, String fileName, Cell search, boolea } } LOG.error("The mob file " + fileName + " could not be found in the locations " + locations - + " or it is corrupt"); + + " or it is corrupt"); if (readEmptyValueOnMobCellMiss) { return null; - } else if ((throwable instanceof FileNotFoundException) - || (throwable.getCause() instanceof FileNotFoundException)) { + } else if ((throwable instanceof FileNotFoundException) || (throwable + .getCause() instanceof FileNotFoundException)) { // The region is re-opened when FileNotFoundException is thrown. // This is not necessary when MOB files cannot be found, because the store files // in a region only contain the references to MOB files and a re-open on a region @@ -439,6 +446,7 @@ private Cell readCell(List locations, String fileName, Cell search, boolea /** * Gets the mob file path. + * * @return The mob file path. 
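The readEmptyValueOnMobCellMiss flag threaded through resolve and readCell above is controlled per scan via the attribute that isReadEmptyValueOnMobCellMiss checks. A client-side sketch of opting in; the surrounding scan setup is illustrative only:

Scan scan = new Scan();
// Return an empty value instead of failing the read when the referenced
// mob file is missing or corrupt.
scan.setAttribute(MobConstants.EMPTY_VALUE_ON_MOBCELL_MISS, Bytes.toBytes(true));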
*/ public Path getPath() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCache.java index b5bbd539e4b2..66134bc7b3f0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCache.java @@ -21,7 +21,8 @@ import static org.junit.Assert.assertNotNull; import java.io.IOException; -import java.util.Date; +import java.util.concurrent.TimeUnit; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -61,7 +62,7 @@ public class TestMobFileCache { private HRegion region; private Configuration conf; private MobFileCache mobFileCache; - private Date currentDate = new Date(); + private long currentTimeMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); private static final String TEST_CACHE_SIZE = "2"; private static final int EXPECTED_CACHE_SIZE_ZERO = 0; private static final int EXPECTED_CACHE_SIZE_ONE = 1; @@ -133,8 +134,7 @@ private Path createMobStoreFile(Configuration conf, String family) throws IOExce /** * Create the mob store file */ - private Path createMobStoreFile(HColumnDescriptor hcd) - throws IOException { + private Path createMobStoreFile(HColumnDescriptor hcd) throws IOException { // Setting up a Store TableName tn = TableName.valueOf(TABLE); HTableDescriptor htd = new HTableDescriptor(tn); @@ -146,15 +146,16 @@ private Path createMobStoreFile(HColumnDescriptor hcd) KeyValue[] keys = new KeyValue[] { key1, key2, key3 }; int maxKeyCount = keys.length; HRegionInfo regionInfo = new HRegionInfo(tn); - StoreFileWriter mobWriter = mobStore.createWriterInTmp(currentDate, - maxKeyCount, hcd.getCompactionCompression(), regionInfo.getStartKey(), false); + StoreFileWriter mobWriter = mobStore + .createWriterInTmp(currentTimeMillis, maxKeyCount, hcd.getCompactionCompression(), + regionInfo.getStartKey(), false); Path mobFilePath = mobWriter.getPath(); String fileName = mobFilePath.getName(); mobWriter.append(key1); mobWriter.append(key2); mobWriter.append(key3); mobWriter.close(); - String targetPathName = MobUtils.formatDate(currentDate); + String targetPathName = MobUtils.formatDate(currentTimeMillis); Path targetPath = new Path(mobStore.getPath(), targetPathName); mobStore.commitFile(mobFilePath, targetPath); return new Path(targetPath, fileName); @@ -171,8 +172,7 @@ public void testMobFileCache() throws Exception { // Before open one file by the MobFileCache assertEquals(EXPECTED_CACHE_SIZE_ZERO, mobFileCache.getCacheSize()); // Open one file by the MobFileCache - CachedMobFile cachedMobFile1 = (CachedMobFile) mobFileCache.openFile( - fs, file1Path, cacheConf); + CachedMobFile cachedMobFile1 = (CachedMobFile) mobFileCache.openFile(fs, file1Path, cacheConf); assertEquals(EXPECTED_CACHE_SIZE_ONE, mobFileCache.getCacheSize()); assertNotNull(cachedMobFile1); assertEquals(EXPECTED_REFERENCE_TWO, cachedMobFile1.getReferenceCount()); @@ -191,14 +191,11 @@ public void testMobFileCache() throws Exception { cachedMobFile1.close(); // Close the cached mob file // Reopen three cached file - cachedMobFile1 = (CachedMobFile) mobFileCache.openFile( - fs, file1Path, cacheConf); + cachedMobFile1 = (CachedMobFile) mobFileCache.openFile(fs, file1Path, cacheConf); assertEquals(EXPECTED_CACHE_SIZE_ONE, mobFileCache.getCacheSize()); - CachedMobFile cachedMobFile2 = (CachedMobFile) mobFileCache.openFile( - fs, file2Path, 
cacheConf); + CachedMobFile cachedMobFile2 = (CachedMobFile) mobFileCache.openFile(fs, file2Path, cacheConf); assertEquals(EXPECTED_CACHE_SIZE_TWO, mobFileCache.getCacheSize()); - CachedMobFile cachedMobFile3 = (CachedMobFile) mobFileCache.openFile( - fs, file3Path, cacheConf); + CachedMobFile cachedMobFile3 = (CachedMobFile) mobFileCache.openFile(fs, file3Path, cacheConf); // Before the evict // Evict the cache, should close the first file 1 assertEquals(EXPECTED_CACHE_SIZE_THREE, mobFileCache.getCacheSize()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileName.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileName.java index ae53ff21feba..7714391ba3ed 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileName.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileName.java @@ -22,8 +22,9 @@ import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertTrue; -import java.util.Date; +import java.time.LocalDate; import java.util.Random; + import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.testclassification.SmallTests; @@ -44,7 +45,7 @@ public class TestMobFileName { private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private String uuid; - private Date date; + private LocalDate date; private String dateStr; private byte[] startKey; @@ -52,7 +53,7 @@ public class TestMobFileName { public void setUp() { Random random = new Random(); uuid = TEST_UTIL.getRandomUUID().toString().replaceAll("-", ""); - date = new Date(); + date = LocalDate.now(); dateStr = MobUtils.formatDate(date); startKey = Bytes.toBytes(random.nextInt()); } @@ -76,8 +77,8 @@ public void testGet() { MobFileName mobFileName = MobFileName.create(startKey, dateStr, uuid); assertEquals(MD5Hash.getMD5AsHex(startKey, 0, startKey.length), mobFileName.getStartKey()); assertEquals(dateStr, mobFileName.getDate()); - assertEquals(mobFileName.getFileName(), MD5Hash.getMD5AsHex(startKey, 0, startKey.length) - + dateStr + uuid); + assertEquals(mobFileName.getFileName(), + MD5Hash.getMD5AsHex(startKey, 0, startKey.length) + dateStr + uuid); } @Test diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java index 2cf741ed9bd6..4feaf063264c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java @@ -22,11 +22,11 @@ import java.io.IOException; import java.text.ParseException; +import java.time.LocalDate; +import java.time.temporal.ChronoUnit; import java.util.ArrayList; -import java.util.Calendar; import java.util.Collection; import java.util.Collections; -import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -37,6 +37,7 @@ import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -118,8 +119,8 @@ public class TestPartitionedMobCompactor { public static void setUpBeforeClass() throws Exception { TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3); 
// Inject our customized DistributedFileSystem - TEST_UTIL.getConfiguration().setClass("fs.hdfs.impl", FaultyDistributedFileSystem.class, - DistributedFileSystem.class); + TEST_UTIL.getConfiguration() + .setClass("fs.hdfs.impl", FaultyDistributedFileSystem.class, DistributedFileSystem.class); TEST_UTIL.startMiniCluster(1); pool = createThreadPool(); } @@ -146,20 +147,21 @@ private void init(String tableName) throws Exception { public void testCompactionSelectAllFilesWeeklyPolicy() throws Exception { String tableName = "testCompactionSelectAllFilesWeeklyPolicy"; testCompactionAtMergeSize(tableName, MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD, - CompactionType.ALL_FILES, false, false, new Date(), MobCompactPartitionPolicy.WEEKLY, 1); + CompactionType.ALL_FILES, false, false, LocalDate.now(), MobCompactPartitionPolicy.WEEKLY, + 1); } @Test public void testCompactionSelectPartFilesWeeklyPolicy() throws Exception { String tableName = "testCompactionSelectPartFilesWeeklyPolicy"; testCompactionAtMergeSize(tableName, 4000, CompactionType.PART_FILES, false, false, - new Date(), MobCompactPartitionPolicy.WEEKLY, 1); + LocalDate.now(), MobCompactPartitionPolicy.WEEKLY, 1); } @Test public void testCompactionSelectPartFilesWeeklyPolicyWithPastWeek() throws Exception { String tableName = "testCompactionSelectPartFilesWeeklyPolicyWithPastWeek"; - Date dateLastWeek = new Date(System.currentTimeMillis() - (7 * DAY_IN_MS)); + LocalDate dateLastWeek = LocalDate.now().minus(1, ChronoUnit.WEEKS); testCompactionAtMergeSize(tableName, 700, CompactionType.PART_FILES, false, false, dateLastWeek, MobCompactPartitionPolicy.WEEKLY, 7); } @@ -167,47 +169,45 @@ public void testCompactionSelectPartFilesWeeklyPolicyWithPastWeek() throws Excep @Test public void testCompactionSelectAllFilesWeeklyPolicyWithPastWeek() throws Exception { String tableName = "testCompactionSelectAllFilesWeeklyPolicyWithPastWeek"; - Date dateLastWeek = new Date(System.currentTimeMillis() - (7 * DAY_IN_MS)); - testCompactionAtMergeSize(tableName, 3000, CompactionType.ALL_FILES, - false, false, dateLastWeek, MobCompactPartitionPolicy.WEEKLY, 7); + LocalDate dateLastWeek = LocalDate.now().minus(1, ChronoUnit.WEEKS); + testCompactionAtMergeSize(tableName, 3000, CompactionType.ALL_FILES, false, false, dateLastWeek, + MobCompactPartitionPolicy.WEEKLY, 7); } @Test public void testCompactionSelectAllFilesMonthlyPolicy() throws Exception { String tableName = "testCompactionSelectAllFilesMonthlyPolicy"; - Date dateLastWeek = new Date(System.currentTimeMillis() - (7 * DAY_IN_MS)); + LocalDate dateLastWeek = LocalDate.now().minus(1, ChronoUnit.WEEKS); testCompactionAtMergeSize(tableName, MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD, - CompactionType.ALL_FILES, false, false, dateLastWeek, - MobCompactPartitionPolicy.MONTHLY, 7); + CompactionType.ALL_FILES, false, false, dateLastWeek, MobCompactPartitionPolicy.MONTHLY, 7); } @Test public void testCompactionSelectNoFilesWithinCurrentWeekMonthlyPolicy() throws Exception { String tableName = "testCompactionSelectNoFilesWithinCurrentWeekMonthlyPolicy"; testCompactionAtMergeSize(tableName, MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD, - CompactionType.PART_FILES, false, false, new Date(), MobCompactPartitionPolicy.MONTHLY, 1); + CompactionType.PART_FILES, false, false, LocalDate.now(), MobCompactPartitionPolicy.MONTHLY, + 1); } @Test public void testCompactionSelectPartFilesMonthlyPolicy() throws Exception { String tableName = "testCompactionSelectPartFilesMonthlyPolicy"; 
testCompactionAtMergeSize(tableName, 4000, CompactionType.PART_FILES, false, false, - new Date(), MobCompactPartitionPolicy.MONTHLY, 1); + LocalDate.now(), MobCompactPartitionPolicy.MONTHLY, 1); } @Test public void testCompactionSelectPartFilesMonthlyPolicyWithPastWeek() throws Exception { String tableName = "testCompactionSelectPartFilesMonthlyPolicyWithPastWeek"; - Date dateLastWeek = new Date(System.currentTimeMillis() - (7 * DAY_IN_MS)); - Calendar calendar = Calendar.getInstance(); - Date firstDayOfCurrentMonth = MobUtils.getFirstDayOfMonth(calendar, new Date()); + LocalDate dateLastWeek = LocalDate.now().minus(7, ChronoUnit.DAYS); + LocalDate firstDayOfCurrentMonth = MobUtils.getFirstDayOfMonth(LocalDate.now()); CompactionType type = CompactionType.PART_FILES; long mergeSizeMultiFactor = 7; - // The dateLastWeek may not really be last week, suppose that it runs at 2/1/2017, it is going // to be last month and the monthly policy is going to be applied here. - if (dateLastWeek.before(firstDayOfCurrentMonth)) { + if (dateLastWeek.isBefore(firstDayOfCurrentMonth)) { type = CompactionType.ALL_FILES; mergeSizeMultiFactor *= 4; } @@ -219,10 +219,9 @@ public void testCompactionSelectPartFilesMonthlyPolicyWithPastWeek() throws Exce @Test public void testCompactionSelectAllFilesMonthlyPolicyWithPastWeek() throws Exception { String tableName = "testCompactionSelectAllFilesMonthlyPolicyWithPastWeek"; - Date dateLastWeek = new Date(System.currentTimeMillis() - (7 * DAY_IN_MS)); - - testCompactionAtMergeSize(tableName, 3000, CompactionType.ALL_FILES, - false, false, dateLastWeek, MobCompactPartitionPolicy.MONTHLY, 7); + LocalDate dateLastWeek = LocalDate.now().minus(1, ChronoUnit.WEEKS); + testCompactionAtMergeSize(tableName, 3000, CompactionType.ALL_FILES, false, false, dateLastWeek, + MobCompactPartitionPolicy.MONTHLY, 7); } @Test @@ -230,9 +229,9 @@ public void testCompactionSelectPartFilesMonthlyPolicyWithPastMonth() throws Exc String tableName = "testCompactionSelectPartFilesMonthlyPolicyWithPastMonth"; // back 5 weeks, it is going to be a past month - Date dateLastMonth = new Date(System.currentTimeMillis() - (7 * 5 * DAY_IN_MS)); - testCompactionAtMergeSize(tableName, 200, CompactionType.PART_FILES, false, false, dateLastMonth, - MobCompactPartitionPolicy.MONTHLY, 28); + LocalDate dateLastMonth = LocalDate.now().minus(5, ChronoUnit.WEEKS); + testCompactionAtMergeSize(tableName, 200, CompactionType.PART_FILES, false, false, + dateLastMonth, MobCompactPartitionPolicy.MONTHLY, 28); } @Test @@ -240,9 +239,9 @@ public void testCompactionSelectAllFilesMonthlyPolicyWithPastMonth() throws Exce String tableName = "testCompactionSelectAllFilesMonthlyPolicyWithPastMonth"; // back 5 weeks, it is going to be a past month - Date dateLastMonth = new Date(System.currentTimeMillis() - (7 * 5 * DAY_IN_MS)); - testCompactionAtMergeSize(tableName, 750, CompactionType.ALL_FILES, - false, false, dateLastMonth, MobCompactPartitionPolicy.MONTHLY, 28); + LocalDate dateLastMonth = LocalDate.now().minus(5, ChronoUnit.WEEKS); + testCompactionAtMergeSize(tableName, 750, CompactionType.ALL_FILES, false, false, dateLastMonth, + MobCompactPartitionPolicy.MONTHLY, 28); } @Test @@ -266,32 +265,28 @@ public void testCompactionSelectWithForceAllFiles() throws Exception { testCompactionAtMergeSize(tableName, Long.MAX_VALUE, CompactionType.ALL_FILES, true); } - private void testCompactionAtMergeSize(final String tableName, - final long mergeSize, final CompactionType type, final boolean isForceAllFiles) - throws Exception { + 
private void testCompactionAtMergeSize(final String tableName, final long mergeSize, + final CompactionType type, final boolean isForceAllFiles) throws Exception { testCompactionAtMergeSize(tableName, mergeSize, type, isForceAllFiles, true); } - private void testCompactionAtMergeSize(final String tableName, - final long mergeSize, final CompactionType type, final boolean isForceAllFiles, - final boolean createDelFiles) + private void testCompactionAtMergeSize(final String tableName, final long mergeSize, + final CompactionType type, final boolean isForceAllFiles, final boolean createDelFiles) throws Exception { - Date date = new Date(); + LocalDate date = LocalDate.now(); testCompactionAtMergeSize(tableName, mergeSize, type, isForceAllFiles, createDelFiles, date); } - private void testCompactionAtMergeSize(final String tableName, - final long mergeSize, final CompactionType type, final boolean isForceAllFiles, - final boolean createDelFiles, final Date date) - throws Exception { + private void testCompactionAtMergeSize(final String tableName, final long mergeSize, + final CompactionType type, final boolean isForceAllFiles, final boolean createDelFiles, + LocalDate date) throws Exception { testCompactionAtMergeSize(tableName, mergeSize, type, isForceAllFiles, createDelFiles, date, MobCompactPartitionPolicy.DAILY, 1); } - private void testCompactionAtMergeSize(final String tableName, - final long mergeSize, final CompactionType type, final boolean isForceAllFiles, - final boolean createDelFiles, final Date date, final MobCompactPartitionPolicy policy, - final long mergeSizeMultiFactor) + private void testCompactionAtMergeSize(final String tableName, final long mergeSize, + final CompactionType type, final boolean isForceAllFiles, final boolean createDelFiles, + LocalDate date, final MobCompactPartitionPolicy policy, final long mergeSizeMultiFactor) throws Exception { resetConf(); init(tableName); @@ -304,13 +299,12 @@ private void testCompactionAtMergeSize(final String tableName, createStoreFiles(basePath, family, qf, count, Type.Delete, date); } - Calendar calendar = Calendar.getInstance(); - Date firstDayOfCurrentWeek = MobUtils.getFirstDayOfWeek(calendar, new Date()); + LocalDate firstDayOfCurrentWeek = MobUtils.getFirstDayOfWeek(LocalDate.now()); listFiles(); List expectedStartKeys = new ArrayList<>(); - for(FileStatus file : mobFiles) { - if(file.getLen() < mergeSize * mergeSizeMultiFactor) { + for (FileStatus file : mobFiles) { + if (file.getLen() < mergeSize * mergeSizeMultiFactor) { String fileName = file.getPath().getName(); String startKey = fileName.substring(0, 32); @@ -319,14 +313,14 @@ private void testCompactionAtMergeSize(final String tableName, boolean skipCompaction = false; if (policy == MobCompactPartitionPolicy.MONTHLY) { String fileDateStr = MobFileName.getDateFromName(fileName); - Date fileDate; + LocalDate fileDate; try { fileDate = MobUtils.parseDate(fileDateStr); - } catch (ParseException e) { + } catch (ParseException e) { LOG.warn("Failed to parse date " + fileDateStr, e); - fileDate = new Date(); + fileDate = LocalDate.now(); } - if (!fileDate.before(firstDayOfCurrentWeek)) { + if (!fileDate.isBefore(firstDayOfCurrentWeek)) { skipCompaction = true; } } @@ -350,13 +344,14 @@ private void testCompactionAtMergeSize(final String tableName, @Test public void testCompactDelFilesWithDefaultBatchSize() throws Exception { - testCompactDelFilesAtBatchSize(name.getMethodName(), MobConstants.DEFAULT_MOB_COMPACTION_BATCH_SIZE, - MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT); + 
testCompactDelFilesAtBatchSize(name.getMethodName(), + MobConstants.DEFAULT_MOB_COMPACTION_BATCH_SIZE, MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT); } @Test public void testCompactDelFilesWithSmallBatchSize() throws Exception { - testCompactDelFilesAtBatchSize(name.getMethodName(), 4, MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT); + testCompactDelFilesAtBatchSize(name.getMethodName(), 4, + MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT); } @Test @@ -368,7 +363,7 @@ public void testCompactDelFilesChangeMaxDelFileCount() throws Exception { public void testCompactFilesWithDstDirFull() throws Exception { String tableName = name.getMethodName(); fs = FileSystem.get(conf); - FaultyDistributedFileSystem faultyFs = (FaultyDistributedFileSystem)fs; + FaultyDistributedFileSystem faultyFs = (FaultyDistributedFileSystem) fs; Path testDir = FSUtils.getRootDir(conf); Path mobTestDir = new Path(testDir, MobConstants.MOB_DIR_NAME); basePath = new Path(new Path(mobTestDir, tableName), family); @@ -376,7 +371,7 @@ public void testCompactFilesWithDstDirFull() throws Exception { try { int count = 2; // create 2 mob files. - createStoreFiles(basePath, family, qf, count, Type.Put, true, new Date()); + createStoreFiles(basePath, family, qf, count, Type.Put, true, LocalDate.now()); listFiles(); TableName tName = TableName.valueOf(tableName); @@ -396,8 +391,8 @@ public void testCompactFilesWithDstDirFull() throws Exception { assertTrue(ls.length == 1); assertTrue(MobConstants.BULKLOAD_DIR_NAME.equalsIgnoreCase(ls[0].getPath().getName())); - Path bulkloadPath = new Path(tempPath, new Path(MobConstants.BULKLOAD_DIR_NAME, new Path( - tName.getNamespaceAsString(), tName.getQualifierAsString()))); + Path bulkloadPath = new Path(tempPath, new Path(MobConstants.BULKLOAD_DIR_NAME, + new Path(tName.getNamespaceAsString(), tName.getQualifierAsString()))); // Nothing in bulkLoad directory FileStatus[] lsBulkload = faultyFs.listStatus(bulkloadPath); @@ -415,11 +410,10 @@ private void createMobFile(Path basePath) throws IOException { HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build(); MobFileName mobFileName = null; int ii = 0; - Date today = new Date(); for (byte k0 : KEYS) { byte[] startRow = Bytes.toBytes(ii++); - mobFileName = MobFileName.create(startRow, MobUtils.formatDate(today), mobSuffix); + mobFileName = MobFileName.create(startRow, MobUtils.formatDate(LocalDate.now()), mobSuffix); StoreFileWriter mobFileWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFileContext(meta) @@ -432,7 +426,8 @@ private void createMobFile(Path basePath) throws IOException { byte[] dummyData = new byte[5000]; new Random().nextBytes(dummyData); mobFileWriter.append( - new KeyValue(key, Bytes.toBytes(family), Bytes.toBytes(qf), now, Type.Put, dummyData)); + new KeyValue(key, Bytes.toBytes(family), Bytes.toBytes(qf), now, Type.Put, + dummyData)); } } finally { mobFileWriter.close(); @@ -446,11 +441,10 @@ private void createMobFile(Path basePath) throws IOException { private void createMobDelFile(Path basePath, int startKey) throws IOException { HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build(); MobFileName mobFileName = null; - Date today = new Date(); byte[] startRow = Bytes.toBytes(startKey); - mobFileName = MobFileName.create(startRow, MobUtils.formatDate(today), delSuffix); + mobFileName = MobFileName.create(startRow, MobUtils.formatDate(LocalDate.now()), delSuffix); StoreFileWriter mobFileWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFileContext(meta) @@ -485,28 
+479,28 @@ public void testCompactFilesWithoutDelFile() throws Exception { listFiles(); - PartitionedMobCompactor compactor = new PartitionedMobCompactor(conf, fs, - TableName.valueOf(tableName), hcd, pool) { - @Override - public List compact(List files, boolean isForceAllFiles) - throws IOException { - if (files == null || files.isEmpty()) { - return null; - } + PartitionedMobCompactor compactor = + new PartitionedMobCompactor(conf, fs, TableName.valueOf(tableName), hcd, pool) { + @Override + public List compact(List files, boolean isForceAllFiles) + throws IOException { + if (files == null || files.isEmpty()) { + return null; + } - PartitionedMobCompactionRequest request = select(files, isForceAllFiles); + PartitionedMobCompactionRequest request = select(files, isForceAllFiles); - // Make sure that there is no del Partitions - assertTrue(request.getDelPartitions().size() == 0); + // Make sure that there is no del Partitions + assertTrue(request.getDelPartitions().size() == 0); - // Make sure that when there is no startKey/endKey for partition. - for (CompactionPartition p : request.getCompactionPartitions()) { - assertTrue(p.getStartKey() == null); - assertTrue(p.getEndKey() == null); - } - return null; - } - }; + // Make sure that when there is no startKey/endKey for partition. + for (CompactionPartition p : request.getCompactionPartitions()) { + assertTrue(p.getStartKey() == null); + assertTrue(p.getEndKey() == null); + } + return null; + } + }; compactor.compact(allFiles, true); } @@ -518,16 +512,15 @@ static class MyPartitionedMobCompactor extends PartitionedMobCompactor { MyPartitionedMobCompactor(Configuration conf, FileSystem fs, TableName tableName, ColumnFamilyDescriptor column, ExecutorService pool, final int delPartitionSize, - final CacheConfig cacheConf, final int PartitionsIncludeDelFiles) - throws IOException { + final CacheConfig cacheConf, final int PartitionsIncludeDelFiles) throws IOException { super(conf, fs, tableName, column, pool); this.delPartitionSize = delPartitionSize; this.cacheConfig = cacheConf; this.PartitionsIncludeDelFiles = PartitionsIncludeDelFiles; } - @Override public List compact(List files, boolean isForceAllFiles) - throws IOException { + @Override + public List compact(List files, boolean isForceAllFiles) throws IOException { if (files == null || files.isEmpty()) { return null; } @@ -556,8 +549,7 @@ static class MyPartitionedMobCompactor extends PartitionedMobCompactor { // Make sure that CompactionDelPartitions does not overlap CompactionDelPartition prevDelP = null; for (CompactionDelPartition delP : request.getDelPartitions()) { - assertTrue( - Bytes.compareTo(delP.getId().getStartKey(), delP.getId().getEndKey()) <= 0); + assertTrue(Bytes.compareTo(delP.getId().getStartKey(), delP.getId().getEndKey()) <= 0); if (prevDelP != null) { assertTrue( @@ -570,7 +562,8 @@ static class MyPartitionedMobCompactor extends PartitionedMobCompactor { // Make sure that only del files within key range for a partition is included in compaction. // compact the mob files by partitions in parallel. 
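// Illustrative sketch (not part of the patch): the assertions above amount to an
// ordering/disjointness invariant over del partitions sorted by start key. Assuming the
// partition ids expose plain byte[] start/end keys compared lexicographically with
// org.apache.hadoop.hbase.util.Bytes, the invariant is:
import org.apache.hadoop.hbase.util.Bytes;

final class DelPartitionInvariantSketch {
  /** True when every [start, end] range is well formed and begins strictly after the previous one. */
  static boolean orderedAndDisjoint(byte[][] startKeys, byte[][] endKeys) {
    byte[] prevEnd = null;
    for (int i = 0; i < startKeys.length; i++) {
      if (Bytes.compareTo(startKeys[i], endKeys[i]) > 0) {
        return false; // start key must not exceed end key
      }
      if (prevEnd != null && Bytes.compareTo(prevEnd, startKeys[i]) >= 0) {
        return false; // must start strictly after the previous partition ends
      }
      prevEnd = endKeys[i];
    }
    return true;
  }
}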
for (CompactionPartition partition : request.getCompactionPartitions()) { - List delFiles = getListOfDelFilesForPartition(partition, request.getDelPartitions()); + List delFiles = + getListOfDelFilesForPartition(partition, request.getDelPartitions()); if (!request.getDelPartitions().isEmpty()) { if (!((Bytes.compareTo(request.getDelPartitions().get(0).getId().getStartKey(), partition.getEndKey()) > 0) || (Bytes.compareTo( @@ -581,9 +574,9 @@ static class MyPartitionedMobCompactor extends PartitionedMobCompactor { assertTrue(delFiles.size() == 1); affectedPartitions += delFiles.size(); assertTrue(Bytes.compareTo(partition.getStartKey(), - CellUtil.cloneRow(delFiles.get(0).getLastKey().get())) <= 0); + CellUtil.cloneRow(delFiles.get(0).getLastKey().get())) <= 0); assertTrue(Bytes.compareTo(partition.getEndKey(), - CellUtil.cloneRow(delFiles.get(delFiles.size() - 1).getFirstKey().get())) >= 0); + CellUtil.cloneRow(delFiles.get(delFiles.size() - 1).getFirstKey().get())) >= 0); } } } @@ -618,8 +611,9 @@ public void testCompactFilesWithOneDelFile() throws Exception { listFiles(); - MyPartitionedMobCompactor compactor = new MyPartitionedMobCompactor(conf, fs, - TableName.valueOf(tableName), hcd, pool, 1, cacheConf, 1); + MyPartitionedMobCompactor compactor = + new MyPartitionedMobCompactor(conf, fs, TableName.valueOf(tableName), hcd, pool, 1, + cacheConf, 1); compactor.compact(allFiles, true); } @@ -638,19 +632,20 @@ public void testCompactFilesWithMultiDelFiles() throws Exception { listFiles(); - MyPartitionedMobCompactor compactor = new MyPartitionedMobCompactor(conf, fs, - TableName.valueOf(tableName), hcd, pool, 3, cacheConf, 3); + MyPartitionedMobCompactor compactor = + new MyPartitionedMobCompactor(conf, fs, TableName.valueOf(tableName), hcd, pool, 3, + cacheConf, 3); compactor.compact(allFiles, true); } - private void testCompactDelFilesAtBatchSize(String tableName, int batchSize, - int delfileMaxCount) throws Exception { + private void testCompactDelFilesAtBatchSize(String tableName, int batchSize, int delfileMaxCount) + throws Exception { resetConf(); init(tableName); // create 20 mob files. - createStoreFiles(basePath, family, qf, 20, Type.Put, new Date()); + createStoreFiles(basePath, family, qf, 20, Type.Put, LocalDate.now()); // create 13 del files - createStoreFiles(basePath, family, qf, 13, Type.Delete, new Date()); + createStoreFiles(basePath, family, qf, 13, Type.Delete, LocalDate.now()); listFiles(); // set the max del file count @@ -662,101 +657,106 @@ private void testCompactDelFilesAtBatchSize(String tableName, int batchSize, /** * Tests the selectFiles - * @param tableName the table name - * @param type the expected compaction type + * + * @param tableName the table name + * @param type the expected compaction type * @param isForceAllFiles whether all the mob files are selected - * @param expected the expected start keys + * @param expected the expected start keys */ private void testSelectFiles(String tableName, final CompactionType type, - final boolean isForceAllFiles, final List expected) throws IOException { - PartitionedMobCompactor compactor = new PartitionedMobCompactor(conf, fs, - TableName.valueOf(tableName), hcd, pool) { - @Override - public List compact(List files, boolean isForceAllFiles) - throws IOException { - if (files == null || files.isEmpty()) { - return null; - } - PartitionedMobCompactionRequest request = select(files, isForceAllFiles); - - // Make sure that when there is no del files, there will be no startKey/endKey for partition. 
- if (request.getDelPartitions().size() == 0) { - for (CompactionPartition p : request.getCompactionPartitions()) { - assertTrue(p.getStartKey() == null); - assertTrue(p.getEndKey() == null); - } - } + final boolean isForceAllFiles, final List expected) throws IOException { + PartitionedMobCompactor compactor = + new PartitionedMobCompactor(conf, fs, TableName.valueOf(tableName), hcd, pool) { + @Override + public List compact(List files, boolean isForceAllFiles) + throws IOException { + if (files == null || files.isEmpty()) { + return null; + } + PartitionedMobCompactionRequest request = select(files, isForceAllFiles); + + // Make sure that when there is no del files, there will be no startKey/endKey for + // partition. + if (request.getDelPartitions().size() == 0) { + for (CompactionPartition p : request.getCompactionPartitions()) { + assertTrue(p.getStartKey() == null); + assertTrue(p.getEndKey() == null); + } + } - // Make sure that CompactionDelPartitions does not overlap - CompactionDelPartition prevDelP = null; - for (CompactionDelPartition delP : request.getDelPartitions()) { - assertTrue(Bytes.compareTo(delP.getId().getStartKey(), - delP.getId().getEndKey()) <= 0); + // Make sure that CompactionDelPartitions does not overlap + CompactionDelPartition prevDelP = null; + for (CompactionDelPartition delP : request.getDelPartitions()) { + assertTrue( + Bytes.compareTo(delP.getId().getStartKey(), delP.getId().getEndKey()) <= 0); - if (prevDelP != null) { - assertTrue(Bytes.compareTo(prevDelP.getId().getEndKey(), - delP.getId().getStartKey()) < 0); - } - } + if (prevDelP != null) { + assertTrue( + Bytes.compareTo(prevDelP.getId().getEndKey(), delP.getId().getStartKey()) < 0); + } + } - // Make sure that only del files within key range for a partition is included in compaction. - // compact the mob files by partitions in parallel. - for (CompactionPartition partition : request.getCompactionPartitions()) { - List delFiles = getListOfDelFilesForPartition(partition, request.getDelPartitions()); - if (!request.getDelPartitions().isEmpty()) { - if (!((Bytes.compareTo(request.getDelPartitions().get(0).getId().getStartKey(), - partition.getEndKey()) > 0) || (Bytes.compareTo( - request.getDelPartitions().get(request.getDelPartitions().size() - 1).getId() - .getEndKey(), partition.getStartKey()) < 0))) { - if (delFiles.size() > 0) { - assertTrue(Bytes.compareTo(partition.getStartKey(), - delFiles.get(0).getFirstKey().get().getRowArray()) >= 0); - assertTrue(Bytes.compareTo(partition.getEndKey(), - delFiles.get(delFiles.size() - 1).getLastKey().get().getRowArray()) <= 0); + // Make sure that only del files within key range for a partition is included in + // compaction. + // compact the mob files by partitions in parallel. 
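// Illustrative sketch (not part of the patch): the double-negated Bytes.compareTo condition
// in the loop below is just an interval-intersection test between the overall del-partition
// key range (first start key to last end key) and one compaction partition. Assuming
// inclusive [start, end] ranges:
import org.apache.hadoop.hbase.util.Bytes;

final class KeyRangeIntersectionSketch {
  /** True when [delStart, delEnd] and [partStart, partEnd] share at least one key. */
  static boolean intersects(byte[] delStart, byte[] delEnd, byte[] partStart, byte[] partEnd) {
    boolean delEntirelyAfterPartition = Bytes.compareTo(delStart, partEnd) > 0;
    boolean delEntirelyBeforePartition = Bytes.compareTo(delEnd, partStart) < 0;
    return !(delEntirelyAfterPartition || delEntirelyBeforePartition);
  }
}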
+ for (CompactionPartition partition : request.getCompactionPartitions()) { + List delFiles = + getListOfDelFilesForPartition(partition, request.getDelPartitions()); + if (!request.getDelPartitions().isEmpty()) { + if (!((Bytes.compareTo(request.getDelPartitions().get(0).getId().getStartKey(), + partition.getEndKey()) > 0) || (Bytes.compareTo( + request.getDelPartitions().get(request.getDelPartitions().size() - 1).getId() + .getEndKey(), partition.getStartKey()) < 0))) { + if (delFiles.size() > 0) { + assertTrue(Bytes.compareTo(partition.getStartKey(), + delFiles.get(0).getFirstKey().get().getRowArray()) >= 0); + assertTrue(Bytes.compareTo(partition.getEndKey(), + delFiles.get(delFiles.size() - 1).getLastKey().get().getRowArray()) <= 0); + } + } } } - } - } - // assert the compaction type - assertEquals(type, request.type); - // assert get the right partitions - compareCompactedPartitions(expected, request.compactionPartitions); - // assert get the right del files - compareDelFiles(request.getDelPartitions()); - return null; - } - }; + // assert the compaction type + assertEquals(type, request.type); + // assert get the right partitions + compareCompactedPartitions(expected, request.compactionPartitions); + // assert get the right del files + compareDelFiles(request.getDelPartitions()); + return null; + } + }; compactor.compact(allFiles, isForceAllFiles); } /** * Tests the compacteDelFile - * @param tableName the table name + * + * @param tableName the table name * @param expectedFileCount the expected file count * @param expectedCellCount the expected cell count - * @param isForceAllFiles whether all the mob files are selected + * @param isForceAllFiles whether all the mob files are selected */ private void testCompactDelFiles(String tableName, final int expectedFileCount, final int expectedCellCount, boolean isForceAllFiles) throws IOException { - PartitionedMobCompactor compactor = new PartitionedMobCompactor(conf, fs, - TableName.valueOf(tableName), hcd, pool) { - @Override - protected List performCompaction(PartitionedMobCompactionRequest request) - throws IOException { - List delFilePaths = new ArrayList<>(); - for (CompactionDelPartition delPartition: request.getDelPartitions()) { - for (Path p : delPartition.listDelFiles()) { - delFilePaths.add(p); + PartitionedMobCompactor compactor = + new PartitionedMobCompactor(conf, fs, TableName.valueOf(tableName), hcd, pool) { + @Override + protected List performCompaction(PartitionedMobCompactionRequest request) + throws IOException { + List delFilePaths = new ArrayList<>(); + for (CompactionDelPartition delPartition : request.getDelPartitions()) { + for (Path p : delPartition.listDelFiles()) { + delFilePaths.add(p); + } + } + List newDelPaths = compactDelFiles(request, delFilePaths); + // assert the del files are merged. + assertEquals(expectedFileCount, newDelPaths.size()); + assertEquals(expectedCellCount, countDelCellsInDelFiles(newDelPaths)); + return null; } - } - List newDelPaths = compactDelFiles(request, delFilePaths); - // assert the del files are merged. - assertEquals(expectedFileCount, newDelPaths.size()); - assertEquals(expectedCellCount, countDelCellsInDelFiles(newDelPaths)); - return null; - } - }; + }; compactor.compact(allFiles, isForceAllFiles); } @@ -776,6 +776,7 @@ private void listFiles() throws IOException { /** * Compares the compacted partitions. 
+ * * @param partitions the collection of CompactedPartitions */ private void compareCompactedPartitions(List expected, @@ -794,6 +795,7 @@ private void compareCompactedPartitions(List expected, /** * Compares the del files. + * * @param delPartitions all del partitions */ private void compareDelFiles(List delPartitions) { @@ -810,6 +812,7 @@ private void compareDelFiles(List delPartitions) { /** * Creates store files. + * * @param basePath the path to create file * @family the family name * @qualifier the column qualifier @@ -817,12 +820,12 @@ private void compareDelFiles(List delPartitions) { * @type the key type */ private void createStoreFiles(Path basePath, String family, String qualifier, int count, - Type type, final Date date) throws IOException { + Type type, LocalDate date) throws IOException { createStoreFiles(basePath, family, qualifier, count, type, false, date); } private void createStoreFiles(Path basePath, String family, String qualifier, int count, - Type type, boolean sameStartKey, final Date date) throws IOException { + Type type, boolean sameStartKey, LocalDate date) throws IOException { HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build(); String startKey = "row_"; MobFileName mobFileName = null; @@ -836,29 +839,31 @@ private void createStoreFiles(Path basePath, String family, String qualifier, in } else { startRow = Bytes.toBytes(startKey + i); } - if(type.equals(Type.Delete)) { + if (type.equals(Type.Delete)) { mobFileName = MobFileName.create(startRow, MobUtils.formatDate(date), delSuffix); } - if(type.equals(Type.Put)){ + if (type.equals(Type.Put)) { mobFileName = MobFileName.create(startRow, MobUtils.formatDate(date), mobSuffix); } - StoreFileWriter mobFileWriter = new StoreFileWriter.Builder(conf, cacheConf, fs) - .withFileContext(meta).withFilePath(new Path(basePath, mobFileName.getFileName())).build(); - writeStoreFile(mobFileWriter, startRow, Bytes.toBytes(family), Bytes.toBytes(qualifier), - type, (i+1)*1000); + StoreFileWriter mobFileWriter = + new StoreFileWriter.Builder(conf, cacheConf, fs).withFileContext(meta) + .withFilePath(new Path(basePath, mobFileName.getFileName())).build(); + writeStoreFile(mobFileWriter, startRow, Bytes.toBytes(family), Bytes.toBytes(qualifier), type, + (i + 1) * 1000); } } /** * Writes data to store file. 
- * @param writer the store file writer - * @param row the row key - * @param family the family name + * + * @param writer the store file writer + * @param row the row key + * @param family the family name * @param qualifier the column qualifier - * @param type the key type - * @param size the size of value + * @param type the key type + * @param size the size of value */ - private static void writeStoreFile(final StoreFileWriter writer, byte[]row, byte[] family, + private static void writeStoreFile(final StoreFileWriter writer, byte[] row, byte[] family, byte[] qualifier, Type type, int size) throws IOException { long now = System.currentTimeMillis(); try { @@ -872,6 +877,7 @@ private static void writeStoreFile(final StoreFileWriter writer, byte[]row, byte /** * Gets the number of del cell in the del files + * * @param paths the del file paths * @return the cell size */ @@ -882,11 +888,12 @@ private int countDelCellsInDelFiles(List paths) throws IOException { HStoreFile sf = new HStoreFile(fs, path, conf, cacheConf, BloomType.NONE, true); sfs.add(sf); } - List scanners = new ArrayList<>(StoreFileScanner.getScannersForStoreFiles(sfs, - false, true, false, false, HConstants.LATEST_TIMESTAMP)); + List scanners = new ArrayList<>(StoreFileScanner + .getScannersForStoreFiles(sfs, false, true, false, false, HConstants.LATEST_TIMESTAMP)); long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0); long ttl = HStore.determineTTLFromFamily(hcd); - ScanInfo scanInfo = new ScanInfo(conf, hcd, ttl, timeToPurgeDeletes, CellComparatorImpl.COMPARATOR); + ScanInfo scanInfo = + new ScanInfo(conf, hcd, ttl, timeToPurgeDeletes, CellComparatorImpl.COMPARATOR); StoreScanner scanner = new StoreScanner(scanInfo, ScanType.COMPACT_RETAIN_DELETES, scanners); List results = new ArrayList<>(); boolean hasMore = true; @@ -904,19 +911,20 @@ private static ExecutorService createThreadPool() { int maxThreads = 10; long keepAliveTime = 60; final SynchronousQueue queue = new SynchronousQueue<>(); - ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, - TimeUnit.SECONDS, queue, Threads.newDaemonThreadFactory("MobFileCompactionChore"), - new RejectedExecutionHandler() { - @Override - public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { - try { - // waiting for a thread to pick up instead of throwing exceptions. - queue.put(r); - } catch (InterruptedException e) { - throw new RejectedExecutionException(e); - } - } - }); + ThreadPoolExecutor pool = + new ThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS, queue, + Threads.newDaemonThreadFactory("MobFileCompactionChore"), + new RejectedExecutionHandler() { + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + try { + // waiting for a thread to pick up instead of throwing exceptions. 
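// Illustrative sketch (not part of the patch): createThreadPool() here pairs a
// SynchronousQueue with a RejectedExecutionHandler that re-offers the rejected task via
// queue.put(r). A SynchronousQueue has no capacity, so put() blocks the submitting thread
// until an idle worker takes the task, giving back-pressure instead of a
// RejectedExecutionException. A minimal standalone version of the same pattern:
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class CallerBlocksPoolSketch {
  static ThreadPoolExecutor create(int maxThreads) {
    final SynchronousQueue<Runnable> queue = new SynchronousQueue<>();
    ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, 60L, TimeUnit.SECONDS, queue,
      (r, executor) -> {
        try {
          queue.put(r); // block until a pool thread is free to take the task
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new RejectedExecutionException(e);
        }
      });
    pool.allowCoreThreadTimeOut(true);
    return pool;
  }
}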
+ queue.put(r); + } catch (InterruptedException e) { + throw new RejectedExecutionException(e); + } + } + }); ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true); return pool; } @@ -926,10 +934,10 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { */ private void resetConf() { conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, - MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD); + MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD); conf.setInt(MobConstants.MOB_DELFILE_MAX_COUNT, MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT); conf.setInt(MobConstants.MOB_COMPACTION_BATCH_SIZE, - MobConstants.DEFAULT_MOB_COMPACTION_BATCH_SIZE); + MobConstants.DEFAULT_MOB_COMPACTION_BATCH_SIZE); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java index bf1f18e81230..36883a37bd6e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java @@ -22,15 +22,17 @@ import java.io.IOException; import java.security.Key; import java.security.SecureRandom; +import java.time.LocalDate; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.Date; import java.util.Iterator; import java.util.List; import java.util.NavigableSet; import java.util.Optional; import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.TimeUnit; + import javax.crypto.spec.SecretKeySpec; import org.apache.hadoop.conf.Configuration; @@ -91,25 +93,26 @@ public class TestHMobStore { HBaseClassTestRule.forClass(TestHMobStore.class); public static final Logger LOG = LoggerFactory.getLogger(TestHMobStore.class); - @Rule public TestName name = new TestName(); + @Rule + public TestName name = new TestName(); private HMobStore store; private HRegion region; private FileSystem fs; - private byte [] table = Bytes.toBytes("table"); - private byte [] family = Bytes.toBytes("family"); - private byte [] row = Bytes.toBytes("row"); - private byte [] row2 = Bytes.toBytes("row2"); - private byte [] qf1 = Bytes.toBytes("qf1"); - private byte [] qf2 = Bytes.toBytes("qf2"); - private byte [] qf3 = Bytes.toBytes("qf3"); - private byte [] qf4 = Bytes.toBytes("qf4"); - private byte [] qf5 = Bytes.toBytes("qf5"); - private byte [] qf6 = Bytes.toBytes("qf6"); + private byte[] table = Bytes.toBytes("table"); + private byte[] family = Bytes.toBytes("family"); + private byte[] row = Bytes.toBytes("row"); + private byte[] row2 = Bytes.toBytes("row2"); + private byte[] qf1 = Bytes.toBytes("qf1"); + private byte[] qf2 = Bytes.toBytes("qf2"); + private byte[] qf3 = Bytes.toBytes("qf3"); + private byte[] qf4 = Bytes.toBytes("qf4"); + private byte[] qf5 = Bytes.toBytes("qf5"); + private byte[] qf6 = Bytes.toBytes("qf6"); private byte[] value = Bytes.toBytes("value"); private byte[] value2 = Bytes.toBytes("value2"); private Path mobFilePath; - private Date currentDate = new Date(); + private long currentTimeMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); private Cell seekKey1; private Cell seekKey2; private Cell seekKey3; @@ -122,6 +125,7 @@ public class TestHMobStore { /** * Setup + * * @throws Exception */ @Before @@ -131,8 +135,8 @@ public void setUp() throws Exception { qualifiers.add(qf5); Iterator iter = qualifiers.iterator(); - while(iter.hasNext()){ - byte [] next = iter.next(); + while 
(iter.hasNext()) { + byte[] next = iter.next(); expected.add(new KeyValue(row, family, next, 1, value)); get.addColumn(family, next); get.readAllVersions(); @@ -172,12 +176,11 @@ private void init(String methodName, Configuration conf, ColumnFamilyDescriptor } } - private void init(Configuration conf, ColumnFamilyDescriptor cfd) - throws IOException { + private void init(Configuration conf, ColumnFamilyDescriptor cfd) throws IOException { Path basedir = FSUtils.getRootDir(conf); fs = FileSystem.get(conf); - Path homePath = new Path(basedir, Bytes.toString(family) + Path.SEPARATOR - + Bytes.toString(family)); + Path homePath = + new Path(basedir, Bytes.toString(family) + Path.SEPARATOR + Bytes.toString(family)); fs.mkdirs(homePath); KeyValue key1 = new KeyValue(row, family, qf1, 1, value); @@ -185,8 +188,9 @@ private void init(Configuration conf, ColumnFamilyDescriptor cfd) KeyValue key3 = new KeyValue(row2, family, qf3, 1, value2); KeyValue[] keys = new KeyValue[] { key1, key2, key3 }; int maxKeyCount = keys.length; - StoreFileWriter mobWriter = store.createWriterInTmp(currentDate, maxKeyCount, - cfd.getCompactionCompressionType(), region.getRegionInfo().getStartKey(), false); + StoreFileWriter mobWriter = store + .createWriterInTmp(currentTimeMillis, maxKeyCount, cfd.getCompactionCompressionType(), + region.getRegionInfo().getStartKey(), false); mobFilePath = mobWriter.getPath(); mobWriter.append(key1); @@ -194,10 +198,10 @@ private void init(Configuration conf, ColumnFamilyDescriptor cfd) mobWriter.append(key3); mobWriter.close(); - String targetPathName = MobUtils.formatDate(currentDate); + String targetPathName = MobUtils.formatDate(currentTimeMillis); byte[] referenceValue = Bytes.toBytes(targetPathName + Path.SEPARATOR + mobFilePath.getName()); - Tag tableNameTag = new ArrayBackedTag(TagType.MOB_TABLE_NAME_TAG_TYPE, - store.getTableName().getName()); + Tag tableNameTag = + new ArrayBackedTag(TagType.MOB_TABLE_NAME_TAG_TYPE, store.getTableName().getName()); KeyValue kv1 = new KeyValue(row, family, qf1, Long.MAX_VALUE, referenceValue); KeyValue kv2 = new KeyValue(row, family, qf2, Long.MAX_VALUE, referenceValue); KeyValue kv3 = new KeyValue(row2, family, qf3, Long.MAX_VALUE, referenceValue); @@ -208,6 +212,7 @@ private void init(Configuration conf, ColumnFamilyDescriptor cfd) /** * Getting data from memstore + * * @throws IOException */ @Test @@ -224,9 +229,8 @@ public void testGetFromMemStore() throws IOException { this.store.add(new KeyValue(row, family, qf6, 1, value), null); Scan scan = new Scan(get); - InternalScanner scanner = (InternalScanner) store.getScanner(scan, - scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()), - 0); + InternalScanner scanner = (InternalScanner) store + .getScanner(scan, scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()), 0); List results = new ArrayList<>(); scanner.next(results); @@ -235,7 +239,7 @@ public void testGetFromMemStore() throws IOException { //Compare Assert.assertEquals(expected.size(), results.size()); - for(int i=0; i results = new ArrayList<>(); scanner.next(results); @@ -280,13 +284,14 @@ public void testGetFromFiles() throws IOException { //Compare Assert.assertEquals(expected.size(), results.size()); - for(int i=0; i results = new ArrayList<>(); scanner.next(results); @@ -325,7 +329,7 @@ public void testGetReferencesFromFiles() throws IOException { //Compare Assert.assertEquals(expected.size(), results.size()); - for(int i=0; i results = new ArrayList<>(); scanner.next(results); @@ -370,13 +374,14 
@@ public void testGetFromMemStoreAndFiles() throws IOException { //Compare Assert.assertEquals(expected.size(), results.size()); - for(int i=0; i results = new ArrayList<>(); scanner.next(results); @@ -418,7 +422,7 @@ public void testMobCellSizeThreshold() throws IOException { //Compare Assert.assertEquals(expected.size(), results.size()); - for(int i=0; i results = new ArrayList<>(); scanner.next(results); Collections.sort(results, CellComparatorImpl.COMPARATOR); scanner.close(); Assert.assertEquals(expected.size(), results.size()); - for(int i=0; i
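// Illustrative sketch (not part of the patch): in the TestHMobStore setup above, the expected
// reference value is built as MobUtils.formatDate(...) + Path.SEPARATOR + the mob file name,
// and the reference cells additionally carry a TagType.MOB_TABLE_NAME_TAG_TYPE tag holding the
// owning table's name. Composing and splitting such a value is plain string handling
// (assumption: '/' as the separator, matching org.apache.hadoop.fs.Path.SEPARATOR).
import java.nio.charset.StandardCharsets;

final class MobReferenceValueSketch {
  static byte[] compose(String formattedDate, String mobFileName) {
    return (formattedDate + "/" + mobFileName).getBytes(StandardCharsets.UTF_8);
  }

  static String[] split(byte[] referenceValue) {
    String ref = new String(referenceValue, StandardCharsets.UTF_8);
    int sep = ref.indexOf('/');
    return new String[] { ref.substring(0, sep), ref.substring(sep + 1) };
  }
}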