diff --git a/airflow/migrations/versions/0010_2_9_0_add_run_id_to_audit_log_table_and_change_event_name_length.py b/airflow/migrations/versions/0010_2_9_0_add_run_id_to_audit_log_table_and_change_event_name_length.py index 44bef77578b65..22b9c4337811b 100644 --- a/airflow/migrations/versions/0010_2_9_0_add_run_id_to_audit_log_table_and_change_event_name_length.py +++ b/airflow/migrations/versions/0010_2_9_0_add_run_id_to_audit_log_table_and_change_event_name_length.py @@ -59,8 +59,8 @@ def downgrade(): if conn.dialect.name == "mssql": with op.batch_alter_table("log") as batch_op: batch_op.drop_index("idx_log_event") - batch_op.alter_column("event", type_=sa.String(30), nullable=False) + batch_op.alter_column("event", type_=sa.String(30)) batch_op.create_index("idx_log_event", ["event"]) else: with op.batch_alter_table("log") as batch_op: - batch_op.alter_column("event", type_=sa.String(30), nullable=False) + batch_op.alter_column("event", type_=sa.String(30)) diff --git a/airflow/migrations/versions/0028_3_0_0_drop_ab_user_id_foreign_key.py b/airflow/migrations/versions/0028_3_0_0_drop_ab_user_id_foreign_key.py index f88aaa014bb3a..8a9c77042e95f 100644 --- a/airflow/migrations/versions/0028_3_0_0_drop_ab_user_id_foreign_key.py +++ b/airflow/migrations/versions/0028_3_0_0_drop_ab_user_id_foreign_key.py @@ -45,6 +45,13 @@ def upgrade(): with op.batch_alter_table("task_instance_note", schema=None) as batch_op: batch_op.drop_constraint("task_instance_note_user_fkey", type_="foreignkey") + if op.get_bind().dialect.name == "mysql": + with op.batch_alter_table("dag_run_note", schema=None) as batch_op: + batch_op.drop_index("dag_run_note_user_fkey") + + with op.batch_alter_table("task_instance_note", schema=None) as batch_op: + batch_op.drop_index("task_instance_note_user_fkey") + def downgrade(): """Unapply Drop ab_user.id foreign key.""" @@ -53,3 +60,10 @@ def downgrade(): with op.batch_alter_table("dag_run_note", schema=None) as batch_op: batch_op.create_foreign_key("dag_run_note_user_fkey", "ab_user", ["user_id"], ["id"]) + + if op.get_bind().dialect.name == "mysql": + with op.batch_alter_table("task_instance_note", schema=None) as batch_op: + batch_op.create_index("task_instance_note_user_fkey", ["user_id"], unique=False) + + with op.batch_alter_table("dag_run_note", schema=None) as batch_op: + batch_op.create_index("dag_run_note_user_fkey", ["user_id"], unique=False) diff --git a/airflow/migrations/versions/0032_3_0_0_drop_execution_date_unique.py b/airflow/migrations/versions/0032_3_0_0_drop_execution_date_unique.py index b76bf209bd412..c782991881079 100644 --- a/airflow/migrations/versions/0032_3_0_0_drop_execution_date_unique.py +++ b/airflow/migrations/versions/0032_3_0_0_drop_execution_date_unique.py @@ -44,14 +44,24 @@ def upgrade(): with op.batch_alter_table("dag_run", schema=None) as batch_op: - batch_op.alter_column("execution_date", new_column_name="logical_date", existing_type=sa.TIMESTAMP) + batch_op.alter_column( + "execution_date", + new_column_name="logical_date", + existing_type=sa.TIMESTAMP, + existing_nullable=False, + ) with op.batch_alter_table("dag_run", schema=None) as batch_op: batch_op.drop_constraint("dag_run_dag_id_execution_date_key", type_="unique") def downgrade(): with op.batch_alter_table("dag_run", schema=None) as batch_op: - batch_op.alter_column("logical_date", new_column_name="execution_date", existing_type=sa.TIMESTAMP) + batch_op.alter_column( + "logical_date", + new_column_name="execution_date", + existing_type=sa.TIMESTAMP, + existing_nullable=False, + ) with op.batch_alter_table("dag_run", schema=None) as batch_op: batch_op.create_unique_constraint( "dag_run_dag_id_execution_date_key", diff --git a/airflow/migrations/versions/0042_3_0_0_add_uuid_primary_key_to_task_instance_.py b/airflow/migrations/versions/0042_3_0_0_add_uuid_primary_key_to_task_instance_.py index 2abd2116f989a..41cfddc9cef0b 100644 --- a/airflow/migrations/versions/0042_3_0_0_add_uuid_primary_key_to_task_instance_.py +++ b/airflow/migrations/versions/0042_3_0_0_add_uuid_primary_key_to_task_instance_.py @@ -167,6 +167,32 @@ def _get_type_id_column(dialect_name: str) -> sa.types.TypeEngine: return sa.String(36) +def create_foreign_keys(): + for fk in ti_fk_constraints: + if fk["table"] in ["task_instance_history", "task_map"]: + continue + with op.batch_alter_table(fk["table"]) as batch_op: + batch_op.create_foreign_key( + constraint_name=fk["fk"], + referent_table=ti_table, + local_cols=ti_fk_cols, + remote_cols=ti_fk_cols, + ondelete="CASCADE", + ) + for fk in ti_fk_constraints: + if fk["table"] not in ["task_instance_history", "task_map"]: + continue + with op.batch_alter_table(fk["table"]) as batch_op: + batch_op.create_foreign_key( + constraint_name=fk["fk"], + referent_table=ti_table, + local_cols=ti_fk_cols, + remote_cols=ti_fk_cols, + ondelete="CASCADE", + onupdate="CASCADE", + ) + + def upgrade(): """Add UUID primary key to task instance table.""" conn = op.get_bind() @@ -232,15 +258,7 @@ def upgrade(): batch_op.create_primary_key("task_instance_pkey", ["id"]) # Create foreign key constraints - for fk in ti_fk_constraints: - with op.batch_alter_table(fk["table"]) as batch_op: - batch_op.create_foreign_key( - constraint_name=fk["fk"], - referent_table=ti_table, - local_cols=ti_fk_cols, - remote_cols=ti_fk_cols, - ondelete="CASCADE", - ) + create_foreign_keys() def downgrade(): @@ -270,12 +288,4 @@ def downgrade(): batch_op.create_primary_key("task_instance_pkey", ti_fk_cols) # Re-add foreign key constraints - for fk in ti_fk_constraints: - with op.batch_alter_table(fk["table"]) as batch_op: - batch_op.create_foreign_key( - constraint_name=fk["fk"], - referent_table=ti_table, - local_cols=ti_fk_cols, - remote_cols=ti_fk_cols, - ondelete="CASCADE", - ) + create_foreign_keys() diff --git a/airflow/migrations/versions/0049_3_0_0_remove_pickled_data_from_xcom_table.py b/airflow/migrations/versions/0049_3_0_0_remove_pickled_data_from_xcom_table.py index 2b19827b6ae4c..ebe8e37a71ff0 100644 --- a/airflow/migrations/versions/0049_3_0_0_remove_pickled_data_from_xcom_table.py +++ b/airflow/migrations/versions/0049_3_0_0_remove_pickled_data_from_xcom_table.py @@ -101,9 +101,9 @@ def upgrade(): op.execute( """ ALTER TABLE xcom - ALTER COLUMN value TYPE JSONB + ALTER COLUMN value TYPE JSON USING CASE - WHEN value IS NOT NULL THEN CAST(CONVERT_FROM(value, 'UTF8') AS JSONB) + WHEN value IS NOT NULL THEN CAST(CONVERT_FROM(value, 'UTF8') AS JSON) ELSE NULL END """ @@ -136,6 +136,7 @@ def upgrade(): # Drop the old `value_old` column with op.batch_alter_table("xcom", schema=None) as batch_op: batch_op.drop_column("value_old") + op.drop_table("_xcom_archive") def downgrade(): diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index c2c67e6872f58..d27f5eda6ce03 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -2f8a9fc0bb125d3103c89af0afd0c33c041055243dcd6bc3de9682e544e180a2 \ No newline at end of file +b12f6811bb7a340362e4b8774b3bb81db28a7d0258564b3431bd537368554cc3 \ No newline at end of file diff --git a/tests_common/test_utils/db.py b/tests_common/test_utils/db.py index 34b7dc71e4b21..53326cdf1af27 100644 --- a/tests_common/test_utils/db.py +++ b/tests_common/test_utils/db.py @@ -63,7 +63,7 @@ def initial_db_init(): from airflow.www.extensions.init_auth_manager import get_auth_manager db.resetdb() - db.downgrade(to_revision="044f740568ec") + db.downgrade(to_revision="5f2621c13b39") db.upgradedb(to_revision="head") db.bootstrap_dagbag() # minimal app to add roles