Data & Intelligence

DataStax Advanced Turbo Under the Covers – Part 2

I implement distributed persistence solutions for enterprises. For use cases involving highly-available, low latency processing, I typically use DataStax Enterprise. Apache Cassandra provides great persistence characteristics for a real-time, distributed transactional store. For real-time stream processing, I’m a Spark fan. Throw in Solr and graph, add in operational support, and I have all of my favorite open source projects bundled, hardened and optimized. DSE 6 was just released and claims to double their performance. I was suspicious, for good reason, because both Spark and Cassandra are sensibly architected for high performance. I wanted to see what, if any, fundamental changes had been made to justify this claim. There were two architectural changes that I thought were interesting: moving from SEDA to TPC and continuous paging. Part one dealt with migrating from SEDA to TPC while CP will be covered here.

I want to make a clarification about DataStax Enterprise versus the individual Apache projects themselves because I get questions all the time from the development teams at client sites, particularly if they have wrapped their own implementation around a large Apache project such as Hadoop. DataStax does not bundle the individual projects together; DataStax integrates and optimizes the individual projects all in a unified environment that includes search, graph and a unified security model. It’s more technically correct to say DSE Transactional, DSE Analytics and DSE Search rather than Cassandra, Spark and Solr because they are not equivalent for many enterprise use cases. I know that sounds very marketing-speak, but I’m going to show the difference in code. For example, when I talk about the integration with Solr in queries, this is not available in Cassandra. Just DSE. DataStax contributes a very substantial amount of code back to open source Cassandra, but there are huge performance opportunities in a large distributed system that can happen at the integration points. Those are the enhancements that are available in DSE but not in OSS. Also, DataStax code is available on Github and you can usually figure out what they are doing just by looking at JIRAs and Github.

As an intellectual exercise, I am going to try to walk backwards from a corporate statement to the source code. (Please keep in mind that I could be, and often am, <= 100% wrong, but it’ll be fun anyway.) This is the DataStax company line: I want to see if I would feel comfortable making it a Javadoc.

Continuous paging increases read speed by having the server continuously prepare new result pages in response to a query. This means there is no cycle of communication between the DSE Server and DSE Java Driver. The reading machinery inside of DSE can operate continuously without having to constantly be restarted by the client like in normal paging.

You need to explicitly set Continuous Paging equal to true in the Spark configuration object being passed to the DSECassandraConnectionFactory. You can do this in code, in a config file or on the command line like most configuration settings. The DSECassandraConnectionFactory overrides the default behavior of the open-source CassandraConnectionFactory, so this behavior is only available in DSE.

def getScanner(
 readConf: ReadConf,
 connConf: CassandraConnectorConf,
 columnNames: IndexedSeq[String]): Scanner =
 new DefaultScanner(readConf, connConf, columnNames)
 }
Data Intelligence - The Future of Big Data
The Future of Big Data

With some guidance, you can craft a data platform that is right for your organization’s needs and gets the most return from your data capital.

Get the Guide

We get our first hint of this new behavior when we look at the scan method of the DefaultScanner.

override def scan(statement: Statement): ScanResult = {
  val rs = session.execute(statement)
  val columnMetaData = CassandraRowMetadata.fromResultSet(columnNames, rs)
  val iterator = new PrefetchingResultSetIterator(rs, readConf.fetchSizeInRows)
  ScanResult(iterator, columnMetaData)
  }

We were told that CP improves performance by having the server continuously prepare new result pages in response to a query, and a PrefetchingResultSetIterator sounds like the sort of thing that does just that. The scaladoc states the class “efficiently iterate(s) over a large, paged ResultSet, asynchronously prefetching the next page”. Nice. Let’s look at the code:

