-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathpythonScript.py
71 lines (56 loc) · 2.35 KB
/
pythonScript.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
from google.cloud import bigquery
import names
import random
import findspark
findspark.init()
from pyspark.sql import SparkSession
import json
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import Row
from pyspark import SQLContext
generos = ['male','female']
def export_items_to_bigquery(rows_to_insert):
# Instanciar el cliente
bigquery_client = bigquery.Client()
# Preparar la referencia al dataset y a la tabla que tenemos en Big Query
dataset_ref = bigquery_client.dataset('Dataset_1')
table_ref = dataset_ref.table('Personas')
# LLamamos a la API para obtener la tabla que referenciamos
table = bigquery_client.get_table(table_ref)
# Procedemos a ingresar en la tabla nuestras filas que creamos
errors = bigquery_client.insert_rows(table, rows_to_insert) # API request
assert errors == []
def creaDatosAleatorios(cantidad,listado):
for i in range(0,cantidad):
generoAleatorio = random.choice(generos)
nombreAleatorio = names.get_full_name(gender=generoAleatorio)
edadAleatoria = random.randint(18,90)
pesoAleatorio = random.uniform(1.40, 2.10)
estaturaAleatoria = random.uniform(35, 90)
tupla = (nombreAleatorio,edadAleatoria,generoAleatorio,estaturaAleatoria,pesoAleatorio)
listado.append(tupla)
return listado
def import_items_from_bigquery():
# Iniciacion del cliente
bigquery_client = bigquery.Client()
# Query de BigQuery
QUERY = """ SELECT * FROM `x-pivot-241800.Dataset_1.Personas` LIMIT 100 """
# Arranca la Query y se obtienen los datos
query_job = bigquery_client.query(QUERY) # API REQUEST
return query_job.to_dataframe()
# _____ MAIN ______
spark = SparkSession.builder.appName('ANALYTICS').getOrCreate()
listadoTuplas = []
#rows = creaDatosAleatorios(100,listadoTuplas)
#export_items_to_bigquery(rows)
#df = spark.createDataFrame(df)
data = import_items_from_bigquery()
tuples = [tuple(x) for x in data.values]
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))
sqlContext = SQLContext(sc)
rdd = sc.parallelize(tuples)
people = rdd.map(lambda x: Row(nombre=x[0], edad=int(x[1]),genero=x[2],estatura=float(x[3]),peso=float(x[4])))
schemaPeople = sqlContext.createDataFrame(people)
schemaPeople.select('nombre','edad','genero').show()
schemaPeople.describe('edad').show()