From 76464567531d5e08fc5fdf5bb0b96b8b303d70fb Mon Sep 17 00:00:00 2001 From: Joshua Bigler Date: Sun, 12 Jun 2022 12:29:26 -0700 Subject: [PATCH 1/4] initial exec source commit --- README.md | 5 + src/main/resources/application.conf | 11 ++ .../ksm/source/ExecSourceAcl.scala.scala | 100 ++++++++++++++++++ .../ksm/source/ExecSourceAclTest.scala | 76 +++++++++++++ 4 files changed, 192 insertions(+) create mode 100644 src/main/scala/io/conduktor/ksm/source/ExecSourceAcl.scala.scala create mode 100644 src/test/scala/io/conduktor/ksm/source/ExecSourceAclTest.scala diff --git a/README.md b/README.md index b91f054..9385595 100644 --- a/README.md +++ b/README.md @@ -199,6 +199,11 @@ The [default configurations](src/main/resources/application.conf) can be overwri - `SOURCE_HTTP_AUTH_GOOGLEIAM_SERVICE_ACCOUNT` Google Service Account name. - `SOURCE_HTTP_AUTH_GOOGLEIAM_SERVICE_ACCOUNT_KEY` Google Service Account Key in JSON string encoded. If not the key isn't configured, it'll try to get the token from environment. - `SOURCE_HTTP_AUTH_GOOGLEIAM_TARGET_AUDIENCE` Google Target Audience for token authentication. + - `io.conduktor.ksm.source.ExecSourceAcl`: Get the ACL from the stdout of some executable. Allows the user to write their own executable (written in any language of their choosing) that generates the yaml or csv output defining the ACL. + - `EXEC_SOURCE_CMD`: Full path to the executable + - `EXEC_SOURCE_ARGS`: Arguments passed to the executable, they will be split by the below seperator value. Defaults to '' + - `EXEC_SOURCE_ARGS_SEP`: String seperator to split the argument value. Defaults to ','. For example, setting the args to 'a,b,c,d' and the seperator to ',' will pass in the args [a, b, c, d] to the executable + - `EXEC_SOURCE_PARSER`: 'yaml' or 'csv', defaults to 'yaml' - `NOTIFICATION_CLASS`: Class for notification in case of ACL changes in Kafka. - `io.conduktor.ksm.notification.ConsoleNotification` (default): Print changes to the console. Useful for logging diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index 46947cb..08dc61f 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -118,6 +118,17 @@ source { password = ${?SOURCE_BITBUCKET_SERVER_AUTH_PASSWORD} } } + exec { + //command needs to be a full path + command = "/bin/false" + command = ${?SOURCE_EXEC_CMD} + args = "" + args = ${?SOURCE_EXEC_ARGS} + sep = "," + sep = ${?SOURCE_EXEC_ARGS_SEP} + parser = "yaml" + parser = ${?SOURCE_EXEC_PARSER} + } bitbucket-cloud { api { url = "https://api.bitbucket.org/2.0" diff --git a/src/main/scala/io/conduktor/ksm/source/ExecSourceAcl.scala.scala b/src/main/scala/io/conduktor/ksm/source/ExecSourceAcl.scala.scala new file mode 100644 index 0000000..c481cc7 --- /dev/null +++ b/src/main/scala/io/conduktor/ksm/source/ExecSourceAcl.scala.scala @@ -0,0 +1,100 @@ +package io.conduktor.ksm.source + +import com.typesafe.config.Config +import io.conduktor.ksm.parser.AclParserRegistry +import io.conduktor.ksm.source +import org.slf4j.LoggerFactory +import java.util.regex.Pattern + +import sys.process._ +import java.io._ + +class ExecSourceAcl(parserRegistry: AclParserRegistry) extends SourceAcl(parserRegistry) { + + private val log = LoggerFactory.getLogger(classOf[ExecSourceAcl]) + + override val CONFIG_PREFIX: String = "exec" + final val COMMAND_CONFIG: String = "command" + final val COMMAND_ARGS_CONFIG: String = "args" + final val COMMAND_ARGS_SEP_CONFIG: String = "sep" + final val PARSER = "parser" + + var command: List[String] = _ + var parser: String = "csv" + + /** + * internal config definition for the module + */ + override def configure(config: Config): Unit = { + //The command option should be required + val cmd = config.getString(COMMAND_CONFIG) + + //For args we could use some defaults + val args: String = if (!config.hasPath(COMMAND_ARGS_CONFIG)) "" else config.getString(COMMAND_ARGS_CONFIG) + val sep: String = if (!config.hasPath(COMMAND_ARGS_SEP_CONFIG)) "," else config.getString(COMMAND_ARGS_SEP_CONFIG) + + val parserType = if (!config.hasPath(PARSER)) "yaml" else config.getString(PARSER) + configure(cmd, args, sep, parserType) + + } + + def configure(command: String, args: String, sep: String, parser: String): Unit = { + + this.command = List.concat( + List(command), + args.split(Pattern.quote(sep)) + ) + log.info("command: {}", this.command) + + this.parser = parser + log.info("PARSER: {}", this.parser) + } + + def configure(command: String, args: String, sep: String): Unit = { + + configure(command, args, sep, this.parser) + + } + + override def refresh(): Option[ParsingContext] = { + val (return_code, stdout, stderr) = exec(command) + + // If return_code is 0, the command was a success, parse out the stdout + return_code match { + case 0 => + Some( + ParsingContext( + parserRegistry.getParser(this.parser), + new StringReader(stdout) + ) + ) + // Otherwise, assume something went wrong + case _ => { + log.error("Error executing the process, got return code {}", return_code) + log.debug("Stdout: {}", stdout) + log.error("Stderr: {}", stderr) + None + } + } + } + + /** + * Close all the necessary underlying objects or connections belonging to this instance + */ + override def close(): Unit = { + // Do nothing + } + + //Function here is taken from a StackOverflow answer I found + private def exec(cmd: Seq[String]): (Int, String, String) = { + val stdoutStream = new ByteArrayOutputStream + val stderrStream = new ByteArrayOutputStream + val stdoutWriter = new PrintWriter(stdoutStream) + val stderrWriter = new PrintWriter(stderrStream) + val exitValue = cmd.!(ProcessLogger(stdoutWriter.println, stderrWriter.println)) + stdoutWriter.close() + stderrWriter.close() + (exitValue, stdoutStream.toString, stderrStream.toString) + } + +} diff --git a/src/test/scala/io/conduktor/ksm/source/ExecSourceAclTest.scala b/src/test/scala/io/conduktor/ksm/source/ExecSourceAclTest.scala new file mode 100644 index 0000000..2c2dbd5 --- /dev/null +++ b/src/test/scala/io/conduktor/ksm/source/ExecSourceAclTest.scala @@ -0,0 +1,76 @@ +package io.conduktor.ksm.source + +import io.conduktor.ksm.parser.AclParserRegistry +import io.conduktor.ksm.parser.yaml.YamlAclParser +import io.conduktor.ksm.parser.csv.CsvAclParser +import io.conduktor.ksm.parser.{AclParser, AclParserRegistry} + +import java.io.BufferedReader +import java.io.{File, Reader} +import java.nio.charset.StandardCharsets +import java.nio.file.{Files, Paths} +import kafka.security.auth._ +import org.apache.kafka.common.resource.PatternType +import org.apache.kafka.common.utils.SecurityUtils +import org.scalamock.scalatest.MockFactory +import org.scalatest.{FlatSpec, Matchers} + +class ExecSourceAclTest extends FlatSpec with Matchers with MockFactory { + + val yamlAclParser = new YamlAclParser() + val aclParserRegistryMock: AclParserRegistry = stub[AclParserRegistry] + (aclParserRegistryMock.getParserByFilename _).when(*).returns(yamlAclParser) + + "Test 1" should "successfully parse exec output" in { + + val yamlContent = + """ + |users: + | alice: + | topics: + | foo: + | - Read + | bar*: + | - Produce + | bob: + | groups: + | bar: + | - Write,Deny,12.34.56.78 + | bob*: + | - All + | transactional_ids: + | bar-*: + | - All + | peter: + | clusters: + | kafka-cluster: + | - Create""".stripMargin + + val file = File.createTempFile("ksm", "test") + val filepath = file.getAbsolutePath + println(filepath) + Files.write( + Paths.get(file.toURI), + yamlContent.getBytes(StandardCharsets.UTF_8) + ) + + val execSourceAcl = new ExecSourceAcl(aclParserRegistryMock) + execSourceAcl.configure("/bin/cat", filepath, "|", "yaml") + + val parsingContext = execSourceAcl.refresh().get + + yamlAclParser.aclsFromReader(parsingContext.reader).result.isRight shouldBe true + } + + "Test 2" should "retun None on non-zero exit status" in { + + val execSourceAcl = new ExecSourceAcl(aclParserRegistryMock) + execSourceAcl.configure("/bin/false", "", "|") + + execSourceAcl.refresh() shouldBe None + + } + + + +} From 2e5a63b5a8e271df6e01401ed6723168160975ab Mon Sep 17 00:00:00 2001 From: Joshua Bigler Date: Sun, 12 Jun 2022 13:38:08 -0700 Subject: [PATCH 2/4] Typos in readme --- README.md | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 9385595..3e26f82 100644 --- a/README.md +++ b/README.md @@ -77,6 +77,7 @@ Current sources shipping with KSM include: - GitLab (using Personal Auth Tokens) - BitBucket - Amazon S3 +- Local Executable - Build your own (and contribute back!) # Building @@ -200,10 +201,10 @@ The [default configurations](src/main/resources/application.conf) can be overwri - `SOURCE_HTTP_AUTH_GOOGLEIAM_SERVICE_ACCOUNT_KEY` Google Service Account Key in JSON string encoded. If not the key isn't configured, it'll try to get the token from environment. - `SOURCE_HTTP_AUTH_GOOGLEIAM_TARGET_AUDIENCE` Google Target Audience for token authentication. - `io.conduktor.ksm.source.ExecSourceAcl`: Get the ACL from the stdout of some executable. Allows the user to write their own executable (written in any language of their choosing) that generates the yaml or csv output defining the ACL. - - `EXEC_SOURCE_CMD`: Full path to the executable - - `EXEC_SOURCE_ARGS`: Arguments passed to the executable, they will be split by the below seperator value. Defaults to '' - - `EXEC_SOURCE_ARGS_SEP`: String seperator to split the argument value. Defaults to ','. For example, setting the args to 'a,b,c,d' and the seperator to ',' will pass in the args [a, b, c, d] to the executable - - `EXEC_SOURCE_PARSER`: 'yaml' or 'csv', defaults to 'yaml' + - `SOURCE_EXEC_CMD`: Full path to the executable + - `SOURCE_EXEC_ARGS`: Arguments passed to the executable, they will be split by the below seperator value. Defaults to '' + - `SOURCE_EXEC_ARGS_SEP`: String seperator to split the argument value. Defaults to ','. For example, setting the args to 'a,b,c,d' and the seperator to ',' will pass in the args [a, b, c, d] to the executable + - `SOURCE_EXEC_PARSER`: 'yaml' or 'csv', defaults to 'yaml' - `NOTIFICATION_CLASS`: Class for notification in case of ACL changes in Kafka. - `io.conduktor.ksm.notification.ConsoleNotification` (default): Print changes to the console. Useful for logging From 5085f54c8c5d70998fc33797a1c6a60b4f28aa69 Mon Sep 17 00:00:00 2001 From: Joshua Bigler Date: Sun, 12 Jun 2022 13:45:14 -0700 Subject: [PATCH 3/4] Removing some unused imports --- .../scala/io/conduktor/ksm/source/ExecSourceAclTest.scala | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/test/scala/io/conduktor/ksm/source/ExecSourceAclTest.scala b/src/test/scala/io/conduktor/ksm/source/ExecSourceAclTest.scala index 2c2dbd5..008a31d 100644 --- a/src/test/scala/io/conduktor/ksm/source/ExecSourceAclTest.scala +++ b/src/test/scala/io/conduktor/ksm/source/ExecSourceAclTest.scala @@ -2,16 +2,12 @@ package io.conduktor.ksm.source import io.conduktor.ksm.parser.AclParserRegistry import io.conduktor.ksm.parser.yaml.YamlAclParser -import io.conduktor.ksm.parser.csv.CsvAclParser -import io.conduktor.ksm.parser.{AclParser, AclParserRegistry} +import io.conduktor.ksm.parser.{AclParserRegistry} -import java.io.BufferedReader -import java.io.{File, Reader} +import java.io.{File} import java.nio.charset.StandardCharsets import java.nio.file.{Files, Paths} import kafka.security.auth._ -import org.apache.kafka.common.resource.PatternType -import org.apache.kafka.common.utils.SecurityUtils import org.scalamock.scalatest.MockFactory import org.scalatest.{FlatSpec, Matchers} From e5cf454e26463a6c6a2d5e3652a9b7eccfc8a0e7 Mon Sep 17 00:00:00 2001 From: Joshua Bigler Date: Sun, 12 Jun 2022 14:16:21 -0700 Subject: [PATCH 4/4] Spelling errors --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 3e26f82..4d3ec40 100644 --- a/README.md +++ b/README.md @@ -202,8 +202,8 @@ The [default configurations](src/main/resources/application.conf) can be overwri - `SOURCE_HTTP_AUTH_GOOGLEIAM_TARGET_AUDIENCE` Google Target Audience for token authentication. - `io.conduktor.ksm.source.ExecSourceAcl`: Get the ACL from the stdout of some executable. Allows the user to write their own executable (written in any language of their choosing) that generates the yaml or csv output defining the ACL. - `SOURCE_EXEC_CMD`: Full path to the executable - - `SOURCE_EXEC_ARGS`: Arguments passed to the executable, they will be split by the below seperator value. Defaults to '' - - `SOURCE_EXEC_ARGS_SEP`: String seperator to split the argument value. Defaults to ','. For example, setting the args to 'a,b,c,d' and the seperator to ',' will pass in the args [a, b, c, d] to the executable + - `SOURCE_EXEC_ARGS`: Arguments passed to the executable, they will be split by the below separator value. Defaults to '' + - `SOURCE_EXEC_ARGS_SEP`: String separator to split the argument value. Defaults to ','. For example, setting the args to 'a,b,c,d' and the separator to ',' will pass in the args [a, b, c, d] to the executable - `SOURCE_EXEC_PARSER`: 'yaml' or 'csv', defaults to 'yaml' - `NOTIFICATION_CLASS`: Class for notification in case of ACL changes in Kafka.