class PrefetchingResultSetIterator(resultSet: ResultSet, 
                          prefetchWindowSize: Int,
                          timer: Option[Timer] = None) extends Iterator[Row] {

  private[this] val iterator = resultSet.iterator()

  override def hasNext = iterator.hasNext

  private[this] def maybePrefetch(): Unit = {
    if (!resultSet.isFullyFetched && 
             resultSet.getAvailableWithoutFetching < prefetchWindowSize) {
      val t0 = System.nanoTime()
      val future: ListenableFuture[ResultSet] = resultSet.fetchMoreResults()
      if (timer.isDefined)
        Futures.addCallback(future, new FutureCallback[ResultSet] {
          override def onSuccess(ignored: ResultSet): Unit = {
            timer.get.update(System.nanoTime() - t0, TimeUnit.NANOSECONDS)
          }

          override def onFailure(ignored: Throwable): Unit = { }
        })
     }  
  }

  override def next() = {
    maybePrefetch()
    iterator.next()
  }
}

The PrefetchingResultSetIterator overrides next() to use maybePrefetch(), which uses a ListenableFuture to asynchronously request new records based on a windowing size (this is what fetchMoreResults does). All reads from CassandraTableScanRDD will now be using the PrefetchingResultSetIterator for all reads within a SparkContext.

override def compute(split: Partition, 
                   context: TaskContext): Iterator[R] = {
 ...
 val scanner = connector.connectionFactory.getScanner(
                   readConf, connector.conf, columnNames)
 ...
}

From my reading of the code, this seems to enable precisely what DataStax says they provide. Does what they provide do what they claim?

Note the CP promises up to a 3x performance boost while TPC promises 2x performance boost. The metrics here are going to be a little fuzzier because it’s more difficult to estimate the performance of someone else’s analytic queries than it is to estimate the sum of savings of lower level operations. The improvement will depend on the data and the queries, but there are also some devops configurations to consider and, more abstractly, does this optimization preclude a better optimization.

Optimizing performance in DataStax is complex on purpose: there are a lot of knobs because there are a lot of things than can be tweaked for a particular use case. Before you embark on any kind of optimization, you need to take some careful measurements before and after and have criteria ahead of time to help you evaluate the effect of changes. First, identify queries that need to be optimized to begin with. It seems self-evident but I have seen a lot of frustration develop between the business and tech because a query no one cares about was painstakingly refactored to be blindingly fast. Next, out of those queries that are important, which ones are candidates for this particular optimization? For example, continuous paging is not particularly beneficial if we’re just talking a single page. Also, some queries can be optimized by other, better, means. Solr-optimized queries are fantastic (and I will discuss them later), but they are disabled at this time if you enable CP. Benchmark your queries and determine if the bottleneck can be found in multiple round trips to the java driver. We’ve seen this is literally the code change. Once you find queries that ideally have a lot of columns selected and return a large resultset and are not a good candidate for other query optimizations, you may want to tweak the number of executor JVM nodes down. They say it helps.

I listed out a lot of filtering considerations, but I think you’ll be surprised at how many analytic queries can be improved by continuous paging. When I think of all of the analytic queries that form the backbone of most of my clients use cases, I think that continuous paging could bring substantial gains. Now that we’ve gone through the code, I hope it’s a lot easier to reason about. Back and forth trips to the server are handled asynchronously in Spark. If you don’t have queries that can be optimized by this low-level change, you may want to revisit your query patterns.

 

About the Author

As a solutions architect with Perficient, I bring twenty years of development experience and I'm currently hands-on with Hadoop/Spark, blockchain and cloud, coding in Java, Scala and Go. I'm certified in and work extensively with Hadoop, Cassandra, Spark, AWS, MongoDB and Pentaho. Most recently, I've been bringing integrated blockchain (particularly Hyperledger and Ethereum) and big data solutions to the cloud with an emphasis on integrating Modern Data produces such as HBase, Cassandra and Neo4J as the off-blockchain repository.

More from this Author

Leave a Reply

This site uses Akismet to reduce spam. Learn how your comment data is processed.

Subscribe to the Weekly Blog Digest:

Sign Up