Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class JoinPartFetcher(fetchContext: FetchContext, metadataStore: MetadataStore)
val rightKeys = part.leftToRight.map { case (leftKey, rightKey) => rightKey -> request.keys(leftKey) }
Left(
PrefixedRequest(
part.fullPrefix,
part.columnPrefix,
Request(part.groupBy.getMetaData.getName, rightKeys, request.atMillis, Some(joinContextInner))))
}

Expand Down Expand Up @@ -158,7 +158,7 @@ class JoinPartFetcher(fetchContext: FetchContext, metadataStore: MetadataStore)
response
.map { valueMap =>
if (valueMap != null) {
valueMap.map { case (aggName, aggValue) => prefix + "_" + aggName -> aggValue }
valueMap.map { case (aggName, aggValue) => prefix + aggName -> aggValue }
} else {
Map.empty[String, AnyRef]
}
Expand All @@ -169,7 +169,7 @@ class JoinPartFetcher(fetchContext: FetchContext, metadataStore: MetadataStore)
if (fetchContext.debug || Math.random() < 0.001) {
println(s"Failed to fetch $groupByRequest with \n${ex.traceString}")
}
Map(prefix + "_exception" -> ex.traceString)
Map(prefix + "exception" -> ex.traceString)
}
.get
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ class FetcherBaseTest extends AnyFlatSpec with MockitoSugar with Matchers with M
))
)

val result = baseFetcher.parseGroupByResponse("prefix", request, response)
val result = baseFetcher.parseGroupByResponse("prefix_", request, response)
assertEquals(result, Map("prefix_key" -> "value"))
}

Expand All @@ -227,7 +227,7 @@ class FetcherBaseTest extends AnyFlatSpec with MockitoSugar with Matchers with M
))
)

val result = baseFetcher.parseGroupByResponse("prefix", request, response)
val result = baseFetcher.parseGroupByResponse("prefix_", request, response)
result shouldBe Map()
}

Expand All @@ -243,7 +243,7 @@ class FetcherBaseTest extends AnyFlatSpec with MockitoSugar with Matchers with M
))
)

val result = baseFetcher.parseGroupByResponse("prefix", request, response)
val result = baseFetcher.parseGroupByResponse("prefix_", request, response)
result.keySet shouldBe Set("prefix_exception")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class FetcherFailureTest extends AnyFlatSpec {
val request = Request(joinConf.metaData.name, keyMap)
val (responses, _) = FetcherTestUtil.joinResponses(spark, Array(request), mockApi)
val responseMap = responses.head.values.get
val exceptionKeys = joinConf.joinPartOps.map(jp => jp.fullPrefix + "_exception")
val exceptionKeys = joinConf.joinPartOps.map(jp => jp.columnPrefix + "exception")
exceptionKeys.foreach(k => assertTrue(responseMap.contains(k)))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,11 @@ class FetcherTest extends AnyFlatSpec {
tableUtils.sql(
s"SELECT * FROM $joinTable WHERE ts >= unix_timestamp('$endDs', '${tableUtils.partitionSpec.format}')")
}
val endDsQueries = endDsEvents.drop(endDsEvents.schema.fieldNames.filter(_.contains("unit_test")): _*)
// Keep only left-side columns (keys, ts, ds) and drop all feature columns
val keys = joinConf.leftKeyCols
val leftSideColumns = keys ++ Array(Constants.TimeColumn, tableUtils.partitionColumn)
val columnsToKeep = endDsEvents.schema.fieldNames.filter(leftSideColumns.contains)
val endDsQueries = endDsEvents.select(columnsToKeep.map(col): _*)
val keyIndices = keys.map(endDsQueries.schema.fieldIndex)
val tsIndex = endDsQueries.schema.fieldIndex(Constants.TimeColumn)
val metadataStore = new fetcher.MetadataStore(FetchContext(inMemoryKvStore))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,29 +440,35 @@ object FetcherTestUtil {
queriesDf.show()
queriesDf.save(queriesTable)

val joinConf = Builders.Join(
left = Builders.Source.events(Builders.Query(startPartition = today), table = queriesTable),
joinParts = Seq(
Builders.JoinPart(groupBy = vendorRatingsGroupBy, keyMapping = Map("vendor_id" -> "vendor")),
Builders.JoinPart(groupBy = userPaymentsGroupBy, keyMapping = Map("user_id" -> "user")),
Builders.JoinPart(groupBy = userBalanceGroupBy, keyMapping = Map("user_id" -> "user")),
Builders.JoinPart(groupBy = reviewGroupBy),
Builders.JoinPart(groupBy = creditGroupBy, prefix = "b"),
Builders.JoinPart(groupBy = creditGroupBy, prefix = "a"),
Builders.JoinPart(groupBy = creditDerivationGroupBy, prefix = "c")
),
metaData = Builders.MetaData(name = "test.payments_join",
namespace = namespace,
team = "chronon",
consistencySamplePercent = 30),
derivations = Seq(
Builders.Derivation("*", "*"),
Builders.Derivation("hist_3d", "unit_test_vendor_ratings_txn_types_histogram_3d"),
Builders.Derivation("payment_variance", "unit_test_user_payments_payment_variance/2"),
Builders.Derivation("derived_ds", "from_unixtime(ts/1000, 'yyyy-MM-dd')"),
Builders.Derivation("direct_ds", "ds")
val joinConf = Builders
.Join(
left = Builders.Source.events(Builders.Query(startPartition = today), table = queriesTable),
joinParts = Seq(
Builders
.JoinPart(groupBy = vendorRatingsGroupBy, keyMapping = Map("vendor_id" -> "vendor"))
.setUseLongNames(false),
Builders
.JoinPart(groupBy = userPaymentsGroupBy, keyMapping = Map("user_id" -> "user"))
.setUseLongNames(false),
Builders.JoinPart(groupBy = userBalanceGroupBy, keyMapping = Map("user_id" -> "user")).setUseLongNames(false),
Builders.JoinPart(groupBy = reviewGroupBy).setUseLongNames(false),
Builders.JoinPart(groupBy = creditGroupBy, prefix = "b").setUseLongNames(false),
Builders.JoinPart(groupBy = creditGroupBy, prefix = "a").setUseLongNames(false),
Builders.JoinPart(groupBy = creditDerivationGroupBy, prefix = "c").setUseLongNames(false)
),
metaData = Builders.MetaData(name = "test.payments_join",
namespace = namespace,
team = "chronon",
consistencySamplePercent = 30),
derivations = Seq(
Builders.Derivation("*", "*"),
Builders.Derivation("hist_3d", "vendor_txn_types_histogram_3d"),
Builders.Derivation("payment_variance", "user_payment_variance/2"),
Builders.Derivation("derived_ds", "from_unixtime(ts/1000, 'yyyy-MM-dd')"),
Builders.Derivation("direct_ds", "ds")
)
)
)
.setUseLongNames(false)
joinConf
}

Expand Down