Airflow XCom and the PythonOperator

8/15/2023

Airflow introduced several changes in its second version a year ago. Since then, we've had the opportunity to experience most of them, and we especially had a good time working with its new Taskflow API. After considering whether we should adopt this new feature, we finally decided to make Taskflow a part of our daily work. This blog post will show why you should too.

The Taskflow API is an abstraction built on top of XComs that allows developers to send messages between tasks in a DAG (Directed Acyclic Graph). It seems pretty straightforward, right? But it also comes with several tools to make our lives easier, primarily if most of your DAGs are written as Python tasks.

In this post, we will build on our work in Part II: Foundational ETL Code and Part III: Getting Started with Airflow by converting our ETL pipeline into a Directed Acyclic Graph (DAG), which comprises the tasks and dependencies for the pipeline on Airflow.

Imagine you need to write an ETL process with Airflow to process sales information about our customers. The data comes from two different sources: a third-party CRM via a RESTful API and a relational database we own. We want to process the information recovered from those two sources, unify the data's schema, and keep only the information relevant to us. Finally, we'd store the transformed data in persistent storage to be consumed later by the analysis team. You can find the complete code on GitHub following this link.

The old way, DAG definition before Taskflow

For our ETL process, we need to implement some tasks to get the data out of its sources, transform it, and load it into persistent storage. Traditionally, we'd write our DAGs following a structure similar to the one shown next:

```python
def extract_from_api(**kwargs):
    # TODO: Fetch the data from an API and store it where it can be read later
    pass

def extract_from_db(**kwargs):
    # TODO: Fetch the data from our database and store it where it can be read later
    pass

def transform(**kwargs):
    # TODO: Read the data extracted, transform it to fit our needs and
    # then store it where it can be read later
    pass

def load(**kwargs):
    # TODO: Read the transformed data and save it where it can be analyzed later
    pass

with DAG('awesome_etl_v1') as dag:
    ext1 = PythonOperator(
        task_id='extract_from_api',
        python_callable=extract_from_api,
        provide_context=True,
    )
    ext2 = PythonOperator(
        task_id='extract_from_db',
        python_callable=extract_from_db,
        provide_context=True,
    )
    trn = PythonOperator(
        task_id='transform',
        python_callable=transform,
        provide_context=True,
    )
    load = PythonOperator(
        task_id='load',
        python_callable=load,
        provide_context=True,
    )
    [ext1, ext2] >> trn >> load
```

Without Taskflow, we ended up writing a lot of repetitive code. And this was just an example; imagine how much of this code there would be in a real-life pipeline!

The Taskflow way, DAG definition using Taskflow

Taskflow simplifies how a DAG and its tasks are declared. This is done by encapsulating in decorators all the boilerplate needed in the past. Using these decorators makes the code more intuitive and easier to read:

```python
@task()
def extract_from_api():
    # TODO: Fetch the data from an API and store it where it can be read later
    return

with DAG('') as dag:
    ext1_data = extract_from_api()
    ext2 = PythonOperator(
        task_id='extract_from_db',
        python_callable=extract_from_db,
    )
    transform(ext1_data, ext2.output)
```

As extract_from_api is using the new Taskflow API, we can get rid of the XComs functions it was using and just return its output, assigning it to the ext1_data variable, which is then passed as an argument to the transform task. On the other hand, extract_from_db is still using the old API, and we can access the values it saves into XComs through its output property, without executing any of the XComs functions ourselves. output is a property available for every operator, which means we can use it for every existing task in our DAGs.

4) XComs custom backends

Even though XComs is what powers Taskflow, it has its own limitations. One of them is that, by default, only JSON-serializable values can be stored in XComs, which prevents us from storing instances of custom classes.
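The JSON-only restriction is easy to demonstrate outside Airflow. Below is a minimal sketch in plain Python (no Airflow required); the `SalesRecord` class is a hypothetical example standing in for whatever custom object you might want to pass between tasks:

```python
import json

class SalesRecord:
    """A hypothetical custom class we might want to pass between tasks."""
    def __init__(self, customer, amount):
        self.customer = customer
        self.amount = amount

# Plain dicts, lists, strings and numbers survive the default XCom
# round trip, because they are JSON-serializable.
ok = json.dumps({"customer": "ACME", "amount": 42.0})

# An instance of a custom class is not JSON-serializable, so the
# default XCom backend cannot store it as-is.
try:
    json.dumps(SalesRecord("ACME", 42.0))
    serializable = True
except TypeError:
    serializable = False
```

Here `json.dumps` raises `TypeError` for the class instance, which is exactly the wall you hit when a task tries to return such an object through the default XCom machinery.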
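Custom XCom backends are the escape hatch for this limitation: Airflow lets you point the `xcom_backend` setting at a subclass of `BaseXCom` that overrides `serialize_value` and `deserialize_value`. The sketch below shows only the serialization strategy such a backend could use (pickle plus base64, so arbitrary objects can be stored as text); it is kept Airflow-free so it runs standalone, and in a real backend these two functions would become the overridden methods:

```python
import base64
import pickle

def serialize_value(value):
    # Instead of json.dumps, pickle the object and base64-encode the
    # bytes so the result can be stored in a text column.
    return base64.b64encode(pickle.dumps(value)).decode("ascii")

def deserialize_value(blob):
    # Reverse the encoding to recover the original object.
    return pickle.loads(base64.b64decode(blob.encode("ascii")))

class SalesRecord:
    """Hypothetical custom class, not JSON-serializable."""
    def __init__(self, customer, amount):
        self.customer = customer
        self.amount = amount

record = SalesRecord("ACME", 42.0)
restored = deserialize_value(serialize_value(record))
```

One caveat worth noting: pickle executes code during deserialization, so a pickle-based backend should only be used where you trust everything that can write XComs; production backends often serialize to an external store (e.g. object storage) and keep only a reference in the XCom table.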