Skip to content

Commit

Permalink
fix: index tag info into elasticsearch immediately after ui change (#883
Browse files Browse the repository at this point in the history
)

* fix: index tag info into elasticsearch immediately after ui change

Signed-off-by: tianru zhou <tianru.zhou@databricks.com>

* address pr comments v1

Signed-off-by: tianru zhou <tianru.zhou@databricks.com>

* fix lint test

Signed-off-by: tianru zhou <tianru.zhou@databricks.com>

* change table key example

Signed-off-by: tianru zhou <tianru.zhou@databricks.com>
  • Loading branch information
tianruzhou-db authored Jan 27, 2021
1 parent b184f2f commit b34151c
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 17 deletions.
94 changes: 86 additions & 8 deletions amundsen_application/api/metadata/v0.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import logging
import json

from http import HTTPStatus
from typing import Any, Dict, Optional

Expand All @@ -16,7 +17,7 @@

from amundsen_application.api.utils.metadata_utils import is_table_editable, marshall_table_partial, \
marshall_table_full, marshall_dashboard_partial, marshall_dashboard_full, TableUri
from amundsen_application.api.utils.request_utils import get_query_param, request_metadata
from amundsen_application.api.utils.request_utils import get_query_param, request_metadata, request_search


LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -377,6 +378,83 @@ def get_tags() -> Response:
return make_response(payload, HTTPStatus.INTERNAL_SERVER_ERROR)


def _update_metadata_tag(table_key: str, method: str, tag: str) -> int:
table_endpoint = _get_table_endpoint()
url = f'{table_endpoint}/{table_key}/tag/{tag}'
response = request_metadata(url=url, method=method)
status_code = response.status_code
if status_code != HTTPStatus.OK:
LOGGER.info(f'Fail to update tag in metadataservice, http status code: {status_code}')
LOGGER.debug(response.text)
return status_code


def _update_search_tag(table_key: str, method: str, tag: str) -> int:
"""
call the search service endpoint to get whole table information uniquely identified by table_key
update tags list, call search service endpoint again to write back the updated field
TODO: we should update dashboard tag in the future
:param table_key: table key e.g. 'database://cluster.schema/table'
:param method: PUT or DELETE
:param tag: tag name to be put/delete
:return: HTTP status code
"""
searchservice_base = app.config['SEARCHSERVICE_BASE']
searchservice_get_table_url = f'{searchservice_base}/search_table'

# searchservice currently doesn't allow colon or / inside filters, thus can't get item based on key
# table key e.g: 'database://cluster.schema/table'
table_uri = TableUri.from_uri(table_key)

request_param_map = {
'search_request':
{
'type': 'AND',
'filters':
{
'database': [table_uri.database],
'schema': [table_uri.schema],
'table': [table_uri.table],
'cluster': [table_uri.cluster]
}
},
'query_term': ''
}

get_table_response = request_search(url=searchservice_get_table_url, method='POST', json=request_param_map)
get_status_code = get_table_response.status_code
if get_status_code != HTTPStatus.OK:
LOGGER.info(f'Fail to get table info from serviceservice, http status code: {get_status_code}')
LOGGER.debug(get_table_response.text)
return get_status_code

raw_data_map = json.loads(get_table_response.text)
# key is unique, thus (database, cluster, schema, table) should uniquely identify the table
if len(raw_data_map['results']) > 1:
LOGGER.error(f'Error! Duplicate table key: {table_key}')
table = raw_data_map['results'][0]

old_tags_list = table['tags']
new_tags_list = [item for item in old_tags_list if item['tag_name'] != tag]
if method != 'DELETE':
new_tags_list.append({'tag_name': tag})
table['tags'] = new_tags_list

# remove None values
pruned_table = {k: v for k, v in table.items() if v is not None}

post_param_map = {"data": pruned_table}
searchservice_update_url = f'{searchservice_base}/document_table'
update_table_response = request_search(url=searchservice_update_url, method='PUT', json=post_param_map)
update_status_code = update_table_response.status_code
if update_status_code != HTTPStatus.OK:
LOGGER.info(f'Fail to update table info in searchservice, http status code: {update_status_code}')
LOGGER.debug(update_table_response.text)
return update_table_response.status_code

return HTTPStatus.OK


@metadata_blueprint.route('/update_table_tags', methods=['PUT', 'DELETE'])
def update_table_tags() -> Response:

Expand All @@ -388,26 +466,26 @@ def _log_update_table_tags(*, table_key: str, method: str, tag: str) -> None:
args = request.get_json()
method = request.method

table_endpoint = _get_table_endpoint()
table_key = get_query_param(args, 'key')

tag = get_query_param(args, 'tag')

url = f'{table_endpoint}/{table_key}/tag/{tag}'

_log_update_table_tags(table_key=table_key, method=method, tag=tag)

response = request_metadata(url=url, method=method)
status_code = response.status_code
metadata_status_code = _update_metadata_tag(table_key=table_key, method=method, tag=tag)
search_status_code = _update_search_tag(table_key=table_key, method=method, tag=tag)

