Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import jakarta.annotation.PreDestroy;

public class AirliftAnnouncer
implements Announcer
Expand All @@ -39,6 +40,7 @@ public ListenableFuture<?> forceAnnounce()
return airliftAnnouncer.forceAnnounce();
}

@PreDestroy
@Override
public void stop()
{
Expand Down
39 changes: 39 additions & 0 deletions core/trino-main/src/main/java/io/trino/node/AnnounceConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.node;

import com.google.common.collect.ImmutableList;
import io.airlift.configuration.Config;
import jakarta.validation.constraints.NotNull;

import java.net.URI;
import java.util.List;

public class AnnounceConfig
{
private List<URI> coordinatorUris = List.of();

@NotNull
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be @NotEmpty? Is there a purpose to configure discovery.type=announce with an empty list?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Coordinator does not need an announcement unless there are multiple coordinators.

public List<@NotNull URI> getCoordinatorUris()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does using @NotNull here do anything? Can the config system create a list with null elements?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It means you cannot have a null element in the list. It isn't required, but it is reasonable documentation

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand what it does, but seems confusing because it should be impossible. We don't do that anywhere else, but I'm fine if you want to leave it.

{
return coordinatorUris;
}

@Config("discovery.uri")
public AnnounceConfig setCoordinatorUris(List<URI> coordinatorUris)
{
this.coordinatorUris = ImmutableList.copyOf(coordinatorUris);
return this;
}
}
174 changes: 174 additions & 0 deletions core/trino-main/src/main/java/io/trino/node/AnnounceNodeAnnouncer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.node;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.net.HttpHeaders;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.errorprone.annotations.FormatMethod;
import com.google.inject.Inject;
import io.airlift.http.client.HttpClient;
import io.airlift.http.client.Request;
import io.airlift.http.client.StaticBodyGenerator;
import io.airlift.http.client.StatusResponseHandler.StatusResponse;
import io.airlift.log.Logger;
import jakarta.annotation.PreDestroy;

import java.net.URI;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.airlift.http.client.HttpUriBuilder.uriBuilderFrom;
import static io.airlift.http.client.Request.Builder.preparePost;
import static io.airlift.http.client.StaticBodyGenerator.createStaticBodyGenerator;
import static io.airlift.http.client.StatusResponseHandler.createStatusResponseHandler;
import static jakarta.ws.rs.core.MediaType.TEXT_PLAIN;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.function.Predicate.not;

public class AnnounceNodeAnnouncer
implements Announcer
{
private static final Logger log = Logger.get(AnnounceNodeAnnouncer.class);

private final HttpClient httpClient;
private final List<URI> announceUris;
private final StaticBodyGenerator currentHostAnnouncement;

private final ScheduledExecutorService announceExecutor = newSingleThreadScheduledExecutor(daemonThreadsNamed("node-state-poller"));
private final AtomicLong lastWarningLogged = new AtomicLong(0);

private final AtomicBoolean started = new AtomicBoolean();
private final boolean coordinator;

@Inject
public AnnounceNodeAnnouncer(InternalNode currentNode, AnnounceConfig config, @ForAnnouncer HttpClient httpClient)
{
this(currentNode.getInternalUri(), config.getCoordinatorUris(), currentNode.isCoordinator(), httpClient);
}

@VisibleForTesting
public AnnounceNodeAnnouncer(URI internalUri, Collection<URI> coordinatorUris, boolean coordinator, HttpClient httpClient)
{
URI currentUri = internalUri;
this.announceUris = coordinatorUris.stream()
.filter(not(currentUri::equals))
.map(uri -> uriBuilderFrom(uri).appendPath("/v1/announce").build())
.distinct()
.toList();
this.coordinator = coordinator;
this.httpClient = requireNonNull(httpClient, "httpClient is null");
this.currentHostAnnouncement = createStaticBodyGenerator(currentUri.toString(), UTF_8);
}

@Override
public void start()
{
if (announceUris.isEmpty()) {
if (!coordinator) {
log.warn("No coordinator URIs configured, skipping node state announcements");
}
return;
}

checkState(!announceExecutor.isShutdown(), "Announcer has been destroyed");
if (!started.compareAndSet(false, true)) {
return;
}

announceExecutor.scheduleWithFixedDelay(
() -> {
try {
forceAnnounce();
}
catch (RuntimeException e) {
// this should not happen, but if it does we do not want to stop announcing
log.error(e, "Error announcing node to coordinators");
}
}, 5, 5, SECONDS);
forceAnnounce();
}

@Override
public ListenableFuture<?> forceAnnounce()
{
return Futures.allAsList(announceUris.stream()
.map(this::announce)
.toList());
}

private ListenableFuture<?> announce(URI announceUri)
{
Request request = preparePost()
.setUri(announceUri)
.setBodyGenerator(currentHostAnnouncement)
.setHeader(HttpHeaders.CONTENT_TYPE, TEXT_PLAIN)
.build();
ListenableFuture<StatusResponse> responseFuture = httpClient.executeAsync(request, createStatusResponseHandler());
Futures.addCallback(
responseFuture, new FutureCallback<>()
{
@Override
public void onSuccess(StatusResponse response)
{
int statusCode = response.getStatusCode();
if (statusCode < 200 || statusCode >= 300) {
logWarning("Failed to announce node state to %s: %s", announceUri, statusCode);
}
}

@Override
public void onFailure(Throwable t)
{
logWarning("Error announcing node state to %s: %s", announceUri, t.getMessage());
}
}, directExecutor());
Comment on lines +130 to +147
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI, this could be

Futures.addCallback(addSuccessCallback(responseFuture, () -> {
    ...
});
Futures.addCallback(addExceptionCallback(responseFuture, t ->
    logWarning("Error announcing node state to %s: %s", announceUri, t.getMessage()));

return Futures.nonCancellationPropagating(responseFuture);
}

@PreDestroy
@Override
public void stop()
{
announceExecutor.shutdownNow();
try {
announceExecutor.awaitTermination(30, SECONDS);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

@FormatMethod
private void logWarning(String format, Object... args)
{
// log at most once per second per node
long now = System.nanoTime();
if (now - lastWarningLogged.get() >= SECONDS.toNanos(1)) {
log.warn(format, args);
lastWarningLogged.set(now);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.node;

import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableSet;
import io.trino.cache.SafeCaches;

import java.net.URI;
import java.util.Set;

import static java.util.Collections.newSetFromMap;
import static java.util.concurrent.TimeUnit.SECONDS;

public class AnnounceNodeInventory
implements NodeInventory
{
private final Set<URI> nodes = newSetFromMap(SafeCaches.<URI, Boolean>buildNonEvictableCache(CacheBuilder.newBuilder()
.expireAfterWrite(30, SECONDS))
.asMap());

public void announce(URI node)
{
nodes.add(node);
}

@Override
public Set<URI> getNodes()
{
return ImmutableSet.copyOf(nodes);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.node;

import com.google.inject.Binder;
import com.google.inject.Scopes;
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.airlift.units.Duration;
import io.trino.server.ServerConfig;

import static io.airlift.configuration.ConfigBinder.configBinder;
import static io.airlift.jaxrs.JaxrsBinder.jaxrsBinder;
import static io.trino.server.InternalCommunicationHttpClientModule.internalHttpClientModule;
import static java.util.concurrent.TimeUnit.SECONDS;

public class AnnounceNodeInventoryModule
extends AbstractConfigurationAwareModule
{
@Override
protected void setup(Binder binder)
{
boolean coordinator = buildConfigObject(ServerConfig.class).isCoordinator();
if (coordinator) {
jaxrsBinder(binder).bind(AnnounceNodeResource.class);

binder.bind(AnnounceNodeInventory.class).in(Scopes.SINGLETON);
binder.bind(NodeInventory.class).to(AnnounceNodeInventory.class).in(Scopes.SINGLETON);
}

// both coordinator and worker can announce, although coordinator normally does not
binder.bind(Announcer.class).to(AnnounceNodeAnnouncer.class).in(Scopes.SINGLETON);
configBinder(binder).bindConfig(AnnounceConfig.class);
install(internalHttpClientModule("announcer", ForAnnouncer.class)
.withConfigDefaults(config -> {
config.setIdleTimeout(new Duration(3, SECONDS));
config.setRequestTimeout(new Duration(3, SECONDS));
}).build());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.node;

import com.google.inject.Inject;
import io.trino.server.security.ResourceSecurity;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;

import java.net.URI;
import java.util.Set;

import static io.trino.server.security.ResourceSecurity.AccessType.INTERNAL_ONLY;
import static io.trino.server.security.ResourceSecurity.AccessType.MANAGEMENT_READ;
import static jakarta.ws.rs.core.MediaType.APPLICATION_JSON;
import static jakarta.ws.rs.core.MediaType.TEXT_PLAIN;
import static java.util.Objects.requireNonNull;

/**
* Announces the existence of a node to the cluster.
*/
@Path("/v1/announce")
public class AnnounceNodeResource
{
private final AnnounceNodeInventory announceNodeInventory;

@Inject
public AnnounceNodeResource(AnnounceNodeInventory announceNodeInventory)
{
this.announceNodeInventory = requireNonNull(announceNodeInventory, "announceNodeInventory is null");
}

@ResourceSecurity(MANAGEMENT_READ)
@GET
@Produces(APPLICATION_JSON)
public Set<URI> getNodes()
{
return announceNodeInventory.getNodes();
}

@ResourceSecurity(INTERNAL_ONLY)
@POST
@Consumes(TEXT_PLAIN)
public void announce(String uri)
{
announceNodeInventory.announce(URI.create(uri));
}
}
Loading