@@ -11,7 +11,6 @@ import java.util.concurrent.{Callable, ExecutionException, TimeUnit}
11
11
import com .fasterxml .jackson .databind .ObjectMapper
12
12
import com .fasterxml .jackson .module .scala .{DefaultScalaModule , ScalaObjectMapper }
13
13
import com .google .common .annotations .VisibleForTesting
14
- import com .google .common .base .Throwables
15
14
import com .google .common .cache .{Cache , CacheBuilder }
16
15
import com .typesafe .scalalogging .LazyLogging
17
16
import kafka .network .RequestChannel
@@ -26,6 +25,7 @@ class OpaAuthorizer extends Authorizer with LazyLogging {
26
25
private var config : Map [String , String ] = Map .empty
27
26
private lazy val opaUrl = new URL (config(" opa.authorizer.url" )).toURI
28
27
private lazy val allowOnError = config.getOrElse(" opa.authorizer.allow.on.error" , " false" ).toBoolean
28
+ private lazy val superUsers = config.getOrElse(" super.users" , " " ).split(" ;" ).toList
29
29
30
30
private lazy val cache = CacheBuilder .newBuilder
31
31
.initialCapacity(config.getOrElse(" opa.authorizer.cache.initial.capacity" , " 5000" ).toInt)
@@ -35,6 +35,11 @@ class OpaAuthorizer extends Authorizer with LazyLogging {
35
35
.asInstanceOf [Cache [Request , Boolean ]]
36
36
37
37
override def authorize (session : RequestChannel .Session , operation : Operation , resource : Resource ): Boolean = {
38
+ if (superUsers.contains(session.principal.toString)) {
39
+ logger.trace(s " User ${session.principal} is super user " )
40
+ return true
41
+ }
42
+
38
43
val request = Request (Input (session, operation, resource))
39
44
try cache.get(request, new AllowCallable (request, opaUrl, allowOnError))
40
45
catch {
0 commit comments