Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -122,37 +122,51 @@ public Pair<String, Boolean> process(OMUpdateEventBatch events) {
final Collection<String> taskTables = getTaskTables();

while (eventIterator.hasNext()) {
OMDBUpdateEvent<String, OmKeyInfo> omdbUpdateEvent = eventIterator.next();
OMDBUpdateEvent<String, Object> omdbUpdateEvent = eventIterator.next();
// Filter event inside process method to avoid duping
if (!taskTables.contains(omdbUpdateEvent.getTable())) {
continue;
}
String updatedKey = omdbUpdateEvent.getKey();
OmKeyInfo omKeyInfo = omdbUpdateEvent.getValue();
Object value = omdbUpdateEvent.getValue();
Object oldValue = omdbUpdateEvent.getOldValue();

try {
switch (omdbUpdateEvent.getAction()) {
case PUT:
handlePutKeyEvent(omKeyInfo, fileSizeCountMap);
break;
if (value instanceof OmKeyInfo) {
OmKeyInfo omKeyInfo = (OmKeyInfo) value;
OmKeyInfo omKeyInfoOld = (OmKeyInfo) oldValue;

case DELETE:
handleDeleteKeyEvent(updatedKey, omKeyInfo, fileSizeCountMap);
break;
try {
switch (omdbUpdateEvent.getAction()) {
case PUT:
handlePutKeyEvent(omKeyInfo, fileSizeCountMap);
break;

case UPDATE:
handleDeleteKeyEvent(updatedKey, omdbUpdateEvent.getOldValue(),
fileSizeCountMap);
handlePutKeyEvent(omKeyInfo, fileSizeCountMap);
break;
case DELETE:
handleDeleteKeyEvent(updatedKey, omKeyInfo, fileSizeCountMap);
break;

default: LOG.trace("Skipping DB update event : {}",
omdbUpdateEvent.getAction());
case UPDATE:
if (omKeyInfoOld != null) {
handleDeleteKeyEvent(updatedKey, omKeyInfoOld, fileSizeCountMap);
handlePutKeyEvent(omKeyInfo, fileSizeCountMap);
} else {
LOG.warn("Update event does not have the old keyInfo for {}.",
updatedKey);
}
break;

default:
LOG.trace("Skipping DB update event : {}",
omdbUpdateEvent.getAction());
}
} catch (Exception e) {
LOG.error("Unexpected exception while processing key {}.",
updatedKey, e);
return new ImmutablePair<>(getTaskName(), false);
}
} catch (Exception e) {
LOG.error("Unexpected exception while processing key {}.",
updatedKey, e);
return new ImmutablePair<>(getTaskName(), false);
} else {
LOG.warn("Unexpected value type {} for key {}. Skipping processing.",
value.getClass().getName(), updatedKey);
}
}
writeCountsToDB(false, fileSizeCountMap);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,16 @@ public boolean processWithLegacy(OMUpdateEventBatch events) {
String updatedKey = omdbUpdateEvent.getKey();

try {
OMDBUpdateEvent<String, OmKeyInfo> keyTableUpdateEvent =
(OMDBUpdateEvent<String, OmKeyInfo>) omdbUpdateEvent;
OmKeyInfo updatedKeyInfo = keyTableUpdateEvent.getValue();
OmKeyInfo oldKeyInfo = keyTableUpdateEvent.getOldValue();
OMDBUpdateEvent<String, ?> keyTableUpdateEvent = omdbUpdateEvent;
Object value = keyTableUpdateEvent.getValue();
Object oldValue = keyTableUpdateEvent.getOldValue();
if (!(value instanceof OmKeyInfo)) {
LOG.warn("Unexpected value type {} for key {}. Skipping processing.",
value.getClass().getName(), updatedKey);
continue;
}
OmKeyInfo updatedKeyInfo = (OmKeyInfo) value;
OmKeyInfo oldKeyInfo = (OmKeyInfo) oldValue;

// KeyTable entries belong to both Legacy and OBS buckets.
// Check bucket layout and if it's OBS
Expand Down