diff --git a/src/main/java/org/opensearch/security/OpenSearchSecurityPlugin.java b/src/main/java/org/opensearch/security/OpenSearchSecurityPlugin.java index ffb7cfc075..89081cb71a 100644 --- a/src/main/java/org/opensearch/security/OpenSearchSecurityPlugin.java +++ b/src/main/java/org/opensearch/security/OpenSearchSecurityPlugin.java @@ -99,6 +99,10 @@ import org.opensearch.extensions.ExtensionsManager; import org.opensearch.http.HttpServerTransport; import org.opensearch.http.HttpServerTransport.Dispatcher; +import org.opensearch.identity.ScheduledJobIdentityManager; +import org.opensearch.identity.Subject; +import org.opensearch.identity.noop.NoopTokenManager; +import org.opensearch.identity.tokens.TokenManager; import org.opensearch.index.Index; import org.opensearch.index.IndexModule; import org.opensearch.index.cache.query.QueryCache; @@ -106,6 +110,7 @@ import org.opensearch.indices.SystemIndexDescriptor; import org.opensearch.indices.breaker.CircuitBreakerService; import org.opensearch.plugins.ClusterPlugin; +import org.opensearch.plugins.IdentityPlugin; import org.opensearch.plugins.MapperPlugin; import org.opensearch.repositories.RepositoriesService; import org.opensearch.rest.RestController; @@ -145,6 +150,8 @@ import org.opensearch.security.http.SecurityHttpServerTransport; import org.opensearch.security.http.SecurityNonSslHttpServerTransport; import org.opensearch.security.http.XFFResolver; +import org.opensearch.security.identity.SecurityScheduledJobIdentityManager; +import org.opensearch.security.identity.SecuritySubject; import org.opensearch.security.privileges.PrivilegesEvaluator; import org.opensearch.security.privileges.PrivilegesInterceptor; import org.opensearch.security.privileges.RestLayerPrivilegesEvaluator; @@ -193,7 +200,7 @@ import org.opensearch.watcher.ResourceWatcherService; // CS-ENFORCE-SINGLE -public final class OpenSearchSecurityPlugin extends OpenSearchSecuritySSLPlugin implements ClusterPlugin, MapperPlugin { +public final class OpenSearchSecurityPlugin extends OpenSearchSecuritySSLPlugin implements ClusterPlugin, MapperPlugin, IdentityPlugin { private static final String KEYWORD = ".keyword"; private static final Logger actionTrace = LogManager.getLogger("opendistro_security_action_trace"); @@ -212,6 +219,7 @@ public final class OpenSearchSecurityPlugin extends OpenSearchSecuritySSLPlugin private volatile ConfigurationRepository cr; private volatile AdminDNs adminDns; private volatile ClusterService cs; + private volatile SecuritySubject subject = new SecuritySubject(); private static volatile DiscoveryNode localNode; private volatile AuditLog auditLog; private volatile BackendRegistry backendRegistry; @@ -226,6 +234,8 @@ public final class OpenSearchSecurityPlugin extends OpenSearchSecuritySSLPlugin private volatile Salt salt; private volatile OpensearchDynamicSetting transportPassiveAuthSetting; + private volatile ScheduledJobIdentityManager scheduledJobIdentityManager; + public static boolean isActionTraceEnabled() { return actionTrace.isTraceEnabled(); } @@ -990,6 +1000,8 @@ public Collection createComponents( cr = ConfigurationRepository.create(settings, this.configPath, threadPool, localClient, clusterService, auditLog); + subject.setThreadContext(threadPool.getThreadContext()); + userService = new UserService(cs, cr, settings, localClient); final XFFResolver xffResolver = new XFFResolver(threadPool); @@ -1065,6 +1077,8 @@ public Collection createComponents( ); components.add(principalExtractor); + scheduledJobIdentityManager = new SecurityScheduledJobIdentityManager(cs, localClient, threadPool); + // NOTE: We need to create DefaultInterClusterRequestEvaluator before creating ConfigurationRepository since the latter requires // security index to be accessible which means // communciation with other nodes is already up. However for the communication to be up, there needs to be trusted nodes_dn. Hence @@ -1084,6 +1098,7 @@ public Collection createComponents( components.add(si); components.add(dcf); components.add(userService); + components.add(scheduledJobIdentityManager); return components; @@ -1886,6 +1901,21 @@ private static String handleKeyword(final String field) { return field; } + @Override + public Subject getSubject() { + return subject; + } + + @Override + public ScheduledJobIdentityManager getScheduledJobIdentityManager() { + return scheduledJobIdentityManager; + } + + @Override + public TokenManager getTokenManager() { + return new NoopTokenManager(); + } + public static DiscoveryNode getLocalNode() { return localNode; } diff --git a/src/main/java/org/opensearch/security/auth/BackendRegistry.java b/src/main/java/org/opensearch/security/auth/BackendRegistry.java index 0a287d19f5..7a51953c44 100644 --- a/src/main/java/org/opensearch/security/auth/BackendRegistry.java +++ b/src/main/java/org/opensearch/security/auth/BackendRegistry.java @@ -202,6 +202,7 @@ && isBlocked(((InetSocketAddress) request.getHttpChannel().getRemoteAddress()).g if (adminDns.isAdminDN(sslPrincipal)) { // PKI authenticated REST call threadPool.getThreadContext().putTransient(ConfigConstants.OPENDISTRO_SECURITY_USER, new User(sslPrincipal)); + threadPool.getThreadContext().putPersistent(ConfigConstants.OPENDISTRO_SECURITY_AUTHENTICATED_USER, new User(sslPrincipal)); auditLog.logSucceededLogin(sslPrincipal, true, null, request); return true; } @@ -362,6 +363,10 @@ && isBlocked(((InetSocketAddress) request.getHttpChannel().getRemoteAddress()).g ConfigConstants.OPENDISTRO_SECURITY_USER, impersonatedUser == null ? authenticatedUser : impersonatedUser ); + threadContext.putPersistent( + ConfigConstants.OPENDISTRO_SECURITY_AUTHENTICATED_USER, + impersonatedUser == null ? authenticatedUser : impersonatedUser + ); auditLog.logSucceededLogin( (impersonatedUser == null ? authenticatedUser : impersonatedUser).getName(), false, diff --git a/src/main/java/org/opensearch/security/identity/ScheduledJobIdentity.java b/src/main/java/org/opensearch/security/identity/ScheduledJobIdentity.java new file mode 100644 index 0000000000..2971fd3204 --- /dev/null +++ b/src/main/java/org/opensearch/security/identity/ScheduledJobIdentity.java @@ -0,0 +1,180 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.security.identity; + +import java.io.IOException; +import java.time.Instant; + +import com.google.common.base.Objects; + +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.security.user.User; + +import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken; + +/** + * Scheduled Job Identity. + */ +public class ScheduledJobIdentity implements Writeable, ToXContentObject { + public static final String JOB_ID_FIELD = "job_id"; + public static final String JOB_INDEX_FIELD = "job_index"; + public static final String LAST_UPDATE_TIME_FIELD = "last_update_time"; + public static final String CREATED_TIME_FIELD = "created_time"; + public static final String USER_FIELD = "user"; + + private final String jobId; + private final String jobIndex; + private final Instant createdTime; + private final Instant lastUpdateTime; + private final User user; + + public ScheduledJobIdentity(String jobId, String jobIndex, Instant createdTime, Instant lastUpdateTime, User user) { + this.jobId = jobId; + this.jobIndex = jobIndex; + this.createdTime = createdTime; + this.lastUpdateTime = lastUpdateTime; + this.user = user; + } + + public ScheduledJobIdentity(StreamInput input) throws IOException { + jobId = input.readString(); + jobIndex = input.readString(); + createdTime = input.readInstant(); + lastUpdateTime = input.readInstant(); + if (input.readBoolean()) { + user = new User(input); + } else { + user = null; + } + } + + /** + * Parse content parser to {@link java.time.Instant}. + * + * @param parser json based content parser + * @return instance of {@link java.time.Instant} + * @throws IOException IOException if content can't be parsed correctly + */ + public static Instant toInstant(XContentParser parser) throws IOException { + if (parser.currentToken() == null || parser.currentToken() == XContentParser.Token.VALUE_NULL) { + return null; + } + if (parser.currentToken().isValue()) { + return Instant.ofEpochMilli(parser.longValue()); + } + return null; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + XContentBuilder xContentBuilder = builder.startObject() + .field(JOB_ID_FIELD, jobId) + .field(JOB_INDEX_FIELD, jobIndex) + .field(CREATED_TIME_FIELD, createdTime.toEpochMilli()) + .field(LAST_UPDATE_TIME_FIELD, lastUpdateTime.toEpochMilli()); + if (user != null) { + xContentBuilder.field(USER_FIELD, user); + } + return xContentBuilder.endObject(); + } + + @Override + public void writeTo(StreamOutput output) throws IOException { + output.writeString(jobId); + output.writeString(jobIndex); + output.writeInstant(createdTime); + output.writeInstant(lastUpdateTime); + if (user != null) { + output.writeBoolean(true); // user exists + user.writeTo(output); + } else { + output.writeBoolean(false); // user does not exist + } + } + + public static ScheduledJobIdentity parse(XContentParser parser) throws IOException { + String jobId = null; + String jobIndex = null; + Instant createdTime = null; + Instant lastUpdateTime = null; + User user = null; + + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); + while (parser.nextToken() != XContentParser.Token.END_OBJECT) { + String fieldName = parser.currentName(); + parser.nextToken(); + + switch (fieldName) { + case JOB_ID_FIELD: + jobId = parser.text(); + break; + case JOB_INDEX_FIELD: + jobIndex = parser.text(); + break; + case CREATED_TIME_FIELD: + createdTime = toInstant(parser); + break; + case LAST_UPDATE_TIME_FIELD: + lastUpdateTime = toInstant(parser); + break; + case USER_FIELD: + user = User.parse(parser); + break; + default: + parser.skipChildren(); + break; + } + } + return new ScheduledJobIdentity(jobId, jobIndex, createdTime, lastUpdateTime, user); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ScheduledJobIdentity that = (ScheduledJobIdentity) o; + return Objects.equal(getJobId(), that.getJobId()) + && Objects.equal(getJobIndex(), that.getJobIndex()) + && Objects.equal(getCreatedTime(), that.getCreatedTime()) + && Objects.equal(getLastUpdateTime(), that.getLastUpdateTime()); + } + + @Override + public int hashCode() { + return Objects.hashCode(jobId, jobIndex, createdTime, lastUpdateTime); + } + + public String getJobId() { + return jobId; + } + + public String getJobIndex() { + return jobIndex; + } + + public Instant getCreatedTime() { + return createdTime; + } + + public Instant getLastUpdateTime() { + return lastUpdateTime; + } + + public User getUser() { + return user; + } +} diff --git a/src/main/java/org/opensearch/security/identity/SecurityIndex.java b/src/main/java/org/opensearch/security/identity/SecurityIndex.java new file mode 100644 index 0000000000..83b4477baf --- /dev/null +++ b/src/main/java/org/opensearch/security/identity/SecurityIndex.java @@ -0,0 +1,48 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.security.identity; + +import java.util.function.Supplier; + +import org.opensearch.security.util.ThrowingSupplierWrapper; + +import static org.opensearch.security.identity.SecurityIndices.SCHEDULED_JOB_IDENTITY_INDEX; + +/** + * Represent a security index + * + */ +public enum SecurityIndex { + + // throw RuntimeException since we don't know how to handle the case when the mapping reading throws IOException + SCHEDULED_JOB_IDENTITY( + SCHEDULED_JOB_IDENTITY_INDEX, + ThrowingSupplierWrapper.throwingSupplierWrapper(SecurityIndices::getScheduledJobIdentityMappings) + ); + + private final String indexName; + private final String mapping; + + SecurityIndex(String name, Supplier mappingSupplier) { + this.indexName = name; + this.mapping = mappingSupplier.get(); + } + + public String getIndexName() { + return indexName; + } + + public String getMapping() { + return mapping; + } + +} diff --git a/src/main/java/org/opensearch/security/identity/SecurityIndices.java b/src/main/java/org/opensearch/security/identity/SecurityIndices.java new file mode 100644 index 0000000000..d7f88e2e63 --- /dev/null +++ b/src/main/java/org/opensearch/security/identity/SecurityIndices.java @@ -0,0 +1,169 @@ +package org.opensearch.security.identity; + +import java.io.IOException; +import java.net.URL; +import java.util.EnumMap; + +import com.google.common.io.Resources; +import org.apache.commons.codec.Charsets; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; + +import org.opensearch.action.ActionListener; +import org.opensearch.action.admin.indices.create.CreateIndexRequest; +import org.opensearch.action.admin.indices.create.CreateIndexResponse; +import org.opensearch.client.Client; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.xcontent.LoggingDeprecationHandler; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.core.xcontent.XContentParser; + +public class SecurityIndices { + protected Logger logger = LogManager.getLogger(getClass()); + + public final static String SCHEDULED_JOB_IDENTITY_INDEX = ".opendistro-security-scheduled-job-identity"; + public static final String SCHEDULED_JOB_IDENTITY_INDEX_MAPPING_FILE = "mappings/scheduled-job-identity.json"; + static final String META = "_meta"; + private static final String SCHEMA_VERSION = "schema_version"; + public static Integer NO_SCHEMA_VERSION = 0; + // minimum shards of the scheduled job identity index + public static int minJobIndexReplicas = 1; + // maximum shards of the scheduled job identity index + public static int maxJobIndexReplicas = 20; + private final Client client; + private final ClusterService cs; + // keep track of whether the mapping version is up-to-date + private EnumMap indexStates; + + class IndexState { + // keep track of whether the mapping version is up-to-date + private Boolean mappingUpToDate; + + // record schema version reading from the mapping file + private Integer schemaVersion; + + IndexState(SecurityIndex index) { + this.mappingUpToDate = false; + this.schemaVersion = parseSchemaVersion(index.getMapping()); + } + } + + /** + * Constructor function + * + * @param client ES client supports administrative actions + */ + public SecurityIndices(Client client, ClusterService cs) { + this.client = client; + this.cs = cs; + + this.indexStates = new EnumMap(SecurityIndex.class); + } + + private ActionListener markMappingUpToDate( + SecurityIndex index, + ActionListener followingListener + ) { + return ActionListener.wrap(createdResponse -> { + if (createdResponse.isAcknowledged()) { + IndexState indexState = indexStates.computeIfAbsent(index, IndexState::new); + if (Boolean.FALSE.equals(indexState.mappingUpToDate)) { + indexState.mappingUpToDate = Boolean.TRUE; + logger.info(new ParameterizedMessage("Mark [{}]'s mapping up-to-date", index.getIndexName())); + } + } + followingListener.onResponse(createdResponse); + }, exception -> followingListener.onFailure(exception)); + } + + private static Integer parseSchemaVersion(String mapping) { + try { + XContentParser xcp = XContentType.JSON.xContent() + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, mapping); + + while (!xcp.isClosed()) { + XContentParser.Token token = xcp.currentToken(); + if (token != null && token != XContentParser.Token.END_OBJECT && token != XContentParser.Token.START_OBJECT) { + if (xcp.currentName() != META) { + xcp.nextToken(); + xcp.skipChildren(); + } else { + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + if (xcp.currentName().equals(SCHEMA_VERSION)) { + + Integer version = xcp.intValue(); + if (version < 0) { + version = NO_SCHEMA_VERSION; + } + return version; + } else { + xcp.nextToken(); + } + } + + } + } + xcp.nextToken(); + } + return NO_SCHEMA_VERSION; + } catch (Exception e) { + // since this method is called in the constructor that is called by OpenSearchSecurityPlugin.createComponents, + // we cannot throw checked exception + throw new RuntimeException(e); + } + } + + protected boolean doesScheduledJobIdentityIndexExists() { + if (!cs.state().metadata().hasConcreteIndex(SCHEDULED_JOB_IDENTITY_INDEX)) { + return false; + } + return true; + } + + /** + * Create scheduled job identity index. + * + * @param actionListener action called after create index + */ + public void initScheduledJobIdentityIndex(ActionListener actionListener) { + try { + CreateIndexRequest request = new CreateIndexRequest(SCHEDULED_JOB_IDENTITY_INDEX).mapping( + getScheduledJobIdentityMappings(), + XContentType.JSON + ); + request.settings( + Settings.builder() + // Schedule job identity index is small. 1 primary shard is enough + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + // Job scheduler puts both primary and replica shards in the + // hash ring. Auto-expand the number of replicas based on the + // number of data nodes (up to 20) in the cluster so that each node can + // become a coordinating node. This is useful when customers + // scale out their cluster so that we can do adaptive scaling + // accordingly. + // At least 1 replica for fail-over. + .put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, minJobIndexReplicas + "-" + maxJobIndexReplicas) + .put("index.hidden", true) + ); + client.admin().indices().create(request, markMappingUpToDate(SecurityIndex.SCHEDULED_JOB_IDENTITY, actionListener)); + } catch (IOException e) { + logger.error("Fail to init scheduler job identity index", e); + actionListener.onFailure(e); + } + } + + /** + * Get scheduled job identity index mapping json content. + * + * @return scheduled job identity index mapping + * @throws IOException IOException if mapping file can't be read correctly + */ + public static String getScheduledJobIdentityMappings() throws IOException { + URL url = SecurityScheduledJobIdentityManager.class.getClassLoader().getResource(SCHEDULED_JOB_IDENTITY_INDEX_MAPPING_FILE); + return Resources.toString(url, Charsets.UTF_8); + } +} diff --git a/src/main/java/org/opensearch/security/identity/SecurityScheduledJobIdentityManager.java b/src/main/java/org/opensearch/security/identity/SecurityScheduledJobIdentityManager.java new file mode 100644 index 0000000000..3721235e4d --- /dev/null +++ b/src/main/java/org/opensearch/security/identity/SecurityScheduledJobIdentityManager.java @@ -0,0 +1,238 @@ +package org.opensearch.security.identity; + +import java.io.IOException; +import java.time.Instant; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +import com.google.common.collect.ImmutableMap; +import org.apache.cxf.rs.security.jose.jws.JwsJwtCompactConsumer; +import org.apache.cxf.rs.security.jose.jwt.JwtToken; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import org.opensearch.OpenSearchSecurityException; +import org.opensearch.action.ActionListener; +import org.opensearch.action.delete.DeleteRequest; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.support.WriteRequest; +import org.opensearch.client.Client; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.identity.ScheduledJobIdentityManager; +import org.opensearch.identity.schedule.ScheduledJobIdentityModel; +import org.opensearch.identity.schedule.ScheduledJobOperator; +import org.opensearch.identity.schedule.ScheduledJobUserModel; +import org.opensearch.identity.tokens.AuthToken; +import org.opensearch.identity.tokens.BearerAuthToken; +import org.opensearch.index.query.BoolQueryBuilder; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.security.user.User; +import org.opensearch.threadpool.ThreadPool; + +import static org.opensearch.security.identity.SecurityIndices.SCHEDULED_JOB_IDENTITY_INDEX; +import static org.opensearch.security.support.ConfigConstants.OPENDISTRO_SECURITY_USER; + +public class SecurityScheduledJobIdentityManager implements ScheduledJobIdentityManager { + protected Logger logger = LogManager.getLogger(getClass()); + public static final ToXContent.MapParams XCONTENT_WITH_TYPE = new ToXContent.MapParams(ImmutableMap.of("with_type", "true")); + + private final ClusterService cs; + + private final Client client; + + private final ThreadPool threadPool; + + private final SecurityIndices securityIndices; + + public SecurityScheduledJobIdentityManager(ClusterService cs, Client client, ThreadPool threadPool) { + this.cs = cs; + this.client = client; + this.threadPool = threadPool; + this.securityIndices = new SecurityIndices(client, cs); + } + + @Override + public void associateJobWithOperator(String jobId, String indexName, Optional operator) { + if (operator.isEmpty()) { + // TODO Associate Job with Authenticated User + User currentUser = (User) threadPool.getThreadContext().getPersistent(OPENDISTRO_SECURITY_USER); + System.out.println("Current User: " + currentUser); + } + if (!securityIndices.doesScheduledJobIdentityIndexExists()) { + securityIndices.initScheduledJobIdentityIndex(ActionListener.wrap(response -> { + if (response.isAcknowledged()) { + logger.info("Created {} with mappings.", SCHEDULED_JOB_IDENTITY_INDEX); + createScheduledJobIdentityEntry(jobId, indexName, operator.get()); + } else { + logger.warn("Created {} with mappings call not acknowledged.", SCHEDULED_JOB_IDENTITY_INDEX); + throw new OpenSearchSecurityException( + "Created " + SCHEDULED_JOB_IDENTITY_INDEX + " with mappings call not acknowledged." + ); + } + }, exception -> new OpenSearchSecurityException("Created " + SCHEDULED_JOB_IDENTITY_INDEX + " with mappings call failed."))); + } else { + createScheduledJobIdentityEntry(jobId, indexName, operator.get()); + } + } + + private void createScheduledJobIdentityEntry(String jobId, String indexName, ScheduledJobOperator operator) { + SearchRequest searchRequest = new SearchRequest().indices(SCHEDULED_JOB_IDENTITY_INDEX); + BoolQueryBuilder boolQuery = QueryBuilders.boolQuery() + .must(QueryBuilders.matchQuery("job_id", jobId)) + .must(QueryBuilders.matchQuery("job_index", indexName)); + searchRequest.source(SearchSourceBuilder.searchSource().query(boolQuery)); + + client.search( + searchRequest, + ActionListener.wrap( + response -> indexScheduledJobIdentity(response, jobId, indexName, operator), + exception -> new OpenSearchSecurityException( + "Exception received while querying for " + jobId + " in " + SCHEDULED_JOB_IDENTITY_INDEX + ) + ) + ); + } + + private void deleteScheduledJobIdentityEntry(String jobId, String indexName) { + SearchRequest searchRequest = new SearchRequest().indices(SCHEDULED_JOB_IDENTITY_INDEX); + BoolQueryBuilder boolQuery = QueryBuilders.boolQuery() + .must(QueryBuilders.matchQuery("job_id", jobId)) + .must(QueryBuilders.matchQuery("job_index", indexName)); + searchRequest.source(SearchSourceBuilder.searchSource().query(boolQuery)); + + client.search( + searchRequest, + ActionListener.wrap( + response -> deleteScheduledJobIdentity(response, jobId, indexName), + exception -> new OpenSearchSecurityException( + "Exception received while querying for " + jobId + " in " + SCHEDULED_JOB_IDENTITY_INDEX + ) + ) + ); + } + + private void indexScheduledJobIdentity(SearchResponse response, String jobId, String indexName, ScheduledJobOperator operator) + throws IOException { + long totalHits = response.getHits().getTotalHits().value; + if (totalHits > 1) { + // Should not happen + logger.warn( + "Multiple scheduled job identities already exists in " + SCHEDULED_JOB_IDENTITY_INDEX + " for job with jobId " + jobId + ); + } else if (totalHits == 1) { + logger.info("Scheduled Job Identity already exists in " + SCHEDULED_JOB_IDENTITY_INDEX + " for job with jobId " + jobId); + } else { + final User user = convertOperatorToUser(operator); + ScheduledJobIdentity identityOfJob = new ScheduledJobIdentity(jobId, indexName, Instant.now(), Instant.now(), user); + IndexRequest indexRequest = new IndexRequest(SCHEDULED_JOB_IDENTITY_INDEX).setRefreshPolicy( + WriteRequest.RefreshPolicy.IMMEDIATE + ).source(identityOfJob.toXContent(XContentFactory.jsonBuilder(), XCONTENT_WITH_TYPE)); + client.index( + indexRequest, + ActionListener.wrap( + indexResponse -> logger.info( + "Successfully created scheduled job identity index entry for jobId " + jobId + " in index " + indexName + ), + exception -> new OpenSearchSecurityException( + "Exception received while indexing for " + jobId + " in " + SCHEDULED_JOB_IDENTITY_INDEX + ) + ) + ); + } + } + + private User convertOperatorToUser(ScheduledJobOperator operator) { + ScheduledJobIdentityModel identity = operator.getIdentity(); + User user = null; + + if (identity.getUser() != null) { + ScheduledJobUserModel userModel = identity.getUser(); + String username = userModel.getUsername(); + Map attributes = userModel.getAttributes(); + if (!(attributes.containsKey("roles") && attributes.containsKey("backend_roles"))) { + throw new OpenSearchSecurityException("Attempting to save user details for scheduled job, but user info is empty"); + } + List roles = Arrays.stream(attributes.get("roles").split(",")).collect(Collectors.toList()); + List backendRoles = Arrays.stream(attributes.get("backend_roles").split(",")).collect(Collectors.toList()); + user = new User(username, backendRoles, roles, List.of(), null); + } else if (identity.getAuthToken() != null) { + // TODO get token info + String oboToken = identity.getAuthToken(); + JwsJwtCompactConsumer jwtConsumer = new JwsJwtCompactConsumer(oboToken); + JwtToken jwt = jwtConsumer.getJwtToken(); + System.out.println("jwt claims: " + jwt.getClaims()); + } + if (user == null) { + throw new OpenSearchSecurityException("Unable to convert operator to a user"); + } + return user; + } + + private void deleteScheduledJobIdentity(SearchResponse response, String jobId, String indexName) { + long totalHits = response.getHits().getTotalHits().value; + if (totalHits > 1) { + // Should not happen + logger.warn( + "Multiple scheduled job identities already exists in " + + SCHEDULED_JOB_IDENTITY_INDEX + + " for job with jobId " + + jobId + + " in index " + + indexName + ); + } else if (totalHits == 0) { + logger.info( + "No scheduled job identity found in " + + SCHEDULED_JOB_IDENTITY_INDEX + + " for job with jobId " + + jobId + + " in index " + + indexName + ); + } else { + String docId = response.getHits().getHits()[0].getId(); + DeleteRequest deleteRequest = new DeleteRequest(SCHEDULED_JOB_IDENTITY_INDEX).setRefreshPolicy( + WriteRequest.RefreshPolicy.IMMEDIATE + ).id(docId); + client.delete( + deleteRequest, + ActionListener.wrap( + indexResponse -> logger.info( + "Successfully deleted scheduled job identity index entry for jobId " + jobId + " in index " + indexName + ), + exception -> new OpenSearchSecurityException( + "Exception received while deleting scheduled job identity entry for " + + jobId + + " in " + + SCHEDULED_JOB_IDENTITY_INDEX + ) + ) + ); + } + } + + @Override + public void deleteJobOperatorEntry(String jobId, String indexName) { + if (!securityIndices.doesScheduledJobIdentityIndexExists()) { + throw new OpenSearchSecurityException("Scheduled Job Identity Index (" + SCHEDULED_JOB_IDENTITY_INDEX + ") does not exist."); + } + deleteScheduledJobIdentityEntry(jobId, indexName); + } + + @Override + public AuthToken issueAccessTokenOnBehalfOfOperator(String jobId, String indexName, Optional extensionUniqueId) { + if (!securityIndices.doesScheduledJobIdentityIndexExists()) { + throw new OpenSearchSecurityException("Scheduled Job Identity Index (" + SCHEDULED_JOB_IDENTITY_INDEX + ") does not exist."); + } + BearerAuthToken bearerAuthToken = new BearerAuthToken("header.payload.signature"); + return bearerAuthToken; + } +} diff --git a/src/main/java/org/opensearch/security/identity/SecuritySubject.java b/src/main/java/org/opensearch/security/identity/SecuritySubject.java new file mode 100644 index 0000000000..9d08607801 --- /dev/null +++ b/src/main/java/org/opensearch/security/identity/SecuritySubject.java @@ -0,0 +1,49 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.security.identity; + +import java.security.Principal; + +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.identity.NamedPrincipal; +import org.opensearch.identity.Subject; +import org.opensearch.identity.tokens.AuthToken; +import org.opensearch.security.support.ConfigConstants; +import org.opensearch.security.user.User; + +public class SecuritySubject implements Subject { + + private ThreadContext threadContext; + + public SecuritySubject() {} + + public void setThreadContext(ThreadContext threadContext) { + this.threadContext = threadContext; + } + + @Override + public Principal getPrincipal() { + if (threadContext == null) { + return NamedPrincipal.UNAUTHENTICATED; + } + final User user = (User) threadContext.getTransient(ConfigConstants.OPENDISTRO_SECURITY_USER); + if (user == null) { + return NamedPrincipal.ROOT; + } + return new NamedPrincipal(user.getName()); + } + + @Override + public void authenticate(AuthToken authToken) { + // TODO implement this - replace with logic from SecurityRestFilter + } +} diff --git a/src/main/java/org/opensearch/security/privileges/PrivilegesEvaluator.java b/src/main/java/org/opensearch/security/privileges/PrivilegesEvaluator.java index a3738dadac..777de7a92f 100644 --- a/src/main/java/org/opensearch/security/privileges/PrivilegesEvaluator.java +++ b/src/main/java/org/opensearch/security/privileges/PrivilegesEvaluator.java @@ -213,6 +213,7 @@ private void setUserInfoInThreadContext(User user, Set mappedRoles) { joiner.add(requestedTenant); } threadContext.putTransient(OPENDISTRO_SECURITY_USER_INFO_THREAD_CONTEXT, joiner.toString()); + // threadContext.putPersistent(OPENDISTRO_SECURITY_USER_INFO_THREAD_CONTEXT, joiner.toString()); } } diff --git a/src/main/java/org/opensearch/security/support/ConfigConstants.java b/src/main/java/org/opensearch/security/support/ConfigConstants.java index ee04ff62f3..52dc8cc5b3 100644 --- a/src/main/java/org/opensearch/security/support/ConfigConstants.java +++ b/src/main/java/org/opensearch/security/support/ConfigConstants.java @@ -111,6 +111,7 @@ public class ConfigConstants { public static final String OPENDISTRO_SECURITY_SSL_TRANSPORT_PRINCIPAL = OPENDISTRO_SECURITY_CONFIG_PREFIX + "ssl_transport_principal"; public static final String OPENDISTRO_SECURITY_USER = OPENDISTRO_SECURITY_CONFIG_PREFIX + "user"; + public static final String OPENDISTRO_SECURITY_AUTHENTICATED_USER = OPENDISTRO_SECURITY_CONFIG_PREFIX + "authenticated_user"; public static final String OPENDISTRO_SECURITY_USER_HEADER = OPENDISTRO_SECURITY_CONFIG_PREFIX + "user_header"; public static final String OPENDISTRO_SECURITY_USER_INFO_THREAD_CONTEXT = OPENDISTRO_SECURITY_CONFIG_PREFIX + "user_info"; diff --git a/src/main/java/org/opensearch/security/user/User.java b/src/main/java/org/opensearch/security/user/User.java index 43ca5d0e57..1d1c29022f 100644 --- a/src/main/java/org/opensearch/security/user/User.java +++ b/src/main/java/org/opensearch/security/user/User.java @@ -33,14 +33,21 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import com.google.common.collect.Lists; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; + +import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken; /** * A authenticated user and attributes associated to them (like roles, tenant, custom attributes) @@ -48,7 +55,7 @@ * Do not subclass from this class! * */ -public class User implements Serializable, Writeable, CustomAttributesAware { +public class User implements Serializable, Writeable, ToXContent, CustomAttributesAware { public static final User ANONYMOUS = new User( "opendistro_security_anonymous", @@ -67,6 +74,13 @@ public class User implements Serializable, Writeable, CustomAttributesAware { null ); + // field name in toXContent + public static final String NAME_FIELD = "name"; + public static final String BACKEND_ROLES_FIELD = "backend_roles"; + public static final String ROLES_FIELD = "roles"; + public static final String CUSTOM_ATTRIBUTE_NAMES_FIELD = "custom_attribute_names"; + public static final String REQUESTED_TENANT_FIELD = "user_requested_tenant"; + private static final long serialVersionUID = -5500938501822658596L; private final String name; /** @@ -124,6 +138,28 @@ public User(final String name) { this(name, null, null); } + public User(String name, List backendRoles, List roles, List customAttNames, String requestedTenant) { + super(); + + if (name == null || name.isEmpty()) { + throw new IllegalArgumentException("name must not be null or empty"); + } + + this.name = name; + + if (roles != null) { + this.addSecurityRoles(roles); + } + + if (backendRoles != null) { + this.addRoles(backendRoles); + } + + if (requestedTenant != null) { + this.setRequestedTenant(requestedTenant); + } + } + public final String getName() { return name; } @@ -177,6 +213,49 @@ public final void addAttributes(final Map attributes) { } } + public static User parse(XContentParser parser) throws IOException { + String name = ""; + List backendRoles = new ArrayList<>(); + List roles = new ArrayList<>(); + List customAttNames = new ArrayList<>(); + String requestedTenant = null; + + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); + while (parser.nextToken() != XContentParser.Token.END_OBJECT) { + String fieldName = parser.currentName(); + parser.nextToken(); + switch (fieldName) { + case NAME_FIELD: + name = parser.text(); + break; + case BACKEND_ROLES_FIELD: + ensureExpectedToken(XContentParser.Token.START_ARRAY, parser.currentToken(), parser); + while (parser.nextToken() != XContentParser.Token.END_ARRAY) { + backendRoles.add(parser.text()); + } + break; + case ROLES_FIELD: + ensureExpectedToken(XContentParser.Token.START_ARRAY, parser.currentToken(), parser); + while (parser.nextToken() != XContentParser.Token.END_ARRAY) { + roles.add(parser.text()); + } + break; + case CUSTOM_ATTRIBUTE_NAMES_FIELD: + ensureExpectedToken(XContentParser.Token.START_ARRAY, parser.currentToken(), parser); + while (parser.nextToken() != XContentParser.Token.END_ARRAY) { + customAttNames.add(parser.text()); + } + break; + case REQUESTED_TENANT_FIELD: + requestedTenant = parser.textOrNull(); + break; + default: + break; + } + } + return new User(name, backendRoles, roles, customAttNames, requestedTenant); + } + public final String getRequestedTenant() { return requestedTenant; } @@ -283,4 +362,19 @@ public final Set getSecurityRoles() { ? Collections.synchronizedSet(Collections.emptySet()) : Collections.unmodifiableSet(this.securityRoles); } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + List customAttNames = new ArrayList<>(); + if (attributes != null) { + customAttNames = attributes.keySet().stream().collect(Collectors.toList()); + } + builder.startObject() + .field(NAME_FIELD, name) + .field(BACKEND_ROLES_FIELD, roles) + .field(ROLES_FIELD, securityRoles) + .field(CUSTOM_ATTRIBUTE_NAMES_FIELD, customAttNames) + .field(REQUESTED_TENANT_FIELD, requestedTenant); + return builder.endObject(); + } } diff --git a/src/main/java/org/opensearch/security/util/ThrowingSupplier.java b/src/main/java/org/opensearch/security/util/ThrowingSupplier.java new file mode 100644 index 0000000000..51447d20ec --- /dev/null +++ b/src/main/java/org/opensearch/security/util/ThrowingSupplier.java @@ -0,0 +1,23 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.security.util; + +/** + * A supplier that can throw checked exception + * + * @param method parameter type + * @param Exception type + */ +@FunctionalInterface +public interface ThrowingSupplier { + T get() throws E; +} diff --git a/src/main/java/org/opensearch/security/util/ThrowingSupplierWrapper.java b/src/main/java/org/opensearch/security/util/ThrowingSupplierWrapper.java new file mode 100644 index 0000000000..a23731ea2f --- /dev/null +++ b/src/main/java/org/opensearch/security/util/ThrowingSupplierWrapper.java @@ -0,0 +1,43 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.security.util; + +import java.util.function.Supplier; + +public class ThrowingSupplierWrapper { + /* + * Private constructor to avoid Jacoco complaining about public constructor + * not covered: https://tinyurl.com/yetc7tra + */ + private ThrowingSupplierWrapper() {} + + /** + * Utility method to use a method throwing checked exception inside a place + * that does not allow throwing the corresponding checked exception (e.g., + * enum initialization). + * Convert the checked exception thrown by by throwingConsumer to a RuntimeException + * so that the compiler won't complain. + * @param the method's return type + * @param throwingSupplier the method reference that can throw checked exception + * @return converted method reference + */ + public static Supplier throwingSupplierWrapper(ThrowingSupplier throwingSupplier) { + + return () -> { + try { + return throwingSupplier.get(); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + }; + } +} diff --git a/src/main/resources/mappings/scheduled-job-identity.json b/src/main/resources/mappings/scheduled-job-identity.json new file mode 100644 index 0000000000..3ec64a2531 --- /dev/null +++ b/src/main/resources/mappings/scheduled-job-identity.json @@ -0,0 +1,55 @@ +{ + "dynamic": false, + "_meta": { + "schema_version": 1 + }, + "properties": { + "schema_version": { + "type": "integer" + }, + "created_time": { + "type": "date", + "format": "strict_date_time||epoch_millis" + }, + "last_update_time": { + "type": "date", + "format": "strict_date_time||epoch_millis" + }, + "job_id": { + "type": "keyword" + }, + "job_index": { + "type": "keyword" + }, + "user": { + "type": "nested", + "properties": { + "name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "backend_roles": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + }, + "roles": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + } + } + } + } +} diff --git a/src/test/java/org/opensearch/security/identity/SecurityIndicesTestUtils.java b/src/test/java/org/opensearch/security/identity/SecurityIndicesTestUtils.java new file mode 100644 index 0000000000..f15b76df8f --- /dev/null +++ b/src/test/java/org/opensearch/security/identity/SecurityIndicesTestUtils.java @@ -0,0 +1,65 @@ +package org.opensearch.security.identity; + +import java.util.Arrays; +import java.util.TreeMap; + +import org.mockito.quality.Strictness; + +import org.opensearch.client.AdminClient; +import org.opensearch.client.Client; +import org.opensearch.client.IndicesAdminClient; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexAbstraction; +import org.opensearch.cluster.metadata.Metadata; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.withSettings; + +public class SecurityIndicesTestUtils { + public static ClusterState createClusterState(final IndexShorthand... indices) { + final TreeMap indexMap = new TreeMap(); + Arrays.stream(indices).forEach(indexShorthand -> { + final IndexAbstraction indexAbstraction = mock(IndexAbstraction.class, withSettings().strictness(Strictness.LENIENT)); + when(indexAbstraction.getType()).thenReturn(indexShorthand.type); + indexMap.put(indexShorthand.name, indexAbstraction); + }); + + final Metadata mockMetadata = mock(Metadata.class, withSettings().strictness(Strictness.LENIENT)); + when(mockMetadata.getIndicesLookup()).thenReturn(indexMap); + + final ClusterState mockClusterState = mock(ClusterState.class, withSettings().strictness(Strictness.LENIENT)); + when(mockClusterState.getMetadata()).thenReturn(mockMetadata); + when(mockClusterState.metadata()).thenReturn(mockMetadata); + + if (indices != null) { + for (IndexShorthand index : indices) { + when(mockMetadata.hasConcreteIndex(index.name)).thenReturn(true); + } + } + + return mockClusterState; + } + + public static Client createNodeClient() { + final IndicesAdminClient mockIndicesAdminClient = mock(IndicesAdminClient.class, withSettings().strictness(Strictness.LENIENT)); + + final AdminClient mockAdminClient = mock(AdminClient.class, withSettings().strictness(Strictness.LENIENT)); + when(mockAdminClient.indices()).thenReturn(mockIndicesAdminClient); + + final Client mockClient = mock(Client.class, withSettings().strictness(Strictness.LENIENT)); + when(mockClient.admin()).thenReturn(mockAdminClient); + + return mockClient; + } + + public static class IndexShorthand { + public final String name; + public final IndexAbstraction.Type type; + + public IndexShorthand(final String name, final IndexAbstraction.Type type) { + this.name = name; + this.type = type; + } + } +} diff --git a/src/test/java/org/opensearch/security/identity/SecurityIndicesTests.java b/src/test/java/org/opensearch/security/identity/SecurityIndicesTests.java new file mode 100644 index 0000000000..3057f364d2 --- /dev/null +++ b/src/test/java/org/opensearch/security/identity/SecurityIndicesTests.java @@ -0,0 +1,74 @@ +package org.opensearch.security.identity; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import org.opensearch.action.ActionListener; +import org.opensearch.action.admin.indices.create.CreateIndexResponse; +import org.opensearch.client.Client; +import org.opensearch.cluster.metadata.IndexAbstraction; +import org.opensearch.cluster.service.ClusterService; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.verify; +import static org.opensearch.security.identity.SecurityIndices.SCHEDULED_JOB_IDENTITY_INDEX; +import static org.opensearch.security.identity.SecurityIndicesTestUtils.createNodeClient; + +@RunWith(MockitoJUnitRunner.class) +public class SecurityIndicesTests { + + @Mock + private ClusterService clusterService; + + @Mock + private Client client; + + @Test + public void testScheduledJobIdentityIndexNotExists() throws Exception { + doReturn( + SecurityIndicesTestUtils.createClusterState( + new SecurityIndicesTestUtils.IndexShorthand("my-index", IndexAbstraction.Type.CONCRETE_INDEX) + ) + ).when(clusterService).state(); + SecurityIndices indices = new SecurityIndices(client, clusterService); + + boolean exists = indices.doesScheduledJobIdentityIndexExists(); + assertFalse(exists); + } + + @Test + public void testScheduledJobIdentityIndexExists() throws Exception { + doReturn( + SecurityIndicesTestUtils.createClusterState( + new SecurityIndicesTestUtils.IndexShorthand(SCHEDULED_JOB_IDENTITY_INDEX, IndexAbstraction.Type.CONCRETE_INDEX) + ) + ).when(clusterService).state(); + SecurityIndices indices = new SecurityIndices(client, clusterService); + + boolean exists = indices.doesScheduledJobIdentityIndexExists(); + assertTrue(exists); + } + + @Test + public void testInitScheduledJobIdentityIndex() throws Exception { + Client nodeClient = createNodeClient(); + SecurityIndices indices = new SecurityIndices(nodeClient, clusterService); + ActionListener actionListener = new ActionListener() { + @Override + public void onResponse(CreateIndexResponse createIndexResponse) {} + + @Override + public void onFailure(Exception e) {} + }; + indices.initScheduledJobIdentityIndex(actionListener); + + verify(nodeClient).admin(); + verify(nodeClient.admin()).indices(); + verify(nodeClient.admin().indices()).create(any(), any()); + } +} diff --git a/src/test/java/org/opensearch/security/identity/SecurityScheduledJobIdentityManagerTests.java b/src/test/java/org/opensearch/security/identity/SecurityScheduledJobIdentityManagerTests.java new file mode 100644 index 0000000000..516f6a049a --- /dev/null +++ b/src/test/java/org/opensearch/security/identity/SecurityScheduledJobIdentityManagerTests.java @@ -0,0 +1,5 @@ +package org.opensearch.security.identity; + +public class SecurityScheduledJobIdentityManagerTests { + +} diff --git a/src/test/java/org/opensearch/security/identity/SecuritySubjectTests.java b/src/test/java/org/opensearch/security/identity/SecuritySubjectTests.java new file mode 100644 index 0000000000..12975d7099 --- /dev/null +++ b/src/test/java/org/opensearch/security/identity/SecuritySubjectTests.java @@ -0,0 +1,38 @@ +package org.opensearch.security.identity; + +import java.security.Principal; + +import org.junit.Test; + +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.identity.NamedPrincipal; +import org.opensearch.security.support.ConfigConstants; +import org.opensearch.security.user.User; + +import static org.junit.Assert.assertEquals; + +public class SecuritySubjectTests { + + @Test + public void testCurrentSubjectIsUnauthenticated() { + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + SecuritySubject subject = new SecuritySubject(); + subject.setThreadContext(threadContext); + + Principal principal = subject.getPrincipal(); + assertEquals(NamedPrincipal.UNAUTHENTICATED, principal); + } + + @Test + public void testCurrentSubjectIsPresent() { + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + SecuritySubject subject = new SecuritySubject(); + User user = new User("testuser"); + threadContext.putTransient(ConfigConstants.OPENDISTRO_SECURITY_USER, user); + subject.setThreadContext(threadContext); + + Principal principal = subject.getPrincipal(); + assertEquals("testuser", principal.getName()); + } +}