Spark SQL Properties

The spark.sql.* properties are configuration options specific to Spark SQL, the Apache Spark module for processing structured data with SQL queries, the DataFrame API, and Datasets. They let users tune Spark SQL’s behavior, optimization strategies, and execution environment. Here’s a brief introduction to some common spark.sql.* properties:

spark.sql.shuffle.partitions

The spark.sql.shuffle.partitions property sets the number of partitions Spark SQL uses when it shuffles data for joins or aggregations. A shuffle redistributes rows across partitions by key, so this number directly controls the parallelism of the post-shuffle stage: too few partitions produce large, slow tasks, while too many add scheduling overhead for little work per task. The default value is 200.

Syntax:

// Setting the number of shuffle partitions to 200
spark.conf.set("spark.sql.shuffle.partitions", "200")
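
To see the setting in action, the following minimal sketch runs a small aggregation and prints the number of partitions after the shuffle. The app name, the sample data, and the value 8 are illustrative assumptions, and adaptive query execution is switched off here only so that the configured number is not coalesced at runtime.

```scala
import org.apache.spark.sql.SparkSession

// Illustrative local session; the app name and master are assumptions
val spark = SparkSession.builder()
  .appName("ShufflePartitionsSketch")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Disable AQE so the post-shuffle partition count is not adjusted at runtime
spark.conf.set("spark.sql.adaptive.enabled", "false")
spark.conf.set("spark.sql.shuffle.partitions", "8")

// groupBy forces a shuffle on the grouping key
val counts = Seq("a", "b", "a", "c").toDF("key").groupBy("key").count()

// Number of partitions produced by the shuffle
println(counts.rdd.getNumPartitions)
```

For small datasets, a low value avoids launching 200 mostly empty tasks; larger shuffles generally need more partitions.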

spark.sql.autoBroadcastJoinThreshold

The spark.sql.autoBroadcastJoinThreshold property sets the maximum size, in bytes, of a table that Spark SQL will automatically broadcast to all executors when performing a join. When Spark estimates that one side of a join is no larger than this threshold, it replicates that side to every executor, which avoids a costly shuffle of the larger table. The default is 10 MB (10485760 bytes); setting the value to -1 disables automatic broadcasting.

Syntax:

// Setting the autoBroadcastJoinThreshold to 10MB
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10485760")
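
As a rough sketch of how the threshold and an explicit hint interact, the example below computes the 10 MB value in bytes, prints the plan for a small join, then disables automatic broadcasting and requests a broadcast explicitly with the broadcast() function from org.apache.spark.sql.functions. The app name and sample rows are illustrative assumptions, and the plans that explain() prints depend on the Spark version.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

// Illustrative local session; the app name and master are assumptions
val spark = SparkSession.builder()
  .appName("BroadcastThresholdSketch")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// 10 MB expressed in bytes
val tenMb: Long = 10L * 1024 * 1024
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", tenMb.toString)

val employees = Seq((1, "Aarthii", 1000), (2, "Gowtham", 1500))
  .toDF("emp_id", "emp_name", "dept_id")
val departments = Seq((1000, "HR"), (1500, "Finance"))
  .toDF("dept_id", "dept_name")

// Both sides are tiny, so Spark may choose a broadcast join on its own
employees.join(departments, "dept_id").explain()

// -1 turns automatic broadcasting off; the hint asks for a broadcast explicitly
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
val hinted = employees.join(broadcast(departments), "dept_id")
hinted.explain()
```

An explicit hint is useful when Spark's size estimate for a table is missing or too pessimistic.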

spark.sql.execution.arrow.enabled

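The spark.sql.execution.arrow.enabled property controls whether Spark uses Apache Arrow, a columnar in-memory data format, to transfer data between the JVM and Python. It applies to PySpark operations such as converting a Spark DataFrame to pandas with toPandas() or creating a Spark DataFrame from a pandas DataFrame, where Arrow can significantly reduce serialization and deserialization cost. The property is disabled by default, and since Spark 3.0 it is deprecated in favor of spark.sql.execution.arrow.pyspark.enabled; the old name still works but may log a deprecation warning.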

Syntax:

// Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

spark.sql.sources.partitionOverwriteMode

The spark.sql.sources.partitionOverwriteMode property controls which partitions are replaced when data is written to a partitioned table in overwrite mode. In static mode, the default, Spark deletes all existing partitions that match the partition specification before writing, so an overwrite with no partition specified replaces the whole table. In dynamic mode, Spark overwrites only the partitions that receive new data at runtime and leaves the others untouched.

Syntax:

// Setting the partition overwrite mode to "dynamic"
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
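
The difference between the two modes is easiest to see with a small write. The sketch below, which assumes a temporary local directory and made-up column names (dt, value), writes two partitions and then overwrites only one of them in dynamic mode.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Illustrative local session; the app name and master are assumptions
val spark = SparkSession.builder()
  .appName("PartitionOverwriteSketch")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

// Hypothetical output location inside a fresh temporary directory
val path = Files.createTempDirectory("events_sketch").resolve("events").toString

// Initial load: two partitions, dt=2024-01-01 and dt=2024-01-02
Seq(("2024-01-01", 1), ("2024-01-02", 2)).toDF("dt", "value")
  .write.mode("overwrite").partitionBy("dt").parquet(path)

// Second load contains data for dt=2024-01-02 only
Seq(("2024-01-02", 20)).toDF("dt", "value")
  .write.mode("overwrite").partitionBy("dt").parquet(path)

// In dynamic mode the dt=2024-01-01 partition is still present
val remaining = spark.read.parquet(path)
remaining.show()
```

With the default static mode, the second write would replace the whole directory and only the dt=2024-01-02 row would remain.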

spark.sql.statistics.histogram.enabled

The spark.sql.statistics.histogram.enabled property controls whether Spark SQL generates histograms when it computes column statistics, for example through an ANALYZE TABLE statement with COMPUTE STATISTICS FOR COLUMNS. Histograms describe how values are distributed within a column, which helps the optimizer estimate selectivity more accurately, but generating them costs an extra scan of the table. By default, the property is set to false.

Syntax:

// Enable collection of histograms for data statistics computation
spark.conf.set("spark.sql.statistics.histogram.enabled", "true")
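
Histograms are only produced when column statistics are computed, so the property has no visible effect on its own. The following sketch, which assumes a throwaway warehouse directory and a hypothetical table named sales_sketch, enables the property, analyzes one column, and prints the stored statistics.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Throwaway warehouse directory so the managed table does not touch real data
val warehouse = Files.createTempDirectory("warehouse_sketch").toString

// Illustrative local session; the app name and master are assumptions
val spark = SparkSession.builder()
  .appName("HistogramSketch")
  .master("local[*]")
  .config("spark.sql.warehouse.dir", warehouse)
  .getOrCreate()
import spark.implicits._

spark.conf.set("spark.sql.statistics.histogram.enabled", "true")

// Hypothetical table with a single numeric column
Seq(10, 20, 20, 30, 500).toDF("amount")
  .write.mode("overwrite").saveAsTable("sales_sketch")

// Column statistics (and, with the property enabled, a histogram) are computed here
spark.sql("ANALYZE TABLE sales_sketch COMPUTE STATISTICS FOR COLUMNS amount")

// Shows min, max, distinct count, and the histogram entry for the column
val described = spark.sql("DESCRIBE EXTENDED sales_sketch amount")
described.show(truncate = false)
```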

spark.sql.streaming.schemaInference

The spark.sql.streaming.schemaInference property controls whether Spark infers the schema of file-based streaming sources. By default it is false, and Structured Streaming requires an explicit schema for file sources so that the same schema is used even after a failure and restart. When the property is set to true, Spark infers the schema from the files present in the input path, which removes the need to specify it manually and is convenient for development and ad hoc use.

Syntax:

// Enable schema inference for streaming DataFrames
spark.conf.set("spark.sql.streaming.schemaInference", "true")
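
As a minimal sketch of what this enables, the example below writes a couple of JSON records to a temporary directory and then defines a streaming DataFrame over that directory without passing a schema. The directory, app name, and sample records are illustrative assumptions.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Illustrative local session; the app name and master are assumptions
val spark = SparkSession.builder()
  .appName("StreamingSchemaInferenceSketch")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Seed a hypothetical input directory with JSON files for inference to read
val inputDir = Files.createTempDirectory("stream_sketch").resolve("input").toString
Seq((1, "Aarthii"), (2, "Gowtham")).toDF("emp_id", "emp_name")
  .write.json(inputDir)

spark.conf.set("spark.sql.streaming.schemaInference", "true")

// No .schema(...) call: the schema is inferred from the existing files
val stream = spark.readStream.json(inputDir)
stream.printSchema()
```

Without the property, the readStream call above fails and asks for an explicit schema.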

spark.sql.adaptive.skewJoin.enabled

The spark.sql.adaptive.skewJoin.enabled property controls the skew join optimization of adaptive query execution (AQE). When it is true and spark.sql.adaptive.enabled is also true, Spark detects skewed partitions at runtime in sort-merge joins and splits them into smaller tasks, replicating the matching partition on the other side of the join if needed. The default value is true.

Syntax:

// Enable adaptive query execution for skew join optimization
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
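
Whether a partition counts as skewed is governed by two related properties, spark.sql.adaptive.skewJoin.skewedPartitionFactor (documented default 5) and spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (documented default 256MB): a partition is treated as skewed when its size exceeds both the factor times the median partition size and the byte threshold. The plain Scala sketch below only illustrates that documented rule; the helper names and sample sizes are hypothetical, the median is simplified, and this is not Spark's internal code.

```scala
// Documented defaults of the two skew join properties
val defaultFactor = 5.0
val defaultThresholdBytes = 256L * 1024 * 1024

// Simplified median: the upper-middle element of the sorted sizes
def median(sizes: Seq[Long]): Long = {
  val sorted = sizes.sorted
  sorted(sorted.length / 2)
}

// A partition is skewed only if it exceeds BOTH limits
def isSkewed(
    sizeBytes: Long,
    medianBytes: Long,
    factor: Double = defaultFactor,
    thresholdBytes: Long = defaultThresholdBytes): Boolean =
  sizeBytes > medianBytes * factor && sizeBytes > thresholdBytes

val mb = 1024L * 1024
// Hypothetical shuffle partition sizes: three ordinary, one very large
val partitionSizes = Seq(64 * mb, 70 * mb, 80 * mb, 2048 * mb)
val med = median(partitionSizes)

val skewedFlags = partitionSizes.map(size => isSkewed(size, med))
println(skewedFlags) // prints List(false, false, false, true)
```

Lowering the factor or the threshold makes Spark treat more partitions as skewed, at the cost of extra tasks.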

spark.sql.inMemoryColumnarStorage.batchSize

The spark.sql.inMemoryColumnarStorage.batchSize property sets the number of rows stored together in each batch when a DataFrame or table is cached in columnar format. Larger batches can improve memory utilization and compression, but they increase the risk of out-of-memory errors while caching. The default batch size is 10000.

Syntax:

// Setting the batch size for columnar caching to 1000 rows
spark.conf.set("spark.sql.inMemoryColumnarStorage.batchSize", "1000")

spark.sql.adaptive.coalescePartitions.enabled

The spark.sql.adaptive.coalescePartitions.enabled property controls whether adaptive query execution coalesces shuffle partitions. When it is true and spark.sql.adaptive.enabled is also true, Spark merges contiguous small shuffle partitions at runtime, guided by the target size in spark.sql.adaptive.advisoryPartitionSizeInBytes, so that a query does not run many tiny tasks. The default value is true.

Syntax:

// Enable adaptive partition coalescing
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
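
The sketch below shows one way to observe coalescing: it keeps the initial shuffle partition count at 200, enables AQE with partition coalescing, and prints how many partitions the aggregation ends up with. The app name and sample data are illustrative assumptions, and the printed number depends on the data size and the Spark version.

```scala
import org.apache.spark.sql.SparkSession

// Illustrative local session; the app name and master are assumptions
val spark = SparkSession.builder()
  .appName("CoalescePartitionsSketch")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Coalescing only takes effect when AQE itself is enabled
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
// Target size that AQE aims for when merging small shuffle partitions
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")
// Initial number of shuffle partitions before coalescing
spark.conf.set("spark.sql.shuffle.partitions", "200")

val counts = Seq("a", "b", "a", "c").toDF("key").groupBy("key").count()

// With tiny input, AQE is expected to merge the 200 initial partitions into far fewer
val finalPartitions = counts.rdd.getNumPartitions
println(finalPartitions)
```

This lets a job start with a generous spark.sql.shuffle.partitions value and rely on AQE to shrink it when the data turns out to be small.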

Example

Here’s an example demonstrating the usage of all the mentioned Spark SQL properties along with a SQL query:

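// Importing necessary Spark classes
import org.apache.spark.sql.{SparkSession, DataFrame}

// Creating (or reusing) a SparkSession named spark.
// The app name and local master are illustrative; spark-shell and notebooks already provide spark.
val spark = SparkSession.builder()
  .appName("SparkSqlProperties")
  .master("local[*]")
  .getOrCreate()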

// Setting Spark SQL properties
spark.conf.set("spark.sql.shuffle.partitions", "200")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10485760") // 10 MB
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
spark.conf.set("spark.sql.statistics.histogram.enabled", "true")
spark.conf.set("spark.sql.streaming.schemaInference", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.inMemoryColumnarStorage.batchSize", "1000")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

// Creating DataFrames for the tables
val employeesData = Seq((1, "Aarthii", 1000), (2, "Gowtham", 1500), (3, "Saranya", 1200))
val departmentsData = Seq((1000, "HR"), (1200, "Engineering"), (1500, "Finance"))
val employeesDF = spark.createDataFrame(employeesData).toDF("emp_id", "emp_name", "dept_id")
val departmentsDF = spark.createDataFrame(departmentsData).toDF("dept_id", "dept_name")

// Registering DataFrames as temporary views
employeesDF.createOrReplaceTempView("employees")
departmentsDF.createOrReplaceTempView("departments")

// Executing a SQL query using the configured properties
val result = spark.sql(
"SELECT emp_name, dept_name FROM employees e JOIN departments d ON e.dept_id = d.dept_id"
)

// Showing the result
result.show()

OUTPUT:

The query returns three rows: Aarthii in HR, Gowtham in Finance, and Saranya in Engineering.

In this example:

  • We import the necessary Spark classes, including SparkSession and DataFrame.
  • We create a SparkSession object named spark.
  • We set various Spark SQL properties using the spark.conf.set() method.
  • We create DataFrames for two tables: “employees” and “departments”.
  • We register the DataFrames as temporary views using createOrReplaceTempView().
  • We execute a SQL join query between the “employees” and “departments” tables using spark.sql().
  • Finally, we display the result using show().

These properties provide fine-grained control over Spark SQL’s behavior and optimization techniques, enabling users to tailor the performance and functionality of Spark SQL applications to specific requirements and use cases.
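
The snippets in this post all use spark.conf.set(), but the same properties can also be supplied when the session is built or through a SQL SET statement. The sketch below shows the three routes and reads the values back; the app name and the chosen values are illustrative assumptions.

```scala
import org.apache.spark.sql.SparkSession

// 1. At session creation time, through the builder
val spark = SparkSession.builder()
  .appName("SettingPropertiesSketch")
  .master("local[*]")
  .config("spark.sql.shuffle.partitions", "64")
  .getOrCreate()

// 2. At runtime, through the session's runtime configuration
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10485760")

// 3. At runtime, through a SQL SET statement
spark.sql("SET spark.sql.adaptive.skewJoin.enabled=true")

// Reading the values back to confirm what the session will use
println(spark.conf.get("spark.sql.shuffle.partitions"))
println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))
println(spark.conf.get("spark.sql.adaptive.skewJoin.enabled"))
```

Reading a property back with spark.conf.get() is a quick way to check which value is actually in effect before running a query.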

Reference: https://spark.apache.org/docs/latest/configuration.html

Aarthii Gurunathan

Aarthii is an Associate Technical Consultant at Perficient, currently specializing as a Databricks Spark Developer. She is proficient in technologies such as SQL, Databricks, Spark, Scala, and Java. She enthusiastically explores new technologies and keeps learning to stay productive. She strives to give back to the community in various capacities.