1 change: 1 addition & 0 deletions .bazelignore
@@ -0,0 +1 @@
.git
@@ -18,13 +18,11 @@ package ai.chronon.aggregator.windowing

import ai.chronon.aggregator.row.RowAggregator
import ai.chronon.aggregator.windowing.HopsAggregator._
import ai.chronon.api.Aggregation
import ai.chronon.api.DataType
import ai.chronon.api.{Aggregation, DataType, Row, TsUtils}
import ai.chronon.api.Extensions.AggregationOps
import ai.chronon.api.Extensions.AggregationsOps
import ai.chronon.api.Extensions.WindowOps
import ai.chronon.api.Extensions.WindowUtils
import ai.chronon.api.Row
import org.slf4j.Logger
import org.slf4j.LoggerFactory

@@ -17,13 +17,10 @@
package ai.chronon.aggregator.windowing

import ai.chronon.aggregator.row.RowAggregator
import ai.chronon.api.Aggregation
import ai.chronon.api.AggregationPart
import ai.chronon.api.DataType
import ai.chronon.api.{Aggregation, AggregationPart, DataType, Row, TsUtils}
import ai.chronon.api.Extensions.UnpackedAggregations
import ai.chronon.api.Extensions.WindowMapping
import ai.chronon.api.Extensions.WindowOps
import ai.chronon.api.Row

import java.util
import scala.collection.Seq
@@ -17,9 +17,8 @@
package ai.chronon.aggregator.test

import ai.chronon.aggregator.row.RowAggregator
import ai.chronon.aggregator.windowing.TsUtils
import ai.chronon.api.Extensions.WindowOps
import ai.chronon.api.Row
import ai.chronon.api.{Row, TsUtils}
import ai.chronon.api.Window

class NaiveAggregator(aggregator: RowAggregator,
@@ -19,7 +19,6 @@ package ai.chronon.aggregator.test
import ai.chronon.aggregator.test.SawtoothAggregatorTest.sawtoothAggregate
import ai.chronon.aggregator.windowing.FiveMinuteResolution
import ai.chronon.aggregator.windowing.SawtoothOnlineAggregator
import ai.chronon.aggregator.windowing.TsUtils
import ai.chronon.api.Extensions.WindowOps
import ai.chronon.api.Extensions.WindowUtils
import ai.chronon.api._
@@ -14,10 +14,7 @@
* limitations under the License.
*/

package ai.chronon.online

import ai.chronon.aggregator.windowing.TsUtils
import ai.chronon.api.PartitionSpec
package ai.chronon.api

sealed trait DataRange {
def toTimePoints: Array[Long]
10 changes: 10 additions & 0 deletions api/src/main/scala/ai/chronon/api/Extensions.scala
@@ -21,6 +21,8 @@ import ai.chronon.api.DataModel._
import ai.chronon.api.Operation._
import ai.chronon.api.QueryUtils.buildSelects
import ai.chronon.api.ScalaJavaConversions._
import ai.chronon.api.DateRange

import com.fasterxml.jackson.core.`type`.TypeReference
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.spark.sql.Column
@@ -1240,4 +1242,12 @@ object Extensions {
_keyNameForKvStore(model.metaData, ModelKeyword)
}
}

implicit class DateRangeOps(dateRange: DateRange) {
def toPartitionRange(implicit partitionSpec: PartitionSpec): PartitionRange = {
val start = dateRange.startDate
val end = dateRange.endDate
new PartitionRange(start, end)
}
}
}
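
For reference, a minimal sketch of the new DateRangeOps conversion (hypothetical caller, not part of this diff; assumes a PartitionSpec is already in implicit scope, e.g. from TableUtils):

    import ai.chronon.api.{DateRange, PartitionRange, PartitionSpec}
    import ai.chronon.api.Extensions.DateRangeOps

    // hypothetical helper: with a PartitionSpec in implicit scope, the thrift-level
    // DateRange converts directly into the Scala-side PartitionRange
    def toRange(dateRange: DateRange)(implicit spec: PartitionSpec): PartitionRange =
      dateRange.toPartitionRange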
@@ -14,12 +14,11 @@
* limitations under the License.
*/

package ai.chronon.aggregator.windowing
package ai.chronon.api

import org.apache.commons.lang3.time.FastDateFormat

import java.util.Date
import java.util.TimeZone
import java.util.{Date, TimeZone}

object TsUtils {
val formatter: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss", TimeZone.getTimeZone("UTC"))
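
For downstream code the move is only an import-path change; a small sketch using the UTC formatter defined above (hypothetical values):

    import ai.chronon.api.TsUtils // previously ai.chronon.aggregator.windowing.TsUtils

    import java.util.Date

    // formatter is the UTC "yyyy-MM-dd HH:mm:ss" FastDateFormat shown above
    val rendered: String = TsUtils.formatter.format(new Date(0L)) // "1970-01-01 00:00:00"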
7 changes: 6 additions & 1 deletion api/thrift/common.thrift
@@ -18,4 +18,9 @@ enum ConfigType {
GROUP_BY = 2
JOIN = 3
MODEL = 4
}
}

struct DateRange {
1: string startDate
2: string endDate
}
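
Moving the struct into common.thrift lets hub.thrift and orchestration.thrift share it (referenced below as common.DateRange). A hedged Scala-side sketch, assuming the generated class lands in ai.chronon.api as the new import in Extensions.scala above suggests:

    import ai.chronon.api.DateRange

    // thrift-generated beans expose fluent setters (hypothetical values)
    val range: DateRange = new DateRange()
      .setStartDate("2024-01-01")
      .setEndDate("2024-01-31")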
10 changes: 3 additions & 7 deletions api/thrift/hub.thrift
@@ -32,7 +32,7 @@ struct JobTrackerRequest {
1: optional string name
2: optional string type
3: optional string branch
10: optional DateRange dateRange // We may not need to use this, but in case it helps with page load times
10: optional common.DateRange dateRange // We may not need to use this, but in case it helps with page load times
}

struct JobTrackerResponse {
@@ -60,7 +60,7 @@ struct TaskInfo {
2: optional string logPath
3: optional string trackerUrl
4: optional TaskArgs taskArgs
5: optional DateRange dateRange // specific to batch nodes
5: optional common.DateRange dateRange // specific to batch nodes

// time information - useful for gantt / waterfall view
10: optional i64 submittedTs
@@ -75,10 +75,6 @@ struct TaskInfo {
31: optional TaskResources utilizedResources
}

struct DateRange {
1: string startDate
2: string endDate
}

struct TaskArgs {
1: optional list<string> argsList
@@ -113,7 +109,7 @@ struct Submission {
1: optional orchestration.NodeKey node
10: optional i64 submittedTs
20: optional i64 finishedTs
21: optional DateRange dateRange
21: optional common.DateRange dateRange
}

enum ConfType{
39 changes: 39 additions & 0 deletions api/thrift/orchestration.thrift
@@ -270,9 +270,48 @@ union Dependency {
2: optional TableDependency tableDependency
}

struct SourceWithFilter {
1: optional api.Source source
2: optional map<string,list<string>> excludeKeys
}

struct SourceJobArgs {
1: optional SourceWithFilter source
100: optional common.DateRange range
101: optional string outputTable
}

struct BootstrapJobArgs {
1: optional api.Join join
2: optional common.DateRange range
100: optional string leftSourceTable
101: optional string outputTable
}

struct MergeJobArgs {
1: optional api.Join join
2: optional common.DateRange range
100: optional string leftInputTable
101: optional map<api.JoinPart, string> joinPartsToTables
102: optional string outputTable
}

struct JoinDerivationJobArgs {
1: optional string trueLeftTable
2: optional string baseTable
3: optional list<api.Derivation> derivations
100: optional common.DateRange range
101: optional string outputTable
}

struct JoinPartJobArgs {
1: optional string leftTable
2: optional string leftDataModel
3: optional api.JoinPart joinPart
4: optional string outputTable
100: optional common.DateRange range
101: optional map<string, list<string>> skewKeys
}
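
A hedged sketch of how a planner might assemble one of these job-args structs (hypothetical helper, not part of this diff; assumes the generated classes live in ai.chronon.orchestration, matching the imports used elsewhere in this diff):

    import ai.chronon.api.{DateRange, Source}
    import ai.chronon.orchestration.{SourceJobArgs, SourceWithFilter}

    // hypothetical helper: wraps the join's left source plus the date range to process
    def sourceJobArgs(leftSource: Source, range: DateRange, outputTable: String): SourceJobArgs =
      new SourceJobArgs()
        .setSource(new SourceWithFilter().setSource(leftSource))
        .setRange(range)
        .setOutputTable(outputTable)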

/**
* -- Phase 0 plan -- (same as chronon oss)
@@ -1,7 +1,6 @@
package ai.chronon.online

import ai.chronon.aggregator.windowing.TsUtils
import ai.chronon.api.Derivation
import ai.chronon.api.{Derivation, TsUtils}
import ai.chronon.api.Extensions.DerivationOps
import ai.chronon.api.LongType
import ai.chronon.api.StringType
@@ -4,8 +4,8 @@ import ai.chronon.api.Extensions.WindowOps
import ai.chronon.api.PartitionSpec
import ai.chronon.api.TimeUnit
import ai.chronon.api.Window
import ai.chronon.online.PartitionRange
import ai.chronon.online.PartitionRange.collapseToRange
import ai.chronon.api.PartitionRange
import ai.chronon.api.PartitionRange.collapseToRange
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

@@ -4,7 +4,7 @@ import ai.chronon.api
import ai.chronon.api.Extensions.SourceOps
import ai.chronon.api.PartitionSpec
import ai.chronon.api.Window
import ai.chronon.online.PartitionRange
import ai.chronon.api.PartitionRange
import ai.chronon.orchestration.Table
import ai.chronon.orchestration.TableDependency

6 changes: 3 additions & 3 deletions scripts/distribution/build_and_upload_artifacts.sh
@@ -94,8 +94,8 @@ cd $CHRONON_ROOT_DIR

echo "Building wheel"
#Check python version >= 3.9
MAJOR_PYTHON_VERSION=$(python --version | cut -d " " -f2 | cut -d "." -f 1)
MINOR_PYTHON_VERSION=$(python --version | cut -d " " -f2 | cut -d "." -f 2)
MAJOR_PYTHON_VERSION=$(python3 --version | cut -d " " -f2 | cut -d "." -f 1)
MINOR_PYTHON_VERSION=$(python3 --version | cut -d " " -f2 | cut -d "." -f 2)

EXPECTED_MINIMUM_MAJOR_PYTHON_VERSION=3
EXPECTED_MINIMUM_MINOR_PYTHON_VERSION=9
@@ -260,4 +260,4 @@ if [ "$BUILD_GCP" = true ]; then
fi

# Cleanup wheel stuff
rm ./*.whl
rm ./*.whl
8 changes: 4 additions & 4 deletions scripts/distribution/run_zipline_quickstart.sh
@@ -23,8 +23,8 @@ git clone git@github.com:zipline-ai/cananry-confs.git
cd cananry-confs

# Use the branch with Zipline specific team.json
git fetch origin davidhan/canary
git checkout davidhan/canary
git fetch origin davidhan/selects
git checkout davidhan/selects
Comment on lines +26 to +27 (Contributor): we should merge these branches


# Create a virtualenv to fresh install zipline-ai
python3 -m venv tmp_chronon
@@ -35,7 +35,7 @@ gcloud storage cp gs://zipline-artifacts-canary/jars/$WHEEL_FILE .

# Install the wheel (force)
pip uninstall zipline-ai
pip install --force-reinstall $WHEEL_FILE
pip install --force-reinstall $WHEEL_FILE --break-system-packages

export PYTHONPATH="${PYTHONPATH}:$(pwd)"

@@ -88,7 +88,7 @@ check_dataproc_job_state $METADATA_UPLOAD_JOB_ID
# Need to wait for upload-to-kv to finish
echo -e "${GREEN}<<<<<.....................................FETCH.....................................>>>>>\033[0m"
touch tmp_fetch.out
zipline run --mode fetch --conf-type group_bys --name quickstart/purchases.v1_test -k '{"user_id":"5"}' 2>&1 | tee tmp_fetch.out | grep -q purchase_price_average_14d
zipline run --mode fetch --conf-type group_bys --name quickstart.purchases.v1_test -k '{"user_id":"5"}' 2>&1 | tee tmp_fetch.out | grep -q purchase_price_average_14d
cat tmp_fetch.out | grep purchase_price_average_14d
# check if exit code of previous is 0
if [ $? -ne 0 ]; then
4 changes: 2 additions & 2 deletions spark/src/main/scala/ai/chronon/spark/Analyzer.scala
@@ -19,10 +19,10 @@ package ai.chronon.spark
import ai.chronon.api
import ai.chronon.api.ColorPrinter.ColorString
import ai.chronon.api.DataModel.{DataModel, Entities, Events}
import ai.chronon.api.{Accuracy, AggregationPart, Constants, DataType}
import ai.chronon.api.{Accuracy, AggregationPart, Constants, DataType, PartitionRange}
import ai.chronon.api.Extensions._
import ai.chronon.api.ScalaJavaConversions._
import ai.chronon.online.{PartitionRange, SparkConversions}
import ai.chronon.online.{SparkConversions}
import ai.chronon.spark.Driver.parseConf
import ai.chronon.spark.Extensions.QuerySparkOps
import org.apache.datasketches.common.ArrayOfStringsSerDe
21 changes: 11 additions & 10 deletions spark/src/main/scala/ai/chronon/spark/BootstrapInfo.scala
@@ -17,14 +17,10 @@
package ai.chronon.spark

import ai.chronon.api
import ai.chronon.api.Constants
import ai.chronon.api.{Constants, ExternalPart, JoinPart, PartitionRange, PartitionSpec, StructField}
import ai.chronon.api.Extensions._
import ai.chronon.api.ExternalPart
import ai.chronon.api.JoinPart
import ai.chronon.api.PartitionSpec
import ai.chronon.api.ScalaJavaConversions._
import ai.chronon.api.StructField
import ai.chronon.online.PartitionRange
import ai.chronon.api.PartitionRange
import ai.chronon.online.SparkConversions
import ai.chronon.spark.Extensions._
import org.apache.spark.sql.Row
@@ -84,7 +80,8 @@ object BootstrapInfo {
def from(joinConf: api.Join,
range: PartitionRange,
tableUtils: TableUtils,
leftSchema: Option[StructType]): BootstrapInfo = {
leftSchema: Option[StructType],
externalPartsAlreadyIncluded: Boolean = false): BootstrapInfo = {

implicit val tu = tableUtils
implicit val partitionSpec: PartitionSpec = tableUtils.partitionSpec
@@ -123,9 +120,13 @@

// Enrich each external part with the expected output schema
logger.info(s"\nCreating BootstrapInfo for ExternalParts for Join ${joinConf.metaData.name}")
val externalParts: Seq[ExternalPartMetadata] = Option(joinConf.onlineExternalParts.toScala)
.getOrElse(Seq.empty)
.map(part => ExternalPartMetadata(part, part.keySchemaFull, part.valueSchemaFull))
val externalParts: Seq[ExternalPartMetadata] = if (externalPartsAlreadyIncluded) {
Seq.empty
} else {
Option(joinConf.onlineExternalParts.toScala)
.getOrElse(Seq.empty)
.map(part => ExternalPartMetadata(part, part.keySchemaFull, part.valueSchemaFull))
}

val leftFields = leftSchema
.map(schema => SparkConversions.toChrononSchema(schema))
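
A hedged sketch of a call site for the new externalPartsAlreadyIncluded flag (hypothetical; leftDf and the surrounding job wiring are assumptions, not part of this diff):

    // when the left table already carries the external-part columns,
    // skip re-deriving their schemas from onlineExternalParts
    val bootstrapInfo = BootstrapInfo.from(
      joinConf,
      range,
      tableUtils,
      leftSchema = Some(leftDf.schema),
      externalPartsAlreadyIncluded = true
    )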