
Distributed-Computing-with-Spark-SQL

Author: Thanujhaa Sriee (thanujhaa.sriee@gmail.com)


Project Goal:

This project explores a structured dataset using Spark SQL, digs into Spark internals to improve query performance by caching data, uses the Spark UI to identify performance bottlenecks, and tunes Spark SQL settings and configurations (executors, cores). I have used Databricks Community Edition and a dataset from the San Francisco Fire Department for the analysis.


Table of Contents

  • Data Description
  • Environment
  • Importing Data Files
  • Running Spark SQL Queries
  • Spark Internals - Optimization

Data Description:

This is a dataset from the San Francisco Fire Department. Calls-For-Service includes all fire unit responses to calls. Each record includes the call number, incident number, address, unit identifier, call type, and disposition. All relevant time intervals are included. There are multiple records for each call number. Addresses are associated with a block number, intersection, or call box, not a specific address.

The source for this data resides in S3 under /data. You can access AWS S3 buckets in the Databricks environment by mounting them through DBFS or by reading them directly through the APIs.

Otherwise, download a subset of DataSF's Fire Department Calls for Service; the subset is about 85 MB. The entire dataset can be found on the San Francisco Fire Department Calls for Service page.


Environment:

Create an account and log in to Databricks Community Edition.



Importing Data Files:

If you are mounting the data from AWS S3 into the Databricks workspace, use the Scala code below:

val mountDir = "/mnt/davis"
val source = "davis-dsv1071/data"

// Mount the S3 bucket only if it is not already mounted
if (!dbutils.fs.mounts().map(_.mountPoint).contains(mountDir)) {
  println(s"Mounting $source to $mountDir")
  val accessKey = "Enter your access Key Here"
  // Note: if the secret key contains "/", URL-encode it before embedding it in the URI
  val secretKey = "Enter your secret Key Here"
  val sourceStr = s"s3a://$accessKey:$secretKey@$source"

  dbutils.fs.mount(sourceStr, mountDir)
}
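After the cell runs, you can verify the mount by listing its contents (a minimal sketch; the folder layout under the mount is not shown here):

// List files available under the mount point
display(dbutils.fs.ls("/mnt/davis"))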



Running Spark SQL Queries

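Before querying, load the CSV into a table named fireCalls. A minimal sketch, assuming the file lives under the mount point (the exact path and file name are assumptions):

// Read the Fire Department calls CSV and expose it to Spark SQL as "fireCalls"
// The path below is an assumption; point it at wherever the mounted/downloaded CSV lives
val fireCallsDF = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/mnt/davis/fire-calls/fire-calls-truncated.csv")

fireCallsDF.createOrReplaceTempView("fireCalls")

// Quick sanity check
spark.sql("SELECT count(*) FROM fireCalls").show()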

Now look at calls by neighborhood.


Which neighborhoods have the most fire calls?

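A minimal sketch of that query, assuming the column is named Neighborhood (check the table schema for the exact column name):

// Count calls per neighborhood and sort descending to surface the busiest ones
spark.sql("""
  SELECT Neighborhood, count(*) AS callCount
  FROM fireCalls
  GROUP BY Neighborhood
  ORDER BY callCount DESC
  LIMIT 10
""").show()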

Visualizing Data

We use the built-in Databricks visualization to see which neighborhoods have the most fire calls.
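In a Databricks notebook, wrapping a query in display() gives the built-in table/chart toggle used here; a minimal sketch, reusing the assumed Neighborhood column:

// display() renders an interactive result with bar-chart and map options in Databricks
display(spark.sql("SELECT Neighborhood, count(*) AS callCount FROM fireCalls GROUP BY Neighborhood ORDER BY callCount DESC"))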



Spark Internals - Optimization

1. CACHING DATA

Running SELECT count(*) FROM fireCalls takes 3.20 seconds. Now cache the data.

If we check the Spark UI, we can see that the data takes up ~59 MB in memory, compared with ~90 MB on disk.


Now run SELECT count(*) FROM fireCalls again.
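Putting the caching steps together, a minimal sketch of the notebook cells:

// First count: data is read from disk
spark.sql("SELECT count(*) FROM fireCalls").show()   // ~3.20 s in this run

// CACHE TABLE is eager by default, so this statement materializes the cache
spark.sql("CACHE TABLE fireCalls")

// Second count: served from the in-memory cache
spark.sql("SELECT count(*) FROM fireCalls").show()   // ~0.68 s in this run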

Conclusion:

After caching, the same command takes just 0.68 seconds. The data is deserialized and available in memory in Spark rather than on disk, which speeds up the query.


2. LAZY CACHING

With lazy caching, only the data that is actually read is materialized in memory, for example the data from a single partition when running the query below.

SELECT * FROM fireCalls LIMIT 100
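A minimal sketch of making the cache lazy before running that query:

// LAZY defers caching: partitions are materialized only as they are scanned
spark.sql("CACHE LAZY TABLE fireCalls")

// Only the partition needed for the first 100 rows ends up in memory
spark.sql("SELECT * FROM fireCalls LIMIT 100").show()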


3. SHUFFLING PARTITIONS

Narrow Transformations: The data required to compute the records in a single partition resides in at most one partition of the parent DataFrame. Examples: SELECT (columns), DROP (columns), WHERE.

Wide Transformations: The data required to compute the records in a single partition may reside in many partitions of the parent DataFrame. Examples include: DISTINCT, GROUP BY, ORDER BY.

Shuffling incurs a lot of I/O overhead. We will manually tweak the spark.sql.shuffle.partitions parameter, which controls how many partitions result from a shuffle (wide transformation). By default, this value is 200 regardless of how large or small your dataset is or how your cluster is configured. Let's change this parameter to 8.


Run the query below with each setting and compare the run times:
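A minimal sketch, reusing the assumed Neighborhood column; spark.time reports the wall-clock time of the enclosed block:

// Set the number of post-shuffle partitions, then time a wide transformation (GROUP BY)
spark.conf.set("spark.sql.shuffle.partitions", "8")

spark.time {
  spark.sql("SELECT Neighborhood, count(*) AS callCount FROM fireCalls GROUP BY Neighborhood").collect()
}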

Time taken when spark.sql.shuffle.partitions = 8   => 3.75 s
Time taken when spark.sql.shuffle.partitions = 64  => 3.28 s
Time taken when spark.sql.shuffle.partitions = 100 => 3.70 s
Time taken when spark.sql.shuffle.partitions = 400 => 3.11 s

Conclusion:

When dealing with small amounts of data, we should reduce the number of shuffle partitions; otherwise we end up with many partitions holding only a few entries each, which underutilizes the executors and increases the time spent transferring data over the network from executor to executor.
On the other hand, when there is a lot of data and too few partitions, fewer tasks run in parallel, the load on each individual executor increases, and this often leads to out-of-memory errors.