Put a Stop to Data Swamps With Event-Driven Data Testing


Data lakes have long had a bad reputation when it comes to data quality. In contrast to data warehouses, data doesn’t need to adhere to any predefined schema before it can be loaded in. Without proper testing and governance, your data lake can easily turn into a data swamp.

In this article, we’ll look at how to build automated data tests that will be executed any time new data is loaded to a data lake. We’ll also configure SNS-based alerting to get notified about data that deviates from our expectations.

Python Libraries for Data Quality

There are so many tools for data profiling and data testing out there. Just to list some of them:

  • Pandas Profiling allows us to generate an HTML report showing quantile statistics, histograms, correlations, NULL value distribution, text analysis, categorical variables with high cardinality, and more.
  • dbt Tests let us validate the uniqueness, accepted values, NULL values, and build any custom data test to detect anomalies by using SQL queries.
  • Bulwark provides decorators for functions that return pandas DataFrames (e.g. @dc.HasNoNans()).
  • mobyDQ is a tool from Ubisoft to generate a GraphQL-based web application for data validation.
  • TensorFlow Data Validation detects anomalies in training and model serving data.

We’ll focus on Great Expectations, an open-source Python library for validating and profiling data.

Using Great Expectations

The recommended way of using Great Expectations is to:

  • Install the package: pip install great_expectations.
  • Initialize a project: great_expectations --v3-api init.
  • Configure a connection to your data source (e.g. your data warehouse or flat files for pandas or PySpark validation): great_expectations --v3-api datasource new.
  • Create an initial expectations suite either manually, interactively using a batch of data, or automatically using a built-in profiler: great_expectations --v3-api suite new.
  • Edit this expectations suite in a Jupyter notebook (great_expectations --v3-api suite edit suite_name) or directly by modifying a JSON file (great_expectations/expectations/<suite_name>.json).
  • Create a checkpoint mapping the expectations suite to a data_asset_name that is the actual data you want to test: great_expectations --v3-api checkpoint new checkpoint_name.
  • Run the validation process on a new batch of data: great_expectations --v3-api checkpoint run checkpoint_name.
  • Finally, figure out how to deploy it and run it on a schedule, for example by generating a Python script: great_expectations --v3-api checkpoint script checkpoint_name.

If you want to implement all these steps, you can follow the official tutorial. The most important page of the entire documentation lists all the expectations you can use.

If you look at the bullet points above, you may notice that this setup is quite involved. And it doesn’t even cover how to package and deploy the code to make it production-ready, how to set up alerts on failure, how to host and share the data docs, or how to build a repeatable process around it for data testing in data pipelines.

Using Great Expectations for Event-Driven Data Testing

Let’s try to approach it in a more “Pythonic” way. We want to use Great Expectations on data stored in AWS S3. We want data tests to run automatically any time a new file arrives in S3. While Great Expectations provides a data source connector for Athena, it would require running the validation on an entire Athena table rather than validating only a specific batch of data loaded to a data lake. Thus, it would be harder to track down which S3 PUT operation caused anomalies in the respective data source.

As an alternative to the Athena data source, we could configure the expectations suite using pandas and flat files as a data source, but even then, the entire process seems a bit cumbersome.

Let’s look at how we can approach it only using Python — no configuration files. The ultimate goal is to deploy this script to AWS Lambda and configure the function to be triggered on each S3 PUT operation to the desired path in our S3 data lake.

Demo: Generating Time Series Data for Testing

We’ll start by generating an hourly time series with a deliberately chosen range of values. Generating a synthetic dataset will allow us to conveniently insert additional “bad values” and see if our data tests detect those anomalies. Here is an example dataset we will be using:

Example dataset — image by author
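The generator itself isn’t reproduced in this article; a minimal sketch could look like the following (the class name matches the article’s timeseries_data_generator.py module, but the method name and default parameters are assumptions):

```python
import numpy as np
import pandas as pd


class TimeseriesDataGenerator:
    """Generates an hourly time series with integer values in a chosen range."""

    def get_data(self, start="2021-01-01", periods=24,
                 min_value=0, max_value=100, seed=42):
        rng = np.random.default_rng(seed)
        return pd.DataFrame({
            # 24 hourly timestamps per day, matching the expected granularity
            "timestamp": pd.date_range(start=start, periods=periods, freq="h"),
            # integers in [min_value, max_value], matching the expected range
            "value": rng.integers(min_value, max_value + 1, size=periods),
        })
```

Fixing the seed keeps the happy-path dataset reproducible, so failed runs are attributable to deliberately injected anomalies rather than random noise.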

Implementing Data Tests Using Great Expectations

Which tests can we run for this data?

By and large, any potential anomalies can (and should) be tested. For instance, we can validate:

  • The order of columns.
  • The row count (i.e. the granularity of our time series). Since we are dealing with hourly time series data, we can expect 24 rows per day provided that no data is missing (and that there is no Daylight Saving Time!).
  • The presence of any potential NULL values.
  • Data types. The timestamp should be a datetime column, while value is an integer column.
  • Whether the range of values matches our expectations. In this example, it must be between 0 and 100.

How to implement data tests

Here is an example implementation of those tests in a single class. It allows you to run each data test individually as well as run all the tests at once. Additionally, the parse_data_test_result() method sends an SNS email alert on any failed data test.

To create an SNS topic for email alerts, you can run the code below. Then, follow the link from the AWS email to confirm your SNS email subscription.

How to run data tests locally

You probably want to test your data locally before moving on to production. The data tests from TimeseriesDataTestRunner can be executed on a local development machine. The code snippet below implements:

  • One happy-path test that will succeed because it generates data that matches our expectations.
  • Seven failing tests corresponding to the previously defined test cases. By deliberately generating skewed data, we can ensure that our tests are working correctly and detect data that deviates from our expectations.
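The exact skew knobs aren’t shown here, but the idea can be illustrated with plain pandas: start from a valid dataset and derive broken variants, each designed to trip one expectation (function names are illustrative):

```python
import numpy as np
import pandas as pd


def valid_data():
    """24 hourly rows with integer values in [0, 100] — the happy path."""
    return pd.DataFrame({
        "timestamp": pd.date_range("2021-01-01", periods=24, freq="h"),
        "value": np.random.default_rng(1).integers(0, 101, size=24),
    })


# Deliberately skewed variants — each should trigger a specific failed test.
def swapped_columns(df):
    return df[["value", "timestamp"]]          # breaks the column order test

def missing_rows(df):
    return df.head(20)                         # breaks the row count test

def null_values(df):
    out = df.copy()
    out["value"] = out["value"].astype("float64")
    out.loc[0, "value"] = np.nan               # breaks the NULL values test
    return out

def string_values(df):
    out = df.copy()
    out["value"] = out["value"].astype(str)    # breaks the data type test
    return out

def out_of_range(df):
    out = df.copy()
    out.loc[0, "value"] = 999                  # breaks the value range test
    return out
```

Each variant is then passed to the TimeseriesDataTestRunner, and every run on skewed data should produce exactly one alert email.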

When we run this locally, we should receive seven emails similar to this one:

Email about failed data test — image by author

How to run data tests on AWS Lambda

The most efficient way to run those tests automatically is to build a Lambda function with an S3 PUT object event trigger. This way, any time a new file gets uploaded to the specified S3 location, the Lambda function will be automatically triggered to test our data.

To accomplish that, we need a Lambda function that will read the S3 key from the event metadata, read the uploaded file into a pandas DataFrame, and finally run the tests. Here is a simple implementation of that:

To build this Lambda function, we need:

  • requirements.txt:
  • Dockerfile:
  • A couple of shell commands to build and push a Docker image to ECR:
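These files aren’t embedded here. The requirements.txt would list at least pandas, great_expectations, and boto3, and the Dockerfile could look roughly like this (base image tag and handler name are assumptions based on the project structure below):

```dockerfile
# Sketch of a Lambda container image for the data tests
FROM public.ecr.aws/lambda/python:3.9

COPY requirements.txt .
RUN pip install -r requirements.txt

# lambda.py, timeseries_data_generator.py, and timeseries_data_test_runner.py live in src/
COPY src/ ${LAMBDA_TASK_ROOT}/

# "module.function" of the Lambda entry point in src/lambda.py
CMD ["lambda.handler"]
```

The shell steps are the usual ECR sequence: `aws ecr get-login-password | docker login ...`, then `docker build`, `docker tag`, and `docker push` against your ECR repository URI.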

Note that timeseries_data_test_runner.py, timeseries_data_generator.py, and lambda.py are located in the src folder. This is important if you want to use the Dockerfile shown above. The project structure looks as follows (the last file will be explained in the next section):

|-- Dockerfile
|-- requirements.txt
|-- src
|   |-- lambda.py
|   |-- timeseries_data_generator.py
|   `-- timeseries_data_test_runner.py
`-- upload_new_timeseries_to_s3.py

Now all that is left to do is to:

  1. Create your Lambda function.
  2. Adjust the memory and timeout settings based on your use case (the defaults are too small for data testing — you can allocate up to 10GB of memory to your Lambda function and set a timeout of up to 15 minutes).
  3. Configure the S3 trigger.
  4. Set the IAM permissions for the Lambda function so that it can read the files from S3 and trigger the SNS alert.
  5. Test the process by uploading new files to your data lake.
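All of this can be done in the AWS console (as the screenshots below show) or from the CLI. A hedged sketch of the CLI route follows; the function name, bucket, role, account ID, and region are hypothetical:

```shell
# Create the container-based Lambda function with generous memory/timeout
aws lambda create-function \
    --function-name timeseries-data-tests \
    --package-type Image \
    --code ImageUri=123456789012.dkr.ecr.eu-central-1.amazonaws.com/data-tests:latest \
    --role arn:aws:iam::123456789012:role/timeseries-data-tests-role \
    --memory-size 1024 \
    --timeout 900

# Allow S3 to invoke the function, then wire up the PUT trigger
aws lambda add-permission \
    --function-name timeseries-data-tests \
    --statement-id s3-invoke \
    --action lambda:InvokeFunction \
    --principal s3.amazonaws.com \
    --source-arn arn:aws:s3:::my-data-lake

aws s3api put-bucket-notification-configuration \
    --bucket my-data-lake \
    --notification-configuration '{"LambdaFunctionConfigurations": [{
        "LambdaFunctionArn": "arn:aws:lambda:eu-central-1:123456789012:function:timeseries-data-tests",
        "Events": ["s3:ObjectCreated:Put"]}]}'
```

The execution role additionally needs s3:GetObject on the data lake prefix and sns:Publish on the alert topic.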

Creating your Lambda function — image by author


Changing the memory and timeout settings — image by author


Setting the S3 trigger — image by author


Setting the IAM permissions — image by author

Testing the AWS process by uploading new files to a data lake

Similarly to how we ran data tests locally, we can now start uploading our data to S3 and see the alerts being triggered due to failed data tests on AWS.

If you execute all the tests above, you should receive seven emails from AWS similar to the ones below:


Results of failed data tests being triggered automatically on each upload to S3 — image by author

The biggest advantage of our custom alerts is that they show exactly which S3 file upload caused the specific data test to fail. This is something that seems to be hard to accomplish when using the default implementation of Great Expectations.

Additionally, using purely pandas-based expectations makes testing easier and seems to be more “Pythonic” than working with configuration files and Jupyter notebooks. However, the downside of the presented approach is the lack of data docs. If you care about those, have a look at the official Great Expectations tutorial.

How To Monitor This Process

As you can imagine, if you build data tests in Lambda for a large number of S3 datasets, monitoring can become overwhelming. Dashbird is a platform that addresses this problem by providing dashboards and well-formatted logs for monitoring, alerting, and observability of serverless resources on AWS. The image below shows how Dashbird can help detect issues and bottlenecks in your serverless data tests:


Monitoring of serverless functions in Dashbird — image by author


Conclusion

In this article, we looked at various Python libraries for data profiling and testing. We examined two ways of using Great Expectations: the traditional config-file-based method as well as a more “Pythonic” do-it-yourself approach using a custom test runner and email alerts. We then investigated how to execute data tests locally and how to ensure automated test runs after any new file gets uploaded to S3.

Thank you for reading!