Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Unset discovery nodes for every transport node actions request ([#17682](https://github.com/opensearch-project/OpenSearch/pull/17682))
- Implement parallel shard refresh behind cluster settings ([#17782](https://github.com/opensearch-project/OpenSearch/pull/17782))
- Bump OpenSearch Core main branch to 3.0.0 ([#18039](https://github.com/opensearch-project/OpenSearch/pull/18039))
- Update API of Message in index to add the timestamp for lag calculation in ingestion polling ([#17977](https://github.com/opensearch-project/OpenSearch/pull/17977/))


### Changed
- Change the default max header size from 8KB to 16KB. ([#18024](https://github.com/opensearch-project/OpenSearch/pull/18024))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,18 @@
public class KafkaMessage implements Message<byte[]> {
private final byte[] key;
private final byte[] payload;
private final Long timestamp;

/**
* Constructor
* @param key the key of the message
* @param payload the payload of the message
* @param timestamp the timestamp of the message in milliseconds
*/
public KafkaMessage(@Nullable byte[] key, byte[] payload) {
public KafkaMessage(@Nullable byte[] key, byte[] payload, Long timestamp) {
this.key = key;
this.payload = payload;
this.timestamp = timestamp;
}

/**
Expand All @@ -40,4 +43,9 @@ public byte[] getKey() {
public byte[] getPayload() {
return payload;
}

@Override
public Long getTimestamp() {
return timestamp;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ private synchronized List<ReadResult<KafkaOffset, KafkaMessage>> fetch(
}
lastFetchedOffset = currentOffset;
KafkaOffset kafkaOffset = new KafkaOffset(currentOffset);
KafkaMessage message = new KafkaMessage(messageAndOffset.key(), messageAndOffset.value());
KafkaMessage message = new KafkaMessage(messageAndOffset.key(), messageAndOffset.value(), messageAndOffset.timestamp());
results.add(new ReadResult<>(kafkaOffset, message));
}
return results;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,20 @@ public void testConstructorAndGetters() {
byte[] key = { 1, 2, 3 };
byte[] payload = { 4, 5, 6 };

KafkaMessage message = new KafkaMessage(key, payload);
KafkaMessage message = new KafkaMessage(key, payload, 1000L);

Assert.assertArrayEquals(key, message.getKey());
Assert.assertArrayEquals(payload, message.getPayload());
Assert.assertEquals(1000L, message.getTimestamp().longValue());
}

public void testConstructorWithNullKey() {
byte[] payload = { 4, 5, 6 };

KafkaMessage message = new KafkaMessage(null, payload);
KafkaMessage message = new KafkaMessage(null, payload, null);

assertNull(message.getKey());
Assert.assertArrayEquals(payload, message.getPayload());
Assert.assertNull(message.getTimestamp());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,25 @@
*/
public class KinesisMessage implements Message<byte[]> {
private final byte[] payload;
private final Long timestamp;

/**
* Constructor
* @param payload the payload of the message
* @param timestamp the timestamp of the message in milliseconds
*/
public KinesisMessage(byte[] payload) {
public KinesisMessage(byte[] payload, Long timestamp) {
this.payload = payload;
this.timestamp = timestamp;
}

@Override
public byte[] getPayload() {
return payload;
}

@Override
public Long getTimestamp() {
return timestamp;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,8 @@ private synchronized List<ReadResult<SequenceNumber, KinesisMessage>> fetch(

for (Record record : records) {
SequenceNumber sequenceNumber1 = new SequenceNumber(record.sequenceNumber());
KinesisMessage message = new KinesisMessage(record.data().asByteArray());
Long timestamp = record.approximateArrivalTimestamp() != null ? record.approximateArrivalTimestamp().toEpochMilli() : null;
KinesisMessage message = new KinesisMessage(record.data().asByteArray(), timestamp);
results.add(new ReadResult<>(sequenceNumber1, message));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,16 @@
public class KinesisMessageTests extends OpenSearchTestCase {
public void testConstructorAndGetters() {
byte[] payload = { 1, 2, 3 };
KinesisMessage message = new KinesisMessage(payload);
KinesisMessage message = new KinesisMessage(payload, 1000L);

Assert.assertArrayEquals("Payload should be correctly initialized and returned", payload, message.getPayload());
Assert.assertEquals(1000L, message.getTimestamp().longValue());
}

public void testConstructorWithNullPayload() {
KinesisMessage message = new KinesisMessage(null);
KinesisMessage message = new KinesisMessage(null, null);

Assert.assertNull("Payload should be null", message.getPayload());
Assert.assertNull("Timestamp should be null", message.getTimestamp());
}
}
6 changes: 6 additions & 0 deletions server/src/main/java/org/opensearch/index/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,10 @@
@ExperimentalApi
public interface Message<T> {
T getPayload();

/**
* Get the timestamp of the message in milliseconds
* @return the timestamp of the message
*/
Long getTimestamp();
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
private volatile boolean paused;
private volatile IngestionErrorStrategy errorStrategy;

private volatile long lastPolledMessageTimestamp = 0;

private IngestionShardConsumer consumer;

private ExecutorService consumerThread;
Expand Down Expand Up @@ -247,7 +249,7 @@
}
totalPolledCount.inc();
blockingQueueContainer.add(result);

lastPolledMessageTimestamp = result.getMessage().getTimestamp() == null ? 0 : result.getMessage().getTimestamp();
logger.debug(
"Put message {} with pointer {} to the blocking queue",
String.valueOf(result.getMessage().getPayload()),
Expand Down Expand Up @@ -366,9 +368,17 @@
builder.setTotalPolledCount(totalPolledCount.count());
builder.setTotalProcessedCount(blockingQueueContainer.getTotalProcessedCount());
builder.setTotalSkippedCount(blockingQueueContainer.getTotalSkippedCount());
builder.setLagInMillis(computeLag());

Check warning on line 371 in server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java#L371

Added line #L371 was not covered by tests
return builder.build();
}

/**
* Returns the lag in milliseconds since the last polled message
*/
private long computeLag() {
return System.currentTimeMillis() - lastPolledMessageTimestamp;

Check warning on line 379 in server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java#L379

Added line #L379 was not covered by tests
}

public State getState() {
return this.state;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,16 @@ public PollingIngestStats(StreamInput in) throws IOException {
long totalSkippedCount = in.readLong();
this.messageProcessorStats = new MessageProcessorStats(totalProcessedCount, totalSkippedCount);
long totalPolledCount = in.readLong();
this.consumerStats = new ConsumerStats(totalPolledCount);
long lagInMillis = in.readLong();
this.consumerStats = new ConsumerStats(totalPolledCount, lagInMillis);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(messageProcessorStats.totalProcessedCount);
out.writeLong(messageProcessorStats.totalSkippedCount);
out.writeLong(consumerStats.totalPolledCount);
out.writeLong(consumerStats.lagInMillis);
}

@Override
Expand All @@ -56,6 +58,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.endObject();
builder.startObject("consumer_stats");
builder.field("total_polled_count", consumerStats.totalPolledCount);
builder.field("lag_in_millis", consumerStats.lagInMillis);
builder.endObject();
builder.endObject();
return builder;
Expand Down Expand Up @@ -93,7 +96,7 @@ public record MessageProcessorStats(long totalProcessedCount, long totalSkippedC
* Stats for consumer (poller)
*/
@ExperimentalApi
public record ConsumerStats(long totalPolledCount) {
public record ConsumerStats(long totalPolledCount, long lagInMillis) {
}

/**
Expand All @@ -104,6 +107,7 @@ public static class Builder {
private long totalProcessedCount;
private long totalSkippedCount;
private long totalPolledCount;
private long lagInMillis;

public Builder() {}

Expand All @@ -122,9 +126,14 @@ public Builder setTotalSkippedCount(long totalSkippedCount) {
return this;
}

public Builder setLagInMillis(long lagInMillis) {
this.lagInMillis = lagInMillis;
return this;
}

public PollingIngestStats build() {
MessageProcessorStats messageProcessorStats = new MessageProcessorStats(totalProcessedCount, totalSkippedCount);
ConsumerStats consumerStats = new ConsumerStats(totalPolledCount);
ConsumerStats consumerStats = new ConsumerStats(totalPolledCount, lagInMillis);
return new PollingIngestStats(messageProcessorStats, consumerStats);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@ public byte[] getPayload() {
return payload;
}

@Override
public Long getTimestamp() {
return System.currentTimeMillis();
}

@Override
public String toString() {
return new String(payload, StandardCharsets.UTF_8);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public void testToXContent() throws IOException {
+ stats.getMessageProcessorStats().totalSkippedCount()
+ "},\"consumer_stats\":{\"total_polled_count\":"
+ stats.getConsumerStats().totalPolledCount()
+ ",\"lag_in_millis\":"
+ stats.getConsumerStats().lagInMillis()
+ "}}}";

assertEquals(expected, builder.toString());
Expand All @@ -56,6 +58,7 @@ private PollingIngestStats createTestInstance() {
.setTotalProcessedCount(randomNonNegativeLong())
.setTotalSkippedCount(randomNonNegativeLong())
.setTotalPolledCount(randomNonNegativeLong())
.setLagInMillis(randomNonNegativeLong())
.build();
}
}
Loading