Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,64 +18,59 @@
*/
package org.elasticsearch.action.resync;

import org.elasticsearch.Version;
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Arrays;

public final class ResyncReplicationRequest extends ReplicatedWriteRequest<ResyncReplicationRequest> {

private List<Translog.Operation> operations;
private Translog.Operation[] operations;

ResyncReplicationRequest() {
super();
}

public ResyncReplicationRequest(ShardId shardId, List<Translog.Operation> operations) {
public ResyncReplicationRequest(ShardId shardId, Translog.Operation[] operations) {
super(shardId);
this.operations = operations;
}

public List<Translog.Operation> getOperations() {
public Translog.Operation[] getOperations() {
return operations;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
final int size = in.readVInt();
operations = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
operations.add(Translog.Operation.readOperation(in));
if (in.getVersion().equals(Version.V_6_0_0)) {
throw new IllegalStateException("resync replication request serialization is broken in 6.0.0");
}
operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(operations.size());
for (int i = 0; i < operations.size(); i++) {
Translog.Operation.writeOperation(operations.get(i), out);
}
out.writeArray(Translog.Operation::writeOperation, operations);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ResyncReplicationRequest that = (ResyncReplicationRequest) o;
return Objects.equals(getOperations(), that.getOperations());
return Arrays.equals(operations, that.operations);
}

@Override
public int hashCode() {
return Objects.hash(getOperations());
return Arrays.hashCode(operations);
}

@Override
Expand All @@ -84,7 +79,8 @@ public String toString() {
"shardId=" + shardId +
", timeout=" + timeout +
", index='" + index + '\'' +
", ops=" + operations.size() +
", ops=" + operations.length +
"}";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,13 @@ public void writeDoubleArray(double[] values) throws IOException {
}
}

public <T> void writeArray(Writer<T> writer, T[] array) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

javadocs please

writeVInt(array.length);
for (T value : array) {
writer.write(this, value);
}
}

public <T extends Writeable> void writeArray(T[] array) throws IOException {
writeVInt(array.length);
for (T value: array) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,8 @@ public void onFailure(Exception e) {
}
}

private static Translog.Operation[] EMPTY_ARRAY = new Translog.Operation[0];

@Override
protected void doRun() throws Exception {
long size = 0;
Expand Down Expand Up @@ -247,7 +249,7 @@ protected void doRun() throws Exception {

if (!operations.isEmpty()) {
task.setPhase("sending_ops");
ResyncReplicationRequest request = new ResyncReplicationRequest(shardId, operations);
ResyncReplicationRequest request = new ResyncReplicationRequest(shardId, operations.toArray(EMPTY_ARRAY));
logger.trace("{} sending batch of [{}][{}] (total sent: [{}], skipped: [{}])", shardId, operations.size(),
new ByteSizeValue(size), totalSentOps.get(), totalSkippedOps.get());
syncAction.sync(request, task, primaryAllocationId, primaryTerm, this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.elasticsearch.index.translog;

import com.vividsolutions.jts.util.Assert;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.index.Term;
Expand Down Expand Up @@ -893,7 +892,7 @@ public static Type fromId(byte id) {

/**
* Reads the type and the operation from the given stream. The operation must be written with
* {@link Operation#writeOperation(Operation, StreamOutput)}
* {@link Operation#writeOperation(StreamOutput, Operation)}
*/
static Operation readOperation(final StreamInput input) throws IOException {
final Translog.Operation.Type type = Translog.Operation.Type.fromId(input.readByte());
Expand All @@ -914,7 +913,7 @@ static Operation readOperation(final StreamInput input) throws IOException {
/**
* Writes the type and translog operation to the given stream
*/
static void writeOperation(final Translog.Operation operation, final StreamOutput output) throws IOException {
static void writeOperation(final StreamOutput output, final Operation operation) throws IOException {
output.writeByte(operation.opType().id());
switch(operation.opType()) {
case CREATE:
Expand Down Expand Up @@ -1493,7 +1492,7 @@ public static void writeOperationNoSize(BufferedChecksumStreamOutput out, Transl
// because closing it closes the underlying stream, which we don't
// want to do here.
out.resetDigest();
Translog.Operation.writeOperation(op, out);
Translog.Operation.writeOperation(out, op);
long checksum = out.getChecksum();
out.writeInt((int) checksum);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Collections;

import static org.hamcrest.Matchers.equalTo;

Expand All @@ -40,7 +39,7 @@ public void testSerialization() throws IOException {
final byte[] bytes = "{}".getBytes(Charset.forName("UTF-8"));
final Translog.Index index = new Translog.Index("type", "id", 0, Versions.MATCH_ANY, VersionType.INTERNAL, bytes, null, null, -1);
final ShardId shardId = new ShardId(new Index("index", "uuid"), 0);
final ResyncReplicationRequest before = new ResyncReplicationRequest(shardId, Collections.singletonList(index));
final ResyncReplicationRequest before = new ResyncReplicationRequest(shardId, new Translog.Operation[]{index});

final BytesStreamOutput out = new BytesStreamOutput();
before.writeTo(out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception {
AtomicBoolean syncActionCalled = new AtomicBoolean();
PrimaryReplicaSyncer.SyncAction syncAction =
(request, parentTask, allocationId, primaryTerm, listener) -> {
logger.info("Sending off {} operations", request.getOperations().size());
logger.info("Sending off {} operations", request.getOperations().length);
syncActionCalled.set(true);
assertThat(parentTask, instanceOf(PrimaryReplicaSyncer.ResyncTask.class));
listener.onResponse(new ResyncReplicationResponse());
Expand Down Expand Up @@ -98,7 +98,7 @@ public void testSyncerOnClosingShard() throws Exception {
CountDownLatch syncCalledLatch = new CountDownLatch(1);
PrimaryReplicaSyncer.SyncAction syncAction =
(request, parentTask, allocationId, primaryTerm, listener) -> {
logger.info("Sending off {} operations", request.getOperations().size());
logger.info("Sending off {} operations", request.getOperations().length);
syncActionCalled.set(true);
syncCalledLatch.countDown();
threadPool.generic().execute(() -> listener.onResponse(new ResyncReplicationResponse()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2343,7 +2343,7 @@ public void testTranslogOpSerialization() throws Exception {
Translog.Index index = new Translog.Index(eIndex, eIndexResult);

BytesStreamOutput out = new BytesStreamOutput();
Translog.Operation.writeOperation(index, out);
Translog.Operation.writeOperation(out, index);
StreamInput in = out.bytes().streamInput();
Translog.Index serializedIndex = (Translog.Index) Translog.Operation.readOperation(in);
assertEquals(index, serializedIndex);
Expand All @@ -2354,7 +2354,7 @@ public void testTranslogOpSerialization() throws Exception {
Translog.Delete delete = new Translog.Delete(eDelete, eDeleteResult);

out = new BytesStreamOutput();
Translog.Operation.writeOperation(delete, out);
Translog.Operation.writeOperation(out, delete);
in = out.bytes().streamInput();
Translog.Delete serializedDelete = (Translog.Delete) Translog.Operation.readOperation(in);
assertEquals(delete, serializedDelete);
Expand Down