Databricks system tables are currently in Public Preview, which means they are accessible but some details may still change. This is how Databricks describes system tables:
System tables are a Databricks-hosted analytical store of your account’s operational data found in the system catalog. System tables can be used for historical observability across your account.
I’m going to describe just one case where system tables gave me a simple, out-of-the-box solution that replaced a more complex, custom one. Replacing custom code this way matters when you’re trying to build a stable, resilient, manageable system. This is a small example, but imagine how much simpler an organization’s codebase could be if you repeated this process a few dozen times.
The Challenge
There are a number of different system tables, but I want to focus on just the lineage tables: table_lineage and column_lineage. Specifically, I want to talk about how I used them to dramatically simplify a process I had built to monitor a large number of Delta Share tables. We had a client that was consuming around 9,000 shared tables from a provider. The provider would drop tables into the share location, and then a view (listing column names explicitly, for performance) would be created in a catalog in a business workspace. The provider said they would provide a list of changes every month: new or removed tables and/or columns. However, the client had a single-day SLA for changes. The business wanted to know in advance whether a job or report against a view would fail because the underlying table had been dropped or altered. Also, if they requested a new table, they didn’t want to wait a month to find out whether it had been delivered.
The Solution (Without System Tables)
We have Unity Catalog enabled, since it is a prerequisite for Delta Share. I needed to monitor changes to all of the securable objects in the provider’s share: schemas, tables and columns. There were only four or five schemas, so those could almost be monitored manually. However, there were a lot of tables, and many of them had a lot of columns. I only had access to the INFORMATION_SCHEMA for each of the schemas, so I could query table and column names. The idea was to maintain a hash of the sorted table (or column) names for a quick comparison. (I’m only showing the code for tables; the column code is similar.)
import hashlib

def get_tables_hash_and_names(spark):
    # List the shared (Delta Sharing) tables visible in catalog xyz
    tables_query = """
        SELECT table_name
        FROM xyz.information_schema.tables
        WHERE data_source_format = 'DELTASHARING'
    """
    tables_df = spark.sql(tables_query)
    # Sort in Python so the hash input is deterministic
    table_names = sorted(row['table_name'] for row in tables_df.collect())
    tables_str = '|'.join(table_names)
    tables_hash = hashlib.md5(tables_str.encode('utf-8')).hexdigest()
    return tables_hash, table_names
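The column check is described as similar but is not shown. Here is a minimal sketch of one way it could work. It assumes rows shaped like the results of an information_schema.columns query: (table_name, column_name, ordinal_position). The sketch groups columns by table, orders them, and hashes each table's column list separately, so a mismatch points at a specific table. The function name build_column_hashes is hypothetical, and plain tuples stand in for Spark Row objects so the code runs without a cluster.

```python
import hashlib
from collections import defaultdict


def build_column_hashes(rows):
    """Return {table_name: md5 of that table's ordered column names}.

    rows: iterable of (table_name, column_name, ordinal_position) tuples,
    a hypothetical stand-in for rows collected from information_schema.columns.
    """
    columns_by_table = defaultdict(list)
    for table_name, column_name, ordinal_position in rows:
        columns_by_table[table_name].append((ordinal_position, column_name))

    hashes = {}
    for table_name, columns in columns_by_table.items():
        # Order by ordinal position so the hash does not depend on row order
        ordered_names = [name for _, name in sorted(columns)]
        joined = '|'.join(ordered_names)
        hashes[table_name] = hashlib.md5(joined.encode('utf-8')).hexdigest()
    return hashes


day1 = [("orders", "id", 1), ("orders", "amount", 2), ("customers", "id", 1)]
day2 = [("orders", "amount", 2), ("orders", "id", 1), ("orders", "region", 3),
        ("customers", "id", 1)]

h1 = build_column_hashes(day1)
h2 = build_column_hashes(day2)
changed = sorted(t for t in h1 if h1[t] != h2.get(t))
print(changed)  # ['orders']
```

Ordering by position means a pure reordering of columns also counts as a change. Sorting by column name instead would ignore reorderings, which may be preferable when the views list columns by name.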
If there is a discrepancy, the next step is a deeper dive to identify the specific changes. This means the state needs to be saved so the current day can be compared with the prior day:
from datetime import datetime

def save_schema_hashes(spark, current_schema_hashes):
    # One timestamp per run, so each run's rows can be read back together
    current_timestamp = datetime.now()
    records = [(table_name, schema_hash, current_timestamp) for table_name, schema_hash in current_schema_hashes.items()]
    schema_hashes_df = spark.createDataFrame(records, ["table_name", "schema_hash", "timestamp"])
    # Append, so the history of daily snapshots is kept
    schema_hashes_df.write.format("delta").mode("append").saveAsTable("delta_schema_hashes")
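Reading the prior day's state back is the other half of this step. Every row written in one run shares the same timestamp, so the previous snapshot can be taken as the rows carrying the latest timestamp. Taking the latest row per table instead would keep returning tables that have since been dropped, and they would be reported as deleted again every day. The sketch below uses Python's built-in sqlite3 as a local stand-in for the delta_schema_hashes Delta table. load_previous_schema_hashes is a hypothetical helper; on Databricks, the same SELECT could be run through spark.sql before the current run's hashes are saved.

```python
import sqlite3


def load_previous_schema_hashes(conn):
    """Return {table_name: schema_hash} from the most recent saved run."""
    rows = conn.execute(
        """
        SELECT table_name, schema_hash
        FROM delta_schema_hashes
        WHERE timestamp = (SELECT MAX(timestamp) FROM delta_schema_hashes)
        """
    ).fetchall()
    return dict(rows)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE delta_schema_hashes (table_name TEXT, schema_hash TEXT, timestamp TEXT)")
# Day 1 snapshot: two tables (toy data)
conn.executemany(
    "INSERT INTO delta_schema_hashes VALUES (?, ?, ?)",
    [("orders", "h1", "2024-01-01T06:00:00"), ("legacy", "h2", "2024-01-01T06:00:00")],
)
# Day 2 snapshot: 'legacy' was dropped by the provider
conn.execute(
    "INSERT INTO delta_schema_hashes VALUES (?, ?, ?)",
    ("orders", "h3", "2024-01-02T06:00:00"),
)

print(load_previous_schema_hashes(conn))  # {'orders': 'h3'}
```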
Finally, a quick comparison of the stored hash values, keyed by table name, identifies any tables or columns that may have changed and therefore need a new view.
def identify_schema_changes(current_schema_hashes, previous_schema_hashes):
    added_tables = set(current_schema_hashes.keys()) - set(previous_schema_hashes.keys())
    deleted_tables = set(previous_schema_hashes.keys()) - set(current_schema_hashes.keys())
    modified_tables = set()
    # Tables present on both days whose hash changed
    for table in (set(current_schema_hashes.keys()) & set(previous_schema_hashes.keys())):
        if current_schema_hashes[table] != previous_schema_hashes[table]:
            modified_tables.add(table)
    return added_tables, deleted_tables, modified_tables
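The overall flow described here can be sketched in plain Python: a cheap whole-state hash comparison first, then the per-table comparison only when that hash differs. daily_check and combined_hash are hypothetical names. Each day's state is assumed to be a dict mapping table name to schema hash. The combined hash is computed over the sorted (name, hash) pairs so that it reflects column changes as well as added or removed tables. This is one possible choice, not the article's exact code.

```python
import hashlib


def combined_hash(schema_hashes):
    """Hash of all (table, schema_hash) pairs in a deterministic order."""
    parts = [f"{name}:{h}" for name, h in sorted(schema_hashes.items())]
    return hashlib.md5('|'.join(parts).encode('utf-8')).hexdigest()


def daily_check(current, previous):
    """Return None if nothing changed, else (added, deleted, modified) sets."""
    # Fast path: a single hash comparison covers thousands of tables
    if combined_hash(current) == combined_hash(previous):
        return None
    # Slow path: pinpoint which tables changed
    added = current.keys() - previous.keys()
    deleted = previous.keys() - current.keys()
    modified = {t for t in current.keys() & previous.keys() if current[t] != previous[t]}
    return added, deleted, modified


previous = {"orders": "a1", "customers": "b1", "legacy": "c1"}
current = {"orders": "a2", "customers": "b1", "returns": "d1"}
print(daily_check(previous, previous))  # None
print(daily_check(current, previous))   # ({'returns'}, {'legacy'}, {'orders'})
```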
The Solution (With System Tables)
Basically, I needed to build out an analytical store of the account’s operational data found in the system catalog. Sound familiar? Once Databricks added lineage to its system tables, none of this custom functionality was needed. The logic moved from code to SQL, which substantially reduced the complexity of the solution. Here is a query to see if a source exists in the target:
SELECT DISTINCT target_table_name
FROM system.access.table_lineage tl_outer
WHERE target_table_catalog = '{catalog_name}'
  AND target_table_schema = '{schema_name}'
  AND target_table_type = 'VIEW'
  AND NOT EXISTS (
    SELECT 1
    FROM system.access.table_lineage AS tl_inner
    WHERE tl_outer.target_table_catalog = tl_inner.source_table_catalog
      AND tl_outer.target_table_schema = tl_inner.source_table_schema
      AND tl_outer.target_table_name = tl_inner.source_table_name
  )
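To see what the first query returns, it can be run against a tiny in-memory table. This sketch uses Python's sqlite3 as a stand-in for system.access.table_lineage. It includes only the columns the query touches, named as in the query above; check the lineage system table reference for the exact schema in your workspace. The catalog and schema are bound as parameters instead of being formatted into the string, and all catalog, schema, and table names are made up. As written, the query returns views in the given catalog and schema that never appear as the source of any lineage record. In a notebook, recent PySpark versions also accept named parameter markers through spark.sql(query, args={...}).

```python
import sqlite3

QUERY = """
SELECT DISTINCT target_table_name
FROM table_lineage tl_outer
WHERE target_table_catalog = ?
  AND target_table_schema = ?
  AND target_table_type = 'VIEW'
  AND NOT EXISTS (
    SELECT 1 FROM table_lineage AS tl_inner
    WHERE tl_outer.target_table_catalog = tl_inner.source_table_catalog
      AND tl_outer.target_table_schema = tl_inner.source_table_schema
      AND tl_outer.target_table_name = tl_inner.source_table_name
  )
"""

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE table_lineage (
    source_table_catalog TEXT, source_table_schema TEXT, source_table_name TEXT,
    target_table_catalog TEXT, target_table_schema TEXT, target_table_name TEXT,
    target_table_type TEXT)""")
conn.executemany(
    "INSERT INTO table_lineage VALUES (?, ?, ?, ?, ?, ?, ?)",
    [
        # Views built over shared tables
        ("share", "sales", "orders", "biz", "sales", "orders_v", "VIEW"),
        ("share", "sales", "returns", "biz", "sales", "returns_v", "VIEW"),
        # A report reads orders_v (read-only lineage, so no target)
        ("biz", "sales", "orders_v", None, None, None, None),
    ],
)
result = sorted(r[0] for r in conn.execute(QUERY, ("biz", "sales")))
print(result)  # ['returns_v']
```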
And here is how you check if a target table no longer exists in the source:
SELECT DISTINCT source_table_name
FROM system.access.table_lineage
WHERE source_table_catalog = '{catalog_name}'
  AND source_table_schema = '{schema_name}'
  AND target_table_catalog IS NULL
  AND target_table_schema IS NULL
  AND target_table_name IS NULL
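The second query can be exercised the same way. Again, sqlite3 stands in for the system table, with only the relevant columns and made-up names. As written, the query returns source tables in the given catalog and schema that appear in at least one lineage record with no target, that is, tables that were read without being written anywhere.

```python
import sqlite3

QUERY = """
SELECT DISTINCT source_table_name
FROM table_lineage
WHERE source_table_catalog = ?
  AND source_table_schema = ?
  AND target_table_catalog IS NULL
  AND target_table_schema IS NULL
  AND target_table_name IS NULL
"""

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE table_lineage (
    source_table_catalog TEXT, source_table_schema TEXT, source_table_name TEXT,
    target_table_catalog TEXT, target_table_schema TEXT, target_table_name TEXT)""")
conn.executemany(
    "INSERT INTO table_lineage VALUES (?, ?, ?, ?, ?, ?)",
    [
        # orders feeds a view, so this record has a target
        ("share", "sales", "orders", "biz", "sales", "orders_v"),
        # inventory was only queried directly (no target recorded)
        ("share", "sales", "inventory", None, None, None),
        # a table in another schema is filtered out
        ("share", "hr", "staff", None, None, None),
    ],
)
result = sorted(r[0] for r in conn.execute(QUERY, ("share", "sales")))
print(result)  # ['inventory']
```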
With this information, views can be created or dropped automatically. The same is true for columns: if a column isn’t there, it was either removed or modified, and a new view is created automatically with the new columns. The client teams were also more comfortable supporting SQL than PySpark, and I was no longer storing and maintaining lineage in a table that the client had to pay for. All in all, the system tables let me revisit and simplify custom code.
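The create-or-drop step can be as simple as generating DDL from the query results. This is a minimal sketch under several assumptions, none of which come from the original setup: a shared catalog (share_cat), a business catalog (biz_cat), a list of column names per table (for example from information_schema.columns), and views that reuse the source table's name. view_ddl, drop_ddl and quote are hypothetical helpers. Identifiers are wrapped in backticks, which Databricks SQL uses for quoting, and any embedded backtick is doubled.

```python
def quote(identifier):
    """Backtick-quote an identifier, doubling any embedded backticks."""
    return "`" + identifier.replace("`", "``") + "`"


def view_ddl(source_catalog, target_catalog, schema, table, columns):
    """CREATE OR REPLACE VIEW with an explicit column list over the shared table."""
    column_list = ", ".join(quote(c) for c in columns)
    return (
        f"CREATE OR REPLACE VIEW {quote(target_catalog)}.{quote(schema)}.{quote(table)} "
        f"AS SELECT {column_list} "
        f"FROM {quote(source_catalog)}.{quote(schema)}.{quote(table)}"
    )


def drop_ddl(target_catalog, schema, table):
    """DROP VIEW statement for a view whose source table has disappeared."""
    return f"DROP VIEW IF EXISTS {quote(target_catalog)}.{quote(schema)}.{quote(table)}"


print(view_ddl("share_cat", "biz_cat", "sales", "orders", ["id", "amount"]))
# CREATE OR REPLACE VIEW `biz_cat`.`sales`.`orders` AS SELECT `id`, `amount` FROM `share_cat`.`sales`.`orders`
print(drop_ddl("biz_cat", "sales", "legacy"))
# DROP VIEW IF EXISTS `biz_cat`.`sales`.`legacy`
```

In a notebook, each generated statement could then be passed to spark.sql. Using CREATE OR REPLACE means the same call handles both new tables and tables whose columns changed.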
Get in touch with us if you want to know more about how Databricks system tables could help you manage lineage, improve governance, provide cost insights, or potentially address other use cases.