The Apache Airflow Scheduler and the Workers look for custom plugins during startup on the AWS-managed Fargate container for your environment at /usr/local/airflow/plugins/*.
• Directory structure. The directory structure (at /*) is based on the contents of your plugins.zip file. For example, if your plugins.zip contains the operators directory as a top-level directory, then the directory will be extracted to /usr/local/airflow/plugins/operators on your environment.
• Size limit. We recommend a plugins.zip file smaller than 1 GB. The larger the plugins.zip file, the longer the startup time on an environment. Although Amazon MWAA doesn't explicitly limit the size of a plugins.zip file, if dependencies can't be installed within ten minutes, the Fargate service times out and attempts to roll back the environment to a stable state.
Note: For environments using Apache Airflow v1.10.12 or v2.0.2, Amazon MWAA limits outbound traffic on the Apache Airflow web server and does not allow you to install plugins or Python dependencies directly on the web server. Starting with Apache Airflow v2.2.2, you can install plugins and dependencies directly on the web server.
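The directory-structure and size guidance above can be sketched in a few lines of Python. This is an illustration only, not how Amazon MWAA unpacks the archive: the helper names `extracted_paths`, `is_within_recommended_size`, and `build_sample_zip`, the sample archive contents, and the reading of 1 GB as 1024**3 bytes are all assumptions.

```python
import io
import posixpath
import zipfile
from typing import List

# Location where the contents of plugins.zip end up on the environment.
PLUGINS_ROOT = "/usr/local/airflow/plugins"
# Assumption: the 1 GB recommendation is read here as 1024**3 bytes.
RECOMMENDED_MAX_BYTES = 1024 ** 3


def extracted_paths(zip_bytes: bytes) -> List[str]:
    """Map each archive entry to its expected path under PLUGINS_ROOT."""
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as archive:
        return sorted(posixpath.join(PLUGINS_ROOT, name) for name in archive.namelist())


def is_within_recommended_size(zip_bytes: bytes) -> bool:
    """Return True when the archive is smaller than the recommended maximum."""
    return len(zip_bytes) < RECOMMENDED_MAX_BYTES


def build_sample_zip() -> bytes:
    """Build a small in-memory archive with operators/ as a top-level directory."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w") as archive:
        archive.writestr("operators/__init__.py", "")
        archive.writestr("operators/my_airflow_operator.py", "# operator code\n")
    return buffer.getvalue()


sample = build_sample_zip()
paths = extracted_paths(sample)
within_limit = is_within_recommended_size(sample)
```

Because the archive entries start at operators/, the computed paths sit directly under /usr/local/airflow/plugins/operators, with no extra wrapping directory.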
Examples of custom plugins
The following section uses sample code in the Apache Airflow reference guide to show how to structure your local development environment.
Example using a flat directory structure in plugins.zip
Apache Airflow v2
The following example shows a plugins.zip file with a flat directory structure for Apache Airflow v2.
Example flat directory with PythonVirtualenvOperator plugins.zip
The following example shows the top-level tree of a plugins.zip file for the PythonVirtualenvOperator custom plugin in Creating a custom plugin for Apache Airflow PythonVirtualenvOperator (p. 205).
|-- virtual_python_plugin.py
Example plugins/virtual_python_plugin.py
The following example shows the PythonVirtualenvOperator custom plugin.
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
from airflow.plugins_manager import AirflowPlugin
import airflow.utils.python_virtualenv
from typing import List

# Build the virtualenv command from the virtualenv path under the Airflow user's site-packages.
def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool) -> List[str]:
    cmd = ['python3', '/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir]
    if system_site_packages:
        cmd.append('--system-site-packages')
    if python_bin is not None:
        cmd.append(f'--python={python_bin}')
    return cmd

# Replace the Airflow helper with the function above.
airflow.utils.python_virtualenv._generate_virtualenv_cmd = _generate_virtualenv_cmd

class VirtualPythonPlugin(AirflowPlugin):
    name = 'virtual_python_plugin'
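The plugin above works by reassigning a module-level function. The following minimal sketch shows that technique in isolation, without Apache Airflow installed. Everything in it is a stand-in: `fake_virtualenv_module`, `_default_generate_cmd`, and `prepare_virtualenv` are hypothetical names, and the default command is invented for illustration.

```python
import types
from typing import List, Optional

# Hypothetical stand-in for airflow.utils.python_virtualenv; not the real module.
fake_virtualenv_module = types.SimpleNamespace()


def _default_generate_cmd(tmp_dir: str, python_bin: Optional[str], system_site_packages: bool) -> List[str]:
    """Invented default behavior, used only to show the effect of the patch."""
    return ['python3', '-m', 'virtualenv', tmp_dir]


fake_virtualenv_module._generate_virtualenv_cmd = _default_generate_cmd


def prepare_virtualenv(tmp_dir: str) -> List[str]:
    """Caller that resolves the helper through the module at call time."""
    return fake_virtualenv_module._generate_virtualenv_cmd(tmp_dir, None, True)


def _patched_generate_cmd(tmp_dir: str, python_bin: Optional[str], system_site_packages: bool) -> List[str]:
    """Same logic as the plugin's replacement function."""
    cmd = ['python3', '/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir]
    if system_site_packages:
        cmd.append('--system-site-packages')
    if python_bin is not None:
        cmd.append(f'--python={python_bin}')
    return cmd


before = prepare_virtualenv('/tmp/venv')
# Reassigning the attribute changes what every later call resolves to.
fake_virtualenv_module._generate_virtualenv_cmd = _patched_generate_cmd
after = prepare_virtualenv('/tmp/venv')
```

The patch only reaches callers that look the function up through the module when they run. Code that had already bound the original function to a local name would keep using the original.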
Apache Airflow v1
The following example shows a plugins.zip file with a flat directory structure for Apache Airflow v1.
Example flat directory with PythonVirtualenvOperator plugins.zip
The following example shows the top-level tree of a plugins.zip file for the PythonVirtualenvOperator custom plugin in Creating a custom plugin for Apache Airflow PythonVirtualenvOperator (p. 205).
|-- virtual_python_plugin.py
Example plugins/virtual_python_plugin.py
The following example shows the PythonVirtualenvOperator custom plugin.
from airflow.plugins_manager import AirflowPlugin
from airflow.operators.python_operator import PythonVirtualenvOperator

# Build the virtualenv command from the operator's own settings.
def _generate_virtualenv_cmd(self, tmp_dir):
    cmd = ['python3', '/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir]
    if self.system_site_packages:
        cmd.append('--system-site-packages')
    if self.python_version is not None:
        cmd.append('--python=python{}'.format(self.python_version))
    return cmd

# Replace the operator's method with the function above.
PythonVirtualenvOperator._generate_virtualenv_cmd = _generate_virtualenv_cmd

class EnvVarPlugin(AirflowPlugin):
    name = 'virtual_python_plugin'
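In the v1 plugin the replacement is assigned to a class attribute, so it receives `self` like any other method. A minimal sketch of that behavior, assuming a hypothetical `StandInVirtualenvOperator` class in place of the real PythonVirtualenvOperator:

```python
from typing import List, Optional


class StandInVirtualenvOperator:
    """Hypothetical stand-in for PythonVirtualenvOperator; not the Airflow class."""

    def __init__(self, python_version: Optional[str] = None, system_site_packages: bool = True):
        self.python_version = python_version
        self.system_site_packages = system_site_packages

    def _generate_virtualenv_cmd(self, tmp_dir: str) -> List[str]:
        # Invented default behavior, used only to show the effect of the patch.
        return ['virtualenv', tmp_dir]


def _generate_virtualenv_cmd(self, tmp_dir: str) -> List[str]:
    """Same logic as the plugin's replacement method."""
    cmd = ['python3', '/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir]
    if self.system_site_packages:
        cmd.append('--system-site-packages')
    if self.python_version is not None:
        cmd.append('--python=python{}'.format(self.python_version))
    return cmd


# The instance is created before the patch is applied.
existing = StandInVirtualenvOperator(python_version='3.7')
StandInVirtualenvOperator._generate_virtualenv_cmd = _generate_virtualenv_cmd
cmd = existing._generate_virtualenv_cmd('/tmp/venv')
```

Method lookup goes through the class at call time, so the instance created before the assignment also picks up the replacement.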
Example using a nested directory structure in plugins.zip
Apache Airflow v2
The following example shows a plugins.zip file with separate hooks, operators, and sensors directories for Apache Airflow v2.
Example plugins.zip
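__init__.py
my_airflow_plugin.py
hooks/
|-- __init__.py
|-- my_airflow_hook.py
operators/
|-- __init__.py
|-- my_airflow_operator.py
|-- hello_operator.py
sensors/
|-- __init__.py
|-- my_airflow_sensor.py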
The following example shows the import statements in the DAG (DAGs folder) that uses the custom plugins.
Example dags/your_dag.py
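from airflow import DAG
from datetime import datetime, timedelta
from operators.my_airflow_operator import MyOperator
from sensors.my_airflow_sensor import MySensor
from operators.hello_operator import HelloOperator

default_args = {
    'start_date': datetime(2018, 1, 1),
    'email_on_failure': False,
}

# 'customdag' is a placeholder DAG ID.
with DAG('customdag',
         schedule_interval='@once',
         default_args=default_args) as dag:
    sens = MySensor(task_id='taskA')
    op = MyOperator(task_id='taskB', my_field='some text')
    hello_task = HelloOperator(task_id='sample-task', name='foo_bar')

    sens >> op >> hello_task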
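The DAG imports `operators.my_airflow_operator` without a `plugins.` prefix. That works because Apache Airflow adds the plugins folder to `sys.path`, so each top-level directory in it is importable as a package. A minimal sketch of the mechanism, using a temporary directory as a stand-in for /usr/local/airflow/plugins and a placeholder `MyOperator` class (both assumptions):

```python
import importlib
import sys
import tempfile
from pathlib import Path


def make_plugins_dir(root: Path) -> Path:
    """Create a stand-in plugins folder containing an operators package."""
    operators = root / "operators"
    operators.mkdir(parents=True)
    (operators / "__init__.py").write_text("")
    # Placeholder class; the real operator would subclass BaseOperator.
    (operators / "my_airflow_operator.py").write_text("class MyOperator:\n    pass\n")
    return root


with tempfile.TemporaryDirectory() as tmp:
    plugins_dir = make_plugins_dir(Path(tmp) / "plugins")
    # Put the plugins folder itself on the import path.
    sys.path.insert(0, str(plugins_dir))
    importlib.invalidate_caches()
    try:
        module = importlib.import_module("operators.my_airflow_operator")
        operator_class_name = module.MyOperator.__name__
    finally:
        # Undo the changes so the sketch leaves the interpreter state clean.
        sys.path.remove(str(plugins_dir))
        sys.modules.pop("operators.my_airflow_operator", None)
        sys.modules.pop("operators", None)
```

This is also why the top-level layout of plugins.zip matters: the import path in the DAG mirrors the directory names at the root of the archive.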
Example plugins/my_airflow_plugin.py
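from airflow.plugins_manager import AirflowPlugin
from hooks.my_airflow_hook import MyHook
from operators.my_airflow_operator import MyOperator
from sensors.my_airflow_sensor import MySensor

class PluginName(AirflowPlugin):
    name = 'my_airflow_plugin'
    hooks = [MyHook]
    operators = [MyOperator]
    sensors = [MySensor]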
The following examples show each of the import statements needed in the custom plugin files.
Example hooks/my_airflow_hook.py
from airflow.hooks.base import BaseHook

class MyHook(BaseHook):
    def my_method(self):
        print("Hello World")
Example sensors/my_airflow_sensor.py
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.decorators import apply_defaults

class MySensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, *args, **kwargs):
        super(MySensor, self).__init__(*args, **kwargs)

    def poke(self, context):
        return True
Example operators/my_airflow_operator.py
from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults
from hooks.my_airflow_hook import MyHook

class MyOperator(BaseOperator):
    @apply_defaults
    def __init__(self, my_field, *args, **kwargs):
        super(MyOperator, self).__init__(*args, **kwargs)
        self.my_field = my_field

    def execute(self, context):
        hook = MyHook('my_conn')
        hook.my_method()
Example operators/hello_operator.py
from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults

class HelloOperator(BaseOperator):
    @apply_defaults
    def __init__(self, name, **kwargs):
        super().__init__(**kwargs)
        # Store the name passed in from the DAG for use in execute().
        self.name = name

    def execute(self, context):
        message = "Hello {}".format(self.name)
        print(message)
        return message
Follow the steps in Testing custom plugins using the Amazon MWAA CLI utility (p. 112), and then Creating a plugins.zip file (p. 113) to zip the contents within your plugins directory. For example, cd plugins.
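The point of changing into the plugins directory first is that archive entries are then stored relative to it, not under an extra plugins/ prefix. The following sketch reproduces that layout with Python's zipfile module. It is not the procedure from the referenced sections: `zip_plugins` is a hypothetical helper, and the files are created in a temporary directory purely for illustration.

```python
import tempfile
import zipfile
from pathlib import Path
from typing import List


def zip_plugins(plugins_dir: Path, zip_path: Path) -> List[str]:
    """Zip the contents of plugins_dir (not the directory itself) into zip_path."""
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for path in sorted(plugins_dir.rglob("*")):
            if path.is_file():
                # Entry names are relative to plugins_dir, like zipping after `cd plugins`.
                archive.write(path, arcname=path.relative_to(plugins_dir).as_posix())
        return archive.namelist()


with tempfile.TemporaryDirectory() as tmp:
    plugins_dir = Path(tmp) / "plugins"
    (plugins_dir / "hooks").mkdir(parents=True)
    (plugins_dir / "__init__.py").write_text("")
    (plugins_dir / "my_airflow_plugin.py").write_text("# plugin definition\n")
    (plugins_dir / "hooks" / "__init__.py").write_text("")
    (plugins_dir / "hooks" / "my_airflow_hook.py").write_text("# hook definition\n")
    # Write the archive outside plugins_dir so it does not include itself.
    entries = zip_plugins(plugins_dir, Path(tmp) / "plugins.zip")
```

With this layout, hooks/ is a top-level entry in the archive, so it is extracted to /usr/local/airflow/plugins/hooks on the environment.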
Apache Airflow v1
The following example shows a plugins.zip file with separate hooks, operators, and sensors directories for Apache Airflow v1.10.12.
Example plugins.zip
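__init__.py
my_airflow_plugin.py
hooks/
|-- __init__.py
|-- my_airflow_hook.py
operators/
|-- __init__.py
|-- my_airflow_operator.py
|-- hello_operator.py
sensors/
|-- __init__.py
|-- my_airflow_sensor.py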
The following example shows the import statements in the DAG (DAGs folder) that uses the custom plugins.
Example dags/your_dag.py
from airflow import DAG
from datetime import datetime, timedelta
from operators.my_airflow_operator import MyOperator
from sensors.my_airflow_sensor import MySensor
from operators.hello_operator import HelloOperator

default_args = {
    'start_date': datetime(2018, 1, 1),
    'email_on_failure': False,
}

# 'customdag' is a placeholder DAG ID.
with DAG('customdag',
         schedule_interval='@once',
         default_args=default_args) as dag:
    sens = MySensor(
        task_id='taskA'
    )
    op = MyOperator(
        task_id='taskB',
        my_field='some text'
    )
    hello_task = HelloOperator(task_id='sample-task', name='foo_bar')

    sens >> op >> hello_task
Example plugins/my_airflow_plugin.py
from airflow.plugins_manager import AirflowPlugin
from hooks.my_airflow_hook import MyHook
from operators.my_airflow_operator import MyOperator
from sensors.my_airflow_sensor import MySensor

class PluginName(AirflowPlugin):
    name = 'my_airflow_plugin'
    hooks = [MyHook]
    operators = [MyOperator]
    sensors = [MySensor]
The following examples show each of the import statements needed in the custom plugin files.
Example hooks/my_airflow_hook.py
from airflow.hooks.base_hook import BaseHook

class MyHook(BaseHook):
    def my_method(self):
        print("Hello World")
Example sensors/my_airflow_sensor.py
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults

class MySensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, *args, **kwargs):
        super(MySensor, self).__init__(*args, **kwargs)

    def poke(self, context):
        return True