if status_code == HTTPStatus.OK:
http_status_code = HTTPStatus.OK
if metadata_status_code == HTTPStatus.OK and search_status_code == HTTPStatus.OK:
message = 'Success'
else:
message = f'Encountered error: {method} table tag failed'
logging.error(message)
http_status_code = HTTPStatus.INTERNAL_SERVER_ERROR

payload = jsonify({'msg': message})
return make_response(payload, status_code)
return make_response(payload, http_status_code)

except Exception as e:
message = 'Encountered exception: ' + str(e)
logging.exception(message)
Expand Down
22 changes: 13 additions & 9 deletions amundsen_application/api/utils/request_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ def request_metadata(*, # type: ignore
method: str = 'GET',
headers=None,
timeout_sec: int = 0,
data=None):
data=None,
json=None):
"""
Helper function to make a request to metadata service.
Sets the client and header information based on the configuration
Expand All @@ -43,15 +44,17 @@ def request_metadata(*, # type: ignore
client=app.config['METADATASERVICE_REQUEST_CLIENT'],
headers=headers,
timeout_sec=timeout_sec,
data=data)
data=data,
json=json)


def request_search(*, # type: ignore
url: str,
method: str = 'GET',
headers=None,
timeout_sec: int = 0,
data=None):
data=None,
json=None):
"""
Helper function to make a request to search service.
Sets the client and header information based on the configuration
Expand All @@ -75,11 +78,12 @@ def request_search(*, # type: ignore
client=app.config['SEARCHSERVICE_REQUEST_CLIENT'],
headers=headers,
timeout_sec=timeout_sec,
data=data)
data=data,
json=json)


# TODO: Define an interface for envoy_client
def request_wrapper(method: str, url: str, client, headers, timeout_sec: int, data=None): # type: ignore
def request_wrapper(method: str, url: str, client, headers, timeout_sec: int, data=None, json=None): # type: ignore
"""
Wraps a request to use Envoy client and headers, if available
:param method: DELETE | GET | POST | PUT
Expand All @@ -99,9 +103,9 @@ def request_wrapper(method: str, url: str, client, headers, timeout_sec: int, da
elif method == 'GET':
return client.get(url, headers=headers, raw_response=True)
elif method == 'POST':
return client.post(url, headers=headers, raw_response=True, raw_request=True, data=data)
return client.post(url, headers=headers, raw_response=True, raw_request=True, data=data, json=json)
elif method == 'PUT':
return client.put(url, headers=headers, raw_response=True, raw_request=True, data=data)
return client.put(url, headers=headers, raw_response=True, raw_request=True, data=data, json=json)
else:
raise Exception('Method not allowed: {}'.format(method))
else:
Expand All @@ -111,9 +115,9 @@ def request_wrapper(method: str, url: str, client, headers, timeout_sec: int, da
elif method == 'GET':
return s.get(url, headers=headers, timeout=timeout_sec)
elif method == 'POST':
return s.post(url, headers=headers, timeout=timeout_sec, data=data)
return s.post(url, headers=headers, timeout=timeout_sec, data=data, json=json)
elif method == 'PUT':
return s.put(url, headers=headers, timeout=timeout_sec, data=data)
return s.put(url, headers=headers, timeout=timeout_sec, data=data, json=json)
else:
raise Exception('Method not allowed: {}'.format(method))

Expand Down
18 changes: 18 additions & 0 deletions tests/unit/api/metadata/test_v0.py
Original file line number Diff line number Diff line change
Expand Up @@ -803,6 +803,15 @@ def test_update_table_tags_put(self) -> None:
url = local_app.config['METADATASERVICE_BASE'] + TABLE_ENDPOINT + '/db://cluster.schema/table/tag/tag_5'
responses.add(responses.PUT, url, json={}, status=HTTPStatus.OK)

searchservice_base = local_app.config['SEARCHSERVICE_BASE']
get_table_url = f'{searchservice_base}/search_table'
responses.add(responses.POST, get_table_url,
json={'results': [{'id': '1', 'tags': [{'tag_name': 'tag_1'}, {'tag_name': 'tag_2'}]}]},
status=HTTPStatus.OK)

post_table_url = f'{searchservice_base}/document_table'
responses.add(responses.PUT, post_table_url, json={}, status=HTTPStatus.OK)

with local_app.test_client() as test:
response = test.put(
'/api/metadata/v0/update_table_tags',
Expand All @@ -822,6 +831,15 @@ def test_update_table_tags_delete(self) -> None:
url = local_app.config['METADATASERVICE_BASE'] + TABLE_ENDPOINT + '/db://cluster.schema/table/tag/tag_5'
responses.add(responses.DELETE, url, json={}, status=HTTPStatus.OK)

searchservice_base = local_app.config['SEARCHSERVICE_BASE']
get_table_url = f'{searchservice_base}/search_table'
responses.add(responses.POST, get_table_url,
json={'results': [{'id': '1', 'tags': [{'tag_name': 'tag_1'}, {'tag_name': 'tag_2'}]}]},
status=HTTPStatus.OK)

post_table_url = f'{searchservice_base}/document_table'
responses.add(responses.PUT, post_table_url, json={}, status=HTTPStatus.OK)

with local_app.test_client() as test:
response = test.delete(
'/api/metadata/v0/update_table_tags',
Expand Down

0 comments on commit b34151c

Please sign in to comment.