
Creating source-aligned data products in Adevinta Spain

From best-effort governance to governed data integration by design

Intro

Our product vision for the Data Platform team at Adevinta in Spain is centred on empowering every Adevintan to consume and share high-quality, purposeful data to make decisions faster. It aims to maximise the value generated from data throughout the entire value chain, promoting autonomous value extraction and expediting the generation of data products through robust quality processes and governance automation. To achieve this, we set a goal: improve processes, responsibilities and tooling to ensure data quality.

This goal is built on four main principles:

  • You produce it, you own it
  • Data-driven teams without Data Analysts
  • Data governance by design
  • Data people working as Software Engineers

Focusing on data governance by design, we define it as follows: the team that owns a business domain also owns the data produced in it, and is accountable for publishing that data in the data platform and its metadata in the catalogue.

To support this definition, the Data Platform team and our users developed a solution based on data contracts. This approach maintains producer ownership while ensuring governance and seamless access for consumers through automation.

Where our journey began

Before we implemented data contracts, data integration was carried out by the Data Platform team along with Data Engineers from each marketplace. These teams worked together to integrate each new source into the platform with a practically bespoke process for each one, to fulfil the needs of the analytics teams.

The diagram below shows how this process used to work. As you can see, for most of the sources from the different marketplaces, a microservice pushes the data as JSON to a Kafka topic. The data platform then loads data from those topics, checking schema references via a URL or an API. Even though there is a schema, the process lacked key features such as versioning, runtime contract checks, SLOs and governance metadata, so we found ourselves dealing with breaking schema changes on a daily basis.

Segment behaviour ingestion schema. Several sources (mobile apps and websites) send data to Segment. A lambda/microservice then sends the data to a Kafka topic, to be loaded into the data lake by a Spark process.

Goals for improvement

We defined several goals to implement data contracts for ingestion.

Data product oriented: We aim to provide more than just a dataset in S3 and the metastore; we want the whole data product experience. This means delivering a consumable output designed to solve a business problem. To be effective, it should be trustworthy, discoverable, well-known and versioned. The data contract will include all the information necessary to meet these needs. Each output table will be versioned to prevent schema changes from affecting usability.

Ownership: Data should be pushed by the sources and organised and scheduled by the data producers, so they hold accountability for, and ownership of, the data.

Agreement: The contract will serve as an agreement between the producer, who defines how the data is stored and how it will be processed, and the consumer, who needs knowledge, enablement and access to the data. The consumer should be able to gather all this information from the contract.

Automation: We aim to minimise human interaction as much as possible, letting the producer and consumer focus on the contract, which holds all the necessary information. The contract will allow a process to consume data from the source and make it available for the consumer.

Standardisation: Data is loaded in the same standardised way every time. A single app is in charge of loading all kinds of data, with some minor adjustments. This makes it easier to evolve the underlying technology without causing disruption to the business.

Safe and secure: There will be a focus on privacy management: removing the personal fields with no analytical value and ingesting the ones with value into separate tables. This method ensures that read access is granted only to authorised consumers, while making it easier to handle GDPR requirements such as the right to be forgotten.

Contract Implementation

Definition

A data contract is an agreement between a data producer, data consumers and a third party responsible for programmatic standardisation. It captures expectations around schema life cycle, semantic metadata, security and compliance requirements, quality specifications, SLOs/SLAs and other forms of data governance. A data contract for ingestion in Adevinta Spain is an immutable JSON file stored in a GitHub repository and contains the following fields:

  • Contract name: Name of the data ingested. It matches the output table, so the consumer can find it easily once loaded.
  • Description: A description of what kind of data is stored using this contract.
  • Schema: Defines the structure of the data, including field names and types. Once defined in a separate GitHub repository, the schema is accessible through a URL. Schema versions follow the X.Y format, where X represents the major and Y the minor version.
  • Contract Version: A number that defines the version of the contract. For any change, including schema changes, whether minor or major, a new contract version must be created.
  • Start Date: Start date for the validity of the contract.
  • End Date: End date for the validity of the contract. Only applicable if there is a higher version of this contract.
  • PII list: List of fields that contain Personally Identifiable Information (PII), that is, information that may allow someone to identify the user, such as their name, ID or IP address. Each field indicates whether or not it is required for analytical purposes, which prevents the process from loading unnecessary data.
  • SDRN / user_id: The SDRN is an attribute that identifies a single client. Personal fields require an SDRN to link them with a specific client. If the personal information belongs not to a client but to an employee, a user_id value is used instead.
  • relationKey: A field to relate each row of personal information to the corresponding row of non-personal data (usually messageId).
  • Source: Logical grouping of different data sources.
  • SLOs: Service Level Objective definitions: owner identification, support contact point, periodicity of the data, time to recover, retention period and the list of sites the data belongs to.

Sample data contract


					
{
      "contract_name": "name_of_the_event",
      "contract_version": "2",
      "description": "my event",
      "start_date": "2022-12-07T00:00:00+00:00",
      "schema": {
          "source": "url",
          "location": {
             "url":"https://publicRepo/schema.json/1.1.json"
          },
          "format": "jsonSchema",
          "version": "1.1"
        },
      "landing_source":{
          "kafka_topic": "my_topic"
        },
      "sdrn": "client.@id",
      "relationKey": "eventId",
      "pii_fields": [
         {
            "field": "payload.order.number",
            "analytical": true,
            "mapping": "order.number"
         },
         {
            "field": "payload.order.business",
            "analytical": false
         }
      ],
      "source": "data_origin",
      "slas": {
        "owner": "owner team",
        "contact_support": "owner slack channel",
        "data_periodicity": "hourly",
        "time_to_recover": "10h",
        "retention": "30d"
      }
}
JSON

Scheduling automation

Once the contract is defined, we need to schedule its consumption. In the first MVP, the Data Platform team, as a facilitator, was responsible for building an Airflow DAG for each contract. This worked well for the MVP, but it took around five days from the contract definition to get the data available for consumption.

We automated the process by creating the DAG through a GitHub workflow, which is called once the contract is deployed.

This workflow performs several steps (checking out the repository, installing Python) and finally runs a shell script that groups the modified contracts by source, a logical aggregation defined by the user.

Step of the GitHub workflow that is in charge of calling the DAG creation script. It will only run if there is any change to the contract

Once aggregated, a Python process is called that, based on a template, creates the DAG itself. Some fields from the contract, such as periodicity and execution_time, are used to fill in the DAG scheduling information. Each contract from the same source becomes a task in the resulting DAG.
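
To make the mechanism concrete, here is a minimal sketch of what such a template-based generator could look like. It is not the production script from the dag_creation folder; the template contents, field names and task wiring are assumptions.

# Hypothetical sketch of template-based DAG generation, not the production script.
from pathlib import Path
from string import Template

DAG_TEMPLATE = Template('''\
from airflow import DAG
from datetime import datetime

with DAG(
    dag_id="ingestion_${source}",
    schedule_interval="${schedule}",      # derived from the contracts' periodicity
    start_date=datetime(2024, 10, 1),
    catchup=False,
) as dag:
${tasks}
''')


def build_dag_file(source: str, contracts: list, output_dir: str = "dags") -> None:
    """Render one DAG per source, with one ingestion task per contract."""
    periodicity = contracts[0]["slas"]["data_periodicity"]
    schedule = "@hourly" if periodicity == "hourly" else "@daily"
    tasks = "\n".join(
        f'    ingest_{c["contract_name"]} = ...  # operator launching the Spark ingestion'
        for c in contracts
    )
    dag_code = DAG_TEMPLATE.substitute(source=source, schedule=schedule, tasks=tasks)
    Path(output_dir, f"ingestion_{source}.py").write_text(dag_code)
Python

Each generated task then launches the Spark ingestion process described later in this article.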

Project organisation: the dag_creation folder contains the Python script that automatically creates the DAG to schedule the consumption process. It contains templates, tests, the main file and a couple of services.

This automation reduces the Time to Effective Data (TtED), the time that elapses from the data contract being promoted to production until the data is available in the data lake, from around five days to around 30 minutes.

Contract creation

Contract creation is easy but repetitive, so we were able to help users by implementing automations that propose a contract each time a schema is defined.

Schema definition

The schema is defined as a JSON file in a GitHub repository. This process and repository have existed for a long time in Adevinta; we are simply building on them to apply data contracts and ensure governance and compliance.

Once the user creates the definition, they trigger a GitHub workflow that creates a contract proposal:

GitHub Workflow will run for every push in the master branch that modifies the relevant files

Contract proposal

The contract proposal workflow first detects which schemas were added. A schema is never modified: if the schema must evolve, a new schema is created with a minor or major version increase, depending on whether the change is forward-compatible or not. This ensures that an event with a given version always keeps the same schema, guaranteeing compatibility with the destination table, as explained in the following chapter, “Ingestion process”.

The workflow will then launch a Python script that first checks if the event’s schema represents a new event (version 1) or an updated version of an existing event. The process will split depending on that check.


					
def _run_process(execution_date, event_path, delete_event: bool = False):
    # New events (schema version 1) get a templated contract proposal;
    # higher versions copy and bump the latest existing contract instead.
    if _is_version_1(event_path) and not delete_event:
        _create_version_1(execution_date, event_path)
    else:
        _process_higher_version(execution_date, event_path, delete_event)
Python
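
For illustration, one possible implementation of the version check used above is sketched below, assuming the schema version is carried in the file name (for example .../MyNewEvent-Event.json/1.json, as in the sample contract). This helper is a hypothetical sketch, not the production code.

# Hypothetical version check: assumes the schema file name encodes the version.
from pathlib import Path


def _is_version_1(event_path: str) -> bool:
    # ".../1.json" -> "1", ".../2.1.json" -> "2"
    major = Path(event_path).stem.split(".")[0]
    return major == "1"
Python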

If it is the first version, the process uses a template to create a simple contract. It performs several queries against Adevinta repositories in order to fill in information in the contract proposal. Fields that cannot be known at that time, for instance which fields contain personal data, or the SLOs, are left for the user to fill in. A templated contract proposal is shown below:

{
      "contract_name": "MyNewEvent",
      "contract_version": "1",
      "description": "Event published from microservice",
      "start_date": "2024-10-01T00:00:00+00:00",
      "schema": {
          "source": "url",
          "version": "1",
          "location": {
            "url": "https://schema.xxxxxx/events/xxxxx/MyNewEvent-Event.json/1.json"
          },
          "format": "jsonSchema"
        },
      "landing_source":{
          "kafka_topic": "pub.mytopic"
        },
      "pii_fields": [<FILL or leave empty if there are no private fields> ],
      "source": "ms-mysource",
      "slas": {
        "owner": "team-myteam",
        "contact_support": "<FILL>",
        "data_periodicity": "<daily/hourly>",
        "Execution_hour": <FILL or remove if hourly>,
        "time_to_recover": "<FILL>",
        "retention": "<FILL>",
        "provider_ids": ["mymarketplace"]
      }
}

JSON

If the version is higher than one, the process looks for a contract matching the schema name to copy. It selects the latest one and copies it with the new version, while at the same time expiring the previous contract.
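
As a rough sketch, this higher-version path could be implemented along the lines below. The repository layout, file naming and field handling are assumptions, and the real script also handles the delete_event case shown earlier.

# Hypothetical sketch of copying the latest contract into a new version and
# expiring the previous one; layout and naming are assumptions.
import json
from pathlib import Path


def _process_higher_version(execution_date: str, event_path: str,
                            contracts_dir: str = "contracts") -> None:
    event_name = Path(event_path).parent.name  # schema folders named after the event
    existing = sorted(
        Path(contracts_dir, event_name).glob("*.json"),
        key=lambda p: int(json.loads(p.read_text())["contract_version"]),
    )
    latest = json.loads(existing[-1].read_text())

    # Expire the previous contract version.
    expired = dict(latest, end_date=execution_date)
    existing[-1].write_text(json.dumps(expired, indent=2))

    # Propose the new version, starting where the previous one ended.
    proposal = dict(latest,
                    contract_version=str(int(latest["contract_version"]) + 1),
                    start_date=execution_date)
    Path(contracts_dir, event_name, f'{proposal["contract_version"]}.json').write_text(
        json.dumps(proposal, indent=2)
    )
Python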

In both cases, a pull request will be raised for the user to review. In the first case, the user will need to complete several fields. In the second, the user will only need to modify the proposal if a new personal data field was included or if some other information has changed (execution hour for instance).

Automatic Pull Request to create a contract from the event schema

Despite this automation, the data producer can still create a contract manually if needed. But the idea is for them to focus on the business value and data definition, creating only the schema and filling some fields inside the contract. Everything (contract proposal, deployment and scheduling) should then run automatically until the data is available and ready to be exploited in the data lake.

Ingestion process

Overview

Once the contract is created and the DAG is deployed and activated, it triggers as defined in the contract (hourly, or daily at 8am, for instance). Then a Spark process written in Scala runs for each of the contracts with the following parameters (an illustrative sketch of how a task might pass them follows the list):

  • config: JSON configuration file for the Spark process
  • start_ts: Start timestamp
  • end_ts: End timestamp
  • connector: Source type; only Kafka is allowed so far
  • contract: Contract to be processed, in the format <source>:<contractName>
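
As a hedged illustration of how a generated DAG task might pass these parameters to the Scala job, the snippet below uses Airflow's SparkSubmitOperator; the artifact path, entry-point class and flag names are assumptions, not our actual setup.

# Hypothetical task definition; paths, class name and flag names are assumptions.
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

ingest_name_of_the_event = SparkSubmitOperator(
    task_id="ingest_data_origin_name_of_the_event",
    application="s3://artifacts/contract-ingestion.jar",   # hypothetical Scala artifact
    java_class="com.adevinta.ingestion.IngestionJob",      # hypothetical entry point
    application_args=[
        "--config", "s3://config/ingestion.json",
        "--start_ts", "{{ data_interval_start.int_timestamp }}",
        "--end_ts", "{{ data_interval_end.int_timestamp }}",
        "--connector", "kafka",
        "--contract", "data_origin:name_of_the_event",
    ],
)
Python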

 

As the first step, the Spark process looks for all contracts matching the source and name in the S3 bucket where they are stored. It loads only the contracts that are valid for the start_ts and end_ts received as parameters. This is done because an event may have several versions alive at the same time.

It then connects to the Kafka topic defined in the contract and reads data for the given time frame (start_ts to end_ts), filtering the data by two clauses (see the sketch after this list):

  • A field named type in the event must match the contract name
  • A schema version field in the event must match the schema version defined in the contract
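
A minimal PySpark sketch of this read-and-filter step is shown below. The production job is written in Scala; the broker address, the JSON paths for the type and schema version fields, and the bounded-read options are assumptions.

# Illustrative PySpark sketch; the real job is Scala and field paths may differ.
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("contract-ingestion-sketch").getOrCreate()

start_ts_ms, end_ts_ms = 1732600800000, 1732604400000   # start_ts / end_ts parameters
contract_name, schema_version = "name_of_the_event", "1.1"

raw = (spark.read.format("kafka")
       .option("kafka.bootstrap.servers", "broker:9092")  # hypothetical broker
       .option("subscribe", "my_topic")                   # topic from the contract
       .option("startingTimestamp", start_ts_ms)          # bounded batch read over the
       .option("endingTimestamp", end_ts_ms)              # requested time frame
       .load())

events = raw.select(F.col("value").cast("string").alias("rawEvent"))

# Keep only events whose type matches the contract name and whose schema
# version matches the version pinned in the contract.
matching = events.where(
    (F.get_json_object("rawEvent", "$.type") == contract_name)
    & (F.get_json_object("rawEvent", "$.schema_version") == schema_version)
)
Python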

Once the data is loaded, it is time to clean it up. The process checks the contract for personal fields, separating them into another DataFrame if they have analytical value or removing them directly otherwise. See the GDPR management section of this article for more information.

There is no other transformation apart from the column additions and deletions explained above. The process also performs no schema evolution at runtime, relying instead on the evolution logic described in the contract lifecycle section. Once the data is split between personal and non-personal, it is stored in the proper table, partitioned by processingTime, ensuring idempotency and allowing consistent retries.
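
Continuing the sketch above, the idempotent write can be pictured as follows, assuming Delta Lake tables partitioned by processingTime; the table path and the use of replaceWhere are assumptions about the mechanism, not the production code.

# Illustrative idempotent write: a retry for the same run simply replaces its
# own processingTime partition. Paths and column names are assumptions.
from pyspark.sql import functions as F

processing_time = "2024-10-01T08:00:00"   # the run's execution timestamp

(matching
 .withColumn("processingTime", F.lit(processing_time))
 .write.format("delta")
 .mode("overwrite")
 .option("replaceWhere", f"processingTime = '{processing_time}'")
 .partitionBy("processingTime")
 .save("s3://datalake/history/data_origin/name_of_the_event_v2"))
Python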

GDPR management considerations

When ingesting data into a data platform, we often fall into the garbage-in-garbage-out paradigm, storing as much data as possible regardless of its analytical value. This approach may avoid schema evolutions and backfills in the future. However, it also ingests more data than needed, increasing storage demands and, more importantly, management and maintenance costs.

Another issue when ingesting data into a data platform is that personal fields end up spanning several tables, making it difficult to confine access to authorised users. This complexity increases the time and cost of maintaining compliance with requirements such as the right to be forgotten.

With that in mind, we defined the main goals of the GDPR management in this ingestion process as:

  • Minimise ingestion of non-analytical personal information
  • Store all personal information in a single partitioned table

The first goal is achieved by defining in the contract whether a personal field has analytical value and removing any that don't.

The second goal is based on a personal table per website whose schema evolves only with the approval of the security office. So if a producer has a personal field that is not yet present in said table, an assessment must take place to ensure that the field has analytical value.

Here are the details of the personal data fields defined in the contract (a sketch of how they are applied follows the list):

  • PII list: List of fields that contain personal information. Each field contains:
    • The name of the personal field in the source event
    • A boolean indicating whether it has analytical value or not
    • A mapping to the personal table, if the name does not exactly match the one in the source event (for instance, the field phone may exist as telephone in the personal table)
  • SDRN: The SDRN is an attribute that identifies a single client in Adevinta in Spain. Each client that has personal data requires an SDRN.
  • user_id: If the personal information does not belong to a client but, for instance, to an employee, a user_id is used instead and the SDRN is omitted.
  • relationKey: Field to relate each row of personal information to its non-personal data equivalent, usually the ID of each event.
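
To make the use of these fields concrete, here is a simplified sketch of how the pii_fields section could be applied. For readability it assumes the event has already been flattened into top-level columns, and the helper and column names are assumptions rather than the production logic.

# Hypothetical PII split over a flattened DataFrame; not the production code.
from pyspark.sql import functions as F

pii_fields = [
    {"field": "order_number", "analytical": True, "mapping": "orderNumber"},
    {"field": "order_business", "analytical": False},
]


def split_personal(df, pii_fields, sdrn_col="sdrn", relation_key="eventId"):
    analytical = [f for f in pii_fields if f["analytical"]]

    # Personal table: SDRN + relation key + analytical personal fields, renamed
    # to the column names used by the shared personal table when a mapping exists.
    personal = df.select(
        F.col(sdrn_col),
        F.col(relation_key).alias("relationKey"),
        *[F.col(f["field"]).alias(f.get("mapping", f["field"])) for f in analytical],
    )

    # Regular table: every personal field is dropped, analytical or not.
    regular = df.drop(*[f["field"] for f in pii_fields])
    return regular, personal
Python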

 

This way, if the input data comes with personal data, two output tables are updated (regular data and personal data). This raises a new problem: we need to make the whole write atomic. This means that if the write to the personal data table fails, we need to roll back both that write and the one for the regular data. To do that, we take advantage of the “time travel” feature of Delta tables.

As the first step, we read the current version from the Delta history of the regular data table before writing to it. If that write fails, it rolls back thanks to Delta atomicity and the process finishes with an error. If, on the other hand, it finishes successfully, we go on to write to the personal data table, capturing any error so that, if it fails, we can time travel the regular data table back to the previous version before finishing the process.
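
A PySpark sketch of this rollback logic, using the Delta Lake Python API, is shown below; the production job is written in Scala, the table paths are hypothetical, and regular_df / personal_df stand for the two DataFrames produced by the split described above.

# Illustrative two-table write with manual rollback via Delta time travel.
from delta.tables import DeltaTable

regular_path = "s3://datalake/history/data_origin/name_of_the_event_v2"   # hypothetical
personal_path = "s3://datalake/history/data_origin/personal"              # hypothetical

# 1. Capture the current version of the regular data table before writing to it.
regular_table = DeltaTable.forPath(spark, regular_path)
last_version = regular_table.history(1).select("version").collect()[0][0]

# 2. Write the regular data; this single write is atomic thanks to Delta.
regular_df.write.format("delta").mode("append").save(regular_path)

try:
    # 3. Write the personal data.
    personal_df.write.format("delta").mode("append").save(personal_path)
except Exception:
    # 4. If it fails, time travel the regular table back to the version captured
    #    in step 1 so neither table keeps data from the half-finished run.
    regular_table.restoreToVersion(last_version)
    raise
Python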

Error management

When there is an error, both the producer and the consumer want to be aware of it, as they may need to fix it or manage the delay of dependent processes, respectively. The Data Platform team, as facilitators, needs to be aware of errors in case a bug in the code was promoted to production or there is an issue with the underlying infrastructure.

To help the producer analyse, find and fix the error, the process writes the input data to a special table called Quarantine.

The Quarantine table contains the following fields:

 

  • rawEvent: Input data as a string
  • error: Error information
  • source: Source of the data
  • event: Event name
  • Contract version: Version of the contract
  • processingTime: Execution time

 

This table allows producers to check whether the error is related to the source code, to the contract definition or to some other issue. Knowing the root cause, they can then take the necessary actions to fix it, whether that means correcting the source, fixing the contract, modifying the schema or something else.

Once the cause of the error is known and fixed, the process can be rerun. Quarantine data is never re-ingested; it is only there for debugging purposes.
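
As an illustration, writing failed input to the Quarantine table could look like the sketch below; the table path and helper name are assumptions, and the column names follow the table described above.

# Hypothetical helper that stores failed raw events for debugging only.
from pyspark.sql import functions as F


def write_to_quarantine(raw_df, error_message, source, event, contract_version,
                        processing_time, path="s3://datalake/history/quarantine"):
    (raw_df.select(F.col("rawEvent"))
        .withColumn("error", F.lit(error_message))
        .withColumn("source", F.lit(source))
        .withColumn("event", F.lit(event))
        .withColumn("contractVersion", F.lit(contract_version))
        .withColumn("processingTime", F.lit(processing_time))
        .write.format("delta").mode("append").save(path))
Python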

Summary of the History Zone, with Non-Personal data in the tables matching the contract name, personal fields in a master table and data from errors stored as Json in the Quarantine zone

Ingestion diagram

Summary of ingestion process described above.  Data starts in the Kafka topic and is loaded to the data lake by a Spark process based on the contract

Monitoring and Alerting

To keep track of errors, data volumetry and anything else that may affect data availability, there are several processes in place.

First, if an Airflow DAG fails, an alert is sent to a Slack channel, so the producers, the consumers and the Data Platform team (as facilitators) can be aware of it, check it and deal with it appropriately. A minimal sketch of such an alert hook is shown below.
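
The sketch assumes a Slack incoming webhook wired into the generated DAGs via an on_failure_callback; the webhook URL, DAG id and schedule are placeholders, not our actual configuration.

# Hypothetical failure callback posting to a Slack incoming webhook.
from datetime import datetime

import requests
from airflow import DAG

SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/<FILL>"


def notify_slack(context):
    ti = context["task_instance"]
    requests.post(SLACK_WEBHOOK_URL, json={
        "text": f":red_circle: Ingestion failed: {ti.dag_id}.{ti.task_id} "
                f"(run {context['ds']})",
    })


dag = DAG(
    dag_id="ingestion_data_origin",
    start_date=datetime(2024, 10, 1),
    schedule_interval="@hourly",
    catchup=False,
    default_args={"on_failure_callback": notify_slack},
)
Python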

Second, there is a Grafana dashboard to monitor the process. It’s a public dashboard, so anyone in the company can check it, both interested producers and consumers, as well as Data Platform team members as facilitators and builders of the process.

There are three sections in the dashboard:

  • Process monitor: This section contains visualisations showing the amount of data for each event in several views (per event, per owner, per source), so we can keep track of sudden changes in volumetry that may indicate problems that need to be dealt with
  • Table management: This one helps to keep track of table creation and schema evolution; it's useful to know when a table evolves
  • Errors: Here, the errors raised during the process are shown. As mentioned before, each error results in the data being loaded into the Quarantine zone and an alert being sent to a Slack channel.

Sample Grafana dashboard that provides observability for the amount of data ingested over time, as well as size per event type and owner

Conclusion

With this solution, we achieved most of the goals we defined at the beginning of the project:

  • Data is a consumable output, source aligned and properly versioned
  • All the metadata that allows the consumer to know how to consume the data (frequency, schema, personal fields) is present in an easy-to-find public document (the contract)
  • Personal fields are separated from the data, making it easier to ensure access control and GDPR compliance
  • A single codebase ingests several sources of data in a standardised way, so there is no need to write new code for each new source. In fact, we can extend this code with some minor changes to other types of events that differ in definition time or schema format.
  • Table creation/update is managed outside runtime, so there is no risk of breaking the process by doing schema evolution at execution time
  • This table management technique allows us to swiftly detect errors when defining the contract and the schema
  • The producer is the owner of the whole process, without intervention from the facilitator team. Right now, there is still some interaction from us when validating contracts or solving problems, but this will disappear as users grow more used to the process.
