Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,42 +18,59 @@
*/
package org.elasticsearch.action.resync;

import org.elasticsearch.Version;
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;

import java.io.IOException;
import java.util.List;
import java.util.Arrays;

public final class ResyncReplicationRequest extends ReplicatedWriteRequest<ResyncReplicationRequest> {

private List<Translog.Operation> operations;
private Translog.Operation[] operations;

ResyncReplicationRequest() {
super();
}

public ResyncReplicationRequest(ShardId shardId, List<Translog.Operation> operations) {
public ResyncReplicationRequest(ShardId shardId, Translog.Operation[] operations) {
super(shardId);
this.operations = operations;
}

public List<Translog.Operation> getOperations() {
public Translog.Operation[] getOperations() {
return operations;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
operations = in.readList(Translog.Operation::readType);
if (in.getVersion().equals(Version.V_6_0_0)) {
throw new IllegalStateException("resync replication request serialization is broken in 6.0.0");
}
operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeList(operations);
out.writeArray(Translog.Operation::writeOperation, operations);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ResyncReplicationRequest that = (ResyncReplicationRequest) o;
return Arrays.equals(operations, that.operations);
}

@Override
public int hashCode() {
return Arrays.hashCode(operations);
}

@Override
Expand All @@ -62,7 +79,8 @@ public String toString() {
"shardId=" + shardId +
", timeout=" + timeout +
", index='" + index + '\'' +
", ops=" + operations.size() +
", ops=" + operations.length +
"}";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,13 @@ public void writeDoubleArray(double[] values) throws IOException {
}
}

public <T> void writeArray(Writer<T> writer, T[] array) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

javadocs please

writeVInt(array.length);
for (T value : array) {
writer.write(this, value);
}
}

public <T extends Writeable> void writeArray(T[] array) throws IOException {
writeVInt(array.length);
for (T value: array) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,8 @@ public void onFailure(Exception e) {
}
}

private static Translog.Operation[] EMPTY_ARRAY = new Translog.Operation[0];

@Override
protected void doRun() throws Exception {
long size = 0;
Expand Down Expand Up @@ -247,7 +249,7 @@ protected void doRun() throws Exception {

if (!operations.isEmpty()) {
task.setPhase("sending_ops");
ResyncReplicationRequest request = new ResyncReplicationRequest(shardId, operations);
ResyncReplicationRequest request = new ResyncReplicationRequest(shardId, operations.toArray(EMPTY_ARRAY));
logger.trace("{} sending batch of [{}][{}] (total sent: [{}], skipped: [{}])", shardId, operations.size(),
new ByteSizeValue(size), totalSentOps.get(), totalSkippedOps.get());
syncAction.sync(request, task, primaryAllocationId, primaryTerm, this);
Expand Down
55 changes: 32 additions & 23 deletions core/src/main/java/org/elasticsearch/index/translog/Translog.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.lucene.uid.Versions;
Expand Down Expand Up @@ -847,7 +846,7 @@ public interface Snapshot extends Closeable {
* A generic interface representing an operation performed on the transaction log.
* Each is associated with a type.
*/
public interface Operation extends Writeable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can't we keep the Writeable and instead force each of the impls to serialize the type? I also wonder if we should rather make it an abstract class and impl. the default writing part in Operation. That way we can keep using write/readList which I'd prefer. The reading part can still be a privat ctor, +1 to that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@s1monw We can do that, but then there's still the possibility of mistakenly calling the inner write implementations from within Translog.java (because now there will be a visible abstract write method on Operation whereas with what we have here there is no such method). I wonder if this is enough to address keeping serialization simple (i.e., not a manual iteration):

diff --git a/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java b/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java
index 9883f58fd9..9baf002ef9 100644
--- a/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java
+++ b/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java
@@ -26,6 +26,7 @@ import org.elasticsearch.index.translog.Translog;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
 
@@ -49,20 +50,15 @@ public final class ResyncReplicationRequest extends ReplicatedWriteRequest<Resyn
     @Override
     public void readFrom(StreamInput in) throws IOException {
         super.readFrom(in);
-        final int size = in.readVInt();
-        operations = new ArrayList<>(size);
-        for (int i = 0; i < size; i++) {
-            operations.add(Translog.Operation.readOperation(in));
-        }
+        operations = Arrays.asList(in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new));
     }
 
+    private static Translog.Operation[] EMPTY_ARRAY = new Translog.Operation[0];
+
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         super.writeTo(out);
-        out.writeVInt(operations.size());
-        for (int i = 0; i < operations.size(); i++) {
-            Translog.Operation.writeOperation(operations.get(i), out);
-        }
+        out.writeArray((o, op) -> Translog.Operation.writeOperation(op, o), operations.toArray(EMPTY_ARRAY));
     }
 
     @Override
diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java b/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java
index eb2bbdc357..8cb0ee3a9b 100644
--- a/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java
+++ b/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java
@@ -706,6 +706,13 @@ public abstract class StreamOutput extends OutputStream {
         }
     }
 
+    public <T> void writeArray(Writer<T> writer, T[] array) throws IOException {
+        writeVInt(array.length);
+        for (T value : array) {
+            writer.write(this, value);
+        }
+    }
+
     public <T extends Writeable> void writeArray(T[] array) throws IOException {
         writeVInt(array.length);
         for (T value: array) {

Then we can keep the inner write method that should never be invoked except from writeOperation off of the abstract Operation.

What do you think?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually we can make it slightly cleaner with:

diff --git a/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java b/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java
index 9883f58fd9..29f4567a6e 100644
--- a/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java
+++ b/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java
@@ -25,7 +25,7 @@ import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.translog.Translog;
 
 import java.io.IOException;
-import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
 
@@ -49,20 +49,15 @@ public final class ResyncReplicationRequest extends ReplicatedWriteRequest<Resyn
     @Override
     public void readFrom(StreamInput in) throws IOException {
         super.readFrom(in);
-        final int size = in.readVInt();
-        operations = new ArrayList<>(size);
-        for (int i = 0; i < size; i++) {
-            operations.add(Translog.Operation.readOperation(in));
-        }
+        operations = Arrays.asList(in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new));
     }
 
+    private static Translog.Operation[] EMPTY_ARRAY = new Translog.Operation[0];
+
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         super.writeTo(out);
-        out.writeVInt(operations.size());
-        for (int i = 0; i < operations.size(); i++) {
-            Translog.Operation.writeOperation(operations.get(i), out);
-        }
+        out.writeArray(Translog.Operation::writeOperation, operations.toArray(EMPTY_ARRAY));
     }
 
     @Override
diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java b/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java
index eb2bbdc357..8cb0ee3a9b 100644
--- a/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java
+++ b/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java
@@ -706,6 +706,13 @@ public abstract class StreamOutput extends OutputStream {
         }
     }
 
+    public <T> void writeArray(Writer<T> writer, T[] array) throws IOException {
+        writeVInt(array.length);
+        for (T value : array) {
+            writer.write(this, value);
+        }
+    }
+
     public <T extends Writeable> void writeArray(T[] array) throws IOException {
         writeVInt(array.length);
         for (T value: array) {
diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java
index 4942d8135c..4373c8d053 100644
--- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java
+++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java
@@ -19,7 +19,6 @@
 
 package org.elasticsearch.index.translog;
 
-import com.vividsolutions.jts.util.Assert;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.logging.log4j.util.Supplier;
 import org.apache.lucene.index.Term;
@@ -893,7 +892,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
 
         /**
          * Reads the type and the operation from the given stream. The operation must be written with
-         * {@link Operation#writeOperation(Operation, StreamOutput)}
+         * {@link Operation#writeOperation(StreamOutput, Operation)}
          */
         static Operation readOperation(final StreamInput input) throws IOException {
             final Translog.Operation.Type type = Translog.Operation.Type.fromId(input.readByte());
@@ -914,7 +913,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
         /**
          * Writes the type and translog operation to the given stream
          */
-        static void writeOperation(final Translog.Operation operation, final StreamOutput output) throws IOException {
+        static void writeOperation(final StreamOutput output, final Operation operation) throws IOException {
             output.writeByte(operation.opType().id());
             switch(operation.opType()) {
                 case CREATE:
@@ -1493,7 +1492,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
         // because closing it closes the underlying stream, which we don't
         // want to do here.
         out.resetDigest();
-        Translog.Operation.writeOperation(op, out);
+        Translog.Operation.writeOperation(out, op);
         long checksum = out.getChecksum();
         out.writeInt((int) checksum);
     }
diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
index 477a7ae849..1a17e0dc6a 100644
--- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
+++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
@@ -2343,7 +2343,7 @@ public class TranslogTests extends ESTestCase {
         Translog.Index index = new Translog.Index(eIndex, eIndexResult);
 
         BytesStreamOutput out = new BytesStreamOutput();
-        Translog.Operation.writeOperation(index, out);
+        Translog.Operation.writeOperation(out, index);
         StreamInput in = out.bytes().streamInput();
         Translog.Index serializedIndex = (Translog.Index) Translog.Operation.readOperation(in);
         assertEquals(index, serializedIndex);
@@ -2354,7 +2354,7 @@ public class TranslogTests extends ESTestCase {
         Translog.Delete delete = new Translog.Delete(eDelete, eDeleteResult);
 
         out = new BytesStreamOutput();
-        Translog.Operation.writeOperation(delete, out);
+        Translog.Operation.writeOperation(out, delete);
         in = out.bytes().streamInput();
         Translog.Delete serializedDelete = (Translog.Delete) Translog.Operation.readOperation(in);
         assertEquals(delete, serializedDelete);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is wrong with this:

diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java
index 20c428960f..04f44e962a 100644
--- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java
+++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java
@@ -893,7 +893,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
 
         /**
          * Reads the type and the operation from the given stream. The operation must be written with
-         * {@link Operation#writeType(Operation, StreamOutput)}
+         * {@link Operation#writeTo(StreamOutput)}
          */
         static Operation readType(StreamInput input) throws IOException {
             Translog.Operation.Type type = Translog.Operation.Type.fromId(input.readByte());
@@ -911,15 +911,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
                     throw new IOException("No type for [" + type + "]");
             }
         }
-
-        /**
-         * Writes the type and translog operation to the given stream
-         */
-        static void writeType(Translog.Operation operation, StreamOutput output) throws IOException {
-            output.writeByte(operation.opType().id());
-            operation.writeTo(output);
-        }
-
     }
 
     public static class Source {
@@ -954,7 +945,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
         private final String routing;
         private final String parent;
 
-        public Index(StreamInput in) throws IOException {
+        private Index(StreamInput in) throws IOException {
             final int format = in.readVInt(); // SERIALIZATION_FORMAT
             assert format >= FORMAT_2_X : "format was: " + format;
             id = in.readString();
@@ -1069,6 +1060,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
 
         @Override
         public void writeTo(StreamOutput out) throws IOException {
+            out.writeByte(opType().id());
             out.writeVInt(SERIALIZATION_FORMAT);
             out.writeString(id);
             out.writeString(type);
@@ -1156,7 +1148,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
         private final long version;
         private final VersionType versionType;
 
-        public Delete(StreamInput in) throws IOException {
+        private Delete(StreamInput in) throws IOException {
             final int format = in.readVInt();// SERIALIZATION_FORMAT
             assert format >= FORMAT_5_0 : "format was: " + format;
             if (format >= FORMAT_SINGLE_TYPE) {
@@ -1253,6 +1245,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
 
         @Override
         public void writeTo(StreamOutput out) throws IOException {
+            out.writeByte(opType().id());
             out.writeVInt(SERIALIZATION_FORMAT);
             out.writeString(type);
             out.writeString(id);
@@ -1322,7 +1315,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
             return reason;
         }
 
-        NoOp(final StreamInput in) throws IOException {
+        private NoOp(final StreamInput in) throws IOException {
             seqNo = in.readLong();
             primaryTerm = in.readLong();
             reason = in.readString();
@@ -1339,6 +1332,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
 
         @Override
         public void writeTo(StreamOutput out) throws IOException {
+            out.writeByte(opType().id());
             out.writeLong(seqNo);
             out.writeLong(primaryTerm);
             out.writeString(reason);
@@ -1483,7 +1477,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
         // because closing it closes the underlying stream, which we don't
         // want to do here.
         out.resetDigest();
-        Translog.Operation.writeType(op, out);
+        op.writeTo(out);
         long checksum = out.getChecksum();
         out.writeInt((int) checksum);
     }
diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
index 78ed6697b2..10ada1195d 100644
--- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
+++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
@@ -2345,7 +2345,7 @@ public class TranslogTests extends ESTestCase {
         BytesStreamOutput out = new BytesStreamOutput();
         index.writeTo(out);
         StreamInput in = out.bytes().streamInput();
-        Translog.Index serializedIndex = new Translog.Index(in);
+        Translog.Index serializedIndex = (Translog.Index) Translog.Operation.readType(in);
         assertEquals(index, serializedIndex);
 
         Engine.Delete eDelete = new Engine.Delete(doc.type(), doc.id(), newUid(doc), randomSeqNum, randomPrimaryTerm,
@@ -2356,11 +2356,12 @@ public class TranslogTests extends ESTestCase {
         out = new BytesStreamOutput();
         delete.writeTo(out);
         in = out.bytes().streamInput();
-        Translog.Delete serializedDelete = new Translog.Delete(in);
+        Translog.Delete serializedDelete = (Translog.Delete) Translog.Operation.readType(in);
         assertEquals(delete, serializedDelete);
 
         // simulate legacy delete serialization
         out = new BytesStreamOutput();
+        out.writeByte(Translog.Operation.Type.DELETE.id());
         out.writeVInt(Translog.Delete.FORMAT_5_0);
         out.writeString(UidFieldMapper.NAME);
         out.writeString("my_type#my_id");
@@ -2369,7 +2370,7 @@ public class TranslogTests extends ESTestCase {
         out.writeLong(2); // seq no
         out.writeLong(0); // primary term
         in = out.bytes().streamInput();
-        serializedDelete = new Translog.Delete(in);
+        serializedDelete = (Translog.Delete) Translog.Operation.readType(in);
         assertEquals("my_type", serializedDelete.type());
         assertEquals("my_id", serializedDelete.id());
     }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

writeable is the interface we should use for this. there is no reason to add another abstraction for it. we have a special case here so special case it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess somebody has to move... so lets go with this but please make ResyncReplicationRequest#operations and array then.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@s1monw There's no new abstraction being added here, only reusing existing ones? The only addition is a method that serializes an array with a Writer<T> (the functional interface is not new) but that only makes it symmetric with an existing method to de-serialize an array with a Reader<T>.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am talking about the writeArray method. Anyway go for it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @s1monw. I pushed, can you take another look? I am happy to keep iterating on this if you feel differently after seeing the change.

public interface Operation {
enum Type {
@Deprecated
CREATE((byte) 1),
Expand Down Expand Up @@ -876,7 +875,7 @@ public static Type fromId(byte id) {
case 4:
return NO_OP;
default:
throw new IllegalArgumentException("No type mapped for [" + id + "]");
throw new IllegalArgumentException("no type mapped for [" + id + "]");
}
}
}
Expand All @@ -893,31 +892,44 @@ public static Type fromId(byte id) {

/**
* Reads the type and the operation from the given stream. The operation must be written with
* {@link Operation#writeType(Operation, StreamOutput)}
* {@link Operation#writeOperation(StreamOutput, Operation)}
*/
static Operation readType(StreamInput input) throws IOException {
Translog.Operation.Type type = Translog.Operation.Type.fromId(input.readByte());
static Operation readOperation(final StreamInput input) throws IOException {
final Translog.Operation.Type type = Translog.Operation.Type.fromId(input.readByte());
switch (type) {
case CREATE:
// the deserialization logic in Index was identical to that of Create when create was deprecated
// the de-serialization logic in Index was identical to that of Create when create was deprecated
case INDEX:
return new Index(input);
case DELETE:
return new Delete(input);
case INDEX:
return new Index(input);
case NO_OP:
return new NoOp(input);
default:
throw new IOException("No type for [" + type + "]");
throw new AssertionError("no case for [" + type + "]");
}
}

/**
* Writes the type and translog operation to the given stream
*/
static void writeType(Translog.Operation operation, StreamOutput output) throws IOException {
static void writeOperation(final StreamOutput output, final Operation operation) throws IOException {
output.writeByte(operation.opType().id());
operation.writeTo(output);
switch(operation.opType()) {
case CREATE:
// the serialization logic in Index was identical to that of Create when create was deprecated
case INDEX:
((Index) operation).write(output);
break;
case DELETE:
((Delete) operation).write(output);
break;
case NO_OP:
((NoOp) operation).write(output);
break;
default:
throw new AssertionError("no case for [" + operation.opType() + "]");
}
}

}
Expand Down Expand Up @@ -954,7 +966,7 @@ public static class Index implements Operation {
private final String routing;
private final String parent;

public Index(StreamInput in) throws IOException {
private Index(final StreamInput in) throws IOException {
final int format = in.readVInt(); // SERIALIZATION_FORMAT
assert format >= FORMAT_2_X : "format was: " + format;
id = in.readString();
Expand Down Expand Up @@ -1067,8 +1079,7 @@ public Source getSource() {
return new Source(source, routing, parent);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
private void write(final StreamOutput out) throws IOException {
out.writeVInt(SERIALIZATION_FORMAT);
out.writeString(id);
out.writeString(type);
Expand Down Expand Up @@ -1156,7 +1167,7 @@ public static class Delete implements Operation {
private final long version;
private final VersionType versionType;

public Delete(StreamInput in) throws IOException {
private Delete(final StreamInput in) throws IOException {
final int format = in.readVInt();// SERIALIZATION_FORMAT
assert format >= FORMAT_5_0 : "format was: " + format;
if (format >= FORMAT_SINGLE_TYPE) {
Expand Down Expand Up @@ -1251,8 +1262,7 @@ public Source getSource() {
throw new IllegalStateException("trying to read doc source from delete operation");
}

@Override
public void writeTo(StreamOutput out) throws IOException {
private void write(final StreamOutput out) throws IOException {
out.writeVInt(SERIALIZATION_FORMAT);
out.writeString(type);
out.writeString(id);
Expand Down Expand Up @@ -1322,7 +1332,7 @@ public String reason() {
return reason;
}

NoOp(final StreamInput in) throws IOException {
private NoOp(final StreamInput in) throws IOException {
seqNo = in.readLong();
primaryTerm = in.readLong();
reason = in.readString();
Expand All @@ -1337,8 +1347,7 @@ public NoOp(final long seqNo, final long primaryTerm, final String reason) {
this.reason = reason;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
private void write(final StreamOutput out) throws IOException {
out.writeLong(seqNo);
out.writeLong(primaryTerm);
out.writeString(reason);
Expand Down Expand Up @@ -1440,7 +1449,7 @@ static Translog.Operation readOperation(BufferedChecksumStreamInput in) throws I
verifyChecksum(in);
in.reset();
}
operation = Translog.Operation.readType(in);
operation = Translog.Operation.readOperation(in);
verifyChecksum(in);
} catch (TranslogCorruptedException e) {
throw e;
Expand Down Expand Up @@ -1483,7 +1492,7 @@ public static void writeOperationNoSize(BufferedChecksumStreamOutput out, Transl
// because closing it closes the underlying stream, which we don't
// want to do here.
out.resetDigest();
Translog.Operation.writeType(op, out);
Translog.Operation.writeOperation(out, op);
long checksum = out.getChecksum();
out.writeInt((int) checksum);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action.resync;

import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.test.ESTestCase;

import java.io.IOException;
import java.nio.charset.Charset;

import static org.hamcrest.Matchers.equalTo;

public class ResyncReplicationRequestTests extends ESTestCase {

public void testSerialization() throws IOException {
final byte[] bytes = "{}".getBytes(Charset.forName("UTF-8"));
final Translog.Index index = new Translog.Index("type", "id", 0, Versions.MATCH_ANY, VersionType.INTERNAL, bytes, null, null, -1);
final ShardId shardId = new ShardId(new Index("index", "uuid"), 0);
final ResyncReplicationRequest before = new ResyncReplicationRequest(shardId, new Translog.Operation[]{index});

final BytesStreamOutput out = new BytesStreamOutput();
before.writeTo(out);

final StreamInput in = out.bytes().streamInput();
final ResyncReplicationRequest after = new ResyncReplicationRequest();
after.readFrom(in);

assertThat(after, equalTo(before));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception {
AtomicBoolean syncActionCalled = new AtomicBoolean();
PrimaryReplicaSyncer.SyncAction syncAction =
(request, parentTask, allocationId, primaryTerm, listener) -> {
logger.info("Sending off {} operations", request.getOperations().size());
logger.info("Sending off {} operations", request.getOperations().length);
syncActionCalled.set(true);
assertThat(parentTask, instanceOf(PrimaryReplicaSyncer.ResyncTask.class));
listener.onResponse(new ResyncReplicationResponse());
Expand Down Expand Up @@ -98,7 +98,7 @@ public void testSyncerOnClosingShard() throws Exception {
CountDownLatch syncCalledLatch = new CountDownLatch(1);
PrimaryReplicaSyncer.SyncAction syncAction =
(request, parentTask, allocationId, primaryTerm, listener) -> {
logger.info("Sending off {} operations", request.getOperations().size());
logger.info("Sending off {} operations", request.getOperations().length);
syncActionCalled.set(true);
syncCalledLatch.countDown();
threadPool.generic().execute(() -> listener.onResponse(new ResyncReplicationResponse()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2343,9 +2343,9 @@ public void testTranslogOpSerialization() throws Exception {
Translog.Index index = new Translog.Index(eIndex, eIndexResult);

BytesStreamOutput out = new BytesStreamOutput();
index.writeTo(out);
Translog.Operation.writeOperation(out, index);
StreamInput in = out.bytes().streamInput();
Translog.Index serializedIndex = new Translog.Index(in);
Translog.Index serializedIndex = (Translog.Index) Translog.Operation.readOperation(in);
assertEquals(index, serializedIndex);

Engine.Delete eDelete = new Engine.Delete(doc.type(), doc.id(), newUid(doc), randomSeqNum, randomPrimaryTerm,
Expand All @@ -2354,13 +2354,14 @@ public void testTranslogOpSerialization() throws Exception {
Translog.Delete delete = new Translog.Delete(eDelete, eDeleteResult);

out = new BytesStreamOutput();
delete.writeTo(out);
Translog.Operation.writeOperation(out, delete);
in = out.bytes().streamInput();
Translog.Delete serializedDelete = new Translog.Delete(in);
Translog.Delete serializedDelete = (Translog.Delete) Translog.Operation.readOperation(in);
assertEquals(delete, serializedDelete);

// simulate legacy delete serialization
out = new BytesStreamOutput();
out.writeByte(Translog.Operation.Type.DELETE.id());
out.writeVInt(Translog.Delete.FORMAT_5_0);
out.writeString(UidFieldMapper.NAME);
out.writeString("my_type#my_id");
Expand All @@ -2369,7 +2370,7 @@ public void testTranslogOpSerialization() throws Exception {
out.writeLong(2); // seq no
out.writeLong(0); // primary term
in = out.bytes().streamInput();
serializedDelete = new Translog.Delete(in);
serializedDelete = (Translog.Delete) Translog.Operation.readOperation(in);
assertEquals("my_type", serializedDelete.type());
assertEquals("my_id", serializedDelete.id());
}
Expand Down