Skip to content

Commit

Permalink
Register CcrRepository based on settings update (elastic#36086)
Browse files Browse the repository at this point in the history
This commit adds an empty CcrRepository snapshot/restore repository.
When a new cluster is registered in the remote cluster settings, a new
CcrRepository is registered for that cluster.

This is implemented using a new concept of "internal repositories".
RepositoryPlugin now allows implementations to return factories for
"internal repositories". The "internal repositories" are different from
normal repositories in that they cannot be registered through the
external repository api. Additionally, "internal repositories" are local
to a node and are not stored in the cluster state.

The repository will be unregistered if the remote cluster is removed.
  • Loading branch information
Tim-Brooks committed Dec 5, 2018
1 parent a5cc353 commit d784ccc
Show file tree
Hide file tree
Showing 19 changed files with 1,012 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,17 @@ public interface RepositoryPlugin {
default Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {
return Collections.emptyMap();
}

/**
* Returns internal repository types added by this plugin. Internal repositories cannot be registered
* through the external API.
*
* @param env The environment for the local node, which may be used for the local settings and path.repo
*
* The key of the returned {@link Map} is the type name of the repository and
* the value is a factory to construct the {@link Repository} interface.
*/
default Map<String, Repository.Factory> getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {
return Collections.emptyMap();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,24 @@ public RepositoriesModule(Environment env, List<RepositoryPlugin> repoPlugins, T
}
}

Map<String, Repository.Factory> internalFactories = new HashMap<>();
for (RepositoryPlugin repoPlugin : repoPlugins) {
Map<String, Repository.Factory> newRepoTypes = repoPlugin.getInternalRepositories(env, namedXContentRegistry);
for (Map.Entry<String, Repository.Factory> entry : newRepoTypes.entrySet()) {
if (internalFactories.put(entry.getKey(), entry.getValue()) != null) {
throw new IllegalArgumentException("Internal repository type [" + entry.getKey() + "] is already registered");
}
if (factories.put(entry.getKey(), entry.getValue()) != null) {
throw new IllegalArgumentException("Internal repository type [" + entry.getKey() + "] is already registered as a " +
"non-internal repository");
}
}
}

Map<String, Repository.Factory> repositoryTypes = Collections.unmodifiableMap(factories);
repositoriesService = new RepositoriesService(env.settings(), clusterService, transportService, repositoryTypes, threadPool);
Map<String, Repository.Factory> internalRepositoryTypes = Collections.unmodifiableMap(internalFactories);
repositoriesService = new RepositoriesService(env.settings(), clusterService, transportService, repositoryTypes,
internalRepositoryTypes, threadPool);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.threadpool.ThreadPool;
Expand All @@ -57,19 +58,22 @@ public class RepositoriesService implements ClusterStateApplier {
private static final Logger logger = LogManager.getLogger(RepositoriesService.class);

private final Map<String, Repository.Factory> typesRegistry;
private final Map<String, Repository.Factory> internalTypesRegistry;

private final ClusterService clusterService;

private final ThreadPool threadPool;

private final VerifyNodeRepositoryAction verifyAction;

private final Map<String, Repository> internalRepositories = ConcurrentCollections.newConcurrentMap();
private volatile Map<String, Repository> repositories = Collections.emptyMap();

public RepositoriesService(Settings settings, ClusterService clusterService, TransportService transportService,
Map<String, Repository.Factory> typesRegistry,
Map<String, Repository.Factory> typesRegistry, Map<String, Repository.Factory> internalTypesRegistry,
ThreadPool threadPool) {
this.typesRegistry = typesRegistry;
this.internalTypesRegistry = internalTypesRegistry;
this.clusterService = clusterService;
this.threadPool = threadPool;
// Doesn't make sense to maintain repositories on non-master and non-data nodes
Expand Down Expand Up @@ -101,7 +105,7 @@ public void registerRepository(final RegisterRepositoryRequest request, final Ac

// Trying to create the new repository on master to make sure it works
try {
closeRepository(createRepository(newRepositoryMetaData));
closeRepository(createRepository(newRepositoryMetaData, typesRegistry));
} catch (Exception e) {
registrationListener.onFailure(e);
return;
Expand Down Expand Up @@ -316,7 +320,7 @@ public void applyClusterState(ClusterChangedEvent event) {
closeRepository(repository);
repository = null;
try {
repository = createRepository(repositoryMetaData);
repository = createRepository(repositoryMetaData, typesRegistry);
} catch (RepositoryException ex) {
// TODO: this catch is bogus, it means the old repo is already closed,
// but we have nothing to replace it
Expand All @@ -325,7 +329,7 @@ public void applyClusterState(ClusterChangedEvent event) {
}
} else {
try {
repository = createRepository(repositoryMetaData);
repository = createRepository(repositoryMetaData, typesRegistry);
} catch (RepositoryException ex) {
logger.warn(() -> new ParameterizedMessage("failed to create repository [{}]", repositoryMetaData.name()), ex);
}
Expand Down Expand Up @@ -356,31 +360,59 @@ public Repository repository(String repositoryName) {
if (repository != null) {
return repository;
}
repository = internalRepositories.get(repositoryName);
if (repository != null) {
return repository;
}
throw new RepositoryMissingException(repositoryName);
}

public void registerInternalRepository(String name, String type) {
RepositoryMetaData metaData = new RepositoryMetaData(name, type, Settings.EMPTY);
Repository repository = internalRepositories.computeIfAbsent(name, (n) -> {
logger.debug("put internal repository [{}][{}]", name, type);
return createRepository(metaData, internalTypesRegistry);
});
if (type.equals(repository.getMetadata().type()) == false) {
logger.warn(new ParameterizedMessage("internal repository [{}][{}] already registered. this prevented the registration of " +
"internal repository [{}][{}].", name, repository.getMetadata().type(), name, type));
} else if (repositories.containsKey(name)) {
logger.warn(new ParameterizedMessage("non-internal repository [{}] already registered. this repository will block the " +
"usage of internal repository [{}][{}].", name, metaData.type(), name));
}
}

public void unregisterInternalRepository(String name) {
Repository repository = internalRepositories.remove(name);
if (repository != null) {
RepositoryMetaData metadata = repository.getMetadata();
logger.debug(() -> new ParameterizedMessage("delete internal repository [{}][{}].", metadata.type(), name));
closeRepository(repository);
}
}

/** Closes the given repository. */
private void closeRepository(Repository repository) {
logger.debug("closing repository [{}][{}]", repository.getMetadata().type(), repository.getMetadata().name());
repository.close();
}

/**
* Creates repository holder
* Creates repository holder. This method starts the repository
*/
private Repository createRepository(RepositoryMetaData repositoryMetaData) {
private Repository createRepository(RepositoryMetaData repositoryMetaData, Map<String, Repository.Factory> factories) {
logger.debug("creating repository [{}][{}]", repositoryMetaData.type(), repositoryMetaData.name());
Repository.Factory factory = typesRegistry.get(repositoryMetaData.type());
Repository.Factory factory = factories.get(repositoryMetaData.type());
if (factory == null) {
throw new RepositoryException(repositoryMetaData.name(),
"repository type [" + repositoryMetaData.type() + "] does not exist");
}
try {
Repository repository = factory.create(repositoryMetaData, typesRegistry::get);
Repository repository = factory.create(repositoryMetaData, factories::get);
repository.start();
return repository;
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("failed to create repository [{}][{}]", repositoryMetaData.type(), repositoryMetaData.name()), e);
logger.warn(new ParameterizedMessage("failed to create repository [{}][{}]", repositoryMetaData.type(), repositoryMetaData.name()), e);
throw new RepositoryException(repositoryMetaData.name(), "failed to create repository", e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public String getKey(final String key) {
REMOTE_CLUSTERS_SEEDS);

protected final Settings settings;
protected final ClusterNameExpressionResolver clusterNameResolver;
private final ClusterNameExpressionResolver clusterNameResolver;

/**
* Creates a new {@link RemoteClusterAware} instance
Expand Down Expand Up @@ -237,14 +237,15 @@ static DiscoveryNode buildSeedNode(String clusterName, String address, boolean p
* indices per cluster are collected as a list in the returned map keyed by the cluster alias. Local indices are grouped under
* {@link #LOCAL_CLUSTER_GROUP_KEY}. The returned map is mutable.
*
* @param remoteClusterNames the remote cluster names
* @param requestIndices the indices in the search request to filter
* @param indexExists a predicate that can test if a certain index or alias exists in the local cluster
*
* @return a map of grouped remote and local indices
*/
public Map<String, List<String>> groupClusterIndices(String[] requestIndices, Predicate<String> indexExists) {
protected Map<String, List<String>> groupClusterIndices(Set<String> remoteClusterNames, String[] requestIndices,
Predicate<String> indexExists) {
Map<String, List<String>> perClusterIndices = new HashMap<>();
Set<String> remoteClusterNames = getRemoteClusterNames();
for (String index : requestIndices) {
int i = index.indexOf(RemoteClusterService.REMOTE_CLUSTER_INDEX_SEPARATOR);
if (i >= 0) {
Expand Down Expand Up @@ -276,9 +277,6 @@ public Map<String, List<String>> groupClusterIndices(String[] requestIndices, Pr
return perClusterIndices;
}

protected abstract Set<String> getRemoteClusterNames();


/**
* Subclasses must implement this to receive information about updated cluster aliases. If the given address list is
* empty the cluster alias is unregistered and should be removed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ boolean isRemoteNodeConnected(final String remoteCluster, final DiscoveryNode no
public Map<String, OriginalIndices> groupIndices(IndicesOptions indicesOptions, String[] indices, Predicate<String> indexExists) {
Map<String, OriginalIndices> originalIndicesMap = new HashMap<>();
if (isCrossClusterSearchEnabled()) {
final Map<String, List<String>> groupedIndices = groupClusterIndices(indices, indexExists);
final Map<String, List<String>> groupedIndices = groupClusterIndices(getRemoteClusterNames(), indices, indexExists);
if (groupedIndices.isEmpty()) {
//search on _all in the local cluster if neither local indices nor remote indices were specified
originalIndicesMap.put(LOCAL_CLUSTER_GROUP_KEY, new OriginalIndices(Strings.EMPTY_ARRAY, indicesOptions));
Expand Down Expand Up @@ -374,8 +374,7 @@ RemoteClusterConnection getRemoteClusterConnection(String cluster) {
return connection;
}

@Override
protected Set<String> getRemoteClusterNames() {
Set<String> getRemoteClusterNames() {
return this.remoteClusters.keySet();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ private IndicesClusterStateService createIndicesClusterStateService(DiscoveryNod
Collections.emptySet());
final ClusterService clusterService = mock(ClusterService.class);
final RepositoriesService repositoriesService = new RepositoriesService(settings, clusterService,
transportService, null, threadPool);
transportService, Collections.emptyMap(), Collections.emptyMap(), threadPool);
final PeerRecoveryTargetService recoveryTargetService = new PeerRecoveryTargetService(threadPool,
transportService, null, clusterService);
final ShardStateAction shardStateAction = mock(ShardStateAction.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.repositories;

import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class RepositoriesModuleTests extends ESTestCase {

private Environment environment;
private NamedXContentRegistry contentRegistry;
private List<RepositoryPlugin> repoPlugins = new ArrayList<>();
private RepositoryPlugin plugin1;
private RepositoryPlugin plugin2;
private Repository.Factory factory;

@Override
public void setUp() throws Exception {
super.setUp();
environment = mock(Environment.class);
contentRegistry = mock(NamedXContentRegistry.class);
plugin1 = mock(RepositoryPlugin.class);
plugin2 = mock(RepositoryPlugin.class);
factory = mock(Repository.Factory.class);
repoPlugins.add(plugin1);
repoPlugins.add(plugin2);
when(environment.settings()).thenReturn(Settings.EMPTY);
}

public void testCanRegisterTwoRepositoriesWithDifferentTypes() {
when(plugin1.getRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory));
when(plugin2.getRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type2", factory));

// Would throw
new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class),
mock(ThreadPool.class), contentRegistry);
}

public void testCannotRegisterTwoRepositoriesWithSameTypes() {
when(plugin1.getRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory));
when(plugin2.getRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory));

IllegalArgumentException ex = expectThrows(IllegalArgumentException.class,
() -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class),
mock(ThreadPool.class), contentRegistry));

assertEquals("Repository type [type1] is already registered", ex.getMessage());
}

public void testCannotRegisterTwoInternalRepositoriesWithSameTypes() {
when(plugin1.getInternalRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory));
when(plugin2.getInternalRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory));

IllegalArgumentException ex = expectThrows(IllegalArgumentException.class,
() -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class),
mock(ThreadPool.class), contentRegistry));

assertEquals("Internal repository type [type1] is already registered", ex.getMessage());
}

public void testCannotRegisterNormalAndInternalRepositoriesWithSameTypes() {
when(plugin1.getRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory));
when(plugin2.getInternalRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory));

IllegalArgumentException ex = expectThrows(IllegalArgumentException.class,
() -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class),
mock(ThreadPool.class), contentRegistry));

assertEquals("Internal repository type [type1] is already registered as a non-internal repository", ex.getMessage());
}
}
Loading

0 comments on commit d784ccc

Please sign in to comment.