Skip to content

Commit

Permalink
Add config to preserve stream topic name as a column for `CLPLogRecor…
Browse files Browse the repository at this point in the history
…dExtractor` (apache#14599)

* Add config to preserve stream topic name in rows

* fill row if extract all
  • Loading branch information
itschrispeck authored Dec 14, 2024
1 parent 3677671 commit f412987
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ public void init(Set<String> fields, @Nullable RecordExtractorConfig recordExtra
_serverMetrics = serverMetrics;
}

/**
 * Initializes this extractor and remembers the stream topic name.
 *
 * <p>Delegates to {@link #init(Set, RecordExtractorConfig)} and then stores {@code topicName}. {@code extract}
 * writes the stored name into the destination row when the config supplies a topic-name destination column.
 *
 * @param fields passed through unchanged to the two-argument {@code init}
 * @param recordExtractorConfig passed through unchanged to the two-argument {@code init}
 * @param topicName topic name to keep for later use by {@code extract}
 */
public void init(Set<String> fields, @Nullable RecordExtractorConfig recordExtractorConfig, String topicName) {
init(fields, recordExtractorConfig);
// Assigned after the delegate call, so the two-argument init always runs before the topic name is set.
_topicName = topicName;
}

@Override
public void init(Set<String> fields, @Nullable RecordExtractorConfig recordExtractorConfig) {
_config = (CLPLogRecordExtractorConfig) recordExtractorConfig;
Expand Down Expand Up @@ -131,6 +136,11 @@ public void init(Set<String> fields, @Nullable RecordExtractorConfig recordExtra
public GenericRow extract(Map<String, Object> from, GenericRow to) {
Set<String> clpEncodedFieldNames = _config.getFieldsForClpEncoding();

// Preserve topic name if configured, regardless of _extractAll
if (_config.getTopicNameDestinationColumn() != null) {
to.putValue(_config.getTopicNameDestinationColumn(), _topicName);
}

if (_extractAll) {
for (Map.Entry<String, Object> recordEntry : from.entrySet()) {
String recordKey = recordEntry.getKey();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,16 @@ public class CLPLogRecordExtractorConfig implements RecordExtractorConfig {
public static final String REMOVE_PROCESSED_FIELDS_CONFIG_KEY = "removeProcessedFields";
public static final String UNENCODABLE_FIELD_SUFFIX_CONFIG_KEY = "unencodableFieldSuffix";
public static final String UNENCODABLE_FIELD_ERROR_CONFIG_KEY = "unencodableFieldError";
// Name of the destination column that receives the stream topic name in each row. If this property is unset or
// empty, the topic name is not preserved.
public static final String TOPIC_NAME_DESTINATION_COLUMN_CONFIG_KEY = "topicNameDestinationColumn";

private static final Logger LOGGER = LoggerFactory.getLogger(CLPLogRecordExtractorConfig.class);

private final Set<String> _fieldsForClpEncoding = new HashSet<>();
private String _unencodableFieldSuffix = null;
private String _unencodableFieldError = null;
private boolean _removeProcessedFields = false;
private String _topicNameDestinationColumn = null;

@Override
public void init(Map<String, String> props) {
Expand All @@ -64,6 +67,15 @@ public void init(Map<String, String> props) {
return;
}

String topicNameDestinationColumn = props.get(TOPIC_NAME_DESTINATION_COLUMN_CONFIG_KEY);
if (null != topicNameDestinationColumn) {
if (topicNameDestinationColumn.length() == 0) {
LOGGER.warn("Ignoring empty value for {}", TOPIC_NAME_DESTINATION_COLUMN_CONFIG_KEY);
} else {
_topicNameDestinationColumn = topicNameDestinationColumn;
}
}

String concatenatedFieldNames = props.get(FIELDS_FOR_CLP_ENCODING_CONFIG_KEY);
if (null == concatenatedFieldNames) {
return;
Expand Down Expand Up @@ -114,4 +126,7 @@ public String getUnencodableFieldSuffix() {
/**
 * Returns the current value of the {@code _unencodableFieldError} field, which is declared with a {@code null}
 * initial value.
 *
 * <p>NOTE(review): presumably populated from the {@code unencodableFieldError} property during {@code init}; that
 * code is not visible here, so confirm before relying on it.
 */
public String getUnencodableFieldError() {
return _unencodableFieldError;
}
/**
 * Returns the name of the column that should receive the stream topic name.
 *
 * @return the non-empty value supplied under the {@code topicNameDestinationColumn} property, or {@code null} when
 *     no such value was configured (an empty value is ignored with a warning)
 */
public String getTopicNameDestinationColumn() {
return _topicNameDestinationColumn;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public class CLPLogRecordExtractorTest {
private static final String _MESSAGE_2_FIELD_NAME = "message2";
private static final String _MESSAGE_2_FIELD_VALUE = "Stopped job_123 on node-987: 3 cores, 6 threads and "
+ "22.0% memory used.";
private static final String _TOPIC_NAME = "test-topic";
private static final String _TOPIC_NAME_DEST_COLUMN = "_streamTopicName";

@Test
public void testCLPEncoding() {
Expand Down Expand Up @@ -147,6 +149,23 @@ public void testEmptyCLPEncodingConfig() {
assertEquals(row.getValue(_MESSAGE_2_FIELD_NAME), _MESSAGE_2_FIELD_VALUE);
}

/**
 * Verifies that the topic name is written to the destination row only when a non-empty
 * {@code topicNameDestinationColumn} is configured.
 *
 * <p>Three configurations are exercised in order, reusing the same property map: the property is absent, the
 * property is empty (which the config ignores), and the property names a valid column.
 */
@Test
public void testPreserveTopicName() {
  Map<String, String> props = new HashMap<>();
  Set<String> fieldsToRead = new HashSet<>();
  fieldsToRead.add(_MESSAGE_1_FIELD_NAME);

  // Property absent: the topic name must not appear in the row
  GenericRow row = extract(props, fieldsToRead);
  assertNull(row.getValue(_TOPIC_NAME_DEST_COLUMN));

  // Property empty: the config ignores the value, so the row must still not carry the topic name
  props.put(CLPLogRecordExtractorConfig.TOPIC_NAME_DESTINATION_COLUMN_CONFIG_KEY, "");
  row = extract(props, fieldsToRead);
  assertNull(row.getValue(_TOPIC_NAME_DEST_COLUMN));

  // Property set to a valid column name: the topic name passed to init() must be stored under that column
  props.put(CLPLogRecordExtractorConfig.TOPIC_NAME_DESTINATION_COLUMN_CONFIG_KEY, _TOPIC_NAME_DEST_COLUMN);
  row = extract(props, fieldsToRead);
  assertEquals(row.getValue(_TOPIC_NAME_DEST_COLUMN), _TOPIC_NAME);
}

private void addCLPEncodedField(String fieldName, Set<String> fields) {
fields.add(fieldName + ClpRewriter.LOGTYPE_COLUMN_SUFFIX);
fields.add(fieldName + ClpRewriter.DICTIONARY_VARS_COLUMN_SUFFIX);
Expand All @@ -157,7 +176,7 @@ private GenericRow extract(Map<String, String> props, Set<String> fieldsToRead)
CLPLogRecordExtractorConfig extractorConfig = new CLPLogRecordExtractorConfig();
CLPLogRecordExtractor extractor = new CLPLogRecordExtractor();
extractorConfig.init(props);
extractor.init(fieldsToRead, extractorConfig);
extractor.init(fieldsToRead, extractorConfig, _TOPIC_NAME);

// Assemble record
Map<String, Object> record = new HashMap<>();
Expand Down

0 comments on commit f412987

Please sign in to comment.