Skip to content

Commit 794444c

Browse files
authored
Feat/optimised file writing (#15)
* Vault token renewal Adds a new parameter for the Vault docs to drive the auth token renewal. * Add file headers * Complete the Vault ticket renewal. * Avoid file to be created over and over again For the AES decoder there was a strategy to create a file (using random UUID) every time the "key" was returned. The new code is changing the approach to make sure we only create the file once. This means though for the AES encoding the encoding format should contain an id for a file:${encoding}_${id}. The id has to be unique as well. Small code improvement around decoding * Fix compilation
1 parent ba8f797 commit 794444c

12 files changed

+345
-133
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
*
3+
* * Copyright 2017-2020 Lenses.io Ltd
4+
*
5+
*/
6+
package io.lenses.connect.secrets.io
7+
8+
import java.io.BufferedOutputStream
9+
import java.io.FileOutputStream
10+
import java.nio.file.Path
11+
import java.nio.file.Paths
12+
13+
import com.typesafe.scalalogging.StrictLogging
14+
import io.lenses.connect.secrets.utils.WithRetry
15+
16+
import scala.concurrent.duration._
17+
import scala.util.Try
18+
19+
trait FileWriter {
20+
def write(fileName: String, content: Array[Byte], key: String): Path
21+
}
22+
23+
class FileWriterOnce(rootPath: Path)
24+
extends FileWriter
25+
with WithRetry
26+
with StrictLogging {
27+
rootPath.toFile.mkdirs()
28+
def write(fileName: String, content: Array[Byte], key: String): Path = {
29+
val fullPath = Paths.get(rootPath.toString, fileName)
30+
val file = fullPath.toFile
31+
if (file.exists()) fullPath
32+
else {
33+
val tempFile = Paths.get(rootPath.toString, fileName + ".bak").toFile
34+
withRetry(10, Some(500.milliseconds)) {
35+
if (tempFile.exists()) tempFile.delete()
36+
tempFile.createNewFile()
37+
val fos = new BufferedOutputStream(new FileOutputStream(tempFile))
38+
try {
39+
fos.write(content)
40+
fos.flush()
41+
logger.info(
42+
s"Payload written to [${file.getAbsolutePath}] for key [$key]"
43+
)
44+
} finally {
45+
Try(fos.close())
46+
}
47+
Try(tempFile.renameTo(file))
48+
}
49+
fullPath
50+
}
51+
}
52+
}

src/main/scala/io/lenses/connect/secrets/package.scala

+28-33
Original file line numberDiff line numberDiff line change
@@ -6,23 +6,26 @@
66

77
package io.lenses.connect.secrets
88

9-
import java.io.{File, FileOutputStream}
9+
import java.io.File
10+
import java.io.FileOutputStream
1011
import java.time.OffsetDateTime
1112
import java.util.Base64
1213

1314
import com.typesafe.scalalogging.StrictLogging
1415
import org.apache.kafka.common.config.ConfigData
1516
import org.apache.kafka.connect.errors.ConnectException
1617

17-
import scala.util.{Failure, Success, Try}
1818
import scala.collection.JavaConverters._
1919
import scala.collection.mutable
20+
import scala.util.Failure
21+
import scala.util.Success
22+
import scala.util.Try
2023

2124
package object connect extends StrictLogging {
2225

23-
val FILE_ENCODING = "file-encoding"
24-
val FILE_DIR = "file.dir"
25-
val FILE_DIR_DESC =
26+
val FILE_ENCODING: String = "file-encoding"
27+
val FILE_DIR: String = "file.dir"
28+
val FILE_DIR_DESC: String =
2629
"""
2730
| Location to write any files for any secrets that need to
2831
| be written to disk. For example java keystores.
@@ -77,32 +80,19 @@ package object connect extends StrictLogging {
7780
}
7881

7982
// decode a key bases on the prefix encoding
80-
def decodeKey(key: String, value: String, fileName: String): String = {
81-
82-
key.toLowerCase match {
83-
case k
84-
if k.startsWith(
85-
s"${Encoding.BASE64_FILE.toString.toLowerCase}_"
86-
) =>
83+
def decodeKey(
84+
encoding: Option[Encoding.Value],
85+
key: String,
86+
value: String,
87+
writeFileFn: Array[Byte] => String
88+
): String = {
89+
encoding.fold(value) {
90+
case Encoding.BASE64 => decode(key, value)
91+
case Encoding.BASE64_FILE =>
8792
val decoded = decodeToBytes(key, value)
88-
fileWriter(fileName, decoded, key)
89-
fileName
90-
91-
case k
92-
if k.startsWith(
93-
s"${Encoding.BASE64.toString.toLowerCase}_"
94-
) =>
95-
decode(key, value)
96-
97-
case k
98-
if k.startsWith(
99-
s"${Encoding.UTF8_FILE.toString.toLowerCase}_"
100-
) =>
101-
fileWriter(fileName, value.getBytes(), key)
102-
fileName
103-
104-
case _ =>
105-
value
93+
writeFileFn(decoded)
94+
case Encoding.UTF8 => value
95+
case Encoding.UTF8_FILE => writeFileFn(value.getBytes())
10696
}
10797
}
10898

@@ -154,8 +144,8 @@ package object connect extends StrictLogging {
154144

155145
//calculate the min expiry for secrets and return the configData and expiry
156146
def getSecretsAndExpiry(
157-
secrets: Map[String, (String, Option[OffsetDateTime])])
158-
: (Option[OffsetDateTime], ConfigData) = {
147+
secrets: Map[String, (String, Option[OffsetDateTime])]
148+
): (Option[OffsetDateTime], ConfigData) = {
159149
var expiryList = mutable.ListBuffer.empty[OffsetDateTime]
160150

161151
val data = secrets
@@ -176,6 +166,11 @@ package object connect extends StrictLogging {
176166
}
177167
}
178168

179-
def getFileName(rootDir: String, path: String, key: String, separator: String): String =
169+
def getFileName(
170+
rootDir: String,
171+
path: String,
172+
key: String,
173+
separator: String
174+
): String =
180175
s"${rootDir.stripSuffix(separator)}$separator$path$separator${key.toLowerCase}"
181176
}

src/main/scala/io/lenses/connect/secrets/providers/AWSHelper.scala

+11-2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
package io.lenses.connect.secrets.providers
88

99
import java.nio.file.FileSystems
10+
import java.nio.file.Paths
1011
import java.time.OffsetDateTime
1112
import java.util.Calendar
1213

@@ -16,7 +17,10 @@ import com.amazonaws.services.secretsmanager.{AWSSecretsManager, AWSSecretsManag
1617
import com.fasterxml.jackson.databind.ObjectMapper
1718
import com.typesafe.scalalogging.StrictLogging
1819
import io.lenses.connect.secrets.config.AWSProviderSettings
19-
import io.lenses.connect.secrets.connect.{AuthMode, decodeKey, getFileName}
20+
import io.lenses.connect.secrets.connect.{decodeKey, getFileName, AuthMode}
21+
import io.lenses.connect.secrets.io.FileWriter
22+
import io.lenses.connect.secrets.io.FileWriterOnce
23+
import io.lenses.connect.secrets.utils.EncodingAndId
2024
import org.apache.kafka.connect.errors.ConnectException
2125

2226
import scala.collection.JavaConverters._
@@ -108,12 +112,17 @@ trait AWSHelper extends StrictLogging {
108112
)
109113
)
110114

115+
val fileWriter:FileWriter = new FileWriterOnce(Paths.get(rootDir, secretId))
111116
// decode the value
117+
val encodingAndId = EncodingAndId.from(key)
112118
(
113119
decodeKey(
114120
key = key,
115121
value = value,
116-
fileName = getFileName(rootDir, secretId, key.toLowerCase, separator)
122+
encoding = encodingAndId.encoding,
123+
writeFileFn = content=>{
124+
fileWriter.write(key.toLowerCase, content, key).toString
125+
}
117126
),
118127
getTTL(client, secretId)
119128
)

src/main/scala/io/lenses/connect/secrets/providers/Aes256DecodingHelper.scala

+20-19
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,14 @@
11
package io.lenses.connect.secrets.providers
22

3+
import java.security.SecureRandom
4+
import java.util.Base64
5+
36
import javax.crypto.Cipher
4-
import javax.crypto.SecretKey
57
import javax.crypto.spec.IvParameterSpec
68
import javax.crypto.spec.SecretKeySpec
79

8-
import scala.util.Try
9-
10-
import java.security.SecureRandom
11-
import java.util.Base64
1210
import scala.util.Failure
13-
import scala.util.Success
11+
import scala.util.Try
1412

1513
private[providers] object Aes256DecodingHelper {
1614

@@ -35,13 +33,14 @@ private[providers] object Aes256DecodingHelper {
3533
}
3634
}
3735

38-
private[providers] class Aes256DecodingHelper private (
39-
key: String,
40-
ivSeparator: String
41-
) {
42-
import B64._
36+
private[providers] class Aes256DecodingHelper private(
37+
key: String,
38+
ivSeparator: String
39+
) {
40+
4341
import Aes256DecodingHelper.CHARSET
44-
import InitializationVector._
42+
import B64._
43+
4544
private val secret = new SecretKeySpec(key.getBytes(CHARSET), "AES")
4645

4746
def decrypt(s: String): Try[String] =
@@ -52,9 +51,9 @@ private[providers] class Aes256DecodingHelper private (
5251
} yield new String(decrypted, CHARSET)
5352

5453
private def decryptBytes(
55-
iv: InitializationVector,
56-
bytes: Array[Byte]
57-
): Try[Array[Byte]] =
54+
iv: InitializationVector,
55+
bytes: Array[Byte]
56+
): Try[Array[Byte]] =
5857
for {
5958
cipher <- getCipher(Cipher.DECRYPT_MODE, iv)
6059
encrypted <- Try(cipher.doFinal(bytes))
@@ -69,10 +68,12 @@ private[providers] class Aes256DecodingHelper private (
6968
}
7069
}
7170

72-
private case class InitializationVector private (bytes: Array[Byte])
71+
private case class InitializationVector private(bytes: Array[Byte])
7372

7473
private object InitializationVector {
74+
7575
import B64._
76+
7677
private val random = new SecureRandom()
7778
private val length = 16
7879

@@ -84,9 +85,9 @@ private object InitializationVector {
8485
}
8586

8687
def extractInitialisationVector(
87-
s: String,
88-
ivSeparator: String
89-
): Try[(InitializationVector, String)] =
88+
s: String,
89+
ivSeparator: String
90+
): Try[(InitializationVector, String)] =
9091
s.indexOf(ivSeparator) match {
9192
case -1 =>
9293
Failure(
Original file line numberDiff line numberDiff line change
@@ -1,55 +1,64 @@
11
package io.lenses.connect.secrets.providers
22

3+
import java.nio.file.Paths
4+
import java.util
5+
6+
import io.lenses.connect.secrets.config.Aes256ProviderConfig
7+
import io.lenses.connect.secrets.connect.decodeKey
8+
import io.lenses.connect.secrets.connect.Encoding
9+
import io.lenses.connect.secrets.io.FileWriter
10+
import io.lenses.connect.secrets.io.FileWriterOnce
11+
import io.lenses.connect.secrets.utils.EncodingAndId
312
import org.apache.kafka.common.config.ConfigData
413
import org.apache.kafka.common.config.provider.ConfigProvider
5-
import org.apache.kafka.connect.errors.ConnectException
614
import org.apache.kafka.common.config.ConfigException
7-
import java.util
8-
import javax.crypto.SecretKey
9-
import javax.crypto.SecretKeyFactory
10-
import javax.crypto.spec.PBEKeySpec
11-
import javax.crypto.spec.SecretKeySpec
12-
import io.lenses.connect.secrets.config.Aes256ProviderConfig
13-
import io.lenses.connect.secrets.connect.{decodeKey, getFileName}
15+
import org.apache.kafka.connect.errors.ConnectException
16+
1417
import scala.collection.JavaConverters._
15-
import scala.util.Try
16-
import io.lenses.connect.secrets.connect.Encoding
17-
import java.nio.file.FileSystems
18-
import java.util.UUID.randomUUID
18+
1919

2020
class Aes256DecodingProvider extends ConfigProvider {
2121

2222
var decoder: Option[Aes256DecodingHelper] = None
2323
var writeDir: String = ""
24-
24+
25+
private var fileWriter:FileWriter = _
2526
override def configure(configs: util.Map[String, _]): Unit = {
2627
val aes256Cfg = Aes256ProviderConfig(configs)
2728
val aes256Key = aes256Cfg.aes256Key
2829
decoder = Option(aes256Key)
2930
.map(Aes256DecodingHelper.init)
3031
.map(_.fold(e => throw new ConfigException(e), identity))
3132
writeDir = aes256Cfg.writeDirectory
33+
fileWriter = new FileWriterOnce(Paths.get(writeDir, "secrets"))
3234
}
3335

3436
override def get(path: String): ConfigData = new ConfigData(Map.empty[String, String].asJava)
35-
36-
override def get(path: String, keys: util.Set[String]): ConfigData =
37+
38+
override def get(path: String, keys: util.Set[String]): ConfigData = {
39+
val encodingAndId = EncodingAndId.from(path)
3740
decoder match {
3841
case Some(d) =>
3942
def decrypt(key: String): String = {
40-
val decrypted = d.decrypt(key).fold(e => throw new ConnectException(e.getMessage(), e), identity)
41-
val keyPrefixedWithEncoding = if (path.nonEmpty) s"${path.toLowerCase}_$key" else key
43+
val decrypted = d.decrypt(key).fold(e => throw new ConnectException("Failed to decrypt the secret.", e), identity)
4244
decodeKey(
43-
key = keyPrefixedWithEncoding,
45+
key = key,
4446
value = decrypted,
45-
fileName = getFileName(writeDir, "secrets", randomUUID().toString, FileSystems.getDefault.getSeparator)
47+
encoding = encodingAndId.encoding,
48+
writeFileFn = { content =>
49+
encodingAndId.id match {
50+
case Some(value) => fileWriter.write(value, content, key).toString
51+
case None => throw new ConnectException(s"Invalid argument received for key:$key. Expecting a file identifier.")
52+
}
53+
}
4654
)
4755
}
48-
56+
4957
new ConfigData(keys.asScala.map(k => k -> decrypt(k)).toMap.asJava)
5058
case None =>
5159
throw new ConnectException("decoder is not configured.")
5260
}
61+
}
5362

5463
override def close(): Unit = {}
5564
}

0 commit comments

Comments
 (0)