Time-series data has typically been fit imperfectly into whatever database we were already using for other tasks. Dedicated time series databases (TSDBs) are now coming to market. A TSDB is optimized to store and retrieve associated pairs of times and values: its architecture focuses on storing timestamped data, and its compression, summarization and life-cycle management are customized for that structure. Usually, though, we aren’t going to get to move to a shiny new TSDB; we’ll just be using Parquet files. Databricks has released an open source project called Tempo that simplifies time series manipulation in Spark on Parquet (specifically Delta) files. The Photon query engine available in the Databricks Lakehouse provides a highly performant platform for historical analysis using Tempo.
What does Tempo do?
The entry point is a time series data frame (TSDF) object. A timestamp column is mandatory. Partition and sequence columns are optional but are instrumental in a number of use cases, including automatic featurization. With a TSDF, you have some native functions available to you:
- Asof joins – use windowing to select the latest record from a source table and merge onto the base Fact table
- Moving averages – include Approximate Exponential Moving Average and Simple Moving Average
- Resampling – upsample based on frequency and an aggregate function
Looking deeper into the code, I found additional functionality had been written that basically used the code foundation to address a particular use case.
How does Tempo work?
A look at tsdf.py shows you need to provide a dataframe and a timestamp column, and optionally one or more partition columns and a sequence column. The timestamp column will be used for sorting. The optional partition columns and/or sequence column can be used for featurization. The timestamp column name must be passed as a string, while the partition columns must be given as either a string or a list of strings.
When you want to write the TSDF to a Delta table, you call the write function, passing the TSDF, spark context, Delta table name and optimization columns (if any). Under the covers, this writes the underlying dataframe to a Delta table partitioned by event date:
view_df.write.mode("overwrite").partitionBy("event_dt").format('delta').saveAsTable(tabName)
If you are using open source Delta tables without the Databricks runtime, you will not be able to optimize performance using Z-ordering.
useDeltaOpt = (os.getenv('DATABRICKS_RUNTIME_VERSION') != None)
if useDeltaOpt:
    try:
        spark.sql("optimize {} zorder by {}".format(tabName, "(" + ",".join(partitionCols + optimizationCols) + ")"))
    except:
        print("Delta optimizations attempted on a non-Databricks platform. Switch to use Databricks Runtime to get optimization advantages.")
(It’s not hard to see how you could just write to a standard parquet file by possibly modifying io.py or even adding another write def to tsdf.py.)
Asof
There is an asofJoin method in tsdf.py that performs an as-of join between two TSDFs whose timestamp columns match. Besides the current TSDF, only the right TSDF is mandatory; you can optionally specify prefixes for the left and right TSDFs (the right defaults to the prefix “right”) to avoid duplicated column names. You also have the option of providing a value to break up partitions into time brackets, which can limit skew and avoid null values if there are any values outside of the maximum lookback. The overlap fraction between brackets defaults to 0.5. The dataframe produced by the join (with overlapped data and extraneous columns removed) is wrapped in a new TSDF, together with the common timestamp column and the combined partition columns, and returned.
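To make the as-of semantics concrete, here is a minimal pure-Python sketch. It is not Tempo's Spark implementation: the function name asof_join, the list-of-dicts representation and the column names ts, symbol, price and bid are assumptions made for illustration only. For each left row it attaches the latest right row in the same partition whose timestamp is at or before the left timestamp.

```python
from bisect import bisect_right
from collections import defaultdict


def asof_join(left, right, ts="ts", key="symbol", right_prefix="right"):
    """Attach to each left row the latest right row with the same key
    whose timestamp is <= the left timestamp (None-filled if none exists)."""
    # Group the right rows by partition key and sort each group by time.
    by_key = defaultdict(list)
    for row in right:
        by_key[row[key]].append(row)
    times_by_key = {}
    for k, rows in by_key.items():
        rows.sort(key=lambda r: r[ts])
        times_by_key[k] = [r[ts] for r in rows]

    # Every right column except the key gets the prefix to avoid name clashes.
    right_cols = sorted({c for row in right for c in row if c != key})

    joined = []
    for row in left:
        times = times_by_key.get(row[key], [])
        idx = bisect_right(times, row[ts]) - 1  # last right row at or before row[ts]
        match = by_key[row[key]][idx] if idx >= 0 else {}
        out = dict(row)
        for c in right_cols:
            out[f"{right_prefix}_{c}"] = match.get(c)
        joined.append(out)
    return joined


trades = [
    {"symbol": "A", "ts": 10, "price": 100.0},
    {"symbol": "A", "ts": 20, "price": 101.0},
    {"symbol": "B", "ts": 5, "price": 50.0},
]
quotes = [
    {"symbol": "A", "ts": 8, "bid": 99.5},
    {"symbol": "A", "ts": 15, "bid": 100.5},
    {"symbol": "B", "ts": 7, "bid": 49.0},
]
result = asof_join(trades, quotes)
```

The trade for symbol B happens before any quote for B, so its prefixed right-hand columns come back as None. That is the kind of null the lookback discussion above is concerned with.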
Moving Averages
The simple moving average is used to compute rolling statistics based on the timestamp column. Call withRangeStats to calculate mean/count/min/max/sum/standard deviation/z-score on all numeric columns, or on a more specific set of columns if provided. The default range-back window is 1000 seconds from the floor of the base event timestamp. The following assumptions are made:
1. The features are summarized over a rolling window that ranges back
2. The range back window can be specified by the user
3. Sequence numbers are not yet supported for the sort
4. The timestamp is cast to long, so precision at microseconds or finer likely breaks down. This could be more easily handled with a string timestamp or by sorting on the timestamp itself. With a ‘rows preceding’ window, this wouldn’t be a problem
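The range-back window described in the list above can be sketched in plain Python. This is an illustration of the windowing idea rather than Tempo's code: the function name with_range_stats, the integer epoch-second ts field and the value column are assumptions, and only a subset of the statistics is computed.

```python
def with_range_stats(rows, col, ts="ts", range_back_secs=1000):
    """Append rolling mean/count/min/max/sum of `col` over the trailing
    time range [ts - range_back_secs, ts] for every row."""
    rows = sorted(rows, key=lambda r: r[ts])
    out = []
    for row in rows:
        lower = row[ts] - range_back_secs
        # Range semantics: every row whose timestamp falls inside the range
        # counts, however many rows that turns out to be.
        window = [r[col] for r in rows if lower <= r[ts] <= row[ts]]
        out.append({
            **row,
            f"mean_{col}": sum(window) / len(window),
            f"count_{col}": len(window),
            f"min_{col}": min(window),
            f"max_{col}": max(window),
            f"sum_{col}": sum(window),
        })
    return out


events = [
    {"ts": 0, "value": 1.0},
    {"ts": 500, "value": 3.0},
    {"ts": 1200, "value": 5.0},
    {"ts": 2500, "value": 7.0},
]
stats = with_range_stats(events, "value")
```

With the 1000-second default, the row at ts=1200 sees the rows at 500 and 1200 but not the one at 0, while the row at ts=2500 sees only itself.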
The naming convention is not very consistent here: whereas the rolling statistics come from withRangeStats, you calculate an exponential moving average by calling EMA and passing the TSDF and the name of the column to calculate. The calculation looks back only as far as the window (30 by default).
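One way to approximate an exponential moving average with a bounded window is a truncated weighted sum of lagged values. The sketch below assumes that approach; the function name approx_ema and the exp_factor parameter (with its 0.2 default) are illustrative assumptions, and only the window default of 30 comes from the text above.

```python
def approx_ema(values, window=30, exp_factor=0.2):
    """Approximate an EMA as a truncated weighted sum of lagged values:
    sum over lag in [0, window) of exp_factor * (1 - exp_factor)**lag * value[i - lag].
    Lags that reach before the start of the series contribute nothing."""
    out = []
    for i in range(len(values)):
        total = 0.0
        for lag in range(window):
            if i - lag < 0:
                break  # no more history available for this row
            weight = exp_factor * (1 - exp_factor) ** lag
            total += weight * values[i - lag]
        out.append(total)
    return out


# Small window and factor so the arithmetic is easy to follow by hand.
ema = approx_ema([10.0, 10.0, 10.0], window=2, exp_factor=0.5)
```

Because the sum is cut off at the window, the weights do not add up to one, so the early rows of a series are biased toward zero. That is the price of bounding the lookback.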
Resampling
Resampling is a convenience method for frequency conversion and resampling of time series data. It provides similar functionality to pandas.DataFrame.resample, but it is not a drop-in replacement (unlike the goal of Koalas). Provide a timestamp column for sorting, the columns used to partition the data into more granular time series for windowing and sorting, a frequency (hr, min, sec) and a function for aggregation. The aggregation is done in resample.py.
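The idea behind frequency-based resampling can be sketched as "floor each timestamp to its bucket, then aggregate per partition and bucket". The following pure-Python version is an assumption-laden illustration, not the code in resample.py: the function name resample, the FREQ_SECONDS table and the ts, symbol and value columns are all hypothetical.

```python
from collections import defaultdict

# Bucket widths in seconds for the frequency labels mentioned above.
FREQ_SECONDS = {"sec": 1, "min": 60, "hr": 3600}


def resample(rows, freq, func, ts="ts", key="symbol", col="value"):
    """Group rows by (partition key, floored timestamp) and aggregate `col`."""
    width = FREQ_SECONDS[freq]
    buckets = defaultdict(list)
    for row in rows:
        bucket_start = row[ts] - row[ts] % width  # floor to the bucket boundary
        buckets[(row[key], bucket_start)].append(row[col])
    return [
        {key: k, ts: start, col: func(vals)}
        for (k, start), vals in sorted(buckets.items())
    ]


ticks = [
    {"symbol": "A", "ts": 0, "value": 1.0},
    {"symbol": "A", "ts": 30, "value": 3.0},
    {"symbol": "A", "ts": 60, "value": 5.0},
    {"symbol": "B", "ts": 10, "value": 2.0},
]
per_minute = resample(ticks, "min", lambda vals: sum(vals) / len(vals))
```

Passing the aggregate as a plain callable keeps the bucketing logic independent of whether you want a mean, a minimum or the first value per bucket.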
Additional Functionality
If you look at the source of tsdf.py, you’ll find a function called withLookbackFeatures.
It creates a 2-D feature tensor suitable for training an ML model to predict current values from the history of some set of features. The function adds a new column containing, for each observation, a 2-D array of the values of some number of other columns over a trailing “lookback” window that runs from the previous observation back to some maximum number of past observations.
This is a small, straightforward function that uses the private BaseWindow and RowsBetweenWindows functionality already in the codebase to create something new and useful. It’s a good example of what’s possible by working within the source code in addition to what you can do with the library itself.
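To show the shape of the feature column described above, here is a pure-Python sketch of a trailing lookback window. It is not the library's Spark code: the signature, the exact_size flag and the "features" column name are assumptions for illustration, and the rows are assumed to belong to one partition and to be sorted by timestamp already.

```python
def with_lookback_features(rows, feature_cols, lookback_window_size,
                           exact_size=True, feature_col_name="features"):
    """For each row, collect the values of `feature_cols` from up to
    `lookback_window_size` previous rows into a 2-D list (oldest first)."""
    out = []
    for i, row in enumerate(rows):
        start = max(0, i - lookback_window_size)
        # The window ends at the previous observation; the current row is excluded.
        history = [[r[c] for c in feature_cols] for r in rows[start:i]]
        if exact_size and len(history) < lookback_window_size:
            continue  # drop rows that lack a full window of history
        out.append({**row, feature_col_name: history})
    return out


observations = [
    {"ts": 1, "x": 1, "y": 10},
    {"ts": 2, "x": 2, "y": 20},
    {"ts": 3, "x": 3, "y": 30},
    {"ts": 4, "x": 4, "y": 40},
]
exact = with_lookback_features(observations, ["x", "y"], 2)
loose = with_lookback_features(observations, ["x", "y"], 2, exact_size=False)
```

Requiring an exact window size gives every surviving row a tensor of the same shape, which most model-training code expects. Relaxing it keeps the early rows at the cost of ragged arrays.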
Conclusion
Unlike Delta or Koalas, which try to make fundamental changes for the better, this library from Databricks Labs just makes a particular, but common, use case easier to manage. All of the projects in Databricks Labs are intended to be used on the Databricks Unified Analytics Platform (DUAP), but the source code is helpful on its own and can sometimes help accelerate work even outside the DUAP.