Datalake Connections

Datalake is a versatile framework for data orchestration, offering seamless integration with data stores such as MySQL, Elasticsearch, Redis, S3, MongoDB, and Google Cloud Storage. It simplifies data ingestion, processing, and transformation through pipelines, with secure configuration via environment variables. This guide walks through setup, connection management, and usage with step-by-step instructions.

1. Inheriting from Datalake

The DatalakeConnection class inherits from the Datalake class, gaining access to all the parent class's methods. These methods include:

  • Creating pipelines.

  • Adding connections.

  • Executing tasks.

This inheritance allows developers to leverage pre-built functionality while extending capabilities specific to their workflows.
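
A minimal sketch of this pattern (the full class appears in Step 4 below):

from groclake.datalake import Datalake

class DatalakeConnection(Datalake):
    def __init__(self):
        super().__init__()  # inherits create_pipeline, add_connection, execute_all, etc.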


2. Configuring Connections

Connections represent data sources like MySQL, Elasticsearch, and Redis. Configuration for each connection is fetched dynamically from the Config class or environment variables.

  • SQL Databases (e.g., MySQL, PostgreSQL): Identified with the sql type.

  • Elasticsearch (ES): Identified with the es type.

  • Redis: Identified with the redis type.

  • Other stores follow the same pattern: s3 for S3, mongo for MongoDB, and gcp_storage for Google Cloud Storage (all used in the steps below).

Each connection type is clearly categorized to ensure proper handling and integration into the pipeline.
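
As a sketch, each configuration is a plain dictionary whose connection_type key selects the connector (the values here are placeholders; the real values come from Config or the environment):

# Hypothetical MySQL configuration; 'connection_type' selects the connector
MYSQL_CONFIG = {
    "host": "127.0.0.1",
    "port": 3306,
    "username": "root",
    "password": "password",
    "database": "example_db",
    "connection_type": "sql",  # "es" for Elasticsearch, "redis" for Redis
}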


3. Creating the Pipeline

A pipeline is a central structure for managing data workflows. The create_pipeline method from the Datalake class initializes a pipeline with a unique name. The pipeline coordinates multiple connections and their respective tasks, ensuring a structured and cohesive execution process.
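
In code, this is a single call inside the Datalake subclass (as used in Step 4 below):

# Create a named pipeline that will hold all connections
self.test_pipeline = self.create_pipeline(name="test_pipeline")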


4. Adding Connections

Connections are added to the pipeline using unique identifiers. Each connection requires:

  • A name (e.g., sql_connection, es_connection).

  • A configuration containing the necessary connection details (e.g., host, port, authentication credentials).

This modular approach allows for easy referencing and management of multiple data sources within the same pipeline.
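
For example (taken from the full class in Step 4 below):

# Register a configured data source under a unique, referenceable name
self.test_pipeline.add_connection(name="sql_connection", config=MYSQL_CONFIG)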


5. Executing Connections

Once all the connections are added to the pipeline, the execute_all method is used to run them concurrently.

  • This method employs threading to ensure efficient execution across multiple connections.

  • Results from each connection are collected and can be processed further.
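
In the class below, this is a single call made after all connections are registered:

# Run every registered connection concurrently; Datalake handles the threading
self.execute_all()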


6. Storing Connections

All connections added to the pipeline are stored in a connections dictionary.

  • Each connection can be accessed by its unique identifier (e.g., sql_connection, es_connection).

  • This storage mechanism simplifies reuse and modularity within complex workflows.
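
For example, a stored connection can be retrieved by the same identifier it was registered under:

# Look up a connection added earlier to the pipeline
sql_connection = self.test_pipeline.get_connection_by_name("sql_connection")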

Here's the example code for execution and connections:

Step 1. Create the .env File

In your project directory, create a .env file and add your configuration variables in KEY=VALUE format.

# Example .env file

# MySQL Configuration
MYSQL_HOST_PROD=127.0.0.1
MYSQL_PORT_PROD=3306
MYSQL_USER_PROD=root
MYSQL_PASSWORD_PROD=password
MYSQL_DB_PROD=example_db

# Elasticsearch Configuration
ES_HOST=127.0.0.1
ES_PORT=9200
ES_API_KEY=es_api_key

# Redis Configuration
REDIS_HOST=127.0.0.1
REDIS_PORT=6379

# S3 Configuration
AWS_S3_BUCKET=my-s3-bucket-name
AWS_S3_FOLDER=uploads/images/
AWS_REGION_NAME=us-east-1
AWS_ACCESS_KEY_ID=AKIAXXXXXXXXXXXXXXXX
AWS_SECRET_ACCESS_KEY=abcd1234abcd5678abcd9012abcd3456abcd7890


# MongoDB Configuration
MONGODB_CONNECTION_STRING=mongodb+srv://username:password@cluster0.mongodb.net
MONGODB_DATABASE=my_database


#GCP
GCP_TYPE=service_account
GCP_PROJECT_ID=my-sample-project
GCP_PRIVATE_KEY_ID=1234567890abcdef1234567890abcdef
GCP_PRIVATE_KEY="-----BEGIN PRIVATE KEY-----\nMIIEvQIBADANBgkq...\n-----END PRIVATE KEY-----\n"
GCP_CLIENT_EMAIL=my-service-account@my-sample-project.iam.gserviceaccount.com
GCP_CLIENT_ID=123456789012345678901
GCP_AUTH_URI=https://accounts.google.com/o/oauth2/auth
GCP_TOKEN_URI=https://oauth2.googleapis.com/token
GCP_AUTH_PROVIDER_CERT_URL=https://www.googleapis.com/oauth2/v1/certs
GCP_CLIENT_CERT_URL=https://www.googleapis.com/robot/v1/metadata/x509/my-service-account%40my-sample-project.iam.gserviceaccount.com
GCP_UNIVERSE_DOMAIN=googleapis.com
GCP_BUCKET_NAME=my-sample-bucket
GCP_CONNECTION_TYPE=gcp_storage
GCP_HOST_CDN_URL=https://storage.googleapis.com/my-sample-bucket/

# WEBP Credentials
GCP_WEBP_PROJECT_ID=my-webp-project
GCP_WEBP_CLIENT_EMAIL=my-webp-service-account@my-webp-project.iam.gserviceaccount.com
GCP_WEBP_CLIENT_ID=987654321098765432109
GCP_WEBP_PRIVATE_KEY_ID=abcdef1234567890abcdef1234567890
GCP_WEBP_PRIVATE_KEY="-----BEGIN PRIVATE KEY-----\nMIIEvQIBADANBgkq...\n-----END PRIVATE KEY-----\n"
GCP_WEBP_BUCKET_NAME=my-webp-bucket
GCP_WEBP_CONNECTION_TYPE=gcp_storage
GCP_WEBP_HOST_CDN_URL=https://storage.googleapis.com/my-webp-bucket/

Step 2. Create config.py file

In your config.py file, use the python-dotenv package to load the .env file and assign the configuration values.

Example config file

import os
import base64
from dotenv import load_dotenv

# Load environment variables from the .env file
load_dotenv()

class Config:
    MYSQL_CONFIG = {
        "host": os.getenv("MYSQL_HOST_PROD"),
        "port": int(os.getenv("MYSQL_PORT_PROD")),
        "username": os.getenv("MYSQL_USER_PROD"),
        "password": os.getenv("MYSQL_PASSWORD_PROD"),
        "database": os.getenv("MYSQL_DB_PROD"),
    }

    # Elasticsearch connection
    api_key = os.getenv("ES_API_KEY")
    encoded_api_key = base64.b64encode(api_key.encode("utf-8")).decode("utf-8")

    ES_CONFIG = {
        "host": os.getenv("ES_HOST"),
        "port": int(os.getenv("ES_PORT")),
        "headers": {
            "Authorization": f"ApiKey {encoded_api_key}"
        }
    }

    # Redis Configuration
    REDIS_CONFIG = {
        "host": os.getenv("REDIS_HOST"),  # 127.0.0.1
        "port": int(os.getenv("REDIS_PORT")),  # 6379
    }

    # S3 Configuration
    S3_CONFIG = {
        'aws_s3_bucket': os.getenv('AWS_S3_BUCKET'),
        'aws_s3_folder': os.getenv('AWS_S3_FOLDER'),
        'aws_region_name': os.getenv('AWS_REGION_NAME'),
        'aws_access_key_id': os.getenv('AWS_ACCESS_KEY_ID'),
        'aws_secret_access_key': os.getenv('AWS_SECRET_ACCESS_KEY')
    }

    # MongoDB Configuration
    MONGODB_CONFIG = {
        'connection_string': os.getenv('MONGODB_CONNECTION_STRING'),
        'data_base': os.getenv('MONGODB_DATABASE')
    }

    # GCP service-account credentials
    credentials_json = {
        "type": os.getenv("GCP_TYPE"),
        "project_id": os.getenv("GCP_PROJECT_ID"),
        "private_key_id": os.getenv("GCP_PRIVATE_KEY_ID"),
        "private_key": os.getenv("GCP_PRIVATE_KEY").replace('\\n', '\n'),
        "client_email": os.getenv("GCP_CLIENT_EMAIL"),
        "client_id": os.getenv("GCP_CLIENT_ID"),
        "auth_uri": os.getenv("GCP_AUTH_URI"),
        "token_uri": os.getenv("GCP_TOKEN_URI"),
        "auth_provider_x509_cert_url": os.getenv("GCP_AUTH_PROVIDER_CERT_URL"),
        "client_x509_cert_url": os.getenv("GCP_CLIENT_CERT_URL"),
        "universe_domain": os.getenv("GCP_UNIVERSE_DOMAIN")
    }

    GCP_CONFIG = {
        'gcp_bucket_name': os.getenv("GCP_BUCKET_NAME"),
        'gcp_credentials_json': credentials_json,
        'connection_type': os.getenv("GCP_CONNECTION_TYPE"),
        "host_cdn_url": os.getenv("GCP_HOST_CDN_URL")
    }

    # Credentials for PLOTCH_WEBP_GCP_CONFIG
    credentials_webp_json = {
        "type": os.getenv("GCP_TYPE"),
        "project_id": os.getenv("GCP_WEBP_PROJECT_ID"),
        "private_key_id": os.getenv("GCP_WEBP_PRIVATE_KEY_ID"),
        "private_key": os.getenv("GCP_WEBP_PRIVATE_KEY").replace('\\n', '\n'),
        "client_email": os.getenv("GCP_WEBP_CLIENT_EMAIL"),
        "client_id": os.getenv("GCP_WEBP_CLIENT_ID"),
        "auth_uri": os.getenv("GCP_AUTH_URI"),
        "token_uri": os.getenv("GCP_TOKEN_URI"),
        "auth_provider_x509_cert_url": os.getenv("GCP_AUTH_PROVIDER_CERT_URL"),
        "client_x509_cert_url": os.getenv("GCP_CLIENT_CERT_URL"),
        "universe_domain": os.getenv("GCP_UNIVERSE_DOMAIN")
    }

    WEBP_GCP_CONFIG = {
        'gcp_bucket_name': os.getenv("GCP_WEBP_BUCKET_NAME"),
        'gcp_credentials_json': credentials_webp_json,
        'connection_type': os.getenv("GCP_WEBP_CONNECTION_TYPE"),
        "host_cdn_url": os.getenv("GCP_WEBP_HOST_CDN_URL")
    }
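
A quick sanity check, assuming the .env file from Step 1 is in place, confirms the values were loaded:

# Should echo the values from .env, e.g. 127.0.0.1 3306
print(Config.MYSQL_CONFIG["host"], Config.MYSQL_CONFIG["port"])
print(Config.REDIS_CONFIG)  # {'host': '127.0.0.1', 'port': 6379}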

Step 3. Import Required Modules

The Datalake class from groclake.datalake provides the basic functionality for managing data pipelines. The Config module contains the configuration details for MySQL, Elasticsearch (ES), Redis, S3, MongoDB, and GCP.

from groclake.datalake import Datalake
from config import Config

Step 4. Define DatalakeConnection Class

The DatalakeConnection class extends Datalake and adds specific data connections to a pipeline.

class DatalakeConnection(Datalake):
    def __init__(self):
        super().__init__()  # Inherit from Datalake class

        # Define the configuration for each connection
        MYSQL_CONFIG = Config.MYSQL_CONFIG
        MYSQL_CONFIG['connection_type'] = 'sql'

        ES_CONFIG = Config.ES_CONFIG
        ES_CONFIG['connection_type'] = 'es'

        REDIS_CONFIG = Config.REDIS_CONFIG
        REDIS_CONFIG['connection_type'] = 'redis'

        S3_CONFIG = Config.S3_CONFIG
        S3_CONFIG['connection_type'] = 's3'

        MONGODB_CONFIG = Config.MONGODB_CONFIG
        MONGODB_CONFIG['connection_type'] = 'mongo'

        # The GCP configs already carry their connection_type from the .env file
        GCP_CONFIG = Config.GCP_CONFIG
        WEBP_GCP_CONFIG = Config.WEBP_GCP_CONFIG

        # Create and add connections to the pipeline
        self.test_pipeline = self.create_pipeline(name="test_pipeline")
        self.test_pipeline.add_connection(name="sql_connection", config=MYSQL_CONFIG)
        self.test_pipeline.add_connection(name="es_connection", config=ES_CONFIG)
        self.test_pipeline.add_connection(name="redis_connection", config=REDIS_CONFIG)
        self.test_pipeline.add_connection(name="s3_connection", config=S3_CONFIG)
        self.test_pipeline.add_connection(name="mongodb_connection", config=MONGODB_CONFIG)
        self.test_pipeline.add_connection(name="gcp_connection", config=GCP_CONFIG)
        self.test_pipeline.add_connection(name="webp_gcp_connection", config=WEBP_GCP_CONFIG)

        # Execute all connections at once
        self.execute_all()

        # Initialize connections
        self.connections = {
            "sql_connection": self.get_connection("sql_connection"),
            "es_connection": self.get_connection("es_connection"),
            "redis_connection": self.get_connection("redis_connection"),
            "s3_connection": self.get_connection("s3_connection"),
            "mongodb_connection": self.get_connection("mongodb_connection"),
            "gcp_connection": self.get_connection("gcp_connection"),
            "webp_gcp_connection": self.get_connection("webp_gcp_connection"),
        }

    def get_connection(self, connection_name):
        """
        Returns a connection by name from the pipeline.
        """
        return self.test_pipeline.get_connection_by_name(connection_name)

Step 5. Initialize DatalakeConnection class

import os
from google.colab import userdata  # Colab secrets helper; outside Colab, set these env vars directly

os.environ['GROCLAKE_API_KEY'] = userdata.get('groclake_api_key')
os.environ['GROCLAKE_ACCOUNT_ID'] = userdata.get('groclake_account_id')

datalake_connection = DatalakeConnection()

Here we create an instance of the DatalakeConnection class. When the class is instantiated, it automatically creates the pipeline, adds the connections (MySQL, Elasticsearch, Redis, S3, MongoDB, and GCP), and executes them concurrently.

Step 6. Accessing a Specific Connection

# Accessing a specific connection (MySQL connection in this case)
mysql_connection = datalake_connection.connections["sql_connection"]
print("MySQL Connection:", mysql_connection)

es_connection = datalake_connection.connections["es_connection"]
print("Elasticsearch Connection:", es_connection)

redis_connection = datalake_connection.connections["redis_connection"]
print("Redis Connection:", redis_connection)

s3_connection = datalake_connection.connections["s3_connection"]
print("S3 Connection:", s3_connection)

mongodb_connection = datalake_connection.connections["mongodb_connection"]
print("Mongodb Connection:", mongodb_connection)

gcp_connection = datalake_connection.connections["gcp_connection"]
print("GCP Connection:", gcp_connection)

webp_gcp_connection = datalake_connection.connections["webp_gcp_connection"]
print("Mongodb Connection:", webp_gcp_connection)

Example Response (first three connections shown; the others print similarly)

MySQL Connection: <groclake.datalake.connection.SQLConnection object at 0x7b166dc5b410>
Elasticsearch Connection: <groclake.datalake.connection.ESConnection object at 0x7b166dc5ae10>
Redis Connection: <groclake.datalake.connection.RedisConnection object at 0x7b166dc58910>

Example use of MySQL connection

# INSERT DATA
def insert_user(name, status):
    query = "INSERT INTO user_details (user_id, name, status) VALUES (%s, %s, %s)"
    params = ("1234", name, status)
    mysql_connection.write(query, params)
    return True


# FETCH DATA
def get_user_info(user_id):
    query = "SELECT * FROM user_details WHERE user_id = %s"
    params = (user_id,)
    response = mysql_connection.read(query, params)
    return response
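
A hypothetical call sequence for these helpers (assumes the user_details table exists and mysql_connection is in scope as shown in Step 6):

insert_user("Alice", "active")
user = get_user_info("1234")
print(user)  # e.g. the row(s) matching user_id 1234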

Example use of Elasticsearch connection

# WRITE

# Define the write query
write_query = {
    "index": "users",  # Index name
    "body": {
        "user_id": "123",
        "name": "Alice",
        "role": "Engineer"
    }
}


write_response = es_connection.write(write_query)
print("Write Response:", write_response)


# READ
read_query = {
    "index": "users",
    "body": {
        "query": {
            "match_all": {}
        }
    }
}


document_count = es_connection.read(read_query)
print("Total Documents:", document_count)

Example use of Redis connection

# WRITE
key = "user:1000:name"
value = "John Doe"
cache_ttl = 3600  # TTL of 1 hour

redis_connection.set(key, value, cache_ttl)
print(f"Set value for {key}: {value}")

# READ
key = "user:1000:name"
value = redis_connection.get(key)
print(f"Got value for {key}: {value.decode('utf-8')}")

Example use of S3 Connection

params = {
    "folder_name": "d5a15027e8894185",
    "document_type": "url",
    "document_data": "https://pdfobject.com/pdf/sample.pdf"
}

result = s3_connection.upload(params)
print("S3 upload result:", result)

Example use of MongoDB Connection

# Insert data into MongoDB
data_to_insert = {
    "name": "Thor",
    "realm": "Asgard",
    "weapon": "Mjolnir"
}
inserted_id = mongodb_connection.insert(collection_name="heroes", data=data_to_insert)
print(f"Inserted document with ID: {inserted_id}")

# Read data from MongoDB
query = {"realm": "Asgard"}
result = mongodb_connection.read(collection_name="heroes", query=query)
print("Query Result:", result)

Example use of GCP Connection

params = {
    "image_type": "url",
    'gcp_bucket_path': "/C/V/vVmJnIt1738044192_dbe7ad56b6837cfeb857fd40a06b0660f23de24b9aac5340d5c77fd12ee5abe2",
    "image_data": "https://pbs.twimg.com/profile_images/428316729220276224/EdBZ2Kgp.jpeg"
}

webp_params = {
    "image_type": "url",
    'gcp_bucket_path': "/C/V/vVmJnIt1738044192_dbe7ad56b6837cfeb857fd40a06b0660f23de24b9aac5340d5c77fd12ee5abe3",
    "image_data": "https://pbs.twimg.com/profile_images/428316729220276224/EdBZ2Kgp.jpeg",
    "image": "webp"
}


webp_result = webp_gcp_connection.upload(webp_params)
print("WebP upload result:", webp_result)

result = gcp_connection.upload(params)
print("JPEG upload result:", result)