diff --git a/.travis.yml b/.travis.yml index 16a4f68db4a6a..c6205dc4e0365 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,2 +1,4 @@ language: java +jdk: + - oraclejdk8 sudo: required diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java index 13f2ebc2ecc53..1ccf4eccb2247 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java @@ -45,7 +45,7 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig { // Run a compaction every N delta commits public static final String INLINE_COMPACT_NUM_DELTA_COMMITS_PROP = "hoodie.compact.inline.max.delta.commits"; - private static final String DEFAULT_INLINE_COMPACT_NUM_DELTA_COMMITS = "4"; + private static final String DEFAULT_INLINE_COMPACT_NUM_DELTA_COMMITS = "10"; public static final String CLEANER_FILE_VERSIONS_RETAINED_PROP = "hoodie.cleaner.fileversions.retained"; diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java index 0c97831f1bde3..e1c86f67ccf6c 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java @@ -22,15 +22,17 @@ import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecordLocation; import com.uber.hoodie.common.model.HoodieRecordPayload; -import com.uber.hoodie.common.table.log.HoodieLogAppendConfig; import com.uber.hoodie.common.table.log.HoodieLogFile; -import com.uber.hoodie.common.table.log.avro.RollingAvroLogAppender; +import com.uber.hoodie.common.table.log.HoodieLogFormat; +import com.uber.hoodie.common.table.log.HoodieLogFormat.Writer; +import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.HoodieAvroUtils; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.exception.HoodieAppendException; import com.uber.hoodie.exception.HoodieUpsertException; import com.uber.hoodie.table.HoodieTable; +import java.util.stream.Collectors; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.Path; @@ -51,11 +53,11 @@ public class HoodieAppendHandle extends HoodieIOH private final WriteStatus writeStatus; private final String fileId; private String partitionPath; - private RollingAvroLogAppender logAppender; private List> records; private long recordsWritten = 0; private long recordsDeleted = 0; private HoodieLogFile currentLogFile; + private Writer writer; public HoodieAppendHandle(HoodieWriteConfig config, String commitTime, HoodieTable hoodieTable, String fileId, Iterator> recordItr) { @@ -84,17 +86,15 @@ private void init(Iterator> recordItr) { writeStatus.getStat().setFileId(fileId); try { - HoodieLogAppendConfig logConfig = HoodieLogAppendConfig.newBuilder() - .onPartitionPath( - new Path(hoodieTable.getMetaClient().getBasePath(), partitionPath)) - .withFileId(fileId).withBaseCommitTime(baseCommitTime).withSchema(schema) - .withFs(fs).withLogFileExtension(HoodieLogFile.DELTA_EXTENSION).build(); - this.logAppender = new RollingAvroLogAppender(logConfig); - this.currentLogFile = logAppender.getConfig().getLogFile(); + this.writer = HoodieLogFormat.newWriterBuilder() + 
.onParentPath(new Path(hoodieTable.getMetaClient().getBasePath(), partitionPath)) + .withFileId(fileId).overBaseCommit(baseCommitTime) + .withFs(fs).withFileExtension(HoodieLogFile.DELTA_EXTENSION).build(); + this.currentLogFile = writer.getLogFile(); ((HoodieDeltaWriteStat) writeStatus.getStat()) .setLogVersion(currentLogFile.getLogVersion()); ((HoodieDeltaWriteStat) writeStatus.getStat()) - .setLogOffset(logAppender.getCurrentSize()); + .setLogOffset(writer.getCurrentSize()); } catch (Exception e) { logger.error("Error in update task at commit " + commitTime, e); writeStatus.setGlobalError(e); @@ -139,11 +139,11 @@ private Optional getIndexedRecord(HoodieRecord hoodieRecord) { } public void doAppend() { - Iterator recordItr = + List recordItr = records.stream().map(this::getIndexedRecord).filter(Optional::isPresent) - .map(Optional::get).iterator(); + .map(Optional::get).collect(Collectors.toList()); try { - logAppender.append(recordItr); + writer = writer.appendBlock(new HoodieAvroDataBlock(recordItr, schema)); } catch (Exception e) { throw new HoodieAppendException( "Failed while appeding records to " + currentLogFile.getPath(), e); @@ -152,8 +152,8 @@ public void doAppend() { public void close() { try { - if (logAppender != null) { - logAppender.close(); + if (writer != null) { + writer.close(); } writeStatus.getStat().setNumWrites(recordsWritten); writeStatus.getStat().setNumDeletes(recordsDeleted); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java index e7efcfa4589be..fa478c4d79886 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java @@ -25,7 +25,7 @@ import com.uber.hoodie.common.model.HoodieTableType; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; -import com.uber.hoodie.common.table.log.avro.HoodieAvroReader; +import com.uber.hoodie.common.table.log.HoodieCompactedLogRecordScanner; import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; import com.uber.hoodie.common.table.timeline.HoodieInstant; import com.uber.hoodie.common.util.FSUtils; @@ -145,9 +145,8 @@ private List executeCompaction(HoodieTableMetaClient metaCl // Load all the delta commits since the last compaction commit and get all the blocks to be loaded and load it using CompositeAvroLogReader // Since a DeltaCommit is not defined yet, reading all the records. revisit this soon. 
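// A minimal sketch of the append path that HoodieAppendHandle (above) now follows with the
// new HoodieLogFormat.Writer: build a writer over the partition path, append one Avro data
// block, and close it. The inputs (fs, partitionPath, fileId, commitTime, schema, records)
// are assumed placeholders; appendBlock returns a possibly rolled-over Writer, so the
// returned instance must be kept.
import com.uber.hoodie.common.table.log.HoodieLogFile;
import com.uber.hoodie.common.table.log.HoodieLogFormat;
import com.uber.hoodie.common.table.log.HoodieLogFormat.Writer;
import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

class LogAppendSketch {
  static void appendOnce(FileSystem fs, Path partitionPath, String fileId, String commitTime,
      Schema schema, List<IndexedRecord> records) throws Exception {
    Writer writer = HoodieLogFormat.newWriterBuilder()
        .onParentPath(partitionPath)
        .withFileId(fileId)
        .overBaseCommit(commitTime)
        .withFs(fs)
        .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
        .build();
    try {
      // serializes the records as one Avro data block framed by the MAGIC sync marker
      writer = writer.appendBlock(new HoodieAvroDataBlock(records, schema));
    } finally {
      writer.close();
    }
  }
}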
- HoodieAvroReader avroReader = new HoodieAvroReader(fs, operation.getDeltaFilePaths(), - readerSchema); - if (!avroReader.iterator().hasNext()) { + HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, operation.getDeltaFilePaths(), readerSchema); + if (!scanner.iterator().hasNext()) { return Lists.newArrayList(); } @@ -155,15 +154,15 @@ private List executeCompaction(HoodieTableMetaClient metaCl HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable<>(config, metaClient); Iterator> result = table - .handleUpdate(commitTime, operation.getFileId(), avroReader.iterator()); + .handleUpdate(commitTime, operation.getFileId(), scanner.iterator()); Iterable> resultIterable = () -> result; return StreamSupport.stream(resultIterable.spliterator(), false) .flatMap(Collection::stream) .map(WriteStatus::getStat) .map(s -> CompactionWriteStat.newBuilder().withHoodieWriteStat(s) - .setTotalRecordsToUpdate(avroReader.getTotalRecordsToUpdate()) - .setTotalLogFiles(avroReader.getTotalLogFiles()) - .setTotalLogRecords(avroReader.getTotalLogRecords()) + .setTotalRecordsToUpdate(scanner.getTotalRecordsToUpdate()) + .setTotalLogFiles(scanner.getTotalLogFiles()) + .setTotalLogRecords(scanner.getTotalLogRecords()) .onPartition(operation.getPartitionPath()).build()) .collect(toList()); } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieFileFormat.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieFileFormat.java index 3b6c0de9fb324..8ef06ba539264 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieFileFormat.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieFileFormat.java @@ -17,7 +17,7 @@ package com.uber.hoodie.common.model; public enum HoodieFileFormat { - PARQUET(".parquet"), AVRO(".avro"); + PARQUET(".parquet"), HOODIE_LOG(".log"); private final String extension; diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableConfig.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableConfig.java index dc1a864726dcb..7d64bf093d1cf 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableConfig.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableConfig.java @@ -52,7 +52,7 @@ public class HoodieTableConfig implements Serializable { public static final HoodieTableType DEFAULT_TABLE_TYPE = HoodieTableType.COPY_ON_WRITE; public static final HoodieFileFormat DEFAULT_RO_FILE_FORMAT = HoodieFileFormat.PARQUET; - public static final HoodieFileFormat DEFAULT_RT_FILE_FORMAT = HoodieFileFormat.AVRO; + public static final HoodieFileFormat DEFAULT_RT_FILE_FORMAT = HoodieFileFormat.HOODIE_LOG; private Properties props; public HoodieTableConfig(FileSystem fs, String metaPath) { diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieCompactedLogRecordScanner.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieCompactedLogRecordScanner.java new file mode 100644 index 0000000000000..02c14dcf773e5 --- /dev/null +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieCompactedLogRecordScanner.java @@ -0,0 +1,207 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.common.table.log; + +import com.google.common.collect.Maps; +import com.uber.hoodie.common.model.HoodieAvroPayload; +import com.uber.hoodie.common.model.HoodieKey; +import com.uber.hoodie.common.model.HoodieRecord; +import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock; +import com.uber.hoodie.common.table.log.block.HoodieCommandBlock; +import com.uber.hoodie.common.table.log.block.HoodieCommandBlock.HoodieCommandBlockTypeEnum; +import com.uber.hoodie.common.table.log.block.HoodieDeleteBlock; +import com.uber.hoodie.exception.HoodieIOException; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +/** + * Scans through all the blocks in a list of HoodieLogFile and builds up a compacted/merged + * list of records which will be used as a lookup table when merging the base columnar file + * with the redo log file. 
+ * + * TODO(FIX) - Does not apply application specific merge logic - defaults to HoodieAvroPayload + */ +public class HoodieCompactedLogRecordScanner implements Iterable> { + private final static Logger log = LogManager.getLogger(HoodieCompactedLogRecordScanner.class); + + // Final list of compacted/merged records to iterate + private final Collection> logRecords; + // Reader schema for the records + private final Schema readerSchema; + // Total log files read - for metrics + private AtomicLong totalLogFiles = new AtomicLong(0); + // Total log records read - for metrics + private AtomicLong totalLogRecords = new AtomicLong(0); + // Total final list of compacted/merged records + private long totalRecordsToUpdate; + + public HoodieCompactedLogRecordScanner(FileSystem fs, List logFilePaths, + Schema readerSchema) { + this.readerSchema = readerSchema; + + Map> records = Maps.newHashMap(); + // iterate over the paths + logFilePaths.stream().map(s -> new HoodieLogFile(new Path(s))).forEach(s -> { + log.info("Scanning log file " + s.getPath()); + totalLogFiles.incrementAndGet(); + try { + // Use the HoodieLogFormatReader to iterate through the blocks in the log file + HoodieLogFormatReader reader = new HoodieLogFormatReader(fs, s, readerSchema); + // Store the records loaded from the last data block (needed to implement rollback) + Map> recordsFromLastBlock = Maps.newHashMap(); + reader.forEachRemaining(r -> { + switch (r.getBlockType()) { + case AVRO_DATA_BLOCK: + log.info("Reading a data block from file " + s.getPath()); + // If this is a avro data block, then merge the last block records into the main result + merge(records, recordsFromLastBlock); + // Load the merged records into recordsFromLastBlock + HoodieAvroDataBlock dataBlock = (HoodieAvroDataBlock) r; + loadRecordsFromBlock(dataBlock, recordsFromLastBlock); + break; + case DELETE_BLOCK: + log.info("Reading a delete block from file " + s.getPath()); + // This is a delete block, so lets merge any records from previous data block + merge(records, recordsFromLastBlock); + // Delete the keys listed as to be deleted + HoodieDeleteBlock deleteBlock = (HoodieDeleteBlock) r; + Arrays.stream(deleteBlock.getKeysToDelete()).forEach(records::remove); + break; + case COMMAND_BLOCK: + log.info("Reading a command block from file " + s.getPath()); + // This is a command block - take appropriate action based on the command + HoodieCommandBlock commandBlock = (HoodieCommandBlock) r; + if (commandBlock.getType() == HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK) { + log.info("Rolling back the last data block read in " + s.getPath()); + // rollback the last read data block + recordsFromLastBlock.clear(); + } + break; + case CORRUPT_BLOCK: + log.info("Found a corrupt block in " + s.getPath()); + // If there is a corrupt block - we will assume that this was the next data block + // so merge the last block records (TODO - handle when the corrupted block was a tombstone written partially?) 
+ merge(records, recordsFromLastBlock); + recordsFromLastBlock.clear(); + break; + } + }); + + // merge the last read block when all the blocks are done reading + if (!recordsFromLastBlock.isEmpty()) { + log.info("Merging the final data block in " + s.getPath()); + merge(records, recordsFromLastBlock); + } + + } catch (IOException e) { + throw new HoodieIOException("IOException when reading log file " + s); + } + }); + this.logRecords = Collections.unmodifiableCollection(records.values()); + this.totalRecordsToUpdate = records.size(); + } + + /** + * Iterate over the GenericRecord in the block, read the hoodie key and partition path + * and merge with the HoodieAvroPayload if the same key was found before + * + * @param dataBlock + * @param recordsFromLastBlock + */ + private void loadRecordsFromBlock( + HoodieAvroDataBlock dataBlock, + Map> recordsFromLastBlock) { + recordsFromLastBlock.clear(); + List recs = dataBlock.getRecords(); + totalLogRecords.addAndGet(recs.size()); + recs.forEach(rec -> { + String key = ((GenericRecord) rec).get(HoodieRecord.RECORD_KEY_METADATA_FIELD) + .toString(); + String partitionPath = + ((GenericRecord) rec).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD) + .toString(); + HoodieRecord hoodieRecord = new HoodieRecord<>( + new HoodieKey(key, partitionPath), + new HoodieAvroPayload(Optional.of(((GenericRecord) rec)))); + if (recordsFromLastBlock.containsKey(key)) { + // Merge and store the merged record + HoodieAvroPayload combinedValue = recordsFromLastBlock.get(key).getData() + .preCombine(hoodieRecord.getData()); + recordsFromLastBlock + .put(key, new HoodieRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()), + combinedValue)); + } else { + // Put the record as is + recordsFromLastBlock.put(key, hoodieRecord); + } + }); + } + + /** + * Merge the records read from a single data block with the accumulated records + * + * @param records + * @param recordsFromLastBlock + */ + private void merge(Map> records, + Map> recordsFromLastBlock) { + recordsFromLastBlock.forEach((key, hoodieRecord) -> { + if (records.containsKey(key)) { + // Merge and store the merged record + HoodieAvroPayload combinedValue = records.get(key).getData() + .preCombine(hoodieRecord.getData()); + records.put(key, new HoodieRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()), + combinedValue)); + } else { + // Put the record as is + records.put(key, hoodieRecord); + } + }); + } + + @Override + public Iterator> iterator() { + return logRecords.iterator(); + } + + public long getTotalLogFiles() { + return totalLogFiles.get(); + } + + public long getTotalLogRecords() { + return totalLogRecords.get(); + } + + public long getTotalRecordsToUpdate() { + return totalRecordsToUpdate; + } +} + diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogAppendConfig.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogAppendConfig.java deleted file mode 100644 index cc6642a915bec..0000000000000 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogAppendConfig.java +++ /dev/null @@ -1,238 +0,0 @@ -/* - * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
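// A minimal sketch of consuming the HoodieCompactedLogRecordScanner defined above: it walks
// every block of the listed delta log files, applies delete and rollback blocks, pre-combines
// records that share a key, and then iterates one merged record per key. fs, logFilePaths and
// readerSchema are assumed inputs from the caller.
import com.uber.hoodie.common.model.HoodieAvroPayload;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.table.log.HoodieCompactedLogRecordScanner;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;

class CompactedScanSketch {
  static long countMergedRecords(FileSystem fs, List<String> logFilePaths, Schema readerSchema) {
    HoodieCompactedLogRecordScanner scanner =
        new HoodieCompactedLogRecordScanner(fs, logFilePaths, readerSchema);
    long merged = 0;
    for (HoodieRecord<HoodieAvroPayload> record : scanner) {
      merged++; // each element is the pre-combined view of all log entries for its key
    }
    // merged should equal scanner.getTotalRecordsToUpdate();
    // scanner.getTotalLogRecords() counts raw entries read across scanner.getTotalLogFiles()
    return merged;
  }
}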
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.uber.hoodie.common.table.log; - -import com.uber.hoodie.common.util.FSUtils; -import org.apache.avro.Schema; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; - -import java.io.IOException; - -/** - * Configuration for a HoodieLog - */ -public class HoodieLogAppendConfig { - private final static Logger log = LogManager.getLogger(HoodieLogAppendConfig.class); - private static final long DEFAULT_SIZE_THRESHOLD = 32 * 1024 * 1024L; - - private final int bufferSize; - private final short replication; - private final long blockSize; - private final HoodieLogFile logFile; - private boolean isAutoFlush; - private final Schema schema; - private final FileSystem fs; - private final long sizeThreshold; - - private HoodieLogAppendConfig(FileSystem fs, HoodieLogFile logFile, Schema schema, Integer bufferSize, - Short replication, Long blockSize, boolean isAutoFlush, Long sizeThreshold) { - this.fs = fs; - this.logFile = logFile; - this.schema = schema; - this.bufferSize = bufferSize; - this.replication = replication; - this.blockSize = blockSize; - this.isAutoFlush = isAutoFlush; - this.sizeThreshold = sizeThreshold; - } - - public int getBufferSize() { - return bufferSize; - } - - public short getReplication() { - return replication; - } - - public long getBlockSize() { - return blockSize; - } - - public Schema getSchema() { - return schema; - } - - public FileSystem getFs() { - return fs; - } - - public HoodieLogFile getLogFile() { - return logFile; - } - - public long getSizeThreshold() { - return sizeThreshold; - } - - public boolean isAutoFlush() { - return isAutoFlush; - } - - public static Builder newBuilder() { - return new Builder(); - } - - public HoodieLogAppendConfig withLogFile(HoodieLogFile newFile) { - return new HoodieLogAppendConfig(fs, newFile, schema, bufferSize, replication, blockSize, - isAutoFlush, sizeThreshold); - } - - public static class Builder { - // Auto-flush. if set to true - then after every append, the avro block will be flushed - private boolean isAutoFlush = true; - // Buffer size in the Avro writer - private Integer bufferSize; - // Replication for the log file - private Short replication; - // Blocksize for the avro log file (useful if auto-flush is set to false) - private Long blockSize; - // Schema for the log file - private Schema schema; - // FileSystem - private FileSystem fs; - // Size threshold for the log file. Useful when used with a rolling log appender - private Long sizeThreshold; - // Log File extension. Could be .avro.delta or .avro.commits etc - private String logFileExtension; - // File ID - private String fileId; - // version number for this log file. 
If not specified, then the current version will be computed - private Integer fileVersion; - // Partition path for the log file - private Path partitionPath; - // The base commit time for which the log files are accumulated - private String baseCommitTime; - - public Builder withBufferSize(int bufferSize) { - this.bufferSize = bufferSize; - return this; - } - - public Builder withReplication(short replication) { - this.replication = replication; - return this; - } - - public Builder withBlockSize(long blockSize) { - this.blockSize = blockSize; - return this; - } - - public Builder withSchema(Schema schema) { - this.schema = schema; - return this; - } - - public Builder withFs(FileSystem fs) { - this.fs = fs; - return this; - } - - public Builder withAutoFlush(boolean autoFlush) { - this.isAutoFlush = autoFlush; - return this; - } - - public Builder withSizeThreshold(long sizeThreshold) { - this.sizeThreshold = sizeThreshold; - return this; - } - - public Builder withLogFileExtension(String logFileExtension) { - this.logFileExtension = logFileExtension; - return this; - } - - public Builder withFileId(String fileId) { - this.fileId = fileId; - return this; - } - - public Builder withFileVersion(int version) { - this.fileVersion = version; - return this; - } - - public Builder onPartitionPath(Path path) { - this.partitionPath = path; - return this; - } - - public Builder withBaseCommitTime(String commitTime) { - this.baseCommitTime = commitTime; - return this; - } - - public HoodieLogAppendConfig build() throws IOException { - log.info("Building HoodieLogAppendConfig"); - if (schema == null) { - throw new IllegalArgumentException("Schema for log is not specified"); - } - if (fs == null) { - fs = FSUtils.getFs(); - } - - if (fileId == null) { - throw new IllegalArgumentException("FileID is not specified"); - } - if (baseCommitTime == null) { - throw new IllegalArgumentException("BaseCommitTime is not specified"); - } - if (logFileExtension == null) { - throw new IllegalArgumentException("File extension is not specified"); - } - if (partitionPath == null) { - throw new IllegalArgumentException("Partition path is not specified"); - } - if (fileVersion == null) { - log.info("Computing the next log version for " + fileId + " in " + partitionPath); - fileVersion = - FSUtils.getCurrentLogVersion(fs, partitionPath, fileId, logFileExtension, baseCommitTime); - log.info( - "Computed the next log version for " + fileId + " in " + partitionPath + " as " - + fileVersion); - } - - Path logPath = new Path(partitionPath, - FSUtils.makeLogFileName(fileId, logFileExtension, baseCommitTime, fileVersion)); - log.info("LogConfig created on path " + logPath); - HoodieLogFile logFile = new HoodieLogFile(logPath); - - if (bufferSize == null) { - bufferSize = FSUtils.getDefaultBufferSize(fs); - } - if (replication == null) { - replication = FSUtils.getDefaultReplication(fs, partitionPath); - } - if (blockSize == null) { - blockSize = FSUtils.getDefaultBlockSize(fs, partitionPath); - } - if (sizeThreshold == null) { - sizeThreshold = DEFAULT_SIZE_THRESHOLD; - } - - return new HoodieLogAppendConfig(fs, logFile, schema, bufferSize, replication, blockSize, - isAutoFlush, sizeThreshold); - - } - - - } -} diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogAppender.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogAppender.java deleted file mode 100644 index 6ad55b74f4c22..0000000000000 --- 
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogAppender.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.uber.hoodie.common.table.log; - -import com.uber.hoodie.common.table.log.avro.AvroLogAppender; -import com.uber.hoodie.common.table.log.avro.RollingAvroLogAppender; - -import java.io.IOException; -import java.util.Iterator; -import java.util.List; - -/** - * Interface for implementations supporting appending data to a log file - * - * @param - * @see AvroLogAppender - * @see RollingAvroLogAppender - */ -public interface HoodieLogAppender { - /** - * Append a stream of records in a batch (this will be written as a block/unit to the underlying log) - * - * @param records - * @throws IOException - */ - void append(Iterator records) throws IOException, InterruptedException; - - /** - * Syncs the log manually if auto-flush is not set in HoodieLogAppendConfig. If auto-flush is set - * Then the LogAppender will automatically flush after the append call. - * - * @throws IOException - */ - void sync() throws IOException; - - /** - * Close the appended and release any resources holding on to - * - * @throws IOException - */ - void close() throws IOException; - - /** - * Gets the current offset in the log. 
This is usually used to mark the start of the block in - * meta-data and passed to the HoodieLogReader - * - * @return - * @throws IOException - */ - long getCurrentSize() throws IOException; -} diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFile.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFile.java index eb38679b601c7..69bb60dc30c05 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFile.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFile.java @@ -32,7 +32,7 @@ * Also contains logic to roll-over the log file */ public class HoodieLogFile { - public static final String DELTA_EXTENSION = ".avro.delta"; + public static final String DELTA_EXTENSION = ".log"; private final Path path; private Optional fileStatus; @@ -89,11 +89,6 @@ public HoodieLogFile rollOver(FileSystem fs) throws IOException { FSUtils.makeLogFileName(fileId, DELTA_EXTENSION, baseCommitTime, newVersion))); } - public boolean shouldRollOver(HoodieLogAppender currentWriter, HoodieLogAppendConfig config) - throws IOException { - return currentWriter.getCurrentSize() > config.getSizeThreshold(); - } - public static Comparator getLogVersionComparator() { return (o1, o2) -> { // reverse the order diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormat.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormat.java new file mode 100644 index 0000000000000..4c7644be2b97e --- /dev/null +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormat.java @@ -0,0 +1,193 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.common.table.log; + +import com.uber.hoodie.common.table.log.block.HoodieLogBlock; +import com.uber.hoodie.common.util.FSUtils; +import java.io.Closeable; +import java.io.IOException; +import java.util.Iterator; +import org.apache.avro.Schema; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +/** + * File Format for Hoodie Log Files. + * The File Format consists of blocks each seperated with a MAGIC sync marker. + * A Block can either be a Data block, Command block or Delete Block. + * Data Block - Contains log records serialized as Avro Binary Format + * Command Block - Specific commands like RoLLBACK_PREVIOUS-BLOCK - Tombstone for the previously written block + * Delete Block - List of keys to delete - tombstone for keys + */ +public interface HoodieLogFormat { + /** + * Magic 4 bytes we put at the start of every block in the log file. Sync marker. 
+ * We could make this file specific (generate a random 4 byte magic and stick it in the file header), but this I think is suffice for now - PR + */ + byte [] MAGIC = new byte [] {'H', 'U', 'D', 'I'}; + + /** + * Writer interface to allow appending block to this file format + */ + interface Writer extends Closeable { + /** @return the path to this {@link HoodieLogFormat} */ + HoodieLogFile getLogFile(); + + /** + * Append Block returns a new Writer if the log is rolled + */ + Writer appendBlock(HoodieLogBlock block) throws IOException, InterruptedException; + long getCurrentSize() throws IOException; + } + + /** + * Reader interface which is an Iterator of HoodieLogBlock + */ + interface Reader extends Closeable, Iterator { + /** @return the path to this {@link HoodieLogFormat} */ + HoodieLogFile getLogFile(); + } + + + /** + * Builder class to construct the default log format writer + */ + class WriterBuilder { + private final static Logger log = LogManager.getLogger(WriterBuilder.class); + // Default max log file size 512 MB + public static final long DEFAULT_SIZE_THRESHOLD = 512 * 1024 * 1024L; + + // Buffer size + private Integer bufferSize; + // Replication for the log file + private Short replication; + // FileSystem + private FileSystem fs; + // Size threshold for the log file. Useful when used with a rolling log appender + private Long sizeThreshold; + // Log File extension. Could be .avro.delta or .avro.commits etc + private String fileExtension; + // File Id + private String logFileId; + // File Commit Time stamp + private String commitTime; + // version number for this log file. If not specified, then the current version will be computed by inspecting the file system + private Integer logVersion; + // Location of the directory containing the log + private Path parentPath; + + public WriterBuilder withBufferSize(int bufferSize) { + this.bufferSize = bufferSize; + return this; + } + + public WriterBuilder withReplication(short replication) { + this.replication = replication; + return this; + } + + public WriterBuilder withFs(FileSystem fs) { + this.fs = fs; + return this; + } + + public WriterBuilder withSizeThreshold(long sizeThreshold) { + this.sizeThreshold = sizeThreshold; + return this; + } + + public WriterBuilder withFileExtension(String logFileExtension) { + this.fileExtension = logFileExtension; + return this; + } + + public WriterBuilder withFileId(String fileId) { + this.logFileId = fileId; + return this; + } + + public WriterBuilder overBaseCommit(String baseCommit) { + this.commitTime = baseCommit; + return this; + } + + public WriterBuilder withLogVersion(int version) { + this.logVersion = version; + return this; + } + + public WriterBuilder onParentPath(Path parentPath) { + this.parentPath = parentPath; + return this; + } + + public Writer build() throws IOException, InterruptedException { + log.info("Building HoodieLogFormat Writer"); + if (fs == null) { + fs = FSUtils.getFs(); + } + if (logFileId == null) { + throw new IllegalArgumentException("FileID is not specified"); + } + if (commitTime == null) { + throw new IllegalArgumentException("BaseCommitTime is not specified"); + } + if (fileExtension == null) { + throw new IllegalArgumentException("File extension is not specified"); + } + if (parentPath == null) { + throw new IllegalArgumentException("Log file parent location is not specified"); + } + if (logVersion == null) { + log.info("Computing the next log version for " + logFileId + " in " + parentPath); + logVersion = + FSUtils.getCurrentLogVersion(fs, 
parentPath, logFileId, fileExtension, commitTime); + log.info( + "Computed the next log version for " + logFileId + " in " + parentPath + " as " + + logVersion); + } + + Path logPath = new Path(parentPath, + FSUtils.makeLogFileName(logFileId, fileExtension, commitTime, logVersion)); + log.info("HoodieLogFile on path " + logPath); + HoodieLogFile logFile = new HoodieLogFile(logPath); + + if (bufferSize == null) { + bufferSize = FSUtils.getDefaultBufferSize(fs); + } + if (replication == null) { + replication = FSUtils.getDefaultReplication(fs, parentPath); + } + if (sizeThreshold == null) { + sizeThreshold = DEFAULT_SIZE_THRESHOLD; + } + return new HoodieLogFormatWriter(fs, logFile, bufferSize, replication, sizeThreshold); + } + + } + + static WriterBuilder newWriterBuilder() { + return new WriterBuilder(); + } + + static HoodieLogFormat.Reader newReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema) + throws IOException { + return new HoodieLogFormatReader(fs, logFile, readerSchema); + } +} diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatReader.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatReader.java new file mode 100644 index 0000000000000..2a341321c2980 --- /dev/null +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatReader.java @@ -0,0 +1,201 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
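// A minimal sketch of scanning a single log file block-by-block with the Reader defined above.
// Each block on disk is framed as MAGIC ("HUDI") | int block-type ordinal | int content size |
// content bytes, which is what hasNext()/next() decode (partially written regions surface as a
// corrupt block). fs, logFile and readerSchema are assumed inputs.
import com.uber.hoodie.common.table.log.HoodieLogFile;
import com.uber.hoodie.common.table.log.HoodieLogFormat;
import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;

class LogScanSketch {
  static void printBlocks(FileSystem fs, HoodieLogFile logFile, Schema readerSchema)
      throws IOException {
    try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(fs, logFile, readerSchema)) {
      while (reader.hasNext()) {
        HoodieLogBlock block = reader.next();
        if (block instanceof HoodieAvroDataBlock) {
          System.out.println("data block: " + ((HoodieAvroDataBlock) block).getRecords().size() + " records");
        } else {
          System.out.println("block type: " + block.getBlockType());
        }
      }
    }
  }
}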
+ */ + +package com.uber.hoodie.common.table.log; + +import com.google.common.base.Preconditions; +import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock; +import com.uber.hoodie.common.table.log.block.HoodieCommandBlock; +import com.uber.hoodie.common.table.log.block.HoodieCorruptBlock; +import com.uber.hoodie.common.table.log.block.HoodieDeleteBlock; +import com.uber.hoodie.common.table.log.block.HoodieLogBlock; +import com.uber.hoodie.common.table.log.block.HoodieLogBlock.HoodieLogBlockType; +import com.uber.hoodie.exception.CorruptedLogFileException; +import com.uber.hoodie.exception.HoodieIOException; +import com.uber.hoodie.exception.HoodieNotSupportedException; +import java.io.EOFException; +import java.io.IOException; +import java.util.Arrays; +import org.apache.avro.Schema; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +/** + * Scans a log file and provides block level iterator on the log file + * Loads the entire block contents in memory + * Can emit either a DataBlock, CommandBlock, DeleteBlock or CorruptBlock (if one is found) + */ +public class HoodieLogFormatReader implements HoodieLogFormat.Reader { + private static final int DEFAULT_BUFFER_SIZE = 4096; + private final static Logger log = LogManager.getLogger(HoodieLogFormatReader.class); + + private final FSDataInputStream inputStream; + private final HoodieLogFile logFile; + private static final byte[] magicBuffer = new byte[4]; + private final Schema readerSchema; + private HoodieLogBlock nextBlock = null; + + HoodieLogFormatReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize) throws IOException { + this.inputStream = fs.open(logFile.getPath(), bufferSize); + this.logFile = logFile; + this.readerSchema = readerSchema; + } + + HoodieLogFormatReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema) throws IOException { + this(fs, logFile, readerSchema, DEFAULT_BUFFER_SIZE); + } + + @Override + public HoodieLogFile getLogFile() { + return logFile; + } + + private HoodieLogBlock readBlock() throws IOException { + // 2. Read the block type + int ordinal = inputStream.readInt(); + Preconditions.checkArgument(ordinal < HoodieLogBlockType.values().length, + "Invalid block byte ordinal found " + ordinal); + HoodieLogBlockType blockType = HoodieLogBlockType.values()[ordinal]; + + // 3. Read the size of the block + int blocksize = inputStream.readInt(); + + // We may have had a crash which could have written this block partially + // Skip blocksize in the stream and we should either find a sync marker (start of the next block) or EOF + // If we did not find either of it, then this block is a corrupted block. + boolean isCorrupted = isBlockCorrupt(blocksize); + if(isCorrupted) { + return createCorruptBlock(); + } + + // 4. 
Read the content + // TODO - have a max block size and reuse this buffer in the ByteBuffer (hard to guess max block size for now) + byte[] content = new byte[blocksize]; + inputStream.readFully(content, 0, blocksize); + + switch (blockType) { + // based on type read the block + case AVRO_DATA_BLOCK: + return HoodieAvroDataBlock.fromBytes(content, readerSchema); + case DELETE_BLOCK: + return HoodieDeleteBlock.fromBytes(content); + case COMMAND_BLOCK: + return HoodieCommandBlock.fromBytes(content); + default: + throw new HoodieNotSupportedException("Unsupported Block " + blockType); + } + } + + private HoodieLogBlock createCorruptBlock() throws IOException { + log.info("Log " + logFile + " has a corrupted block at " + inputStream.getPos()); + long currentPos = inputStream.getPos(); + long nextBlockOffset = scanForNextAvailableBlockOffset(); + // Rewind to the initial start and read corrupted bytes till the nextBlockOffset + inputStream.seek(currentPos); + log.info("Next available block in " + logFile + " starts at " + nextBlockOffset); + int corruptedBlockSize = (int) (nextBlockOffset - currentPos); + byte[] content = new byte[corruptedBlockSize]; + inputStream.readFully(content, 0, corruptedBlockSize); + return HoodieCorruptBlock.fromBytes(content); + } + + private boolean isBlockCorrupt(int blocksize) throws IOException { + long currentPos = inputStream.getPos(); + try { + inputStream.seek(currentPos + blocksize); + } catch (EOFException e) { + // this is corrupt + return true; + } + + try { + readMagic(); + // all good - either we found the sync marker or EOF. Reset position and continue + return false; + } catch (CorruptedLogFileException e) { + // This is a corrupted block + return true; + } finally { + inputStream.seek(currentPos); + } + } + + private long scanForNextAvailableBlockOffset() throws IOException { + while(true) { + long currentPos = inputStream.getPos(); + try { + boolean isEOF = readMagic(); + return isEOF ? inputStream.getPos() : currentPos; + } catch (CorruptedLogFileException e) { + // No luck - advance and try again + inputStream.seek(currentPos + 1); + } + } + } + + @Override + public void close() throws IOException { + this.inputStream.close(); + } + + @Override + /** + * hasNext is not idempotent. TODO - Fix this. It is okay for now - PR + */ + public boolean hasNext() { + try { + boolean isEOF = readMagic(); + if (isEOF) { + return false; + } + this.nextBlock = readBlock(); + return nextBlock != null; + } catch (IOException e) { + throw new HoodieIOException("IOException when reading logfile " + logFile, e); + } + } + + private boolean readMagic() throws IOException { + try { + // 1. Read magic header from the start of the block + inputStream.readFully(magicBuffer, 0, 4); + if (!Arrays.equals(magicBuffer, HoodieLogFormat.MAGIC)) { + throw new CorruptedLogFileException( + logFile + "could not be read. 
Did not find the magic bytes at the start of the block"); + } + return false; + } catch (EOFException e) { + // We have reached the EOF + return true; + } + } + + @Override + public HoodieLogBlock next() { + if(nextBlock == null) { + // may be hasNext is not called + hasNext(); + } + return nextBlock; + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Remove not supported for HoodieLogFormatReader"); + } +} diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatWriter.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatWriter.java new file mode 100644 index 0000000000000..5010a522a5200 --- /dev/null +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatWriter.java @@ -0,0 +1,164 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.common.table.log; + +import com.google.common.base.Preconditions; +import com.uber.hoodie.common.table.log.HoodieLogFormat.Writer; +import com.uber.hoodie.common.table.log.HoodieLogFormat.WriterBuilder; +import com.uber.hoodie.common.table.log.block.HoodieLogBlock; +import com.uber.hoodie.common.util.FSUtils; +import com.uber.hoodie.exception.HoodieException; +import java.io.IOException; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +/** + * HoodieLogFormatWriter can be used to append blocks to a log file + * Use HoodieLogFormat.WriterBuilder to construct + */ +public class HoodieLogFormatWriter implements HoodieLogFormat.Writer { + + private final static Logger log = LogManager.getLogger(HoodieLogFormatWriter.class); + + private final HoodieLogFile logFile; + private final FileSystem fs; + private final long sizeThreshold; + private final Integer bufferSize; + private final Short replication; + private FSDataOutputStream output; + + /** + * + * @param fs + * @param logFile + * @param bufferSize + * @param replication + * @param sizeThreshold + */ + HoodieLogFormatWriter(FileSystem fs, HoodieLogFile logFile, Integer bufferSize, + Short replication, Long sizeThreshold) + throws IOException, InterruptedException { + this.fs = fs; + this.logFile = logFile; + this.sizeThreshold = sizeThreshold; + this.bufferSize = bufferSize; + this.replication = replication; + + Path path = logFile.getPath(); + if (fs.exists(path)) { + log.info(logFile + " exists. 
Appending to existing file"); + try { + this.output = fs.append(path, bufferSize); + } catch (RemoteException e) { + // this happens when either another task executor writing to this file died or data node is going down + if (e.getClassName().equals(AlreadyBeingCreatedException.class.getName()) + && fs instanceof DistributedFileSystem) { + log.warn("Trying to recover log on path " + path); + if (FSUtils.recoverDFSFileLease((DistributedFileSystem) fs, path)) { + log.warn("Recovered lease on path " + path); + // try again + this.output = fs.append(path, bufferSize); + } else { + log.warn("Failed to recover lease on path " + path); + throw new HoodieException(e); + } + } + } + } else { + log.info(logFile + " does not exist. Create a new file"); + // Block size does not matter as we will always manually autoflush + this.output = fs.create(path, false, bufferSize, replication, + WriterBuilder.DEFAULT_SIZE_THRESHOLD, null); + // TODO - append a file level meta block + } + } + + public FileSystem getFs() { + return fs; + } + + public HoodieLogFile getLogFile() { + return logFile; + } + + public long getSizeThreshold() { + return sizeThreshold; + } + + @Override + public Writer appendBlock(HoodieLogBlock block) + throws IOException, InterruptedException { + byte[] content = block.getBytes(); + // 1. write the magic header for the start of the block + this.output.write(HoodieLogFormat.MAGIC); + // 2. Write the block type + this.output.writeInt(block.getBlockType().ordinal()); + // 3. Write the size of the block + this.output.writeInt(content.length); + // 4. Write the contents of the block + this.output.write(content); + + // Flush every block to disk + flush(); + + // roll over if size is past the threshold + return rolloverIfNeeded(); + } + + private Writer rolloverIfNeeded() throws IOException, InterruptedException { + // Roll over if the size is past the threshold + if (getCurrentSize() > sizeThreshold) { + //TODO - make an end marker which seals the old log file (no more appends possible to that file). + log.info("CurrentSize " + getCurrentSize() + " has reached threshold " + sizeThreshold + + ". Rolling over to the next version"); + HoodieLogFile newLogFile = logFile.rollOver(fs); + // close this writer and return the new writer + close(); + return new HoodieLogFormatWriter(fs, newLogFile, bufferSize, replication, sizeThreshold); + } + return this; + } + + @Override + public void close() throws IOException { + flush(); + output.close(); + output = null; + } + + private void flush() throws IOException { + if (output == null) { + return; // Presume closed + } + output.flush(); + output.hflush(); + } + + public long getCurrentSize() throws IOException { + if(output == null) { + throw new IllegalStateException("Cannot get current size as the underlying stream has been closed already"); + } + return output.getPos(); + } + +} diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/avro/AvroLogAppender.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/avro/AvroLogAppender.java deleted file mode 100644 index e0cdd915348df..0000000000000 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/avro/AvroLogAppender.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
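// A small sketch of how a caller observes the rollover implemented above: appendBlock returns
// a new Writer once getCurrentSize() exceeds the size threshold, and the rolled-over writer
// points at the next log version for the same file id. Assumes getLogVersion() is an int and
// that the caller already holds a Writer and a block to append.
import com.uber.hoodie.common.table.log.HoodieLogFormat.Writer;
import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
import java.io.IOException;

class RolloverSketch {
  static Writer appendAndTrackRollover(Writer writer, HoodieLogBlock block)
      throws IOException, InterruptedException {
    int versionBefore = writer.getLogFile().getLogVersion();
    Writer current = writer.appendBlock(block); // may close the old stream and open the next version
    if (current.getLogFile().getLogVersion() != versionBefore) {
      System.out.println("Rolled over to " + current.getLogFile().getPath());
    }
    return current;
  }
}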
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.uber.hoodie.common.table.log.avro; - -import com.uber.hoodie.common.table.log.HoodieLogAppendConfig; -import com.uber.hoodie.common.table.log.HoodieLogAppender; -import com.uber.hoodie.common.util.FSUtils; -import com.uber.hoodie.exception.HoodieException; -import com.uber.hoodie.exception.HoodieIOException; -import org.apache.avro.file.DataFileWriter; -import org.apache.avro.generic.GenericDatumWriter; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.IndexedRecord; -import org.apache.avro.mapred.FsInput; -import org.apache.hadoop.fs.AvroFSInput; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileContext; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.DistributedFileSystem; -import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; -import org.apache.hadoop.ipc.RemoteException; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; - -import java.io.IOException; -import java.util.Iterator; -import java.util.List; - -/** - * AvroLogAppender appends a bunch of IndexedRecord to a Avro data file. - * If auto-flush is set, every call to append writes out a block. - * A avro block corresponds to records appended in a single commit. - * - * @see org.apache.avro.file.DataFileReader - */ -public class AvroLogAppender implements HoodieLogAppender { - private final static Logger log = LogManager.getLogger(AvroLogAppender.class); - private final HoodieLogAppendConfig config; - private FSDataOutputStream output; - private DataFileWriter writer; - private boolean autoFlush; - - public AvroLogAppender(HoodieLogAppendConfig config) throws IOException, InterruptedException { - FileSystem fs = config.getFs(); - this.config = config; - this.autoFlush = config.isAutoFlush(); - GenericDatumWriter datumWriter = - new GenericDatumWriter<>(config.getSchema()); - this.writer = new DataFileWriter<>(datumWriter); - Path path = config.getLogFile().getPath(); - - if (fs.exists(path)) { - //TODO - check for log corruption and roll over if needed - log.info(config.getLogFile() + " exists. 
Appending to existing file"); - // this log path exists, we will append to it - // fs = FileSystem.get(fs.getConf()); - try { - this.output = fs.append(path, config.getBufferSize()); - } catch (RemoteException e) { - // this happens when either another task executor writing to this file died or data node is going down - if (e.getClassName().equals(AlreadyBeingCreatedException.class.getName()) - && fs instanceof DistributedFileSystem) { - log.warn("Trying to recover log on path " + path); - if (FSUtils.recoverDFSFileLease((DistributedFileSystem) fs, path)) { - log.warn("Recovered lease on path " + path); - // try again - this.output = fs.append(path, config.getBufferSize()); - } else { - log.warn("Failed to recover lease on path " + path); - throw new HoodieException(e); - } - } - } - - this.writer - .appendTo(new FsInput(path, fs.getConf()), output); - // we always want to flush to disk everytime a avro block is written - this.writer.setFlushOnEveryBlock(true); - } else { - log.info(config.getLogFile() + " does not exist. Create a new file"); - this.output = fs.create(path, false, config.getBufferSize(), config.getReplication(), - config.getBlockSize(), null); - this.writer.create(config.getSchema(), output); - this.writer.setFlushOnEveryBlock(true); - // We need to close the writer to be able to tell the name node that we created this file - // this.writer.close(); - } - } - - public void append(Iterator records) throws IOException { - records.forEachRemaining(r -> { - try { - writer.append(r); - } catch (IOException e) { - throw new HoodieIOException( - "Could not append record " + r + " to " + config.getLogFile()); - } - }); - if (autoFlush) { - sync(); - } - } - - public void sync() throws IOException { - if (output == null || writer == null) - return; // Presume closed - writer.flush(); - output.flush(); - output.hflush(); - } - - public void close() throws IOException { - sync(); - writer.close(); - writer = null; - output.close(); - output = null; - } - - public long getCurrentSize() throws IOException { - if (writer == null) { - throw new IllegalStateException( - "LogWriter " + config.getLogFile() + " has been closed. Cannot getCurrentSize"); - } - // writer.sync() returns only the offset for this block and not the global offset - return output.getPos(); - } -} diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/avro/AvroLogReader.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/avro/AvroLogReader.java deleted file mode 100644 index 4f9a1a8d5e000..0000000000000 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/avro/AvroLogReader.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.uber.hoodie.common.table.log.avro; - -import com.google.common.collect.Lists; -import com.uber.hoodie.common.table.log.HoodieLogAppender; -import com.uber.hoodie.common.table.log.HoodieLogFile; -import com.uber.hoodie.exception.HoodieIOException; -import org.apache.avro.Schema; -import org.apache.avro.file.DataFileReader; -import org.apache.avro.generic.GenericDatumReader; -import org.apache.avro.generic.GenericRecord; -import org.apache.hadoop.fs.AvroFSInput; -import org.apache.hadoop.fs.FileContext; -import org.apache.hadoop.fs.FileSystem; - -import java.io.IOException; -import java.util.Iterator; -import java.util.List; - -/** - * AvroLogReader allows reading blocks of records given a offset as written by AvroLogAppender - * Avro Log files are never streamed entirely - because of fault tolerance. - * If a block is corrupted, then random access with offset bypasses any corrupt blocks. - * Metadata about offset should be saved when writing blocks and passed in readBlock() - * - * @see AvroLogAppender - */ -public class AvroLogReader { - private final DataFileReader reader; - private final HoodieLogFile file; - - public AvroLogReader(HoodieLogFile file, FileSystem fs, Schema readerSchema) - throws IOException { - GenericDatumReader datumReader = new GenericDatumReader<>(); - datumReader.setExpected(readerSchema); - final AvroFSInput input = new AvroFSInput(FileContext.getFileContext(fs.getConf()), file.getPath()); - this.reader = (DataFileReader) DataFileReader.openReader(input, datumReader); - this.file = file; - } - - public Iterator readBlock(long startOffset) throws IOException { - // We keep track of exact offset for blocks, just seek to it directly - reader.seek(startOffset); - - List records = Lists.newArrayList(); - try { - // First check if we are past the sync market and then check reader.hasNext, - // hasNext will load a block in memory and this will fail if a block is corrupted. - while (!reader.pastSync(startOffset) && reader.hasNext()) { - records.add(reader.next()); - } - } catch (IOException e) { - throw new HoodieIOException("Failed to read avro records from " + file); - } - return records.iterator(); - } - - public HoodieLogFile getFile() { - return file; - } - - public void close() throws IOException { - reader.close(); - } - - -} diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/avro/CompositeAvroLogReader.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/avro/CompositeAvroLogReader.java deleted file mode 100644 index 881fe30cfec55..0000000000000 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/avro/CompositeAvroLogReader.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.uber.hoodie.common.table.log.avro; - -import com.uber.hoodie.common.table.log.HoodieLogFile; -import com.uber.hoodie.common.util.FSUtils; -import com.uber.hoodie.exception.HoodieIOException; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - -import java.io.IOException; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.SortedMap; -import java.util.function.Function; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -/** - * CompositeAvroLogReader reads all versions of the logs for a given fileId. - * It gives a iterator which iterates through all the versions and the list of blocks for that specific version - * Useful for merging records in RecordReader and compacting all the delta versions - * - * @see AvroLogReader - */ -public class CompositeAvroLogReader { - private final Map readers; - - public CompositeAvroLogReader(Path partitionPath, String fileId, String baseCommitTime, FileSystem fs, - Schema readerSchema, String logFileExtension) throws IOException { - Stream allLogFiles = - FSUtils.getAllLogFiles(fs, partitionPath, fileId, logFileExtension, baseCommitTime); - this.readers = allLogFiles.map(hoodieLogFile -> { - try { - return new AvroLogReader(hoodieLogFile, fs, readerSchema); - } catch (IOException e) { - throw new HoodieIOException( - "Could not read avro records from path " + hoodieLogFile); - } - }).collect(Collectors.toMap(new Function() { - @Override - public Integer apply(AvroLogReader avroLogReader) { - return avroLogReader.getFile().getLogVersion(); - } - }, Function.identity())); - } - - /** - * Reads all the versions (in the order specified) and all the blocks starting with the offset specified - * - * @param filesToOffsetMap - * @return - * @throws IOException - */ - public Iterator readBlocks(SortedMap> filesToOffsetMap) - throws IOException { - return new Iterators(filesToOffsetMap, readers); - } - - public void close() throws IOException { - readers.values().forEach(s -> { - try { - s.close(); - } catch (IOException e) { - throw new HoodieIOException("Unable to close " + s.getFile(), e); - } - }); - } - - public class Iterators implements Iterator { - - private final Map readers; - private final Map> versionsToOffsetMap; - private Integer currentVersion; - private Iterator currentVersionIterator; - private Iterator currentOffsetIterator; - private Iterator currentRecordIterator; - - public Iterators(Map> versionToOffsetMap, - Map readers) { - this.currentVersionIterator = versionToOffsetMap.keySet().iterator(); - this.readers = readers; - this.versionsToOffsetMap = versionToOffsetMap; - } - - private Iterator findNextBlock() throws IOException { - if (currentOffsetIterator != null) { - while (currentOffsetIterator.hasNext()) { - // we have more offsets to process for this file - long currentOffset = currentOffsetIterator.next(); - Iterator currentBlock = - readers.get(currentVersion).readBlock(currentOffset); - if (currentBlock.hasNext()) { - return currentBlock; - } - } - } - return null; - } - - private Iterator findNext() { - try { - Iterator nextBlock = findNextBlock(); - if (nextBlock != null) { - // we have more offsets to process for this version - return nextBlock; - } - - // We have no more offsets to process for the version, lets move on to the next version - while (currentVersionIterator.hasNext()) { - currentVersion = currentVersionIterator.next(); - 
currentOffsetIterator = versionsToOffsetMap.get(currentVersion).iterator(); - nextBlock = findNextBlock(); - if (nextBlock != null) { - return nextBlock; - } - } - } catch (IOException e) { - throw new HoodieIOException( - "Could not read avro records from " + readers.get(currentVersion).getFile()); - } - return null; - } - - @Override - public boolean hasNext() { - if (currentRecordIterator == null || !currentRecordIterator.hasNext()) { - currentRecordIterator = findNext(); - } - return (currentRecordIterator != null && currentRecordIterator.hasNext()); - } - - @Override - public GenericRecord next() { - return currentRecordIterator.next(); - } - } -} diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/avro/HoodieAvroReader.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/avro/HoodieAvroReader.java deleted file mode 100644 index ab8907090aa4f..0000000000000 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/avro/HoodieAvroReader.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.uber.hoodie.common.table.log.avro; - -import com.google.common.collect.Maps; -import com.uber.hoodie.common.model.HoodieAvroPayload; -import com.uber.hoodie.common.model.HoodieKey; -import com.uber.hoodie.common.model.HoodieRecord; -import com.uber.hoodie.common.util.AvroUtils; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Spliterator; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Consumer; -import org.apache.avro.Schema; -import org.apache.hadoop.fs.FileSystem; - -/** - * This reads a bunch of HoodieRecords from avro log files and deduplicates and mantains the merged - * state in memory. 
This is useful for compaction and record reader - */ -public class HoodieAvroReader implements Iterable> { - - private final Collection> records; - private AtomicLong totalLogFiles = new AtomicLong(0); - private AtomicLong totalLogRecords = new AtomicLong(0); - private long totalRecordsToUpdate; - - - public HoodieAvroReader(FileSystem fs, List logFilePaths, Schema readerSchema) { - Map> records = Maps.newHashMap(); - for (String path : logFilePaths) { - totalLogFiles.incrementAndGet(); - List> recordsFromFile = AvroUtils - .loadFromFile(fs, path, readerSchema); - totalLogRecords.addAndGet(recordsFromFile.size()); - for (HoodieRecord recordFromFile : recordsFromFile) { - String key = recordFromFile.getRecordKey(); - if (records.containsKey(key)) { - // Merge and store the merged record - HoodieAvroPayload combinedValue = records.get(key).getData() - .preCombine(recordFromFile.getData()); - records.put(key, new HoodieRecord<>(new HoodieKey(key, recordFromFile.getPartitionPath()), - combinedValue)); - } else { - // Put the record as is - records.put(key, recordFromFile); - } - } - } - this.records = records.values(); - this.totalRecordsToUpdate = records.size(); - } - - @Override - public Iterator> iterator() { - return records.iterator(); - } - - @Override - public void forEach(Consumer> consumer) { - records.forEach(consumer); - } - - @Override - public Spliterator> spliterator() { - return records.spliterator(); - } - - public long getTotalLogFiles() { - return totalLogFiles.get(); - } - - public long getTotalLogRecords() { - return totalLogRecords.get(); - } - - public long getTotalRecordsToUpdate() { - return totalRecordsToUpdate; - } -} diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/avro/RollingAvroLogAppender.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/avro/RollingAvroLogAppender.java deleted file mode 100644 index 07587939d84ba..0000000000000 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/avro/RollingAvroLogAppender.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.uber.hoodie.common.table.log.avro; - -import com.google.common.base.Preconditions; -import com.uber.hoodie.common.table.log.HoodieLogAppendConfig; -import com.uber.hoodie.common.table.log.HoodieLogAppender; -import com.uber.hoodie.common.table.log.HoodieLogFile; -import org.apache.avro.generic.IndexedRecord; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import java.io.IOException; -import java.util.Iterator; -import java.util.List; - -/** - * Implementation of {@link HoodieLogAppender} to roll over the log file when the sizeThreshold is reached. 
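// A minimal sketch of the merge path that replaces the HoodieAvroReader deleted above: every log
// file for a fileId is fed into HoodieCompactedLogRecordScanner, which merges records by key
// (superseded, rolled-back and deleted records do not survive) before handing them out. The
// constructor and accessors are the ones exercised in HoodieLogFormatTest later in this diff.
import com.uber.hoodie.common.table.log.HoodieCompactedLogRecordScanner;
import com.uber.hoodie.common.table.log.HoodieLogFile;
import com.uber.hoodie.common.util.FSUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

class CompactedScanSketch {
  static List<String> mergedRecordKeys(FileSystem fs, Path partitionPath, String fileId,
      String baseCommitTime, Schema schema) throws IOException {
    List<String> logFilePaths = FSUtils
        .getAllLogFiles(fs, partitionPath, fileId, HoodieLogFile.DELTA_EXTENSION, baseCommitTime)
        .map(f -> f.getPath().toString()).collect(Collectors.toList());
    HoodieCompactedLogRecordScanner scanner =
        new HoodieCompactedLogRecordScanner(fs, logFilePaths, schema);
    List<String> keys = new ArrayList<>();
    // One entry per surviving key; getTotalLogRecords() would also count superseded records
    scanner.forEach(record -> keys.add(record.getKey().getRecordKey()));
    return keys;
  }
}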
- */ -public class RollingAvroLogAppender implements HoodieLogAppender { - private static final Log LOG = LogFactory.getLog(RollingAvroLogAppender.class); - private AvroLogAppender logWriter; - private HoodieLogAppendConfig config; - - public RollingAvroLogAppender(HoodieLogAppendConfig config) - throws IOException, InterruptedException { - // initialize - this.logWriter = new AvroLogAppender(config); - this.config = config; - rollOverIfNeeded(); - } - - private void rollOverIfNeeded() throws IOException, InterruptedException { - HoodieLogFile logFile = config.getLogFile(); - boolean shouldRollOver = logFile.shouldRollOver(this, config); - if (shouldRollOver) { - if (logWriter != null) { - // Close the old writer and open a new one - logWriter.close(); - } - // Current logWriter is not initialized, set the current file name - HoodieLogFile nextRollLogPath = logFile.rollOver(config.getFs()); - LOG.info("Rolling over log from " + logFile + " to " + nextRollLogPath); - this.config = config.withLogFile(nextRollLogPath); - this.logWriter = new AvroLogAppender(this.config); - } - } - - public long getCurrentSize() throws IOException { - Preconditions.checkArgument(logWriter != null); - return logWriter.getCurrentSize(); - } - - public void append(Iterator records) throws IOException, InterruptedException { - LOG.info("Appending records to " + config.getLogFile()); - rollOverIfNeeded(); - Preconditions.checkArgument(logWriter != null); - logWriter.append(records); - } - - public void sync() throws IOException { - Preconditions.checkArgument(logWriter != null); - logWriter.sync(); - } - - public void close() throws IOException { - Preconditions.checkArgument(logWriter != null); - logWriter.close(); - } - - public HoodieLogAppendConfig getConfig() { - return config; - } -} diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieAvroDataBlock.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieAvroDataBlock.java new file mode 100644 index 0000000000000..5129b9dc11804 --- /dev/null +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieAvroDataBlock.java @@ -0,0 +1,134 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
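// A minimal sketch of size-based rollover, which the RollingAvroLogAppender deleted above used to
// handle itself: the HoodieLogFormat writer builder now takes the threshold directly and rolls to
// the next log version once it is crossed (see testRollover later in this diff). The 64MB value
// is illustrative only.
import com.uber.hoodie.common.table.log.HoodieLogFile;
import com.uber.hoodie.common.table.log.HoodieLogFormat;
import com.uber.hoodie.common.table.log.HoodieLogFormat.Writer;
import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

class RolloverSketch {
  static Writer appendWithRollover(FileSystem fs, Path partitionPath, String fileId,
      String baseCommit, Schema schema, List<IndexedRecord> records) throws Exception {
    Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
        .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(fileId)
        .overBaseCommit(baseCommit).withFs(fs)
        .withSizeThreshold(64 * 1024 * 1024) // roll to a new log version once this size is crossed
        .build();
    // appendBlock returns the (possibly rolled-over) writer, so always keep the returned instance
    writer = writer.appendBlock(new HoodieAvroDataBlock(records, schema));
    return writer;
  }
}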
+ */ + +package com.uber.hoodie.common.table.log.block; + +import com.uber.hoodie.common.util.HoodieAvroUtils; +import com.uber.hoodie.exception.HoodieIOException; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.IndexedRecord; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.io.Encoder; +import org.apache.avro.io.EncoderFactory; + +/** + * DataBlock contains a list of records serialized using Avro. + * The Datablock contains + * 1. Compressed Writer Schema length + * 2. Compressed Writer Schema content + * 3. Total number of records in the block + * 4. Size of a record + * 5. Actual avro serialized content of the record + */ +public class HoodieAvroDataBlock implements HoodieLogBlock { + + private List records; + private Schema schema; + + public HoodieAvroDataBlock(List records, Schema schema) { + this.records = records; + this.schema = schema; + } + + public List getRecords() { + return records; + } + + public Schema getSchema() { + return schema; + } + + @Override + public byte[] getBytes() throws IOException { + GenericDatumWriter writer = new GenericDatumWriter<>(schema); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream output = new DataOutputStream(baos); + + // 1. Compress and Write schema out + byte[] schemaContent = HoodieAvroUtils.compress(schema.toString()); + output.writeInt(schemaContent.length); + output.write(schemaContent); + + // 2. Write total number of records + output.writeInt(records.size()); + + // 3. Write the records + records.forEach(s -> { + ByteArrayOutputStream temp = new ByteArrayOutputStream(); + Encoder encoder = EncoderFactory.get().binaryEncoder(temp, null); + try { + // Encode the record into bytes + writer.write(s, encoder); + encoder.flush(); + + // Get the size of the bytes + int size = temp.toByteArray().length; + // Write the record size + output.writeInt(size); + // Write the content + output.write(temp.toByteArray()); + } catch (IOException e) { + throw new HoodieIOException("IOException converting HoodieAvroDataBlock to bytes", e); + } + }); + + output.close(); + return baos.toByteArray(); + } + + @Override + public HoodieLogBlockType getBlockType() { + return HoodieLogBlockType.AVRO_DATA_BLOCK; + } + + public static HoodieLogBlock fromBytes(byte[] content, Schema readerSchema) throws IOException { + // 1. Read the schema written out + DataInputStream dis = new DataInputStream(new ByteArrayInputStream(content)); + int schemaLength = dis.readInt(); + byte[] compressedSchema = new byte[schemaLength]; + dis.readFully(compressedSchema, 0, schemaLength); + Schema writerSchema = new Schema.Parser().parse(HoodieAvroUtils.decompress(compressedSchema)); + + GenericDatumReader reader = new GenericDatumReader<>(writerSchema, readerSchema); + // 2. Get the total records + int totalRecords = dis.readInt(); + List records = new ArrayList<>(totalRecords); + + // 3. 
Read the content + for(int i=0;i getLatestLogFile(Stream log public static Stream getAllLogFiles(FileSystem fs, Path partitionPath, final String fileId, final String logFileExtension, final String baseCommitTime) throws IOException { return Arrays.stream(fs.listStatus(partitionPath, - path -> path.getName().startsWith(fileId) && path.getName().contains(logFileExtension))) - .map(HoodieLogFile::new).filter(s -> s.getBaseCommitTime().equals(baseCommitTime)); + path -> path.getName().startsWith("." + fileId) && path.getName().contains(logFileExtension))) + .map(HoodieLogFile::new).filter(s -> s.getBaseCommitTime().equals(baseCommitTime)); } /** diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/HoodieAvroUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/HoodieAvroUtils.java index c3f0d0358d1fd..8323bc5ca8919 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/HoodieAvroUtils.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/HoodieAvroUtils.java @@ -18,7 +18,13 @@ import com.uber.hoodie.common.model.HoodieRecord; +import com.uber.hoodie.exception.HoodieIOException; import com.uber.hoodie.exception.SchemaCompatabilityException; +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.zip.DeflaterOutputStream; +import java.util.zip.InflaterInputStream; import org.apache.avro.Schema; import org.apache.avro.generic.*; import org.apache.avro.io.BinaryEncoder; @@ -137,4 +143,30 @@ public static GenericRecord rewriteRecord(GenericRecord record, Schema newSchema } return newRecord; } + + public static byte[] compress(String text) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try { + OutputStream out = new DeflaterOutputStream(baos); + out.write(text.getBytes("UTF-8")); + out.close(); + } catch (IOException e) { + throw new HoodieIOException("IOException while compressing text " + text, e); + } + return baos.toByteArray(); + } + + public static String decompress(byte[] bytes) { + InputStream in = new InflaterInputStream(new ByteArrayInputStream(bytes)); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try { + byte[] buffer = new byte[8192]; + int len; + while((len = in.read(buffer))>0) + baos.write(buffer, 0, len); + return new String(baos.toByteArray(), "UTF-8"); + } catch (IOException e) { + throw new HoodieIOException("IOException while decompressing text", e); + } + } } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/exception/CorruptedLogFileException.java b/hoodie-common/src/main/java/com/uber/hoodie/exception/CorruptedLogFileException.java new file mode 100644 index 0000000000000..43dd5d40cddff --- /dev/null +++ b/hoodie-common/src/main/java/com/uber/hoodie/exception/CorruptedLogFileException.java @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
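// A minimal sketch tying together the pieces added above: HoodieAvroUtils.compress/decompress
// round-trip the writer schema text, and HoodieAvroDataBlock.getBytes()/fromBytes() round-trip the
// whole block (compressed schema length and bytes, record count, then each record's length and
// Avro-encoded bytes). Only methods introduced in this diff are used.
import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
import com.uber.hoodie.common.util.HoodieAvroUtils;
import java.io.IOException;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;

class DataBlockRoundTripSketch {
  static List<IndexedRecord> roundTrip(List<IndexedRecord> records, Schema schema)
      throws IOException {
    // The schema text survives deflate compression and inflation unchanged
    byte[] packedSchema = HoodieAvroUtils.compress(schema.toString());
    String unpackedSchema = HoodieAvroUtils.decompress(packedSchema);
    assert schema.toString().equals(unpackedSchema);

    // Serialize the block and read it back, using the same schema as the reader schema
    byte[] blockBytes = new HoodieAvroDataBlock(records, schema).getBytes();
    HoodieAvroDataBlock reread =
        (HoodieAvroDataBlock) HoodieAvroDataBlock.fromBytes(blockBytes, schema);
    return reread.getRecords(); // same records, in write order
  }
}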
+ */ + +package com.uber.hoodie.exception; + +public class CorruptedLogFileException extends HoodieException { + + public CorruptedLogFileException(String msg) { + super(msg); + } +} diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java b/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java index b882dd29bebf7..194b1b4073031 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java @@ -21,37 +21,30 @@ import com.esotericsoftware.kryo.io.Output; import com.esotericsoftware.kryo.serializers.JavaSerializer; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import com.uber.hoodie.common.table.HoodieTableConfig; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; -import com.uber.hoodie.common.table.log.HoodieLogAppendConfig; import com.uber.hoodie.common.table.log.HoodieLogFile; -import com.uber.hoodie.common.table.log.avro.AvroLogAppender; -import com.uber.hoodie.common.table.log.avro.RollingAvroLogAppender; +import com.uber.hoodie.common.table.log.HoodieLogFormat; +import com.uber.hoodie.common.table.log.HoodieLogFormat.Writer; +import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.HoodieAvroUtils; -import com.uber.hoodie.common.util.SchemaTestUtil; -import com.uber.hoodie.exception.HoodieException; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; -import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; -import org.apache.jute.Index; import org.junit.rules.TemporaryFolder; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; import java.io.Serializable; import java.text.SimpleDateFormat; import java.util.Date; @@ -60,10 +53,8 @@ import java.util.Map; import java.util.Properties; import java.util.UUID; -import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; -import java.util.stream.StreamSupport; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -181,17 +172,16 @@ public static void writeRecordsToLogFiles(String basePath, Schema schema, List { + logWriter.appendBlock(new HoodieAvroDataBlock(s.getValue().stream().map(r -> { try { GenericRecord val = (GenericRecord) r.getData().getInsertValue(schema).get(); HoodieAvroUtils.addHoodieKeyToRecord(val, @@ -202,8 +192,8 @@ public static void writeRecordsToLogFiles(String basePath, Schema schema, List records = SchemaTestUtil.generateTestRecords(0, 100); + HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, + getSimpleSchema()); + writer = writer.appendBlock(dataBlock); + long size = writer.getCurrentSize(); + assertTrue("We just wrote a block - size should be > 0", size > 0); + assertEquals( + "Write should be auto-flushed. 
The size reported by FileStatus and the writer should match", + size, fs.getFileStatus(writer.getLogFile().getPath()).getLen()); + writer.close(); + } + + @Test + public void testRollover() throws IOException, InterruptedException, URISyntaxException { + Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath) + .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") + .overBaseCommit("100").withFs(fs).build(); + List records = SchemaTestUtil.generateTestRecords(0, 100); + HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, + getSimpleSchema()); + // Write out a block + writer = writer.appendBlock(dataBlock); + // Get the size of the block + long size = writer.getCurrentSize(); + writer.close(); + + // Create a writer with the size threshold as the size we just wrote - so this has to roll + writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath) + .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") + .overBaseCommit("100").withFs(fs).withSizeThreshold(size - 1).build(); + records = SchemaTestUtil.generateTestRecords(0, 100); + dataBlock = new HoodieAvroDataBlock(records, + getSimpleSchema()); + writer = writer.appendBlock(dataBlock); + assertEquals("This should be a new log file and hence size should be 0", 0, + writer.getCurrentSize()); + assertEquals("Version should be rolled to 2", 2, writer.getLogFile().getLogVersion()); + writer.close(); + } + + @Test + public void testMultipleAppend() throws IOException, URISyntaxException, InterruptedException { + Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath) + .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") + .overBaseCommit("100").withFs(fs).build(); + List records = SchemaTestUtil.generateTestRecords(0, 100); + HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, + getSimpleSchema()); + writer = writer.appendBlock(dataBlock); + long size1 = writer.getCurrentSize(); + writer.close(); + + writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath) + .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") + .overBaseCommit("100").withFs(fs).build(); + records = SchemaTestUtil.generateTestRecords(0, 100); + dataBlock = new HoodieAvroDataBlock(records, + getSimpleSchema()); + writer = writer.appendBlock(dataBlock); + long size2 = writer.getCurrentSize(); + assertTrue("We just wrote a new block - size2 should be > size1", size2 > size1); + assertEquals( + "Write should be auto-flushed. The size reported by FileStatus and the writer should match", + size2, fs.getFileStatus(writer.getLogFile().getPath()).getLen()); + writer.close(); + + // Close and Open again and append 100 more records + writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath) + .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") + .overBaseCommit("100").withFs(fs).build(); + records = SchemaTestUtil.generateTestRecords(0, 100); + dataBlock = new HoodieAvroDataBlock(records, + getSimpleSchema()); + writer = writer.appendBlock(dataBlock); + long size3 = writer.getCurrentSize(); + assertTrue("We just wrote a new block - size3 should be > size2", size3 > size2); + assertEquals( + "Write should be auto-flushed. 
The size reported by FileStatus and the writer should match", + size3, fs.getFileStatus(writer.getLogFile().getPath()).getLen()); + writer.close(); + + // Cannot get the current size after closing the log + try { + writer.getCurrentSize(); + fail("getCurrentSize should fail after the logAppender is closed"); + } catch (IllegalStateException e) { + // pass + } + } + + @Test + public void testLeaseRecovery() throws IOException, URISyntaxException, InterruptedException { + Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath) + .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") + .overBaseCommit("100").withFs(fs).build(); + List records = SchemaTestUtil.generateTestRecords(0, 100); + HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, + getSimpleSchema()); + writer = writer.appendBlock(dataBlock); + long size1 = writer.getCurrentSize(); + // do not close this writer - this simulates a data note appending to a log dying without closing the file + // writer.close(); + + writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath) + .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") + .overBaseCommit("100").withFs(fs).build(); + records = SchemaTestUtil.generateTestRecords(0, 100); + dataBlock = new HoodieAvroDataBlock(records, + getSimpleSchema()); + writer = writer.appendBlock(dataBlock); + long size2 = writer.getCurrentSize(); + assertTrue("We just wrote a new block - size2 should be > size1", size2 > size1); + assertEquals( + "Write should be auto-flushed. The size reported by FileStatus and the writer should match", + size2, fs.getFileStatus(writer.getLogFile().getPath()).getLen()); + writer.close(); + } + + @SuppressWarnings("unchecked") + @Test + public void testBasicWriteAndScan() + throws IOException, URISyntaxException, InterruptedException { + Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath) + .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") + .overBaseCommit("100").withFs(fs).build(); + List records = SchemaTestUtil.generateTestRecords(0, 100); + HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, + getSimpleSchema()); + writer = writer.appendBlock(dataBlock); + writer.close(); + + Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema()); + assertTrue("We wrote a block, we should be able to read it", reader.hasNext()); + HoodieLogBlock nextBlock = reader.next(); + assertEquals("The next block should be a data block", HoodieLogBlockType.AVRO_DATA_BLOCK, + nextBlock.getBlockType()); + HoodieAvroDataBlock dataBlockRead = (HoodieAvroDataBlock) nextBlock; + assertEquals("Read records size should be equal to the written records size", + records.size(), dataBlockRead.getRecords().size()); + assertEquals("Both records lists should be the same. 
(ordering guaranteed)", records, + dataBlockRead.getRecords()); + } + + @SuppressWarnings("unchecked") + @Test + public void testBasicAppendAndRead() + throws IOException, URISyntaxException, InterruptedException { + Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath) + .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") + .overBaseCommit("100").withFs(fs).build(); + List records1 = SchemaTestUtil.generateTestRecords(0, 100); + HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, + getSimpleSchema()); + writer = writer.appendBlock(dataBlock); + writer.close(); + + writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath) + .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") + .overBaseCommit("100").withFs(fs).build(); + List records2 = SchemaTestUtil.generateTestRecords(0, 100); + dataBlock = new HoodieAvroDataBlock(records2, + getSimpleSchema()); + writer = writer.appendBlock(dataBlock); + writer.close(); + + // Close and Open again and append 100 more records + writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath) + .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") + .overBaseCommit("100").withFs(fs).build(); + List records3 = SchemaTestUtil.generateTestRecords(0, 100); + dataBlock = new HoodieAvroDataBlock(records3, + getSimpleSchema()); + writer = writer.appendBlock(dataBlock); + writer.close(); + + Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema()); + assertTrue("First block should be available", reader.hasNext()); + HoodieLogBlock nextBlock = reader.next(); + HoodieAvroDataBlock dataBlockRead = (HoodieAvroDataBlock) nextBlock; + assertEquals("Read records size should be equal to the written records size", + records1.size(), dataBlockRead.getRecords().size()); + assertEquals("Both records lists should be the same. (ordering guaranteed)", records1, + dataBlockRead.getRecords()); + + nextBlock = reader.next(); + dataBlockRead = (HoodieAvroDataBlock) nextBlock; + assertEquals("Read records size should be equal to the written records size", + records2.size(), dataBlockRead.getRecords().size()); + assertEquals("Both records lists should be the same. (ordering guaranteed)", records2, + dataBlockRead.getRecords()); + + nextBlock = reader.next(); + dataBlockRead = (HoodieAvroDataBlock) nextBlock; + assertEquals("Read records size should be equal to the written records size", + records3.size(), dataBlockRead.getRecords().size()); + assertEquals("Both records lists should be the same. 
(ordering guaranteed)", records3, + dataBlockRead.getRecords()); + } + + @Test + public void testAppendAndReadOnCorruptedLog() + throws IOException, URISyntaxException, InterruptedException { + Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath) + .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") + .overBaseCommit("100").withFs(fs).build(); + List records = SchemaTestUtil.generateTestRecords(0, 100); + HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, + getSimpleSchema()); + writer = writer.appendBlock(dataBlock); + writer.close(); + + // Append some arbit byte[] to thee end of the log (mimics a partially written commit) + fs = FileSystem.get(fs.getConf()); + FSDataOutputStream outputStream = fs.append(writer.getLogFile().getPath()); + // create a block with + outputStream.write(HoodieLogFormat.MAGIC); + outputStream.writeInt(HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal()); + // Write out a length that does not confirm with the content + outputStream.writeInt(100); + outputStream.write("something-random".getBytes()); + outputStream.flush(); + outputStream.close(); + + // First round of reads - we should be able to read the first block and then EOF + Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema()); + assertTrue("First block should be available", reader.hasNext()); + reader.next(); + assertTrue("We should have corrupted block next", reader.hasNext()); + HoodieLogBlock block = reader.next(); + assertEquals("The read block should be a corrupt block", HoodieLogBlockType.CORRUPT_BLOCK, + block.getBlockType()); + assertEquals("", "something-random", new String(block.getBytes())); + assertFalse("There should be no more block left", reader.hasNext()); + + // Simulate another failure back to back + outputStream = fs.append(writer.getLogFile().getPath()); + // create a block with + outputStream.write(HoodieLogFormat.MAGIC); + outputStream.writeInt(HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal()); + // Write out a length that does not confirm with the content + outputStream.writeInt(100); + outputStream.write("something-else-random".getBytes()); + outputStream.flush(); + outputStream.close(); + + // Should be able to append a new block + writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath) + .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") + .overBaseCommit("100").withFs(fs).build(); + records = SchemaTestUtil.generateTestRecords(0, 100); + dataBlock = new HoodieAvroDataBlock(records, getSimpleSchema()); + writer = writer.appendBlock(dataBlock); + writer.close(); + + // Second round of reads - we should be able to read the first and last block + reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema()); + assertTrue("First block should be available", reader.hasNext()); + reader.next(); + assertTrue("We should get the 1st corrupted block next", reader.hasNext()); + reader.next(); + assertTrue("We should get the 2nd corrupted block next", reader.hasNext()); + block = reader.next(); + assertEquals("The read block should be a corrupt block", HoodieLogBlockType.CORRUPT_BLOCK, + block.getBlockType()); + assertEquals("", "something-else-random", new String(block.getBytes())); + assertTrue("We should get the last block next", reader.hasNext()); + reader.next(); + assertFalse("We should have no more blocks left", reader.hasNext()); + } + + + @Test + public void testAvroLogRecordReaderBasic() + throws IOException, 
URISyntaxException, InterruptedException { + Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema()); + // Set a small threshold so that every block is a new version + Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath) + .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") + .overBaseCommit("100").withFs(fs).withSizeThreshold(500).build(); + // Write 1 + List records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100); + + HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, schema); + writer = writer.appendBlock(dataBlock); + + // Write 2 + List records2 = SchemaTestUtil.generateHoodieTestRecords(0, 100); + dataBlock = new HoodieAvroDataBlock(records2, schema); + writer = writer.appendBlock(dataBlock); + writer.close(); + + List allLogFiles = FSUtils + .getAllLogFiles(fs, partitionPath, "test-fileid1", HoodieLogFile.DELTA_EXTENSION, "100") + .map(s -> s.getPath().toString()) + .collect(Collectors.toList()); + + HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, allLogFiles, + schema); + assertEquals("", 200, scanner.getTotalLogRecords()); + Set readKeys = new HashSet<>(200); + scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey())); + assertEquals("Stream collect should return all 200 records", 200, readKeys.size()); + records1.addAll(records2); + Set originalKeys = records1.stream() + .map(s -> ((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()) + .collect( + Collectors.toSet()); + assertEquals("CompositeAvroLogReader should return 200 records from 2 versions", originalKeys, + readKeys); + } + + @Test + public void testAvroLogRecordReaderWithRollbackTombstone() + throws IOException, URISyntaxException, InterruptedException { + Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema()); + // Set a small threshold so that every block is a new version + Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath) + .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") + .overBaseCommit("100").withFs(fs).build(); + + // Write 1 + List records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100); + HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, schema); + writer = writer.appendBlock(dataBlock); + + // Write 2 + List records2 = SchemaTestUtil.generateHoodieTestRecords(0, 100); + dataBlock = new HoodieAvroDataBlock(records2, schema); + writer = writer.appendBlock(dataBlock); + + // Rollback the last write + HoodieCommandBlock commandBlock = new HoodieCommandBlock( + HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK); + writer = writer.appendBlock(commandBlock); + + // Write 3 + List records3 = SchemaTestUtil.generateHoodieTestRecords(0, 100); + dataBlock = new HoodieAvroDataBlock(records3, schema); + writer = writer.appendBlock(dataBlock); + writer.close(); + + List allLogFiles = FSUtils + .getAllLogFiles(fs, partitionPath, "test-fileid1", HoodieLogFile.DELTA_EXTENSION, "100") + .map(s -> s.getPath().toString()) + .collect(Collectors.toList()); + + HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, allLogFiles, + schema); + assertEquals("We still would read 300 records, but only 200 of them are valid", 300, + scanner.getTotalLogRecords()); + Set readKeys = new HashSet<>(200); + scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey())); + assertEquals("Stream collect should return all 200 records", 200, readKeys.size()); + records1.addAll(records3); + Set 
originalKeys = records1.stream() + .map(s -> ((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()) + .collect( + Collectors.toSet()); + assertEquals("CompositeAvroLogReader should return 200 records from 2 versions", originalKeys, + readKeys); + } + + @Test + public void testAvroLogRecordReaderWithRollbackPartialBlock() + throws IOException, URISyntaxException, InterruptedException { + Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema()); + // Set a small threshold so that every block is a new version + Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath) + .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") + .overBaseCommit("100").withFs(fs).build(); + + // Write 1 + List records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100); + HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, schema); + writer = writer.appendBlock(dataBlock); + writer.close(); + + // Append some arbit byte[] to thee end of the log (mimics a partially written commit) + fs = FileSystem.get(fs.getConf()); + FSDataOutputStream outputStream = fs.append(writer.getLogFile().getPath()); + // create a block with + outputStream.write(HoodieLogFormat.MAGIC); + outputStream.writeInt(HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal()); + // Write out a length that does not confirm with the content + outputStream.writeInt(100); + outputStream.write("something-random".getBytes()); + outputStream.flush(); + outputStream.close(); + + // Rollback the last write + HoodieCommandBlock commandBlock = new HoodieCommandBlock( + HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK); + writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath) + .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") + .overBaseCommit("100").withFs(fs).build(); + writer = writer.appendBlock(commandBlock); + + // Write 3 + List records3 = SchemaTestUtil.generateHoodieTestRecords(0, 100); + dataBlock = new HoodieAvroDataBlock(records3, schema); + writer = writer.appendBlock(dataBlock); + writer.close(); + + List allLogFiles = FSUtils + .getAllLogFiles(fs, partitionPath, "test-fileid1", HoodieLogFile.DELTA_EXTENSION, "100") + .map(s -> s.getPath().toString()) + .collect(Collectors.toList()); + + HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, allLogFiles, + schema); + assertEquals("We would read 200 records", 200, + scanner.getTotalLogRecords()); + Set readKeys = new HashSet<>(200); + scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey())); + assertEquals("Stream collect should return all 200 records", 200, readKeys.size()); + records1.addAll(records3); + Set originalKeys = records1.stream() + .map(s -> ((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()) + .collect( + Collectors.toSet()); + assertEquals("CompositeAvroLogReader should return 200 records from 2 versions", originalKeys, + readKeys); + } + + @Test + public void testAvroLogRecordReaderWithDelete() + throws IOException, URISyntaxException, InterruptedException { + Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema()); + // Set a small threshold so that every block is a new version + Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath) + .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") + .overBaseCommit("100").withFs(fs).build(); + + // Write 1 + List records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100); + HoodieAvroDataBlock 
dataBlock = new HoodieAvroDataBlock(records1, schema); + writer = writer.appendBlock(dataBlock); + + // Write 2 + List records2 = SchemaTestUtil.generateHoodieTestRecords(0, 100); + dataBlock = new HoodieAvroDataBlock(records2, schema); + writer = writer.appendBlock(dataBlock); + + records1.addAll(records2); + List originalKeys = records1.stream() + .map(s -> ((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()) + .collect( + Collectors.toList()); + + // Delete 50 keys + List deletedKeys = originalKeys.subList(0, 50); + HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deletedKeys.toArray(new String[50])); + writer = writer.appendBlock(deleteBlock); + + List allLogFiles = FSUtils + .getAllLogFiles(fs, partitionPath, "test-fileid1", HoodieLogFile.DELTA_EXTENSION, "100") + .map(s -> s.getPath().toString()) + .collect(Collectors.toList()); + + HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, allLogFiles, + schema); + assertEquals("We still would read 200 records", 200, + scanner.getTotalLogRecords()); + List readKeys = new ArrayList<>(200); + scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey())); + assertEquals("Stream collect should return all 200 records", 150, readKeys.size()); + originalKeys.removeAll(deletedKeys); + Collections.sort(originalKeys); + Collections.sort(readKeys); + assertEquals("CompositeAvroLogReader should return 200 records from 2 versions", originalKeys, + readKeys); + } +} diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/avro/AvroLogAppenderTest.java b/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/avro/AvroLogAppenderTest.java deleted file mode 100644 index 2362819189793..0000000000000 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/avro/AvroLogAppenderTest.java +++ /dev/null @@ -1,326 +0,0 @@ -/* - * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.uber.hoodie.common.table.log.avro; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.uber.hoodie.common.minicluster.MiniClusterUtil; -import com.uber.hoodie.common.table.log.HoodieLogAppendConfig; -import com.uber.hoodie.common.table.log.HoodieLogFile; -import com.uber.hoodie.common.util.FSUtils; -import com.uber.hoodie.common.util.SchemaTestUtil; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.IndexedRecord; -import org.apache.commons.collections.IteratorUtils; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; - -import java.io.IOException; -import java.net.URISyntaxException; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.SortedMap; -import java.util.stream.Collectors; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -public class AvroLogAppenderTest { - private FileSystem fs; - private Path partitionPath; - - @BeforeClass - public static void setUpClass() throws IOException, InterruptedException { - // Append is not supported in LocalFileSystem. HDFS needs to be setup. - MiniClusterUtil.setUp(); - } - - @AfterClass - public static void tearDownClass() { - MiniClusterUtil.shutdown(); - } - - @Before - public void setUp() throws IOException, InterruptedException { - this.fs = MiniClusterUtil.fileSystem; - TemporaryFolder folder = new TemporaryFolder(); - folder.create(); - assertTrue(fs.mkdirs(new Path(folder.getRoot().getPath()))); - this.partitionPath = new Path(folder.getRoot().getPath()); - } - - @After - public void tearDown() throws IOException { - fs.delete(partitionPath, true); - } - - @Test - public void testBasicAppend() throws IOException, URISyntaxException, InterruptedException { - HoodieLogAppendConfig logConfig = - HoodieLogAppendConfig.newBuilder().onPartitionPath(partitionPath) - .withLogFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") - .withBaseCommitTime("100") - .withSchema(SchemaTestUtil.getSimpleSchema()).withFs(fs).build(); - RollingAvroLogAppender logAppender = new RollingAvroLogAppender(logConfig); - logAppender.append(SchemaTestUtil.generateTestRecords(0, 100).iterator()); - long size1 = logAppender.getCurrentSize(); - assertTrue("", size1 > 0); - assertEquals("", size1, fs.getFileStatus(logConfig.getLogFile().getPath()).getLen()); - logAppender.close(); - - // Close and Open again and append 100 more records - logAppender = new RollingAvroLogAppender(logConfig); - logAppender.append(SchemaTestUtil.generateTestRecords(100, 100).iterator()); - long size2 = logAppender.getCurrentSize(); - assertTrue("", size2 > size1); - assertEquals("", size2, fs.getFileStatus(logConfig.getLogFile().getPath()).getLen()); - logAppender.close(); - - // Close and Open again and append 100 more records - logAppender = new RollingAvroLogAppender(logConfig); - logAppender.append(SchemaTestUtil.generateTestRecords(200, 100).iterator()); - long size3 = logAppender.getCurrentSize(); - assertTrue("", size3 > size2); - assertEquals("", size3, fs.getFileStatus(logConfig.getLogFile().getPath()).getLen()); - logAppender.close(); - // Cannot get the current size after closing the log - try { - 
logAppender.getCurrentSize(); - fail("getCurrentSize should fail after the logAppender is closed"); - } catch (IllegalStateException e) { - // pass - } - } - - @Test - public void testLeaseRecovery() throws IOException, URISyntaxException, InterruptedException { - HoodieLogAppendConfig logConfig = - HoodieLogAppendConfig.newBuilder().onPartitionPath(partitionPath) - .withLogFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") - .withBaseCommitTime("100") - .withSchema(SchemaTestUtil.getSimpleSchema()).withFs(fs).build(); - RollingAvroLogAppender logAppender = new RollingAvroLogAppender(logConfig); - logAppender.append(SchemaTestUtil.generateTestRecords(0, 100).iterator()); - // do not close this log appender - // logAppender.close(); - - // Try opening again and append 100 more records - logAppender = new RollingAvroLogAppender(logConfig); - logAppender.append(SchemaTestUtil.generateTestRecords(100, 100).iterator()); - assertEquals("", logAppender.getCurrentSize(), - fs.getFileStatus(logConfig.getLogFile().getPath()).getLen()); - logAppender.close(); - } - - @Test - public void testAppendOnCorruptedBlock() - throws IOException, URISyntaxException, InterruptedException { - HoodieLogAppendConfig logConfig = - HoodieLogAppendConfig.newBuilder().onPartitionPath(partitionPath) - .withLogFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") - .withBaseCommitTime("100") - .withSchema(SchemaTestUtil.getSimpleSchema()).withFs(fs).build(); - RollingAvroLogAppender logAppender = new RollingAvroLogAppender(logConfig); - logAppender.append(SchemaTestUtil.generateTestRecords(0, 100).iterator()); - logAppender.close(); - - // Append some arbit byte[] to thee end of the log (mimics a partially written commit) - assertTrue(fs.exists(logConfig.getLogFile().getPath())); - fs = FileSystem.get(fs.getConf()); - FSDataOutputStream outputStream = - fs.append(logConfig.getLogFile().getPath(), logConfig.getBufferSize()); - outputStream.write("something-random".getBytes()); - outputStream.flush(); - outputStream.close(); - - logAppender = new RollingAvroLogAppender(logConfig); - logAppender.append(SchemaTestUtil.generateTestRecords(100, 100).iterator()); - logAppender.close(); - } - - - @SuppressWarnings("unchecked") - @Test - public void testBasicWriteAndRead() - throws IOException, URISyntaxException, InterruptedException { - HoodieLogAppendConfig logConfig = - HoodieLogAppendConfig.newBuilder().onPartitionPath(partitionPath) - .withLogFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") - .withBaseCommitTime("100") - .withSchema(SchemaTestUtil.getSimpleSchema()).withFs(fs).build(); - RollingAvroLogAppender logAppender = new RollingAvroLogAppender(logConfig); - long size1 = logAppender.getCurrentSize(); - - List inputRecords = SchemaTestUtil.generateTestRecords(0, 100); - logAppender.append(inputRecords.iterator()); - logAppender.close(); - - AvroLogReader logReader = - new AvroLogReader(logConfig.getLogFile(), fs, logConfig.getSchema()); - List result = IteratorUtils.toList(logReader.readBlock(size1)); - assertEquals("Random access should return 100 records", 100, result.size()); - assertEquals("both lists should be the same. 
(ordering guaranteed)", inputRecords, result); - } - - @SuppressWarnings("unchecked") - @Test - public void testBasicAppendAndRead() - throws IOException, URISyntaxException, InterruptedException { - HoodieLogAppendConfig logConfig = - HoodieLogAppendConfig.newBuilder().onPartitionPath(partitionPath) - .withLogFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") - .withBaseCommitTime("100") - .withSchema(SchemaTestUtil.getSimpleSchema()).withFs(fs).build(); - RollingAvroLogAppender logAppender = new RollingAvroLogAppender(logConfig); - logAppender.append(SchemaTestUtil.generateTestRecords(0, 100).iterator()); - long size1 = logAppender.getCurrentSize(); - logAppender.close(); - - // Close and Open again and append 100 more records - logAppender = new RollingAvroLogAppender(logConfig); - List secondBatchInput = SchemaTestUtil.generateTestRecords(100, 100); - logAppender.append(secondBatchInput.iterator()); - long size2 = logAppender.getCurrentSize(); - logAppender.close(); - - // Close and Open again and append 100 more records - logAppender = new RollingAvroLogAppender(logConfig); - List lastBatchInput = SchemaTestUtil.generateTestRecords(200, 100); - logAppender.append(lastBatchInput.iterator()); - long size3 = logAppender.getCurrentSize(); - logAppender.close(); - - AvroLogReader logReader = - new AvroLogReader(logConfig.getLogFile(), fs, logConfig.getSchema()); - - // Try to grab the middle block here - List secondBatch = IteratorUtils.toList(logReader.readBlock(size1)); - assertEquals("Stream collect should return 100 records", 100, secondBatch.size()); - assertEquals("Collected list should match the input list (ordering guaranteed)", - secondBatchInput, secondBatch); - - // Try to grab the middle block here - List lastBatch = IteratorUtils.toList(logReader.readBlock(size2)); - assertEquals("Stream collect should return 100 records", 100, secondBatch.size()); - assertEquals("Collected list should match the input list (ordering guaranteed)", - lastBatchInput, lastBatch); - - List imaginaryBatch = IteratorUtils.toList(logReader.readBlock(size3)); - assertEquals("Stream collect should return 0 records", 0, imaginaryBatch.size()); - } - - @Test - public void testAppendAndReadOnCorruptedLog() - throws IOException, URISyntaxException, InterruptedException { - HoodieLogAppendConfig logConfig = - HoodieLogAppendConfig.newBuilder().onPartitionPath(partitionPath) - .withLogFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") - .withBaseCommitTime("100") - .withSchema(SchemaTestUtil.getSimpleSchema()).withFs(fs).build(); - RollingAvroLogAppender logAppender = new RollingAvroLogAppender(logConfig); - long size1 = logAppender.getCurrentSize(); - logAppender.append(SchemaTestUtil.generateTestRecords(0, 100).iterator()); - logAppender.close(); - - // Append some arbit byte[] to thee end of the log (mimics a partially written commit) - assertTrue(fs.exists(logConfig.getLogFile().getPath())); - fs = FileSystem.get(fs.getConf()); - FSDataOutputStream outputStream = - fs.append(logConfig.getLogFile().getPath(), logConfig.getBufferSize()); - outputStream.write("something-random".getBytes()); - outputStream.flush(); - outputStream.close(); - - logAppender = new RollingAvroLogAppender(logConfig); - long size2 = logAppender.getCurrentSize(); - logAppender.append(SchemaTestUtil.generateTestRecords(100, 100).iterator()); - logAppender.close(); - - AvroLogReader logReader = - new AvroLogReader(logConfig.getLogFile(), fs, logConfig.getSchema()); - - // Try to grab the middle 
block here - List secondBatch = IteratorUtils.toList(logReader.readBlock(size1)); - assertEquals("Stream collect should return 100 records", 100, secondBatch.size()); - - // Try to grab the last block here - List lastBatch = IteratorUtils.toList(logReader.readBlock(size2)); - assertEquals("Stream collect should return 100 records", 100, lastBatch.size()); - } - - @Test - public void testCompositeAvroLogReader() - throws IOException, URISyntaxException, InterruptedException { - // Set a small threshold so that every block is a new version - HoodieLogAppendConfig logConfig = - HoodieLogAppendConfig.newBuilder().onPartitionPath(partitionPath) - .withLogFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") - .withBaseCommitTime("100") - .withSchema(SchemaTestUtil.getSimpleSchema()).withSizeThreshold(500).withFs(fs) - .build(); - - RollingAvroLogAppender logAppender = new RollingAvroLogAppender(logConfig); - long size1 = logAppender.getCurrentSize(); - List input1 = SchemaTestUtil.generateTestRecords(0, 100); - logAppender.append(input1.iterator()); - logAppender.close(); - - // Need to rebuild config to set the latest version as path - logConfig = HoodieLogAppendConfig.newBuilder().onPartitionPath(partitionPath) - .withLogFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") - .withBaseCommitTime("100") - .withSchema(SchemaTestUtil.getSimpleSchema()).withSizeThreshold(500).withFs(fs).build(); - logAppender = new RollingAvroLogAppender(logConfig); - long size2 = logAppender.getCurrentSize(); - List input2 = SchemaTestUtil.generateTestRecords(100, 100); - logAppender.append(input2.iterator()); - logAppender.close(); - - logConfig = HoodieLogAppendConfig.newBuilder().onPartitionPath(partitionPath) - .withLogFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") - .withBaseCommitTime("100") - .withSchema(SchemaTestUtil.getSimpleSchema()).withSizeThreshold(500).withFs(fs).build(); - List allLogFiles = FSUtils - .getAllLogFiles(fs, partitionPath, logConfig.getLogFile().getFileId(), - HoodieLogFile.DELTA_EXTENSION, logConfig.getLogFile().getBaseCommitTime()) - .collect(Collectors.toList()); - assertEquals("", 2, allLogFiles.size()); - - SortedMap> offsets = Maps.newTreeMap(); - offsets.put(1, Lists.newArrayList(size1)); - offsets.put(2, Lists.newArrayList(size2)); - CompositeAvroLogReader reader = - new CompositeAvroLogReader(partitionPath, logConfig.getLogFile().getFileId(), - logConfig.getLogFile().getBaseCommitTime(), fs, logConfig.getSchema(), - HoodieLogFile.DELTA_EXTENSION); - Iterator results = reader.readBlocks(offsets); - List totalBatch = IteratorUtils.toList(results); - assertEquals("Stream collect should return all 200 records", 200, totalBatch.size()); - input1.addAll(input2); - assertEquals("CompositeAvroLogReader should return 200 records from 2 versions", input1, - totalBatch); - } -} diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/util/SchemaTestUtil.java b/hoodie-common/src/test/java/com/uber/hoodie/common/util/SchemaTestUtil.java index fbec264fdb596..399abfbb05f1f 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/util/SchemaTestUtil.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/util/SchemaTestUtil.java @@ -16,9 +16,12 @@ package com.uber.hoodie.common.util; +import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.exception.HoodieIOException; +import java.util.UUID; import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumReader; +import 
org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.avro.io.DecoderFactory; @@ -59,4 +62,18 @@ private static List toRecords(Schema writerSchema, Schema readerS } } + public static List generateHoodieTestRecords(int from, int limit) + throws IOException, URISyntaxException { + List records = generateTestRecords(from, limit); + Schema hoodieFieldsSchema = HoodieAvroUtils.addMetadataFields(getSimpleSchema()); + return records.stream() + .map(s -> HoodieAvroUtils.rewriteRecord((GenericRecord) s, hoodieFieldsSchema)) + .map(p -> { + p.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, UUID.randomUUID().toString()); + p.put(HoodieRecord.PARTITION_PATH_METADATA_FIELD, "0000/00/00"); + return p; + }).collect( + Collectors.toList()); + + } } diff --git a/hoodie-hive/src/test/java/com/uber/hoodie/hive/HDroneDatasetTest.java b/hoodie-hive/src/test/java/com/uber/hoodie/hive/HDroneDatasetTest.java index 2a0c11c233a21..db64bb1d6ae59 100644 --- a/hoodie-hive/src/test/java/com/uber/hoodie/hive/HDroneDatasetTest.java +++ b/hoodie-hive/src/test/java/com/uber/hoodie/hive/HDroneDatasetTest.java @@ -55,10 +55,10 @@ public void testDatasetCreation() throws IOException, InitializationError { dataset = HoodieHiveDatasetSyncTask.newBuilder().withReference(metadata) .withConfiguration(TestUtil.hDroneConfiguration).build(); - assertTrue("Table should exist after sync", hiveClient.checkTableExists(metadata)); - assertEquals("After sync, There should not be any new partitions to sync", 0, + assertTrue("Table should exist after flush", hiveClient.checkTableExists(metadata)); + assertEquals("After flush, There should not be any new partitions to flush", 0, dataset.getNewPartitions().size()); - assertEquals("After sync, There should not be any modified partitions to sync", 0, + assertEquals("After flush, There should not be any modified partitions to flush", 0, dataset.getChangedPartitions().size()); assertEquals("Table Schema should have 5 fields", 5,