
Base Is Loaded: Bridging OLTP and OLAP with Lakebase and PySpark


For years, the Lakehouse paradigm has successfully collapsed the wall between Data Warehouses and Data Lakes. We have unified streaming and batch, structured and unstructured data, all under one roof. Yet we often find ourselves hitting a familiar, frustrating wall: the gap between the analytical plane (OLAP) and the transactional plane (OLTP). In my latest project, the client wanted Databricks to serve as both an analytics platform and the backend powering their front-end React web app. Databricks ships a sample App with a Node.js front end and a FastAPI Python backend that connects to Lakebase; the ToDo app performs CRUD operations out of the box. I opened a new Databricks Query object, connected to the Lakebase compute, and verified the data. It’s hard to overstate how cool this seemed.

The next logical step was to build a declarative pipeline that would flow the data Lakebase received from POST, PUT, and GET requests through the Bronze layer for data quality checks, into Silver for SCD2-style history, and then into Gold, where it would be available to end users through AI/BI Genie and Power BI reports and would also serve as the source for a synced table back into Lakebase to serve GET requests. I created a new declarative pipeline in a source-controlled asset bundle and started building. Then I stopped building. That’s not supported. You actually need to communicate with Lakebase from a notebook using the SDK, and a newer version of the SDK than the Serverless environment provides, no less.
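In practice, that means upgrading the databricks-sdk package inside the notebook before the database APIs shown below are available. On my clusters that looked like the following, though the minimum required version will depend on your runtime:

%pip install --upgrade databricks-sdk
dbutils.library.restartPython()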

A couple of caveats. At the time of this writing, I’m using Azure Databricks, so I only have access to Lakebase Provisioned and not Lakebase Autoscaling. And it’s still in Public Preview; maybe GA is different. Or, not. Regardless, I have to solve the problem on my desk today, and simply having the database isn’t enough. We need a robust way to interact with it programmatically from our notebooks and pipelines.

In this post, I want to walk through a Python architectural pattern I’ve developed, which I call BaseIsLoaded. It includes pipeline configurations, usage patterns, and a PySpark class: LakebaseClient. This class serves two critical functions: it acts as a CRUD wrapper for notebook-based application logic, and, more importantly, it functions as a bridge that turns a standard Postgres table into a streaming source for declarative pipelines.
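Before diving into the individual pieces, here is a rough skeleton of the class so the later snippets have somewhere to hang. The constructor arguments mirror the instantiation example later in the post; the rest of the layout is my own summary rather than the literal source:

from typing import Any, List, Optional
import uuid

import psycopg2
from psycopg2 import sql as psql
from databricks.sdk import WorkspaceClient
from pyspark.sql import SparkSession


class LakebaseClient:
    """CRUD wrapper for a Lakebase (Postgres) table that can also act as a
    micro-batch streaming source for declarative pipelines."""

    def __init__(self, table_name: str, checkpoint_column: str, checkpoint_store: str):
        self.table_name = table_name
        self.checkpoint_column = checkpoint_column   # monotonic column used for client-side CDC
        self.checkpoint_store = checkpoint_store     # Delta table that persists the last offset
        self._conn_info: Optional[dict] = None       # populated lazily with host + ephemeral token
        self.last_checkpoint: Any = None

    # Role 1: CRUD/DDL wrapper for notebook application logic
    #   create_table(), alter_table_add_column(), create_index(), execute_ddl(), ...

    # Role 2: bridge that turns the Postgres table into a micro-batch stream
    #   load_new_data(), _persist_checkpoint(), _load_persisted_checkpoint(), ...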

The Connectivity Challenge: Identity-Native Auth

The first hurdle in any database integration is authentication. In the enterprise, we are moving away from hardcoded credentials and .pgpass files. We want identity-native authentication. The LakebaseClient handles this by leveraging the databricks.sdk. Instead of managing static secrets, the class generates short-lived tokens on the fly.

Look at the _ensure_connection_info method in the provided code snippet:

def _ensure_connection_info(self, spark: SparkSession, value: Any):
    # Populate ``self._conn_info`` with the Lakebase endpoint and a temporary token
    if self._conn_info is None:
        w = WorkspaceClient()
        instance_name = "my_lakebase"  # Example instance
        instance = w.database.get_database_instance(name=instance_name)
        cred = w.database.generate_database_credential(
            request_id=str(uuid.uuid4()), instance_names=[instance_name]
        )
        self._conn_info = {
            "host": instance.read_write_dns,
            "dbname": "databricks_postgres",
            "password": cred.token,  # Ephemeral token
            # ...
        }

This encapsulates the complexity of finding the endpoint and authenticating, and it lets us enforce a “zero-trust” model within our code. The notebook or job running this code inherits the permissions of the service principal or user executing it, requesting a token that is valid only for that session.
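Under the hood, the class’s _get_connection helper (referenced by the DDL methods in the next section) opens a psycopg2 connection with those ephemeral credentials. A minimal sketch, assuming the connection info also carries the caller’s identity as the Postgres user and that SSL is required:

def _get_connection(self):
    # Sketch: open a short-lived psycopg2 connection using the info gathered above.
    # The "user" key and the sslmode setting are assumptions about the elided parts of _conn_info.
    return psycopg2.connect(
        host=self._conn_info["host"],
        dbname=self._conn_info["dbname"],
        user=self._conn_info["user"],          # workspace user or service principal identity
        password=self._conn_info["password"],  # ephemeral token, never a static secret
        port=5432,
        sslmode="require",
    )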

Operationalizing DDL: Notebooks as Migration Scripts

One of the strongest use cases for Lakebase is managing application state or configuration for data products. However, managing the schema of a Postgres database usually requires an external migration tool (like Flyway or Alembic).

To keep the development lifecycle contained within Databricks, I extended the class to handle safe DDL execution. The class includes methods like create_table, alter_table_add_column, and create_index.

These methods use psycopg2.sql to handle identifier quoting safely. In a multi-tenant environment where table names might be dynamically generated based on business units or environments, either by human or agentic developers, SQL injection via table names is a real risk.

def create_table(self, schema: str, table: str, columns: List[str]):
    # Schema and table names are quoted via psycopg2.sql to guard against injection
    # through dynamically generated identifiers; column definitions are trusted DDL.
    ddl = psql.SQL("CREATE TABLE IF NOT EXISTS {}.{} ( {} )").format(
        psql.Identifier(schema),
        psql.Identifier(table),
        psql.SQL(", ").join(psql.SQL(col) for col in columns)
    )
    self.execute_ddl(ddl.as_string(self._get_connection()))

This allows a Databricks Notebook to serve as an idempotent deployment script. You can define your schema in code and execute it as part of a “Setup” task in a Databricks Workflow, ensuring the OLTP layer exists before the ETL pipeline attempts to read from or write to it.
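A “Setup” task built on these methods might look like the following. The table and column definitions are illustrative, and I’m reusing the constructor arguments shown later in the post:

# Illustrative setup notebook: make sure the OLTP table exists before the pipeline runs.
lb = LakebaseClient(
    table_name="public.sales_targets",
    checkpoint_column="updated_at",
    checkpoint_store="system.control_plane.ingestion_offsets",
)

lb.create_table(
    schema="public",
    table="sales_targets",
    columns=[
        "target_id BIGSERIAL PRIMARY KEY",
        "region TEXT NOT NULL",
        "target_amount NUMERIC(12, 2)",
        "updated_at TIMESTAMPTZ DEFAULT now()",
    ],
)

Re-running the task is safe because the DDL uses CREATE TABLE IF NOT EXISTS.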

The Core Innovation: Turning Postgres into a Micro-Batch Stream

The most significant value of this architecture is the load_new_data method.

Standard JDBC connections in Spark are designed for throughput, not politeness. They default to reading the entire table, or, if you parallelize reads via partitioning, they fan out across multiple tasks that each open their own connection and can quickly exhaust Lakebase’s connection limit. By contrast, LakebaseClient intentionally runs on the driver over a single connection.

This solves a common dilemma we run into with our enterprise clients: if you have a transactional table (e.g., an orders table or a pipeline_audit log) in Lakebase and want to ingest it into Delta Lake incrementally, you usually have to introduce Kafka, Debezium, or complex CDC tools. If you have worked for a large, regulated company, you can appreciate the value of not asking for things.

Instead, LakebaseClient implements a lightweight “Client-Side CDC” pattern. It relies on a monotonic column (a checkpoint_column, such as an auto-incrementing ID or a modification_timestamp) to fetch only what has changed since the last run.

1. State Management with Delta

The challenge with custom polling logic is: where do you store the offset? If the cluster restarts, how does the reader know where it left off?

I solved this by using Delta Lake itself as the state store for the Postgres reader. The _persist_checkpoint and _load_persisted_checkpoint methods use a small Delta table to track the last_checkpoint for every source.

def _persist_checkpoint(self, spark: SparkSession, value: Any):
    # ... logic to create table if not exists ...
    # Upsert (merge) last checkpoint into a Delta table
    spark.sql(f"""
        MERGE INTO {self.checkpoint_store} t
        USING _cp_upsert_ s
        ON t.source_id = s.source_id
        WHEN MATCHED THEN UPDATE SET t.last_checkpoint = s.last_checkpoint
        WHEN NOT MATCHED THEN INSERT ...
    """)

This creates a robust cycle: The pipeline reads from Lakebase, processes the data, and commits the offset to Delta. This ensures exactly-once processing semantics (conceptually) for your custom ingestion logic.
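The mirror-image _load_persisted_checkpoint is straightforward. Here is a minimal sketch, assuming the class tracks a source_id matching the MERGE above and that pyspark.sql.functions is imported as F:

def _load_persisted_checkpoint(self, spark: SparkSession) -> Any:
    # Return the last committed checkpoint for this source, or None on the first run.
    if not spark.catalog.tableExists(self.checkpoint_store):
        return None
    rows = (
        spark.table(self.checkpoint_store)
        .where(F.col("source_id") == self.source_id)
        .select("last_checkpoint")
        .limit(1)
        .collect()
    )
    return rows[0]["last_checkpoint"] if rows else None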

2. The Micro-Batch Logic

The load_new_data method brings it all together. It creates a psycopg2 cursor, queries only the rows where checkpoint_col > last_checkpoint, limits the fetch size (to prevent OOM errors on the driver), and converts the result into a Spark DataFrame.

    if self.last_checkpoint is not None:
        query = psql.SQL(
            "SELECT * FROM {} WHERE {} > %s ORDER BY {} ASC{}"
        ).format(...)
        params = (self.last_checkpoint,)

By enforcing an ORDER BY on the monotonic column, we ensure that if we crash mid-batch, we simply resume from the last successfully processed ID.
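The remainder of load_new_data is essentially: execute the query, pull a bounded batch through the cursor, and hand the rows to Spark. A sketch under those assumptions (the fetch limit, the schema helper, and the exact attribute names are mine, not the literal source):

    with self._get_connection() as conn, conn.cursor() as cur:
        cur.execute(query, params)
        rows = cur.fetchmany(self.fetch_limit)                  # bound the batch to protect the driver
        col_names = [d.name for d in cur.description]           # column names reported by Postgres
        spark_schema = self._postgres_schema(cur.description)   # built via _postgres_type_to_spark

    df = spark.createDataFrame([tuple(r) for r in rows], schema=spark_schema)

    if rows:
        # Advance the in-memory offset; _persist_checkpoint commits it to Delta after a successful write.
        self.last_checkpoint = rows[-1][col_names.index(self.checkpoint_column)]

    return df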

Integration with Declarative Pipelines

So, how do we use this in a real-world enterprise scenario?

Imagine you have a “Control Plane” app running on a low-cost cluster that allows business users to update “Sales Targets” via a Streamlit app (backed by Lakebase). You want these targets to immediately impact your “Sales Reporting” Delta Live Table (DLT) pipeline.

Instead of a full refresh of the sales_targets table every hour, you can run a continuous or scheduled job using LakebaseClient.

The Workflow:

  1. Instantiation:
    lb_source = LakebaseClient(
        table_name="public.sales_targets",
        checkpoint_column="updated_at",
        checkpoint_store="system.control_plane.ingestion_offsets"
    )
    
  2. Ingestion Loop: You can wrap load_new_data in a simple loop or a scheduled task.
    # Fetch micro-batch
    df_new_targets = lb_source.load_new_data()
    
    if not df_new_targets.isEmpty():
        # Append to Bronze Delta Table
        df_new_targets.write.format("delta").mode("append").saveAsTable("bronze.sales_targets")
    
  3. Downstream DLT: Your main ETL pipeline simply reads from bronze.sales_targets as a standard streaming source. The LakebaseClient acts as the connector, effectively “streaming” changes from the OLTP layer into the Bronze layer.
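For reference, the downstream table definition can be as plain as this (names are illustrative; I’m using the DLT Python API with a streaming read of the Bronze table):

import dlt
from pyspark.sql import functions as F

@dlt.table(
    name="silver_sales_targets",
    comment="Sales targets streamed from Lakebase via the Bronze landing table",
)
def silver_sales_targets():
    # Standard streaming read of the Bronze table that LakebaseClient appends to.
    return (
        spark.readStream.table("bronze.sales_targets")
        .withColumn("_ingested_at", F.current_timestamp())
    )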

Architectural Considerations and Limitations

While this class provides a powerful bridge, as architects, we must recognize the boundaries.

  1. It is not a Debezium Replacement: This approach relies on “Query-based CDC.” It cannot capture hard deletes (unless you use soft-delete flags), and it relies on the checkpoint_column being strictly monotonic. If your application inserts rows with past timestamps, this reader will miss them. My first use case was pretty simple: just a single API client performing CRUD operations. For true transaction log mining, you still need logical replication slots (which Lakebase supports, but which require a more complex setup).
  2. Schema Inference: The _postgres_type_to_spark method in the code provides a conservative mapping. Postgres has rich types (like JSONB, HSTORE, and custom enums). This class defaults unknown types to StringType. This is intentional design: it shifts the schema validation burden to the Bronze-to-Silver transformation in Delta, preventing the ingestion job from failing due to exotic Postgres types (a sketch of such a mapping follows this list). I can see adding support for JSONB before this project is over, though.
  3. Throughput: This runs on the driver or a single executor node (depending on how you parallelize calls). It is designed for “Control Plane” data—thousands of rows per minute, not millions of rows per second. Do not use this to replicate a high-volume trading ledger; use standard ingestion tools for that.
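To make the schema-inference point concrete, here is roughly what such a conservative mapping looks like. This is a reconstruction for illustration, not the literal method from the class:

from pyspark.sql.types import (
    BooleanType, DateType, DecimalType, DoubleType,
    IntegerType, LongType, StringType, TimestampType,
)

# Conservative Postgres-to-Spark type mapping; anything unrecognized becomes a string.
_PG_TO_SPARK = {
    "smallint": IntegerType(),
    "integer": IntegerType(),
    "bigint": LongType(),
    "real": DoubleType(),
    "double precision": DoubleType(),
    "numeric": DecimalType(38, 18),
    "boolean": BooleanType(),
    "date": DateType(),
    "timestamp without time zone": TimestampType(),
    "timestamp with time zone": TimestampType(),
}

def _postgres_type_to_spark(self, pg_type: str):
    # JSONB, HSTORE, enums, arrays, etc. fall through to StringType and are
    # parsed and validated in the Bronze-to-Silver transformation instead.
    return _PG_TO_SPARK.get(pg_type.lower(), StringType())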

Conclusion

Lakebase fills the critical OLTP void in the Databricks ecosystem. However, a database is isolated until it is integrated. The BaseIsLoaded pattern demonstrated here offers a lightweight, Pythonic way to knit this transactional layer into your analytical backbone.

By abstracting authentication, safely handling DDL, and implementing stateful micro-batching via Delta-backed checkpoints, we can build data applications that are robust, secure, and entirely contained within the Databricks control plane. It allows us to stop treating application state as an “external problem” and start treating it as a native part of the Lakehouse architecture. Because, at the end of the day, adding Apps plus Lakebase to your toolbelt is too much fun to let a little glue code stand in your way.

Perficient is a Databricks Elite Partner. Contact us to learn more about how to empower your teams with the right tools, processes, and training to unlock your data’s full potential across your enterprise.


David Callaghan, Senior Solutions Architect

Databricks Champion | Center of Excellence Lead | Data Privacy & Governance Expert | Speaker & Trainer | 30+ Yrs in Enterprise Data Architecture
