Skip to content

Commit

Permalink
Add auto fallback to /query endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
Denis Krumko committed Sep 11, 2024
1 parent aa026e0 commit 201aff9
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 8 deletions.
1 change: 1 addition & 0 deletions src/app/core/ksqldb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
KsqlRequest,
)
from .resources import (
KsqlErrors,
KsqlException,
KsqlQuery,
)
22 changes: 19 additions & 3 deletions src/app/core/ksqldb/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from .resources import (
KsqlEndpoints,
KsqlErrors,
KsqlQuery,
)

Expand Down Expand Up @@ -38,15 +39,30 @@ def query(self) -> KsqlQuery:
"""Get KSQL query."""
return self._query

async def execute(self) -> httpx.Response:
"""Execute KSQL request."""
full_url = self._server / self._endpoint.value
async def execute(self, query_fallback: bool = False) -> httpx.Response:
"""Execute KSQL request to default endpoint."""
response = await self._request(self._endpoint)

# If with_fallback enabled, then try request to /query endpoint
if (
query_fallback
and response.status_code == 400
and response.json()['error_code'] == KsqlErrors.QUERY_ENDPOINT.value
):
return await self._request(KsqlEndpoints.QUERY)

return response

async def _request(self, endpoint: KsqlEndpoints) -> httpx.Response:
"""Get response from endpoint"""
full_url = self._server / endpoint.value
async with httpx.AsyncClient() as client:
return await client.request(
method=self._method,
url=str(full_url),
json={
'ksql': self._query.as_string,
# TODO: Add custom streamsProperties
'streamsProperties': {},
},
headers={
Expand Down
8 changes: 8 additions & 0 deletions src/app/core/ksqldb/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,18 @@ class KsqlEndpoints(Enum):
"""Enum with all available KSQL endpoints."""

KSQL = 'ksql'
QUERY = 'query'
INFO = 'info'
HEALTH = 'healthcheck'


class KsqlErrors(Enum):
"""Enum with all available KSQL errors."""

BAD_STATEMENT = 40001
QUERY_ENDPOINT = 40002


class KsqlException(Exception):
"""Base class for KSQL exceptions."""

Expand Down
4 changes: 2 additions & 2 deletions src/app/core/render.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,11 @@ def render_level(response: ContextResponse) -> str:

if response.code >= 400:
response_level = BootstrapLevel.DANGER.value
elif len(response.data) == 0:
elif len(response.data) == 0 and not response.text:
response_level = BootstrapLevel.WARNING.value
else:
for entry in response.data:
if entry['@type'] == 'warning_entity':
if entry.get('@type') == 'warning_entity':
return BootstrapLevel.WARNING.value

return response_level
10 changes: 9 additions & 1 deletion src/app/core/utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
from typing import Any

import httpx
Expand All @@ -13,7 +14,14 @@ def make_list(value: Any) -> list:

class ContextResponse:
def __init__(self, httpx_response: httpx.Response):
self.data = make_list(httpx_response.json())
self.data = []
self.text = ''

try:
self.data = make_list(httpx_response.json())
except json.decoder.JSONDecodeError:
self.text = httpx_response.text

self.code = httpx_response.status_code


Expand Down
2 changes: 1 addition & 1 deletion src/app/requests/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ async def perform_request(request: Request) -> Response:
add_request_to_history(request, query)

ksql_request = KsqlRequest(request, query)
ksql_response = await ksql_request.execute()
ksql_response = await ksql_request.execute(query_fallback=True)
context = {'query': query}

return render_template(
Expand Down
8 changes: 7 additions & 1 deletion src/templates/requests/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ <h2 class="my-h2">

{% block right_section %}
{% if x_response %}
<!-- Show response -->
<!-- Show JSON response -->
{% if x_response.data %}
<div>
{% for section in x_response.data %}
Expand All @@ -48,6 +48,12 @@ <h2 class="my-h2">
{% endif %}
{% endfor %}
</div>
{% elif x_response.text %}
<!-- Show text response -->
{% for line in x_response.text.splitlines() %}
{{render_json(line)|safe}}
<br>
{% endfor %}
{% else %}
<h3>Empty response body</h3>
{% endif %}
Expand Down

0 comments on commit 201aff9

Please sign in to comment.