airflow

Airflow unable to iterate through xcom_pull list with Google Cloud operators

醉酒当歌, submitted 2020-01-16 09:07:58
Question: I would like to dynamically get the list of CSV files in a GCS bucket and then load each one into a corresponding BQ table. I am using the GoogleCloudStorageListOperator and GoogleCloudStorageToBigQueryOperator operators: GCS_Files = GoogleCloudStorageListOperator( task_id='GCS_Files', bucket=cf.storage
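The list that GoogleCloudStorageListOperator pushes to XCom only exists once that task has run. A `{{ ti.xcom_pull(...) }}` template is rendered at task execution, not when the DAG file is parsed, so a `for` loop in the DAG file cannot iterate over it. One workaround is to do the per-file work at run time, for example inside a PythonOperator callable that receives the pulled list. The sketch below covers only the pure planning step, mapping each listed CSV object to a destination table. The naming convention (table named after the file stem) and the dataset name `my_dataset` are hypothetical assumptions, not part of the question.

```python
from pathlib import PurePosixPath
from typing import Dict, List


def plan_csv_loads(object_names: List[str], dataset: str) -> Dict[str, str]:
    """Map each .csv object name to a destination table id.

    Hypothetical convention: the table is named after the file stem,
    e.g. 'exports/sales.csv' -> '<dataset>.sales'.
    """
    plan: Dict[str, str] = {}
    for name in object_names:
        path = PurePosixPath(name)
        if path.suffix.lower() != ".csv":
            continue  # skip non-CSV objects returned by the bucket listing
        plan[name] = f"{dataset}.{path.stem}"
    return plan


if __name__ == "__main__":
    # Example input shaped like the list of object names pushed to XCom.
    listed = ["exports/sales.csv", "exports/readme.txt", "exports/customers.CSV"]
    print(plan_csv_loads(listed, "my_dataset"))
```

Each resulting (object, table) pair would then correspond to one load into BigQuery.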

Airflow gets stuck in the last task of a DAG after several executions

痞子三分冷, submitted 2020-01-16 08:59:41
Question: I have a DAG made up of 7 tasks. I have run it many times, but lately it gets stuck in the last task, which is a very simple PythonOperator, as follows: def send_email(warnings): warnings = ast.literal_eval(warnings) warnings_list = '\n'.join(warnings) email_message =f"""Good
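The `ast.literal_eval` call in the snippet suggests that `warnings` arrives as the string form of a Python list (as happens when a value is passed through a templated argument). That parsing step can be tested on its own, away from Airflow. The sketch below is a minimal version under that assumption. The function name `build_warning_message` and the message text are hypothetical, since the original email body is truncated, and actually sending the email is left out.

```python
import ast
from typing import List


def build_warning_message(warnings_repr: str) -> str:
    """Build an email body from the string form of a list, e.g. "['a', 'b']".

    Hypothetical helper: the heading text is illustrative only.
    """
    # literal_eval only accepts Python literals, so it is safe on untrusted text.
    warnings = ast.literal_eval(warnings_repr)
    if not isinstance(warnings, list):
        raise ValueError("expected the string form of a list")
    items: List[str] = [str(w) for w in warnings]
    warnings_list = "\n".join(items)
    return f"Warnings from the last DAG run:\n{warnings_list}"


if __name__ == "__main__":
    print(build_warning_message("['disk almost full', 'late input file']"))
```

Keeping the callable this small and deterministic makes it easier to rule out the Python code itself when a task hangs.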

Using XCom to Load Schema in Airflow with GoogleCloudStorageToBigQueryOperator

a 夏天, submitted 2020-01-15 10:15:38
Question: I have an XCom stored in Airflow, associated with the task ID database_schema, that holds the JSON schema for a dataset sales_table that I want to load into BigQuery. The data for the BigQuery dataset sales_table comes from a CSV file retailcustomer_data.csv stored in Google Cloud Storage. The
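GoogleCloudStorageToBigQueryOperator accepts an explicit schema as `schema_fields`, a list of dicts with `name`, `type`, and optionally `mode`. Assuming the `database_schema` XCom holds a JSON string in BigQuery's schema format, the sketch below converts it into that list shape at run time. The helper name `to_schema_fields` and the sample field names are hypothetical. Missing modes default to `NULLABLE`, which is BigQuery's default mode.

```python
import json
from typing import Dict, List


def to_schema_fields(schema_json: str) -> List[Dict[str, str]]:
    """Convert a BigQuery-style JSON schema string into a list of field dicts.

    Hypothetical helper; assumes the input is a JSON array of objects
    with at least "name" and "type" keys.
    """
    fields = json.loads(schema_json)
    result: List[Dict[str, str]] = []
    for field in fields:
        if "name" not in field or "type" not in field:
            raise ValueError(f"field missing name/type: {field!r}")
        result.append({
            "name": field["name"],
            "type": field["type"].upper(),
            # BigQuery treats a column without an explicit mode as NULLABLE.
            "mode": field.get("mode", "NULLABLE").upper(),
        })
    return result


if __name__ == "__main__":
    sample = '[{"name": "customer_id", "type": "integer", "mode": "required"}, {"name": "amount", "type": "FLOAT"}]'
    print(to_schema_fields(sample))
```

Because the XCom value is only available at run time, the conversion has to happen inside a running task rather than while the DAG file is parsed.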

Airflow task after BranchPythonOperator does not fail and succeed correctly

泄露秘密, submitted 2020-01-14 13:47:09
Question: In my DAG, I have some tasks that should only run on Saturdays. Therefore, I used a BranchPythonOperator to branch between the Saturday tasks and a DummyTask. After that, I join both branches and want to run other tasks. The workflow looks like this: Here I set the trigger rule for dummy3
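A BranchPythonOperator's callable returns the task_id to follow, and the other branch is skipped. The Saturday decision itself can be sketched as a pure function of the run date. The task ids `saturday_tasks` and `dummy` are hypothetical, and in a real DAG the date would come from the task context rather than a plain argument.

```python
from datetime import date, datetime
from typing import Union

SATURDAY = 5  # date.weekday(): Monday == 0 ... Sunday == 6


def choose_branch(run_date: Union[date, datetime]) -> str:
    """Return the task_id a branch callable would follow (ids are hypothetical)."""
    return "saturday_tasks" if run_date.weekday() == SATURDAY else "dummy"


if __name__ == "__main__":
    print(choose_branch(date(2020, 1, 18)))  # a Saturday
    print(choose_branch(date(2020, 1, 14)))  # a Tuesday
```

Because one branch is always skipped, a join task left on the default `all_success` trigger rule will usually be skipped too. This is why the join is typically given a trigger rule such as `one_success` (or `none_failed`, in Airflow versions that provide it).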