|
24 | 24 | import org.elasticsearch.common.io.stream.StreamOutput;
|
25 | 25 | import org.elasticsearch.common.xcontent.XContentFactory;
|
26 | 26 | import org.elasticsearch.index.engine.Engine;
|
27 |
| -import org.elasticsearch.index.engine.IgnoreOnRecoveryEngineException; |
28 | 27 | import org.elasticsearch.index.mapper.DocumentMapperForType;
|
29 | 28 | import org.elasticsearch.index.mapper.MapperException;
|
30 | 29 | import org.elasticsearch.index.mapper.MapperService;
|
31 | 30 | import org.elasticsearch.index.mapper.Mapping;
|
32 |
| -import org.elasticsearch.index.mapper.Uid; |
33 | 31 | import org.elasticsearch.index.translog.Translog;
|
34 | 32 | import org.elasticsearch.rest.RestStatus;
|
35 | 33 |
|
@@ -149,59 +147,39 @@ private void maybeAddMappingUpdate(String type, Mapping update, String docId, bo
|
149 | 147 | * is encountered.
|
150 | 148 | */
|
151 | 149 | private void performRecoveryOperation(Engine engine, Translog.Operation operation, boolean allowMappingUpdates, Engine.Operation.Origin origin) throws IOException {
|
152 |
| - |
153 |
| - try { |
154 |
| - switch (operation.opType()) { |
155 |
| - case INDEX: |
156 |
| - Translog.Index index = (Translog.Index) operation; |
157 |
| - // we set canHaveDuplicates to true all the time such that we de-optimze the translog case and ensure that all |
158 |
| - // autoGeneratedID docs that are coming from the primary are updated correctly. |
159 |
| - Engine.Index engineIndex = IndexShard.prepareIndex(docMapper(index.type()), |
160 |
| - source(shardId.getIndexName(), index.type(), index.id(), index.source(), XContentFactory.xContentType(index.source())) |
161 |
| - .routing(index.routing()).parent(index.parent()), index.seqNo(), index.primaryTerm(), |
162 |
| - index.version(), index.versionType().versionTypeForReplicationAndRecovery(), origin, index.getAutoGeneratedIdTimestamp(), true); |
163 |
| - maybeAddMappingUpdate(engineIndex.type(), engineIndex.parsedDoc().dynamicMappingsUpdate(), engineIndex.id(), allowMappingUpdates); |
164 |
| - logger.trace("[translog] recover [index] op [({}, {})] of [{}][{}]", index.seqNo(), index.primaryTerm(), index.type(), index.id()); |
165 |
| - index(engine, engineIndex); |
166 |
| - break; |
167 |
| - case DELETE: |
168 |
| - Translog.Delete delete = (Translog.Delete) operation; |
169 |
| - logger.trace("[translog] recover [delete] op [({}, {})] of [{}][{}]", delete.seqNo(), delete.primaryTerm(), delete.type(), delete.id()); |
170 |
| - final Engine.Delete engineDelete = new Engine.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(), |
171 |
| - delete.primaryTerm(), delete.version(), delete.versionType().versionTypeForReplicationAndRecovery(), |
172 |
| - origin, System.nanoTime()); |
173 |
| - delete(engine, engineDelete); |
174 |
| - break; |
175 |
| - case NO_OP: |
176 |
| - final Translog.NoOp noOp = (Translog.NoOp) operation; |
177 |
| - final long seqNo = noOp.seqNo(); |
178 |
| - final long primaryTerm = noOp.primaryTerm(); |
179 |
| - final String reason = noOp.reason(); |
180 |
| - logger.trace("[translog] recover [no_op] op [({}, {})] of [{}]", seqNo, primaryTerm, reason); |
181 |
| - final Engine.NoOp engineNoOp = |
182 |
| - new Engine.NoOp(seqNo, primaryTerm, origin, System.nanoTime(), reason); |
183 |
| - noOp(engine, engineNoOp); |
184 |
| - break; |
185 |
| - default: |
186 |
| - throw new IllegalStateException("No operation defined for [" + operation + "]"); |
187 |
| - } |
188 |
| - } catch (ElasticsearchException e) { |
189 |
| - boolean hasIgnoreOnRecoveryException = false; |
190 |
| - ElasticsearchException current = e; |
191 |
| - while (true) { |
192 |
| - if (current instanceof IgnoreOnRecoveryEngineException) { |
193 |
| - hasIgnoreOnRecoveryException = true; |
194 |
| - break; |
195 |
| - } |
196 |
| - if (current.getCause() instanceof ElasticsearchException) { |
197 |
| - current = (ElasticsearchException) current.getCause(); |
198 |
| - } else { |
199 |
| - break; |
200 |
| - } |
201 |
| - } |
202 |
| - if (!hasIgnoreOnRecoveryException) { |
203 |
| - throw e; |
204 |
| - } |
| 150 | + switch (operation.opType()) { |
| 151 | + case INDEX: |
| 152 | + Translog.Index index = (Translog.Index) operation; |
| 153 | + // we set canHaveDuplicates to true all the time such that we de-optimze the translog case and ensure that all |
| 154 | + // autoGeneratedID docs that are coming from the primary are updated correctly. |
| 155 | + Engine.Index engineIndex = IndexShard.prepareIndex(docMapper(index.type()), |
| 156 | + source(shardId.getIndexName(), index.type(), index.id(), index.source(), XContentFactory.xContentType(index.source())) |
| 157 | + .routing(index.routing()).parent(index.parent()), index.seqNo(), index.primaryTerm(), |
| 158 | + index.version(), index.versionType().versionTypeForReplicationAndRecovery(), origin, index.getAutoGeneratedIdTimestamp(), true); |
| 159 | + maybeAddMappingUpdate(engineIndex.type(), engineIndex.parsedDoc().dynamicMappingsUpdate(), engineIndex.id(), allowMappingUpdates); |
| 160 | + logger.trace("[translog] recover [index] op [({}, {})] of [{}][{}]", index.seqNo(), index.primaryTerm(), index.type(), index.id()); |
| 161 | + index(engine, engineIndex); |
| 162 | + break; |
| 163 | + case DELETE: |
| 164 | + Translog.Delete delete = (Translog.Delete) operation; |
| 165 | + logger.trace("[translog] recover [delete] op [({}, {})] of [{}][{}]", delete.seqNo(), delete.primaryTerm(), delete.type(), delete.id()); |
| 166 | + final Engine.Delete engineDelete = new Engine.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(), |
| 167 | + delete.primaryTerm(), delete.version(), delete.versionType().versionTypeForReplicationAndRecovery(), |
| 168 | + origin, System.nanoTime()); |
| 169 | + delete(engine, engineDelete); |
| 170 | + break; |
| 171 | + case NO_OP: |
| 172 | + final Translog.NoOp noOp = (Translog.NoOp) operation; |
| 173 | + final long seqNo = noOp.seqNo(); |
| 174 | + final long primaryTerm = noOp.primaryTerm(); |
| 175 | + final String reason = noOp.reason(); |
| 176 | + logger.trace("[translog] recover [no_op] op [({}, {})] of [{}]", seqNo, primaryTerm, reason); |
| 177 | + final Engine.NoOp engineNoOp = |
| 178 | + new Engine.NoOp(seqNo, primaryTerm, origin, System.nanoTime(), reason); |
| 179 | + noOp(engine, engineNoOp); |
| 180 | + break; |
| 181 | + default: |
| 182 | + throw new IllegalStateException("No operation defined for [" + operation + "]"); |
205 | 183 | }
|
206 | 184 | operationProcessed();
|
207 | 185 | }
|
|
0 commit comments