Skip to content

Commit

Permalink
Improve list and add commands
Browse files Browse the repository at this point in the history
  • Loading branch information
sergiimk committed Mar 9, 2020
1 parent 09534b1 commit 0db84a1
Show file tree
Hide file tree
Showing 10 changed files with 69 additions and 28 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.10.0] - 2020-03-08
### Added
- `list` command now shows data size, number of records, and last modified time
- `add` command now accounts for dataset dependency order

## [0.9.0] - 2020-03-08
### Changed
- Using new metadata chain prototype!
Expand Down
2 changes: 1 addition & 1 deletion core.transform.streaming
7 changes: 3 additions & 4 deletions src/main/scala/dev/kamu/cli/Kamu.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,16 @@ import java.io.PrintStream
import dev.kamu.cli.commands._
import dev.kamu.cli.external._
import dev.kamu.cli.output._
import dev.kamu.core.utils.AutoClock
import dev.kamu.core.utils.{AutoClock, Clock}
import dev.kamu.core.utils.fs._
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.log4j.Level

class Kamu(
config: KamuConfig,
fileSystem: FileSystem
fileSystem: FileSystem,
systemClock: Clock
) {
val systemClock = new AutoClock()

val workspaceLayout = WorkspaceLayout(
kamuRootDir = config.kamuRoot,
metadataDir = config.kamuRoot.resolve("datasets"),
Expand Down
5 changes: 4 additions & 1 deletion src/main/scala/dev/kamu/cli/KamuApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package dev.kamu.cli

import dev.kamu.core.utils.AutoClock
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.log4j.{Level, LogManager}
Expand All @@ -18,6 +19,8 @@ class UsageException(message: String = "", cause: Throwable = None.orNull)
object KamuApp extends App {
val logger = LogManager.getLogger(getClass.getName)

val systemClock = new AutoClock()

val fileSystem = FileSystem.get(new Configuration())
FileSystem.enableSymlinks()
fileSystem.setWriteChecksum(false)
Expand All @@ -36,7 +39,7 @@ object KamuApp extends App {
.getLogger(getClass.getPackage.getName)
.setLevel(if (cliArgs.debug()) Level.ALL else cliArgs.logLevel())

new Kamu(config, fileSystem)
new Kamu(config, fileSystem, systemClock)
.run(cliArgs)
} catch {
case e: UsageException =>
Expand Down
25 changes: 15 additions & 10 deletions src/main/scala/dev/kamu/cli/commands/ListCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,24 @@ class ListCommand(
val rs = new SimpleResultSet()
rs.addColumn("ID")
rs.addColumn("Kind")
rs.addColumn("Records")
rs.addColumn("Size")
rs.addColumn("LastModified")

metadataRepository
.getAllDatasets()
.map(
id => (id, metadataRepository.getDatasetKind(id).toString)
)
.sortBy {
case (id, _) => id.toString
}
.foreach {
case (id, kind) =>
rs.addRow(id, kind)
}
.sortBy(_.toString)
.foreach(id => {
val kind = metadataRepository.getDatasetKind(id).toString
val summary = metadataRepository.getDatasetSummary(id)
rs.addRow(
id,
kind,
summary.numRecords,
summary.dataSize,
summary.lastModified
)
})

outputFormatter.format(rs)
}
Expand Down
36 changes: 30 additions & 6 deletions src/test/scala/dev/kamu/cli/KamuListSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,13 @@ class KamuListSpec extends FlatSpec with Matchers with KamuTestBase {
withEmptyWorkspace { kamu =>
val rs = kamu.runEx("list").resultSet.get

rs.columns shouldEqual Vector("ID", "Kind")
rs.columns shouldEqual Vector(
"ID",
"Kind",
"Records",
"Size",
"LastModified"
)
rs.rows shouldEqual Vector.empty
}
}
Expand All @@ -29,11 +35,29 @@ class KamuListSpec extends FlatSpec with Matchers with KamuTestBase {

val rs = kamu.runEx("list").resultSet.get

rs.columns shouldEqual Vector("ID", "Kind")
rs.rows should contain theSameElementsAs (Seq(
Array(rootDS.id, "Root"),
Array(derivDS.id, "Derivative")
))
rs.columns shouldEqual Vector(
"ID",
"Kind",
"Records",
"Size",
"LastModified"
)
rs.rows should contain theSameElementsAs Seq(
Array(
rootDS.id,
"Root",
0,
0,
kamu.systemClock.instant()
),
Array(
derivDS.id,
"Derivative",
0,
0,
kamu.systemClock.instant()
)
)
}
}
}
8 changes: 5 additions & 3 deletions src/test/scala/dev/kamu/cli/KamuTestAdapter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import java.nio.charset.StandardCharsets

import dev.kamu.cli.output._
import dev.kamu.core.utils.fs._
import dev.kamu.core.manifests.{DatasetSnapshot, DatasetID}
import dev.kamu.core.manifests.{DatasetID, DatasetSnapshot}
import dev.kamu.core.utils.{Clock, ManualClock}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{DataFrame, SparkSession}

Expand All @@ -39,8 +40,9 @@ case class CommandResult(
class KamuTestAdapter(
val config: KamuConfig, // config should be public for tests to access workspaceRoot
fileSystem: FileSystem,
spark: SparkSession
) extends Kamu(config, fileSystem) {
spark: SparkSession,
val systemClock: ManualClock
) extends Kamu(config, fileSystem, systemClock) {

val _captureFormatter = new CaptureOutputFormatter
val _captureOutput = new ByteArrayOutputStream()
Expand Down
5 changes: 4 additions & 1 deletion src/test/scala/dev/kamu/cli/KamuTestBase.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package dev.kamu.cli

import dev.kamu.core.utils.ManualClock
import dev.kamu.core.utils.fs._
import dev.kamu.core.utils.test.KamuDataFrameSuite
import org.apache.hadoop.conf.Configuration
Expand All @@ -21,7 +22,9 @@ trait KamuTestBase extends KamuDataFrameSuite { self: Suite =>
def withEmptyDir[T](func: KamuTestAdapter => T): T = {
Temp.withRandomTempDir(fileSystem, "kamu-test-") { tempDir =>
val config = KamuConfig(workspaceRoot = tempDir)
val kamu = new KamuTestAdapter(config, fileSystem, spark)
val clock = new ManualClock()
clock.advance()
val kamu = new KamuTestAdapter(config, fileSystem, spark, clock)
func(kamu)
}
}
Expand Down

0 comments on commit 0db84a1

Please sign in to comment.