Commit f587df3

Merge branch 'main' into nikhil/compiler_fixes
2 parents 6b0a16c + d1524ca

15 files changed: +783 / -126 lines

.github/workflows/push_to_canary.yaml (16 additions, 14 deletions)

@@ -82,6 +82,7 @@ jobs:
            exit 1
          fi
          echo "wheel_file=$EXPECTED_ZIPLINE_WHEEL" >> $GITHUB_OUTPUT
+         echo "version=${{ env.VERSION }}" >> $GITHUB_OUTPUT

      - name: Upload Wheel
        uses: actions/upload-artifact@v4
@@ -132,6 +133,7 @@ jobs:
          path: bazel-bin/service/service_assembly_deploy.jar
    outputs:
      wheel_file: ${{ steps.build-wheel.outputs.wheel_file }}
+     version: ${{ steps.build-wheel.outputs.version }}


  push_to_aws:
@@ -173,10 +175,10 @@ jobs:
        shell: bash
        run: |
          set -eo pipefail
-         aws s3 cp ${{ needs.build_artifacts.outputs.wheel_file }} s3://zipline-artifacts-canary/release/candidate/wheels/ --metadata="updated_date=$(date),commit=$(git rev-parse HEAD),branch=$(git rev-parse --abbrev-ref HEAD)"
-         aws s3 cp flink_assembly_deploy.jar s3://zipline-artifacts-canary/release/candidate/jars/flink_assembly_deploy.jar --metadata="updated_date=$(date),commit=$(git rev-parse HEAD),branch=$(git rev-parse --abbrev-ref HEAD)"
-         aws s3 cp cloud_aws_lib_deploy.jar s3://zipline-artifacts-canary/release/candidate/jars/cloud_aws_lib_deploy.jar --metadata="updated_date=$(date),commit=$(git rev-parse HEAD),branch=$(git rev-parse --abbrev-ref HEAD)"
-         aws s3 cp service_assembly_deploy.jar s3://zipline-artifacts-canary/release/candidate/jars/service_assembly_deploy.jar --metadata="updated_date=$(date),commit=$(git rev-parse HEAD),branch=$(git rev-parse --abbrev-ref HEAD)"
+         aws s3 cp ${{ needs.build_artifacts.outputs.wheel_file }} s3://zipline-artifacts-canary/release/${{ needs.build_artifacts.outputs.version }}/wheels/ --metadata="updated_date=$(date),commit=$(git rev-parse HEAD),branch=$(git rev-parse --abbrev-ref HEAD)"
+         aws s3 cp flink_assembly_deploy.jar s3://zipline-artifacts-canary/release/${{ needs.build_artifacts.outputs.version }}/jars/flink_assembly_deploy.jar --metadata="updated_date=$(date),commit=$(git rev-parse HEAD),branch=$(git rev-parse --abbrev-ref HEAD)"
+         aws s3 cp cloud_aws_lib_deploy.jar s3://zipline-artifacts-canary/release/${{ needs.build_artifacts.outputs.version }}/jars/cloud_aws_lib_deploy.jar --metadata="updated_date=$(date),commit=$(git rev-parse HEAD),branch=$(git rev-parse --abbrev-ref HEAD)"
+         aws s3 cp service_assembly_deploy.jar s3://zipline-artifacts-canary/release/${{ needs.build_artifacts.outputs.version }}/jars/service_assembly_deploy.jar --metadata="updated_date=$(date),commit=$(git rev-parse HEAD),branch=$(git rev-parse --abbrev-ref HEAD)"


  push_to_gcp:
@@ -228,14 +230,14 @@ jobs:
          IMAGE_TAG: main
        run: |
          set -eo pipefail
-         gcloud storage cp ${{ needs.build_artifacts.outputs.wheel_file }} gs://zipline-artifacts-canary/release/candidate/wheels/
-         gcloud storage objects update gs://zipline-artifacts-canary/release/candidate/wheels/${{ needs.build_artifacts.outputs.wheel_file }} --custom-metadata="updated_date=$(date),commit=$(git rev-parse HEAD),branch=$(git rev-parse --abbrev-ref HEAD)"
-         gcloud storage cp flink_assembly_deploy.jar gs://zipline-artifacts-canary/release/candidate/jars/flink_assembly_deploy.jar
-         gcloud storage objects update gs://zipline-artifacts-canary/release/candidate/jars/flink_assembly_deploy.jar --custom-metadata="updated_date=$(date),commit=$(git rev-parse HEAD),branch=$(git rev-parse --abbrev-ref HEAD)"
-         gcloud storage cp cloud_gcp_lib_deploy.jar gs://zipline-artifacts-canary/release/candidate/jars/cloud_gcp_lib_deploy.jar
-         gcloud storage objects update gs://zipline-artifacts-canary/release/candidate/jars/cloud_gcp_lib_deploy.jar --custom-metadata="updated_date=$(date),commit=$(git rev-parse HEAD),branch=$(git rev-parse --abbrev-ref HEAD)"
-         gcloud storage cp service_assembly_deploy.jar gs://zipline-artifacts-canary/release/candidate/jars/service_assembly_deploy.jar
-         gcloud storage objects update gs://zipline-artifacts-canary/release/candidate/jars/service_assembly_deploy.jar --custom-metadata="updated_date=$(date),commit=$(git rev-parse HEAD),branch=$(git rev-parse --abbrev-ref HEAD)"
+         gcloud storage cp ${{ needs.build_artifacts.outputs.wheel_file }} gs://zipline-artifacts-canary/release/${{ needs.build_artifacts.outputs.version }}/wheels/
+         gcloud storage objects update gs://zipline-artifacts-canary/release/${{ needs.build_artifacts.outputs.version }}/wheels/${{ needs.build_artifacts.outputs.wheel_file }} --custom-metadata="updated_date=$(date),commit=$(git rev-parse HEAD),branch=$(git rev-parse --abbrev-ref HEAD)"
+         gcloud storage cp flink_assembly_deploy.jar gs://zipline-artifacts-canary/release/${{ needs.build_artifacts.outputs.version }}/jars/flink_assembly_deploy.jar
+         gcloud storage objects update gs://zipline-artifacts-canary/release/${{ needs.build_artifacts.outputs.version }}/jars/flink_assembly_deploy.jar --custom-metadata="updated_date=$(date),commit=$(git rev-parse HEAD),branch=$(git rev-parse --abbrev-ref HEAD)"
+         gcloud storage cp cloud_gcp_lib_deploy.jar gs://zipline-artifacts-canary/release/${{ needs.build_artifacts.outputs.version }}/jars/cloud_gcp_lib_deploy.jar
+         gcloud storage objects update gs://zipline-artifacts-canary/release/${{ needs.build_artifacts.outputs.version }}/jars/cloud_gcp_lib_deploy.jar --custom-metadata="updated_date=$(date),commit=$(git rev-parse HEAD),branch=$(git rev-parse --abbrev-ref HEAD)"
+         gcloud storage cp service_assembly_deploy.jar gs://zipline-artifacts-canary/release/${{ needs.build_artifacts.outputs.version }}/jars/service_assembly_deploy.jar
+         gcloud storage objects update gs://zipline-artifacts-canary/release/${{ needs.build_artifacts.outputs.version }}/jars/service_assembly_deploy.jar --custom-metadata="updated_date=$(date),commit=$(git rev-parse HEAD),branch=$(git rev-parse --abbrev-ref HEAD)"


  run_aws_integration_tests:
@@ -262,7 +264,7 @@ jobs:
        shell: bash
        run: |
          set -xo pipefail
-         ./scripts/distribution/run_aws_quickstart.sh --canary
+         ./scripts/distribution/run_aws_quickstart.sh --canary --version ${{ needs.build_artifacts.outputs.version }}

      - name: On Fail Notify Slack
        id: notify_slack
@@ -311,7 +313,7 @@ jobs:
        id: gcp_integration_tests
        run: |
          set -xo pipefail
-         ./scripts/distribution/run_gcp_quickstart.sh --canary
+         ./scripts/distribution/run_gcp_quickstart.sh --canary --version ${{ needs.build_artifacts.outputs.version }}

      - name: On Fail Notify Slack
        id: notify_slack
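
The net effect of the workflow change above is that canary artifacts no longer land under a fixed release/candidate/ prefix; each run publishes under release/<version>/, where the version is exported by the build-wheel step and forwarded to downstream jobs via needs.build_artifacts.outputs.version. A minimal Python sketch of the resulting layout; the helper name and the example values are made up for illustration and are not part of the repo:

# Illustrative sketch only: canary_artifact_uri and the example values are hypothetical.
def canary_artifact_uri(scheme: str, version: str, kind: str, name: str) -> str:
    # Old layout: <scheme>://zipline-artifacts-canary/release/candidate/<kind>/<name>
    # New layout: <scheme>://zipline-artifacts-canary/release/<version>/<kind>/<name>
    return f"{scheme}://zipline-artifacts-canary/release/{version}/{kind}/{name}"

print(canary_artifact_uri("s3", "0.1.0", "jars", "service_assembly_deploy.jar"))
# s3://zipline-artifacts-canary/release/0.1.0/jars/service_assembly_deploy.jar
print(canary_artifact_uri("gs", "0.1.0", "jars", "flink_assembly_deploy.jar"))
# gs://zipline-artifacts-canary/release/0.1.0/jars/flink_assembly_deploy.jar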

api/python/ai/chronon/cli/compile/parse_configs.py (0 additions, 1 deletion)

@@ -11,7 +11,6 @@

 logger = get_logger()

-
 def from_folder(
     cls: type, input_dir: str, compile_context: CompileContext
 ) -> List[CompiledObj]:

api/python/ai/chronon/cli/compile/parse_teams.py (27 additions, 2 deletions)

@@ -11,8 +11,9 @@
     EnvironmentVariables,
     ExecutionInfo,
 )
-from ai.chronon.api.ttypes import Team
+
 from ai.chronon.cli.compile.display.console import console
+from ai.chronon.api.ttypes import Join, MetaData, Team
 from ai.chronon.cli.logger import get_logger

 logger = get_logger()
@@ -92,7 +93,31 @@ def update_metadata(obj: Any, team_dict: Dict[str, Team]):
         _DEFAULT_CONF_TEAM in team_dict
     ), f"'{_DEFAULT_CONF_TEAM}' team not found in teams.py, please add an entry 🙏."

-    metadata.outputNamespace = team_dict[team].outputNamespace
+    # Only set the outputNamespace if it hasn't been set already
+    if not metadata.outputNamespace:
+        metadata.outputNamespace = team_dict[team].outputNamespace
+
+    if isinstance(obj, Join):
+        join_namespace = obj.metaData.outputNamespace
+        # set the metadata for each join part and labelParts
+        def set_join_part_metadata(join_part_gb, output_namespace):
+            if join_part_gb is not None:
+                if join_part_gb.metaData:
+                    # Only set the outputNamespace if it hasn't been set already
+                    if not join_part_gb.metaData.outputNamespace:
+                        join_part_gb.metaData.outputNamespace = output_namespace
+                else:
+                    # If there's no metaData at all, create it and set outputNamespace
+                    join_part_gb.metaData = MetaData()
+                    join_part_gb.metaData.outputNamespace = output_namespace
+
+        if obj.joinParts:
+            for jp in (obj.joinParts or []):
+                set_join_part_metadata(jp.groupBy, join_namespace)
+
+        if obj.labelParts:
+            for lb in (obj.labelParts.labels or []):
+                set_join_part_metadata(lb, join_namespace)

     if metadata.executionInfo is None:
         metadata.executionInfo = ExecutionInfo()
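
In plain terms, update_metadata now resolves outputNamespace with a simple precedence: a value already set on the object is never overwritten, join parts and label parts fall back to the enclosing Join's namespace, and the team default only fills the top-level object when nothing was set explicitly. A standalone sketch of that precedence follows; it is illustrative only and not code from this module (the new tests in the next file exercise the real function):

from typing import Optional

def resolve_output_namespace(explicit: Optional[str], enclosing_join: Optional[str], team_default: str) -> str:
    # Explicit value always wins; otherwise join/label parts inherit from the enclosing Join;
    # the team default is the fallback for top-level objects.
    return explicit or enclosing_join or team_default

assert resolve_output_namespace("existing_jp_namespace", "join_namespace", "team_namespace") == "existing_jp_namespace"
assert resolve_output_namespace(None, "join_namespace", "team_namespace") == "join_namespace"
assert resolve_output_namespace(None, None, "team_namespace") == "team_namespace"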
New test file for the parse_teams module (140 additions, 0 deletions)

@@ -0,0 +1,140 @@
+"""
+Tests for the parse_teams module.
+"""
+
+# Copyright (C) 2023 The Chronon Authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ai.chronon.api.ttypes import GroupBy, Join, JoinPart, LabelParts, MetaData, Team
+from ai.chronon.cli.compile import parse_teams
+
+
+def test_update_metadata_with_existing_output_namespace():
+    """Test that update_metadata doesn't override existing outputNamespace."""
+    # Setup
+    team_name = "test_team"
+    team_dict = {
+        "default": Team(outputNamespace="default_namespace"),
+        team_name: Team(outputNamespace="team_namespace"),
+    }
+
+    # Test with existing outputNamespace
+    existing_namespace = "existing_namespace"
+    obj = GroupBy(metaData=MetaData(
+        team=team_name,
+        name="test.group_by.name",
+        outputNamespace=existing_namespace
+    ))
+
+    # Call the function
+    parse_teams.update_metadata(obj, team_dict)
+
+    # Verify outputNamespace wasn't changed
+    assert obj.metaData.outputNamespace == existing_namespace
+
+
+def test_update_metadata_without_existing_output_namespace():
+    """Test that update_metadata sets outputNamespace when not already set."""
+    # Setup
+    team_name = "test_team"
+    team_dict = {
+        "default": Team(outputNamespace="default_namespace"),
+        team_name: Team(outputNamespace="team_namespace"),
+    }
+
+    # Test without existing outputNamespace
+    obj = GroupBy(metaData=MetaData(
+        team=team_name,
+        name="test.group_by.name",
+    ))
+
+    # Call the function
+    parse_teams.update_metadata(obj, team_dict)
+
+    # Verify outputNamespace was set from team
+    assert obj.metaData.outputNamespace == "team_namespace"
+
+
+def test_update_metadata_preserves_join_part_namespace():
+    """Test that update_metadata preserves outputNamespace in join parts."""
+    # Setup
+    team_name = "test_team"
+    team_dict = {
+        "default": Team(outputNamespace="default_namespace"),
+        team_name: Team(outputNamespace="team_namespace"),
+    }
+
+    # Create a join with join parts that have existing outputNamespace
+    join_part_gb = GroupBy(metaData=MetaData(outputNamespace="existing_jp_namespace"))
+    join_part = JoinPart(groupBy=join_part_gb)
+
+    # Create a join with label parts that have existing outputNamespace
+    label_part_gb = GroupBy(metaData=MetaData(outputNamespace="existing_label_namespace"))
+    label_parts = LabelParts(labels=[label_part_gb])
+
+    # Create the join object
+    join = Join(
+        metaData=MetaData(
+            team=team_name,
+            name="test.join.name",
+            outputNamespace="join_namespace"
+        ),
+        joinParts=[join_part],
+        labelParts=label_parts
+    )
+
+    # Call the function
+    parse_teams.update_metadata(join, team_dict)
+
+    # Verify outputNamespace values were preserved
+    assert join.metaData.outputNamespace == "join_namespace"
+    assert join.joinParts[0].groupBy.metaData.outputNamespace == "existing_jp_namespace"
+    assert join.labelParts.labels[0].metaData.outputNamespace == "existing_label_namespace"
+
+
+def test_update_metadata_sets_missing_join_part_namespace():
+    """Test that update_metadata sets outputNamespace for join parts when not set."""
+    # Setup
+    team_name = "test_team"
+    team_dict = {
+        "default": Team(outputNamespace="default_namespace"),
+        team_name: Team(outputNamespace="team_namespace"),
+    }
+
+    # Create a join with join parts that don't have outputNamespace
+    join_part_gb = GroupBy(metaData=MetaData())
+    join_part = JoinPart(groupBy=join_part_gb)
+
+    # Create a join with label parts that don't have outputNamespace
+    label_part_gb = GroupBy(metaData=MetaData())
+    label_parts = LabelParts(labels=[label_part_gb])
+
+    # Create the join object
+    join = Join(
+        metaData=MetaData(
+            team=team_name,
+            name="test.join.name",
+            outputNamespace="join_namespace"
+        ),
+        joinParts=[join_part],
+        labelParts=label_parts
+    )
+
+    # Call the function
+    parse_teams.update_metadata(join, team_dict)
+
+    # Verify outputNamespace values were set correctly
+    assert join.metaData.outputNamespace == "join_namespace"
+    assert join.joinParts[0].groupBy.metaData.outputNamespace == "join_namespace"
+    assert join.labelParts.labels[0].metaData.outputNamespace == "join_namespace"

scripts/distribution/build_and_upload_artifacts.sh (8 additions, 6 deletions)

@@ -100,7 +100,7 @@ if [[ $EXPECTED_MINIMUM_MINOR_PYTHON_VERSION -gt $MINOR_PYTHON_VERSION ]] ; then
    exit 1
fi

-WHEEL_VERSION="0.1.0"
+WHEEL_VERSION="0.1.0+dev_$USER"

bash scripts/distribution/build_wheel.sh $WHEEL_VERSION

@@ -181,8 +181,8 @@ function upload_to_gcp() {
    set -euxo pipefail
    for element in "${customer_ids_to_upload[@]}"
    do
-       NEW_ELEMENT_JAR_PATH=gs://zipline-artifacts-$element/release/latest/jars/
-       NEW_ELEMENT_WHEEL_PATH=gs://zipline-artifacts-$element/release/latest/wheels/
+       NEW_ELEMENT_JAR_PATH=gs://zipline-artifacts-$element/release/$WHEEL_VERSION/jars/
+       NEW_ELEMENT_WHEEL_PATH=gs://zipline-artifacts-$element/release/$WHEEL_VERSION/wheels/
        gcloud storage cp "$CLOUD_GCP_JAR" "$NEW_ELEMENT_JAR_PATH" --custom-metadata="zipline_user=$USER,updated_date=$(date),commit=$(git rev-parse HEAD),branch=$(git rev-parse --abbrev-ref HEAD)"
        gcloud storage cp "$CLOUD_GCP_EMBEDDED_JAR" "$NEW_ELEMENT_JAR_PATH" --custom-metadata="zipline_user=$USER,updated_date=$(date),commit=$(git rev-parse HEAD),branch=$(git rev-parse --abbrev-ref HEAD)"
        gcloud storage cp "$SERVICE_JAR" "$NEW_ELEMENT_JAR_PATH" --custom-metadata="zipline_user=$USER,updated_date=$(date),commit=$(git rev-parse HEAD),branch=$(git rev-parse --abbrev-ref HEAD)"
@@ -210,8 +210,8 @@ function upload_to_aws() {
    set -euxo pipefail
    for element in "${customer_ids_to_upload[@]}"
    do
-       NEW_ELEMENT_JAR_PATH=s3://zipline-artifacts-$element/release/latest/jars/
-       NEW_ELEMENT_WHEEL_PATH=s3://zipline-artifacts-$element/release/latest/wheels/
+       NEW_ELEMENT_JAR_PATH=s3://zipline-artifacts-$element/release/$WHEEL_VERSION/jars/
+       NEW_ELEMENT_WHEEL_PATH=s3://zipline-artifacts-$element/release/$WHEEL_VERSION/wheels/
        aws s3 cp "$CLOUD_AWS_JAR" "$NEW_ELEMENT_JAR_PATH" --metadata="zipline_user=$USER,updated_date=$(date),commit=$(git rev-parse HEAD),branch=$(git rev-parse --abbrev-ref HEAD)"
        aws s3 cp "$SERVICE_JAR" "$NEW_ELEMENT_JAR_PATH" --metadata="zipline_user=$USER,updated_date=$(date),commit=$(git rev-parse HEAD),branch=$(git rev-parse --abbrev-ref HEAD)"
        aws s3 cp "$EXPECTED_ZIPLINE_WHEEL" "$NEW_ELEMENT_WHEEL_PATH" --metadata="zipline_user=$USER,updated_date=$(date),commit=$(git rev-parse HEAD),branch=$(git rev-parse --abbrev-ref HEAD)"
@@ -248,4 +248,6 @@ if [ "$BUILD_GCP" = true ]; then
fi

# Cleanup wheel stuff
-rm ./*.whl
+rm ./*.whl
+
+echo "Built and uploaded $WHEEL_VERSION"
