
Writing Testable Python Objects in Databricks


I’ve been writing about Test-Driven Development in Databricks and some of the interesting issues you can run into with Python objects. It’s always been my opinion that code that is not testable is detestable. Admittedly, it’s been very difficult getting to where I wanted to be with Databricks and TDD. Unfortunately, it’s hard to find strength in the courage of your convictions when you have neither. Let’s get into the weeds and discuss honestly and in detail how to build resilient and reliable Databricks systems. We’ll take a look at how to build objects that are both usable and testable. Then we’ll look at the testing tools that are available to us. Finally, we’ll look at some practical guidelines for structuring your Databricks project code and deploying it into production.

Testable Python Objects

We’ve looked under the hood to see how pickling problems can lead to Py4J issues. We’ve even seen how that can happen with trivial code, like this:

class APIError(Exception):
    """Exception raised for errors in the API call."""


class ValidationError(Exception):
    """Exception raised for validation errors."""


class DataError(Exception):
    """Exception raised for errors in data processing."""

Now we know that there is a specific set of type mappings that can be used between Python and Java. Logically, custom objects are never going to be in that set. One way of handling this is to have every method that uses these classes convert the value to a string. That is a definite code smell; a DRY violation, specifically. It’s not too much to ask these anaemic classes to help centralize the responsibility of avoiding Py4JErrors. This is all we’re talking about doing:

class APIError(Exception):
    """Exception raised for errors in the API call."""
    def __str__(self):
        return f"API Error: {super().__str__()}"

class DataError(Exception):
    """Exception raised for errors in data processing."""
    def __str__(self):
        return f"Data Error: {super().__str__()}"

class ValidationError(Exception):
    """Exception raised for validation errors."""
    def __str__(self):
        return f"Validation Error: {super().__str__()}"

If you look at this change strictly from a naive implementation perspective, we’ve only gone from this …

exception_str = str(exception)
if isinstance(exception, APIError):
    log_content = f"{APP_NAME}: API Error: {exception_str}"
elif isinstance(exception, ValidationError):
    log_content = f"{APP_NAME}: Validation Error: {exception_str}"
elif isinstance(exception, DataError):
    log_content = f"{APP_NAME}: Data Error: {exception_str}"

… to this:

log_content = str(exception)

It’s always nice to reduce lines of code, but LoC is one of the least meaningful code metrics for a reason. More importantly, we have now removed a category of error from our code. Another method can’t cause a Py4JJavaError when using our class. The magic isn’t in removing the elif chain; it’s in removing except Py4JJavaError as err:. This is a foundational concept in building more testable code: removing the potential for exceptions.
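To make the before-and-after concrete, here’s a minimal, self-contained sketch. The APP_NAME value and the build_log_content helper are placeholders for illustration, not code from my project; the point is that one f-string now produces the tagged message for every exception type, with no isinstance chain to maintain and nothing on the path that can raise.

```python
APP_NAME = "my_app"  # placeholder application name


class APIError(Exception):
    """Exception raised for errors in the API call."""
    def __str__(self):
        return f"API Error: {super().__str__()}"


class DataError(Exception):
    """Exception raised for errors in data processing."""
    def __str__(self):
        return f"Data Error: {super().__str__()}"


def build_log_content(exception: Exception) -> str:
    # One expression replaces the whole isinstance/elif chain:
    # each subclass tags itself via __str__.
    return f"{APP_NAME}: {exception}"


print(build_log_content(APIError("timeout")))   # my_app: API Error: timeout
print(build_log_content(DataError("bad row")))  # my_app: Data Error: bad row
```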

Databricks Mocks


It’s hard to imagine good unit tests without mocks. It’s also hard to imagine mocks that made my life more difficult than mocking Databricks. Going back to the logging case, I used to have a very simple method that took that log_content = str(exception) value and saved it to a Delta table with some other columns. This is very common. It’s also a testing nightmare. Eventually, I ended up with this:

from unittest import mock

...
    def setUp(self):
        # Mock the DataFrame and its write chain
        self.mock_df = mock.Mock()
        self.mock_write = mock.Mock()
        self.mock_df.write.format.return_value.mode.return_value.saveAsTable = self.mock_write

        # Patch the get_spark_session function
        self.patcher = mock.patch('main.utils.custom_logging.get_spark_session')
        self.mock_spark_session = self.patcher.start()
        self.mock_spark_session.return_value.createDataFrame.return_value = self.mock_df

    def test_log_info_message(self):
        """Test basic info logging."""
        log_message(message=self.TEST_MESSAGE)

        # Assert DataFrame creation and write process were called
        self.mock_spark_session.return_value.createDataFrame.assert_called_once()
        self.mock_df.write.format.assert_called_with("delta")
        self.mock_df.write.format.return_value.mode.assert_called_with("append")
        self.mock_write.assert_called_with("default.logging")

    def tearDown(self):
        self.patcher.stop()

I also had to go back to the Python file I was testing and pull the Spark session out of that method into its own function. Because reasons, I guess.

def _get_spark_session():
    return SparkSession.builder.appName(APP_NAME).getOrCreate()

This was a lot of work. That’s why I’m including the code. A lot of the work I did was just throwing stuff out there, though. So don’t use the code I included. There is a much better solution. Don’t write code that’s this hard to test.

Just test public methods. Keep your public methods clean: accept, validate and process parameters, and pay close attention to pre-conditions, post-conditions and invariants. Put the supporting logic and integration in private methods. Since this is Python, you don’t really get private methods. I use a single underscore, which is a naming convention that other developers will understand. Don’t use the double underscore, because I honestly don’t know what the name mangling would do in Databricks. Once I moved the Databricks code to a private module, I didn’t need the _get_spark_session code. I kept the unit tests because I wrote them, but I would have found out all of that information in the unit tests anyway.
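Here’s a sketch of that split. The class and method names are illustrative, not from my project: the public method owns the pre-condition checks, while the underscore-prefixed helper holds the Spark/Delta integration that unit tests stub out.

```python
class EventLogger:
    """Illustrative public/private split; all names are hypothetical."""

    def log_message(self, message: str, level: str = "INFO") -> str:
        # Public method: validate pre-conditions before doing anything.
        if not message:
            raise ValueError("message must be non-empty")
        if level not in ("INFO", "WARN", "ERROR"):
            raise ValueError(f"unknown level: {level}")
        record = f"{level}: {message}"
        # Integration lives behind the underscore.
        self._write(record)
        return record

    def _write(self, record: str) -> None:
        # Private by convention: the Spark session and the Delta write
        # would go here, out of reach of the public contract you unit test.
        pass
```

In a unit test you mock or override _write and assert only on the public behaviour; the Spark session never has to exist.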

Project Structure

This was a tough one. Picture how your notebooks are laid out in Databricks. Does it easily lend itself to this structure (found in your .vscode/settings.json file)?

"python.testing.unittestArgs": [
    "-v",
    "-s",
    "./my_dir/test",
    "-p",
    "*_test.py"
],

Does it fit into any directory structure that could conform to ./.venv/bin/python -m unittest discover -v -s ./my_dir/test -p '*_test.py'? Probably not. But that’s what you’re going to need. So far I’ve talked about TDD and a little bit about repos. This is where a CI/CD pipeline comes in. For development, you are going to need main and test directories if you are going to run your tests in Test Explorer. You are also going to need to be explicit about the paths in your imports. This is going to be rough; hope your sed isn’t too rusty 🙂
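For reference, a repo layout that satisfies that discover command might look like this (the directory and file names are illustrative):

```
my_dir/
├── main/
│   └── utils/
│       └── custom_logging.py
└── test/
    └── custom_logging_test.py
```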

At a high level, the pipeline is going to need to do the following:

  1. Trigger a workflow. A push to a specific branch is one well-known pattern here but there are others.
  2. Checkout the repo: you’ll need the workflow to access the repo. The actions/checkout step is a good start (for GitHub Actions).
  3. Reorganize files: pick something you’re familiar with, like Python or a shell script, to flatten the directory structure, rename the files and modify the affected import statements.
  4. Push to the Databricks Repo: I recommend using a Databricks service account and the REST API. The Databricks CLI is an option, but it’s one more thing to install and maintain, and in most corporate environments, someone somewhere is going to say no at some point.
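For step 4, here’s a minimal sketch of hitting the Repos REST API (PATCH /api/2.0/repos/{repo_id}) with curl. DATABRICKS_HOST, DATABRICKS_TOKEN, REPO_ID and the branch name are placeholders you’d supply from your CI secrets; the script dry-runs by default so nothing gets called by accident.

```shell
#!/bin/bash
# Sketch: update a Databricks Repo to the latest commit on a branch via
# the Repos REST API. All values below are placeholders.

DATABRICKS_HOST="${DATABRICKS_HOST:-https://example.cloud.databricks.com}"
REPO_ID="${REPO_ID:-12345}"

repos_url() {
  # Build the endpoint URL for the target repo.
  echo "${DATABRICKS_HOST}/api/2.0/repos/${REPO_ID}"
}

# Dry-run unless --run is passed, so the script is safe to execute anywhere.
if [ "$1" = "--run" ]; then
  curl -s -X PATCH \
    -H "Authorization: Bearer ${DATABRICKS_TOKEN}" \
    -H "Content-Type: application/json" \
    -d '{"branch": "release"}' \
    "$(repos_url)"
else
  echo "Would PATCH $(repos_url)"
fi
```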

I would actually prefer you use a shell script rather than Python, just because there won’t be any dependencies. I know a lot of people don’t use the command line much, and I feel bad about that sed comment, so here’s a sample bash script to flatten your directories and refactor the import statements.

#!/bin/bash

# Rename directories
mv ./main/api_1 ./api_1
mv ./main/api_2 ./api_2

# Update import statements in Python files
find . -name '*.py' -exec sed -i 's/from main.api_1/from api_1/g' {} +
find . -name '*.py' -exec sed -i 's/from main.api_2/from api_2/g' {} +

Conclusion

These last posts were pretty rough. I mean, most people just fire up a Databricks notebook and everything just works. However, nothing ‘just works’. Everyone thought their technical debt was a good idea at some point. You need to test your code to trust your code. People love pointing out that testing doesn’t guarantee success. No one I know who tests a lot ever made that claim. Unit testing just validates that your particular implementation of your interpretation of someone’s description of a business need worked at a certain point in time based on a limited number of well-defined assumptions. If that’s all you get out of a hundred unit tests, imagine the shenanigans from not testing at all. I also threw in a CI/CD pipeline at the end there. Once again, if a controlled deployment can be difficult, imagine the mayhem of uncontrolled deployments.

In my first post in this series, I said “I don’t like testing Databricks notebooks and that’s a problem.” More to the point, I’ve been a developer for decades and I’ve seen what it takes to build resilient and reliable systems. Sticking notebooks into production with no guardrails is not a good foundation. Reread these blogs from the perspective of a developer building a system outside of Databricks. A lot of these recommendations are regular Python best practices. The idea that you can relax basic best practices just because you’re running on a large distributed engine objectively makes no sense. Everything I recommended here is free of cost. It takes work, but that’s why they call it a job.

Good luck and have fun!


David Callaghan, Solutions Architect
