Quick note about the author:
I work as a Staff Data Engineer at Adevinta, currently focusing on the Messaging domain, where I develop data products that process saved-search notification data generated by marketplaces such as Fotocasa, Coches.net, Milanuncios and others. I am also a proud, highly engaged member of the Data Enablers Team, and I’m writing this post to highlight the significant value our team brings to the organisation.
Regarding my education, I hold a degree in Computer Science and recently completed a Postgraduate Course in Deep Learning. My career journey began in Database Consulting before transitioning to Data Engineering in 2018.
Intro
In today’s data-driven world, unlocking the true value of data requires more than just infrastructure. Recognising this need, Adevinta established a dedicated Data Enablement Team in 2020, anticipating the principles of Data Mesh before it became a mainstream concept. By fostering a collaborative environment between platform and domain engineers, we bridge the gap between data platforms and data products. This ensures that the solutions we create are not only technically sound but also directly aligned with business needs, avoiding the pitfalls of overly complex or irrelevant solutions.
For a deeper dive into this concept, see our blog post Evolving from a Data Platform to a Data Products Platform, which explains how Adevinta transformed its data platform into a data products platform and why collaboration between teams was central to that shift.
What Does a Data Enablers Team Look Like?
The Data Enablers Team comprises ten Senior Data Engineers, two of whom serve as representatives of the Data Platform team. This balanced composition ensures effective communication and collaboration between the platform and domain teams (marketplaces or cross solutions).
Time Assignment:
The team dedicates one weekday (approximately 20% of their workweek) to collaborative Data Enablers projects. We use a Kanban board to streamline our workflow and ensure efficient progress.
Who is on the Data Enablers Team?
- One Lead Data Engineer from Platform (also the Leader of the Data Enablers team)
- One Senior Data Engineer from Platform
- One Senior Data Engineer from Motor (Coches.net & Motos.net)
- One Senior Data Engineer from Real Estate (Fotocasa & Habitaclia)
- One Senior Data Engineer from Jobs (Infojobs)
- One Senior Data Engineer from Common Goods (Milanuncios)
- One Senior Data Engineer from CI (Customer Intelligence)
- One Senior Data Engineer from Cross Components (Messaging – Saved Searches)
Each of Adevinta’s core marketplaces / components is represented.
Uniting Data: Platform and Product Synergy
Adevinta’s diverse marketplaces each cater to specific user needs, yet all rely on the same robust data platform for essential infrastructure and security. The Data Enablement Team acts as a crucial bridge, aligning the data platform with the unique requirements of each marketplace. This ensures domain teams can effectively leverage the platform’s capabilities to build data products tailored to their specific business needs.
Could Adevinta Build Data Products Without This Team?
While technically possible, building data products without dedicated enablement presents significant challenges:
- Knowledge Gap: Domain teams may lack in-depth knowledge of complex data infrastructure and best practices. The Enablement Team bridges this gap with training, guidance and support.
- Reinventing the Wheel: Without centrally administered, formalised best practices, domain teams might waste time and resources developing solutions that already exist elsewhere. The Enablement Team promotes knowledge sharing and collaboration to avoid duplication of effort.
- Slow Adoption: Even with a powerful platform, driving adoption requires active promotion and education. The Enablement Team champions the platform’s benefits, encouraging domain teams to leverage its full capabilities.
Enric Martinez, Data Enabler Lead (12 years in data, 7 at Adevinta, 4 on the Data Enablement Team), shares his perspective:
“From my experience, a Data Enablement team is crucial. By acting as a bridge between the platform and business needs, we translate real-world use cases into actionable insights. We ensure everyone follows consistent methodologies, promoting knowledge sharing and collaboration. This not only avoids redundancy but also fosters a thriving community where technical expertise is readily available to all.”
The establishment of the Data Enablement Team well before Data Mesh became widely adopted demonstrates Adevinta’s dedication to innovation and a data-driven future. This proactive approach empowers domain teams to thrive in a decentralised environment, ultimately unlocking the full potential of data to drive business growth and success.
A Success Case: Homogenising Spark Jobs Execution with the DatabricksPysparkOperator
Efficient and reliable execution of Spark jobs is essential for delivering timely insights and driving business value. At Adevinta, our Data Enablers Team recognised the need to streamline the process of launching Spark jobs on Databricks clusters. By developing a custom Databricks operator, we aimed to standardise workflows, reduce errors and empower domain teams to focus on their core business objectives.
I interviewed my teammate, Albert Romaguera Mallol, a Senior Data Engineer with 10 years of experience in data, 7 years at Adevinta, and 3 years in the Data Enablers team. He was the driving force behind the project that culminated in the creation of the DatabricksPysparkOperator. Here’s what he said:
Julian: Albert, as a key driver behind the DatabricksPysparkOperator, could you elaborate on the importance of a standardised, industrialised process for launching Spark jobs?
Albert: Absolutely! A homogeneous process for launching Spark jobs on Databricks clusters provides several critical benefits:
- Consistency and Standardisation: A standardised process ensures that all Spark jobs adhere to consistent guidelines, reducing the risk of errors and inconsistencies. This also makes it easier to manage and maintain jobs over time.
- Improved Efficiency: By automating repetitive tasks and simplifying the job submission process, we can significantly reduce the time and effort required to launch Spark jobs. This allows data engineers to focus on more strategic tasks and accelerate time-to-value.
- Enhanced Governance: A standardised process enables better governance and control over Spark jobs, ensuring compliance with data quality, security and privacy regulations.
- Facilitated Collaboration: A common framework for launching Spark jobs promotes collaboration and knowledge sharing between domain teams, fostering a more cohesive data engineering community.
Julian: Could you please outline the key features of the DatabricksPysparkOperator?
Albert: The DatabricksPysparkOperator provides several key features that directly address these objectives:
- Simplified Authentication: By abstracting authentication, policies and credential retrieval, the operator reduces the cognitive load on users and minimises the risk of errors.
- Output Log Capture: The ability to capture and analyse job output logs is crucial for troubleshooting and performance optimisation. This operator provides a straightforward mechanism for doing this.
- Dynamic Parameterisation: The support for dynamic parameterisation allows for greater flexibility and customisation of Spark jobs, enabling them to adapt to different use cases.
Julian: What were some of the challenges you faced during the development and implementation of the DatabricksPysparkOperator?
Albert: One of the main challenges was finding the right balance between providing specific functionality and avoiding excessive complexity. We wanted to ensure that the operator was easy to use and maintain while still addressing the needs of our data engineers.
Julian: Overall, how has the DatabricksPysparkOperator impacted the way Domain Data Engineers work with Spark jobs?
Albert: The DatabricksPysparkOperator has significantly improved the efficiency and productivity of our Data Engineers. By automating routine tasks and providing a standardised framework, we have enabled domain teams to focus on developing innovative data products and delivering valuable insights.
Behind the Scenes: Leveraging Airflow Custom Operators
The DatabricksPysparkOperator we’ve developed builds upon the foundation of Airflow’s DatabricksSubmitRunOperator. This established operator offers functionalities like submitting Spark JAR or notebook tasks to Databricks clusters. However, for our specific needs, we required a more tailored solution.
Building Upon an Existing Operator
The DatabricksPysparkOperator serves as an extension of the DatabricksSubmitRunOperator, specifically designed to launch PySpark applications on Databricks clusters. Here’s a breakdown of its core functionalities:
- Execution Parameters: It accepts a list of execution parameters that are passed on to the PySpark application.
- Spark Configuration: It allows users to define custom Spark configurations for the job, providing fine-grained control over execution behaviour.
- Cluster Management: The operator can automatically provision and configure Databricks clusters based on user-specified parameters like node type, policies and autoscaling options.
- Airflow Integration: The operator seamlessly integrates with Airflow, enabling users to schedule and monitor Spark job executions within established workflows.
Code Reference: DatabricksPysparkOperator Implementation
For those interested in the technical details, the following code snippet demonstrates the implementation of the DatabricksPysparkOperator class.
(Note: Readers unfamiliar with technical details may want to skip this section)
from airflow.exceptions import AirflowException
from airflow.models import Variable, Connection
from datetime import datetime
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.providers.databricks.hooks.databricks import DatabricksHook
from airflow.providers.postgres.hooks.postgres import PostgresHook
from typing import Dict, List, Optional
import requests
DEFAULT_DATABRICKS_NODE_TYPE = 'm5.xlarge'
DEFAULT_POLLING_PERIOD_SECONDS = 15
DEFAULT_SPARK_MAX_EXECUTORS = 2
DEFAULT_SPARK_CONF = {"spark.pyspark.python": "python3"}
DEFAULT_DATABRICKS_CLUSTER = {
    'aws_attributes': {
        'first_on_demand': 1
    },
    'autoscale': {
        'min_workers': 1,
        'max_workers': 3
    }
}

class DatabricksPysparkOperator(DatabricksSubmitRunOperator):
"""
Runs Pyspark applications from repo:
https://github.mpi-internal.com/scmspain/backend-di--etl-pyspark-databricks
:param team: Name of the team in "es_di_team" nomenclature.
Example:
team="es_di_advertising"
:type team: str
:param object_path: Path of the module or class to run on Databricks
Example:
object_path="advertising.domain_fact_tables.DomainFactTables"
:type object_path: str
:param execution_params: List of parameters used in the execution of the module or class
Example:
execution_params=[f"date_start_iso={date}", f"date_end_iso={date}"]
:type execution_params: List[str]
:param name: Optional application name that will take the application on Databricks.
If not set, the name will be taken from object_path parameter.
:type name: Optional[str]
:param databricks_node_type: Node type used by Spark workers and driver.
Default value m5.xlarge
:type databricks_node_type: Optional[str]
:param databricks_cluster_conf: Dict with cluster configuration, with autoscale workers for example.
Default value DEFAULT_DATABRICKS_CLUSTER variable in this operator
:type databricks_cluster_conf: Optional[Dict]
:param polling_interval: Time in seconds between polling for job completion.
Default value 15
:type Optional[int]
:param max_executors: Number of maximum executors that will run the application.
Default value 2
:type Optional[int]
:param spark_conf: Optional dictionary with spark configuration that will apply to Databricks execution
Example:
spark_conf={"spark.driver.memory":"3g",
"spark.executor.memory": "3g"}
:type spark_conf: Optional[str]
:param pre_action_conn_id: Redshift connection where to launch the pre_action.
:type Optional[str]
:param pre_action: SQL string to launch on Redshift before the application is launched to Databricks.
:type Optional[str]
"""
template_ext = (".sql",)
template_fields = ("json", "pre_action")
template_fields_renderers = {'pre_action': 'sql'}

    def __init__(
        self,
        team: str,
        object_path: str,
        execution_params: List[str] = None,
        name: str = None,
        databricks_node_type: str = DEFAULT_DATABRICKS_NODE_TYPE,
        databricks_cluster_conf: Dict = None,
        databricks_connection_id_con_name: Optional[str] = 'databricks_srv_airflow',
        databricks_policy_id_var_name: Optional[str] = 'databricks_srv_airflow_policy_id',
        datarbicks_group_var_name: Optional[str] = 'databricks_group_name',
        render_params: Optional[bool] = False,
        view_access_control_list=[],
        polling_interval: int = DEFAULT_POLLING_PERIOD_SECONDS,
        max_executors: int = None,
        spark_conf: Dict = None,
        pre_action_conn_id: Optional[str] = None,
        pre_action: Optional[str] = None,
        **kwargs
    ):
        # Get S3 path variables
        bucket = Variable.get('es_di_enablers/sch_repos_bucket')
        code_folder = Variable.get(
            'es_di_enablers/etl_pyspark_databricks_code_folder')
        applications = Variable.get(
            'es_di_enablers/etl_pyspark_applications_zip')
        s3_execution_path = f"s3://{bucket}/{code_folder}"

        # Set application name
        name = name or object_path.replace(".", "_")
        name = f"{name}_{datetime.now().isoformat()}"

        # Get Databricks credentials
        databricks_connection_id = f'{team}/{databricks_connection_id_con_name}'
        databricks_policy_id = Variable.get(
            f"{team}/{databricks_policy_id_var_name}")
        access_control_list = [
            {
                'group_name': Variable.get(f"{team}/{datarbicks_group_var_name}"),
                'permission_level': 'CAN_MANAGE'
            }
        ]
        self.view_access_control_list = view_access_control_list
        extra_access_control_list = [{
            'group_name': group,
            'permission_level': 'CAN_VIEW'
        } for group in self.view_access_control_list]

        # Get execution parameters, append the object path as the last parameter
        parameters = execution_params.copy() if execution_params else []
        parameters.append(f"path={object_path}")

        # Get custom spark configuration and append it to default spark
        # parameters
        spark_conf = spark_conf or {}
        self.max_executors = max_executors
        if self.max_executors:
            spark_conf['spark.dynamicAllocation.maxExecutors'] = max_executors
        databricks_cluster_conf = self.get_databricks_cluster_conf(
            databricks_cluster_conf
        )

        self.render_params = render_params
        self.databricks_connection_id = databricks_connection_id
        self.pre_action = pre_action
        self.pre_action_conn_id = pre_action_conn_id

        # Call Airflow Databricks Operator
        super(DatabricksPysparkOperator, self).__init__(
            databricks_conn_id=databricks_connection_id,
            run_name=name,
            libraries=[
                {"whl": f"{s3_execution_path}/{applications}.wheelhouse.zip"}
            ],
            new_cluster=self.generate_new_cluster(
                databricks_node_type=databricks_node_type,
                databricks_policy_id=databricks_policy_id,
                databricks_cluster_conf=databricks_cluster_conf,
                spark_conf={**spark_conf, **DEFAULT_SPARK_CONF}
            ),
            spark_python_task=self.generate_spark_python_task(
                executable=f"{s3_execution_path}/launch.py",
                parameters=parameters
            ),
            polling_period_seconds=polling_interval,
            access_control_list=access_control_list + extra_access_control_list,
            **kwargs
        )

    # Generate spark_python_task dict from an executable path and
    # a list of parameters
    @staticmethod
    def generate_spark_python_task(executable: str, parameters: list):
        return {
            'python_file': executable,
            'parameters': parameters
        }

    # Generate new_cluster dict from a node_type, policy_id and
    # spark configuration dict
    @staticmethod
    def generate_new_cluster(databricks_node_type: str,
                             databricks_policy_id: str,
                             databricks_cluster_conf: Dict,
                             spark_conf: Dict):
        return {
            'node_type_id': databricks_node_type,
            'spark_conf': spark_conf,
            'policy_id': databricks_policy_id,
            **databricks_cluster_conf
        }

    def get_databricks_creds(self):
        # Get credentials to call Databricks
        databricks_connection_str = Connection.get_connection_from_secrets(
            self.databricks_connection_id).get_uri()
        databricks_srv_airflow = (databricks_connection_str
                                  .replace("databricks://", "https://")
                                  .split("?token="))
        databricks_host = databricks_srv_airflow[0]
        databricks_token = databricks_srv_airflow[1]
        if databricks_host[-1] == "/":
            databricks_host = databricks_host[0:-1]
        # Mount header to Databricks call
        self.header = dict(Authorization=f"Bearer {databricks_token}")
        self.databricks_host = databricks_host

    def databricks_get_logs(self):
        self.log.info(f"\nRequesting execution log to Databricks {self.databricks_host}...")
        r = requests.get(f"{self.databricks_host}/api/2.1/jobs/runs/get-output",
                         headers=self.header, params={"run_id": self.run_id})
        if r.status_code == 200:
            if "logs" in r.json():
                self.log.info(f"{r.json()['logs']}")
            else:
                self.log.info(f"Can't get the logs for run_id {self.run_id} on Databricks")
        else:
            self.log.info(r.content)

    def execute(self, context):
        if self.pre_action is not None:
            pg_hook = PostgresHook(postgres_conn_id=self.pre_action_conn_id)
            pg_hook.run(self.pre_action)
        self.get_databricks_creds()
        try:
            result = super().execute(context)
            self.databricks_get_logs()
            return result
        except AirflowException:
            if self.run_id:
                self.databricks_get_logs()
                db_hook = DatabricksHook(databricks_conn_id=self.databricks_connection_id)
                db_hook.cancel_run(self.run_id)
                self.log.info(
                    "Task: %s with run_id: %s was requested to be cancelled.", self.task_id, self.run_id
                )
            raise AirflowException

    def get_databricks_cluster_conf(self, databricks_cluster_conf):
        env = Variable.get('es_di_enablers/environment')
        # In case someone submits a proper Databricks conf
        if databricks_cluster_conf:
            if 'spark_env_vars' not in databricks_cluster_conf:
                databricks_cluster_conf["spark_env_vars"] = {}
            databricks_cluster_conf["spark_env_vars"]["ENVIRONMENT"] = env
            if 'custom_tags' not in databricks_cluster_conf:
                databricks_cluster_conf["custom_tags"] = {}
            databricks_cluster_conf["custom_tags"]["airflow-operator"] = "DatabricksPysparkOperator"
            return databricks_cluster_conf

        # The default conf
        default_conf = {
            'aws_attributes': {'first_on_demand': 1},
            'autoscale': {
                'min_workers': 1,
                'max_workers': 3
            },
            'spark_env_vars': {
                'ENVIRONMENT': env
            },
            'custom_tags': {
                'airflow-operator': 'DatabricksPysparkOperator'
            },
            'apply_policy_default_values': 'true'
        }
        # Change the default conf with max_executors if provided
        if self.max_executors:
            default_conf["autoscale"]["max_workers"] = self.max_executors
        return default_conf
def render_template_fields(self, context) -> None:
# IF Boolean render_params is True, we run the render template on params and add it to the context
if self.render_params:
""" Add the rendered 'params' to the context dictionary before running the templating """
# Like the original method, get the env if not provided
jinja_env = self.get_template_env()
if self.params:
context['params'] = self.render_template(self.params, context, jinja_env, set())
# Call the original method
super().render_template_fields(context=context, jinja_env=jinja_env)
else:
super().render_template_fields(context)
You can gain a deeper understanding of the underlying concepts leveraged in the DatabricksPysparkOperator from the Airflow Databricks documentation.
The DatabricksPysparkOperator serves as a connector between Airflow and Databricks, streamlining the execution of PySpark applications within a reliable workflow management system. This custom operator empowers data engineers with an efficient solution for launching and managing Spark jobs on Databricks clusters.
Here’s an example call of the DatabricksPysparkOperator:
fotocasa_android_fact_top_pains_comments_csat_push_alerts = DatabricksPysparkOperator(
    task_id="fotocasa_android_fact_top_pains_comments_csat_push_alerts",
    name=f"dag_fotocasa_android_fact_top_pains_comments_csat_push_alerts_{start_date_iso}",
    team="es_di_cross",
    object_path="common.csat_push_alerts.llm.fact_top_pains_comments_csat_push_alerts.DomainTopPainsCommentsCsatPushAlerts",
    execution_params=[
        f"start_date_iso={start_date_iso}",
        f"end_date_iso={end_date_iso}",
        "marketplace=fotocasa",
        "os_platform=Android"
    ],
    pre_action=f"DELETE FROM <table_schema>.<table_name> WHERE date_activity = '{start_date_iso}' AND site = '<marketplace>' AND sub_channel = 'Android'",
    pre_action_conn_id=dataland_conn_id,
    dag=dag
)
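The call above builds its date parameters with Python f-strings that are evaluated when the DAG file is parsed. As a variation, here is a minimal, hedged sketch (the task id, team and object path are hypothetical) that leans on Airflow’s own templating instead: since execution_params ends up inside the operator’s json field, which is listed in template_fields above, standard macros such as {{ ds }} should be rendered when the task runs. It also shows the spark_conf and max_executors knobs documented in the operator’s docstring.

example_daily_job = DatabricksPysparkOperator(
    task_id="example_daily_fact_table",          # hypothetical task id
    team="es_di_example",                        # hypothetical team name
    object_path="example.domain_fact_tables.DomainFactTables",  # hypothetical module path
    execution_params=[
        "date_start_iso={{ ds }}",               # rendered by Airflow at run time
        "date_end_iso={{ ds }}",
    ],
    max_executors=4,                             # caps spark.dynamicAllocation.maxExecutors
    spark_conf={
        "spark.driver.memory": "3g",
        "spark.executor.memory": "3g",
    },
    dag=dag,
)

Because {{ ds }} resolves to the run’s logical date, backfills and reruns pick up the right partition without touching the DAG code.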
Impact: Quantifiable Benefits Across the Organisation
The impact of the DatabricksPysparkOperator has been significant:
- Increased Efficiency: Streamlined workflows and automated tasks lead to faster development cycles and reduced time-to-value.
- Enhanced Reliability: Standardised processes and simplified authentication minimise errors and ensure job reliability.
- Empowered Teams: This custom operator empowers domain teams to focus on developing innovative data products instead of managing routine tasks.
Usage Statistics: A Testament to Widespread Adoption
To gauge the operator’s adoption, I searched for occurrences of “DatabricksPysparkOperator” across relevant GitHub repositories in our various marketplaces and components. The results are impressive:
- A total of 222 occurrences were found across various teams.
- 12 code results in Cross Components (Saved Searches)
- 43 code results in Common Goods (Milanuncios)
- 49 code results in Motor (Coches.net & Motos.net)
- 76 code results in Real Estate (Fotocasa & Habitaclia)
- 18 code results in Jobs (Infojobs)
This widespread usage highlights the value the operator brings to our data engineering efforts.
Recap
Our Technical Data Enablement team plays a pivotal role in bridging the gap between business needs and the data platform. The success of the DatabricksPysparkOperator is just one example of the valuable contributions made by our Data Enablers team.
Other notable achievements include the development of Python Loaders, a collection of reusable methods and functions that simplify data extraction from various sources. These Python Loaders cover common functionalities such as loading data, creating Spark sessions and dataframes, handling secrets and dates, and managing file uploads and downloads, and they have become widely adopted by Data Engineers across the organisation, further demonstrating the team’s impact.
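As a rough illustration (the real Python Loaders are internal to Adevinta, so every name and signature below is hypothetical), a loader-style helper of this kind typically bundles a Spark session factory with a few read and write conveniences and some date handling:

from datetime import date, timedelta

from pyspark.sql import DataFrame, SparkSession


def get_spark_session(app_name: str) -> SparkSession:
    """Create, or reuse, a Spark session with a descriptive application name."""
    return SparkSession.builder.appName(app_name).getOrCreate()


def yesterday_iso() -> str:
    """Return yesterday's date in ISO format, a common default for daily jobs."""
    return (date.today() - timedelta(days=1)).isoformat()


def load_parquet(spark: SparkSession, path: str, date_iso: str) -> DataFrame:
    """Load a single daily partition of a Parquet dataset."""
    return spark.read.parquet(path).where(f"date_iso = '{date_iso}'")


def save_parquet(df: DataFrame, path: str) -> None:
    """Write a dataframe back as Parquet, overwriting only the touched partitions."""
    (df.write
       .mode("overwrite")
       .option("partitionOverwriteMode", "dynamic")
       .partitionBy("date_iso")
       .parquet(path))

Centralising even small helpers like these keeps Spark session handling and partition-overwrite behaviour consistent across teams, which is exactly the kind of duplicated effort the Enablers team works to remove.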
Conclusion
Establishing a Data Enablement Team is a strategic investment for any organisation seeking to maximise data value, accelerate innovation and improve decision-making. Would you consider establishing a Data Enablement Team within your data organisation to unlock the full potential of your data?