Author: Thanujhaa Sriee (thanujhaa.sriee@gmail.com)
This project explores a structured dataset using Spark SQL, examines Spark internals to improve query performance by caching data, uses the Spark UI to identify performance bottlenecks, and tunes Spark configurations (executors, cores). I used Databricks Community Edition and a dataset from the San Francisco Fire Department for the analysis.
- Data Description
- Environment
- Importing Data Files
- Running Spark SQL Queries
- Spark Internals - Optimization
This is a dataset from the San Francisco Fire Department. Calls-For-Service includes all fire unit responses to calls. Each record includes the call number, incident number, address, unit identifier, call type, and disposition. All relevant time intervals are included. There are multiple records for each call number. Addresses are associated with a block number, intersection, or call box, not a specific address.
The source for this data resides in S3/data. You can access AWS S3 buckets in the Databricks environment by mounting the buckets using DBFS or directly using the APIs.
Otherwise, download a subset of Data SF's Fire Department Calls for Service; this subset is about 85 MB.
The entire dataset can be found at San Francisco Fire Department Calls for Service.
Create an account and log in to Databricks Community Edition.
If you are mounting the data from AWS S3 into Databricks Community Edition, use the Scala code below:
val mountDir = "/mnt/davis"
val source = "davis-dsv1071/data"

// Mount the bucket only if it is not already mounted.
if (!dbutils.fs.mounts().map(_.mountPoint).contains(mountDir)) {
  println(s"Mounting $source\n to $mountDir")
  val accessKey = "Enter your access Key Here"
  val secretKey = "Enter your secret Key Here"
  // Embed the credentials in the s3a URI used for the mount.
  val sourceStr = s"s3a://$accessKey:$secretKey@$source"
  dbutils.fs.mount(sourceStr, mountDir)
}
We use the built-in Databricks visualizations to see which neighborhoods have the most fire calls.
Run SELECT count(*) FROM fireCalls
The command takes 3.20 seconds to run.
Now cache the data.
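As a minimal Scala sketch (assuming the table is registered as `fireCalls`, as in the queries above), the data can be cached either through SQL or through the DataFrame API:

```scala
// SQL route: CACHE TABLE is eager by default since Spark 2.0,
// so the table is scanned and cached immediately.
spark.sql("CACHE TABLE fireCalls")

// DataFrame API route: cache() is lazy; the cache is only
// populated by the next action that scans the table.
spark.table("fireCalls").cache()
```

Both routes persist the deserialized rows in executor memory, which is why the repeated count below runs much faster.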
If we check the Spark UI, we will observe that our file takes up ~59 MB in memory and ~90 MB on disk.
Run SELECT count(*) FROM fireCalls again.
After caching, the command takes just 0.68 seconds to run (the data is deserialized and available in memory in Spark, rather than on disk, which speeds up the query).
Note that only a chunk of the data may be materialized in memory, for example the data from a single partition, when a query touches only part of the table, such as:
SELECT * FROM fireCalls LIMIT 100
Narrow Transformations: The data required to compute the records in a single partition reside in at most one partition of the parent DataFrame.
Examples include: SELECT (columns), DROP (columns), WHERE
Wide Transformations: The data required to compute the records in a single partition may reside in many partitions of the parent DataFrame.
Examples include: DISTINCT, GROUP BY, ORDER BY
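The difference is visible in the physical plan: a wide transformation introduces an Exchange (shuffle) operator, a narrow one does not. A sketch, assuming the `fireCalls` table as above (the column names here are illustrative, not taken from the actual schema):

```scala
// Narrow transformation: a filter is evaluated partition-by-partition,
// so the physical plan contains no Exchange operator.
spark.sql("SELECT * FROM fireCalls WHERE City = 'San Francisco'").explain()

// Wide transformation: GROUP BY must co-locate rows with the same key,
// so the physical plan contains an Exchange (shuffle) operator.
spark.sql("SELECT Neighborhood, count(*) FROM fireCalls GROUP BY Neighborhood").explain()
```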
Shuffling results in a lot of I/O overhead. We will manually tweak the spark.sql.shuffle.partitions parameter, which controls how many partitions result after a shuffle (wide transformation). By default this value is 200, regardless of how large or small your dataset is, or of your cluster configuration. Let's change this parameter to 8.
Time taken with spark.sql.shuffle.partitions = 8 => 3.75 s
Time taken with spark.sql.shuffle.partitions = 64 => 3.28 s
Time taken with spark.sql.shuffle.partitions = 100 => 3.70 s
Time taken with spark.sql.shuffle.partitions = 400 => 3.11 s
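The setting can be changed per session before re-running a shuffling query; a sketch (the GROUP BY query and its column name are illustrative):

```scala
// Override the default of 200 shuffle partitions for this session.
spark.conf.set("spark.sql.shuffle.partitions", "8")

// Re-run a wide transformation (shuffle) to measure the effect;
// the resulting stage will now produce 8 post-shuffle partitions.
spark.sql("SELECT Neighborhood, count(*) FROM fireCalls GROUP BY Neighborhood").collect()
```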
When dealing with small amounts of data, we should reduce the number of shuffle partitions; otherwise we end up with many partitions holding only a few entries each, which underutilizes the executors and increases the time it takes to transfer data over the network from executor to executor.
On the other hand, when you have too much data and too few partitions, fewer tasks run in parallel across the executors, the load on each individual executor increases, and this often leads to memory errors.