Datalake is a versatile framework for data orchestration, offering seamless integration with databases like MySQL and Elasticsearch. It simplifies data ingestion, processing, and transformation through pipelines, with a secure configuration using environment variables. This guide walks through the setup, connection management, and usage with step-by-step instructions.
1. Inheriting from Datalake
The DatalakeConnection class inherits from the Datalake class, gaining access to all the parent class's methods. These methods include:
Creating pipelines.
Adding connections.
Executing tasks.
This inheritance allows developers to leverage pre-built functionality while extending capabilities specific to their workflows.
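In its simplest form, a subclass only needs to call the parent constructor before using any of these methods. A minimal sketch (the full version appears in Step 4 below):

from groclake.datalake import Datalake

class DatalakeConnection(Datalake):
    def __init__(self):
        super().__init__()  # inherit pipeline creation and execution helpers from Datalake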
2. Configuring Connections
Connections represent data sources like MySQL, Elasticsearch, and Redis. Configuration for each connection is fetched dynamically from a Config file or environment variables.
SQL Databases (e.g., MySQL, PostgreSQL): Identified with the sql type.
Elasticsearch (ES): Identified with the es type.
Redis: Identified with the redis type.
Each connection type is clearly categorized to ensure proper handling and integration into the pipeline.
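For example, in the full code under Step 4 below, each configuration dictionary is tagged with its type before being added to the pipeline:

MYSQL_CONFIG = Config.MYSQL_CONFIG
MYSQL_CONFIG['connection_type'] = 'sql'

ES_CONFIG = Config.ES_CONFIG
ES_CONFIG['connection_type'] = 'es'

REDIS_CONFIG = Config.REDIS_CONFIG
REDIS_CONFIG['connection_type'] = 'redis'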
3. Creating the Pipeline
A pipeline is a central structure for managing data workflows. The create_pipeline method from the Datalake class initializes a pipeline with a unique name.
The pipeline coordinates multiple connections and their respective tasks, ensuring a structured and cohesive execution process.
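In the example below, this is a single call inside the DatalakeConnection constructor:

self.test_pipeline = self.create_pipeline(name="test_pipeline")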
4. Adding Connections
Connections are added to the pipeline using unique identifiers. Each connection requires:
A name (e.g., sql_connection, es_connection).
A configuration containing the necessary connection details (e.g., host, port, authentication credentials).
This modular approach allows for easy referencing and management of multiple data sources within the same pipeline.
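As the full example shows, each connection is registered with its name and configuration dictionary:

self.test_pipeline.add_connection(name="sql_connection", config=MYSQL_CONFIG)
self.test_pipeline.add_connection(name="es_connection", config=ES_CONFIG)
self.test_pipeline.add_connection(name="redis_connection", config=REDIS_CONFIG)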
5. Executing Connections
Once all the connections are added to the pipeline, the execute_all method is used to run them concurrently.
This method employs threading to ensure efficient execution across multiple connections.
Results from each connection are collected and can be processed further.
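In the example below, this is a single call made once every connection has been added:

self.execute_all()  # runs all added connections concurrently using threads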
6. Storing Connections
All connections added to the pipeline are stored in a connections dictionary.
Each connection can be accessed by its unique identifier (e.g., sql_connection, es_connection).
This storage mechanism simplifies reuse and modularity within complex workflows.
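For instance, a stored connection can be retrieved from the dictionary by its identifier:

mysql_connection = self.connections["sql_connection"]
es_connection = self.connections["es_connection"]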
Here's the example code for execution and connections:
Step 1. Create the .env File
In your project directory, create a .env file and add your configuration variables in KEY=VALUE format.
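For example, a .env file for the MySQL connection might look like the following; the variable names are illustrative and must match whatever your Config module reads:

MYSQL_HOST=localhost
MYSQL_PORT=3306
MYSQL_USER=datalake_user
MYSQL_PASSWORD=your_password
MYSQL_DATABASE=datalake_db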
The Datalake class from groclake.datalake provides the core functionality for managing data pipelines. The Config module holds the connection details for MySQL, Elasticsearch (ES), and Redis, as well as for any other stores used in the pipeline (such as S3 and MongoDB in the example below).
from groclake.datalake import Datalake
from config import Config
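The Config module is imported from your own project. A minimal config.py sketch, assuming the illustrative environment variables from Step 1 and the python-dotenv package, might look like this (only MySQL, Elasticsearch, and Redis are shown, and the key names must match what your connections expect):

import os
from dotenv import load_dotenv

load_dotenv()  # read KEY=VALUE pairs from the .env file into the environment

class Config:
    # Illustrative structure only; adjust the keys to your environment.
    MYSQL_CONFIG = {
        "host": os.getenv("MYSQL_HOST", "localhost"),
        "port": int(os.getenv("MYSQL_PORT", "3306")),
        "user": os.getenv("MYSQL_USER"),
        "password": os.getenv("MYSQL_PASSWORD"),
        "database": os.getenv("MYSQL_DATABASE"),
    }
    ES_CONFIG = {
        "host": os.getenv("ES_HOST", "localhost"),
        "port": int(os.getenv("ES_PORT", "9200")),
    }
    REDIS_CONFIG = {
        "host": os.getenv("REDIS_HOST", "localhost"),
        "port": int(os.getenv("REDIS_PORT", "6379")),
    }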
Step 4. Define DatalakeConnection Class
The DatalakeConnection class extends Datalake and adds specific data connections to a pipeline.
class DatalakeConnection(Datalake):
    def __init__(self):
        super().__init__()  # Initialize the parent Datalake class

        # Define the configuration for each connection
        MYSQL_CONFIG = Config.MYSQL_CONFIG
        MYSQL_CONFIG['connection_type'] = 'sql'

        ES_CONFIG = Config.ES_CONFIG
        ES_CONFIG['connection_type'] = 'es'

        REDIS_CONFIG = Config.REDIS_CONFIG
        REDIS_CONFIG['connection_type'] = 'redis'

        S3_CONFIG = Config.S3_CONFIG
        S3_CONFIG['connection_type'] = 's3'

        MONGODB_CONFIG = Config.MONGODB_CONFIG
        MONGODB_CONFIG['connection_type'] = 'mongo'

        # The GCP configurations are assumed to follow the same pattern as the others;
        # adjust the Config attribute names and the connection_type value to your setup.
        GCP_CONFIG = Config.GCP_CONFIG
        GCP_CONFIG['connection_type'] = 'gcp'

        WEBP_GCP_CONFIG = Config.WEBP_GCP_CONFIG
        WEBP_GCP_CONFIG['connection_type'] = 'gcp'

        # Create and add connections to the pipeline
        self.test_pipeline = self.create_pipeline(name="test_pipeline")
        self.test_pipeline.add_connection(name="sql_connection", config=MYSQL_CONFIG)
        self.test_pipeline.add_connection(name="es_connection", config=ES_CONFIG)
        self.test_pipeline.add_connection(name="redis_connection", config=REDIS_CONFIG)
        self.test_pipeline.add_connection(name="s3_connection", config=S3_CONFIG)
        self.test_pipeline.add_connection(name="mongodb_connection", config=MONGODB_CONFIG)
        self.test_pipeline.add_connection(name="gcp_connection", config=GCP_CONFIG)
        self.test_pipeline.add_connection(name="webp_gcp_connection", config=WEBP_GCP_CONFIG)

        # Execute all connections at once
        self.execute_all()

        # Initialize connections
        self.connections = {
            "sql_connection": self.get_connection("sql_connection"),
            "es_connection": self.get_connection("es_connection"),
            "redis_connection": self.get_connection("redis_connection"),
            "s3_connection": self.get_connection("s3_connection"),
            "mongodb_connection": self.get_connection("mongodb_connection"),
            "gcp_connection": self.get_connection("gcp_connection"),
            "webp_gcp_connection": self.get_connection("webp_gcp_connection"),
        }

    def get_connection(self, connection_name):
        """
        Returns a connection by name from the pipeline.
        """
        return self.test_pipeline.get_connection_by_name(connection_name)
Here we create an instance of the DatalakeConnection class. When the class is instantiated, it automatically creates the pipeline, adds all of the connections (MySQL, Elasticsearch, Redis, S3, MongoDB, and the GCP connections), and executes them concurrently.
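A minimal instantiation, using an illustrative instance name, produces output similar to the lines shown below:

datalake_connection = DatalakeConnection()

print("MySQL Connection:", datalake_connection.connections["sql_connection"])
print("Elasticsearch Connection:", datalake_connection.connections["es_connection"])
print("Redis Connection:", datalake_connection.connections["redis_connection"])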
MySQL Connection: <groclake.datalake.connection.SQLConnection object at 0x7b166dc5b410>
Elasticsearch Connection: <groclake.datalake.connection.ESConnection object at 0x7b166dc5ae10>
Redis Connection: <groclake.datalake.connection.RedisConnection object at 0x7b166dc58910>
Example use of MySQL connection
# Obtain the MySQL connection from the connections dictionary (see the instantiation above)
mysql_connection = datalake_connection.connections["sql_connection"]

# INSERT DATA
def insert_user(name, status):
    query = "INSERT INTO user_details (user_id, name, status) VALUES (%s, %s, %s)"
    params = ("1234", name, status)
    mysql_connection.write(query, params)
    return True

# FETCH DATA
def get_user_info(user_id):
    query = "SELECT * FROM user_details WHERE user_id = %s"
    params = (user_id,)
    response = mysql_connection.read(query, params)
    return response
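A quick usage sketch of the helpers above; the table and column names come from the example, and the return value of get_user_info depends on the underlying connection:

if __name__ == "__main__":
    insert_user("Alice", "active")       # writes a row with user_id "1234" into user_details
    user_info = get_user_info("1234")    # reads the same row back
    print(user_info)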