Member

@jasontedor jasontedor commented Nov 17, 2017

This commit addresses a subtle bug in the serialization routine for resync requests. The problem here is that Translog.Operation#readType is not compatible with the implementations of Translog.Operation#writeTo. Unfortunately, this issue prevents primary-replica resync from succeeding; further issues will be addressed in follow-ups.

Relates #24841
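
To make the mismatch concrete, here is a minimal sketch using plain `java.io` streams rather than the Elasticsearch stream classes. The class name, the type id `4`, and the field layout are assumptions chosen for illustration. It shows what happens when a reader that expects a leading type byte is handed bytes from a writer that emits only the body.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a translog operation; not the Elasticsearch class.
final class ResyncMismatchSketch {
    static final byte NO_OP_TYPE = 4; // assumed type id, for illustration only

    // Body-only write: mirrors a writeTo that does not emit the type byte.
    static void writeBody(DataOutputStream out, long seqNo, String reason) throws IOException {
        out.writeLong(seqNo);
        out.writeUTF(reason);
    }

    // Type-prefixed write: emits the type byte first, then the body.
    static void writeWithType(DataOutputStream out, long seqNo, String reason) throws IOException {
        out.writeByte(NO_OP_TYPE);
        writeBody(out, seqNo, reason);
    }

    // Reader that, like readType in the description, expects a leading type byte.
    static String readWithType(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            byte type = in.readByte();
            if (type != NO_OP_TYPE) {
                return "unknown type " + type;
            }
            long seqNo = in.readLong();
            return "NoOp[" + seqNo + "," + in.readUTF() + "]";
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Serializes one operation, with or without the type prefix.
    static byte[] serialize(boolean withType, long seqNo, String reason) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            if (withType) {
                writeWithType(out, seqNo, reason);
            } else {
                writeBody(out, seqNo, reason);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }
}
```

In this sketch the body-only bytes are misread: the first byte of the sequence number is taken as the type, so the reader either rejects the operation or decodes garbage.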

Member

@dakrone dakrone left a comment

LGTM but Yannick should probably look also :)

@jasontedor
Member Author

@dakrone Thanks for the review. I pushed another commit pursuing your good idea of adding assertions, and took it further by completely removing the Writeable interface from Translog.Operation and making the dangerous constructors and write methods private. Would you mind taking another look?

Member

@dakrone dakrone left a comment

LGTM again, thanks for making it harder to accidentally use the wrong write

Contributor

@s1monw s1monw left a comment

left one comment. LGTM otherwise

* A generic interface representing an operation performed on the transaction log.
* Each is associated with a type.
*/
public interface Operation extends Writeable {
Contributor

Can't we keep the Writeable and instead force each of the impls to serialize the type? I also wonder if we should rather make it an abstract class and implement the default writing part in Operation. That way we can keep using write/readList, which I'd prefer. The reading part can still be a private ctor, +1 to that.
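
As a rough illustration of this suggestion (not code from the PR), the sketch below uses plain `java.io` streams. `OperationSketch`, `writeBody`, `readFrom`, and the type id `4` are hypothetical names and values. The abstract base class owns the type byte in a final `writeTo`, and the stream constructor of each implementation is private so that reads must go through the single dispatching reader.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical sketch; names and wire format are illustrative, not Elasticsearch's.
abstract class OperationSketch {
    abstract byte typeId();

    // Subclasses serialize only their own fields here.
    protected abstract void writeBody(DataOutputStream out) throws IOException;

    // The "default writing part": final, so the type byte can never be skipped.
    final void writeTo(DataOutputStream out) throws IOException {
        out.writeByte(typeId());
        writeBody(out);
    }

    // Single read entry point; dispatches on the type byte written by writeTo.
    static OperationSketch readFrom(DataInputStream in) throws IOException {
        byte type = in.readByte();
        switch (type) {
            case NoOp.TYPE_ID:
                return new NoOp(in);
            default:
                throw new IOException("No type for [" + type + "]");
        }
    }

    static final class NoOp extends OperationSketch {
        static final byte TYPE_ID = 4; // assumed id, for illustration only
        final long seqNo;
        final String reason;

        NoOp(long seqNo, String reason) {
            this.seqNo = seqNo;
            this.reason = reason;
        }

        // Private stream constructor: only code inside OperationSketch can reach it.
        private NoOp(DataInputStream in) throws IOException {
            this.seqNo = in.readLong();
            this.reason = in.readUTF();
        }

        @Override
        byte typeId() {
            return TYPE_ID;
        }

        @Override
        protected void writeBody(DataOutputStream out) throws IOException {
            out.writeLong(seqNo);
            out.writeUTF(reason);
        }
    }

    // Writes an operation and reads it back, wrapping checked I/O errors.
    static OperationSketch roundTrip(OperationSketch op) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            op.writeTo(new DataOutputStream(bytes));
            return readFrom(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

One trade-off in this shape is that `writeBody` remains callable from code in the same package, so a body-only write is still possible by mistake.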

Contributor

+1 to this.

Member Author

@s1monw We can do that, but then there's still the possibility of mistakenly calling the inner write implementations from within Translog.java (because now there will be a visible abstract write method on Operation whereas with what we have here there is no such method). I wonder if this is enough to address keeping serialization simple (i.e., not a manual iteration):

diff --git a/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java b/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java
index 9883f58fd9..9baf002ef9 100644
--- a/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java
+++ b/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java
@@ -26,6 +26,7 @@ import org.elasticsearch.index.translog.Translog;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
 
@@ -49,20 +50,15 @@ public final class ResyncReplicationRequest extends ReplicatedWriteRequest<Resyn
     @Override
     public void readFrom(StreamInput in) throws IOException {
         super.readFrom(in);
-        final int size = in.readVInt();
-        operations = new ArrayList<>(size);
-        for (int i = 0; i < size; i++) {
-            operations.add(Translog.Operation.readOperation(in));
-        }
+        operations = Arrays.asList(in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new));
     }
 
+    private static final Translog.Operation[] EMPTY_ARRAY = new Translog.Operation[0];
+
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         super.writeTo(out);
-        out.writeVInt(operations.size());
-        for (int i = 0; i < operations.size(); i++) {
-            Translog.Operation.writeOperation(operations.get(i), out);
-        }
+        out.writeArray((o, op) -> Translog.Operation.writeOperation(op, o), operations.toArray(EMPTY_ARRAY));
     }
 
     @Override
diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java b/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java
index eb2bbdc357..8cb0ee3a9b 100644
--- a/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java
+++ b/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java
@@ -706,6 +706,13 @@ public abstract class StreamOutput extends OutputStream {
         }
     }
 
+    public <T> void writeArray(Writer<T> writer, T[] array) throws IOException {
+        writeVInt(array.length);
+        for (T value : array) {
+            writer.write(this, value);
+        }
+    }
+
     public <T extends Writeable> void writeArray(T[] array) throws IOException {
         writeVInt(array.length);
         for (T value: array) {

Then we can keep the inner write method that should never be invoked except from writeOperation off of the abstract Operation.

What do you think?

Member Author

Actually we can make it slightly cleaner with:

diff --git a/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java b/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java
index 9883f58fd9..29f4567a6e 100644
--- a/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java
+++ b/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java
@@ -25,7 +25,7 @@ import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.translog.Translog;
 
 import java.io.IOException;
-import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
 
@@ -49,20 +49,15 @@ public final class ResyncReplicationRequest extends ReplicatedWriteRequest<Resyn
     @Override
     public void readFrom(StreamInput in) throws IOException {
         super.readFrom(in);
-        final int size = in.readVInt();
-        operations = new ArrayList<>(size);
-        for (int i = 0; i < size; i++) {
-            operations.add(Translog.Operation.readOperation(in));
-        }
+        operations = Arrays.asList(in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new));
     }
 
+    private static final Translog.Operation[] EMPTY_ARRAY = new Translog.Operation[0];
+
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         super.writeTo(out);
-        out.writeVInt(operations.size());
-        for (int i = 0; i < operations.size(); i++) {
-            Translog.Operation.writeOperation(operations.get(i), out);
-        }
+        out.writeArray(Translog.Operation::writeOperation, operations.toArray(EMPTY_ARRAY));
     }
 
     @Override
diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java b/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java
index eb2bbdc357..8cb0ee3a9b 100644
--- a/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java
+++ b/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java
@@ -706,6 +706,13 @@ public abstract class StreamOutput extends OutputStream {
         }
     }
 
+    public <T> void writeArray(Writer<T> writer, T[] array) throws IOException {
+        writeVInt(array.length);
+        for (T value : array) {
+            writer.write(this, value);
+        }
+    }
+
     public <T extends Writeable> void writeArray(T[] array) throws IOException {
         writeVInt(array.length);
         for (T value: array) {
diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java
index 4942d8135c..4373c8d053 100644
--- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java
+++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java
@@ -19,7 +19,6 @@
 
 package org.elasticsearch.index.translog;
 
-import com.vividsolutions.jts.util.Assert;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.logging.log4j.util.Supplier;
 import org.apache.lucene.index.Term;
@@ -893,7 +892,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
 
         /**
          * Reads the type and the operation from the given stream. The operation must be written with
-         * {@link Operation#writeOperation(Operation, StreamOutput)}
+         * {@link Operation#writeOperation(StreamOutput, Operation)}
          */
         static Operation readOperation(final StreamInput input) throws IOException {
             final Translog.Operation.Type type = Translog.Operation.Type.fromId(input.readByte());
@@ -914,7 +913,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
         /**
          * Writes the type and translog operation to the given stream
          */
-        static void writeOperation(final Translog.Operation operation, final StreamOutput output) throws IOException {
+        static void writeOperation(final StreamOutput output, final Operation operation) throws IOException {
             output.writeByte(operation.opType().id());
             switch(operation.opType()) {
                 case CREATE:
@@ -1493,7 +1492,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
         // because closing it closes the underlying stream, which we don't
         // want to do here.
         out.resetDigest();
-        Translog.Operation.writeOperation(op, out);
+        Translog.Operation.writeOperation(out, op);
         long checksum = out.getChecksum();
         out.writeInt((int) checksum);
     }
diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
index 477a7ae849..1a17e0dc6a 100644
--- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
+++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
@@ -2343,7 +2343,7 @@ public class TranslogTests extends ESTestCase {
         Translog.Index index = new Translog.Index(eIndex, eIndexResult);
 
         BytesStreamOutput out = new BytesStreamOutput();
-        Translog.Operation.writeOperation(index, out);
+        Translog.Operation.writeOperation(out, index);
         StreamInput in = out.bytes().streamInput();
         Translog.Index serializedIndex = (Translog.Index) Translog.Operation.readOperation(in);
         assertEquals(index, serializedIndex);
@@ -2354,7 +2354,7 @@ public class TranslogTests extends ESTestCase {
         Translog.Delete delete = new Translog.Delete(eDelete, eDeleteResult);
 
         out = new BytesStreamOutput();
-        Translog.Operation.writeOperation(delete, out);
+        Translog.Operation.writeOperation(out, delete);
         in = out.bytes().streamInput();
         Translog.Delete serializedDelete = (Translog.Delete) Translog.Operation.readOperation(in);
         assertEquals(delete, serializedDelete);
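
The reason the parameter swap makes this cleaner can be shown in isolation. The sketch below is a simplified stand-in built on `java.io.DataOutputStream`; the `Writer` interface here only imitates the shape of the existing functional interface, and all names are hypothetical. A static method taking `(stream, value)` can be passed as a method reference, while the `(value, stream)` order needs an adapting lambda.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical sketch of the argument-order point; not the Elasticsearch classes.
final class WriterOrderSketch {
    // Same shape as a (stream, value) writer callback.
    @FunctionalInterface
    interface Writer<T> {
        void write(DataOutputStream out, T value) throws IOException;
    }

    // Length prefix followed by each element (a plain int here, not a vInt).
    static <T> void writeArray(DataOutputStream out, Writer<T> writer, T[] array) throws IOException {
        out.writeInt(array.length);
        for (T value : array) {
            writer.write(out, value);
        }
    }

    // (value, stream) order: does not match Writer<T>, so callers need a lambda.
    static void writeValueFirst(String value, DataOutputStream out) throws IOException {
        out.writeUTF(value);
    }

    // (stream, value) order: matches Writer<T>, so a method reference works.
    static void writeStreamFirst(DataOutputStream out, String value) throws IOException {
        out.writeUTF(value);
    }

    static byte[] viaLambda(String[] values) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try {
            writeArray(new DataOutputStream(bytes), (o, v) -> writeValueFirst(v, o), values);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static byte[] viaMethodReference(String[] values) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try {
            writeArray(new DataOutputStream(bytes), WriterOrderSketch::writeStreamFirst, values);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }
}
```

Both call sites produce the same bytes; the difference is only in how the call reads.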

Contributor

what is wrong with this:

diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java
index 20c428960f..04f44e962a 100644
--- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java
+++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java
@@ -893,7 +893,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
 
         /**
          * Reads the type and the operation from the given stream. The operation must be written with
-         * {@link Operation#writeType(Operation, StreamOutput)}
+         * {@link Operation#writeTo(StreamOutput)}
          */
         static Operation readType(StreamInput input) throws IOException {
             Translog.Operation.Type type = Translog.Operation.Type.fromId(input.readByte());
@@ -911,15 +911,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
                     throw new IOException("No type for [" + type + "]");
             }
         }
-
-        /**
-         * Writes the type and translog operation to the given stream
-         */
-        static void writeType(Translog.Operation operation, StreamOutput output) throws IOException {
-            output.writeByte(operation.opType().id());
-            operation.writeTo(output);
-        }
-
     }
 
     public static class Source {
@@ -954,7 +945,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
         private final String routing;
         private final String parent;
 
-        public Index(StreamInput in) throws IOException {
+        private Index(StreamInput in) throws IOException {
             final int format = in.readVInt(); // SERIALIZATION_FORMAT
             assert format >= FORMAT_2_X : "format was: " + format;
             id = in.readString();
@@ -1069,6 +1060,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
 
         @Override
         public void writeTo(StreamOutput out) throws IOException {
+            out.writeByte(opType().id());
             out.writeVInt(SERIALIZATION_FORMAT);
             out.writeString(id);
             out.writeString(type);
@@ -1156,7 +1148,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
         private final long version;
         private final VersionType versionType;
 
-        public Delete(StreamInput in) throws IOException {
+        private Delete(StreamInput in) throws IOException {
             final int format = in.readVInt();// SERIALIZATION_FORMAT
             assert format >= FORMAT_5_0 : "format was: " + format;
             if (format >= FORMAT_SINGLE_TYPE) {
@@ -1253,6 +1245,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
 
         @Override
         public void writeTo(StreamOutput out) throws IOException {
+            out.writeByte(opType().id());
             out.writeVInt(SERIALIZATION_FORMAT);
             out.writeString(type);
             out.writeString(id);
@@ -1322,7 +1315,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
             return reason;
         }
 
-        NoOp(final StreamInput in) throws IOException {
+        private NoOp(final StreamInput in) throws IOException {
             seqNo = in.readLong();
             primaryTerm = in.readLong();
             reason = in.readString();
@@ -1339,6 +1332,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
 
         @Override
         public void writeTo(StreamOutput out) throws IOException {
+            out.writeByte(opType().id());
             out.writeLong(seqNo);
             out.writeLong(primaryTerm);
             out.writeString(reason);
@@ -1483,7 +1477,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
         // because closing it closes the underlying stream, which we don't
         // want to do here.
         out.resetDigest();
-        Translog.Operation.writeType(op, out);
+        op.writeTo(out);
         long checksum = out.getChecksum();
         out.writeInt((int) checksum);
     }
diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
index 78ed6697b2..10ada1195d 100644
--- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
+++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
@@ -2345,7 +2345,7 @@ public class TranslogTests extends ESTestCase {
         BytesStreamOutput out = new BytesStreamOutput();
         index.writeTo(out);
         StreamInput in = out.bytes().streamInput();
-        Translog.Index serializedIndex = new Translog.Index(in);
+        Translog.Index serializedIndex = (Translog.Index) Translog.Operation.readType(in);
         assertEquals(index, serializedIndex);
 
         Engine.Delete eDelete = new Engine.Delete(doc.type(), doc.id(), newUid(doc), randomSeqNum, randomPrimaryTerm,
@@ -2356,11 +2356,12 @@ public class TranslogTests extends ESTestCase {
         out = new BytesStreamOutput();
         delete.writeTo(out);
         in = out.bytes().streamInput();
-        Translog.Delete serializedDelete = new Translog.Delete(in);
+        Translog.Delete serializedDelete = (Translog.Delete) Translog.Operation.readType(in);
         assertEquals(delete, serializedDelete);
 
         // simulate legacy delete serialization
         out = new BytesStreamOutput();
+        out.writeByte(Translog.Operation.Type.DELETE.id());
         out.writeVInt(Translog.Delete.FORMAT_5_0);
         out.writeString(UidFieldMapper.NAME);
         out.writeString("my_type#my_id");
@@ -2369,7 +2370,7 @@ public class TranslogTests extends ESTestCase {
         out.writeLong(2); // seq no
         out.writeLong(0); // primary term
         in = out.bytes().streamInput();
-        serializedDelete = new Translog.Delete(in);
+        serializedDelete = (Translog.Delete) Translog.Operation.readType(in);
         assertEquals("my_type", serializedDelete.type());
         assertEquals("my_id", serializedDelete.id());
     }

Contributor

Writeable is the interface we should use for this. There is no reason to add another abstraction for it. We have a special case here, so special-case it.

Contributor

I guess somebody has to move... so let's go with this, but please make ResyncReplicationRequest#operations an array then.

Member Author

@s1monw There's no new abstraction being added here, only reusing existing ones? The only addition is a method that serializes an array with a Writer<T> (the functional interface is not new) but that only makes it symmetric with an existing method to de-serialize an array with a Reader<T>.
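
The symmetry can be sketched outside the codebase as follows. This is a simplified illustration on plain `java.io` streams with hypothetical `Writer`, `Reader`, `writeArray`, and `readArray` definitions. It assumes a fixed-width int length prefix instead of a vInt, and it is not the actual StreamInput/StreamOutput code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.function.IntFunction;

// Hypothetical sketch of a symmetric array writer/reader pair.
final class ArrayRoundTripSketch {
    @FunctionalInterface
    interface Writer<T> {
        void write(DataOutputStream out, T value) throws IOException;
    }

    @FunctionalInterface
    interface Reader<T> {
        T read(DataInputStream in) throws IOException;
    }

    // Writes the length, then each element with the supplied writer.
    static <T> void writeArray(DataOutputStream out, Writer<T> writer, T[] array) throws IOException {
        out.writeInt(array.length);
        for (T value : array) {
            writer.write(out, value);
        }
    }

    // Reads the length, then each element with the supplied reader.
    static <T> T[] readArray(DataInputStream in, Reader<T> reader, IntFunction<T[]> arraySupplier)
            throws IOException {
        int length = in.readInt();
        T[] values = arraySupplier.apply(length);
        for (int i = 0; i < length; i++) {
            values[i] = reader.read(in);
        }
        return values;
    }

    // Serializes a list as an array and reads it back as a list.
    static List<String> roundTrip(List<String> values) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            writeArray(new DataOutputStream(bytes), (o, v) -> o.writeUTF(v), values.toArray(new String[0]));
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
            return Arrays.asList(readArray(in, input -> input.readUTF(), String[]::new));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```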

Contributor

I am talking about the writeArray method. Anyway go for it.

Member Author

Thanks @s1monw. I pushed, can you take another look? I am happy to keep iterating on this if you feel differently after seeing the change.

Contributor

@s1monw s1monw left a comment

LGTM

}
}

public <T> void writeArray(Writer<T> writer, T[] array) throws IOException {
Contributor

javadocs please

@jasontedor jasontedor merged commit 28660be into elastic:master Nov 21, 2017
jasontedor added a commit that referenced this pull request Nov 21, 2017
This commit addresses a subtle bug in the serialization routine for
resync requests. The problem here is that Translog.Operation#readType is
not compatible with the implementations of
Translog.Operation#writeTo. Unfortunately, this issue prevents
primary-replica resync from succeeding; further issues will be
addressed in follow-ups.

Relates #27418
@jasontedor jasontedor deleted the resync-serialization branch November 21, 2017 01:57
@clintongormley clintongormley added :Distributed Indexing/Engine and removed :Sequence IDs labels Feb 14, 2018
@jimczi jimczi added v7.0.0-beta1 and removed v7.0.0 labels Feb 7, 2019

Labels

>bug, :Distributed Indexing/Engine, v6.0.1, v6.1.0, v7.0.0-beta1

6 participants