diff --git a/python/housinginsights/ingestion/CSVWriter.py b/python/housinginsights/ingestion/CSVWriter.py index f22594d6..3af4e001 100644 --- a/python/housinginsights/ingestion/CSVWriter.py +++ b/python/housinginsights/ingestion/CSVWriter.py @@ -10,6 +10,13 @@ logging_path = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, os.pardir, "logs")) +# relative package import for when running as a script +PYTHON_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), + os.pardir, os.pardir)) +# sys.path.append(PYTHON_PATH) +CLEAN_PSV_PATH = os.path.abspath(os.path.join(PYTHON_PATH, os.pardir, + 'data', 'processed', + '_clean_psv')) class CSVWriter(object): @@ -52,7 +59,10 @@ def __init__(self, meta, manifest_row, filename=None): # By default, creates a temp csv file wherever the calling module was # located - self.filename = 'temp_{}.psv'.format(self.unique_data_id) if filename == None else filename + if filename is None: + self.filename = 'temp_{}.psv'.format(self.unique_data_id) + else: + self.filename = filename # remove any existing copy of the file so we are starting clean self.remove_file() @@ -65,7 +75,8 @@ def __init__(self, meta, manifest_row, filename=None): #print("header written") self.file = open(self.filename, 'a', newline='', encoding='utf-8') - self.writer = DictWriter(self.file, fieldnames=self.dictwriter_fields, delimiter="|") + self.writer = DictWriter(self.file, fieldnames=self.dictwriter_fields, + delimiter="|") def write(self, row): """ diff --git a/python/housinginsights/ingestion/Cleaners.py b/python/housinginsights/ingestion/Cleaners.py index f1f7c59a..5b7ee2f0 100644 --- a/python/housinginsights/ingestion/Cleaners.py +++ b/python/housinginsights/ingestion/Cleaners.py @@ -15,6 +15,7 @@ from housinginsights.sources.models.pres_cat import CLUSTER_DESC_MAP from housinginsights.sources.google_maps import GoogleMapsApiConn from housinginsights.sources.models.mar import MAR_TO_TABLE_FIELDS +from housinginsights.tools.base_colleague import Colleague PYTHON_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, @@ -30,16 +31,18 @@ """ -class CleanerBase(object, metaclass=ABCMeta): +class CleanerBase(Colleague, metaclass=ABCMeta): def __init__(self, meta, manifest_row, cleaned_csv='', removed_csv='', engine=None): + super().__init__() + self.cleaned_csv = cleaned_csv self.removed_csv = removed_csv self.engine = engine self.manifest_row = manifest_row self.tablename = manifest_row['destination_table'] - self.meta = meta + self.meta = meta # TODO - remove: not used in this class self.fields = meta[self.tablename]['fields'] #a list of dicts self.null_value = 'Null' #what the SQLwriter expects in the temp csv @@ -57,13 +60,11 @@ def __init__(self, meta, manifest_row, cleaned_csv='', removed_csv='', if field['type'] == 'date': self.date_fields.append(field['source_name']) - @abstractmethod def clean(self, row, row_num): # TODO: add replace_null method as required for an implementation (#176) pass - def add_proj_addre_lookup_from_mar(self): """ Adds an in-memory lookup table of the contents of the current @@ -85,7 +86,6 @@ def add_ssl_nlihc_lookup(self): result = proxy.fetchall() self.ssl_nlihc_lookup = {d[0]:d[1] for d in result} - def get_nlihc_id_if_exists(self, mar_ids_string, ssl=None): "Checks for record in project table with matching MAR id." 
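CleanerBase now inherits from Colleague (as do HIReader, GetApiData, and LoadData later in this diff) and calls super().__init__() before its own setup. The Colleague base class lives in housinginsights.tools.base_colleague and is not shown in this diff, so the sketch below is only a guess at its general shape, inferred from the attributes the subclasses use later (_ingestion_mediator, _engine, _database_choice, _debug); the set_mediator hook and the mediator accessor methods are assumptions.

class Colleague(object):
    """Hypothetical sketch; the real base class is not part of this diff."""

    def __init__(self):
        # Filled in later by the mediator; colleagues reach the rest of the
        # pipeline only through these shared references.
        self._ingestion_mediator = None
        self._engine = None
        self._database_choice = None
        self._debug = False

    def set_mediator(self, mediator):
        # Assumed hook: the mediator registers itself with each colleague so
        # all of them share one engine, database choice, and debug flag.
        self._ingestion_mediator = mediator
        self._engine = mediator.get_engine()
        self._database_choice = mediator.get_database_choice()
        self._debug = mediator.get_debug()

Under that assumption, every ingestion component shares one engine and one debug flag and talks to the rest of the pipeline only through the mediator, which is what lets LoadData further down drop its own engine, manifest, and meta handling.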
@@ -110,7 +110,6 @@ def get_nlihc_id_if_exists(self, mar_ids_string, ssl=None): #If we don't find a match return self.null_value - # TODO: figure out what is the point of this method...it looks incomplete def field_meta(self, field): for field_meta in self.fields: @@ -546,7 +545,6 @@ def add_mar_tract_lookup(self): result = proxy.fetchall() self.mar_tract_lookup = {d[0]:d[1] for d in result} - def add_census_tract_from_mar(self, row, column_name='mar_id', lat_lon_col_names=('LATITUDE', 'LONGITUDE'), x_y_coords_col_names=('X', 'Y'), @@ -667,6 +665,7 @@ def clean(self, row, row_num=None): row = self.replace_nulls(row, null_values=['N', 'NA', '', None]) return row + class ProjectCleaner(CleanerBase): def clean(self, row, row_num=None): row = self.replace_nulls(row, null_values=['N', '', None]) @@ -854,11 +853,13 @@ def clean(self,row,row_num=None): return row + class ProjectAddressCleaner(CleanerBase): def clean(self,row,row_num=None): row = self.replace_nulls(row) return row - + + class ZillowCleaner(CleanerBase): """ Incomplete Cleaner - adding data to the code so we have it when needed (was doing analysis on this) diff --git a/python/housinginsights/ingestion/DataReader.py b/python/housinginsights/ingestion/DataReader.py index 0856be4c..b65cdfd3 100644 --- a/python/housinginsights/ingestion/DataReader.py +++ b/python/housinginsights/ingestion/DataReader.py @@ -20,7 +20,7 @@ from datetime import datetime import dateutil.parser as dateparser - +from housinginsights.tools.base_colleague import Colleague from housinginsights.tools.logger import HILogger logger = HILogger(name=__file__, logfile="ingestion.log") @@ -30,7 +30,7 @@ # TODO: convert relative path to full path when passed as argument -class HIReader(object): +class HIReader(Colleague): """ Container object that reads in CSVs and provides them row-by-row through the __iter__ method. Each object is associated with one specific file @@ -41,6 +41,7 @@ class HIReader(object): and lower bandwidth usage. """ def __init__(self, path, path_type="file", encoding="latin-1", keys=None): + super().__init__() self.path = path self._length = None self._keys = keys @@ -127,6 +128,7 @@ def get_row_by_column_name(self, col_header_name, look_up_value): return None +# TODO - refactor: do we really need meta and manifest_row passed class DataReader(HIReader): """ Reads a specific data file. This file must be associated with a specific @@ -169,14 +171,14 @@ def __init__(self, meta, manifest_row, load_from="local"): # Use default encoding if none found if 'encoding' not in manifest_row: - logger.warning(" Warning: encoding not found in manifest. " \ - "Falling back to latin-1.") + logger.warning(" Warning: encoding not found in manifest. " + "Falling back to latin-1.") self.encoding = manifest_row.get('encoding', 'latin-1') self.load_from = load_from self.s3_path = os.path.join(manifest_row['s3_folder'], manifest_row['filepath'].strip("\/") - ).replace("\\","/") + ).replace("\\", "/") self._error_reporting_overhead = {} # # Test connection to s3 URL # if self.manifest_row['include_flag'] == 'use': @@ -253,7 +255,6 @@ def __iter__(self): def keys(self): return self._keys - def _download_data_file(self): """ Internal function that tries to load the data file from the local file @@ -354,7 +355,7 @@ def _set_keys(self): "encoding error encountered.") return _keys - def should_file_be_loaded(self, sql_manifest_row): + def should_file_be_loaded(self): """ Runs all the checks that the file is OK to use. 
@@ -362,22 +363,23 @@ def should_file_be_loaded(self, sql_manifest_row): :return: True if passes validation; False otherwise. """ - if self._do_fields_match() and self._check_include_flag(sql_manifest_row): + if self._do_fields_match() and self._check_include_flag(): return True else: return False - def _check_include_flag(self, sql_manifest_row): + def _check_include_flag(self): """ - Checks to make sure the include_flag matches requirements for loading the data + Checks to make sure the include_flag matches requirements for loading + the data - Previously this compared the manifest_row to the sql_manifest_row; however, - since the unique_data_id now stays constant across time this check is - not needed. + Previously this compared the manifest_row to the sql_manifest_row; + however, since the unique_data_id now stays constant across time this + check is not needed. """ if self.manifest_row['include_flag'] == 'use': - return True + return True else: logger.warning("Skipping data source. {} include_flag is {}".format( diff --git a/python/housinginsights/ingestion/GetApiData.py b/python/housinginsights/ingestion/GetApiData.py new file mode 100644 index 00000000..478280cf --- /dev/null +++ b/python/housinginsights/ingestion/GetApiData.py @@ -0,0 +1,138 @@ +""" +get_api_data.py provides command line convenience access to the modules in the housinginsights.sources +directory. + +Every API class should implement a few key features +""" + +# built-in imports +import os +import importlib + +# app imports +from housinginsights.tools.base_colleague import Colleague + +# Configure logging +from housinginsights.tools.logger import HILogger +logger = HILogger(name=__file__, logfile="sources.log") + +PYTHON_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), + os.pardir)) + + +class GetApiData(Colleague): + def __init__(self): + super().__init__() + self._API_FOLDER = 'housinginsights.sources' + self._MODULES = { + 'opendata': 'OpenDataApiConn', + 'DCHousing': 'DCHousingApiConn', + 'dhcd': 'DhcdApiConn', + 'census': 'CensusApiConn', + 'wmata_distcalc': 'WmataApiConn', + 'prescat': 'PrescatApiConn' + } + self._IDS_TO_MODULES = { + 'tax': 'opendata', + 'building_permits_2013': 'opendata', + 'building_permits_2014': 'opendata', + 'building_permits_2015': 'opendata', + 'building_permits_2016': 'opendata', + 'building_permits_2017': 'opendata', + 'crime_2013': 'opendata', + 'crime_2014': 'opendata', + 'crime_2015': 'opendata', + 'crime_2016': 'opendata', + 'crime_2017': 'opendata', + 'mar': 'opendata', + 'dchousing': 'DCHousing', + 'dhcd_dfd_projects': 'dhcd', + 'dhcd_dfd_properties': 'dhcd', + 'acs5_2009': 'census', + 'acs5_2010': 'census', + 'acs5_2011': 'census', + 'acs5_2012': 'census', + 'acs5_2013': 'census', + 'acs5_2014': 'census', + 'acs5_2015': 'census', + 'acs5_2009_moe': 'census', + 'acs5_2010_moe': 'census', + 'acs5_2011_moe': 'census', + 'acs5_2012_moe': 'census', + 'acs5_2013_moe': 'census', + 'acs5_2014_moe': 'census', + 'acs5_2015_moe': 'census', + 'wmata_stops': 'wmata_distcalc', + 'wmata_dist': 'wmata_distcalc' + } + + def get_files_by_data_ids(self, unique_data_ids_list): + processed_ids = list() + for data_id in unique_data_ids_list: + try: + mod = self._IDS_TO_MODULES[data_id] + except KeyError: + logger.error('%s is not a valid data id! Skipping...' 
% data_id) + continue + + result = self.get_files_by_modules([mod], unique_data_id=[data_id]) + if len(result) == 1: + processed_ids.append(data_id) + return processed_ids + + def get_files_by_modules(self, modules_list, unique_data_id=None): + processed = list() + for m in modules_list: + try: + class_name = self._MODULES[m] + except KeyError: + logger.error('Module %s is not valid! Skipping...' % m) + continue + + try: + if unique_data_id is None: + logger.info("Processing %s module with class %s", m, + class_name) + else: + logger.info("Processing %s with class %s", unique_data_id, + class_name) + module_name = self._API_FOLDER + '.' + m + api_method = self._get_api_method(class_name, module_name) + + # Get the data + # TODO refactor all the methods that need db to instead use + # TODO self.engine() created in __init__(see base_project for + # TODO example) + api_method(unique_data_id, False, 'csv', + db=self._database_choice) + processed.append(m) + + # log outcome + if unique_data_id is None: + logger.info("Completed processing %s module with class " + "%s", m, class_name) + else: + logger.info("Completed processing %s with class " + "%s", unique_data_id, class_name) + except Exception as e: + logger.error("The request for '%s' failed with error: %s", m, e) + + if self._debug: + raise e + + self._ingestion_mediator.update_manifest_with_new_path() + return processed + + def get_all_files(self): + modules_list = self._MODULES.keys() + return self.get_files_by_modules(modules_list) + + def _get_api_method(self, class_name, module_name): + mod = importlib.import_module(module_name) + + api_class = getattr(mod, class_name) + + api_instance = api_class(database_choice=self._database_choice, + debug=self._debug) + # IMPORTANT: Every class should have a get_data method! + return getattr(api_instance, 'get_data') diff --git a/python/housinginsights/ingestion/LoadData.py b/python/housinginsights/ingestion/LoadData.py index e3f41fca..d5e5e64e 100644 --- a/python/housinginsights/ingestion/LoadData.py +++ b/python/housinginsights/ingestion/LoadData.py @@ -20,17 +20,11 @@ import sys import os import argparse -import json -import concurrent.futures -from time import time, sleep -from requests.packages.urllib3.exceptions import NewConnectionError -from requests.packages.urllib3.exceptions import MaxRetryError -from sqlalchemy import Table, Column, Integer, String, MetaData, Numeric +from sqlalchemy import MetaData from datetime import datetime from uuid import uuid4 import dateutil.parser as dateparser -from sqlalchemy.exc import ProgrammingError # Needed to make relative package imports when running this file as a script # (i.e. for testing purposes). @@ -40,627 +34,212 @@ PYTHON_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)) - -logging_path = os.path.abspath(os.path.join(PYTHON_PATH, "logs")) - #append to path if running this file directly, otherwise assume it's already been appended. 
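GetApiData._get_api_method above resolves an API connection class at run time from the _MODULES mapping: importlib imports the dotted module path, then getattr pulls the class and finally its get_data bound method. Below is a stripped-down, standalone sketch of that dispatch pattern; the commented usage mirrors the calls made in get_files_by_modules, and every name and argument in it is taken from this diff rather than invented.

import importlib

def get_bound_method(module_name, class_name, method_name, **init_kwargs):
    # Import the module by dotted path, look the class up by name,
    # instantiate it, and hand back the requested bound method.
    mod = importlib.import_module(module_name)
    api_class = getattr(mod, class_name)
    api_instance = api_class(**init_kwargs)
    return getattr(api_instance, method_name)

# Usage mirroring get_files_by_modules above (kept as comments because it
# needs the real housinginsights package and a database on hand):
# get_data = get_bound_method('housinginsights.sources.opendata',
#                             'OpenDataApiConn', 'get_data',
#                             database_choice='docker_database', debug=False)
# get_data(['crime_2017'], False, 'csv', db='docker_database')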
if __name__ == "__main__": sys.path.append(PYTHON_PATH) -from housinginsights.tools import dbtools - - -from housinginsights.ingestion import CSVWriter, DataReader -from housinginsights.ingestion import HISql, TableWritingError -from housinginsights.ingestion import functions as ingestionfunctions -from housinginsights.ingestion.Manifest import Manifest +from housinginsights.tools.base_colleague import Colleague from housinginsights.tools.logger import HILogger logger = HILogger(name=__file__, logfile="ingestion.log") -class LoadData(object): - - def __init__(self, database_choice=None, meta_path=None, - manifest_path=None, keep_temp_files=True, - drop_tables=False,debug=False): - """ - Initializes the class with optional arguments. The default behaviour - is to load the local database with data tracked from meta.json - and manifest.csv within the 'python/scripts' folder. - - :param database_choice: choice of 'local_database', - 'docker_database', and 'remote_database' - :param meta_path: the path of the meta.json to be used - :param manifest_path: the path of the manifest_path.csv to be used - :param keep_temp_files: if True, temp clean pipe-delimited files will be - archived in the 'python/logs' folder - """ - - # load defaults if no arguments passed - _scripts_path = os.path.abspath(os.path.join(PYTHON_PATH, 'scripts')) - if database_choice is None: - self.database_choice = 'docker_database' - else: - self.database_choice = database_choice - if meta_path is None: - meta_path = os.path.abspath(os.path.join(_scripts_path, - 'meta.json')) - if manifest_path is None: - manifest_path = os.path.abspath(os.path.join(_scripts_path, - 'manifest.csv')) - self._keep_temp_files = keep_temp_files - - - # load given meta.json and manifest.csv files into memory - self.meta = ingestionfunctions.load_meta_data(meta_path) - self.manifest = Manifest(manifest_path) +class LoadData(Colleague): + def __init__(self): + super().__init__() - - # setup engine for database_choice - self.engine = dbtools.get_database_engine(self.database_choice) - - # write the meta.json to the database - self._meta_json_to_database() - - self._failed_table_count = 0 - - self.drop_tables = drop_tables - self.debug=debug - - - def _drop_tables(self): + def load_raw_data(self, unique_data_id_list, download_api_data=False, + load_dependents=False): """ - Returns the outcome of dropping all the tables from the - database_choice and then rebuilding. + Attempts to process and clean the raw data for the given + unique_data_ids in unique_data_id_list and then load it into the + database. + + Returns list of successfully processed, cleaned, and loaded + unique_data_ids from the passed unique_data_id_list. 
+ + Note - if debug = True, errors are raised instead of skipped + + :param unique_data_id_list: the list of unique data ids that should + be loaded into database from raw data file + :param download_api_data: if True, get_api_data will be called on + unique_data_ids that have update_method = api + :param load_dependents: if True, any data ids dependent on the + existence id db of the current data id will be processed next + :return: list of unique data ids that were successfully loaded into + the db """ - logger.info("Dropping all tables from the database!") - with self.engine.connect() as db_conn: - query_result = list() - query_result.append(db_conn.execute( - "DROP SCHEMA public CASCADE;CREATE SCHEMA public;")) - - if self.database_choice == 'remote_database' or self.database_choice \ - == 'remote_database_master': - query_result.append(db_conn.execute(''' - GRANT ALL PRIVILEGES ON SCHEMA public TO housingcrud; - GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO housingcrud; - GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO housingcrud; - GRANT ALL ON SCHEMA public TO public; - ''')) - return query_result - - def remove_tables(self, tables): - ''' - Used when you want to update only part of the database. Drops the table - and deletes all associated rows in the sql manifest. Run this before - updating data in tables that have added or removed columns. - - tables: a list of table names to be deleted - ''' - if 'all' in tables: - self._drop_tables() - logger.info('Dropped all tables from database') + processed_ids = list() # track what is processed - else: - for table in tables: - try: - logger.info("Dropping the {} table and all associated manifest rows".format(table)) - #Delete all the relevant rows of the sql manifest table - q = "SELECT DISTINCT unique_data_id FROM {}".format(table) - conn = self.engine.connect() - proxy = conn.execute(q) - results = proxy.fetchall() - for row in results: - q = "DELETE FROM manifest WHERE unique_data_id = '{}'".format( - row[0]) - conn.execute(q) - - #And drop the table itself - q = "DROP TABLE {}".format(table) - conn.execute(q) - - logger.info("Dropping table {} was successful".format(table)) - - except ProgrammingError as e: - logger.error("Couldn't remove table {}".format(table)) - if self.debug == True: - raise e - - - def _meta_json_to_database(self): - """ - Makes sure we have a meta table in the database. - If not, it creates it with appropriate fields. - """ + # reset tracking of used get api data ids + self._ingestion_mediator.reset_id_map_set_use() - sqlalchemy_metadata = MetaData() # this is unrelated to our meta.json - meta_table = Table('meta', sqlalchemy_metadata, - Column('meta', String)) + for unique_data_id in unique_data_id_list: + # skip those that have been processed via prior dependencies + if unique_data_id in processed_ids: + continue - sqlalchemy_metadata.create_all(self.engine) - json_string = json.dumps(self.meta) - ins = meta_table.insert().values(meta=json_string) - with self.engine.connect() as conn: - conn.execute("DELETE FROM meta;") - conn.execute(ins) + success = self._ingestion_mediator.load_unique_data_id( + unique_data_id, download_api_data) - def _remove_existing_data(self, manifest_row): - """ - Removes all rows in the respective table for the given unique_data_id - then sets status = deleted for the unique_data_id in the - database manifest. 
- - :param uid: unique_data_id for the data to be updated - :param manifest_row: the row for uid in the manifest - :return: result from executing delete query as - sqlalchemy result object if row exists in sql manifest - else - returns None - """ - uid = manifest_row['unique_data_id'] - - temp_filepath = self._get_temp_filepath( - manifest_row=manifest_row) - - # get objects for interfacing with the database - sql_interface = self._configure_db_interface( - manifest_row=manifest_row, temp_filepath=temp_filepath) - sql_manifest_row = sql_interface.get_sql_manifest_row() - - - try: - # delete only rows with data_id in respective table - table_name = sql_manifest_row['destination_table'] - except TypeError: #will occur if sql_manifest_row == None - logger.info(" No sql_manifest exists! Proceed with adding" - " new data to the database!") - return None - - try: - query = "DELETE FROM {} WHERE unique_data_id =" \ - " '{}'".format(table_name, uid) - logger.info(" Deleting {} data from {}!".format( - uid, table_name)) - result = self.engine.execute(query) - # change status = deleted in sql_manifest - logger.info(" Resetting status in sql manifest row!") - sql_interface.update_manifest_row(conn=self.engine, - status='deleted') - except ProgrammingError: - logger.warning("Problem executing DELETE query for table {} and uid {}. Manifest row exists" - " but table does not. You should check validity of" - " table and data".format(table_name,uid)) - return None - - return result - - def _remove_table_if_empty(self, manifest_row): - """ - If a table is empty (all data has been removed, e.g. using _remove_existing_data), - delete the table itself. This allows the table to be rebuilt from scratch using - meta.json when new data is loaded - for example, if additional columns have been added. - - :param manifest_row: the row for the uid in the manifest as a dictionary, which - supplies the name of the table to be checked/dropped - - returns: result from executing delete qurey - """ - table_name = manifest_row['destination_table'] - conn = self.engine.connect() - - - try: - proxy = conn.execute("SELECT COUNT(*) FROM {}".format(table_name)) - result = proxy.fetchall() - except ProgrammingError: - logger.info(" Couldn't find table {} in the database".format(table_name)) - conn.close() - return None - - #If the table is empty - if result[0][0] == 0: #result format is [(count,)] i.e. list of tuples with each tuple = one row of result - try: - logger.info(" Dropping table from database: {}".format(table_name)) - proxy = conn.execute("DROP TABLE {}".format(table_name)) - conn.close() - return proxy - except ProgrammingError: - logger.error("Could not drop table {} from the database".format(table_name)) - if self.debug == True: - raise e - - def _get_most_recent_timestamp_subfolder(self, root_folder_path): - """ - Returns the most recent timestamp subfolder in a given folder path. 
+ if success: + processed_ids.append(unique_data_id) - :param root_folder_path: the path for the directory containing - timestamp subfolders - :type root_folder_path: str + # if requested process dependents next + if load_dependents: + result = self._ingestion_mediator.load_dependents_workflow( + unique_data_id) + processed_ids.extend(result) # add to processed_ids tracker - :return: most recent timestamp subfolder - :type: str - """ - walk_gen = os.walk(root_folder_path) - root, dirs, files = walk_gen.__next__() - dirs.sort(reverse=True) - return dirs[0] + return processed_ids - def make_manifest(self, all_folders_path): + def load_cleaned_data(self, unique_data_id_list, + use_raw_if_missing=True): """ - Creates a new manifest.csv with updated data date and filepath for - the raw data files within the most recent timestamp subfolder of the - given folder path. - - A new instance of manifest object is created from the new - manifest.csv file. - - :param all_folders_path: the folder that contains the timestamped - subfolders representing updated raw data files that should be loaded - into the database - :type all_folders_path: str - - :param overwrite: should the current manifest.csv be overwritten? - :type overwrite: bool - - :return: the path of the manifest + Attempts to load clean psv file for the given + unique_data_ids in unique_data_id_list into the + database. If clean psv file doesn't already exist for a unique data + id and use_raw_if_missing flag is True, the raw data will be + processed, cleaned, and then loaded into the database. + + Returns list of successfully loaded unique_data_ids from + the passed unique_data_id_list. + + Note - if debug = True, errors are raised instead of skipped + + :param unique_data_id_list: the list of unique data ids that should + be loaded into database from raw data file + :param use_raw_if_missing: flag determining whether to attempt to + load with raw data if clean psv file doesn't exist + :return: list of unique data ids that were successfully loaded into + the db """ - # get most recent subfolder, gather info for updating manifest - timestamp = self._get_most_recent_timestamp_subfolder( - all_folders_path) - most_recent_subfolder_path = os.path.join(all_folders_path, - timestamp) - - return self.manifest.update_manifest(most_recent_subfolder_path) - - def update_database(self, unique_data_id_list): + processed_ids = list() + + # iterate through unique_data_id_list + for unique_data_id in unique_data_id_list: + # get clean psv file for given unique data id + clean_data_path = self._ingestion_mediator.get_clean_psv_path( + unique_data_id) + + # if missing - determine action based on usw_raw_if_missing flag + if clean_data_path is None: + if use_raw_if_missing: + result = self.load_raw_data([unique_data_id]) + if result: + processed_ids.extend(result) + else: + logger.info( + "Couldn't find clean psv file for {}".format( + unique_data_id)) + if self._debug: + raise FileNotFoundError( + "Couldn't find clean psv file for {} - " + "'use_raw_if_missing' flag = False".format( + unique_data_id)) + else: # attempt to load clean psv into db + success = self._ingestion_mediator.write_file_to_db( + unique_data_id, clean_data_path) + + if success: + processed_ids.append(unique_data_id) + + return processed_ids + + def reload_all_from_manifest(self, use_clean=True, + use_raw_if_missing=True, + drop_tables=False, download_api_data=False, + load_dependents=False): """ - Reloads only the flat file associated to the unique_data_id in - unique_data_id_list. 
- - Returns a list of unique_data_ids that were successfully updated. + Reload all 'use' flagged unique data ids from manifest.csv. + + :param use_clean: if True, use clean psv files for data loading + :param use_raw_if_missing: if True, use raw data file if clean psv + file is missing + :param drop_tables: if True, drop table first before loading data + :param download_api_data: if True, get_api_data will be called on + unique_data_ids that have update_method = api + :param load_dependents: if True, any data ids dependent on the + existence id db of the current data id will be processed next + :return: list of unique data ids that were successfully loaded into + the db """ - logger.info("update_only(): attempting to update {} data".format( - unique_data_id_list)) - processed_data_ids = [] + data_ids = self._ingestion_mediator.get_list_use_unique_data_ids() - for uid in unique_data_id_list: - manifest_row = self.manifest.get_manifest_row(uid) + if drop_tables: + self._ingestion_mediator.remove_tables(['all']) - # process manifest row for requested data_id if flagged for use - if manifest_row is None: - logger.info(" Skipping: {} not found in manifest!".format( - uid)) - else: - - # follow normal workflow and load data_id - logger.info( - " Loading {} data!".format(uid)) - self._process_data_file(manifest_row=manifest_row) - processed_data_ids.append(uid) - - return processed_data_ids + if use_clean: + processed_ids = self.load_cleaned_data(unique_data_id_list=data_ids, + use_raw_if_missing=use_raw_if_missing) + else: + processed_ids = self.load_raw_data( + unique_data_id_list=data_ids, + download_api_data=download_api_data, + load_dependents=load_dependents) - def _get_temp_filepath(self, manifest_row): - """ - Returns a file path where intermediary clean psv file will be saved. - """ - return os.path.abspath( - os.path.join(logging_path, 'temp_{}.psv'.format( - manifest_row['unique_data_id']))) + return processed_ids - def rebuild(self): + ############################### + # methods for calculating the data for zone_facts table + ############################### + def recalculate_database(self): """ - Using manifest.csv, meta.json, and respective cleaners, validate and - process the data and then load all usable data into the database - after dropping all tables. + An alternative to 'rebuild' and 'update_database' methods - if no new + data has been added but changes to the calculations are made, re-run + the calculation routines. """ - if self.drop_tables: - self._drop_tables() - - # reload meta.json into db - self._meta_json_to_database() - - processed_data_ids = [] - - # Iterate through each row in the manifest then clean and validate - for manifest_row in self.manifest: - # Note: Incompletely filled out rows in the manifest can break the - # other code - # TODO: figure out a way to flag this issue early in loading - # TODO: of manifest - - # only clean and validate data files flagged for use in database - try: - if manifest_row['include_flag'] == 'use': - logger.info("{}: preparing to load row {} from the manifest". 
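Taken together, load_raw_data, load_cleaned_data, and reload_all_from_manifest replace the old rebuild/update_database entry points. The usage sketch below assumes a LoadData instance that the ingestion mediator has already wired up (the mediator and its wiring are not part of this diff); only the method names, keyword arguments, and unique data ids shown here appear elsewhere in the diff.

def refresh_core_tables(loader):
    # 'loader' is a LoadData instance already registered with the ingestion
    # mediator (wiring not shown in this diff).

    # Re-process raw files for a couple of data ids, pulling fresh API data
    # first and then loading anything in the db that depends on them.
    loaded = loader.load_raw_data(['mar', 'crime_2017'],
                                  download_api_data=True,
                                  load_dependents=True)

    # Prefer the cached clean .psv file, falling back to raw processing
    # when no clean file exists yet.
    loaded += loader.load_cleaned_data(['dchousing'],
                                       use_raw_if_missing=True)

    # Or reload everything flagged 'use' in manifest.csv from clean files.
    loaded += loader.reload_all_from_manifest(use_clean=True,
                                              drop_tables=False)
    return loaded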
- format(manifest_row['unique_data_id'], - len(self.manifest))) - self._process_data_file(manifest_row=manifest_row) - processed_data_ids.append(manifest_row['unique_data_id']) - except: - logger.exception("Unable to process {}".format(manifest_row['unique_data_id'])) - - return processed_data_ids - - def recalculate_database(self): - ''' - An alternative to 'rebuild' and 'update_database' methods - if no new data has been added but - changes to the calculations are made, re-run the calculation routines. - ''' try: - #self._automap() #not yet used - created for potential use by calculated fields - self._create_zone_facts_table() + # self._automap() #not yet used - created for potential use by calculated fields + self._ingestion_mediator.create_zone_facts_table() + + # populate table with calculated fields values + res_units_by_zone_type = self._get_residential_units() + self._populate_zone_facts_table(res_units_by_zone_type) self._populate_calculated_project_fields() except Exception as e: logger.error("Failed to recalculate database due to {}".format(e)) - if self.debug: + if self._debug: raise e + else: + return False - return None - - def _automap(self): - ''' - Adding this in case it is useful for the update scripts for _populate_calculated_project_fields - Did not end up using it for REAC score, but leaving it in case it is useful for future. - Mimics automap method used in api - ''' - from sqlalchemy.ext.automap import automap_base - - Base = automap_base() - Base.prepare(self.engine, reflect=True) - - #self._BuildingPermits = Base.classes.building_permits - #self._Census = Base.classes.census - #self._CensusMarginOfError = Base.classes.census_margin_of_error - #self._Crime = Base.classes.crime - #self._DcTax = Base.classes.dc_tax - self._Project = Base.classes.project - self._ReacScore = Base.classes.reac_score - #self._RealProperty = Base.classes.real_property - #self._Subsidy = Base.classes.subsidy - #self._Topa = Base.classes.topa - #self._WmataDist = Base.classes.wmata_dist - #self._WmataInfo = Base.classes.wmata_info - - def _populate_calculated_project_fields(self): - ''' - Adds values for calculated fields to the project table - Assumes the columns have already been created due to meta.json - ''' - conn = self.engine.connect() - - - ######################### - # Most recent REAC score - ######################### - logger.info(" Calculating REAC statistics") - #HT on sql syntax for a bulk insert, for future reference. 
Also notes how to only replace if exists: https://stackoverflow.com/a/7918818 - #This statement finds the most recent reac score in the reac_score table for each nlihc_id, and writes that into the project table - stmt = ''' - update project - set (most_recent_reac_score_num, most_recent_reac_score_date, most_recent_reac_score_id) = - (select reac_score_num, last_score_date, reac_score_id from( - ---This creates a table of most recent scores - SELECT - reac_score.id as reac_score_id - , reac_score.nlihc_id as nlihc_id - , reac_score.reac_score_num as reac_score_num - , dates.last_score_date as last_score_date - FROM reac_score - INNER JOIN - ( SELECT - nlihc_id - , MAX(reac_date) AS last_score_date - FROM reac_score - GROUP BY nlihc_id - ) AS dates - ON reac_score.nlihc_id=dates.nlihc_id - AND reac_score.reac_date=dates.last_score_date - ) as most_recent_scores - - where project.nlihc_id = most_recent_scores.nlihc_id) - ''' - conn.execute(stmt) - - - ######################## - # Sum of tax assessment - ######################## - - logger.info(" Calculating tax assessment statistics") - - from sqlalchemy import select, update, and_, bindparam - - meta = MetaData() - meta.reflect(bind=self.engine) - p,t = meta.tables['project'],meta.tables['dc_tax'] - - q = select([p.c.nlihc_id - , t.c.appraised_value_current_total - , t.c.appraised_value_current_land - , t.c.appraised_value_current_impr] - )\ - .where( - and_(p.c.nlihc_id == t.c.nlihc_id - , t.c.nlihc_id != None - ) - ) - rproxy = conn.execute(q) - rrows = rproxy.fetchall() - rproxy.close() - - summed_appraisals = [] - proj_ids = set([row.nlihc_id for row in rrows]) - for proj in proj_ids: - summed_total, summed_land, summed_impr = \ - sum([row.appraised_value_current_total - for row in rrows if row.nlihc_id == proj])\ - ,sum([row.appraised_value_current_land - for row in rrows if row.nlihc_id == proj])\ - ,sum([row.appraised_value_current_impr - for row in rrows if row.nlihc_id == proj]) - summed_appraisals.append({ - 'proj':proj, - 'summed_total':summed_total - , 'summed_land':summed_land - , 'summed_impr':summed_impr - }) - - upd = p.update()\ - .where(p.c.nlihc_id == bindparam('proj'))\ - .values(sum_appraised_value_current_total = bindparam('summed_total') - , sum_appraised_value_current_land = bindparam('summed_land') - , sum_appraised_value_current_impr = bindparam('summed_impr') - ) - - conn.execute(upd,summed_appraisals) - - # here should calculate the tax assessment per unit in the project; but, be sure to use the - # proj_unit_tot_mar instead of normal _tot so that if we don't have all the records from the mar - # (due to missing addresses in the proj_addre table) we'll be missing the same ones from numerator - # and denominator so will get realistic average. 
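The tax-assessment block above (and the bus-routes block that reappears later in this diff) avoids row-by-row UPDATEs by collecting one parameter dict per project and handing the whole list to a single bindparam-driven update, which SQLAlchemy runs in executemany mode. Below is a self-contained illustration of that pattern against a throwaway SQLite table; the demo table and values are invented, only the bindparam/executemany mechanics match the code above.

from sqlalchemy import (create_engine, MetaData, Table, Column, Integer,
                        String, bindparam)

engine = create_engine('sqlite://')          # throwaway in-memory database
meta = MetaData()
demo = Table('demo', meta,
             Column('nlihc_id', String, primary_key=True),
             Column('summed_total', Integer))
meta.create_all(engine)

with engine.begin() as conn:
    conn.execute(demo.insert(), [{'nlihc_id': 'NL000001', 'summed_total': 0},
                                 {'nlihc_id': 'NL000002', 'summed_total': 0}])

    # One statement, many parameter sets. The bindparam names ('proj',
    # 'total') deliberately differ from the column names so they do not
    # collide with the binds the update generates for its own columns.
    upd = demo.update() \
        .where(demo.c.nlihc_id == bindparam('proj')) \
        .values(summed_total=bindparam('total'))

    conn.execute(upd, [{'proj': 'NL000001', 'total': 1500000},
                       {'proj': 'NL000002', 'total': 820000}])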
- logger.info(" Calculating TOPA statistics") - - stmt = """ - update project - set (topa_count - ,most_recent_topa_date) - = - (select topa_count - , most_recent_topa_date - from( - select nlihc_id - ,count(distinct nidc_rcasd_id) as topa_count - ,count(nidc_rcasd_id) as topa_record_count - ,max(notice_date) as most_recent_topa_date - from( - select project.nlihc_id as nlihc_id - , nidc_rcasd_id - , address - , notice_date - from project - left join topa - on project.nlihc_id = topa.nlihc_id - --for debugging: - --where project.nlihc_id in ('NL000365','NL000046','NL000229') - --order by project.nlihc_id desc, notice_date desc - - ) - AS joined_appraisals - group by nlihc_id - --order by most_recent_notice_date desc - - ) as summed_appraisals - where summed_appraisals.nlihc_id = project.nlihc_id) - """ - conn.execute(stmt) - - - - ######################### - # Other calculated fields - ######################### - - # Miles (or Portion of) to Nearest Metro Station - - stmt = """ - UPDATE project - SET nearest_metro_station = - ( - SELECT nearest_metro - FROM - ( - SELECT wmata_dist.nlihc_id AS nlihc_id - , MIN(dist_in_miles) AS nearest_metro - FROM wmata_dist - WHERE type = 'rail' - GROUP BY nlihc_id - ) AS nstation - WHERE nstation.nlihc_id = project.nlihc_id - ) - """ - conn.execute(stmt) + return True - # Bus Routes within Half a Mile of Each Project # - # + def _get_residential_units(self): """ - The code below tries to reduce the time individual database queries - otherwise would take for record-by-record updates. It gathers - the data needed in one initial composite query, uses list comprehension - to shorten processing, and executes the final update within SQLAlchemy's - "executemany" context. + Returns the number of residential units in the standard 'items' format """ - from sqlalchemy import select, update, and_, bindparam - - meta = MetaData() - meta.reflect(bind=self.engine) - p = meta.tables['project'] - d = meta.tables['wmata_dist'] - i = meta.tables['wmata_info'] - - q = select([d.c.nlihc_id - , d.c.stop_id_or_station_code - , i.c.lines])\ - .where( - and_(d.c.type == 'bus' - , d.c.stop_id_or_station_code - == i.c.stop_id_or_station_code - ) - ) - rproxy = conn.execute(q) - rrows = rproxy.fetchall() - rproxy.close() - - bus_routes_nearby = [] - proj_ids = set([row.nlihc_id for row in rrows]) - for proj in proj_ids: - clusters = [row.lines for row in rrows if row.nlihc_id == proj] - count_unique = len(set(":".join(clusters).split(":"))) - bus_routes_nearby.append({'proj':proj,'routes':count_unique}) - - upd = p.update()\ - .where(p.c.nlihc_id == bindparam('proj'))\ - .values(bus_routes_nearby = bindparam('routes')) - - conn.execute(upd,bus_routes_nearby) - - ######################### - # Teardown - ######################### - conn.close() - - def _create_zone_facts_table(self): - """ - Creates the zone_facts table which is used as a master table for API - endpoints. The purpose is to avoid recalculating fields adhoc and - performing client-side reconfiguration of data into display fields. - This is all now done in the backend whenever new data is loaded. 
- """ - try: - # drop zone_facts table if already in db - if 'zone_facts' in self.engine.table_names(): - with self.engine.connect() as conn: - conn.execute('DROP TABLE zone_facts;') + with self._engine.connect() as conn: - # create empty zone_facts table - sql_interface = HISql(meta=self.meta, manifest_row=None, - engine=self.engine) - with self.engine.connect() as conn: - sql_interface.create_table(db_conn=conn, table='zone_facts') + res_units_by_zone_type = {} + zone_types = ['ward', 'neighborhood_cluster', 'census_tract'] + for zone_type in zone_types: + q = """ + SELECT {zone_type} + , sum(active_res_occupancy_count) + FROM mar + GROUP BY {zone_type} + """.format(zone_type=zone_type) + rproxy = conn.execute(q) + rrows = rproxy.fetchall() + res_units_by_zone_type[zone_type] = { + 'items': { + row[0]: row[1] for row in rrows + if row[0] is not None and row[0] != '' + } + } + return res_units_by_zone_type - # populate table with calculated fields values - res_units_by_zone_type = self._get_residential_units() - self._populate_zone_facts_table(res_units_by_zone_type) - + # TODO do better error handling - for interim development purposes only except Exception as e: - logger.error("Failed to create zone_facts table") - if self.debug: - raise e - + return {'items': None, + 'notes': "_get_residential_units failed: {}".format(e), + 'grouping': "grouping", 'data_id': "res_units_by_zone"} - def _populate_zone_facts_table(self,res_units_by_zone_type): + def _populate_zone_facts_table(self, res_units_by_zone_type): """ Populates the zone_facts table with the calculated fields data and acs rent data fields from census. @@ -690,7 +269,7 @@ def _populate_zone_facts_table(self,res_units_by_zone_type): 'construction_permits': ['count', 'building_permits', 'construction'], 'construction_permits_rate': ['rate', 'building_permits', - 'construction'] + 'construction'] } zone_types = ['census_tract', 'ward', 'neighborhood_cluster'] @@ -708,36 +287,38 @@ def _populate_zone_facts_table(self,res_units_by_zone_type): grouping=zone_type) field_values[field] = result['items'] except Exception as e: - logger.error("Couldn't get census data for {}".format(field)) + logger.error( + "Couldn't get census data for {}".format(field)) logger.error(e) # get field value from building permits and crime table - for field in sorted(summarize_obs_field_args,reverse=True): + for field in sorted(summarize_obs_field_args, reverse=True): # Sorted to calculate res_units_by_zone_type before permit rates. try: method, table_name, filter_name = summarize_obs_field_args[ field] - if field == 'residential_units': - field_values[field] = res_units_by_zone_type[zone_type]['items'] + if field == 'residential_units': + field_values[field] = res_units_by_zone_type[zone_type][ + 'items'] continue result = self._summarize_observations( - method, + method, table_name, filter_name, months=12, grouping=zone_type, res_units_by_zone_type=res_units_by_zone_type - ) + ) field_values[field] = result['items'] except Exception as e: logger.error("Couldn't summarize data for {}".format(field)) logger.error(e) - try: - zone_specifics = self._get_zone_specifics_for_zone_type(zone_type) + zone_specifics = self._get_zone_specifics_for_zone_type( + zone_type) - for zone in zone_specifics: #list of zones in that zone type, i.e. ['Ward 1', 'Ward 2' etc.] + for zone in zone_specifics: # list of zones in that zone type, i.e. ['Ward 1', 'Ward 2' etc.] 
# skip 'Non-cluster area' if zone == 'Non-cluster area': continue @@ -759,79 +340,24 @@ def _populate_zone_facts_table(self,res_units_by_zone_type): # derive column and values strings needed for sql query columns = ', '.join(columns) - columns = 'id, zone_type, zone, ' + columns - - values = ', '.join(values) - values = "'" + str(uuid4()) + "', '" + zone_type + "', '" + \ - zone + "', " "" + values - - q = "INSERT INTO zone_facts ({cols}) VALUES ({vals})".format( - cols=columns, vals=values) - - with self.engine.connect() as conn: - result = conn.execute(q) - query_results.append(result) - except Exception as e: - logger.error("Couldn't load data for {} into zone_facts".format(zone_type)) - - - return query_results - - def _get_zone_specifics_for_zone_type(self, zone_type): - """ - Returns a list of zone_specific values for a given zone_type. - """ - with self.engine.connect() as conn: - - if zone_type == 'ward': - table = 'census_tract_to_ward' - elif zone_type == 'neighborhood_cluster': - table = 'census_tract_to_neighborhood_cluster' - else: - table = 'census' - - query_result = conn.execute( - 'select distinct {zone} from {table};'.format(zone=zone_type, - table=table)) - zone_specifics = [row[0] for row in query_result.fetchall()] - # zones.append(zone) - - return zone_specifics - - def _items_divide(self, numerator_data, denominator_data): - """ - Divides items in the numerator by items in the denominator by matching - the appropriate groupings. - - Takes data that is formatted for output the API, i.e. a dictionary - with key "items", which contains a list of dictionaries each with 'grouping' - and 'count'. + columns = 'id, zone_type, zone, ' + columns - Returns items as dictionary with group and division result as - key/value pairs instead of list of dictionaries. - """ - items = dict() - if numerator_data['items'] is None: - items = None - else: - for n in numerator_data['items']: - # TODO what should we do when a matching item isn't found? 
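_items_divide (removed here and re-added, reformatted, further down in this diff) and _scale both work on the 'items' shape used throughout the zone_facts calculations: a dict with an 'items' mapping of zone to value plus 'grouping' and 'data_id' keys. The toy example below walks through the crime-rate case; the numbers are invented, and the per-100,000 scaling follows the comment in _summarize_observations.

# Toy data in the 'items' shape; the values are invented.
crime_counts = {'items': {'Ward 1': 380, 'Ward 2': 410, 'Ward 3': None},
                'grouping': 'ward', 'data_id': 'crime'}
population = {'items': {'Ward 1': 76000, 'Ward 2': 82000, 'Ward 3': 79000},
              'grouping': 'ward', 'data_id': 'population'}

def items_divide(numerator_data, denominator_data):
    # Mirrors the behaviour of _items_divide: a missing, None, or zero
    # denominator (or a None numerator) yields None instead of raising.
    if numerator_data['items'] is None:
        items = None
    else:
        items = {}
        for zone, num in numerator_data['items'].items():
            denom = denominator_data['items'].get(zone)
            items[zone] = None if num is None or not denom else num / denom
    return {'items': items, 'grouping': numerator_data['grouping'],
            'data_id': numerator_data['data_id']}

rate = items_divide(crime_counts, population)
# Scaling by 100,000, as _scale does for crime, turns the fractions into
# incidents per 100,000 people: roughly 500 for Ward 1 and Ward 2, and
# None for Ward 3 because its count is missing.
per_100k = {zone: None if v is None else v * 100000
            for zone, v in rate['items'].items()}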
+ values = ', '.join(values) + values = "'" + str(uuid4()) + "', '" + zone_type + "', '" + \ + zone + "', " "" + values - if n not in denominator_data['items'] or \ - numerator_data['items'][n] is None \ - or denominator_data['items'][n] is None \ - or denominator_data['items'][n] == 0: - divided = None - else: - divided = numerator_data['items'][n] / denominator_data[ - 'items'][n] + q = "INSERT INTO zone_facts ({cols}) VALUES ({vals})".format( + cols=columns, vals=values) - # item = dict({'group': n['group'], - # 'value': divided}) - items[n] = divided + with self._engine.connect() as conn: + # TODO - consider trans/rollback logic for this + result = conn.execute(q) + query_results.append(result) + except Exception as e: + logger.error("Couldn't load data for {} into zone_facts".format( + zone_type)) - return {'items': items, 'grouping': numerator_data['grouping'], - 'data_id': numerator_data['data_id']} + return query_results def _census_with_weighting(self, data_id, grouping): """ @@ -895,7 +421,7 @@ def _get_weighted_census_results(self, grouping, field, pop_wt_prop=False): if pop_wt_prop: pop_wt = 'population_weight_proportions' - with self.engine.connect() as conn: + with self._engine.connect() as conn: # get the field population value for each census_tract q = "SELECT census_tract, {field} FROM census".format( field=field) # TODO need to add 'year' column for multiple census years when this is added to the data @@ -954,7 +480,7 @@ def _get_weighted_census_results(self, grouping, field, pop_wt_prop=False): return {'items': items, 'grouping': grouping, 'data_id': field} def _summarize_observations(self, method, table_name, filter_name, months, - grouping,res_units_by_zone_type): + grouping, res_units_by_zone_type): """ This endpoint takes a table that has each record as list of observations (like our crime and building_permits tables) and returns summary @@ -993,11 +519,10 @@ def _summarize_observations(self, method, table_name, filter_name, months, :return: """ - ########################### - #Handle filters + # Handle filters ########################### - #Be sure concatenated 'AND' statements have a space in front of them + # Be sure concatenated 'AND' statements have a space in front of them additional_wheres = '' if filter_name == 'all': additional_wheres += " " @@ -1018,15 +543,15 @@ def _summarize_observations(self, method, table_name, filter_name, months, additional_wheres += " Incorrect filter name - this inserted SQL will cause query to fail" ########################## - #Handle date range + # Handle date range ########################## date_fields = {'building_permits': 'issue_date', 'crime': 'report_date'} date_field = date_fields[table_name] - #method currently not implemented. 'count' or 'rate' + # method currently not implemented. 
'count' or 'rate' - start_date = None #request.args.get('start') + start_date = None # request.args.get('start') if start_date is None: start_date = "now()" else: @@ -1038,7 +563,7 @@ def _summarize_observations(self, method, table_name, filter_name, months, date_range_sql = ( "({start_date}::TIMESTAMP - INTERVAL '{months} months')" " AND {start_date}::TIMESTAMP" - ).format(start_date=start_date, months=months) + ).format(start_date=start_date, months=months) ######################### # Optional - validate other inputs @@ -1067,7 +592,7 @@ def _summarize_observations(self, method, table_name, filter_name, months, # residential units if table_name in ['crime']: denominator = self._get_weighted_census_results(grouping, - 'population') + 'population') api_results = self._items_divide(api_results, denominator) api_results = self._scale(api_results, 100000) # crime incidents per 100,000 people @@ -1080,7 +605,7 @@ def _count_observations(self, table_name, grouping, date_field, fallback = "'Unknown'" try: - with self.engine.connect() as conn: + with self._engine.connect() as conn: q = """ SELECT COALESCE({grouping},{fallback}) --'Unknown' @@ -1098,15 +623,15 @@ def _count_observations(self, table_name, grouping, date_field, proxy = conn.execute(q) results = proxy.fetchall() - #transform the results. - #TODO should come up with a better generic way to do this using column - #names for any arbitrary sql table results. + # transform the results. + # TODO should come up with a better generic way to do this using column + # names for any arbitrary sql table results. formatted = {row[0]: row[1] for row in results} return {'items': formatted, 'grouping': grouping, 'data_id': table_name} - #TODO do better error handling - for interim development purposes only + # TODO do better error handling - for interim development purposes only except Exception as e: return {'items': None, 'notes': "Query failed: {}".format(e), 'grouping': grouping, 'data_id': table_name} @@ -1124,240 +649,295 @@ def _scale(self, data, factor): return data - def _get_residential_units(self): + def _items_divide(self, numerator_data, denominator_data): """ - Returns the number of residential units in the standard 'items' format + Divides items in the numerator by items in the denominator by matching + the appropriate groupings. + + Takes data that is formatted for output the API, i.e. a dictionary + with key "items", which contains a list of dictionaries each with 'grouping' + and 'count'. + + Returns items as dictionary with group and division result as + key/value pairs instead of list of dictionaries. """ + items = dict() + if numerator_data['items'] is None: + items = None + else: + for n in numerator_data['items']: + # TODO what should we do when a matching item isn't found? 
- try: - with self.engine.connect() as conn: + if n not in denominator_data['items'] or \ + numerator_data['items'][n] is None \ + or denominator_data['items'][n] is None \ + or denominator_data['items'][n] == 0: + divided = None + else: + divided = numerator_data['items'][n] / denominator_data[ + 'items'][n] - res_units_by_zone_type = {} - zone_types = ['ward','neighborhood_cluster','census_tract'] - for zone_type in zone_types: - q = """ - SELECT {zone_type} - , sum(active_res_occupancy_count) - FROM mar - GROUP BY {zone_type} - """.format(zone_type=zone_type) - rproxy = conn.execute(q) - rrows = rproxy.fetchall() - res_units_by_zone_type[zone_type] = {'items' : - {row[0]:row[1] for row in rrows - if row[0] != None and row[0] != ''} - } - return res_units_by_zone_type + # item = dict({'group': n['group'], + # 'value': divided}) + items[n] = divided - #TODO do better error handling - for interim development purposes only - except Exception as e: - return {'items': None, 'notes': "_get_residential_units failed: {}".format(e), - 'grouping': grouping, 'data_id': "res_units_by_zone"} + return {'items': items, 'grouping': numerator_data['grouping'], + 'data_id': numerator_data['data_id']} - def _process_data_file(self, manifest_row): + def _get_zone_specifics_for_zone_type(self, zone_type): """ - Processes the data file for the given manifest row. + Returns a list of zone_specific values for a given zone_type. """ - # get the file object for the data - csv_reader = DataReader(meta=self.meta, - manifest_row=manifest_row, - load_from="file") + with self._engine.connect() as conn: - # get file path for storing clean PSV files - temp_filepath = self._get_temp_filepath(manifest_row=manifest_row) + if zone_type == 'ward': + table = 'census_tract_to_ward' + elif zone_type == 'neighborhood_cluster': + table = 'census_tract_to_neighborhood_cluster' + else: + table = 'census' - # validate and clean - self._load_single_file(table_name=manifest_row['destination_table'], - manifest_row=manifest_row, - csv_reader=csv_reader, - temp_filepath=temp_filepath) + query_result = conn.execute( + 'select distinct {zone} from {table};'.format(zone=zone_type, + table=table)) + zone_specifics = [row[0] for row in query_result.fetchall()] - def _get_cleaner(self, table_name, manifest_row): - """ - Returns the custom cleaner class that is to be used to clean the - specific data for use in database. + return zone_specifics - :param table_name: the table name for that data being processed - :param manifest_row: the row representing the data being loaded - :return: instance of custom cleaner class + def _populate_calculated_project_fields(self): """ - cleaner_class_name = self.meta[table_name]['cleaner'] - return ingestionfunctions.get_cleaner_from_name( - meta=self.meta, - manifest_row=manifest_row, - name=cleaner_class_name, - engine=self.engine) - - def _get_meta_only_fields(self, table_name, data_fields): + Adds values for calculated fields to the project table + Assumes the columns have already been created due to meta.json """ - Returns fields that exist in meta.json but not CSV so we can add - them to the row as it is cleaned and written to PSV file. 
+ conn = self._engine.connect() - :param table_name: the table name for the data being processed - :param data_fields: the fields for the data being processed - :return: additional fields as dict - """ - meta_only_fields = {} - for field in self.meta[table_name]['fields']: - if field['source_name'] not in data_fields: - # adds 'sql_name',None as key,value pairs in dict - meta_only_fields[field['sql_name']] = None - return meta_only_fields - - def _configure_db_interface(self, manifest_row, temp_filepath): - """ - Returns an interface object for the sql database + ######################### + # Most recent REAC score + ######################### + logger.info(" Calculating REAC statistics") + # HT on sql syntax for a bulk insert, for future reference. Also notes how to only replace if exists: https://stackoverflow.com/a/7918818 + # This statement finds the most recent reac score in the reac_score table for each nlihc_id, and writes that into the project table + stmt = ''' + update project + set (most_recent_reac_score_num, most_recent_reac_score_date, most_recent_reac_score_id) = + (select reac_score_num, last_score_date, reac_score_id from( + ---This creates a table of most recent scores + SELECT + reac_score.id as reac_score_id + , reac_score.nlihc_id as nlihc_id + , reac_score.reac_score_num as reac_score_num + , dates.last_score_date as last_score_date + FROM reac_score + INNER JOIN + ( SELECT + nlihc_id + , MAX(reac_date) AS last_score_date + FROM reac_score + GROUP BY nlihc_id + ) AS dates + ON reac_score.nlihc_id=dates.nlihc_id + AND reac_score.reac_date=dates.last_score_date + ) as most_recent_scores - :param manifest_row: a given row in the manifest - :param temp_filepath: the file path where PSV will be saved - """ - # check for database manifest - create it if it doesn't exist - sql_manifest_exists = \ - ingestionfunctions.check_or_create_sql_manifest(engine=self.engine) - logger.info(" sql_manifest_exists: {}".format(sql_manifest_exists)) - - # configure database interface object and get matching manifest row - interface = HISql(meta=self.meta, manifest_row=manifest_row, - engine=self.engine, filename=temp_filepath) - return interface - - def _load_single_file(self, table_name, manifest_row, csv_reader, - temp_filepath): + where project.nlihc_id = most_recent_scores.nlihc_id) + ''' + conn.execute(stmt) + + ######################## + # Sum of tax assessment + ######################## + + logger.info(" Calculating tax assessment statistics") + + from sqlalchemy import select, update, and_, bindparam + + meta = MetaData() + meta.reflect(bind=self._engine) + p, t = meta.tables['project'], meta.tables['dc_tax'] + + q = select([p.c.nlihc_id + , t.c.appraised_value_current_total + , t.c.appraised_value_current_land + , t.c.appraised_value_current_impr] + ) \ + .where( + and_(p.c.nlihc_id == t.c.nlihc_id + , t.c.nlihc_id != None + ) + ) + rproxy = conn.execute(q) + rrows = rproxy.fetchall() + rproxy.close() + + summed_appraisals = [] + proj_ids = set([row.nlihc_id for row in rrows]) + for proj in proj_ids: + summed_total, summed_land, summed_impr = \ + sum([row.appraised_value_current_total + for row in rrows if row.nlihc_id == proj]) \ + , sum([row.appraised_value_current_land + for row in rrows if row.nlihc_id == proj]) \ + , sum([row.appraised_value_current_impr + for row in rrows if row.nlihc_id == proj]) + summed_appraisals.append({ + 'proj': proj, + 'summed_total': summed_total + , 'summed_land': summed_land + , 'summed_impr': summed_impr + }) + + upd = p.update() \ + 
.where(p.c.nlihc_id == bindparam('proj')) \ + .values(sum_appraised_value_current_total=bindparam('summed_total') + , sum_appraised_value_current_land=bindparam('summed_land') + , sum_appraised_value_current_impr=bindparam('summed_impr') + ) + + conn.execute(upd, summed_appraisals) + + # here should calculate the tax assessment per unit in the project; but, be sure to use the + # proj_unit_tot_mar instead of normal _tot so that if we don't have all the records from the mar + # (due to missing addresses in the proj_addre table) we'll be missing the same ones from numerator + # and denominator so will get realistic average. + logger.info(" Calculating TOPA statistics") + + stmt = """ + update project + set (topa_count + ,most_recent_topa_date) + = + (select topa_count + , most_recent_topa_date + from( + select nlihc_id + ,count(distinct nidc_rcasd_id) as topa_count + ,count(nidc_rcasd_id) as topa_record_count + ,max(notice_date) as most_recent_topa_date + from( + select project.nlihc_id as nlihc_id + , nidc_rcasd_id + , address + , notice_date + from project + left join topa + on project.nlihc_id = topa.nlihc_id + --for debugging: + --where project.nlihc_id in ('NL000365','NL000046','NL000229') + --order by project.nlihc_id desc, notice_date desc + + ) + AS joined_appraisals + group by nlihc_id + --order by most_recent_notice_date desc + + ) as summed_appraisals + where summed_appraisals.nlihc_id = project.nlihc_id) + """ + conn.execute(stmt) + + ######################### + # Other calculated fields + ######################### + + # Miles (or Portion of) to Nearest Metro Station + + stmt = """ + UPDATE project + SET nearest_metro_station = + ( + SELECT nearest_metro + FROM + ( + SELECT wmata_dist.nlihc_id AS nlihc_id + , MIN(dist_in_miles) AS nearest_metro + FROM wmata_dist + WHERE type = 'rail' + GROUP BY nlihc_id + ) AS nstation + WHERE nstation.nlihc_id = project.nlihc_id + ) + """ + conn.execute(stmt) + + # Bus Routes within Half a Mile of Each Project # + # """ - Cleans the data for the table name in the given manifest row, writes - the clean data to PSV file, and then passes on that information so - the database can be updated accordingly. + The code below tries to reduce the time individual database queries + otherwise would take for record-by-record updates. It gathers + the data needed in one initial composite query, uses list comprehension + to shorten processing, and executes the final update within SQLAlchemy's + "executemany" context. 
""" - # get database interface and it's equivalent manifest row - sql_interface = self._configure_db_interface( - manifest_row=manifest_row, temp_filepath=temp_filepath) - - sql_manifest_row = sql_interface.get_sql_manifest_row() - - cleaner = self._get_cleaner(table_name=table_name, - manifest_row=manifest_row) - csv_writer = CSVWriter(meta=self.meta, - manifest_row=manifest_row, - filename=temp_filepath) - - # clean the file and save the output to a local pipe-delimited file - if csv_reader.should_file_be_loaded(sql_manifest_row=sql_manifest_row): - print(" Cleaning...") - start_time = time() - meta_only_fields = self._get_meta_only_fields( - table_name=table_name, data_fields=csv_reader.keys) - # TODO - concurrency can be handled here: row at a time - # TODO - while cleaning one row, start cleaning the next - # TODO - once cleaning is done, write row to psv file - # TODO - consider using queue: once empty update db with psv data - total_rows = len(csv_reader) - for idx, data_row in enumerate(csv_reader): - if idx % 100 == 0: - print(" on row ~{} of {}".format(idx,total_rows), end='\r', flush=True) - try: - data_row.update(meta_only_fields) # insert other field dict - clean_data_row = cleaner.clean(data_row, idx) - if clean_data_row is not None: - csv_writer.write(clean_data_row) - except Exception as e: - logger.error("Error when trying to clean row index {} from the manifest_row {}".format(idx,manifest_row)) - if self.debug == True: - raise e - - - # with concurrent.futures.ThreadPoolExecutor( - # max_workers=100) as executor: - # future_data = {executor.submit( - # self._clean_data, idx, data_row, cleaner, table_name, - # csv_reader.keys): ( - # idx, data_row) for idx, data_row in enumerate(csv_reader)} - # for future in concurrent.futures.as_completed(future_data): - # clean_data_row = future.result() - # if clean_data_row is not None: - # csv_writer.write(clean_data_row) - # - # csv_writer.close() - # - # # write the data to the database - # self._update_database(sql_interface=sql_interface) - # - # if not self._keep_temp_files: - # csv_writer.remove_file() - # end_time = time() - # print("\nRun time= %s" % (end_time - start_time)) - - csv_writer.close() - - # write the data to the database - self._update_database(sql_interface=sql_interface, manifest_row = manifest_row) - - if not self._keep_temp_files: - csv_writer.remove_file() - end_time = time() - print("\nRun time= %s" % (end_time - start_time)) - - def _clean_data(self, idx, data_row, cleaner, table_name, data_fields): - """ - Only used by threading - currently not used + from sqlalchemy import select, update, and_, bindparam + meta = MetaData() + meta.reflect(bind=self._engine) + p = meta.tables['project'] + d = meta.tables['wmata_dist'] + i = meta.tables['wmata_info'] - Helper function that actually does the cleaning processing of each - row in the raw data file. To improve performance, each row is - processess currently by n number of threads determined in - _load_single_file method. 
+ q = select([d.c.nlihc_id + , d.c.stop_id_or_station_code + , i.c.lines]) \ + .where( + and_(d.c.type == 'bus' + , d.c.stop_id_or_station_code + == i.c.stop_id_or_station_code + ) + ) + rproxy = conn.execute(q) + rrows = rproxy.fetchall() + rproxy.close() - :param idx: the index of the data row - :param data_row: the data row to be processed - :param cleaner: an object representing the cleaner class to be used - :param table_name: the name of the table the data should go in - :param data_fields: the column headers required in the database - :return: the processed clean data row - """ - if data_row is None: # skip empty rows - return None + bus_routes_nearby = [] + proj_ids = set([row.nlihc_id for row in rrows]) + for proj in proj_ids: + clusters = [row.lines for row in rrows if row.nlihc_id == proj] + count_unique = len(set(":".join(clusters).split(":"))) + bus_routes_nearby.append({'proj': proj, 'routes': count_unique}) - meta_only_fields = self._get_meta_only_fields( - table_name=table_name, data_fields=data_fields) - data_row.update(meta_only_fields) # insert other field dict keys + upd = p.update() \ + .where(p.c.nlihc_id == bindparam('proj')) \ + .values(bus_routes_nearby=bindparam('routes')) - # handle connection timeouts by pausing then trying again - while True: - try: - clean_data_row = cleaner.clean(data_row, idx) - break - except Exception: - logger.warning("_clean_data(): %s" % data_row) - sleep(1) + conn.execute(upd, bus_routes_nearby) - return clean_data_row + ######################### + # Teardown + ######################### + conn.close() - def _update_database(self, sql_interface, manifest_row): + def _automap(self): """ - Load the clean PSV file into the database + Adding this in case it is useful for the update scripts for _populate_calculated_project_fields + Did not end up using it for REAC score, but leaving it in case it is useful for future. + Mimics automap method used in api """ - print(" Loading...") - - # TODO Need to figure out how to revert the removal if loading doesn't work?? - self._remove_existing_data(manifest_row=manifest_row) - self._remove_table_if_empty(manifest_row = manifest_row) - - # create table if it doesn't exist - sql_interface.create_table_if_necessary() - try: - sql_interface.write_file_to_sql() - except TableWritingError: - # TODO: tell user total count of errors. - # currently write_file_to_sql() just writes in log that file failed - self._failed_table_count += 1 - pass + from sqlalchemy.ext.automap import automap_base + + Base = automap_base() + Base.prepare(self._engine, reflect=True) + + # self._BuildingPermits = Base.classes.building_permits + # self._Census = Base.classes.census + # self._CensusMarginOfError = Base.classes.census_margin_of_error + # self._Crime = Base.classes.crime + # self._DcTax = Base.classes.dc_tax + self._Project = Base.classes.project + self._ReacScore = Base.classes.reac_score + # self._RealProperty = Base.classes.real_property + # self._Subsidy = Base.classes.subsidy + # self._Topa = Base.classes.topa + # self._WmataDist = Base.classes.wmata_dist + # self._WmataInfo = Base.classes.wmata_info def main(passed_arguments): """ Initializes load procedure based on passed command line arguments and options. 
- """ + """ # use real data as default scripts_path = os.path.abspath(os.path.join(PYTHON_PATH, 'scripts')) @@ -1393,7 +973,7 @@ def main(passed_arguments): # universal defaults keep_temp_files = True - + # Instantiate and run the loader loader = LoadData(database_choice=database_choice, meta_path=meta_path, @@ -1421,7 +1001,8 @@ def main(passed_arguments): #TODO add in failures report here e.g. _failed_table_count -#Add command line utility + +# Add command line utility description = ('Loads our flat file data into the database of choice. You ' 'can load sample or real data and/or rebuild or update only ' 'specific flat files based on unique_data_id values.') diff --git a/python/housinginsights/ingestion/Manifest.py b/python/housinginsights/ingestion/Manifest.py index 4b2b71ea..e0c09e75 100644 --- a/python/housinginsights/ingestion/Manifest.py +++ b/python/housinginsights/ingestion/Manifest.py @@ -1,15 +1,20 @@ """ Manifest module """ + +# built-in imports import os from collections import Counter from csv import DictWriter, DictReader - -from housinginsights.ingestion.DataReader import HIReader - +from sqlalchemy.exc import ProgrammingError from datetime import datetime import dateutil.parser as dateparser +# app imports +from housinginsights.ingestion.DataReader import HIReader +from housinginsights.tools.logger import HILogger +logger = HILogger(name=__file__, logfile="ingestion.log") + class Manifest(HIReader): """ @@ -28,6 +33,23 @@ def __init__(self, path='manifest.csv'): if not self.has_unique_ids(): raise ValueError('Manifest has duplicate unique_data_id!') + # metadata fields for manifest.csv + self._fields = [ + ("status", "text"), + ("load_date", "timestamp"), + ("include_flag", "text"), + ("destination_table", "text"), + ("unique_data_id", "text"), + ("update_method", "text"), + ("data_date", "date"), + ("encoding", "text"), + ("local_folder", "text"), + ("s3_folder", "text"), + ("filepath", "text"), + ("dependency", "text"), + ("notes", "text") + ] + def __iter__(self): self._length = 0 self._counter = Counter() @@ -37,14 +59,15 @@ def __iter__(self): for row in reader: self._length += 1 - #parse the date into proper format for sql + # parse the date into proper format for sql try: - _date = dateparser.parse(row['data_date'],dayfirst=False,yearfirst=False) + _date = dateparser.parse(row['data_date'], dayfirst=False, + yearfirst=False) row['data_date'] = datetime.strftime(_date, '%Y-%m-%d') except ValueError: row['data_date'] = 'Null' - #return the row + # return the row yield row def has_unique_ids(self): @@ -67,43 +90,37 @@ def has_unique_ids(self): self.unique_ids[row['unique_data_id']] = 'found' return True + def get_use_unique_data_ids(self): + """ + Returns list of all unique data ids that have 'include_flag' = 'use'. + """ + use_ids = list() + for row in self: + if row['include_flag'] in Manifest._include_flags_positive: + use_ids.append(row['unique_data_id']) + return use_ids + def get_manifest_row(self, unique_data_id): """ Returns the row for the given unique_data_id else 'None' if not in manifest. """ for row in self: - use = row['include_flag'] == 'use' + use = row['include_flag'] in Manifest._include_flags_positive if row['unique_data_id'] == unique_data_id and use: return row return None - def create_list(self, folder_path): - """ - Returns a list of potential unique_data_ids based on csv files in the - given folder path. The assumption is that all raw data a saved as csv - with filename that matches unique_data_id used in manifest. 
+ def update_manifest(self, date_stamped_folder): """ - unique_data_ids = list() - files_in_folder = os.listdir(folder_path) - for file in files_in_folder: - file_path = os.path.join(folder_path, file) - uid, file_ext = file.split(sep='.') - if os.path.isfile(file_path) and file_ext == 'csv': - unique_data_ids.append(uid) + Used for automatically swapping out old files for new ones in our + manifest.csv whenever we gather new data. - return unique_data_ids - - def update_manifest(self, date_stamped_folder): - ''' - Used for automatically swapping out old files for new ones in our manifest.csv - whenever we gather new data. - - Using the folder passed (e.g. /data/raw/apis), find the most recent subfolder - (by sorting alphabetically). Make a list of .csv files in that folder - and update the manifest.csv for every unique_data_id that corresponds - to one of the .csv files. - ''' + Using the folder passed (e.g. /data/raw/apis), find the most recent + subfolder (by sorting alphabetically). Make a list of .csv files in + that folder and update the manifest.csv for every unique_data_id that + corresponds to one of the .csv files. + """ timestamp = os.path.basename(date_stamped_folder) data_date = datetime.strptime(timestamp, '%Y%m%d').strftime('%Y-%m-%d') file_path_base = date_stamped_folder[date_stamped_folder.find('raw'):] @@ -111,7 +128,7 @@ def update_manifest(self, date_stamped_folder): field_names = self.keys # get unique_data_ids for data files in recent subfolder - uid_list = self.create_list(date_stamped_folder) + uid_list = self._create_list(date_stamped_folder) data = list() # update the manifest.csv in place @@ -130,3 +147,70 @@ def update_manifest(self, date_stamped_folder): writer.writerows(data) return self.path + + def _create_list(self, folder_path): + """ + Returns a list of potential unique_data_ids based on csv files in the + given folder path. The assumption is that all raw data a saved as csv + with filename that matches unique_data_id used in manifest. + """ + unique_data_ids = list() + files_in_folder = os.listdir(folder_path) + for file in files_in_folder: + file_path = os.path.join(folder_path, file) + uid, file_ext = file.split(sep='.') + if os.path.isfile(file_path) and file_ext == 'csv': + unique_data_ids.append(uid) + + return unique_data_ids + + def check_or_create_sql_manifest(self, engine): + """ + Makes sure we have a manifest table in the database. + If not, it creates it with appropriate fields. + + This corresponds to the manifest.csv file, which contains a log + of all the individual data files we have used as well as which + table they each go into. + + The csv version of the manifest includes all files we have ever + used, including ones not in the database. + + The SQL version of the manifest only tracks those that have been + written to the database, and whether they are still there or + have been deleted. 
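+
+        A typical call looks roughly like this (sketch only; in the
+        ingestion workflow the engine normally comes from the mediator
+        rather than being built by hand):
+
+            engine = dbtools.get_database_engine('docker_database')
+            Manifest('manifest.csv').check_or_create_sql_manifest(engine)
+
+        Returns True once the table exists, whether it was already present
+        or had to be created.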
+ """ + try: + with engine.connect() as db_conn: + sql_query = "SELECT * FROM manifest" + query_result = db_conn.execute(sql_query) + _ = [dict(row.items()) for row in query_result] + return True + except ProgrammingError as _: + try: + logger.info('SQL Manifest missing...attempting to add...') + # Create the query with appropriate fields and datatypes + db_conn = engine.connect() + field_statements = [] + for tup in self._fields: + field_statements.append(tup[0] + " " + tup[1]) + field_command = ",".join(field_statements) + create_command = "CREATE TABLE manifest({});".format( + field_command) + db_conn.execute(create_command) + db_conn.close() + logger.info("Manifest table created in the SQL database") + return True + + except Exception as e: + raise e + + def get_dependent_data_ids(self, unique_data_id): + """ + Returns list of dependent data ids for the passed unique_data_id + """ + dependents = list() + for row in self: + if row['dependency'] == unique_data_id: + dependents.append(row['unique_data_id']) + return dependents diff --git a/python/housinginsights/ingestion/Meta.py b/python/housinginsights/ingestion/Meta.py new file mode 100644 index 00000000..318f3d2f --- /dev/null +++ b/python/housinginsights/ingestion/Meta.py @@ -0,0 +1,139 @@ +""" +Meta.py is an object representation of the meta.json file used to document +all metadata fields for our raw data mapping them to their respective fields +used in the the database. +""" +# built-in imports +import os +import json + +# app imports +from housinginsights.tools.base_colleague import Colleague +from housinginsights.tools.logger import HILogger +from housinginsights.ingestion import Cleaners + +# useful globals +PYTHON_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), + os.pardir, os.pardir)) +SCRIPTS_PATH = os.path.abspath(os.path.join(PYTHON_PATH, 'scripts')) + +logger = HILogger(name=__file__, logfile="ingestion.log") + + +class Meta(Colleague): + + def __init__(self, meta_path=None): + super().__init__() + if meta_path is None: + self._meta_path = os.path.abspath(os.path.join(SCRIPTS_PATH, + 'meta.json')) + else: + self._meta_path = meta_path + + self._meta = self._validate_and_load_into_memory() + + def _validate_and_load_into_memory(self): + """ + Helper function validates the format of the JSON data in meta.json and + returns the validated JSON data of meta.json file + """ + """ + Expected meta data format: + { tablename: {fields:[ + { "display_name": "Preservation Catalog ID", + "display_text": "description of what this field is", + "source_name": "Nlihc_id", + "sql_name": "nlihc_id", + "type": "object" + } + ]} + } + """ + with open(self._meta_path) as fh: + meta = json.load(fh) + + # validate that meta.json meets the expected data format + json_is_valid = True + try: + for table in meta: + for field in meta[table]['fields']: + for key in field: + if key not in ( + 'display_name', 'display_text', 'source_name', + 'sql_name', 'type', 'required_in_source', '_comment'): + json_is_valid = False + first_json_error = "Location: " \ + "table: {}, section: {}, " \ + "attribute: {}".format(table, + field, + key) + raise ValueError("Error found in JSON, " + "check expected format. " + "{}".format(first_json_error)) + except Exception: + raise ValueError("Error found in JSON, check expected format.") + + logger.info( + "{} imported. 
JSON format is valid: {}".format(self._meta_path, + json_is_valid)) + return meta + + def get_meta(self): + return self._meta + + def get_cleaner_from_name(self, manifest_row, engine=None): + """ + Returns the instance of the cleaner class matching the given cleaner class + name. + + :param manifest_row: the given row in manifest.csv + :param engine: the engine object used for interacting with db + :return: a class object of the given cleaner class + """ + + # Import + # module = import_module("module.submodule") + table_name = manifest_row['destination_table'] + cleaner_class_name = self._meta[table_name]['cleaner'] + Class_ = getattr(Cleaners, cleaner_class_name) + instance = Class_(self._meta, manifest_row, engine=engine) + return instance + + def get_sql_fields_and_type_from_meta(self, table_name=None): + """ + Get list of 'sql_name' and 'type' from fields for database updating + + :param table_name: the name of the table to be referenced in meta + :return: a tuple - 'sql_name, sql_name_type' + """ + + if table_name is None: + raise ValueError( + 'Invalid table_name: {} was passed'.format(table_name)) + + meta_fields = self._meta[table_name]['fields'] + + sql_fields = list() + sql_field_types = list() + + for field in meta_fields: + sql_fields.append(field['sql_name']) + sql_field_types.append(field['type']) + + return sql_fields, sql_field_types + + def get_meta_only_fields(self, table_name, data_fields): + """ + Returns fields that exist in meta.json but not CSV so we can add + them to the row as it is cleaned and written to PSV file. + + :param table_name: the table name for the data being processed + :param data_fields: the fields for the data being processed + :return: additional fields as dict + """ + meta_only_fields = {} + for field in self._meta[table_name]['fields']: + if field['source_name'] not in data_fields: + # adds 'sql_name',None as key,value pairs in dict + meta_only_fields[field['sql_name']] = None + return meta_only_fields diff --git a/python/housinginsights/ingestion/SQLWriter.py b/python/housinginsights/ingestion/SQLWriter.py index 2b000bff..1ed8302b 100644 --- a/python/housinginsights/ingestion/SQLWriter.py +++ b/python/housinginsights/ingestion/SQLWriter.py @@ -51,117 +51,113 @@ something for DataReader class? JSON file? """ -import os -import sys -sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), - os.pardir, os.pardir))) -# TODO: clean up unused imports -from housinginsights.tools import dbtools -from sqlalchemy.orm import sessionmaker from sqlalchemy.exc import ProgrammingError from psycopg2 import DataError import copy import datetime +# app imports from housinginsights.tools.logger import HILogger +from housinginsights.tools.base_colleague import Colleague logger = HILogger(name=__file__, logfile="ingestion.log") + # TODO: is this incomplete - do we want to define a specific error output class TableWritingError(Exception): pass -class HISql(object): - def __init__(self, meta, manifest_row, engine, filename=None): +# TODO - refactor: decouple meta and manifest_row - use IngestionMediator +class HISql(Colleague): + + def __init__(self): """ Initialize object that will send the data stored in the newly-created clean.csv file to the database. 
- :param meta: meta data as json data - :param manifest_row: a given row in manifest.csv file - :param engine: the database the database that will be updated - :param filename: the clean data file that will be loaded into database + :param debug: determines whether errors certain errors should be + raised or skipped """ + super().__init__() - self.meta = meta - self.manifest_row = manifest_row - self.engine = engine - - # extract some values for convenience - try: - self.unique_data_id = self.manifest_row["unique_data_id"] - self.tablename = self.manifest_row["destination_table"] - - # assign defaults - self.filename = 'temp_{}.psv'.format(self.unique_data_id) \ - if filename is None else filename - except TypeError: - # assume creating table directly from meta.json - self.unique_data_id = None - self.tablename = None - self.filename = 'temp_{}.psv'.format('default') \ - if filename is None else filename - - def write_file_to_sql(self): - #TODO let this use existing session/connection/engine instead? - #engine = dbtools.get_database_engine("local_database") - - conn = self.engine.connect() - trans = conn.begin() - - try: - - print(" opening {}".format(self.filename)) - with open(self.filename, 'r', encoding='utf-8') as f: - #copy_from is only available on the psycopg2 object, we need to dig in to get it - dbapi_conn = conn.connection - dbapi_conn.set_client_encoding("UTF8") - dbapi_cur = dbapi_conn.cursor() - - dbapi_cur.copy_from(f, self.tablename, sep='|', null='Null', columns=None) - - self.update_manifest_row(conn=conn, status="loaded") - - #used for debugging, keep commented in real usage - #raise ProgrammingError(statement="test", params="test", orig="test") - - dbapi_conn.commit() - trans.commit() - logger.info(" data file loaded into database") - - #TODO need to find out what types of expected errors might actually occur here - #For now, assume that SQLAlchemy will raise a programmingerror - except (ProgrammingError, DataError, TypeError) as e: - trans.rollback() + def write_file_to_sql(self, table_name, clean_psv_file, engine): + """ + Attempts to write a clean psv file for the raw data of a given unique + data id into the specified table. 
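+
+        The load goes through psycopg2's copy_from, so the file is expected
+        to be pipe-delimited with the literal string 'Null' for missing
+        values. A hypothetical call, assuming the instance is wired to the
+        ingestion mediator and the clean PSV already exists on disk:
+
+            hi_sql.write_file_to_sql('project',
+                                     'clean_prescat_project.psv',
+                                     engine)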
+ + :param table_name: the table where the data should be added to + :param clean_psv_file: the path for the clean psv file + :param engine: the sql engine to be used for connecting to the db + :return: True if table was updated accordingly with the data from the + clean_psv_file - False otherwise + """ + with engine.connect() as conn: + trans = conn.begin() + + try: + + logger.info(" opening {}".format(clean_psv_file)) + with open(clean_psv_file, 'r', encoding='utf-8') as f: + # copy_from is only available on the psycopg2 object, we + # need to dig in to get it + dbapi_conn = conn.connection + dbapi_conn.set_client_encoding("UTF8") + dbapi_cur = dbapi_conn.cursor() + + dbapi_cur.copy_from(f, table_name, sep='|', + null='Null', columns=None) + + self.update_sql_manifest_row( + self._ingestion_mediator.get_current_manifest_row(), + conn=conn, status="loaded") + + # used for debugging, keep commented in real usage + # raise ProgrammingError(statement="test", params="test", orig="test") + + dbapi_conn.commit() + trans.commit() + logger.info(" data file loaded into database") + + # TODO need to find out what types of expected errors might actually occur here + # For now, assume that SQLAlchemy will raise a programmingerror + except (ProgrammingError, DataError, TypeError) as e: + trans.rollback() + + logger.warning( + " FAIL: something went wrong loading {}".format( + clean_psv_file)) + logger.warning(" exception: {}".format(e)) + # raise TableWritingError + if self._debug: + raise e + else: + return False - logger.warning(" FAIL: something went wrong loading {}".format(self.unique_data_id)) - logger.warning(" exception: {}".format(e)) - raise TableWritingError + return True - conn.close() - - def update_manifest_row(self, conn, status="unknown"): + def update_sql_manifest_row(self, manifest_row, conn, status="unknown"): """ Adds self.manifest_row associated with this table to the SQL manifest conn = the connection to use (won't be closed so calling function can rollback) status = the value to put in the "status" field """ # Add the status - manifest_row = copy.copy(self.manifest_row) + manifest_row = copy.copy(manifest_row) manifest_row['status'] = status manifest_row['load_date'] = datetime.datetime.now().isoformat() # Remove the row if it exists # TODO make sure data is synced or appended properly - sql_manifest_row = self.get_sql_manifest_row(db_conn=conn, - close_conn=False) + sql_manifest_row = self.get_sql_manifest_row(manifest_row[ + 'unique_data_id'], + conn) if sql_manifest_row is not None: logger.info(" deleting existing manifest row for {}".format( - self.unique_data_id)) + manifest_row['unique_data_id'])) delete_command = \ "DELETE FROM manifest WHERE unique_data_id = '{}'".format( - self.unique_data_id) + manifest_row['unique_data_id']) conn.execute(delete_command) columns = [] @@ -175,126 +171,170 @@ def update_manifest_row(self, conn, status="unknown"): insert_command = "INSERT INTO manifest {} VALUES {};".format( columns_string, values_string) - + conn.execute(insert_command) - def create_table_if_necessary(self, table=None): - """ - Creates the table associated with this data file if it doesn't already - exist table = string representing the tablename - """ - #TODO - a better long-term solution to this might be SQLAlchemy metadata: http://www.mapfish.org/doc/tutorials/sqlalchemy.html - if table is None: - table = self.tablename - db_conn = self.engine.connect() - if self.does_table_exist(db_conn, table): - logger.info(" Did not create table because it already exists") - else: 
- self.create_table(db_conn, table) - db_conn.close() - - def does_table_exist(self, db_conn, table): + def does_table_exist(self, table_name, engine): try: - db_conn.execute("SELECT * FROM {}".format(table)) + with engine.connect() as db_conn: + db_conn.execute("SELECT * FROM {}".format(table_name)) return True except ProgrammingError: return False - def create_table(self, db_conn, table): - - sql_fields, sql_field_types = self.get_sql_fields_and_type_from_meta( - table_name=table) - - field_statements = [] - for idx, field in enumerate(sql_fields): - field_statements.append(field + " " + sql_field_types[idx]) - field_command = ",".join(field_statements) - create_command = "CREATE TABLE {}({});".format(table, field_command) - db_conn.execute(create_command) - logger.info(" Table created: {}".format(table)) - - # Create an id column and make it a primary key - create_id = "ALTER TABLE {} ADD COLUMN {} text;".format(table, 'id') - db_conn.execute(create_id) - set_primary_key = "ALTER TABLE {} ADD PRIMARY KEY ({});".format(table, - 'id') - db_conn.execute(set_primary_key) - - def create_primary_key_table(self, db_conn, table): + def create_table(self, table_name, sql_fields, sql_field_types, + engine): + + with engine.connect() as db_conn: + field_statements = [] + for idx, field in enumerate(sql_fields): + field_statements.append(field + " " + sql_field_types[idx]) + field_command = ",".join(field_statements) + create_command = "CREATE TABLE {}({});".format(table_name, + field_command) + db_conn.execute(create_command) + logger.info(" Table created: {}".format(table_name)) + + # Create an id column and make it a primary key + create_id = "ALTER TABLE {} ADD COLUMN {} text;".format(table_name, + 'id') + db_conn.execute(create_id) + set_primary_key = "ALTER TABLE {} " \ + "ADD PRIMARY KEY ({});".format(table_name, + 'id') + db_conn.execute(set_primary_key) + + def create_primary_key_table(self, table_name, engine): pass - def drop_table(self, table=None): - - db_conn = self.engine.connect() - table = self.tablename if table is None else table - - #TODO also need to delete manifest row(s) - #TODO need to use a transaction to ensure both operations sync - try: - db_conn.execute("DROP TABLE {}".format(table)) - except ProgrammingError: - logger.warning(" {} table can't be dropped because it doesn't exist".format(self.tablename)) - db_conn.close() - - def get_sql_manifest_row(self, db_conn=None, close_conn=True): + def drop_table(self, table_name, engine): + with engine.connect() as db_conn: + # TODO also need to delete manifest row(s) + # TODO need to use a transaction to ensure both operations sync + try: + db_conn.execute("DROP TABLE {}".format(table_name)) + except ProgrammingError: + logger.warning(" {} table can't be dropped because it " + "doesn't exist".format(table_name)) + + def get_sql_manifest_row(self, unique_data_id, conn): """ Connect to the database, perform sql query for the given - 'unique_data_id' for the given manifest row, and then return the - equivalent sql manifest row. + 'unique_data_id' and then return the equivalent sql manifest row. 
- :param db_conn: the connection object for the database - :param close_conn: boolean flag dictating whether to close connection - after work is complete + :param unique_data_id: the unique data id of which we're interested in :return: the resulting sql manifest row as a dict object """ - sql_query = "SELECT * FROM manifest WHERE unique_data_id = '{}'".format( - self.unique_data_id) + sql_query = "SELECT * FROM manifest " \ + "WHERE unique_data_id = '{}'".format(unique_data_id) - if db_conn is None: - db_conn = self.engine.connect() - - query_result = db_conn.execute(sql_query) + query_result = conn.execute(sql_query) # convert the sqlAlchemy ResultProxy object into a list of dictionaries results = [dict(row.items()) for row in query_result] - if close_conn: - db_conn.close() - # We expect there to be exactly one row matching the query if # the csv_row is already in the database - # TODO: change this to if, elif, else statement is mutually exclusive if len(results) > 1: - raise ValueError('Found multiple rows in database for data' - ' id {}'.format(self.unique_data_id)) - - # Return just the dictionary of results, not the list of dictionaries - if len(results) == 1: - return results[0] - - if len(results) == 0: - logger.info(" Couldn't find sql_manifest_row for {}".format(self.unique_data_id)) + raise ValueError(' Found multiple rows in database for data' + ' id {}'.format(unique_data_id)) + elif len(results) == 0: + logger.info(" Couldn't find sql_manifest_row for {}".format( + unique_data_id)) return None + else: # Return just the dictionary of results, not list of dictionaries + return results[0] - def get_sql_fields_and_type_from_meta(self, table_name=None): + def remove_existing_data(self, manifest_row, engine): """ - Get list of 'sql_name' and 'type' from fields for database updating - - :param table_name: the name of the table to be referenced in meta - :return: a tuple - 'sql_name, sql_name_type' + Removes all rows in the respective table for the given unique_data_id + then sets status = deleted for the unique_data_id in the + database manifest. + + :param manifest_row: the row for uid in the manifest + :param engine: object for connecting to the database + :return: result from executing delete query as + sqlalchemy result object if row exists in sql manifest - else + returns None """ + uid = manifest_row['unique_data_id'] + + with engine.connect() as conn: + trans = conn.begin() + # get objects for interfacing with the database + sql_manifest_row = self.get_sql_manifest_row(uid, conn) + + try: + # delete only rows with data_id in respective table + table_name = sql_manifest_row['destination_table'] + except TypeError: # will occur if sql_manifest_row == None + logger.info(" No sql_manifest exists! Proceed with adding" + " new data to the database!") + return None + + try: + query = "DELETE FROM {} WHERE unique_data_id =" \ + " '{}'".format(table_name, uid) + logger.info(" Deleting {} data from {}!".format( + uid, table_name)) + result = conn.execute(query) + # change status = deleted in sql_manifest + logger.info(" Resetting status in sql manifest row!") + self.update_sql_manifest_row(manifest_row, conn=conn, + status='deleted') + trans.commit() + except ProgrammingError: + trans.rollback() + logger.warning( + "Problem executing DELETE query for table {} and uid {}. " + "Manifest row exists but table does not. 
You should check " + "validity of table and data".format(table_name, uid)) + return None + + return result + + def remove_table_if_empty(self, manifest_row, engine): + """ + If a table is empty (all data has been removed, e.g. using _remove_existing_data), + delete the table itself. This allows the table to be rebuilt from scratch using + meta.json when new data is loaded - for example, if additional columns have been added. - if table_name is None: - table_name = self.tablename - - meta_fields = self.meta[table_name]['fields'] - - sql_fields = list() - sql_field_types = list() + :param manifest_row: the row for the uid in the manifest as a dictionary, which + supplies the name of the table to be checked/dropped - for field in meta_fields: - sql_fields.append(field['sql_name']) - sql_field_types.append(field['type']) + returns: result from executing delete qurey + """ + table_name = manifest_row['destination_table'] + with engine.connect() as conn: + + try: + proxy = conn.execute("SELECT COUNT(*) FROM {}".format( + table_name)) + result = proxy.fetchall() + except ProgrammingError: + logger.info(" Couldn't find table {} in the database".format( + table_name)) + return None + + # If the table is empty + # result format is [(count,)] i.e. list of tuples with each + # tuple = one row of result + if result[0][0] == 0: + trans = conn.begin() + try: + logger.info(" Dropping table from database: {}".format( + table_name)) + proxy = conn.execute("DROP TABLE {}".format(table_name)) + trans.commit() + return proxy + except ProgrammingError as e: + trans.rollback() + logger.error( + "Could not drop table {} from the database".format( + table_name)) + if self._debug: + raise e + else: + return None - return sql_fields, sql_field_types diff --git a/python/housinginsights/ingestion/functions.py b/python/housinginsights/ingestion/functions.py index 6c36c8f5..a47ac3fd 100644 --- a/python/housinginsights/ingestion/functions.py +++ b/python/housinginsights/ingestion/functions.py @@ -17,6 +17,7 @@ # Completed, tests not written. +# TODO - remove: moved into Meta.py def load_meta_data(filename='meta.json'): """ Helper function validates the format of the JSON data in meta.json. @@ -57,6 +58,7 @@ def load_meta_data(filename='meta.json'): return meta +# TODO - remove: moved into Manifest.py def check_or_create_sql_manifest(engine, rebuild=False): ''' Makes sure we have a manifest table in the database. 
@@ -88,18 +90,19 @@ def check_or_create_sql_manifest(engine, rebuild=False): #Create the query with appropriate fields and datatypes db_conn = engine.connect() fields = [ - ("status","text"), + ("status", "text"), ("load_date", "timestamp"), - ("include_flag","text"), - ("destination_table","text"), - ("unique_data_id","text"), + ("include_flag", "text"), + ("destination_table", "text"), + ("unique_data_id", "text"), ("update_method", "text"), - ("data_date","date"), + ("data_date", "date"), ("encoding", "text"), - ("local_folder","text"), - ("s3_folder","text"), - ("filepath","text"), - ("notes","text") + ("local_folder", "text"), + ("s3_folder", "text"), + ("filepath", "text"), + ("dependency", "text"), + ("notes", "text") ] field_statements = [] for tup in fields: @@ -115,6 +118,7 @@ def check_or_create_sql_manifest(engine, rebuild=False): raise e +# TODO - remove: moved into Meta.py def get_cleaner_from_name(meta, manifest_row, name="GenericCleaner", engine=None): """ Returns the instance of the cleaner class matching the given cleaner class diff --git a/python/housinginsights/tools/base_colleague.py b/python/housinginsights/tools/base_colleague.py new file mode 100644 index 00000000..a23ed9d4 --- /dev/null +++ b/python/housinginsights/tools/base_colleague.py @@ -0,0 +1,21 @@ +""" +This modules defines the base colleague class for use by any object that will +interact with the ingestion mediator module. +""" + + +class Colleague(object): + def __init__(self): + self._ingestion_mediator = None + self._debug = None + self._engine = None + self._database_choice = None + + def set_ingestion_mediator(self, ingestion_mediator): + self._ingestion_mediator = ingestion_mediator + self._debug = self._ingestion_mediator.get_debug() + self._engine = self._ingestion_mediator.get_engine() + self._database_choice = self._ingestion_mediator.get_database_choice() + + def _get_manifest_row(self): + return self._ingestion_mediator.get_current_manifest_row() \ No newline at end of file diff --git a/python/housinginsights/tools/ingestion_mediator.py b/python/housinginsights/tools/ingestion_mediator.py new file mode 100644 index 00000000..2a83f04b --- /dev/null +++ b/python/housinginsights/tools/ingestion_mediator.py @@ -0,0 +1,446 @@ +""" +ingestion_mediator.py is an implementation of a mediator object. It interacts +with all objects and scripts involved in our ingestion and processing +workflow, coordinating their activities. 
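+
+A rough wiring sketch (the api and load-data colleagues are left unset here,
+which is enough for loading an already-downloaded file):
+
+    from housinginsights.ingestion.Manifest import Manifest
+    from housinginsights.ingestion.Meta import Meta
+    from housinginsights.ingestion.SQLWriter import HISql
+    from housinginsights.tools.ingestion_mediator import IngestionMediator
+
+    mediator = IngestionMediator(database_choice='docker_database',
+                                 debug=True)
+    manifest = Manifest(os.path.join(SCRIPTS_PATH, 'manifest.csv'))
+    meta = Meta()
+    hi_sql = HISql()
+
+    for colleague in (manifest, meta, hi_sql):
+        colleague.set_ingestion_mediator(mediator)
+    mediator.set_manifest(manifest)
+    mediator.set_meta(meta)
+    mediator.set_hi_sql(hi_sql)
+
+    mediator.load_unique_data_id('prescat_project')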
+""" + +# built-in import +import os +from time import time +from sqlalchemy.exc import ProgrammingError +from datetime import datetime + +# app imports +from housinginsights.ingestion.DataReader import DataReader +from housinginsights.tools import dbtools +from housinginsights.tools.logger import HILogger +from housinginsights.ingestion.CSVWriter import CSVWriter + +# useful paths as globals +PYTHON_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), + os.pardir, os.pardir)) +SCRIPTS_PATH = os.path.abspath(os.path.join(PYTHON_PATH, 'scripts')) +CLEAN_PSV_PATH = os.path.abspath(os.path.join(PYTHON_PATH, os.pardir, + 'data', 'processed', + '_clean_psv')) + +logger = HILogger(name=__file__, logfile="ingestion.log") + + +class IngestionMediator(object): + def __init__(self, database_choice=None, debug=False): + """ + Initialize mediator with private instance variables for colleague + objects and other instance variables needed for ingestion workflow + """ + # load defaults if no arguments passed + if database_choice is None: + self._database_choice = 'docker_database' + else: + self._database_choice = database_choice + + # initialize instance variables related from above passed args + self._debug = debug + self._engine = dbtools.get_database_engine(self._database_choice) + self._manifest_row = None # typically working on one at time - track? + self._table_name = None + self._all_use_unique_data_ids = None + self._sql_manifest_in_db = False + self._id_map_used_set = set() + + # initialize instance variables for colleague objects + self._load_data = None + self._manifest = None + self._meta = None + self._hi_sql = None + self._get_api_data = None + + ############################### + # setter methods for linking this instance with respective colleague + # instances + ############################### + def set_load_data(self, load_data_instance): + self._load_data = load_data_instance + + def set_manifest(self, manifest_instance): + self._manifest = manifest_instance + self._all_use_unique_data_ids = self._manifest.get_use_unique_data_ids() + + def set_meta(self, meta_instance): + self._meta = meta_instance + + def set_hi_sql(self, hi_sql_instance): + self._hi_sql = hi_sql_instance + + def set_get_api_data(self, get_api_data_instance): + self._get_api_data = get_api_data_instance + + ############################### + # setter and getter methods for accessing private instance variables + ############################### + def set_debug(self, debug): + self._debug = debug + + def get_debug(self): + return self._debug + + def get_engine(self): + """ + Return the database engine associated with the instance of ingestion + mediator. 
+ """ + return self._engine + + def set_database_choice(self, database_choice): + self._database_choice = database_choice + + def get_database_choice(self): + return self._database_choice + + def get_current_manifest_row(self): + return self._manifest_row + + def get_current_table_name(self): + return self._table_name + + def get_list_use_unique_data_ids(self): + return self._all_use_unique_data_ids + + def get_clean_psv_path(self, unique_data_id): + self._set_manifest_row_and_table_name(unique_data_id) + clean_data_path = os.path.abspath( + os.path.join(CLEAN_PSV_PATH, + 'clean_{}.psv'.format(unique_data_id))) + + # check that it is valid file path and return accordingly + if os.path.isfile(clean_data_path): + return clean_data_path + else: + return None + + def _set_manifest_row_and_table_name(self, unique_data_id): + """ + Ensures that whenever the mediator is coordinating activities across + colleagues it is updated with the correct manifest row and table name + for the current state. This is primarily to avoid passing the current + manifest row across objects and communicate amongst colleagues + primarily with respect to unique_data_id of interest. + + The aim here is clarity from a user's perspective and to ease + debugging. The manifest object is not intuitive to human reader but + the string representation of the unique_data_id is clear to a + person. Since there a single row for each unique data id, + this seems like the better data type to use for communicating + across colleague objects. + + :param unique_data_id: the unique data id that is currently of focus + """ + self._manifest_row = self._manifest.get_manifest_row(unique_data_id) + if self._manifest_row is not None: + self._table_name = self._manifest_row['destination_table'] + else: + self._table_name = None + + def reset_id_map_set_use(self): + self._id_map_used_set = set() + + ############################### + # instance methods for coordinating tasks across other objects + ############################### + # download new raw data + def download_api_raw_data(self, unique_data_id): + # remap if dchousing or dhcd + id_map = { + 'dchousing_project': 'dchousing', + 'dchousing_subsidy': 'dchousing', + 'dchousing_addre': 'dchousing', + 'dhcd_dfd_properties_project': 'dhcd_dfd_properties', + 'dhcd_dfd_properties_subsidy': 'dhcd_dfd_properties', + 'dhcd_dfd_properties_addre': 'dhcd_dfd_properties' + } + + data_id = id_map.get(unique_data_id, unique_data_id) + + # check whether api call has been made within same iteration of + # load_raw_data call, if so, don't download again + if data_id in self._id_map_used_set: + return True + + # get download new raw data from api call + processed = self._get_api_data.get_files_by_data_ids([data_id]) + + # return outcome of attempt to download from api + if data_id in processed: + self._id_map_used_set.add(data_id) + return True + return False + + # process all dependents recursively + def load_dependents_workflow(self, unique_data_id): + processed = list() + dependents_queue = list() + dependents_queue.extend( + self._manifest.get_dependent_data_ids(unique_data_id)) + + # iterate until no more dependent data ids + while dependents_queue: + data_id = dependents_queue.pop(0) + success = self.load_unique_data_id(data_id, download_api_data=True) + + if success: + processed.append(data_id) + + # add new dependents - assuming manifest doesn't have + # circular dependencies + dependents = self._manifest.get_dependent_data_ids(data_id) + for dependent in dependents: + if dependent not in 
dependents_queue: + dependents_queue.append(dependent) + + return processed + + # load to database + def load_unique_data_id(self, unique_data_id, download_api_data=False): + # don't proceed any further if invalid unique_data_id + if not self._check_unique_data_id(unique_data_id): + return False + + updated_from_api = self._manifest_row['update_method'] == 'api' + + # TODO - treat prescat as api once S3 bucket available? + # if requested get new raw data only if update_mode is api + if download_api_data and updated_from_api: + success = self.download_api_raw_data(unique_data_id) + + # log failure and proceed accordingly + if not success: + logger.error('Download api request failed for %s data! ' + 'Using existing raw data file!' + % unique_data_id) + if self._debug: + raise FileNotFoundError('Download api request failed ' + 'for %s data!' % unique_data_id) + + clean_data_path = self.process_and_clean_raw_data(unique_data_id) + + if clean_data_path is None: # failed so do non't try to write to db + return False + + # return result of attempting to write clean psv to db + return self.write_file_to_db(unique_data_id, clean_data_path) + + # update manifest with new paths + def update_manifest_with_new_path(self): + time_stamp = datetime.now().strftime('%Y%m%d') + + # use correct root folder for raw folder path + if self._database_choice == 'remote_database': + folder = 'https://s3.amazonaws.com/housinginsights' + else: + folder = os.path.join(PYTHON_PATH, os.pardir, 'data') + date_stamped_folder = os.path.join(folder, 'raw', '_downloads', + time_stamp) + try: + self._manifest.update_manifest( + date_stamped_folder=date_stamped_folder) + logger.info("Manifest updated at %s", date_stamped_folder) + except Exception as e: + logger.error("Failed to update manifest with error %s", e) + if self._debug: + raise e + + # process and clean raw data + def process_and_clean_raw_data(self, unique_data_id): + """ + Processes and cleans the raw data file for the passed unique_data_id. + + Once processed and clean, the resulting clean_unique_dat_id.psv file + path is returned as string. + + :param unique_data_id: the unique data id for the raw data file to be + processed and cleaned + :return: string representation of the path for the + clean_unique_data_id.psv + """ + + # validate unique_data_id before processing + if not self._check_unique_data_id(unique_data_id): + return None + + # initialize objects needed for the cleaning process + clean_data_path = os.path.abspath( + os.path.join(CLEAN_PSV_PATH, + 'clean_{}.psv'.format(unique_data_id))) + raw_data_reader = DataReader(meta=self._meta.get_meta(), + manifest_row=self._manifest_row) + clean_data_writer = CSVWriter(meta=self._meta.get_meta(), + manifest_row=self._manifest_row, + filename=clean_data_path) + cleaner = self._meta.get_cleaner_from_name( + manifest_row=self._manifest_row, engine=self._engine) + + # clean the file and save the output to a local pipe-delimited file + if raw_data_reader.should_file_be_loaded(): + logger.info(" Cleaning %s..." 
% unique_data_id) + start_time = time() + meta_only_fields = self._meta.get_meta_only_fields( + table_name=self._table_name, data_fields=raw_data_reader.keys) + total_rows = len(raw_data_reader) + for idx, data_row in enumerate(raw_data_reader): + if idx % 100 == 0: + print(" on row ~{} of {}".format(idx, total_rows), + end='\r', flush=True) + + try: + data_row.update(meta_only_fields) # insert other field dict + clean_data_row = cleaner.clean(data_row, idx) + if clean_data_row is not None: + clean_data_writer.write(clean_data_row) + except Exception as e: + logger.error("Error when trying to clean row index {} from" + " the manifest_row {}".format( + idx, self._manifest_row)) + if self._debug is True: + raise e + + clean_data_writer.close() + end_time = time() + print("\nRun time= %s" % (end_time - start_time)) + + return clean_data_path + + # write file to db + def write_file_to_db(self, unique_data_id, clean_data_path): + # update instance with correct self._manifest_row and self._table_name + self._set_manifest_row_and_table_name(unique_data_id) + + # check and ensure sql_manifest exists before attempting to write to db + if not self._sql_manifest_in_db: + self._sql_manifest_in_db = \ + self._manifest.check_or_create_sql_manifest(self._engine) + + # remove existing data + result = self._hi_sql.remove_existing_data(self._manifest_row, + self._engine) + + # remove table if necessary + if result is not None: + _ = self._hi_sql.remove_table_if_empty(self._manifest_row, + self._engine) + + # TODO - a better long-term solution to this might be SQLAlchemy metadata: http://www.mapfish.org/doc/tutorials/sqlalchemy.html + # create table if it doesn't exist + if self._hi_sql.does_table_exist(self._table_name, self._engine): + logger.info("Did not create table because it already exists") + else: + sql_fields, sql_field_types = \ + self._meta.get_sql_fields_and_type_from_meta(self._table_name) + self._hi_sql.create_table(self._table_name, sql_fields, + sql_field_types, self._engine) + + # load clean file to database + return self._hi_sql.write_file_to_sql(self._table_name, + clean_data_path, self._engine) + + # create zone_facts table + def create_zone_facts_table(self): + """ + Creates the zone_facts table which is used as a master table for API + endpoints. The purpose is to avoid recalculating fields adhoc and + performing client-side reconfiguration of data into display fields. + This is all now done in the backend whenever new data is loaded. + """ + + try: + # drop zone_facts table if already in db + if 'zone_facts' in self._engine.table_names(): + with self._engine.connect() as conn: + conn.execute('DROP TABLE zone_facts;') + + # create empty zone_facts table + self._table_name = 'zone_facts' + self._manifest_row = None + sql_fields, sql_field_types = \ + self._meta.get_sql_fields_and_type_from_meta(self._table_name) + self._hi_sql.create_table(self._table_name, sql_fields, + sql_field_types, self._engine) + + except Exception as e: + logger.error("Failed to create zone_facts table") + if self._debug: + raise e + + # remove tables for db + def remove_tables(self, tables_list): + """ + Used when you want to update only part of the database. Drops the table + and deletes all associated rows in the sql manifest. Run this before + updating data in tables that have added or removed columns. 
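+
+        For example, to force the project and subsidy tables to be rebuilt
+        from meta.json on the next load (illustrative table names):
+
+            mediator.remove_tables(['project', 'subsidy'])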
+ + tables: a list of table names to be deleted + """ + if 'all' in tables_list: + self._drop_tables() + logger.info('Dropped all tables from database') + self._sql_manifest_in_db = False + else: + for table in tables_list: + try: + logger.info("Dropping the {} table and all associated " + "manifest rows".format(table)) + # Delete all the relevant rows of the sql manifest table + q = "SELECT DISTINCT unique_data_id FROM {}".format(table) + with self._engine.connect() as conn: + proxy = conn.execute(q) + results = proxy.fetchall() + for row in results: + q = "DELETE FROM manifest " \ + "WHERE unique_data_id = '{}'".format(row[0]) + conn.execute(q) + + # And drop the table itself + q = "DROP TABLE {}".format(table) + conn.execute(q) + + logger.info("Dropping table {} was successful".format(table)) + + except ProgrammingError as e: + logger.error("Couldn't remove table {}".format(table)) + if self._debug: + raise e + + def _drop_tables(self): + """ + Returns the outcome of dropping all the tables from the + database_choice and then rebuilding. + """ + logger.info("Dropping all tables from the database!") + with self._engine.connect() as conn: + query_result = list() + query_result.append(conn.execute( + "DROP SCHEMA public CASCADE;CREATE SCHEMA public;")) + + if self._database_choice == 'remote_database' or \ + self._database_choice == 'remote_database_master': + query_result.append(conn.execute(''' + GRANT ALL PRIVILEGES ON SCHEMA public TO housingcrud; + GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO housingcrud; + GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO housingcrud; + GRANT ALL ON SCHEMA public TO public; + ''')) + return query_result + + def _check_unique_data_id(self, unique_data_id): + # set self._manifest_row and self._table_name instance variables + self._set_manifest_row_and_table_name(unique_data_id) + + # validate resulting manifest_row + if self._manifest_row is None: + logger.error('"{}" is not in manifest.csv!'.format(unique_data_id)) + if self._debug: + raise ValueError('"{}" is not a valid unique_data_id!'.format( + unique_data_id)) + else: + return False + else: + return True diff --git a/python/scripts/get_api_data.py b/python/scripts/get_api_data.py deleted file mode 100644 index b1d71ba5..00000000 --- a/python/scripts/get_api_data.py +++ /dev/null @@ -1,169 +0,0 @@ -""" -get_api_data.py provides command line convenience access to the modules in the housinginsights.sources -directory. - -Every API class should implement a few key features -""" - - -import sys -import os -import importlib -from datetime import datetime -import argparse - - -python_filepath = os.path.abspath(os.path.join(os.path.dirname(__file__), - os.pardir)) -sys.path.append(python_filepath) - -# Configure logging -import logging -from housinginsights.tools.logger import HILogger -logger = HILogger(name=__file__, logfile="sources.log", level=logging.INFO) - -#TODO is this import necessary? -from housinginsights.config.base import HousingInsightsConfig -from housinginsights.ingestion.Manifest import Manifest - - -def get_multiple_api_sources(a): - ''' - This method calls the 'get_data' method on each ApiConn class in the /sources folder - a = an arguments object from argparse - - a.ids: list of unique data ids. Passing 'None' to unique_data_ids will run all get_data methods. - a.sample: when possible, download just a few lines (for faster testing) - a.database: the database choice, such as 'docker_database', as identified in the secrets.json. 
- a.debug: if True exceptions will be raised. if False, they will be printed but processing will continue. - a.modules: in addition to the unique_data_ids (which are passed directly to the ApiConn.get_data() method), you can choose to - only run listed modules. - - ''' - #Turn the arguments into individual items - - #TODO should update the secrets.json keys to make them simpler so that this mapping is irrelevant - database_map = {'docker':'docker_database', - 'docker_local':'docker_with_local_python', - 'codefordc':'codefordc_remote_admin', - 'local':'local_database' - } - - db = database_map[a.database] - unique_data_ids=a.ids - sample = a.sample - output_type = 'csv' #deprecated, should get rid of stdout option w/in the modules and always use csv - debug = a.debug - module_list = a.modules - - - - logger.info("Starting get_multiple_api_sources.") - API_FOLDER = 'housinginsights.sources' - - # All possible source modules and classes as key:value of module:classname - modules = { - 'opendata': 'OpenDataApiConn', - 'DCHousing': 'DCHousingApiConn', - 'dhcd': 'DhcdApiConn', - 'census': 'CensusApiConn', - 'wmata_distcalc': 'WmataApiConn', - 'prescat': 'PrescatApiConn' - } - - # If no module list is provided, use them all - if module_list is None: - module_list = list(modules.keys()) - - for m in module_list: - try: - logger.info("Processing %s module with class %s", m, modules[m]) - module_name = API_FOLDER + '.' + m - module = importlib.import_module(module_name) - - class_name = modules[m] - api_class = getattr(module, class_name) - - api_instance = api_class(database_choice=db, debug=debug) - api_method = getattr(api_instance, 'get_data') # Every class should have a get_data method! - - # Get the data - api_method(unique_data_ids, sample, output_type, db=db) #TODO refactor all the methods that need db to instead use self.engine() created in __init__(see base_project for example) - - except Exception as e: - logger.error("The request for '%s' failed with error: %s", m, e) - - if debug: - raise e - - # update the manifest - manifest = Manifest(os.path.abspath(os.path.join( - python_filepath, 'scripts', 'manifest.csv'))) - d = datetime.now().strftime('%Y%m%d') - - # use correct root folder for raw folder path - if db == 'remote_database': - folder = 'https://s3.amazonaws.com/housinginsights' - else: - folder = os.path.join(python_filepath, os.pardir, 'data') - date_stamped_folder = os.path.join(folder, 'raw', '_downloads', d) - try: - manifest.update_manifest(date_stamped_folder=date_stamped_folder) - logger.info("Manifest updated at %s", date_stamped_folder) - except Exception as e: - logger.error("Failed to update manifest with error %s", e) - logger.info("Completed get_multiple_api_sources.") - - - -#Add a command line argument parser -description = ("""Downloads data from the various sources that are used in Housing Insights. - """) -parser = argparse.ArgumentParser(description=description) -parser.add_argument("database", help="""which database we should connect to - when using existing data as part of the download process""", - choices=['docker', 'docker_local', 'local', 'codefordc']) - -parser.add_argument('-s', '--sample', help="""Only download a sample of data. 
Used - for testing, but doesn't work for most sources""", - action='store_true') - -parser.add_argument('--ids', nargs='+', - help='Only download these unique data_ids', - choices = ['tax', - 'building_permits_2013','building_permits_2014', - 'building_permits_2015','building_permits_2016', - 'building_permits_2017', - 'crime_2013','crime_2014','crime_2015', - 'crime_2016','crime_2017', - 'mar', - 'dchousing', - 'dhcd_dfd_projects', 'dhcd_dfd_properties', - 'acs5_2009','acs5_2010','acs5_2011','acs5_2012', - 'acs5_2013','acs5_2014','acs5_2015', - 'acs5_2009_moe','acs5_2010_moe','acs5_2011_moe','acs5_2012_moe', - 'acs5_2013_moe','acs5_2014_moe','acs5_2015_moe', - 'wmata_stops','wmata_dist' - ]) - -parser.add_argument('--modules', nargs='+', - help='Only download from these modules', - #TODO make choices list pull from the keys of 'modules' var above - choices = ['opendata', - 'DCHousing', - 'dhcd', - 'census', - 'wmata_distcalc', - 'prescat' - ]) - -parser.add_argument ('--debug',action='store_true', - help="Pass this flag to use debug mode, where errors are raised as they occur") - - - -if __name__ == '__main__': - logger.info("running get api data") - - a = parser.parse_args() - get_multiple_api_sources(a) diff --git a/python/scripts/manifest.csv b/python/scripts/manifest.csv index 1d7a78e1..c086089b 100644 --- a/python/scripts/manifest.csv +++ b/python/scripts/manifest.csv @@ -1,32 +1,32 @@ -include_flag,destination_table,unique_data_id,update_method,data_date,encoding,local_folder,s3_folder,filepath,notes -use,mar,mar,api,2017-09-20,utf-8,../../../data,https://s3.amazonaws.com/housing-insights/,raw/_downloads/20170920/mar.csv,BUILD FIRST - other tables depend on it -use,project,prescat_project,manual,2017-07-24,latin-1,../../../data,https://s3.amazonaws.com/housing-insights/,raw/preservation_catalog/20170830/Project.csv, -use,subsidy,prescat_subsidy,manual,2017-07-24,latin-1,../../../data,https://s3.amazonaws.com/housing-insights/,raw/preservation_catalog/20170830/Subsidy.csv, -use,proj_addre,prescat_addre,api,2017-08-27,utf-8,../../../data,https://s3.amazonaws.com/housing-insights/,raw/preservation_catalog/20170830/Building_geocode.csv, -use,reac_score,prescat_reac,manual,2017-07-24,latin-1,../../../data,https://s3.amazonaws.com/housing-insights/,raw/preservation_catalog/20170830/Reac_score.csv, -use,real_property,prescat_real_property,manual,2017-07-24,latin-1,../../../data,https://s3.amazonaws.com/housing-insights/,raw/preservation_catalog/20170830/Real_property.csv, -use,parcel,prescat_parcel,manual,2017-03-15,latin-1,../../../data,https://s3.amazonaws.com/housing-insights/,raw/preservation_catalog/20170830/Parcel.csv, -use,project,dchousing_project,api,2017-09-20,utf-8,../../../data,https://s3.amazonaws.com/housing-insights/,raw/_downloads/20170920/dchousing_project.csv, -use,subsidy,dchousing_subsidy,api,2017-09-20,utf-8,../../../data,https://s3.amazonaws.com/housing-insights/,raw/_downloads/20170920/dchousing_subsidy.csv, -use,proj_addre,dchousing_addre,api,2017-09-20,utf-8,../../../data,https://s3.amazonaws.com/housing-insights/,raw/_downloads/20170920/dchousing_addre.csv, -use,project,dhcd_dfd_properties_project,api,2017-09-20,utf-8,../../../data,https://s3.amazonaws.com/housing-insights/,raw/_downloads/20170920/dhcd_dfd_properties_project.csv, -use,subsidy,dhcd_dfd_properties_subsidy,api,2017-09-20,utf-8,../../../data,https://s3.amazonaws.com/housing-insights/,raw/_downloads/20170920/dhcd_dfd_properties_subsidy.csv, 
-use,proj_addre,dhcd_dfd_properties_addre,api,2017-09-20,utf-8,../../../data,https://s3.amazonaws.com/housing-insights/,raw/_downloads/20170920/dhcd_dfd_properties_addre.csv, -skip,building_permits,building_permits_2013,api,2017-09-20,utf-8,../../../data,https://s3.amazonaws.com/housing-insights/,raw/_downloads/20170920/building_permits_2013.csv, -skip,building_permits,building_permits_2014,api,2017-09-20,utf-8,../../../data,https://s3.amazonaws.com/housing-insights/,raw/_downloads/20170920/building_permits_2014.csv, -skip,building_permits,building_permits_2015,api,2017-09-20,utf-8,../../../data,https://s3.amazonaws.com/housing-insights/,raw/_downloads/20170920/building_permits_2015.csv, -use,building_permits,building_permits_2016,api,2017-09-20,utf-8,../../../data,https://s3.amazonaws.com/housing-insights/,raw/_downloads/20170920/building_permits_2016.csv, -use,building_permits,building_permits_2017,api,2017-09-20,utf-8,../../../data,https://s3.amazonaws.com/housing-insights/,raw/_downloads/20170920/building_permits_2017.csv, -use,dc_tax,tax,api,2017-09-20,utf-8,../../../data,https://s3.amazonaws.com/housing-insights/,raw/_downloads/20170920/tax.csv, -skip,hmda,hmda_all_dc,manual,2017-04-06,latin-1,../../../data,https://s3.amazonaws.com/housing-insights/,raw/hmda/DC_2007_to_2015/hmda_lar.csv,"Downloaded from www.consumerfinance.org, reflects all DC HMDA records from 2007-2015" -use,topa,topa_rcasd_2017,manual,2017-05-20,latin-1,../../../data,https://s3.amazonaws.com/housing-insights/,raw/TOPA_notices/20170725/Rcasd_2017.csv,Parsed from DCHD data by Urban Institute staff. -use,topa,topa_rcasd_2016,manual,2017-05-20,latin-1,../../../data,https://s3.amazonaws.com/housing-insights/,raw/TOPA_notices/20170725/Rcasd_2016.csv,Parsed from DCHD data by Urban Institute staff. -use,topa,topa_rcasd_2015,manual,2017-05-20,latin-1,../../../data,https://s3.amazonaws.com/housing-insights/,raw/TOPA_notices/20170725/Rcasd_2015.csv,Parsed from DCHD data by Urban Institute staff. 
-use,census,acs5_2015,api,2017-09-20,utf-8,../../../data,https://s3.amazonaws.com/housing-insights/,raw/_downloads/20170920/acs5_2015.csv, -use,census_tract_to_neighborhood_cluster,tract2010_cluster2000,manual,2013-07-01,utf-8,../../../data,https://s3.amazonaws.com/housing-insights/,raw/geographic_data/Tract_weights/Wt_tr10_cltr00.csv, -use,census_tract_to_ward,tract2010_ward2012,manual,2013-07-01,utf-8,../../../data,https://s3.amazonaws.com/housing-insights/,raw/geographic_data/Tract_weights/Wt_tr10_ward12.csv, -skip,crime,crime_2015,api,2017-09-20,utf-8,../../../data,https://s3.amazonaws.com/housing-insights/,raw/_downloads/20170920/crime_2015.csv, -use,crime,crime_2016,api,2017-09-20,utf-8,../../../data,https://s3.amazonaws.com/housing-insights/,raw/_downloads/20170920/crime_2016.csv, -use,crime,crime_2017,api,2017-09-20,utf-8,../../../data,https://s3.amazonaws.com/housing-insights/,raw/_downloads/20170920/crime_2017.csv, -use,wmata_dist,wmata_dist,api,2017-05-24,utf-8,../../../data,https://s3.amazonaws.com/housing-insights/,raw/wmata/20170215/dist_from_sql_with_lat.csv, -use,wmata_info,wmata_stops,api,2017-05-24,utf-8,../../../data,https://s3.amazonaws.com/housing-insights/,raw/apis/20170524/wmata_stops.csv, +include_flag,destination_table,unique_data_id,update_method,data_date,encoding,local_folder,s3_folder,filepath,dependency,notes +use,mar,mar,api,2017-10-24,utf-8,../../../data,https://s3.amazonaws.com/housing-insights/,raw/_downloads/20171024/mar.csv,,BUILD FIRST - other tables depend on it +use,project,prescat_project,manual,2017-07-24,latin-1,../../../data,https://s3.amazonaws.com/housing-insights/,raw/preservation_catalog/20170830/Project.csv,mar, +use,subsidy,prescat_subsidy,manual,2017-07-24,latin-1,../../../data,https://s3.amazonaws.com/housing-insights/,raw/preservation_catalog/20170830/Subsidy.csv,mar, +use,proj_addre,prescat_addre,manual,2017-08-27,utf-8,../../../data,https://s3.amazonaws.com/housing-insights/,raw/preservation_catalog/20170830/Building_geocode.csv,mar, +use,reac_score,prescat_reac,manual,2017-07-24,latin-1,../../../data,https://s3.amazonaws.com/housing-insights/,raw/preservation_catalog/20170830/Reac_score.csv,, +use,real_property,prescat_real_property,manual,2017-07-24,latin-1,../../../data,https://s3.amazonaws.com/housing-insights/,raw/preservation_catalog/20170830/Real_property.csv,, +use,parcel,prescat_parcel,manual,2017-03-15,latin-1,../../../data,https://s3.amazonaws.com/housing-insights/,raw/preservation_catalog/20170830/Parcel.csv,prescat_addre, +use,project,dchousing_project,api,2017-10-24,utf-8,../../../data,https://s3.amazonaws.com/housing-insights/,raw/_downloads/20171024/dchousing_project.csv,prescat_addre, +use,subsidy,dchousing_subsidy,api,2017-10-24,utf-8,../../../data,https://s3.amazonaws.com/housing-insights/,raw/_downloads/20171024/dchousing_subsidy.csv,prescat_addre, +use,proj_addre,dchousing_addre,api,2017-10-24,utf-8,../../../data,https://s3.amazonaws.com/housing-insights/,raw/_downloads/20171024/dchousing_addre.csv,prescat_addre, +use,project,dhcd_dfd_properties_project,api,2017-10-24,utf-8,../../../data,https://s3.amazonaws.com/housing-insights/,raw/_downloads/20171024/dhcd_dfd_properties_project.csv,dchousing_addre, +use,subsidy,dhcd_dfd_properties_subsidy,api,2017-10-24,utf-8,../../../data,https://s3.amazonaws.com/housing-insights/,raw/_downloads/20171024/dhcd_dfd_properties_subsidy.csv,dchousing_addre, 
+use,proj_addre,dhcd_dfd_properties_addre,api,2017-10-24,utf-8,../../../data,https://s3.amazonaws.com/housing-insights/,raw/_downloads/20171024/dhcd_dfd_properties_addre.csv,dchousing_addre,
+skip,building_permits,building_permits_2013,api,2017-10-24,utf-8,../../../data,https://s3.amazonaws.com/housing-insights/,raw/_downloads/20171024/building_permits_2013.csv,,
+skip,building_permits,building_permits_2014,api,2017-10-24,utf-8,../../../data,https://s3.amazonaws.com/housing-insights/,raw/_downloads/20171024/building_permits_2014.csv,,
+skip,building_permits,building_permits_2015,api,2017-10-24,utf-8,../../../data,https://s3.amazonaws.com/housing-insights/,raw/_downloads/20171024/building_permits_2015.csv,,
+use,building_permits,building_permits_2016,api,2017-10-24,utf-8,../../../data,https://s3.amazonaws.com/housing-insights/,raw/_downloads/20171024/building_permits_2016.csv,,
+use,building_permits,building_permits_2017,api,2017-10-24,utf-8,../../../data,https://s3.amazonaws.com/housing-insights/,raw/_downloads/20171024/building_permits_2017.csv,,
+use,dc_tax,tax,api,2017-10-24,utf-8,../../../data,https://s3.amazonaws.com/housing-insights/,raw/_downloads/20171024/tax.csv,prescat_addre,
+skip,hmda,hmda_all_dc,manual,2017-04-06,latin-1,../../../data,https://s3.amazonaws.com/housing-insights/,raw/hmda/DC_2007_to_2015/hmda_lar.csv,,"Downloaded from www.consumerfinance.org, reflects all DC HMDA records from 2007-2015"
+use,topa,topa_rcasd_2017,manual,2017-05-20,latin-1,../../../data,https://s3.amazonaws.com/housing-insights/,raw/TOPA_notices/20170725/Rcasd_2017.csv,,Parsed from DCHD data by Urban Institute staff.
+use,topa,topa_rcasd_2016,manual,2017-05-20,latin-1,../../../data,https://s3.amazonaws.com/housing-insights/,raw/TOPA_notices/20170725/Rcasd_2016.csv,,Parsed from DCHD data by Urban Institute staff.
+use,topa,topa_rcasd_2015,manual,2017-05-20,latin-1,../../../data,https://s3.amazonaws.com/housing-insights/,raw/TOPA_notices/20170725/Rcasd_2015.csv,,Parsed from DCHD data by Urban Institute staff.
+use,census,acs5_2015,api,2017-10-08,utf-8,../../../data,https://s3.amazonaws.com/housing-insights/,raw/_downloads/20171008/acs5_2015.csv,, +use,census_tract_to_neighborhood_cluster,tract2010_cluster2000,manual,2013-07-01,utf-8,../../../data,https://s3.amazonaws.com/housing-insights/,raw/geographic_data/Tract_weights/Wt_tr10_cltr00.csv,, +use,census_tract_to_ward,tract2010_ward2012,manual,2013-07-01,utf-8,../../../data,https://s3.amazonaws.com/housing-insights/,raw/geographic_data/Tract_weights/Wt_tr10_ward12.csv,, +skip,crime,crime_2015,api,2017-10-24,utf-8,../../../data,https://s3.amazonaws.com/housing-insights/,raw/_downloads/20171024/crime_2015.csv,, +use,crime,crime_2016,api,2017-10-24,utf-8,../../../data,https://s3.amazonaws.com/housing-insights/,raw/_downloads/20171024/crime_2016.csv,, +use,crime,crime_2017,api,2017-10-24,utf-8,../../../data,https://s3.amazonaws.com/housing-insights/,raw/_downloads/20171024/crime_2017.csv,, +use,wmata_dist,wmata_dist,api,2017-05-24,utf-8,../../../data,https://s3.amazonaws.com/housing-insights/,raw/wmata/20170215/dist_from_sql_with_lat.csv,, +use,wmata_info,wmata_stops,api,2017-05-24,utf-8,../../../data,https://s3.amazonaws.com/housing-insights/,raw/apis/20170524/wmata_stops.csv,, diff --git a/python/scripts/services.py b/python/scripts/services.py index 5d6d55d4..836954ad 100644 --- a/python/scripts/services.py +++ b/python/scripts/services.py @@ -1,57 +1,140 @@ -import os, sys -python_filepath = os.path.abspath(os.path.join(os.path.dirname(__file__), - os.pardir)) -sys.path.append(python_filepath) +""" +services.py module is a scripting module used for user and server to +interface with ingestion_mediator and respective colleague modules used for +our ingestion workflow. +""" -import argparse +import os +import sys +import argparse + +PYTHON_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), + os.pardir)) +SCRIPTS_PATH = os.path.abspath(os.path.join(PYTHON_PATH, 'scripts')) +sys.path.append(PYTHON_PATH) from housinginsights.tools.logger import HILogger -import get_api_data -import load_data from housinginsights.tools.mailer import HIMailer - -loggers = [ HILogger(name=__file__, logfile="services.log", level=10), - HILogger(name=__file__, logfile="sources.log", level=10), - HILogger(name=__file__, logfile="ingestion.log", level=10) - ] +# TODO: move logger configuration and management into IngestionMediator +loggers = [HILogger(name=__file__, logfile="services.log", level=10), + HILogger(name=__file__, logfile="sources.log", level=10), + HILogger(name=__file__, logfile="ingestion.log", level=10) + ] logger = loggers[0] -def run_get_api_data(debug=False): - # TODO Figure out which parameters should be passed for this to run as a service. 
-    try:
-        get_api_data.get_multiple_api_sources(db='docker_database')
-    except Exception as e:
-        logger.error("get_api_data failed with error: %s", e)
-        if debug:
-            raise e
-    finally:
-        get_api_data.send_log_file_to_admin(debug=debug)
+
+from housinginsights.tools.ingestion_mediator import IngestionMediator
+from housinginsights.ingestion.LoadData import LoadData
+from housinginsights.ingestion.Meta import Meta
+from housinginsights.ingestion.Manifest import Manifest
+from housinginsights.ingestion.SQLWriter import HISql
+from housinginsights.ingestion.GetApiData import GetApiData
+
+###############################
+# Instantiate global colleague objects needed for ingestion scripting
+###############################
+_load_data = LoadData()
+# currently safe to assume path for manifest will be unchanged
+_manifest = Manifest(os.path.abspath(os.path.join(SCRIPTS_PATH,
+                                                  'manifest.csv')))
+_meta = Meta()
+_hisql = HISql()
+_get_api_data = GetApiData()
+_mediator = None
+
+
+def configure_ingestion_mediator(database_choice=None,
+                                 debug=False):
+    # rebind the module-level mediator so later service calls reuse it
+    global _mediator
+
+    # initialize instance of ingestion mediator
+    _mediator = IngestionMediator(database_choice=database_choice,
+                                  debug=debug)
+
+    # connect ingestion mediator instance to its colleague instances
+    _mediator.set_load_data(_load_data)
+    _mediator.set_manifest(_manifest)
+    _mediator.set_meta(_meta)
+    _mediator.set_hi_sql(_hisql)
+    _mediator.set_get_api_data(_get_api_data)
+
+    # connect colleague instances to this ingestion mediator instance
+    _load_data.set_ingestion_mediator(_mediator)
+    _manifest.set_ingestion_mediator(_mediator)
+    _meta.set_ingestion_mediator(_mediator)
+    _hisql.set_ingestion_mediator(_mediator)
+    _get_api_data.set_ingestion_mediator(_mediator)
+
+
+def weekly_update(database_choice=None, drop_tables_first=False):
+    if _mediator is None:
+        configure_ingestion_mediator(database_choice)
+
+    return _load_data.reload_all_from_manifest(use_clean=False,
+                                               drop_tables=drop_tables_first,
+                                               load_dependents=True)
+
+
+def load_db_with_raw_data(unique_data_id_list, download_api_data=False,
+                          load_dependents=False, database_choice=None):
+    if _mediator is None:
+        configure_ingestion_mediator(database_choice)
+
+    return _load_data.load_raw_data(unique_data_id_list=unique_data_id_list,
+                                    download_api_data=download_api_data,
+                                    load_dependents=load_dependents)
+
+
+def load_db_with_cleaned_data(unique_data_id_list, use_raw_if_missing=True,
+                              database_choice=None):
+    if _mediator is None:
+        configure_ingestion_mediator(database_choice)
+
+    return _load_data.load_cleaned_data(unique_data_id_list=unique_data_id_list,
+                                        use_raw_if_missing=use_raw_if_missing)
+
+
+def load_db_from_manifest(use_clean=True, use_raw_if_missing=True,
+                          drop_tables=False, download_api_data=False,
+                          load_dependents=False, database_choice=None):
+    if _mediator is None:
+        configure_ingestion_mediator(database_choice)
+
+    return _load_data.reload_all_from_manifest(
+        use_clean=use_clean, use_raw_if_missing=use_raw_if_missing,
+        drop_tables=drop_tables, download_api_data=download_api_data,
+        load_dependents=load_dependents)
+
+
+def recalculate(database_choice=None):
+    if _mediator is None:
+        configure_ingestion_mediator(database_choice)
+
+    _load_data.recalculate_database()
+
 def send_log_file_to_admin(debug=True):
     """
-    At conclusion of process, send log file by email to admin and delete or archive from server.
-
+    At conclusion of process, send log file by email to admin and delete or
+    archive from server.
Currently very much a WIP demo, with some things partially implemented - """ #TODO!!! make this count all 3 files - level_counts = get_log_level_counts(loggers[2].logfile) + level_counts = _get_log_level_counts(loggers[2].logfile) error_count = level_counts.get('ERROR', 0) email = HIMailer(debug_mode=debug) # email.recipients.append('neal@nhumphrey.com') email.subject = "get_api_data logs, completed with {} errors".format(error_count) email.message = "See attached for the logs from the weekly_update(). Log counts by level are as follows: {}".format(level_counts) - + #add attachments - TODO not working! attachments = [] for l in loggers: attachments.append(l.logfile) - #TODO the HIMailer can properly attach one file but if attaching more than one they come through corrupted. + #TODO the HIMailer can properly attach one file but if attaching more than one they come through corrupted. email.attachments = [loggers[2].logfile] email.send_email() @@ -60,7 +143,7 @@ def send_log_file_to_admin(debug=True): # os.unlink(logger.logfile) -def get_log_level_counts(logfile): +def _get_log_level_counts(logfile): with open(logfile) as log: logdata = log.readlines() level_counts = {} @@ -73,115 +156,107 @@ def get_log_level_counts(logfile): return level_counts - -def weekly_update(db_choice, drop_tables_first = False): - #TODO should update the secrets.json keys to make them simpler so that this mapping is irrelevant - - send_log = True - debug = True - - - #Run the jobs - try: - if drop_tables_first: - remove_table_flag = '--remove-tables' - tables_to_remove = 'all' - else: - remove_table_flag = '' - tables_to_remove = '' - - #Get and load data in order so that we appropriately deal with duplicate records - - #Start with MAR so that we can geocode things - arguments = get_api_data.parser.parse_args([db_choice,'--modules','opendata','--ids','mar']) - get_api_data.get_multiple_api_sources(arguments) - arguments = load_data.parser.parse_args([db_choice,'--update-only','mar', '--skip-calculations' , remove_table_flag, tables_to_remove]) - load_data.main(arguments) - - - #prescat - arguments = get_api_data.parser.parse_args([db_choice,'--modules','prescat']) - get_api_data.get_multiple_api_sources(arguments) - arguments = load_data.parser.parse_args([db_choice,'--update-only','prescat_project', - 'prescat_subsidy', - 'prescat_addre', - 'prescat_reac', - 'prescat_real_property', - 'prescat_parcel', - - '--skip-calculations' ]) - load_data.main(arguments) - - - #then DHCD since it has better data when duplicate entries appear in DCHousing - arguments = get_api_data.parser.parse_args([db_choice,'--modules','dhcd']) - get_api_data.get_multiple_api_sources(arguments) - arguments = load_data.parser.parse_args([db_choice,'--update-only','dhcd_dfd_properties_project', - 'dhcd_dfd_properties_subsidy', - 'dhcd_dfd_properties_addre', - '--skip-calculations' ]) - load_data.main(arguments) - - - #Then DCHousing - arguments = get_api_data.parser.parse_args([db_choice,'--modules','DCHousing']) - get_api_data.get_multiple_api_sources(arguments) - arguments = load_data.parser.parse_args([db_choice,'--update-only','dchousing_project', - 'dchousing_subsidy', - 'dchousing_addre', - '--skip-calculations']) - load_data.main(arguments) - - - #Then everything else - #TODO it's a little bit clunky to do it this way but to do "everything else" we'd need to modify load_data to accept a negative list - arguments = get_api_data.parser.parse_args([db_choice,'--modules', - 'opendata', - #TODO temporarily skipped because it's slow: 
'wmata_distcalc', - 'census']) - - get_api_data.get_multiple_api_sources(arguments) - arguments = load_data.parser.parse_args([db_choice,'--update-only', - 'tract2010_ward2012', - 'tract2010_cluster2000', - 'tax', - #'hmda_all_dc', - 'topa_rcasd_2017', - 'topa_rcasd_2016', - 'topa_rcasd_2015', - 'building_permits_2016', - 'building_permits_2017', - 'crime_2016','crime_2017', - 'acs5_2015', - 'wmata_stops', - 'wmata_dist' - ]) - load_data.main(arguments) - - - - except Exception as e: - logger.error("Weekly update failed with error: %s", e) - if debug: - raise e - - finally: - if send_log: - send_log_file_to_admin(debug=debug) - +def main(args): + """ + Passes command line arguments and options to respective service methods and + initializes ingestion module class accordingly. + """ + # for case of more than one database choice default to the option with + # the lowest risk if database is updated + if args.database == 'local': + database_choice = 'local_database' + + elif args.database == 'docker_local': + database_choice = 'docker_with_local_python' + + elif args.database == 'codefordc': + database_choice = 'codefordc_remote_admin' + + # docker is default + else: + database_choice = 'docker_database' + + # initialize the ingestion mediator class accordingly + configure_ingestion_mediator(database_choice=database_choice, + debug=args.debug) + + # run requested method along with optional args + if args.service == 'weekly_update': + weekly_update(database_choice=database_choice, + drop_tables_first=args.drop_tables) + + if args.service == 'load_from_raw': + load_db_with_raw_data(unique_data_id_list=args.uid, + download_api_data=args.get_api_data, + load_dependents=args.load_dependents, + database_choice=database_choice) + + if args.service == 'load_from_clean': + load_db_with_cleaned_data(unique_data_id_list=args.uid, + use_raw_if_missing=args.use_raw_if_missing, + database_choice=database_choice) + + if args.service == 'load_from_manifest': + use_clean = not args.use_raw + load_db_from_manifest(use_clean=use_clean, + use_raw_if_missing=args.use_raw_if_missing, + drop_tables=args.drop_tables, + download_api_data=args.get_api_data, + load_dependents=args.load_dependents, + database_choice=database_choice) + + if args.service == 'send_log_file': + send_log_file_to_admin(debug=args.debug) + + # handle args related to recalculating zone_facts table + if args.service == 'recalculate_only' or not args.skip_calculations: + recalculate(database_choice=database_choice) if __name__ == '__main__': - services_parser = argparse.ArgumentParser("Services.py for running the weekly update job") - services_parser.add_argument("database", help="""which database we should connect to - when using existing data as part of the download process""", - choices=['docker', 'docker_local', 'local', 'codefordc']) - - services_parser.add_argument('--drop-all', help="drop all tables before starting the update", - action='store_true') - - - - services_arguments = services_parser.parse_args() - - weekly_update(services_arguments.database, services_arguments.drop_all) \ No newline at end of file + description = ('Scripting module for interfacing with ingestion mediator ' + 'for getting raw data from api and loading into db and ' + 'and other additional tools.') + parser = argparse.ArgumentParser(description=description) + parser.add_argument("database", help='which database the data should be ' + 'loaded to', + choices=['docker', 'docker_local', 'local', + 'codefordc'], default='docker') + parser.add_argument("service", 
help='which service method to run', + choices=['weekly_update', 'load_from_raw', + 'load_from_clean', 'load_from_manifest', + 'send_log_file', 'recalculate_only'], + default='load_from_clean') + parser.add_argument('--uid', nargs='+', + help='unique data ids that should be loaded into db') + parser.add_argument('--use-raw-if-missing', help='use clean psv but if ' + 'missing use raw data to ' + 'update db', + action='store_true', default=False) + parser.add_argument('--get-api-data', + help='make an api request for new data if ' + 'data_method = api', + action='store_true', default=False) + parser.add_argument('--load-dependents', + help='determine whether to automatically process ' + 'dependent data ids next', + action='store_true', default=False) + parser.add_argument('--use-raw', + help='for load_from_manifest, flag that determines ' + 'whether to load from clean psv file or use ' + 'raw data file', + action='store_true', default=False) + parser.add_argument('--drop-tables', nargs='+', + help='drops tables before running the load data code. ' + ' Add the name of each table to drop in format ' + '"table1 table2" If you want to drop all tables,' + ' use the keyword "all"') + parser.add_argument('--debug', action='store_true', default=False, + help="raise exceptions as they occur") + parser.add_argument('--skip-calculations', action='store_true', + default=False, + help="don't do any calculations") + parser.add_argument('-s', '--sample', help='load with sample data', + action='store_true') + + main(parser.parse_args()) diff --git a/python/tests/test_ingestion_mediator.py b/python/tests/test_ingestion_mediator.py new file mode 100644 index 00000000..e502e22a --- /dev/null +++ b/python/tests/test_ingestion_mediator.py @@ -0,0 +1,257 @@ +import unittest +import os +import sys + +# relative package import for when running as a script +PYTHON_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), + os.pardir)) +sys.path.append(PYTHON_PATH) +SCRIPTS_PATH = os.path.abspath(os.path.join(PYTHON_PATH, 'scripts')) + +# app imports +from housinginsights.tools.ingestion_mediator import IngestionMediator +from housinginsights.ingestion.LoadData import LoadData +from housinginsights.ingestion.Manifest import Manifest +from housinginsights.ingestion.Meta import Meta +from housinginsights.ingestion.SQLWriter import HISql +from housinginsights.ingestion.GetApiData import GetApiData + + +class MyTestCase(unittest.TestCase): + def setUp(self): + manifest_path = os.path.abspath(os.path.join(SCRIPTS_PATH, + 'manifest.csv')) + # initialize mediator and colleague instances + self.load_data = LoadData() + self.mediator = IngestionMediator(debug=True) + self.manifest = Manifest(manifest_path) + self.meta = Meta() + self.hisql = HISql() + self.get_api_data = GetApiData() + + # build connection between mediator and its colleagues + self.load_data.set_ingestion_mediator(self.mediator) + self.mediator.set_load_data(self.load_data) + self.manifest.set_ingestion_mediator(self.mediator) + self.mediator.set_manifest(self.manifest) + self.meta.set_ingestion_mediator(self.mediator) + self.mediator.set_meta(self.meta) + self.hisql.set_ingestion_mediator(self.mediator) + self.mediator.set_hi_sql(self.hisql) + self.get_api_data.set_ingestion_mediator(self.mediator) + self.mediator.set_get_api_data(self.get_api_data) + + # get db engine + self.engine = self.mediator.get_engine() + + def test_get_clean_psv_path(self): + # case - invalid unique_data_id passed + result = self.mediator.get_clean_psv_path('fake') + 
self.assertIsNone(result, 'should return None') + + # case - valid unique_data_id passed + result = self.mediator.get_clean_psv_path('mar') + self.assertIsNotNone(result, 'should not return None') + self.assertTrue(result, 'should return a path') + self.assertTrue(os.path.exists(result), 'should return a path that ' + 'exists') + + def test_process_and_clean_raw_data(self): + # case - invalid unique_data_id passed + self.assertRaises(ValueError, self.mediator.process_and_clean_raw_data, + 'fake') + + # case - valid unique_data_id passed + result = self.mediator.process_and_clean_raw_data('mar') + self.assertTrue(result, 'should return a result') + self.assertTrue(os.path.exists(result), 'should return a path that ' + 'exists') + + # case - invalid unique_data_id passed with debug = False + self.mediator.set_debug(False) + result = self.mediator.process_and_clean_raw_data('fake') + self.assertIsNone(result, 'should return None') + + def test_LoadData_load_raw_data(self): + # # case - invalid unique_data_id passed + self.assertRaises(ValueError, self.load_data.load_raw_data, ['fake']) + + # case - pass empty unique_data_id_list + result = self.load_data.load_raw_data([]) + self.assertFalse(result, 'should return empty list') + self.assertEqual(len(result), 0, 'should return empty list') + + # case - single unique_data_id passed + result = self.load_data.load_raw_data(['mar']) + self.assertTrue(result, 'should return a non-empty list') + self.assertEqual(len(result), 1, 'should return a list with len = 1') + self.assertTrue('mar' in result, '"mar" should be only value in list') + + # case - multiple unique_data_ids passed + unique_data_id_list = ['mar', 'prescat_project', + 'dhcd_dfd_properties_addre', 'dchousing_subsidy'] + result = self.load_data.load_raw_data(unique_data_id_list) + self.assertTrue(result, 'should return a non-empty list') + self.assertEqual(len(result), len(unique_data_id_list), + 'should return a list with same length as original') + for uid in unique_data_id_list: + self.assertTrue(uid in result, + '"{}" should be value in result: {}'.format(uid, + result)) + + # case - invalid unique_data_id passed with debug=False + self.mediator.set_debug(False) + result = self.load_data.load_raw_data(['fake']) + self.assertFalse(result, 'should return empty list') + self.assertEqual(len(result), 0, 'should be empty list') + + # case - load dependents feature + self.mediator.set_debug(True) + expected = ['prescat_addre', 'dchousing_project', 'prescat_parcel', + 'dchousing_subsidy', 'dchousing_addre', 'tax', + 'dhcd_dfd_properties_project', + 'dhcd_dfd_properties_subsidy', 'dhcd_dfd_properties_addre'] + result = self.load_data.load_raw_data(['prescat_addre'], + download_api_data=True, + load_dependents=True) + self.assertTrue(result, 'should return a non-empty list') + self.assertEqual(len(result), 9, 'should return a list with len = 9') + for uid in expected: + self.assertTrue(uid in result, + '"{}" should be value in result: {}'.format( + uid, result)) + + def test_LoadData_load_cleaned_data(self): + # case - invalid unique_data_id passed + self.assertRaises(ValueError, self.load_data.load_cleaned_data, + ['fake']) + + # case - missing psv with use_raw_if_missing = False + # delete file first if already exists + psv_path = self.mediator.get_clean_psv_path('dchousing_project') + if psv_path is not None: + os.remove(psv_path) + self.assertRaises(FileNotFoundError, self.load_data.load_cleaned_data, + ['dchousing_project'], use_raw_if_missing=False) + + # case - pass empty 
unique_data_id_list + result = self.load_data.load_cleaned_data([]) + self.assertFalse(result, 'should return empty list') + self.assertEqual(len(result), 0, 'should return empty list') + + # case - single unique_data_id passed + result = self.load_data.load_cleaned_data(['mar']) + self.assertTrue(result, 'should return a non-empty list') + self.assertEqual(len(result), 1, 'should return a list with len = 1') + self.assertTrue('mar' in result, '"mar" should be only value is list') + + # case - multiple unique_data_ids passed + unique_data_id_list = ['mar', 'prescat_project', + 'dhcd_dfd_properties_addre', 'dchousing_subsidy'] + result = self.load_data.load_cleaned_data(unique_data_id_list) + self.assertTrue(result, 'should return a non-empty list') + self.assertEqual(len(result), len(unique_data_id_list), + 'should return a list with same length as original') + for uid in unique_data_id_list: + self.assertTrue(uid in result, + '"{}" should be only value is list'.format(uid)) + + # case - invalid unique_data_id passed with debug=False + self.mediator.set_debug(False) + result = self.load_data.load_cleaned_data(['fake']) + self.assertFalse(result, 'should return empty list') + self.assertEqual(len(result), 0, 'should be empty list') + + def test_LoadData_reload_all_from_manifest(self): + self.load_data.reload_all_from_manifest(drop_tables=True) + + # verify resulting sql_manifest post database rebuild + with self.engine.connect() as conn: + q = 'select unique_data_id, destination_table, status FROM manifest' + q_result = conn.execute(q) + result = {row[0]: { + 'destination_table': row[1], 'status': row[2]} + for row in q_result.fetchall()} + + for unique_data_id in result: + manifest_row = self.manifest.get_manifest_row(unique_data_id) + manifest_table_name = manifest_row['destination_table'] + result_table_name = result[unique_data_id]['destination_table'] + result_status = result[unique_data_id]['status'] + self.assertEqual(manifest_table_name, result_table_name, + 'should have matching table name: {} : {}'.format( + manifest_table_name, result_table_name)) + self.assertEqual('loaded', result_status, + 'should have status = loaded: actual {}'.format( + result_status)) + + def test_LoadData_recalculate_database(self): + # make sure database is populated with all tables + self.load_data.reload_all_from_manifest() + + # make sure zone_facts is not in db + if 'zone_facts' in self.engine.table_names(): + with self.engine.connect() as conn: + conn.execute('DROP TABLE zone_facts;') + + # currently no zone_facts table + result = 'zone_facts' in self.engine.table_names() + self.assertFalse(result, 'zone_facts table is not in db') + + # rebuild zone_facts table + result = self.load_data.recalculate_database() + self.assertTrue(result, 'should have loaded zone_facts table') + + # confirm zone_facts table was loaded into db + result = 'zone_facts' in self.engine.table_names() + self.assertTrue(result, 'zone_facts table is in db') + + def test_GetApiData_get_files_by_modules(self): + # case - invalid module + result = self.get_api_data.get_files_by_modules(['fake']) + self.assertFalse(result, 'should return empty result: %s' % result) + self.assertEqual(len(result), 0, 'should return empty result') + + # case - single module + result = self.get_api_data.get_files_by_modules(['DCHousing']) + self.assertTrue(result, 'should return non-empty result: %s' % result) + self.assertTrue('DCHousing' in result, 'should return DCHousing as ' + 'processed in result') + self.assertEqual(len(result), 1, 'should 
return a single result') + + # case - multiple modules + modules_list = ['opendata', 'dhcd'] + result = self.get_api_data.get_files_by_modules(modules_list) + self.assertTrue(result, 'should return non-empty result: %s' % result) + for mod in modules_list: + self.assertTrue(mod in result, + 'should return same values as original: ' + '{} is missing'.format(mod)) + self.assertEqual(len(result), 2, 'should return two values in result') + + def test_GetApiData_get_files_by_data_ids(self): + # case - invalid data id + result = self.get_api_data.get_files_by_data_ids(['fake']) + self.assertFalse(result, 'should return empty result: %s' % result) + self.assertEqual(len(result), 0, 'should return empty result') + + # case - single data id + result = self.get_api_data.get_files_by_data_ids(['dchousing']) + self.assertTrue(result, 'should return non-empty result: %s' % result) + self.assertTrue('dchousing' in result, 'should return dchousing as ' + 'processed in result') + self.assertEqual(len(result), 1, 'should return a single result') + + # case - multiple data ids + modules_list = ['crime_2015', 'building_permits_2016'] + result = self.get_api_data.get_files_by_data_ids(modules_list) + self.assertTrue(result, 'should return non-empty result: %s' % result) + for mod in modules_list: + self.assertTrue(mod in result, + 'should return same values as original: ' + '{} is missing'.format(mod)) + self.assertEqual(len(result), 2, 'should return two values in result') + + +if __name__ == '__main__': + unittest.main() diff --git a/python/tests/test_load_data.py b/python/tests/test_load_data.py index 9b7700da..349002e0 100644 --- a/python/tests/test_load_data.py +++ b/python/tests/test_load_data.py @@ -18,7 +18,8 @@ def setUp(self): self.database_choice = 'docker_database' self.loader = load_data.LoadData(database_choice=self.database_choice, meta_path=self.meta_path, - manifest_path=self.manifest_path) + manifest_path=self.manifest_path, + debug=True) def query_db(self, engine, query): """
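Usage note: the service functions added in services.py above can be driven programmatically as well as from the command line. The snippet below is a minimal sketch and not part of the patch; it assumes a reachable docker database and that it runs from the python/scripts directory so that services.py is importable.

import services

# wire the IngestionMediator to its colleague objects once
services.configure_ingestion_mediator(database_choice='docker_database',
                                      debug=True)

# download one source from its API, load it, then load its dependents
services.load_db_with_raw_data(unique_data_id_list=['mar'],
                               download_api_data=True,
                               load_dependents=True)

# rebuild the derived zone_facts table
services.recalculate()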
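The manifest.csv changes above also add a dependency column, which is what the new load_dependents flag keys off. As a rough illustration of what that column encodes (an assumption about intent, not the actual Manifest/IngestionMediator lookup), the direct dependents of a unique_data_id can be read straight out of the file:

import csv

def direct_dependents(manifest_path, unique_data_id):
    # return the unique_data_ids whose 'dependency' column names the given id
    with open(manifest_path, newline='') as f:
        return [row['unique_data_id'] for row in csv.DictReader(f)
                if row.get('dependency') == unique_data_id]

# e.g. direct_dependents('manifest.csv', 'mar') should include
# prescat_project, prescat_subsidy and prescat_addre per the rows above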