System Integration Test for Data Products

Or how to do Data Engineering like Metallica!

Introduction

One of the challenges of running a distributed data platform is testing the system end to end. Each piece of the system can, of course, be tested in isolation, but it's difficult to draw conclusions about the end-to-end integrity of the system without a testing solution that covers it as a whole.

In a recent project at DataChef, we found ourselves occupied with an extensive system integration test (SIT for short), conducted in an online session across 3 timezones, with nearly 30 people on the call.

A chaotic operation done online with many people involved.

Facing this cumbersome situation, we decided to automate the process. This turned out to be quite easy to achieve using BDD.

Why BDD?

Behavior-Driven Development (BDD) is not new to the world of software engineering, but in the data world it is rarely practiced.

Of course, everything we cover in this article can be done with any other test framework. However, using a BDD framework, and especially the common practice of defining tests as scenarios in Cucumber syntax, enables us to express and document the process in a human-readable format. This has the two important benefits we were aiming for:

  1. Make it easy to understand, so it can scale to other teams and systems in the organization.

  2. Make it easy to develop, to help with fast adoption and handover.

Example Implementation

Scenario

Consider the following data flow:

Sample Architecture with 3 systems involved.

In this scenario, we:

  1. Have 3 systems:

    1. Source: starts the process by publishing messages to the input topics.

    2. AWS Platform: transforms the incoming data into proper data products.

    3. Salesforce: the target system.

And that's it. As far as the integration test is concerned, we treat each of these systems as a black box and focus only on:

  1. Orchestration: ensuring each corresponding system is running during the test.

  2. Verification: checking that the proposed test input produces the expected result within a defined threshold (read more: Threshold Test).

In Action

Let's first see how we want to express our test:

Feature: Product 1
  Process the input messages from input_topic_1 and publish the resulting
  data product in product_v1.

  Scenario: Basic Message
    Given sample data is loaded from: tests/fixtures/input_topic.avro
    And with schema: tests/fixtures/input_topic_v1_0.avsc
    And published to Kafka topic: input_topic_1

    When glue job started: product_1
    And finished processing
    And result consumed from Kafka topic on checkpoint: product_v1

    Then the result contains 2 records with tolerance of +/- 1

The sample scenario we are defining is quite straightforward. If you are not familiar with Cucumber syntax, I'd suggest taking a look at Cucumber.io. In short:

  1. Given: some sample data is published into input_topic_1.

  2. When: the related Glue job has run and its output has been consumed.

  3. Then: we expect to receive between 1 and 3 records.

To make this test executable, we are going to use the pytest-bdd library. It's very feature-rich, and it integrates easily with the testing facilities we use for other stages of development.
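Wiring the feature file into pytest takes a single call to scenarios(), which collects every scenario in the file as a test. A minimal sketch, where the module name and feature file path are assumptions about the project layout:

# test_product_1.py (hypothetical module name)
from pytest_bdd import scenarios

# Collect all scenarios from the feature file shown above
# (the path is an assumption about the project layout).
scenarios("features/product_1.feature")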

The first thing we need is to provide helper functions for the Given section:

import json
import logging

import fastavro
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
from pytest_bdd import given, parsers

logger = logging.getLogger(__name__)

# Broker and schema-registry settings for the AvroProducer are assumed to be
# defined elsewhere in the suite, e.g.:
# avro_producer_config = {"bootstrap.servers": ..., "schema.registry.url": ...}

@given(
    parsers.cfparse(
        "sample data is loaded from: {filename}", extra_types={"filename": str}
    ),
    target_fixture="input_data",
)
def load_input_data(filename: str) -> list | dict:
    """Read input files and return the result as Python data
    structure. Allowed formats: Avro, Json.

    Args:
        filename (str): path to the data file
    """
    logger.info(f"Loading file: {filename}")
    match filename.split(".")[-1]:
        case "avro":
            with open(filename, "rb") as fp:
                reader = fastavro.reader(fp)
                return next(reader)
        case "json":
            with open(filename) as f:
                return json.load(f)
        case other_file_fmt:
            raise ValueError(f"Unsupported File Format: {other_file_fmt}")

@given(
    parsers.cfparse(
        "with schema: {filename}", extra_types={"filename": str}
    ),
    target_fixture="input_schema",
)
def load_input_schema(filename: str) -> str:
    """Read input schema file and return the result as string.

    Args:
        filename (str): path to the schema file
    """
    logger.info(f"Loading schema: {filename}")
    with open(filename) as fp:
        return fp.read()

@given(parsers.cfparse("published to kafka topic: {topic}", extra_types={"topic": str}))
def publish_to_kafka(topic: str, input_data: list | dict, input_schema: str) -> None:
    """Publish given data to given Kafka topic.

    Args:
        topic (str): Kafka topic name
        input_data (list | dict): data to be published
        input_schema (str): schema of the data to be published

    Raises:
        Exception: if the data cannot be published
    """
    logger.info(f"Publishing to topic: {topic}")
    value_schema = avro.loads(input_schema)
    producer = AvroProducer(avro_producer_config, default_value_schema=value_schema)
    if not isinstance(input_data, list):
        input_data = [input_data]

    for value in input_data:
        producer.produce(topic=topic, value=value)

    producer.flush()

There is nothing exciting in the main body of the code: each function just reads a file or publishes some data to a Kafka topic. The interesting part, however, is the @given decorators. There we:

  1. Define parsers, which indicate which step lines each function should act upon and what it should extract as context.

  2. Define target fixtures, which populate the context required for the tests to run.

The syntax for the rest of the test remains the same, and I would rather not bloat this article with obvious code. If you need additional information about pytest-bdd and its (great) features, I'd suggest referring to its well-written documentation.
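For readers who want a starting point, though, the When steps might look roughly like the sketch below. The Glue calls (start_job_run, get_job_run) are standard boto3 APIs; the consumer settings, polling cadence, and fixture names are assumptions:

import time

import boto3
from confluent_kafka import Consumer
from pytest_bdd import parsers, when


@when(parsers.cfparse("glue job started: {job_name}"), target_fixture="glue_run")
def start_glue_job(job_name: str) -> dict:
    """Start the Glue job under test and keep its run id for polling."""
    glue = boto3.client("glue")
    run = glue.start_job_run(JobName=job_name)
    return {"job_name": job_name, "run_id": run["JobRunId"]}


@when("finished processing")
def wait_until_finished(glue_run: dict) -> None:
    """Poll the job run until Glue reports a terminal state."""
    glue = boto3.client("glue")
    while True:
        job_run = glue.get_job_run(JobName=glue_run["job_name"], RunId=glue_run["run_id"])
        state = job_run["JobRun"]["JobRunState"]
        if state in ("SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT", "ERROR"):
            assert state == "SUCCEEDED", f"Glue job ended in state: {state}"
            return
        time.sleep(30)  # polling interval is an arbitrary choice


@when(
    parsers.cfparse("result consumed from Kafka topic on checkpoint: {topic}"),
    target_fixture="consumed_records",
)
def consume_results(topic: str) -> list:
    """Drain the output topic; the Then step only needs the record count."""
    consumer = Consumer({
        "bootstrap.servers": "localhost:9092",  # assumed broker address
        "group.id": "sit-consumer",             # assumed consumer group
        "auto.offset.reset": "earliest",
    })
    consumer.subscribe([topic])
    records = []
    while (msg := consumer.poll(timeout=10.0)) is not None:
        if msg.error() is None:
            records.append(msg.value())
    consumer.close()
    return records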

But you didn't test Salesforce

That's correct, and nothing is holding us back from doing so. All it takes is to develop some scenario utilities based on the Salesforce API, and just like that we can expand our scenarios to cover Salesforce.
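As an illustration, a Then step against Salesforce could be sketched with the simple-salesforce client. The step wording, credentials handling, and SOQL query are all assumptions, not part of the original setup:

from pytest_bdd import parsers, then
from simple_salesforce import Salesforce


@then(parsers.cfparse("Salesforce object {sobject} contains {expected:d} new records"))
def check_salesforce_records(sobject: str, expected: int) -> None:
    """Count today's records on the target object via a SOQL aggregate."""
    sf = Salesforce(
        username="sit@example.com",  # hypothetical credentials; in practice
        password="...",              # these would come from a secret store
        security_token="...",        # or environment variables
    )
    result = sf.query(f"SELECT COUNT() FROM {sobject} WHERE CreatedDate = TODAY")
    assert result["totalSize"] == expected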

Threshold-Based Testing

The value of this technique might not be so obvious in the example we reviewed above, so I'd like to elaborate on this requirement and why it's important.

As we mentioned earlier, our main intention with this test is to avoid testing the internals of the application, like verifying the actual output field by field. The main reasons are:

  1. It can quickly become a cumbersome chore to maintain such a test in Cucumber.

  2. We already test that level of correctness in earlier test stages:

    1. Unit Tests

    2. Functional Tests

Here, the intention is to verify system connectivity. So instead of focusing on details, we verify the integrity of the system as a whole. Given that, we can now turn our focus to the volume of data as well, and this is where threshold-based testing shines best, especially in the big data world.
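The Then step from the scenario above boils down to a single tolerance check. A minimal sketch, assuming consumed_records is the fixture produced by the consuming When step:

from pytest_bdd import parsers, then


@then(parsers.cfparse("the result contains {expected:d} records with tolerance of +/- {tolerance:d}"))
def check_record_count(consumed_records: list, expected: int, tolerance: int) -> None:
    """Pass as long as the record count falls inside the tolerance band."""
    count = len(consumed_records)
    assert abs(count - expected) <= tolerance, (
        f"Expected {expected} +/- {tolerance} records, got {count}"
    )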

Dedicated Environment

Running integration tests, even in an automated fashion, can become expensive (and it's not all about money: developers' valuable time counts too). To help with that, we found it quite crucial for these tests to have their own dedicated environment. This ensures there is no leftover data from other processes or failed applications.
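One way to approximate that isolation is a session-scoped fixture that provisions uniquely named test topics and removes them afterwards. A sketch, with the broker address and naming scheme as assumptions:

import uuid

import pytest
from confluent_kafka.admin import AdminClient, NewTopic


@pytest.fixture(scope="session")
def input_topic():
    """Provision a uniquely named input topic and remove it after the run."""
    admin = AdminClient({"bootstrap.servers": "localhost:9092"})  # assumed broker
    topic = f"input_topic_1_sit_{uuid.uuid4().hex[:8]}"
    futures = admin.create_topics([NewTopic(topic, num_partitions=1, replication_factor=1)])
    for future in futures.values():
        future.result()  # block until the topic actually exists
    yield topic
    admin.delete_topics([topic])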

Conclusion

Well now, we have all we need to master our puppets (a.k.a. data products) like champs! There are, of course, many opportunities to improve this process and build on top of it. These scenarios can be developed across teams to test and guarantee their contracts automatically, and in our experience that proved to be far more effective and reliable than the initial online session.

Please don't hesitate to let us know how you tackle such situations and what your experience is with the solution proposed in this article.

Cover Art of Metallica's Master of Puppets Album
