Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -83,7 +84,6 @@ public class GobblinMCEPublisher extends DataPublisher {
public static final String SERIALIZED_AUDIT_COUNT_MAP_KEY = "serializedAuditCountMap";

public GobblinMCEPublisher(State state) throws IOException {

this(state, GobblinMCEProducer.getGobblinMCEProducer(state));
}

Expand All @@ -96,7 +96,7 @@ public GobblinMCEPublisher(State state, GobblinMCEProducer producer) {
public void publishData(Collection<? extends WorkUnitState> states) throws IOException {
// First aggregate the new files by partition
for (State state : states) {
Map<Path, Metrics> newFiles = computeFileMetrics(state);
Map<Path, Metrics> newFiles = computeFileMetrics(state, state.getPropAsList(NEW_FILES_LIST, ""));
Map<String, String> offsetRange = getPartitionOffsetRange(OFFSET_RANGE_KEY);
if (newFiles.isEmpty()) {
// There'll be only one dummy file here. This file is parsed for DB and table name calculation.
Expand All @@ -114,7 +114,7 @@ public void publishData(Collection<? extends WorkUnitState> states) throws IOExc
}
}

private Map<String, String> getPartitionOffsetRange(String offsetKey) {
protected Map<String, String> getPartitionOffsetRange(String offsetKey) {
return state.getPropAsList(offsetKey)
.stream()
.collect(Collectors.toMap(s -> s.split(MAP_DELIMITER_KEY)[0], s -> s.split(MAP_DELIMITER_KEY)[1]));
Expand All @@ -125,11 +125,11 @@ private Map<String, String> getPartitionOffsetRange(String offsetKey) {
* and calculate the hive spec for each datafile and submit the task to register that datafile
* @throws IOException
*/
private Map<Path, Metrics> computeFileMetrics(State state) throws IOException {
protected Map<Path, Metrics> computeFileMetrics(State state, List<String> fileList) throws IOException {
Map<Path, Metrics> newFiles = new HashMap<>();
NameMapping mapping = getNameMapping();
FileSystem fs = FileSystem.get(conf);
for (final String pathString : state.getPropAsList(NEW_FILES_LIST, "")) {
for (final String pathString : fileList) {
Path path = new Path(pathString);
LinkedList<FileStatus> fileStatuses = new LinkedList<>();
fileStatuses.add(fs.getFileStatus(path));
Expand All @@ -153,7 +153,7 @@ private Map<Path, Metrics> computeFileMetrics(State state) throws IOException {
* It's used in GMCE writer {@link GobblinMCEWriter} merely for getting the DB and table name.
* @throws IOException
*/
private Map<Path, Metrics> computeDummyFile(State state) throws IOException {
protected Map<Path, Metrics> computeDummyFile(State state) throws IOException {
Map<Path, Metrics> newFiles = new HashMap<>();
FileSystem fs = FileSystem.get(conf);
if (!state.contains(ConfigurationKeys.DATA_PUBLISHER_DATASET_DIR)) {
Expand Down