470 IngestionMediator class #593

Open: wants to merge 12 commits into dev
15 changes: 13 additions & 2 deletions python/housinginsights/ingestion/CSVWriter.py
@@ -10,6 +10,13 @@

logging_path = os.path.abspath(os.path.join(os.path.dirname(__file__),
os.pardir, os.pardir, "logs"))
# relative package import for when running as a script
PYTHON_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__),
os.pardir, os.pardir))
# sys.path.append(PYTHON_PATH)
CLEAN_PSV_PATH = os.path.abspath(os.path.join(PYTHON_PATH, os.pardir,
'data', 'processed',
'_clean_psv'))


class CSVWriter(object):
@@ -52,7 +59,10 @@ def __init__(self, meta, manifest_row, filename=None):

# By default, creates a temp csv file wherever the calling module was
# located
self.filename = 'temp_{}.psv'.format(self.unique_data_id) if filename == None else filename
if filename is None:
self.filename = 'temp_{}.psv'.format(self.unique_data_id)
else:
self.filename = filename

# remove any existing copy of the file so we are starting clean
self.remove_file()
@@ -65,7 +75,8 @@ def __init__(self, meta, manifest_row, filename=None):
#print("header written")

self.file = open(self.filename, 'a', newline='', encoding='utf-8')
self.writer = DictWriter(self.file, fieldnames=self.dictwriter_fields, delimiter="|")
self.writer = DictWriter(self.file, fieldnames=self.dictwriter_fields,
delimiter="|")

def write(self, row):
"""
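For reviewers unfamiliar with the `.psv` convention used here: the rewrapped `DictWriter` call above is just the standard library's pipe-delimited variant of CSV writing. A minimal standalone sketch, with the file name and field names being arbitrary examples rather than values from the repo:

```python
# Standalone illustration of the pipe-separated ("psv") writing pattern used
# by CSVWriter; the file name and field names below are examples only.
from csv import DictWriter

with open('temp_example.psv', 'w', newline='', encoding='utf-8') as f:
    writer = DictWriter(f, fieldnames=['id', 'value'], delimiter='|')
    writer.writeheader()                      # writes: id|value
    writer.writerow({'id': 1, 'value': 'a'})  # writes: 1|a
```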
17 changes: 9 additions & 8 deletions python/housinginsights/ingestion/Cleaners.py
@@ -15,6 +15,7 @@
from housinginsights.sources.models.pres_cat import CLUSTER_DESC_MAP
from housinginsights.sources.google_maps import GoogleMapsApiConn
from housinginsights.sources.models.mar import MAR_TO_TABLE_FIELDS
from housinginsights.tools.base_colleague import Colleague


PYTHON_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir,
@@ -30,16 +31,18 @@
"""


class CleanerBase(object, metaclass=ABCMeta):
class CleanerBase(Colleague, metaclass=ABCMeta):
def __init__(self, meta, manifest_row, cleaned_csv='', removed_csv='',
engine=None):
super().__init__()

self.cleaned_csv = cleaned_csv
self.removed_csv = removed_csv
self.engine = engine

self.manifest_row = manifest_row
self.tablename = manifest_row['destination_table']
self.meta = meta
self.meta = meta # TODO - remove: not used in this class
self.fields = meta[self.tablename]['fields'] #a list of dicts

self.null_value = 'Null' #what the SQLwriter expects in the temp csv
@@ -57,13 +60,11 @@ def __init__(self, meta, manifest_row, cleaned_csv='', removed_csv='',
if field['type'] == 'date':
self.date_fields.append(field['source_name'])


@abstractmethod
def clean(self, row, row_num):
# TODO: add replace_null method as required for an implementation (#176)
pass


def add_proj_addre_lookup_from_mar(self):
"""
Adds an in-memory lookup table of the contents of the current
@@ -85,7 +86,6 @@ def add_ssl_nlihc_lookup(self):
result = proxy.fetchall()
self.ssl_nlihc_lookup = {d[0]:d[1] for d in result}


def get_nlihc_id_if_exists(self, mar_ids_string, ssl=None):
"Checks for record in project table with matching MAR id."

@@ -110,7 +110,6 @@ def get_nlihc_id_if_exists(self, mar_ids_string, ssl=None):
#If we don't find a match
return self.null_value


# TODO: figure out what is the point of this method...it looks incomplete
def field_meta(self, field):
for field_meta in self.fields:
@@ -546,7 +545,6 @@ def add_mar_tract_lookup(self):
result = proxy.fetchall()
self.mar_tract_lookup = {d[0]:d[1] for d in result}


def add_census_tract_from_mar(self, row, column_name='mar_id',
lat_lon_col_names=('LATITUDE', 'LONGITUDE'),
x_y_coords_col_names=('X', 'Y'),
@@ -667,6 +665,7 @@ def clean(self, row, row_num=None):
row = self.replace_nulls(row, null_values=['N', 'NA', '', None])
return row


class ProjectCleaner(CleanerBase):
def clean(self, row, row_num=None):
row = self.replace_nulls(row, null_values=['N', '', None])
@@ -854,11 +853,13 @@ def clean(self,row,row_num=None):

return row


class ProjectAddressCleaner(CleanerBase):
def clean(self,row,row_num=None):
row = self.replace_nulls(row)
return row



class ZillowCleaner(CleanerBase):
"""
Incomplete Cleaner - adding data to the code so we have it when needed (was doing analysis on this)
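Several classes in this PR (CleanerBase above, HIReader and GetApiData below) now inherit from `Colleague`, which is imported from `housinginsights.tools.base_colleague` but not included in this diff. A rough sketch of what a mediator-pattern colleague base class typically provides; every attribute and method name here is an assumption for illustration, not a copy of the real file:

```python
# Illustrative mediator-pattern base class; names are assumptions, not the
# actual contents of housinginsights/tools/base_colleague.py.
class Colleague(object):
    def __init__(self):
        # Populated later when the IngestionMediator registers this object,
        # so all colleagues share one engine, database choice, and debug flag.
        self._ingestion_mediator = None
        self._engine = None
        self._database_choice = None
        self._debug = False

    def set_ingestion_mediator(self, ingestion_mediator):
        # Hypothetical wiring step: pull shared configuration off the mediator.
        self._ingestion_mediator = ingestion_mediator
        self._engine = ingestion_mediator.get_engine()
        self._database_choice = ingestion_mediator.get_database_choice()
        self._debug = ingestion_mediator.get_debug()
```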
30 changes: 16 additions & 14 deletions python/housinginsights/ingestion/DataReader.py
@@ -20,7 +20,7 @@
from datetime import datetime
import dateutil.parser as dateparser


from housinginsights.tools.base_colleague import Colleague
from housinginsights.tools.logger import HILogger
logger = HILogger(name=__file__, logfile="ingestion.log")

@@ -30,7 +30,7 @@


# TODO: convert relative path to full path when passed as argument
class HIReader(object):
class HIReader(Colleague):
"""
Container object that reads in CSVs and provides them row-by-row through
the __iter__ method. Each object is associated with one specific file
@@ -41,6 +41,7 @@ class HIReader(object):
and lower bandwidth usage.
"""
def __init__(self, path, path_type="file", encoding="latin-1", keys=None):
super().__init__()
self.path = path
self._length = None
self._keys = keys
@@ -127,6 +128,7 @@ def get_row_by_column_name(self, col_header_name, look_up_value):
return None


# TODO - refactor: do we really need meta and manifest_row passed
class DataReader(HIReader):
"""
Reads a specific data file. This file must be associated with a specific
@@ -169,14 +171,14 @@ def __init__(self, meta, manifest_row, load_from="local"):

# Use default encoding if none found
if 'encoding' not in manifest_row:
logger.warning(" Warning: encoding not found in manifest. " \
"Falling back to latin-1.")
logger.warning(" Warning: encoding not found in manifest. "
"Falling back to latin-1.")
self.encoding = manifest_row.get('encoding', 'latin-1')

self.load_from = load_from
self.s3_path = os.path.join(manifest_row['s3_folder'],
manifest_row['filepath'].strip("\/")
).replace("\\","/")
).replace("\\", "/")
self._error_reporting_overhead = {}
# # Test connection to s3 URL
# if self.manifest_row['include_flag'] == 'use':
@@ -253,7 +255,6 @@ def __iter__(self):
def keys(self):
return self._keys


def _download_data_file(self):
"""
Internal function that tries to load the data file from the local file
@@ -354,30 +355,31 @@ def _set_keys(self):
"encoding error encountered.")
return _keys

def should_file_be_loaded(self, sql_manifest_row):
def should_file_be_loaded(self):
"""
Runs all the checks that the file is OK to use.

:param sql_manifest_row: the given sql manifest row
:return: True if passes validation; False otherwise.
"""

if self._do_fields_match() and self._check_include_flag(sql_manifest_row):
if self._do_fields_match() and self._check_include_flag():
return True
else:
return False

def _check_include_flag(self, sql_manifest_row):
def _check_include_flag(self):
"""
Checks to make sure the include_flag matches requirements for loading the data
Checks to make sure the include_flag matches requirements for loading
the data

Previously this compared the manifest_row to the sql_manifest_row; however,
since the unique_data_id now stays constant across time this check is
not needed.
Previously this compared the manifest_row to the sql_manifest_row;
however, since the unique_data_id now stays constant across time this
check is not needed.
"""

if self.manifest_row['include_flag'] == 'use':
return True
return True

else:
logger.warning("Skipping data source. {} include_flag is {}".format(
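Because `should_file_be_loaded` no longer takes a `sql_manifest_row`, callers only need the reader itself. A hedged caller-side sketch of the new flow; the construction arguments mirror the diff, while the downstream `process_data_file` helper is hypothetical:

```python
# Hypothetical caller-side use of the new signature; everything past the
# should_file_be_loaded() check is illustrative only.
reader = DataReader(meta, manifest_row, load_from="local")

if reader.should_file_be_loaded():
    # Field names matched the meta and include_flag == 'use'.
    process_data_file(reader)
else:
    logger.info("Skipping %s", manifest_row['unique_data_id'])
```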
138 changes: 138 additions & 0 deletions python/housinginsights/ingestion/GetApiData.py
@@ -0,0 +1,138 @@
"""
get_api_data.py provides command line convenience access to the modules in the housinginsights.sources
directory.

Every API class should implement a few key features
"""

# built-in imports
import os
import importlib

# app imports
from housinginsights.tools.base_colleague import Colleague

# Configure logging
from housinginsights.tools.logger import HILogger
logger = HILogger(name=__file__, logfile="sources.log")

PYTHON_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__),
os.pardir))


class GetApiData(Colleague):
def __init__(self):
super().__init__()
self._API_FOLDER = 'housinginsights.sources'
self._MODULES = {
'opendata': 'OpenDataApiConn',
'DCHousing': 'DCHousingApiConn',
'dhcd': 'DhcdApiConn',
'census': 'CensusApiConn',
'wmata_distcalc': 'WmataApiConn',
'prescat': 'PrescatApiConn'
}
self._IDS_TO_MODULES = {
'tax': 'opendata',
'building_permits_2013': 'opendata',
'building_permits_2014': 'opendata',
'building_permits_2015': 'opendata',
'building_permits_2016': 'opendata',
'building_permits_2017': 'opendata',
'crime_2013': 'opendata',
'crime_2014': 'opendata',
'crime_2015': 'opendata',
'crime_2016': 'opendata',
'crime_2017': 'opendata',
'mar': 'opendata',
'dchousing': 'DCHousing',
'dhcd_dfd_projects': 'dhcd',
'dhcd_dfd_properties': 'dhcd',
'acs5_2009': 'census',
'acs5_2010': 'census',
'acs5_2011': 'census',
'acs5_2012': 'census',
'acs5_2013': 'census',
'acs5_2014': 'census',
'acs5_2015': 'census',
'acs5_2009_moe': 'census',
'acs5_2010_moe': 'census',
'acs5_2011_moe': 'census',
'acs5_2012_moe': 'census',
'acs5_2013_moe': 'census',
'acs5_2014_moe': 'census',
'acs5_2015_moe': 'census',
'wmata_stops': 'wmata_distcalc',
'wmata_dist': 'wmata_distcalc'
}

def get_files_by_data_ids(self, unique_data_ids_list):
processed_ids = list()
for data_id in unique_data_ids_list:
try:
mod = self._IDS_TO_MODULES[data_id]
except KeyError:
logger.error('%s is not a valid data id! Skipping...' % data_id)
continue

result = self.get_files_by_modules([mod], unique_data_id=[data_id])
if len(result) == 1:
processed_ids.append(data_id)
return processed_ids

def get_files_by_modules(self, modules_list, unique_data_id=None):
processed = list()
for m in modules_list:
try:
class_name = self._MODULES[m]
except KeyError:
logger.error('Module %s is not valid! Skipping...' % m)
continue

try:
if unique_data_id is None:
logger.info("Processing %s module with class %s", m,
class_name)
else:
logger.info("Processing %s with class %s", unique_data_id,
class_name)
module_name = self._API_FOLDER + '.' + m
api_method = self._get_api_method(class_name, module_name)

# Get the data
# TODO refactor all the methods that need db to instead use
# TODO self.engine() created in __init__(see base_project for
# TODO example)
api_method(unique_data_id, False, 'csv',
db=self._database_choice)
processed.append(m)

# log outcome
if unique_data_id is None:
logger.info("Completed processing %s module with class "
"%s", m, class_name)
else:
logger.info("Completed processing %s with class "
"%s", unique_data_id, class_name)
except Exception as e:
logger.error("The request for '%s' failed with error: %s", m, e)

if self._debug:
raise e

self._ingestion_mediator.update_manifest_with_new_path()
return processed

def get_all_files(self):
modules_list = self._MODULES.keys()
return self.get_files_by_modules(modules_list)

def _get_api_method(self, class_name, module_name):
mod = importlib.import_module(module_name)

api_class = getattr(mod, class_name)

api_instance = api_class(database_choice=self._database_choice,
debug=self._debug)
# IMPORTANT: Every class should have a get_data method!
return getattr(api_instance, 'get_data')
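A brief usage sketch of the new `GetApiData` class, assuming the mediator wiring has already populated `_database_choice`, `_debug`, and `_ingestion_mediator` on the instance (that setup is not shown in this diff):

```python
# Illustrative only: driving GetApiData once the mediator has configured it.
getter = GetApiData()
# ... IngestionMediator registration happens here (not part of this diff) ...

# Download two specific data ids; returns the ids that were processed.
done = getter.get_files_by_data_ids(['crime_2017', 'mar'])

# Or fetch every registered source module.
all_done = getter.get_all_files()
```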