How to run a task if a previous one fails in Apache Airflow

I need to do the following: check if a server is up; if so, I check whether the Spark cluster on that server is up. If it's down I try to start it, and if it's already up I move forward to run my...
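One way to read this: the recovery task should run only when the upstream check fails, which Airflow expresses with the operator's `trigger_rule` argument rather than extra control flow. Below is a sketch that mirrors the two relevant rules in plain Python so it runs without Airflow; the task and script names in the comments are hypothetical.

```python
# Sketch of how trigger rules gate a task; `start_cluster` below is a
# hypothetical recovery task that should run when the upstream check fails.
#
# In a DAG (not executed here):
# start_cluster = BashOperator(task_id="start_cluster",
#                              bash_command="start-all.sh ",
#                              trigger_rule="one_failed")
# check_spark_cluster >> [run_job, start_cluster]

def should_run(trigger_rule, upstream_states):
    """Mirror of the trigger rules relevant to 'run on failure'."""
    if trigger_rule == "one_failed":
        return "failed" in upstream_states           # any upstream failed
    if trigger_rule == "all_failed":
        return all(s == "failed" for s in upstream_states)
    return all(s == "success" for s in upstream_states)  # default: all_success
```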

Check if an Airflow task is running for the first time

I am writing an Airflow DAG with the Postgres operator. I need to perform a one-time operation in each task if the task is running for the first time. How can we do this?
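A common workaround (a sketch, not the only option): persist a flag the first time each task does its one-time setup. In Airflow the flag could live in a Variable (`Variable.get` / `Variable.set`); here a plain dict stands in so the pattern runs anywhere.

```python
_store = {}  # stands in for Airflow Variables (Variable.get / Variable.set)

def one_time_setup(task_id):
    """Run the one-time work only if this task has never done it before."""
    key = "%s_initialised" % task_id
    if _store.get(key):
        return False          # not the first run; skip the one-time work
    # ... the one-time operation would go here ...
    _store[key] = True
    return True
```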

Accessing configuration parameters passed to Airflow through CLI

I am trying to pass the following configuration parameters to the Airflow CLI while triggering a DAG run. The following is the trigger_dag command I am using: airflow trigger_dag -c...
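Values passed with `airflow trigger_dag -c '{"key": "value"}'` land on the DagRun as `dag_run.conf`, reachable from a python_callable's context or, in templated fields, as `{{ dag_run.conf["key"] }}`. A sketch below; the key name `run_date` is hypothetical, and a minimal fake DagRun makes it runnable without Airflow.

```python
def read_conf(**context):
    # dag_run.conf holds whatever dict was passed with `trigger_dag -c`.
    dag_run = context.get("dag_run")
    conf = (dag_run.conf or {}) if dag_run is not None else {}
    return conf.get("run_date", "no value passed")

class FakeDagRun:                      # stands in for Airflow's DagRun
    def __init__(self, conf):
        self.conf = conf

# In a DAG: PythonOperator(task_id="read", python_callable=read_conf)
```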

Copy from Google Cloud Storage Bucket to S3 Bucket

I have set up an Airflow workflow that ingests some files from S3 to Google Cloud Storage and then runs a workflow of SQL queries to create new tables on BigQuery. At the end of the workflow I...

How to generate multiple airflow dags through a single script?

I want to generate multiple Airflow DAGs using one script. The DAG names should be "test_parameter". Below is my script: from datetime import datetime # Importing Airflow modules from...
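The usual pattern (sketch): build DAG objects in a loop and register each one under a unique module-level name via `globals()`, which is how the scheduler's DagBag discovers them. A stand-in class replaces `airflow.DAG` here so the pattern runs anywhere, and the parameter values are hypothetical.

```python
class Dag:                      # stands in for airflow.DAG
    def __init__(self, dag_id):
        self.dag_id = dag_id

def make_dag(parameter):
    dag = Dag(dag_id="test_%s" % parameter)
    # ... operators would be attached to `dag` here ...
    return dag

for parameter in ["alpha", "beta", "gamma"]:    # hypothetical parameters
    generated = make_dag(parameter)
    # The key step: each DAG must be bound to a unique global name,
    # otherwise only the last one in the loop is discovered.
    globals()[generated.dag_id] = generated
```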

Trigger a MySQL procedure from Airflow

I am trying to trigger a stored procedure in MySQL from an Airflow DAG using the code below: run_this_first = airflow.operators.mysql_operator.MySqlOperator( task_id= 'sql_task', sql= 'call...
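A sketch of one approach: MySqlOperator runs whatever SQL it is given, so a stored procedure can be triggered with a plain `CALL` statement. The procedure name and argument below are hypothetical, and the naive `repr()`-based quoting is only for illustration (real code should use parameterized queries).

```python
def call_statement(proc, *args):
    """Build 'CALL proc(arg1, ...)' for use as the operator's sql."""
    return "CALL %s(%s)" % (proc, ", ".join(repr(a) for a in args))

# In the DAG (not executed here):
# run_this_first = MySqlOperator(task_id="sql_task",
#                                mysql_conn_id="mysql_default",
#                                sql=call_statement("my_proc", 42))
```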

Airflow connection type File (path)

Hello everyone, I'm playing with Airflow and reading this helpful tutorial. I'm asking for help to better understand how Admin->Connections works regarding Conn Type: File (path). I suppose this type...
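As far as I understand it (sketch): a File (path) connection stores a base directory in its extra field, and `FileSensor(fs_conn_id=..., filepath=...)` then pokes for base/filepath. The check itself is just a path test; the connection id and file names below are illustrative.

```python
import os

def file_present(base_path, filepath):
    """What FileSensor effectively checks on each poke."""
    return os.path.exists(os.path.join(base_path, filepath))

# In a DAG, with a connection whose extra is {"path": "/data"}:
# FileSensor(task_id="wait_for_file", fs_conn_id="fs_default",
#            filepath="incoming/data.csv", poke_interval=60)
```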

Airflow BranchPythonOperator doesn't follow the specified branch

I have an Airflow DAG with the following structure. All the functions that start with "check*" are BranchPythonOperators, and the function exceptionControl is an ExecuteDagRunOperator that receives...
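Two things worth checking (sketched below; the condition is a stand-in, and the downstream task names other than `exceptionControl` are hypothetical): the callable must return the exact task_id of a direct downstream task, and any join task after the branches usually needs a non-default trigger rule, since the default `all_success` gets skipped along with the unfollowed branch.

```python
def check_condition(**context):
    ok = context.get("status") == "ok"     # stand-in for the real condition
    # Must be the task_id (or list of task_ids) of a *direct* downstream task.
    return "continue_flow" if ok else "exceptionControl"

# In the DAG, a task joining the branches typically needs (Airflow 2.1+):
# join = DummyOperator(task_id="join",
#                      trigger_rule="none_failed_min_one_success")
```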

How can I return lists from a PythonOperator in Airflow and use them as arguments for a subsequent task in a DAG?

I have 3 tasks to run in the same DAG. Task1 returns a list of dictionaries, while task2 and task3 try to use one dictionary element from the result returned by task1. def get_list(): .... return...
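What happens under the hood (sketch): a python_callable's return value is pushed to XCom, and downstream tasks pull it with `ti.xcom_pull(task_ids="task1")`. A fake task instance below makes the flow runnable without Airflow; the dictionary contents are hypothetical.

```python
def get_list():
    # task1: the return value is pushed to XCom automatically
    return [{"name": "a", "id": 1}, {"name": "b", "id": 2}]

def use_one_element(ti):
    # task2/task3: pull task1's full result, then pick one element
    records = ti.xcom_pull(task_ids="task1")
    return records[0]["name"]

class FakeTI:                          # stands in for the task instance
    def xcom_pull(self, task_ids=None):
        return get_list()
```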

Fusing operators together

I'm still in the process of deploying Airflow and I've already felt the need to merge operators together. The most common use-case would be coupling an operator and the corresponding sensor. For...

How to create custom operators in Airflow and use them in an Airflow template running through Cloud Composer (on Google Cloud Platform)

I need to create a custom Airflow operator which I should be able to use in an Airflow template (written in Python) that is running in Cloud Composer... If I create a custom Airflow operator, how can I...

For Apache Airflow, how can I pass parameters when manually triggering a DAG via the CLI?

I use Airflow to manage ETL task execution and scheduling. A DAG has been created and it works fine. But is it possible to pass parameters when manually triggering the DAG via the CLI? For example: My DAG...

Apache Airflow 1.10.1 doesn't start a job

I have a problem with Airflow: the first job in a DAG always starts and ends successfully, but the second job never starts automatically. I try clearing the job in the UI but it doesn't start; if...

How to use Apache Airflow in a virtual environment?

I am quite new to using Apache Airflow. I use PyCharm as my IDE. I create a project (Anaconda environment) and a Python script that includes DAG definitions and Bash operators. When I open my...

Call a stored procedure in Airflow using the OracleOperator

I'm new to Python. I am using Apache Airflow, and in a task I am calling an Oracle stored procedure. I see the error "illegal variable name/number". t1 = OracleOperator( task_id='Exe_PA', ...
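One frequent cause of ORA-01036 ("illegal variable name/number") here is passing a bare procedure name, or a statement whose colons get mistaken for bind variables; Oracle expects an anonymous PL/SQL block. A sketch (the procedure name in the comment is hypothetical):

```python
def plsql_call(procedure_call):
    """Wrap a procedure call in an anonymous PL/SQL block."""
    return "BEGIN %s; END;" % procedure_call

# t1 = OracleOperator(task_id="Exe_PA", oracle_conn_id="oracle_default",
#                     sql=plsql_call("pkg_demo.pa_proc()"))
```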

Documentation for Airflow HTTP Operator/Sensor Extra Options?

I'm trying to read up on the extra_options setting for Airflow to see what properties it is possible to set (I'm mainly interested in the HTTP timeout). I can't find any supporting documentation for this...
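From reading the hook's source, `extra_options` appears to be forwarded to the underlying `requests` call, so the usable keys are essentially requests' own keyword arguments. The values below are illustrative, not exhaustive.

```python
# Keys the HTTP hook is believed to honour (forwarded to `requests`):
extra_options = {
    "timeout": 30,            # seconds before the request gives up
    "verify": True,           # TLS certificate verification
    "allow_redirects": True,
    "stream": False,
}

# SimpleHttpOperator(task_id="call_api", http_conn_id="http_default",
#                    endpoint="/health", extra_options=extra_options)
```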

How can we check the output of BashOperator in Airflow?

I'm very new to Airflow. If I execute a BashOperator, how can we get the console output of that operator? Does setting xcom_push=True solve the problem? I'd be very glad of...
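Roughly, yes: with `xcom_push=True` (Airflow 1.x; `do_xcom_push` defaults to True in 2.x) the BashOperator pushes the last line of the command's stdout to XCom. A sketch mirroring that behaviour with subprocess:

```python
import subprocess

def last_stdout_line(bash_command):
    """Return what BashOperator would push to XCom: the last stdout line."""
    result = subprocess.run(["bash", "-c", bash_command],
                            capture_output=True, text=True, check=True)
    lines = result.stdout.strip().splitlines()
    return lines[-1] if lines else ""
```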

Airflow metrics with Prometheus and Grafana

Does anyone know how to send metrics from Airflow to Prometheus? I'm not finding many documents about it. I tried the Airflow operator metrics on Grafana, but it doesn't show any metrics, and all it...

Pod Launching failed: Pod took too long to start, Failed to run KubernetesPodOperator secret

I'm running the quickstart for the KubernetesPodOperator secret using the link below: https://cloud.google.com/composer/docs/how-to/using/using-kubernetes-pod-operator The code used is below: from airflow...

Airflow xcom pull only returns string

I have an Airflow pipeline where I need to get a filename from a Pub/Sub subscription and then import that file into a Cloud SQL instance. I use the CloudSqlInstanceImportOperator to import the CSV...
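Two options worth trying (sketch): pull the value inside a python_callable, where no string rendering happens, or (Airflow 2.1+) create the DAG with `render_template_as_native_obj=True` so Jinja preserves native Python types in templated fields. The fake task instance, task ids, and payload below are hypothetical.

```python
def import_file(ti):
    # Pulled in a callable, the XCom value keeps its original type (dict here).
    body = ti.xcom_pull(task_ids="get_filename")
    return body

class FakeTaskInstance:                 # stands in for the real task instance
    def xcom_pull(self, task_ids=None):
        return {"uri": "gs://bucket/file.csv"}   # hypothetical payload

# Template route (Airflow 2.1+), keeps dicts as dicts in templated fields:
# with DAG("gcs_to_cloudsql", render_template_as_native_obj=True, ...):
#     ...
```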

Using apache-airflow-providers-snowflake on airflow (no module named Snowflake)

I have installed the package apache-airflow-providers-snowflake on Airflow on Docker, and I am getting the error "No module named Snowflake". Please refer to the attachment (check the error mentioned for the...

How to mask a password when password is sent as an argument in BashOperator

I am using the BashOperator to run a bash command that requires multiple parameters to work. Among those parameters I am sending a password. bash_task = BashOperator( ...
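One commonly suggested approach (sketch): keep the secret out of `bash_command` entirely and hand it to the process through `env`, sourcing the value from an Airflow Variable or Connection so the UI and logs can mask it. The variable and environment names below are hypothetical.

```python
def build_env(password):
    # The command then reads $APP_PASSWORD instead of embedding the secret
    # in bash_command, which would show up in logs and rendered templates.
    return {"APP_PASSWORD": password}

# bash_task = BashOperator(
#     task_id="bash_task",
#     bash_command='run_job.sh --user me --password "$APP_PASSWORD"',
#     env=build_env("{{ var.value.app_password }}"),  # env is templated
# )
```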

How to branch multiple paths in Airflow DAG using branch operator?

This is what I want, but I don't know how to achieve it in Airflow, as both of the tasks are being executed. To summarize: T1 executes, T2 executes, and based on the output of T2 I want to either go...
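A sketch of one way to do this: a BranchPythonOperator's callable can return either a single task_id or a list of task_ids, and every direct downstream task not returned is skipped. The downstream task names and the threshold below are hypothetical.

```python
def decide_after_t2(**context):
    t2_output = context.get("t2_output", 0)   # stand-in for T2's real output
    if t2_output > 0:
        return ["t3", "t4"]      # follow several downstream paths at once
    return "cleanup"             # or just one

# branch = BranchPythonOperator(task_id="branch_on_t2",
#                               python_callable=decide_after_t2)
# branch >> [t3, t4, cleanup]
```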

Airflow 2: get the execution date inside a task

I used to create tasks with the PythonOperator and retrieve the execution date in Airflow 1 as follows: def task(**kwargs): date = kwargs['execution_date'] What is the correct way to do it with the...
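In Airflow 2 the context still reaches the callable, but the preferred key is `logical_date` (`execution_date` is deprecated); TaskFlow functions can also call `get_current_context()`. A sketch that accepts either key:

```python
def get_date(**kwargs):
    # Prefer the Airflow 2 name, fall back to the deprecated one.
    return kwargs.get("logical_date") or kwargs.get("execution_date")

# TaskFlow-style alternative (not executed here):
# from airflow.operators.python import get_current_context
# def my_task():
#     date = get_current_context()["logical_date"]
```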

How to ingest Delta Lake metadata into the Amundsen data discovery engine?

I have set up Amundsen and the UI works fine. I am trying to run the sample Delta Lake loader given in the examples in their repository. """ This is a example script for extracting Delta Lake...

No module named 'airflow.providers.ssh' on AWS Airflow (Amazon MWAA)

I need to use the SSHOperator in a DAG on AWS Airflow (Amazon MWAA), so I imported the following in my DAG file: from airflow.contrib.operators.ssh_operator import SSHOperator It seems...

Airflow: how to use trigger parameters in functions

We are using Airflow's KubernetesPodOperator for our data pipelines. What we would like to add is the option to pass in parameters via the UI. We currently use it in a way that we have different...

Amazon MWAA can't find custom modules

I am setting up an AWS MWAA instance. I have custom operators which themselves reference other python files. I followed the directory structure suggested here (by astronomer.io) and am able to...

How to invoke a cloud function from google cloud composer?

For a requirement, I want to call/invoke a Cloud Function from inside a Cloud Composer pipeline, but I can't find much info on it. I tried using the SimpleHttpOperator but I get this...

Cannot turn on Airflow's Smart Sensors feature (custom sensor)

I am trying to use Airflow's Smart Sensors feature on a custom sensor operator (i.e. a subclass of BaseSensorOperator). The documentation on this feature is pretty sparse right now. The shard jobs...