Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public Iterables() {

public static <T> Iterable<T> concat(Iterable<T>... inputs) {
Objects.requireNonNull(inputs);
return new ConcatenatedIterable(inputs);
return new ConcatenatedIterable<>(inputs);
}

static class ConcatenatedIterable<T> implements Iterable<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,13 @@
import org.elasticsearch.xpack.ccr.action.TransportUnfollowAction;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction;
import org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction;
import org.elasticsearch.xpack.ccr.action.repositories.DeleteCcrRestoreSessionAction;
import org.elasticsearch.xpack.ccr.action.repositories.DeleteInternalCcrRepositoryAction;
import org.elasticsearch.xpack.ccr.action.repositories.PutCcrRestoreSessionAction;
import org.elasticsearch.xpack.ccr.action.repositories.PutInternalCcrRepositoryAction;
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory;
import org.elasticsearch.xpack.ccr.repository.CcrRepository;
import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService;
import org.elasticsearch.xpack.ccr.rest.RestCcrStatsAction;
import org.elasticsearch.xpack.ccr.rest.RestDeleteAutoFollowPatternAction;
import org.elasticsearch.xpack.ccr.rest.RestFollowStatsAction;
Expand Down Expand Up @@ -156,6 +159,7 @@ public Collection<Object> createComponents(

return Arrays.asList(
ccrLicenseChecker,
new CcrRestoreSourceService(settings),
new AutoFollowCoordinator(client, clusterService, ccrLicenseChecker)
);
}
Expand All @@ -182,6 +186,10 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterServic
PutInternalCcrRepositoryAction.TransportPutInternalRepositoryAction.class),
new ActionHandler<>(DeleteInternalCcrRepositoryAction.INSTANCE,
DeleteInternalCcrRepositoryAction.TransportDeleteInternalRepositoryAction.class),
new ActionHandler<>(PutCcrRestoreSessionAction.INSTANCE,
PutCcrRestoreSessionAction.TransportPutCcrRestoreSessionAction.class),
new ActionHandler<>(DeleteCcrRestoreSessionAction.INSTANCE,
DeleteCcrRestoreSessionAction.TransportDeleteCcrRestoreSessionAction.class),
// stats action
new ActionHandler<>(FollowStatsAction.INSTANCE, TransportFollowStatsAction.class),
new ActionHandler<>(CcrStatsAction.INSTANCE, TransportCcrStatsAction.class),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.ccr.action.repositories;

import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService;

