main.py
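# main.py drives both parts of the workload: part1() converts the CSV
# input to Parquet and times queries 1-5 over the RDD, SQL-on-CSV and
# SQL-on-Parquet APIs; part2() compares broadcast and repartition RDD
# joins and measures the effect of the Catalyst optimiser. All timings
# and outputs are written to text files under ../output/.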
# Import SparkSession
from pyspark.sql import SparkSession
# Import RDD queries
from rdd import query1 as rdd_query1
from rdd import query2 as rdd_query2
from rdd import query3 as rdd_query3
from rdd import query4 as rdd_query4
from rdd import query5 as rdd_query5
# Import SQL-on-csv queries
from sql_csv import query1 as sql_csv_query1
from sql_csv import query2 as sql_csv_query2
from sql_csv import query3 as sql_csv_query3
from sql_csv import query4 as sql_csv_query4
from sql_csv import query5 as sql_csv_query5
# Import SQL-on-Parquet queries
from sql_parquet import query1 as sql_parquet_query1
from sql_parquet import query2 as sql_parquet_query2
from sql_parquet import query3 as sql_parquet_query3
from sql_parquet import query4 as sql_parquet_query4
from sql_parquet import query5 as sql_parquet_query5
# Import RDD joins
from joins import broadcast_join
from joins import repartition_join
# Import Optimiser script
from optimiser import use_optimiser
# Import csv-to-parquet converter
from csv_to_parquet import convert_csv_to_parquet
def part1():
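    """Part 1: convert the CSVs to Parquet (Task 1), then time queries
    1-5 on the RDD, SQL-on-CSV and SQL-on-Parquet APIs (Tasks 2, 3 & 4),
    writing the timings and query outputs under ../output/."""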
    ###################### Task 1 ######################
    # Convert the CSVs to Parquet. This only needs to run once;
    # comment it out if you re-run the script.
    convert_csv_to_parquet()

    ################## Tasks 2, 3 & 4 ##################
    times = {}
    # Measure the execution time of each query (Tasks 2, 3 & 4)
    for i in range(1, 6):
        spark_csv = SparkSession \
            .builder \
            .appName("All-use session") \
            .getOrCreate()
        spark_parquet = spark_csv
        spark_rdd = spark_csv.sparkContext
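        # Each query module is assumed to time its own execution: the RDD
        # queries take a SparkContext and return the elapsed time, while
        # the SQL queries take a SparkSession and return a (time, output)
        # pair (inferred from the unpacking below).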
        rdd_query_name = 'rdd_query%s' % i
        parquet_query_name = 'sql_parquet_query%s' % i
        csv_query_name = 'sql_csv_query%s' % i
        # Look the imported query functions up by name and run them
        times[rdd_query_name] = globals()[rdd_query_name](spark_rdd)
        times[parquet_query_name], _ = globals()[parquet_query_name](spark_parquet)
        times[csv_query_name], query_output = globals()[csv_query_name](spark_csv)
        # Save the query output, in DataFrame format, to a text file
        with open('../output/df_results/Q%sDF.txt' % i, 'w') as f:
            f.write(query_output)
        # Stop the session between iterations so cached metadata does not skew
        # the execution times (spark_parquet and spark_rdd share the same session)
        spark_csv.stop()
        spark_parquet.stop()
        spark_rdd.stop()
    print(times)
    # Write the execution times to a text file
    with open('../output/part_1_times.txt', 'w') as f:
        for query, execution_time in times.items():
            f.write('%s: %.2f seconds\n' % (query, execution_time))
def part2():
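    """Part 2: compare broadcast and repartition RDD joins (Task 1), then
    run the optimiser script with Catalyst enabled and disabled (Task 2),
    writing timings, join results and query plans under ../output/."""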
    ###################### Task 1 ######################
    times = {}
    # Only the SparkContext is needed for the RDD joins
    spark = SparkSession \
        .builder \
        .appName("All-use session") \
        .getOrCreate() \
        .sparkContext
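    # A broadcast join ships the smaller relation to every task, while a
    # repartition join shuffles both sides by key; both helpers are
    # presumably implemented that way in joins.py and return a
    # (time, results) pair, as unpacked below.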
    times['Broadcast Join'], broadcast_result = broadcast_join(spark)
    times['Repartition Join'], _ = repartition_join(spark)
    # Write the execution times to a text file
    with open('../output/join_type_times.txt', 'w') as f:
        for query, execution_time in times.items():
            f.write('%s: %.2f seconds\n' % (query, execution_time))
    # Save the broadcast-join results to a text file
    with open('../output/join_outputs.txt', 'w') as f:
        for result in broadcast_result:
            f.write(str(result) + '\n')
    # Stop the context so the next task starts from a clean session
    spark.stop()
    ###################### Task 2 ######################
    times = {}
    # Two separate sessions are used (the first is stopped before the
    # second is created) because Spark keeps metadata from each run to
    # optimise reading and computing later queries, which would skew
    # the comparison.
    spark = SparkSession \
        .builder \
        .appName('Using Catalyst') \
        .getOrCreate()
    times["Using Catalyst"], with_catalyst = use_optimiser(spark)
    spark.stop()
    sc = SparkSession \
        .builder \
        .appName('Without Catalyst') \
        .getOrCreate()
    times["Without using Catalyst"], without_catalyst = use_optimiser(sc, disabled="Y")
    sc.stop()
    # Write the execution times to a text file
    with open('../output/catalyst_times.txt', 'w') as f:
        for query, execution_time in times.items():
            f.write('%s: %.2f seconds\n' % (query, execution_time[0]))
    # Save the optimised query plan to a text file
    with open('../output/optimised_plan.txt', 'w') as f:
        f.write(with_catalyst)
    # Save the non-optimised query plan to a text file
    with open('../output/non_optimised_plan.txt', 'w') as f:
        f.write(without_catalyst)
if __name__ == "__main__":
    part1()
    part2()