Airflow (MWAA) to Snowflake, SNS, and Glue

In a recent project I had the opportunity to work with Amazon Managed Workflows for Apache Airflow (MWAA) for the first time, and I decided to document some of the integrations I needed to perform. In some ways, integration is much easier than with a standalone Airflow environment, especially when connecting to other AWS services. In other ways, MWAA adds complications and workarounds, especially with DBT. In this post I will walk through the steps to incorporate Snowflake, AWS SNS, and AWS Glue into an Airflow DAG using MWAA. In a follow-up post I will cover various ways to integrate MWAA with DBT.

Airflow (MWAA)

MWAA gives developers the ability to manage various workflows within their AWS environment without the hassle of setting up Airflow and maintaining the infrastructure over time.

The service, as the name implies, is fully managed by AWS. After creating a designated S3 bucket with directories for your DAGs, plugins, and requirements files, and selecting options such as the Airflow version, security groups, environment class, and any additional Airflow-specific configurations, your environment is built, deployed, and easily accessed through AWS.
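For reference, the same environment can also be provisioned programmatically. The sketch below uses boto3 against the MWAA CreateEnvironment API; every name, ARN, and id in it is a placeholder, and the console setup described above is the path this post follows.

import boto3

mwaa = boto3.client("mwaa")

# All names, ARNs, and ids below are placeholders
mwaa.create_environment(
    Name="my-mwaa-environment",
    AirflowVersion="2.2.2",
    EnvironmentClass="mw1.small",
    SourceBucketArn="arn:aws:s3:::my-airflow-bucket",
    DagS3Path="dags",
    RequirementsS3Path="requirements.txt",
    ExecutionRoleArn="arn:aws:iam::123456789012:role/my-mwaa-execution-role",
    NetworkConfiguration={
        "SecurityGroupIds": ["sg-0123456789abcdef0"],
        "SubnetIds": ["subnet-0123456789abcdef0", "subnet-0fedcba9876543210"],
    },
)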

Airflow to Snowflake

To set up the connection from MWAA to Snowflake, the Snowflake provider needs to be present when the Airflow UI is accessed from AWS. To make the Snowflake connection type available, three Python packages need to be added to the “requirements.txt” file used for the MWAA configuration, located in the designated Airflow S3 bucket:

  • apache-airflow-providers-snowflake==3.0.0

  • snowflake-sqlalchemy==1.3.4

  • snowflake-connector-python==2.7.8

After the new version of the “requirements.txt” file has been uploaded, the MWAA environment needs to be pointed at that version. To update it, go to Amazon MWAA >> Environments, select the environment being used, click “Edit”, and choose the most recent version of the “requirements.txt” file.
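If you would rather script this step, the same update can be made with boto3; this is a minimal sketch, and the environment name and S3 object version are placeholders for your own values.

import boto3

mwaa = boto3.client("mwaa")

# Point the environment at the newly uploaded version of requirements.txt
mwaa.update_environment(
    Name="my-mwaa-environment",
    RequirementsS3Path="requirements.txt",
    RequirementsS3ObjectVersion="example-s3-object-version-id",
)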

Then click “Next” twice and “Save”. After about 15 minutes the MWAA environment will have finished updating and the Airflow UI can be opened. Once the Airflow UI is open, go to Admin >> Connections and create a new connection with the connection type set to “Snowflake”.
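The exact form fields vary by provider version, but the connection generally needs the Snowflake account details either in the dedicated fields or in the Extra JSON. The values below are purely illustrative placeholders showing the kind of keys the Snowflake provider reads from Extra.

# Illustrative only - which keys are read depends on the Snowflake provider version
snowflake_conn_extra = {
    "account": "your_account_locator",
    "warehouse": "your_warehouse",
    "database": "your_database",
    "region": "your_region",
    "role": "your_role",
}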

Fill in the related fields to establish the connection to the Snowflake account. You can now utilize this connection in your Airflow DAG. In the example below, the “SnowflakeHook” is used to retrieve the result of a query, which is then pushed to XCom so that a separate Airflow task can use it.

The new Snowflake connection could also be used with the “SnowflakeOperator”, although in this case the transformations in Snowflake are handled by DBT, so only a “SnowflakeHook” was necessary for this example.

import logging
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook

def new_record_count(ti, **context):
    # Query Snowflake using the connection created in the Airflow UI
    dwh_hook = SnowflakeHook(snowflake_conn_id="snowflake_conn")
    # new_records_query is a list of SQL statements defined elsewhere in the DAG file
    result = dwh_hook.get_first(new_records_query[0])
    logging.info(f"Number of new records in RAW_CUSTOMER_TRANSACTIONS - {result[0]}")

    # Push the count so a downstream task can pull it via XCom
    ti.xcom_push(key='new_record_count', value=int(result[0]))
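On the other side of that hand-off, a downstream task can pull the count back out of XCom. This is a minimal, hypothetical sketch; the task id is assumed to match the task that runs new_record_count.

import logging

def check_new_records(ti, **context):
    # Pull the value pushed by the new_record_count task
    record_count = ti.xcom_pull(task_ids="new_record_count", key="new_record_count")
    if record_count == 0:
        logging.info("No new records found - downstream processing can be skipped")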

Airflow to AWS Glue

To allow MWAA to access and start a Glue job, the first step is to ensure that the MWAA execution role has the proper permissions. To complete this step, select the MWAA environment that is being used and click on the execution role under Permissions.

This will take you to the IAM page for the MWAA execution role. Go to “Add permissions” >> “Attach policies”, search for "AWSGlueServiceRole", and attach the policy to the execution role. The "AwsGlueJobOperator" can now be easily incorporated into the DAG to run an existing Glue job. In addition to the name of the Glue job, specify the “region_name”, “iam_role_name”, and “num_of_dpus” fields.

from airflow.providers.amazon.aws.operators.glue import AwsGlueJobOperator

aws_glue = AwsGlueJobOperator(
    task_id="pydeequ_glue_job"
    ,job_name="Pydeequ_Glue_Job"
    ,region_name="us-west-2"
    ,iam_role_name="AWSGlueServiceRoleDefault-Dev"
    ,num_of_dpus=10
)

After running the DAG, the Airflow log can be checked; as long as the Glue job ran successfully, the task should exit with a 0 status and report the run state “SUCCEEDED”.
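The run state can also be confirmed outside of Airflow. This is a rough boto3 sketch; the run id is a placeholder, for example one copied from the operator's log output.

import boto3

glue = boto3.client("glue", region_name="us-west-2")

# Look up the state of a specific run of the Glue job
run = glue.get_job_run(JobName="Pydeequ_Glue_Job", RunId="jr_exampleid")
print(run["JobRun"]["JobRunState"])  # e.g. SUCCEEDED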

Airflow to AWS SNS

This section will cover how to set up a connection to AWS SNS in order to send notifications to users subscribed to an SNS topic. The topic used for this section is “airflow-test”. This topic allows any Amazon resource to publish, but only users with a whitelisted domain can subscribe. There are various ways to incorporate the "SnsPublishOperator", and in this section two different methods will be demonstrated.

Method 1

To ensure that the message is published to the correct topic, copy the topic ARN and set it as the “target_arn” parameter in the "SnsPublishOperator".

Wrapping the SnsPublishOperator in a function makes it possible to pass that function to the “on_failure_callback” parameter so it executes automatically whenever a task fails. The function also inherits the context of the task, so any of the stored values related to the task at the time of failure can be used in the message. In this example, the value “task_instance_key_str” is pulled from the context.

from airflow.providers.amazon.aws.operators.sns import SnsPublishOperator

# Replace with the ARN of the SNS topic to publish to
sns_topic_arn = "{your_arn_string_here}"

def on_failure(context):
    # Publish a failure notification that includes details from the failed task's context
    op = SnsPublishOperator(
        task_id='dag_failure'
        ,target_arn=sns_topic_arn
        ,subject="DAG FAILED"
        ,message=f"Task has failed, task_instance_key_str: {context['task_instance_key_str']}"
    )
    op.execute(context)

To make sure this function is applied to all tasks, set it in the “default_args” used when initializing the DAG. Users subscribed to the topic will then receive an email any time any task in the DAG fails.

from datetime import datetime
from airflow import DAG

args = {
    "owner": "airflow"
    ,"provide_context": True
    # Send an SNS notification whenever any task in the DAG fails
    ,"on_failure_callback": on_failure
}

dag = DAG(
    dag_id="snowflake_dbt_customer_orders"
    ,start_date=datetime(2022, 6, 20)
    ,schedule_interval="*/5 * * * *"  # run every 5 minutes
    ,catchup=False
    ,default_args=args
)

Method 2

The second method covers how to incorporate the "SnsPublishOperator" simply as an Airflow task. As before, you will need the topic ARN and must set the “target_arn” parameter to that value. This operator can be incredibly valuable when users need intermittent updates on the pipeline's current progress, especially if the pipeline branches into multiple scenarios.

sns_test = SnsPublishOperator(
    task_id='sns_test',
    target_arn=sns_topic_arn,
    subject="TASK_SNS_TEST",
    message="This is a sample message for testing"
)
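As a hypothetical sketch of how this might be wired in alongside the earlier tasks, the notification task can simply be placed downstream of the step it reports on:

# Send the status notification after the Glue job task completes
aws_glue >> sns_test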

Conclusion

I hope this has been helpful for understanding some of the different ways to utilize the MWAA service and incorporate it into a pipeline. In the next post we will cover various ways of initiating a DBT job within an MWAA DAG: one where the DBT project is stored in the same S3 directory as the DAGs, and two others where you connect over SSH to an EC2 instance where DBT is installed and the DBT projects reside, and initiate the project from there.

Collin Stoffel

Senior Data Engineer

Published: Aug 18, 2022

Updated: Jan 19, 2023