public class DeleteCcrRestoreSessionAction extends Action<DeleteCcrRestoreSessionAction.DeleteCcrRestoreSessionResponse> {

public static final DeleteCcrRestoreSessionAction INSTANCE = new DeleteCcrRestoreSessionAction();
private static final String NAME = "internal:admin/ccr/restore/session/delete";

private DeleteCcrRestoreSessionAction() {
super(NAME);
}

@Override
public DeleteCcrRestoreSessionResponse newResponse() {
return new DeleteCcrRestoreSessionResponse();
}

public static class TransportDeleteCcrRestoreSessionAction
extends TransportSingleShardAction<DeleteCcrRestoreSessionRequest, DeleteCcrRestoreSessionResponse> {

private final IndicesService indicesService;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps it's nicer to have CcrRestoreSourceService have a reference to IndicesService instead of having it here in the TransportAction class.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure how to do this? CcrRestoreSourceService is created in createComponents. And we do not have IndicesService there.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see two other options to the current one:

  1. Pass IndicesService to createComponents.
  2. Create CcrRestoreSourceService using Guice, by overriding Collection<Module> createGuiceModules().

Neither sounds really great so let's keep the current model for now.

private final CcrRestoreSourceService ccrRestoreService;

@Inject
public TransportDeleteCcrRestoreSessionAction(ThreadPool threadPool, ClusterService clusterService, ActionFilters actionFilters,
IndexNameExpressionResolver resolver, TransportService transportService,
IndicesService indicesService, CcrRestoreSourceService ccrRestoreService) {
super(NAME, threadPool, clusterService, transportService, actionFilters, resolver, DeleteCcrRestoreSessionRequest::new,
ThreadPool.Names.GENERIC);
this.indicesService = indicesService;
this.ccrRestoreService = ccrRestoreService;
}

@Override
protected DeleteCcrRestoreSessionResponse shardOperation(DeleteCcrRestoreSessionRequest request, ShardId shardId) {
IndexShard indexShard = indicesService.getShardOrNull(shardId);
if (indexShard == null) {
throw new ShardNotFoundException(shardId);
}
ccrRestoreService.closeSession(request.getSessionUUID(), indexShard);
return new DeleteCcrRestoreSessionResponse();
}

@Override
protected DeleteCcrRestoreSessionResponse newResponse() {
return new DeleteCcrRestoreSessionResponse();
}

@Override
protected boolean resolveIndex(DeleteCcrRestoreSessionRequest request) {
return false;
}

@Override
protected ShardsIterator shards(ClusterState state, InternalRequest request) {
final ShardId shardId = request.request().getShardId();
// The index uuid is not correct if we restore with a rename
IndexShardRoutingTable shardRoutingTable = state.routingTable().shardRoutingTable(shardId.getIndexName(), shardId.id());
return shardRoutingTable.primaryShardIt();
}
}

public static class DeleteCcrRestoreSessionResponse extends ActionResponse {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.ccr.action.repositories;

import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.single.shard.SingleShardRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;

import java.io.IOException;

public class DeleteCcrRestoreSessionRequest extends SingleShardRequest<DeleteCcrRestoreSessionRequest> {

private String sessionUUID;
private ShardId shardId;

DeleteCcrRestoreSessionRequest() {
}

public DeleteCcrRestoreSessionRequest(String sessionUUID, ShardId shardId) {
super(shardId.getIndexName());
this.sessionUUID = sessionUUID;
this.shardId = shardId;
}

@Override
public ActionRequestValidationException validate() {
return null;
}

@Override
public void readFrom(StreamInput streamInput) throws IOException {
super.readFrom(streamInput);
sessionUUID = streamInput.readString();
shardId = ShardId.readShardId(streamInput);
}

@Override
public void writeTo(StreamOutput streamOutput) throws IOException {
super.writeTo(streamOutput);
streamOutput.writeString(sessionUUID);
shardId.writeTo(streamOutput);
}

public String getSessionUUID() {
return sessionUUID;
}

public ShardId getShardId() {
return shardId;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.ccr.action.repositories;

import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class PutCcrRestoreSessionAction extends Action<PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse> {

public static final PutCcrRestoreSessionAction INSTANCE = new PutCcrRestoreSessionAction();
private static final String NAME = "internal:admin/ccr/restore/session/put";

private PutCcrRestoreSessionAction() {
super(NAME);
}

@Override
public PutCcrRestoreSessionResponse newResponse() {
throw new UnsupportedOperationException();
}

@Override
public Writeable.Reader<PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse> getResponseReader() {
return PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse::new;
}

public static class TransportPutCcrRestoreSessionAction
extends TransportSingleShardAction<PutCcrRestoreSessionRequest, PutCcrRestoreSessionResponse> {

private final IndicesService indicesService;
private final CcrRestoreSourceService ccrRestoreService;

@Inject
public TransportPutCcrRestoreSessionAction(ThreadPool threadPool, ClusterService clusterService, ActionFilters actionFilters,
IndexNameExpressionResolver resolver, TransportService transportService,
IndicesService indicesService, CcrRestoreSourceService ccrRestoreService) {
super(NAME, threadPool, clusterService, transportService, actionFilters, resolver, PutCcrRestoreSessionRequest::new,
ThreadPool.Names.GENERIC);
this.indicesService = indicesService;
this.ccrRestoreService = ccrRestoreService;
}

@Override
protected PutCcrRestoreSessionResponse shardOperation(PutCcrRestoreSessionRequest request, ShardId shardId) throws IOException {
IndexShard indexShard = indicesService.getShardOrNull(shardId);
if (indexShard == null) {
throw new ShardNotFoundException(shardId);
}
Store.MetadataSnapshot sourceMetaData = ccrRestoreService.openSession(request.getSessionUUID(), indexShard);
Store.RecoveryDiff recoveryDiff = sourceMetaData.recoveryDiff(request.getMetaData());

ArrayList<StoreFileMetaData> filesToRecover = new ArrayList<>(recoveryDiff.different);
filesToRecover.addAll(recoveryDiff.missing);
return new PutCcrRestoreSessionResponse(indexShard.routingEntry().currentNodeId(), recoveryDiff.identical, filesToRecover);
}

@Override
protected PutCcrRestoreSessionResponse newResponse() {
return new PutCcrRestoreSessionResponse();
}

@Override
protected boolean resolveIndex(PutCcrRestoreSessionRequest request) {
return false;
}

@Override
protected ShardsIterator shards(ClusterState state, InternalRequest request) {
final ShardId shardId = request.request().getShardId();
// The index uuid is not correct if we restore with a rename
IndexShardRoutingTable shardRoutingTable = state.routingTable().shardRoutingTable(shardId.getIndexName(), shardId.id());
return shardRoutingTable.primaryShardIt();
}
}


public static class PutCcrRestoreSessionResponse extends ActionResponse {

private String nodeId;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this be made final? I see that you both implemented a constructor with StreamInput and the readFrom method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so. Unfortunately you must implement this:

        @Override
        protected PutCcrRestoreSessionResponse newResponse() {
            return new PutCcrRestoreSessionResponse();
        }

on TransportSingleShardAction.

private List<StoreFileMetaData> identicalFiles;
private List<StoreFileMetaData> filesToRecover;

PutCcrRestoreSessionResponse() {
}

PutCcrRestoreSessionResponse(String nodeId, List<StoreFileMetaData> identicalFiles, List<StoreFileMetaData> filesToRecover) {
this.nodeId = nodeId;
this.identicalFiles = identicalFiles;
this.filesToRecover = filesToRecover;
}

PutCcrRestoreSessionResponse(StreamInput streamInput) throws IOException {
super(streamInput);
nodeId = streamInput.readString();
identicalFiles = streamInput.readList(StoreFileMetaData::new);
filesToRecover = streamInput.readList(StoreFileMetaData::new);
}

@Override
public void readFrom(StreamInput streamInput) throws IOException {
super.readFrom(streamInput);
nodeId = streamInput.readString();
identicalFiles = streamInput.readList(StoreFileMetaData::new);
filesToRecover = streamInput.readList(StoreFileMetaData::new);
}

@Override
public void writeTo(StreamOutput streamOutput) throws IOException {
super.writeTo(streamOutput);
streamOutput.writeString(nodeId);
streamOutput.writeList(identicalFiles);
streamOutput.writeList(filesToRecover);
}

public String getNodeId() {
return nodeId;
}

public List<StoreFileMetaData> getIdenticalFiles() {
return identicalFiles;
}

public List<StoreFileMetaData> getFilesToRecover() {
return filesToRecover;
}
}
}
Loading