Serverless Data Pipelines Made Easy with Prefect and AWS ECS Fargate

Published: February 8, 2021
Tags: AWS, Prefect, Python, Data Engineering, Serverless, Docker & Kubernetes, DevOps

Photo by Burst from Pexels

Even though there are many workflow orchestration solutions and cloud services for building data workloads, it’s hard to find one that is actually pleasant to use and lets you get started quickly. One of the most popular tools for building data pipelines in Python is Prefect — a workflow management platform with a hybrid agent-based execution model.

What does a hybrid execution model entail? It means that even if you use the cloud orchestration platform (Prefect Cloud), you still own and manage your agents. In fact, Prefect has no direct access to your code or data. Instead, it only operates on the metadata that you send to its API when registering agents and flows. For one, this means the platform can satisfy stringent security and compliance requirements, since the entire workflow execution happens within compute resources of your choice. On the other hand, it allows for an incredible amount of flexibility: your agent could be a Kubernetes cluster, an ECS Fargate cluster on AWS, any compute server on-prem or in the cloud, or a mix of all of them. Even your laptop can be registered as a Prefect agent.

Although the hybrid execution model has lots of benefits, it may be challenging to configure your execution layer properly. In this article, we’ll look at how to set up Prefect with an AWS ECS Fargate agent and S3 storage, which allows for a fully serverless NoOps execution environment.

To avoid confusion: EKS on Fargate is a way of spinning up a Kubernetes cluster (EKS) that can make use of the serverless data plane provided by Fargate. In contrast, in this article, we will use the vanilla ECS Fargate without Kubernetes.

Prefect Cloud setup

Create your Prefect account

First, sign up for a free Starter account on https://cloud.prefect.io/.

Install Prefect

The following command installs Prefect with the AWS subpackage (instead of [aws], you could use [all_extras] if you want to install all Prefect integrations with external systems):

pip install "prefect[aws]"

To make sure that we use Prefect Cloud as the orchestration layer rather than the open-source Prefect Server, switch the context to “cloud”:

prefect backend cloud

Create a personal access token to authenticate with Prefect Cloud

After you have registered for a free account, you need to create a Personal Access Token to authenticate your local terminal with Prefect Cloud. This allows you to register your flows (i.e., your ETL & ML data pipelines) with the Prefect Cloud API directly from your local machine. From the main menu, select: User → Personal Access Token → + CREATE TOKEN.

Prefect Cloud UI — image by the author

Choose a meaningful name, e.g., YourAuthTokenToPrefectCloud. Then copy the token and run the following command in your terminal:

prefect auth login -t <YourAuthTokenToPrefectCloud>

The authentication step allows your laptop to communicate with the Prefect Cloud API to register your agents and flows. The API then takes care of scheduling your workflows, while your agent constantly polls the API to check whether it needs to trigger any flow runs. Once a flow run is scheduled, the agent is informed and starts the execution.

In the next step, we will set up an ECS Fargate cluster on AWS to serve as our execution layer. On top of that, we need an agent that polls the API for work (i.e., flow runs). For that purpose, we’ll create an EC2 instance with a background process — you could create an ECS service for that instead.

Before diving into AWS settings, let’s create a RUNNER token that we will use to authenticate our agent with Prefect Cloud.

prefect auth create-token -n DemoFargateAgentToken -s RUNNER

You can replace DemoFargateAgentToken with any name you like. It’s important that you set the scope (-s flag) to RUNNER and that you keep the token somewhere safe. We will need it in the next section.

Creating AWS resources

As mentioned above, we need:

  • a t2.micro EC2 instance to which we’ll deploy our agent — the whole purpose of this instance is to run a single process that polls the Prefect API for flow runs,
  • an IAM role that we’ll assign to our ECS Fargate task so that our containers are allowed to access the S3 bucket containing our flow code. Additionally, this will allow us to cache intermediate results of our flow to S3,
  • an S3 bucket as our default storage for flow code,
  • an ECS Fargate cluster that will spin up compute resources for our containerized data pipelines on-demand, i.e., serverless. This way, we have no idle compute resources, and we can create an on-demand Dask cluster to parallelize our Python code across several Fargate containers when needed.

Creating an IAM role for our ECS tasks (result: task role ARN)

In this step, we create a task role that will give our ECS task (you can think of it as a container for your flow) permission to access objects in S3.

Ideally, you should follow the least-privilege principle. For development purposes, you can attach full-access policies for both S3 and ECR so that your ECS containers can work with objects stored in S3 as well as pull ECR container images. If you prefer the command line to the console screenshots below, there is a CLI sketch right after them.

Creating an IAM role for ECS tasks (part 1) — image by author

Creating an IAM role for ECS tasks (part 2) — image by author
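
If you’d rather script the role creation than click through the console, here is a minimal AWS CLI sketch. The role name ECSTaskS3Role is the one we reference later when starting the agent; the two managed policies mirror the broad development permissions described above:

aws iam create-role \
    --role-name ECSTaskS3Role \
    --assume-role-policy-document '{
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "ecs-tasks.amazonaws.com"},
            "Action": "sts:AssumeRole"
        }]
    }'

# broad development policies; prefer least privilege in production
aws iam attach-role-policy --role-name ECSTaskS3Role \
    --policy-arn arn:aws:iam::aws:policy/AmazonS3FullAccess
aws iam attach-role-policy --role-name ECSTaskS3Role \
    --policy-arn arn:aws:iam::aws:policy/AmazonEC2ContainerRegistryFullAccess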

Creating an S3 bucket to store our flow code

Remember that your bucket name must be globally unique (like a website’s domain name). You can create a bucket with a single CLI command or from the management console.

aws s3 mb s3://yourbucket

Creating an EC2 instance for our agent process

Select the t2.micro instance type (more than enough for a single agent process) with the Amazon Linux 2 AMI.

EC2 launch wizard — image by the author

Then follow the defaults until you reach the Security Group section, where you need to allow SSH access from your IP address.

EC2 launch wizard— image by the author

After that, click Review and Launch, and proceed to create a key pair (a .pem file) so that you can use it to SSH to the instance.

Create a key pair (.pem file) to SSH to the instance — image by the author

SSH to the EC2 instance to configure the ECS Prefect agent

Once the instance is in a running state, you can connect directly from your browser or by using an SSH client. Now we need to install Python 3 on our EC2 instance:
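
On the Amazon Linux 2 AMI, a sketch along these lines should work (if pip is missing after installing python3, additionally install the python3-pip package):

sudo yum update -y
sudo yum install -y python3
# install Prefect with the AWS subpackage for the agent process
python3 -m pip install --user "prefect[aws]"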

Let’s check the Python version and installed packages so that you know what this EC2 setup is running:
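
Your exact versions will depend on when you run the installation; this article was written against Prefect 0.14.x:

python3 --version
pip3 list | grep -i -E "prefect|boto3"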

Our ECS Fargate agent will need several environment variables. Let’s set them using export:
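
Here is a sketch of those exports. All the variable names and values are placeholders to adjust to your account (the subnet IDs can later be wired into the agent’s run-task configuration):

export RUNNER_TOKEN=<YourRunnerTokenFromThePreviousSection>
export AWS_ACCESS_KEY_ID=<YourAWSAccessKeyId>
export AWS_SECRET_ACCESS_KEY=<YourAWSSecretAccessKey>
export AWS_DEFAULT_REGION=<YourAWSRegion>
export SUBNET1=<YourFirstDefaultVPCSubnetId>
export SUBNET2=<YourSecondDefaultVPCSubnetId>
export EXECUTION_ROLE_ARN=arn:aws:iam::<YourAccountId>:role/ecsTaskExecutionRole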

The RUNNER_TOKEN above refers to the RUNNER token we created in the last section (don’t confuse it with the Personal Access Token that we used to authorize our machine to register flows with the Prefect Cloud API). To find the subnet IDs, go to the VPC section of the AWS console and copy the IDs from your default VPC. Of course, you can create your own VPC if you prefer. For development purposes, the default VPC is fine.

Also, if you don’t have the ecsTaskExecutionRole yet, you can create it by following the “create ECS cluster” wizard in the AWS management console.

Note: the executionRoleArn and the task_role_arn are different. While the execution role gives ECS itself permission to run your containers (e.g., pulling the image from ECR and storing logs in CloudWatch), the task role (created in the section Creating an IAM role for our ECS tasks) gives your flow’s container permission to access other AWS resources, such as the S3 bucket that holds your Prefect flow’s metadata, or services such as SQS, SNS, or DynamoDB.

Creating a Fargate cluster

Let’s now run aws configure to set up the AWS CLI so that we can create an ECS Fargate cluster directly from this EC2 instance. If you don’t have an AWS access key pair yet, first create an IAM user with permission to create an ECS cluster.
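
The CLI prompts for the access keys and default settings:

aws configure
AWS Access Key ID [None]: <YourAWSAccessKeyId>
AWS Secret Access Key [None]: <YourAWSSecretAccessKey>
Default region name [None]: <YourAWSRegion>
Default output format [None]: json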

To keep this setup simple, we use the standard ECS configuration, which creates a cluster named “default” with Fargate compatibility (i.e., a serverless data plane).

aws ecs create-cluster

That’s it! Our cluster has been created. All that is left to do is to start a Prefect agent process that will poll the Prefect Cloud API for flow runs and will submit the work to the serverless ECS container platform on AWS. We can do that in a single CLI command:
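
Here is a sketch of that command. The account ID is a placeholder, ECSTaskS3Role is the task role we created earlier, and the flags follow Prefect 0.14’s ECS agent (depending on your exact Prefect version, you may also be able to pass the execution role via --execution-role-arn):

prefect agent ecs start \
    -t $RUNNER_TOKEN \
    -l fargate-dev \
    -l s3-flow-storage \
    --task-role-arn arn:aws:iam::<YourAccountId>:role/ECSTaskS3Role \
    > /dev/null 2>&1 &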

Note that:

  • The ECSTaskS3Role is the IAM role that we created earlier (section: Creating an IAM role for our ECS tasks) that will be assumed by the ECS tasks (containers) in order to have permissions to access S3,
  • We added the labels “fargate-dev” and “s3-flow-storage” — those labels are important because this is how we “tell” the Prefect Cloud API which agent should receive the flow runs of a specific flow (i.e., data pipeline). Since our agent has those labels assigned, any flow with a RunConfig carrying matching labels will be orchestrated by that agent. This will become clearer once we deploy an example flow.
  • We appended > /dev/null 2>&1 & so that the command runs in the background and its output to stdout and stderr is suppressed. Let me know in the comments or in a private message if you know a better way to run this command as a background process.

If we go to the UI, we can see that our ECS agent is available and ready to execute scheduled flow runs:

ECS agent is visible in the UI — image by the author

Since our ECS agent on EC2 runs as a background process, you can disconnect from the SSH session. If at some point your Prefect agent is no longer healthy, you can always SSH to the EC2 instance again, kill the process, and start the agent anew. The nice thing about Prefect Cloud is that you can see the health of your agents directly in the UI — no need to implement an extra monitoring layer to track this information.

Deploying our first serverless flow

ECSRun configuration & Prefect Storage

In the current Prefect version (0.14.x), you need to define a run configuration to determine which agent will pick up your flow code for execution. In the code below, you can see that ECSRun() takes a label s3-flow-storage to make sure that this flow will only be picked up by an ECS agent with this corresponding label. It works similarly to the labels and selectors in Kubernetes.

In the example below, we simply construct a Pandas dataframe and print the Pandas version to show that you can use your favorite Python packages and run any Python function, as long as you decorate it with @task. We also pass the run configuration and the information about where to store, and from where to retrieve, our flow’s metadata (Prefect Storage) — here, an S3 bucket.

The image used as a base for your flow could be one of Prefect’s images from Dockerhub, e.g., prefecthq/prefect:0.14.1. We used our own custom Docker image, which is simply Prefect’s base image with additional PyData packages installed on top, so that all the components needed to build an ETL or ML data flow are available.

The Dockerfile we used for the publicly available image anisienia/prefect-pydata is essentially Prefect’s base image plus the PyData stack:

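A sketch of that Dockerfile (the exact packages and versions baked into the published image may differ):

FROM prefecthq/prefect:0.14.1
# add the PyData stack on top of Prefect's base image
RUN pip install --no-cache-dir pandas numpy scikit-learn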

You can obviously pin exact package versions and include any custom dependencies you may need in your image. Also, you could use ECR instead of Dockerhub. Hosting public Dockerhub images is 100% free — if the image contains no credentials and no proprietary business logic (just Python package dependencies), you can save some AWS costs this way.

Registering a flow for the ECS agent

Here is an example flow we can use:

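A minimal sketch of such a flow, consistent with the notes below (the role ARN, bucket name, and project name are placeholders to replace with your own):

import pandas as pd

import prefect
from prefect import task, Flow
from prefect.run_configs import ECSRun
from prefect.storage import S3

@task
def build_dataframe() -> pd.DataFrame:
    # any Python function can become a task, as long as it is decorated with @task
    return pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})

@task
def log_pandas_version():
    logger = prefect.context.get("logger")
    logger.info("Pandas version: %s", pd.__version__)

with Flow(
    "ecs_fargate_demo",
    # the labels mirror the agent's labels so that our ECS agent picks up this flow
    run_config=ECSRun(
        labels=["fargate-dev", "s3-flow-storage"],
        task_role_arn="arn:aws:iam::<YourAccountId>:role/ECSTaskS3Role",
        image="anisienia/prefect-pydata",
    ),
    # the flow's metadata is uploaded to (and later retrieved from) this bucket
    storage=S3(bucket="<YourBucketName>"),
) as flow:
    df = build_dataframe()
    log_pandas_version()

if __name__ == "__main__":
    flow.register(project_name="<YourProjectName>")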

Further notes & additional information:

  • The task_role_arn is the ARN of the IAM role we created in the section Creating an IAM role for our ECS tasks.
  • Inside ECSRun() we defined labels so that our ECS agent, which has those labels attached, can pick up this flow for execution. This mechanism routes flows to the proper agents. Note that the labels between the agent and the flow must match exactly: ALL labels from the agent must match ALL labels from the flow.
  • Inside ECSRun() we also defined a custom Docker image (here: our image from Dockerhub) to ensure that our flow environment includes all the Python packages, in the exact versions, that we need.
  • The command flow.register() requires you to specify a project name. You can create a project in the UI or use the command: prefect create project <ProjectName>.

The logs from the flow example shown above — image by the author

Conclusion

In this article, we looked at how to create an ECS Fargate agent with Prefect. We created an ECS cluster and an EC2 instance with a Prefect ECS agent running as a background process that continuously polls the Prefect Cloud API for new flow runs.

We then ran an example flow that used our Docker image as a base to reflect custom Python package dependencies. When we register the flow, its metadata is sent to the specified S3 bucket (the bucket passed to the S3 storage in the code snippet). The same metadata is retrieved by the ECS Fargate agent every time the Prefect Cloud API schedules and triggers a run of this flow.

Note: If you have any issues with the agent setup, there is a chance that somebody has already experienced the same problem and shared their solution in the Prefect community Slack: https://prefect-community.slack.com/.

Thank you for reading!
