-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Removes the use of the authorized indices list for bulk items authorization #76481
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -298,39 +298,32 @@ private void authorizeAction(final RequestInfo requestInfo, final String request | |
| }, clusterAuthzListener::onFailure)); | ||
| } else if (isIndexAction(action)) { | ||
| final Metadata metadata = clusterService.state().metadata(); | ||
| final AsyncSupplier<Set<String>> authorizedIndicesSupplier = new CachingAsyncSupplier<>(authzIndicesListener -> { | ||
| LoadAuthorizedIndiciesTimeChecker timeChecker = LoadAuthorizedIndiciesTimeChecker.start(requestInfo, authzInfo); | ||
| final AsyncSupplier<ResolvedIndices> resolvedIndicesAsyncSupplier = new CachingAsyncSupplier<>(resolvedIndicesListener -> { | ||
| final LoadAuthorizedIndiciesTimeChecker timeChecker = LoadAuthorizedIndiciesTimeChecker.start(requestInfo, authzInfo); | ||
| authzEngine.loadAuthorizedIndices( | ||
| requestInfo, | ||
| authzInfo, | ||
| metadata.getIndicesLookup(), | ||
| authzIndicesListener.map(authzIndices -> { | ||
| timeChecker.done(authzIndices); | ||
| return authzIndices; | ||
| }) | ||
| ); | ||
| }); | ||
| final AsyncSupplier<ResolvedIndices> resolvedIndicesAsyncSupplier = new CachingAsyncSupplier<>(resolvedIndicesListener -> | ||
| authorizedIndicesSupplier.getAsync( | ||
| requestInfo, | ||
| authzInfo, | ||
| metadata.getIndicesLookup(), | ||
| ActionListener.wrap( | ||
| authorizedIndices -> | ||
| resolvedIndicesListener.onResponse( | ||
| indicesAndAliasesResolver.resolve(action, request, metadata, authorizedIndices) | ||
| ), | ||
| e -> { | ||
| auditTrail.accessDenied(requestId, authentication, action, request, authzInfo); | ||
| if (e instanceof IndexNotFoundException) { | ||
| listener.onFailure(e); | ||
| } else { | ||
| listener.onFailure(denialException(authentication, action, request, e)); | ||
| authorizedIndices -> { | ||
| timeChecker.done(authorizedIndices); | ||
| resolvedIndicesListener.onResponse( | ||
| indicesAndAliasesResolver.resolve(action, request, metadata, authorizedIndices) | ||
| ); | ||
| }, | ||
| e -> { | ||
| auditTrail.accessDenied(requestId, authentication, action, request, authzInfo); | ||
| if (e instanceof IndexNotFoundException) { | ||
| listener.onFailure(e); | ||
| } else { | ||
| listener.onFailure(denialException(authentication, action, request, e)); | ||
| } | ||
| } | ||
| } | ||
| ) | ||
| ) | ||
| ); | ||
| )); | ||
| }); | ||
| authzEngine.authorizeIndexAction(requestInfo, authzInfo, resolvedIndicesAsyncSupplier, | ||
| metadata.getIndicesLookup(), wrapPreservingContext(new AuthorizationResultListener<>(result -> | ||
| handleIndexActionAuthorizationResult(result, requestInfo, requestId, authzInfo, authzEngine, authorizedIndicesSupplier, | ||
| handleIndexActionAuthorizationResult(result, requestInfo, requestId, authzInfo, authzEngine, | ||
| resolvedIndicesAsyncSupplier, metadata, listener), | ||
| listener::onFailure, requestInfo, requestId, authzInfo), threadContext)); | ||
| } else { | ||
|
|
@@ -343,7 +336,6 @@ private void authorizeAction(final RequestInfo requestInfo, final String request | |
| private void handleIndexActionAuthorizationResult(final IndexAuthorizationResult result, final RequestInfo requestInfo, | ||
| final String requestId, final AuthorizationInfo authzInfo, | ||
| final AuthorizationEngine authzEngine, | ||
| final AsyncSupplier<Set<String>> authorizedIndicesSupplier, | ||
| final AsyncSupplier<ResolvedIndices> resolvedIndicesAsyncSupplier, | ||
| final Metadata metadata, | ||
| final ActionListener<Void> listener) { | ||
|
|
@@ -383,7 +375,7 @@ private void handleIndexActionAuthorizationResult(final IndexAuthorizationResult | |
| // if this is performing multiple actions on the index, then check each of those actions. | ||
| assert request instanceof BulkShardRequest | ||
| : "Action " + action + " requires " + BulkShardRequest.class + " but was " + request.getClass(); | ||
| authorizeBulkItems(requestInfo, authzInfo, authzEngine, resolvedIndicesAsyncSupplier, authorizedIndicesSupplier, metadata, | ||
| authorizeBulkItems(requestInfo, authzInfo, authzEngine, resolvedIndicesAsyncSupplier, metadata, | ||
| requestId, | ||
| wrapPreservingContext( | ||
| ActionListener.wrap(ignore -> runRequestInterceptors(requestInfo, authzInfo, authorizationEngine, listener), | ||
|
|
@@ -509,7 +501,6 @@ private void authorizeRunAs(final RequestInfo requestInfo, final AuthorizationIn | |
| */ | ||
| private void authorizeBulkItems(RequestInfo requestInfo, AuthorizationInfo authzInfo, | ||
| AuthorizationEngine authzEngine, AsyncSupplier<ResolvedIndices> resolvedIndicesAsyncSupplier, | ||
| AsyncSupplier<Set<String>> authorizedIndicesSupplier, | ||
| Metadata metadata, String requestId, ActionListener<Void> listener) { | ||
| final Authentication authentication = requestInfo.getAuthentication(); | ||
| final BulkShardRequest request = (BulkShardRequest) requestInfo.getRequest(); | ||
|
|
@@ -519,44 +510,43 @@ private void authorizeBulkItems(RequestInfo requestInfo, AuthorizationInfo authz | |
| final Map<String, Set<String>> actionToIndicesMap = new HashMap<>(); | ||
| final AuditTrail auditTrail = auditTrailService.get(); | ||
|
|
||
| authorizedIndicesSupplier.getAsync(ActionListener.wrap(authorizedIndices -> { | ||
| resolvedIndicesAsyncSupplier.getAsync(ActionListener.wrap(overallResolvedIndices -> { | ||
| final Set<String> localIndices = new HashSet<>(overallResolvedIndices.getLocal()); | ||
| for (BulkItemRequest item : request.items()) { | ||
| final String itemAction = getAction(item); | ||
| String resolvedIndex = resolvedIndexNames.computeIfAbsent(item.index(), key -> { | ||
| final ResolvedIndices resolvedIndices = | ||
| indicesAndAliasesResolver.resolveIndicesAndAliases(itemAction, item.request(), metadata, authorizedIndices); | ||
| if (resolvedIndices.getRemote().size() != 0) { | ||
| throw illegalArgument("Bulk item should not write to remote indices, but request writes to " | ||
| resolvedIndicesAsyncSupplier.getAsync(ActionListener.wrap(overallResolvedIndices -> { | ||
| final Set<String> localIndices = new HashSet<>(overallResolvedIndices.getLocal()); | ||
| for (BulkItemRequest item : request.items()) { | ||
| final String itemAction = getAction(item); | ||
| String resolvedIndex = resolvedIndexNames.computeIfAbsent(item.index(), key -> { | ||
| final ResolvedIndices resolvedIndices = | ||
| indicesAndAliasesResolver.resolveIndicesAndAliases(itemAction, item.request(), metadata, localIndices); | ||
|
Comment on lines
+518
to
+519
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The change here relies on the fact that the last argument Also, the first sentence of the PR description
is not entirely accurate because there is no actual reuse, but rather not needed at all, i.e. the last argument can be
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yang, from this comment, I get the impression that you don't like the approach I took in this PR, but you have still approved. I think we have two options:
These are the two big options that I have weighted.
I think it is accurate because this is what the PR does, it reuses the said variable as an argument.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To clarify myself: I am happy with the overall goal of this PR, i.e. IIUC, removing the need of having a separate The concern that I have is only with what we should pass to
Based on the above, I think passing In longer term, we might still want to think about refactoring existing methods to help reduce the complexity and increases readibility. But I can understand that more refactoring is outside scope of this PR. |
||
| if (resolvedIndices.getRemote().size() != 0) { | ||
| throw illegalArgument("Bulk item should not write to remote indices, but request writes to " | ||
| + String.join(",", resolvedIndices.getRemote())); | ||
| } | ||
| if (resolvedIndices.getLocal().size() != 1) { | ||
| throw illegalArgument("Bulk item should write to exactly 1 index, but request writes to " | ||
| } | ||
| if (resolvedIndices.getLocal().size() != 1) { | ||
| throw illegalArgument("Bulk item should write to exactly 1 index, but request writes to " | ||
| + String.join(",", resolvedIndices.getLocal())); | ||
| } | ||
| final String resolved = resolvedIndices.getLocal().get(0); | ||
| if (localIndices.contains(resolved) == false) { | ||
| throw illegalArgument("Found bulk item that writes to index " + resolved + " but the request writes to " + | ||
| } | ||
| final String resolved = resolvedIndices.getLocal().get(0); | ||
| if (localIndices.contains(resolved) == false) { | ||
| throw illegalArgument("Found bulk item that writes to index [" + resolved + "] but the request writes to " + | ||
| localIndices); | ||
| } | ||
| return resolved; | ||
| }); | ||
|
|
||
| actionToIndicesMap.compute(itemAction, (key, resolvedIndicesSet) -> { | ||
| final Set<String> localSet = resolvedIndicesSet != null ? resolvedIndicesSet : new HashSet<>(); | ||
| localSet.add(resolvedIndex); | ||
| return localSet; | ||
| }); | ||
| } | ||
| } | ||
| return resolved; | ||
| }); | ||
|
|
||
| final ActionListener<Collection<Tuple<String, IndexAuthorizationResult>>> bulkAuthzListener = | ||
| actionToIndicesMap.compute(itemAction, (key, resolvedIndicesSet) -> { | ||
| final Set<String> localSet = resolvedIndicesSet != null ? resolvedIndicesSet : new HashSet<>(); | ||
| localSet.add(resolvedIndex); | ||
| return localSet; | ||
| }); | ||
| } | ||
|
|
||
| final ActionListener<Collection<Tuple<String, IndexAuthorizationResult>>> bulkAuthzListener = | ||
| ActionListener.wrap(collection -> { | ||
| final Map<String, IndicesAccessControl> actionToIndicesAccessControl = new HashMap<>(); | ||
| final AtomicBoolean audit = new AtomicBoolean(false); | ||
| collection.forEach(tuple -> { | ||
| final IndicesAccessControl existing = | ||
| actionToIndicesAccessControl.putIfAbsent(tuple.v1(), tuple.v2().getIndicesAccessControl()); | ||
| actionToIndicesAccessControl.putIfAbsent(tuple.v1(), tuple.v2().getIndicesAccessControl()); | ||
| if (existing != null) { | ||
| throw new IllegalStateException("a value already exists for action " + tuple.v1()); | ||
| } | ||
|
|
@@ -570,32 +560,31 @@ private void authorizeBulkItems(RequestInfo requestInfo, AuthorizationInfo authz | |
| final String itemAction = getAction(item); | ||
| final IndicesAccessControl indicesAccessControl = actionToIndicesAccessControl.get(itemAction); | ||
| final IndicesAccessControl.IndexAccessControl indexAccessControl | ||
| = indicesAccessControl.getIndexPermissions(resolvedIndex); | ||
| = indicesAccessControl.getIndexPermissions(resolvedIndex); | ||
| if (indexAccessControl == null || indexAccessControl.isGranted() == false) { | ||
| auditTrail.explicitIndexAccessEvent(requestId, AuditLevel.ACCESS_DENIED, authentication, itemAction, | ||
| resolvedIndex, item.getClass().getSimpleName(), request.remoteAddress(), authzInfo); | ||
| item.abort(resolvedIndex, denialException(authentication, itemAction, request, | ||
| AuthorizationEngine.IndexAuthorizationResult.getFailureDescription(List.of(resolvedIndex)), null)); | ||
| AuthorizationEngine.IndexAuthorizationResult.getFailureDescription(List.of(resolvedIndex)), null)); | ||
| } else if (audit.get()) { | ||
| auditTrail.explicitIndexAccessEvent(requestId, AuditLevel.ACCESS_GRANTED, authentication, itemAction, | ||
| resolvedIndex, item.getClass().getSimpleName(), request.remoteAddress(), authzInfo); | ||
| } | ||
| } | ||
| listener.onResponse(null); | ||
| }, listener::onFailure); | ||
| final ActionListener<Tuple<String, IndexAuthorizationResult>> groupedActionListener = wrapPreservingContext( | ||
| final ActionListener<Tuple<String, IndexAuthorizationResult>> groupedActionListener = wrapPreservingContext( | ||
| new GroupedActionListener<>(bulkAuthzListener, actionToIndicesMap.size()), threadContext); | ||
|
|
||
| actionToIndicesMap.forEach((bulkItemAction, indices) -> { | ||
| final RequestInfo bulkItemInfo = | ||
| actionToIndicesMap.forEach((bulkItemAction, indices) -> { | ||
| final RequestInfo bulkItemInfo = | ||
| new RequestInfo(requestInfo.getAuthentication(), requestInfo.getRequest(), bulkItemAction); | ||
| authzEngine.authorizeIndexAction(bulkItemInfo, authzInfo, | ||
| authzEngine.authorizeIndexAction(bulkItemInfo, authzInfo, | ||
| ril -> ril.onResponse(new ResolvedIndices(new ArrayList<>(indices), Collections.emptyList())), | ||
| metadata.getIndicesLookup(), ActionListener.wrap(indexAuthorizationResult -> | ||
| groupedActionListener.onResponse(new Tuple<>(bulkItemAction, indexAuthorizationResult)), | ||
| groupedActionListener::onFailure)); | ||
| }); | ||
| }, listener::onFailure)); | ||
| groupedActionListener.onResponse(new Tuple<>(bulkItemAction, indexAuthorizationResult)), | ||
| groupedActionListener::onFailure)); | ||
| }); | ||
| }, listener::onFailure)); | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
combine the two async suppliers into one.