Enabling Apache Airflow to copy large S3 objects

If you’re trying to use Apache Airflow to copy large objects in S3, you may have run into S3 rejecting the request with an InvalidRequest error. In this post, we’ll fix that by writing a custom operator that handles the underlying problem. Before we do that, let’s first understand where the issue comes from.

In case you need a primer on Airflow, check out our post Understanding Apache Airflow on AWS - although if you do, what are you doing here?

The error message that led you to this post looks something like this:

An error occurred (InvalidRequest) when calling the CopyObject operation: The specified copy source is larger than the maximum allowable size for a copy source […]

The problematic code that caused it may look something like this. The code isn’t problematic in itself, though - for smaller objects, it works perfectly fine.

from airflow.providers.amazon.aws.operators.s3 import S3CopyObjectOperator

op = S3CopyObjectOperator(
    task_id="copy_source_file",
    source_bucket_name="source-bucket",
    source_bucket_key="path/to_large_object",
    dest_bucket_name="destination-bucket",
    dest_bucket_key="save-me-here",
)

The name of the operator indicates the underlying API call, which is CopyObject. In general, CopyObject is pretty robust, but there’s a failure mode that’s not immediately obvious - it is limited to copying objects up to 5GB in a single operation. That means attempting to copy any object larger than that leads to the aforementioned error message.
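To see the limit in isolation, outside of Airflow, here’s a minimal boto3 sketch (bucket and key names are placeholders) that performs the same single CopyObject call and fails with the InvalidRequest error once the source object is larger than 5GB:

import boto3
from botocore.exceptions import ClientError

s3_client = boto3.client("s3")

try:
    # A single CopyObject call, which is limited to source objects of up to 5GB
    s3_client.copy_object(
        CopySource={"Bucket": "source-bucket", "Key": "path/to_large_object"},
        Bucket="destination-bucket",
        Key="save-me-here",
    )
except ClientError as err:
    # Larger sources trigger the InvalidRequest error shown above
    print(err.response["Error"]["Code"])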

This limit is mentioned in the first big info box in the docs, but who has time to read those, right? It even includes a link to the proposed solution, which involves using multipart uploads. Before you groan - no, you won’t have to split the object into byte ranges and manage the individual copy operations yourself. All of that comes included in your installation of boto3 in the form of the S3 client’s copy operation.

The copy method uses the underlying s3transfer library that ships with boto3 and transparently manages the multipart upload. I should note here that, despite the name “multipart upload”, we don’t need to download the object before uploading it again: it uses the UploadPartCopy API, which keeps the data internal to S3.
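In plain boto3 that looks roughly like this (again with placeholder bucket and key names) - a single call, with the multipart handling happening behind the scenes:

import boto3

s3_client = boto3.client("s3")

# copy() delegates to s3transfer, which switches to UploadPartCopy
# for large objects, so the call works regardless of object size.
s3_client.copy(
    CopySource={"Bucket": "source-bucket", "Key": "path/to_large_object"},
    Bucket="destination-bucket",
    Key="save-me-here",
)

This is the method we’ll call from our custom operator below.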

Using this is as simple as subclassing the existing S3CopyObjectOperator and overriding the execute method, which Airflow calls to perform the actual operation. In the code below, you can see that I delegate the API call to the parent class and only step in if there’s an exception.

from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.operators.s3 import S3CopyObjectOperator
from airflow.utils.context import Context
from botocore.exceptions import ClientError

class S3CopyOperator(S3CopyObjectOperator):
    """
    An extension of the S3CopyObjectOperator that can copy
    objects larger than 5GB.
    """

    def execute(self, context: Context):

        s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)

        try:
            super().execute(context)
        except ClientError as err:

            if err.response["Error"]["Code"] == "InvalidRequest":
                # The response when we try to copy more than 5GB in one request.
                s3_hook.conn.copy(
                    CopySource={
                        "Bucket": self.source_bucket_name,
                        "Key": self.source_bucket_key,
                    },
                    Bucket=self.dest_bucket_name,
                    Key=self.dest_bucket_key,
                )
            else:
                raise err

The error code InvalidRequest denotes this specific error, although, confusingly, it’s not listed in the API docs for CopyObject but in the generic error responses documentation. If we get this error, we use the conn object of an S3Hook, which is basically a wrapper around a boto3 S3 client, to call the copy method with the parameters from our operator instance. You can use it like this:

op = S3CopyOperator(
    task_id="copy_source_file",
    source_bucket_name="source-bucket",
    source_bucket_key="path/to_large_object",
    dest_bucket_name="destination-bucket",
    dest_bucket_key="save-me-here",
)

This approach means the API doesn’t change, i.e., you can just replace the S3CopyObjectOperator instances with S3CopyOperator instances. Additionally, we only perform the extra work of the multipart copy when the simpler method is insufficient. The trade-off is that we’re inefficient if almost every object is larger than 5GB, because we’re doing a “useless” API call first. As usual, it depends - if that’s your situation, you could flip the logic around, as sketched below. A similar approach has been discussed in this GitHub issue in the Airflow repository.
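As a rough sketch of that flipped variant (the class name and the size check are my own additions, not part of the operator above), you could ask S3 for the object size via HeadObject and pick the code path based on that:

from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.operators.s3 import S3CopyObjectOperator
from airflow.utils.context import Context

class SizeAwareS3CopyOperator(S3CopyObjectOperator):
    """
    Hypothetical variant that checks the object size up front
    instead of letting CopyObject fail first.
    """

    # 5GB, the maximum source size for a single CopyObject call
    COPY_OBJECT_LIMIT = 5 * 1024**3

    def execute(self, context: Context):

        s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)

        object_size = s3_hook.conn.head_object(
            Bucket=self.source_bucket_name, Key=self.source_bucket_key
        )["ContentLength"]

        if object_size > self.COPY_OBJECT_LIMIT:
            # Too large for CopyObject, go straight to the multipart copy
            s3_hook.conn.copy(
                CopySource={
                    "Bucket": self.source_bucket_name,
                    "Key": self.source_bucket_key,
                },
                Bucket=self.dest_bucket_name,
                Key=self.dest_bucket_key,
            )
        else:
            super().execute(context)

Whether the extra HeadObject request pays off again depends on the size distribution of your objects.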

— Maurice

