Skip to content

Commit

Permalink
fix for mysql and postgres
Browse files Browse the repository at this point in the history
  • Loading branch information
ephraimbuddy committed Nov 21, 2024
1 parent 6e840ac commit 7b78586
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ def downgrade():
if conn.dialect.name == "mssql":
with op.batch_alter_table("log") as batch_op:
batch_op.drop_index("idx_log_event")
batch_op.alter_column("event", type_=sa.String(30), nullable=False)
batch_op.alter_column("event", type_=sa.String(30))
batch_op.create_index("idx_log_event", ["event"])
else:
with op.batch_alter_table("log") as batch_op:
batch_op.alter_column("event", type_=sa.String(30), nullable=False)
batch_op.alter_column("event", type_=sa.String(30))
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ def upgrade():
with op.batch_alter_table("task_instance_note", schema=None) as batch_op:
batch_op.drop_constraint("task_instance_note_user_fkey", type_="foreignkey")

if op.get_bind().dialect.name == "mysql":
with op.batch_alter_table("dag_run_note", schema=None) as batch_op:
batch_op.drop_index("dag_run_note_user_fkey")

with op.batch_alter_table("task_instance_note", schema=None) as batch_op:
batch_op.drop_index("task_instance_note_user_fkey")


def downgrade():
"""Unapply Drop ab_user.id foreign key."""
Expand All @@ -53,3 +60,10 @@ def downgrade():

with op.batch_alter_table("dag_run_note", schema=None) as batch_op:
batch_op.create_foreign_key("dag_run_note_user_fkey", "ab_user", ["user_id"], ["id"])

if op.get_bind().dialect.name == "mysql":
with op.batch_alter_table("task_instance_note", schema=None) as batch_op:
batch_op.create_index("task_instance_note_user_fkey", ["user_id"], unique=False)

with op.batch_alter_table("dag_run_note", schema=None) as batch_op:
batch_op.create_index("dag_run_note_user_fkey", ["user_id"], unique=False)
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,24 @@

def upgrade():
with op.batch_alter_table("dag_run", schema=None) as batch_op:
batch_op.alter_column("execution_date", new_column_name="logical_date", existing_type=sa.TIMESTAMP)
batch_op.alter_column(
"execution_date",
new_column_name="logical_date",
existing_type=sa.TIMESTAMP,
existing_nullable=False,
)
with op.batch_alter_table("dag_run", schema=None) as batch_op:
batch_op.drop_constraint("dag_run_dag_id_execution_date_key", type_="unique")


def downgrade():
with op.batch_alter_table("dag_run", schema=None) as batch_op:
batch_op.alter_column("logical_date", new_column_name="execution_date", existing_type=sa.TIMESTAMP)
batch_op.alter_column(
"logical_date",
new_column_name="execution_date",
existing_type=sa.TIMESTAMP,
existing_nullable=False,
)
with op.batch_alter_table("dag_run", schema=None) as batch_op:
batch_op.create_unique_constraint(
"dag_run_dag_id_execution_date_key",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,32 @@ def _get_type_id_column(dialect_name: str) -> sa.types.TypeEngine:
return sa.String(36)


def create_foreign_keys():
for fk in ti_fk_constraints:
if fk["table"] in ["task_instance_history", "task_map"]:
continue
with op.batch_alter_table(fk["table"]) as batch_op:
batch_op.create_foreign_key(
constraint_name=fk["fk"],
referent_table=ti_table,
local_cols=ti_fk_cols,
remote_cols=ti_fk_cols,
ondelete="CASCADE",
)
for fk in ti_fk_constraints:
if fk["table"] not in ["task_instance_history", "task_map"]:
continue
with op.batch_alter_table(fk["table"]) as batch_op:
batch_op.create_foreign_key(
constraint_name=fk["fk"],
referent_table=ti_table,
local_cols=ti_fk_cols,
remote_cols=ti_fk_cols,
ondelete="CASCADE",
onupdate="CASCADE",
)


def upgrade():
"""Add UUID primary key to task instance table."""
conn = op.get_bind()
Expand Down Expand Up @@ -232,15 +258,7 @@ def upgrade():
batch_op.create_primary_key("task_instance_pkey", ["id"])

# Create foreign key constraints
for fk in ti_fk_constraints:
with op.batch_alter_table(fk["table"]) as batch_op:
batch_op.create_foreign_key(
constraint_name=fk["fk"],
referent_table=ti_table,
local_cols=ti_fk_cols,
remote_cols=ti_fk_cols,
ondelete="CASCADE",
)
create_foreign_keys()


def downgrade():
Expand Down Expand Up @@ -270,12 +288,4 @@ def downgrade():
batch_op.create_primary_key("task_instance_pkey", ti_fk_cols)

# Re-add foreign key constraints
for fk in ti_fk_constraints:
with op.batch_alter_table(fk["table"]) as batch_op:
batch_op.create_foreign_key(
constraint_name=fk["fk"],
referent_table=ti_table,
local_cols=ti_fk_cols,
remote_cols=ti_fk_cols,
ondelete="CASCADE",
)
create_foreign_keys()
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ def upgrade():
op.execute(
"""
ALTER TABLE xcom
ALTER COLUMN value TYPE JSONB
ALTER COLUMN value TYPE JSON
USING CASE
WHEN value IS NOT NULL THEN CAST(CONVERT_FROM(value, 'UTF8') AS JSONB)
WHEN value IS NOT NULL THEN CAST(CONVERT_FROM(value, 'UTF8') AS JSON)
ELSE NULL
END
"""
Expand Down Expand Up @@ -136,6 +136,7 @@ def upgrade():
# Drop the old `value_old` column
with op.batch_alter_table("xcom", schema=None) as batch_op:
batch_op.drop_column("value_old")
op.drop_table("_xcom_archive")


def downgrade():
Expand Down
2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2f8a9fc0bb125d3103c89af0afd0c33c041055243dcd6bc3de9682e544e180a2
b12f6811bb7a340362e4b8774b3bb81db28a7d0258564b3431bd537368554cc3
2 changes: 1 addition & 1 deletion tests_common/test_utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def initial_db_init():
from airflow.www.extensions.init_auth_manager import get_auth_manager

db.resetdb()
db.downgrade(to_revision="044f740568ec")
db.downgrade(to_revision="5f2621c13b39")
db.upgradedb(to_revision="head")
db.bootstrap_dagbag()
# minimal app to add roles
Expand Down

0 comments on commit 7b78586

Please sign in to comment.