Mastering Airflow XComs: The Exclusive Guide to Advanced Data Sharing
The TaskFlow API drastically reduces boilerplate code while performing the same underlying database operations.
3. Custom XCom Backends: Breaking the Database Constraint
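AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_THRESHOLD=0
With the object storage XCom backend from the common.io provider, a threshold of 0 sends every XCom to object storage, regardless of its size.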
If you are using external storage, ensure your custom XCom backend or your retention policies are configured to clean up data after DAG runs are complete. Overriding the clear method in your custom backend is an excellent way to manage this lifecycle.
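One way to implement a retention policy on S3 is a bucket lifecycle rule that expires objects under the prefix your backend writes to. The sketch below assumes the `xcom/` key prefix and a 7-day window; both are illustrative choices, not Airflow defaults. It builds the rule as a plain dictionary and applies it with boto3's `put_bucket_lifecycle_configuration`.

```python
from typing import Any


def build_xcom_expiration_rule(prefix: str = "xcom/", days: int = 7) -> dict[str, Any]:
    """Build an S3 lifecycle rule that expires objects under `prefix` after `days` days."""
    if days < 1:
        raise ValueError("S3 expiration must be at least 1 day")
    return {
        "ID": f"expire-{prefix.strip('/') or 'all'}-after-{days}d",
        "Filter": {"Prefix": prefix},  # only touch keys the XCom backend created
        "Status": "Enabled",
        "Expiration": {"Days": days},
    }


def apply_xcom_expiration(bucket: str, prefix: str = "xcom/", days: int = 7) -> None:
    """Install the rule on `bucket` (requires boto3 and AWS credentials)."""
    import boto3  # imported lazily so the rule builder works without boto3 installed

    s3 = boto3.client("s3")
    # Note: this call REPLACES the bucket's whole lifecycle configuration.
    s3.put_bucket_lifecycle_configuration(
        Bucket=bucket,
        LifecycleConfiguration={"Rules": [build_xcom_expiration_rule(prefix, days)]},
    )
```

Pick the window with care: if a DAG run is cleared and rerun after its objects have expired, downstream tasks can no longer resolve the stored pointers. If the bucket already has lifecycle rules, merge them into the `Rules` list rather than overwriting them.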
This pattern keeps XComs small while still enabling large data transfers: store the payload in S3, GCS, Azure Blob Storage, or a shared file system mounted on all Airflow workers, and pass only its location through XCom.
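The pointer pattern can be sketched in plain Python, with a temporary directory standing in for the shared file system. In a real DAG the two functions would be tasks, and only the returned path string would pass through XCom. The function names, the `run_id` value, and the file layout are hypothetical.

```python
import json
import tempfile
from pathlib import Path


def extract(shared_dir: Path, run_id: str) -> str:
    """Upstream step: write the large payload to shared storage, return only its path."""
    rows = [{"id": i, "value": i * i} for i in range(10_000)]  # stand-in for a large dataset
    out = shared_dir / f"{run_id}_rows.json"
    out.write_text(json.dumps(rows))
    return str(out)  # this short string is all that would be stored as an XCom


def summarize(path: str) -> int:
    """Downstream step: resolve the pointer and load the data itself."""
    rows = json.loads(Path(path).read_text())
    return sum(row["value"] for row in rows)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        pointer = extract(Path(tmp), run_id="manual_2024")
        print(f"XCom payload: {len(pointer)} bytes; result: {summarize(pointer)}")
```

Including the run identifier in the file name keeps concurrent DAG runs from overwriting each other's data.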
Apache Airflow XComs should be reserved exclusively for small metadata pointers, such as S3 keys or row IDs, to prevent metadata database bottlenecks. For large data transfers, use a custom XCom backend that writes to object storage such as S3 or GCS, which keeps DAGs performant. For more on best practices, see the Astronomer documentation and the Apache Airflow XComs documentation (Airflow 3.2.0).
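A lightweight way to enforce the "pointers only" rule is to check a value's size before a task returns it. This is a minimal sketch: the 48 KiB budget and the helper name are assumptions, and the JSON-encoded length only approximates what Airflow actually stores, because Airflow applies its own serialization.

```python
import json
from typing import Any

# Hypothetical budget for this sketch; choose a limit that suits your metadata DB.
MAX_XCOM_BYTES = 48 * 1024


def ensure_small_xcom(value: Any, max_bytes: int = MAX_XCOM_BYTES) -> Any:
    """Return `value` unchanged if its JSON form fits the budget, else raise."""
    size = len(json.dumps(value).encode("utf-8"))
    if size > max_bytes:
        raise ValueError(
            f"XCom payload is {size} bytes (limit {max_bytes}); "
            "write it to object storage and return a pointer instead"
        )
    return value
```

A task would end with something like `return ensure_small_xcom({"s3_key": key})`, so an oversized payload fails loudly in the producing task instead of silently bloating the metadata database.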
However, XCom is often misunderstood and misused. This comprehensive guide explores best practices for using XCom in your Airflow pipelines, focusing on the philosophy of keeping XCom messages lightweight, manageable, and efficient.
import json
import uuid

from airflow.models.xcom import BaseXCom
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


class S3XComBackend(BaseXCom):
    PREFIX = "s3://"
    BUCKET_NAME = "my-company-airflow-xcom-bucket"

    @staticmethod
    def serialize_value(value, **kwargs):
        # This backend pushes every value to S3. You could instead keep small
        # values in the database and offload only large ones.
        s3_hook = S3Hook(aws_conn_id="aws_default")
        key = f"xcom/{uuid.uuid4()}.json"

        # Serialize the data to a JSON string (the value must be JSON-serializable)
        string_data = json.dumps(value)

        # Upload to S3
        s3_hook.load_string(
            string_data,
            key=key,
            bucket_name=S3XComBackend.BUCKET_NAME,
            replace=True,
        )

        # Only this short URI is written to the Airflow metadata database
        return BaseXCom.serialize_value(
            f"{S3XComBackend.PREFIX}{S3XComBackend.BUCKET_NAME}/{key}"
        )

    @staticmethod
    def deserialize_value(result):
        # Recover the string stored in the metadata database
        stored_uri = BaseXCom.deserialize_value(result)

        # If it points to our S3 backend, fetch the real payload
        if isinstance(stored_uri, str) and stored_uri.startswith(S3XComBackend.PREFIX):
            s3_hook = S3Hook(aws_conn_id="aws_default")
            # Strip the leading "s3://" and split the bucket from the key
            path = stored_uri[len(S3XComBackend.PREFIX):]
            bucket, key = path.split("/", 1)
            # Download and decode the file
            file_content = s3_hook.read_key(key, bucket_name=bucket)
            return json.loads(file_content)

        return stored_uri

Step 2: Configure Airflow to Use Your Backend
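Airflow selects the XCom backend through the `[core] xcom_backend` option, which can also be supplied as an environment variable named `AIRFLOW__{SECTION}__{KEY}` (upper-cased, with dots in the section name turned into underscores, which is how `[common.io]` becomes `COMMON_IO`). The sketch below builds those names; the module path `my_package.xcom_backends` is hypothetical and should be replaced with wherever your `S3XComBackend` lives. That module must be importable by every Airflow component that reads or writes XComs.

```python
def airflow_env_var(section: str, key: str) -> str:
    """Build the environment variable name Airflow reads for a config option."""
    return f"AIRFLOW__{section.replace('.', '_').upper()}__{key.upper()}"


# Hypothetical import path: point this at the module that defines S3XComBackend.
BACKEND_SETTINGS = {
    airflow_env_var("core", "xcom_backend"): "my_package.xcom_backends.S3XComBackend",
}

if __name__ == "__main__":
    # Print shell exports you could add to the scheduler and worker environments
    for name, value in BACKEND_SETTINGS.items():
        print(f"export {name}={value}")
```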
There are two primary ways to interact with XComs: the traditional method (explicit xcom_push and xcom_pull calls) and the TaskFlow API (introduced in Airflow 2.0).
1. The TaskFlow API (Recommended)
Any value returned by an Airflow operator’s execute() method—or a TaskFlow API Python function—is automatically pushed to an XCom with the default key return_value .
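To make the default-key behavior concrete, here is a toy in-memory model, not Airflow's implementation: each XCom is addressed by run, task, and key, a task's return value lands under `return_value`, and a downstream pull that omits the key gets that value. `ToyXComStore` and `run_task` are hypothetical names used only for illustration.

```python
from typing import Any, Callable, Dict, Optional, Tuple

# Toy stand-in for the XCom table: (run_id, task_id, key) -> value.
XComKey = Tuple[str, str, str]


class ToyXComStore:
    def __init__(self) -> None:
        self._rows: Dict[XComKey, Any] = {}

    def push(self, run_id: str, task_id: str, key: str, value: Any) -> None:
        self._rows[(run_id, task_id, key)] = value

    def pull(self, run_id: str, task_id: str, key: str = "return_value") -> Optional[Any]:
        # Like the default described above, an omitted key means "the return value"
        return self._rows.get((run_id, task_id, key))


def run_task(store: ToyXComStore, run_id: str, task_id: str, fn: Callable[[], Any]) -> None:
    """Run `fn` and push its result under the default key."""
    store.push(run_id, task_id, "return_value", fn())


if __name__ == "__main__":
    store = ToyXComStore()
    run_task(store, "run_1", "extract", lambda: {"s3_key": "xcom/abc.json"})
    print(store.pull("run_1", "extract"))
```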