Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** A cache for {@link AuthSession} instances. */
/**
* A cache for {@link AuthSession} instances.
*
* @deprecated since 1.10.0, will be removed in 1.11.0; use {@link SessionCache}.
*/
@Deprecated
public class AuthSessionCache implements AutoCloseable {

private static final Logger LOG = LoggerFactory.getLogger(AuthSessionCache.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.rest.auth;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Ticker;
import java.time.Duration;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.Uninterruptibles;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultSessionCache<K, V extends AuthSession> implements SessionCache<K, V> {

private static final Logger LOG = LoggerFactory.getLogger(DefaultSessionCache.class);

private final Duration sessionTimeout;
private final Executor executor;
private final Ticker ticker;

private volatile Cache<K, V> sessionCache;

/**
* Creates a new cache with the given session timeout, and with default executor and default
* ticker for eviction tasks.
*
* @param name a distinctive name for the cache.
* @param sessionTimeout the session timeout. Sessions will become eligible for eviction after
* this duration of inactivity.
*/
public DefaultSessionCache(String name, Duration sessionTimeout) {
this(
sessionTimeout,
ThreadPools.newExitingWorkerPool(name + "-auth-session-evict", 1),
Ticker.systemTicker());
}

/**
* Creates a new cache with the given session timeout, executor, and ticker. This method is useful
* for testing mostly.
*
* @param sessionTimeout the session timeout. Sessions will become eligible for eviction after
* this duration of inactivity.
* @param executor the executor to use for eviction tasks; if null, the cache will create a
* default executor. The executor will be closed when this cache is closed.
* @param ticker the ticker to use for the cache.
*/
DefaultSessionCache(Duration sessionTimeout, Executor executor, Ticker ticker) {
this.sessionTimeout = sessionTimeout;
this.executor = executor;
this.ticker = ticker;
}

@Override
public V cachedSession(K key, Function<K, V> loader) {
return sessionCache().get(key, loader);
}

@Override
public void close() {
try {
Cache<K, V> cache = sessionCache;
this.sessionCache = null;
if (cache != null) {
cache.invalidateAll();
cache.cleanUp();
}
} finally {
if (executor instanceof ExecutorService) {
ExecutorService service = (ExecutorService) executor;
service.shutdown();
if (!Uninterruptibles.awaitTerminationUninterruptibly(service, 10, TimeUnit.SECONDS)) {
LOG.warn("Timed out waiting for eviction executor to terminate");
}
service.shutdownNow();
}
}
}

@VisibleForTesting
Cache<K, V> sessionCache() {
if (sessionCache == null) {
synchronized (this) {
if (sessionCache == null) {
this.sessionCache = newSessionCache();
}
}
}

return sessionCache;
}

private Cache<K, V> newSessionCache() {
Caffeine<K, V> builder =
Caffeine.newBuilder()
.executor(executor)
.expireAfterAccess(sessionTimeout)
.ticker(ticker)
.removalListener(
(id, auth, cause) -> {
if (auth != null) {
auth.close();
}
});

return builder.build();
}
}
24 changes: 15 additions & 9 deletions core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Manager.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public class OAuth2Manager extends RefreshingAuthManager {
private RESTClient refreshClient;
private long startTimeMillis;
private OAuthTokenResponse authResponse;
private AuthSessionCache sessionCache;
private SessionCache<String, OAuth2Util.AuthSession> sessionCache;

public OAuth2Manager(String managerName) {
super(managerName + "-token-refresh");
Expand Down Expand Up @@ -105,7 +105,7 @@ public OAuth2Util.AuthSession catalogSession(
RESTClient sharedClient, Map<String, String> properties) {
// This client will be used for token refreshes; it should not have an auth session.
this.refreshClient = sharedClient.withAuthSession(AuthSession.EMPTY);
this.sessionCache = newSessionCache(name, properties);
this.sessionCache = newAuthSessionCache(name, properties);
AuthConfig config = AuthConfig.fromProperties(properties);
Map<String, String> headers = OAuth2Util.authHeaders(config.token());
OAuth2Util.AuthSession session = new OAuth2Util.AuthSession(headers, config);
Expand Down Expand Up @@ -156,21 +156,27 @@ public OAuth2Util.AuthSession tableSession(

@Override
public void close() {
try {
SessionCache<String, OAuth2Util.AuthSession> cache = sessionCache;
this.sessionCache = null;
try (cache) {
super.close();
} finally {
AuthSessionCache cache = sessionCache;
this.sessionCache = null;
if (cache != null) {
cache.close();
}
}
}

/**
* @deprecated since 1.10.0, will be removed in 1.11.0; use {@link #newAuthSessionCache(String,
* Map)}
*/
@Deprecated
protected AuthSessionCache newSessionCache(String managerName, Map<String, String> properties) {
return new AuthSessionCache(managerName, sessionTimeout(properties));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can't return null here as that would be a behavioral change. We can only deprecate and leave the impl as-is. Also I don't think we need the AuthManagerSessionCacheAdapter. We would basically keep newSessionCache as-is and then have newAuthSessionCache that returns the parameterized version of the cache

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I agree. The point here is: what to do if someone extended this class and overrode the newSessionCache method?

If they did that, it's because they have a custom cache, and we need to honor their intent and thus use the custom cache + adapter.

However, if the method is not overridden, then they were using the default cache, and in this case we want to use the new, generic cache instead.

Returning null here allows to distinguish both situations. It's thus not a behavioral change.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if people didn't override newSessionCache and just upgrade Iceberg and realize that this method now returns null? Not everyone will be using newAuthSessionCache immediately

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adutra if you revert the last commit and apply the below diff, wouldn't that work and avoid all of the extra complexity?

--- a/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Manager.java
+++ b/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Manager.java
@@ -61,7 +61,7 @@ public class OAuth2Manager extends RefreshingAuthManager {
   private RESTClient refreshClient;
   private long startTimeMillis;
   private OAuthTokenResponse authResponse;
-  private AuthSessionCache sessionCache;
+  private SessionCache<String, OAuth2Util.AuthSession> sessionCache;

   public OAuth2Manager(String managerName) {
     super(managerName + "-token-refresh");
@@ -105,7 +105,7 @@ public class OAuth2Manager extends RefreshingAuthManager {
       RESTClient sharedClient, Map<String, String> properties) {
     // This client will be used for token refreshes; it should not have an auth session.
     this.refreshClient = sharedClient.withAuthSession(AuthSession.EMPTY);
-    this.sessionCache = newSessionCache(name, properties);
+    this.sessionCache = newAuthSessionCache(name, properties);
     AuthConfig config = AuthConfig.fromProperties(properties);
     Map<String, String> headers = OAuth2Util.authHeaders(config.token());
     OAuth2Util.AuthSession session = new OAuth2Util.AuthSession(headers, config);
@@ -159,7 +159,7 @@ public class OAuth2Manager extends RefreshingAuthManager {
     try {
       super.close();
     } finally {
-      AuthSessionCache cache = sessionCache;
+      SessionCache<String, OAuth2Util.AuthSession> cache = sessionCache;
       this.sessionCache = null;
       if (cache != null) {
         cache.close();
@@ -175,6 +175,11 @@ public class OAuth2Manager extends RefreshingAuthManager {
     return new AuthSessionCache(managerName, sessionTimeout(properties));
   }

+  protected SessionCache<String, OAuth2Util.AuthSession> newAuthSessionCache(
+      String managerName, Map<String, String> properties) {
+    return new SessionCache<>(managerName, sessionTimeout(properties));
+  }
+

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would, but if someone is overriding the deprecated method, the overridden impl would be ignored. But I'm fine with that.

}

protected SessionCache<String, OAuth2Util.AuthSession> newAuthSessionCache(
String managerName, Map<String, String> properties) {
return new DefaultSessionCache<>(managerName, sessionTimeout(properties));
}

protected OAuth2Util.AuthSession maybeCreateChildSession(
Map<String, String> credentials,
Map<String, String> properties,
Expand Down
38 changes: 38 additions & 0 deletions core/src/main/java/org/apache/iceberg/rest/auth/SessionCache.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.rest.auth;

import java.util.function.Function;

/** A cache for {@link AuthSession} instances. */
public interface SessionCache<K, V extends AuthSession> extends AutoCloseable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you please elaborate why we still need a separate interface? I'd like to understand the use case you're trying to solve here. Currently I don't see any need for having a separate interface and DefaultSessionCache could be named just SessionCache. Also DefaultSessionCache already is parameterized, so that should give us enough flexibility?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An interface could allow people to use another cache library than Caffeine, or create the Caffeine cache in a different way. Generally speaking an interface is more flexible for implementors than a superclass.

Also, as @danielcweeks pointed out, the default implementation could be made package-private.

But if you insist on removing the interface, I won't oppose.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not insisting but rather trying to understand the actual use case and whether there's a need to add this kind of flexibility. Let's also see what @danielcweeks thinks about this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nastra and @danielcweeks which direction should I move in?

  1. Keep the SessionCache interface and change the visibility of DefaultSessionCache to package-private
  2. Remove the interface and rename DefaultSessionCache to SessionCache


/**
* Returns a cached session for the given key, loading it with the given loader if it is not
* already cached.
*
* @param key the key to use for the session.
* @param loader the loader to use to load the session if it is not already cached.
* @return the cached session.
*/
V cachedSession(K key, Function<K, V> loader);

@Override
void close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.rest.auth;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;

import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

class TestDefaultSessionCache {

@Test
void cachedHitsAndMisses() {
DefaultSessionCache<String, AuthSession> cache =
new DefaultSessionCache<>(Duration.ofHours(1), Runnable::run, System::nanoTime);
AuthSession session1 = Mockito.mock(AuthSession.class);
AuthSession session2 = Mockito.mock(AuthSession.class);

@SuppressWarnings("unchecked")
Function<String, AuthSession> loader = Mockito.mock(Function.class);
Mockito.when(loader.apply("key1")).thenReturn(session1);
Mockito.when(loader.apply("key2")).thenReturn(session2);

AuthSession session = cache.cachedSession("key1", loader);
assertThat(session).isNotNull().isSameAs(session1);

session = cache.cachedSession("key1", loader);
assertThat(session).isNotNull().isSameAs(session1);

session = cache.cachedSession("key2", loader);
assertThat(session).isNotNull().isSameAs(session2);

session = cache.cachedSession("key2", loader);
assertThat(session).isNotNull().isSameAs(session2);

Mockito.verify(loader, times(1)).apply("key1");
Mockito.verify(loader, times(1)).apply("key2");

assertThat(cache.sessionCache().asMap()).hasSize(2);
cache.close();
assertThat(cache.sessionCache().asMap()).isEmpty();

Mockito.verify(session1).close();
Mockito.verify(session2).close();
}

@Test
@SuppressWarnings("unchecked")
void cacheEviction() {
AtomicLong ticker = new AtomicLong(0);
DefaultSessionCache<String, AuthSession> cache =
new DefaultSessionCache<>(Duration.ofHours(1), Runnable::run, ticker::get);
AuthSession session1 = Mockito.mock(AuthSession.class);

Function<String, AuthSession> loader = Mockito.mock(Function.class);
Mockito.when(loader.apply("key1")).thenReturn(session1);

AuthSession session = cache.cachedSession("key1", loader);
assertThat(session).isNotNull().isSameAs(session1);

Mockito.verify(loader, times(1)).apply("key1");
Mockito.verify(session1, never()).close();

ticker.set(TimeUnit.HOURS.toNanos(1));
cache.sessionCache().cleanUp();
Mockito.verify(session1).close();

cache.close();
}
}
Loading