Skip to content

Commit

Permalink
Add ASN path anomalies handling and refactor related queries
Browse files Browse the repository at this point in the history
  • Loading branch information
petya-vasileva committed Jan 27, 2025
1 parent 3429fee commit 57d9ff5
Show file tree
Hide file tree
Showing 3 changed files with 308 additions and 17 deletions.
11 changes: 6 additions & 5 deletions src/model/Alarms.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ def formatDfValues(self, df, event):
df.drop(columns=['to_date', 'ipv6', 'asn_count'], inplace=True)

# TODO: create pages/visualizatios for the following events then remove the df.drop('alarm_link') below
if event not in ['unresolvable host', 'hosts not found', 'ASN path anomalies']:
if event not in ['unresolvable host', 'hosts not found']:
df = self.createAlarmURL(df, event)
else:
df.drop('alarm_link', axis=1, inplace=True)
Expand All @@ -405,6 +405,8 @@ def createAlarmURL(df, event):
page = 'throughput/'
elif event == 'path changed':
page = 'paths/'
elif event == 'ASN path anomalies':
page = 'anomalous_paths/'
elif event in ['firewall issue', 'complete packet loss', 'bandwidth decreased from/to multiple sites',
'high packet loss on multiple links', 'high packet loss']:
page = 'loss-delay/'
Expand All @@ -417,10 +419,9 @@ def createAlarmURL(df, event):
df['alarm_link'] = df['alarm_link'].apply(
lambda id: f"<a class='btn btn-secondary' role='button' href='{url}{id}' target='_blank'>VIEW IN A NEW TAB</a>" if id else '-')

# if event == 'path changed between sites':
# df['alarm_link'] = df['path'].apply(
# lambda site: f"<a href='{request.host_url}paths-site/{site}?dateFrom={df['from'].min()}&dateTo={df['to'].max()}' target='_blank'>VIEW</a>" if site else '-')

if event == 'ASN path anomalies':
df['alarm_link'] = df.apply(
lambda row: f"<a class='btn btn-secondary' role='button' href='{request.host_url}anomalous_paths/src_netsite={row['src_netsite']}&dest_netsite={row['dest_netsite']}' target='_blank'>VIEW IN A NEW TAB</a>" if row['src_netsite'] and row['dest_netsite'] else '-', axis=1)
return df


Expand Down
68 changes: 56 additions & 12 deletions src/model/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,45 @@ def getSubcategories():
catdf = pd.DataFrame(subcategories)

return catdf

def query_ASN_anomalies(dateFrom, dateTo):
    """Fetch 'ASN path anomalies' alarms from the ps_traces_changes index.

    Selects documents whose ``to_date`` falls within [dateFrom, dateTo]
    (strict_date_optional_time format). Each document carries a list of
    anomalous paths; the result is flattened so every path becomes its own
    row, annotated with its last appearance time, the repaired ASN path and
    the path length.

    Parameters
    ----------
    dateFrom, dateTo : str
        Time-range bounds understood by Elasticsearch's
        ``strict_date_optional_time`` format.

    Returns
    -------
    pandas.DataFrame
        One row per anomalous path; empty if no alarms matched.
    """
    q = {
        "query": {
            "bool": {
                "must": [
                    {
                        "range": {
                            "to_date": {
                                "gte": dateFrom,
                                "lte": dateTo,
                                "format": "strict_date_optional_time"
                            }
                        }
                    },
                    {
                        "term": {
                            "event.keyword": "ASN path anomalies"
                        }
                    }
                ]
            }
        }
    }

    fields = ['src_netsite', 'dest_netsite', 'ipv6', 'asn_list', 'alarm_id', 'to_date', 'paths']
    result = scan(client=hp.es, index='ps_traces_changes', query=q, source=fields)

    data = []
    for item in result:
        doc = item['_source']
        # Flatten: one output row per anomalous path in the document.
        for path in doc.get('paths', []):
            row = doc.copy()
            row['last_appearance_path'] = path['last_appearance_path']
            row['repaired_asn_path'] = path['repaired_asn_path']
            row['path_len'] = len(path['repaired_asn_path'])
            data.append(row)

    return pd.DataFrame(data)


def queryTraceChanges(dateFrom, dateTo, asn=None):
Expand All @@ -412,17 +451,10 @@ def queryTraceChanges(dateFrom, dateTo, asn=None):
"query": {
"bool": {
"must": [
{
"range": {
"from_date": {
"gte": dateFrom,
"format": "strict_date_optional_time"
}
}
},
{
"range": {
"to_date": {
"gte": dateFrom,
"lte": dateTo,
"format": "strict_date_optional_time"
}
Expand All @@ -433,6 +465,13 @@ def queryTraceChanges(dateFrom, dateTo, asn=None):
"diff": [asn]
}
}
],
"must_not": [
{
"term": {
"event": "ASN path anomalies"
}
}
]
}
}
Expand Down Expand Up @@ -469,16 +508,21 @@ def queryTraceChanges(dateFrom, dateTo, asn=None):
data, positions, baseline, altPaths = [],[],[],[]
positions = []
for item in result:
item['_source']['src'] = item['_source']['src'].upper()
item['_source']['dest'] = item['_source']['dest'].upper()
item['_source']['src_site'] = item['_source']['src_site'].upper()
item['_source']['dest_site'] = item['_source']['dest_site'].upper()

tempD = {}
for k,v in item['_source'].items():
if k not in ['positions', 'baseline', 'alt_paths', 'created_at']:
tempD[k] = v
data.append(tempD)

src = item['_source']['src'].upper()
dest = item['_source']['dest'].upper()
src_site = item['_source']['src_site'].upper()
dest_site = item['_source']['dest_site'].upper()
src = item['_source']['src']
dest = item['_source']['dest']
src_site = item['_source']['src_site']
dest_site = item['_source']['dest_site']
from_date,to_date = item['_source']['from_date'], item['_source']['to_date']

temp = item['_source']['positions']
Expand Down
246 changes: 246 additions & 0 deletions src/pages/asn_anomalies.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
import dash
from dash import html
from dash import Dash, html, dcc, Input, Output, Patch, callback, State, ctx, dash_table, dcc, html
import dash_bootstrap_components as dbc

from elasticsearch.helpers import scan
import urllib
import urllib3

import pandas as pd
import numpy as np
import plotly.graph_objects as go
import plotly.express as px

import utils.helpers as hp
from utils.helpers import DATE_FORMAT
from model.Alarms import Alarms
import model.queries as qrs

from utils.parquet import Parquet
urllib3.disable_warnings()



def title(q=None):
    """Browser/page title for the ASN path anomalies page.

    Fixes a copy-paste leftover: the page shows ASN path anomalies,
    not latency alarms.
    """
    return f"ASN path anomalies {q}"



def description(q=None):
    """Page description shown by Dash (typo 'represention' fixed)."""
    return f"Visual representation of an ASN alarm {q}"



# Register this module as a Dash page served under /anomalous_paths/<q>;
# the dynamic <q> segment carries 'src_netsite=...&dest_netsite=...'
# (parsed by the update_store callback below).
dash.register_page(
    __name__,
    path_template="/anomalous_paths/<q>",
    title=title,
    description=description,
)


def layout(q=None, **other_unknown_query_strings):
    """Page skeleton: a URL listener, a store for the parsed query
    parameters, and a loading-wrapped container for the figures."""
    header = html.Div([
        html.H1(f"ASN paths between source and destination sites"),
        html.P('The plot shows new ASNs framed in red. The data is based on the alarms of type "ASN path anomalies"', style={"font-size": "1.2rem"})
    ], className="l-h-3 p-2")

    graphs = dcc.Loading(
        id='loading-spinner',
        type='default',
        color='#00245A',
        children=[html.Div(id='asn-anomalies-graphs')],
    )

    return html.Div([
        dcc.Location(id='url', refresh=False),
        dcc.Store(id='alarm-data-store'),
        html.Div(id='asn-anomalies-content'),
        html.Div([header, graphs], className="l-h-3 p-2 boxwithshadow page-cont ml-1 p-1"),
    ], style={"padding": "0.5% 1.5%"})


@callback(
    Output('alarm-data-store', 'data'),
    Input('url', 'pathname')
)
def update_store(pathname):
    """Parse '/anomalous_paths/src_netsite=X&dest_netsite=Y' into a dict.

    Improvements over the naive split: values are URL-decoded (site names
    may contain percent-encoded characters) and each parameter is split on
    the first '=' only, so values containing '=' are preserved.

    Returns
    -------
    dict
        {param_name: decoded_value}, or {} when the path has no parameters.
    """
    from urllib.parse import unquote

    if pathname:
        # Extract the parameters from the URL path
        path_parts = pathname.split('/')
        if len(path_parts) > 2 and '=' in path_parts[2]:
            query_params = {}
            for param in path_parts[2].split('&'):
                key, _, value = param.partition('=')
                query_params[key] = unquote(value)
            return query_params
    return {}


@callback(
    Output('asn-anomalies-graphs', 'children'),
    Input('alarm-data-store', 'data')
)
def update_graphs(query_params):
    """Build the heatmap figure(s) for the site pair in the URL parameters.

    Renders one figure per IP version present in the data (IPv4 and/or
    IPv6), or a 'no data' message when the query returns nothing.

    Bug fix: the emptiness check now happens BEFORE any column access —
    previously ``data['ipv6']`` was read (in a debug print) on a possibly
    empty DataFrame, which has no columns and raises KeyError instead of
    showing the 'no data' message. Debug prints removed.
    """
    if query_params:
        src = query_params.get('src_netsite')
        dest = query_params.get('dest_netsite')
        if src and dest:
            data = query_ASN_anomalies(src, dest)
            if len(data) > 0:
                if len(data['ipv6'].unique()) == 2:
                    # Both IP versions present: one heatmap per version.
                    ipv6_figure = generate_plotly_heatmap_with_anomalies(data[data['ipv6'] == True])
                    ipv4_figure = generate_plotly_heatmap_with_anomalies(data[data['ipv6'] == False])
                    figures = [
                        dcc.Graph(figure=ipv4_figure, id="asn-sankey-ipv4"),
                        dcc.Graph(figure=ipv6_figure, id="asn-sankey-ipv6")
                    ]
                else:
                    figure = generate_plotly_heatmap_with_anomalies(data)
                    figures = [dcc.Graph(figure=figure, id="asn-sankey-ipv4")]

                return html.Div(figures)
            else:
                return html.Div([
                    html.H1(f"No data found for alarm {src} to {dest}"),
                    html.P('No data was found for the alarm selected. Please try another alarm.', style={"font-size": "1.2rem"})
                ], className="l-h-3 p-2 boxwithshadow page-cont ml-1 p-1")
    return html.Div()


def generate_plotly_heatmap_with_anomalies(subset_sample):
    """Render an ASN-path heatmap for one src/dest site pair.

    Each row is one observed path (y = last appearance time), each column a
    position along the path, and each cell is coloured by the ASN at that
    position. ASNs listed in the alarm's ``asn_list`` get a red-bordered
    annotation; missing hops (mapped to 0) are drawn black.

    NOTE(review): mutates *subset_sample* in place (parses dates, adds the
    'last_appearance_short' column) — pass a throwaway slice/copy if the
    caller reuses the frame.
    """
    columns = ['src_netsite', 'dest_netsite', 'asn_list', 'ipv6']
    # Metadata is assumed constant within the subset; take it from row 0.
    src_site, dest_site, anomaly, ipv = subset_sample[columns].values[0]
    # ipv = 'IPv6' if ipv else 'IPv4'

    subset_sample['last_appearance_path'] = pd.to_datetime(subset_sample['last_appearance_path'], errors='coerce')

    # Create a short format date column for plotting
    subset_sample['last_appearance_short'] = subset_sample['last_appearance_path'].dt.strftime('%H:%M %d-%b')

    print('Size of dataset:', len(subset_sample))
    # Longest repaired path determines the number of heatmap columns.
    max_length = subset_sample["path_len"].max()

    # Convert the path list into a pivot DataFrame: one column per hop
    # position; shorter paths are padded with NaN by the DataFrame ctor.
    pivot_df = pd.DataFrame(
        subset_sample['repaired_asn_path'].tolist(),
        index=subset_sample.index,
        columns=[f"pos_{i+1}" for i in range(max_length)]
    ).applymap(lambda x: int(x) if isinstance(x, (int, float)) and not pd.isna(x) else x)

    # Map ASNs to colors. Index 0 is reserved for NaN (white); the value 0
    # (unknown hop) is forced to black below.
    unique_rids = pd.Series(pivot_df.stack().unique()).dropna().tolist()
    if 0 not in unique_rids:
        unique_rids.append(0)
    rid_to_index = {rid: i + 1 for i, rid in enumerate(unique_rids)}
    rid_to_index[np.nan] = 0

    # Cycle the qualitative palettes so there is one colour per ASN.
    base_colors = px.colors.qualitative.Prism + px.colors.qualitative.Bold
    expanded_colors = (base_colors * (len(unique_rids) // len(base_colors) + 1))[:len(unique_rids)]
    color_list = ['#FFFFFF'] + expanded_colors
    color_list[rid_to_index[0]] = '#000000'

    # Integer-encoded grid fed to the heatmap (0 = NaN/unmapped).
    index_df = pivot_df.applymap(lambda x: rid_to_index.get(x, 0))

    # Prepare hoverlabel background colors so the tooltip matches the cell.
    index_to_color = {i: color_list[i] for i in range(len(color_list))}
    hover_bgcolor = np.array([index_to_color.get(z, '#FFFFFF') for row in index_df.values for z in row]).reshape(index_df.shape)

    # Prepare the heatmap using WebGL
    fig = go.Figure()
    heatmap = go.Heatmap(
        z=index_df.values,
        x=index_df.columns,
        y=subset_sample['last_appearance_short'],  # Use formatted short date for y-axis
        colorscale=color_list,
        zmin=0,
        zmax=len(unique_rids),
        customdata=pivot_df.values,  # raw ASNs for the hover template
        hoverlabel=dict(bgcolor=hover_bgcolor),
        hovertemplate="<b>Position: %{x}</b><br><b>ASN: %{customdata}</b><extra></extra>",
        showscale=False,
    )
    fig.add_trace(heatmap)

    # Add annotations for anomalies (red border) and regular hops (plain).
    for idx, row in enumerate(pivot_df.values):
        for col_idx, asn in enumerate(row):
            # NOTE(review): '~' relies on np.isnan returning np.bool_;
            # with a plain Python bool, ~True == -2 (truthy) — confirm
            # cell values stay numpy scalars here.
            if ~np.isnan(asn):
                if asn in anomaly:
                    fig.add_annotation(
                        x=index_df.columns[col_idx],
                        y=subset_sample['last_appearance_short'].iloc[idx],
                        text=int(asn),
                        showarrow=False,
                        bordercolor='red',
                        font=dict(color='white', size=12, family="Arial", weight='bold'),
                    )
                else:
                    fig.add_annotation(
                        x=index_df.columns[col_idx],
                        y=subset_sample['last_appearance_short'].iloc[idx],
                        text=int(asn),
                        showarrow=False,
                        font=dict(color='white', size=10, family="Arial", weight='bold'),
                    )

    fig.update_layout(
        title=f"ASN path signature between {src_site} and {dest_site} for {ipv} paths",
        xaxis_title='Position',
        yaxis_title='Path Observation Date',
        margin=dict(r=150),
        height=600,
        # reversed + categorical axis keeps newest observation at the top
        yaxis=dict(autorange='reversed', type='category', fixedrange=False)
    )

    return fig


def query_ASN_anomalies(src, dest):
    """Fetch 'ASN path anomalies' alarms for one src/dest site pair.

    Queries the ps_traces_changes index over the default time range
    (last 2 days) and flattens each document's ``paths`` list so every
    anomalous path becomes its own row, annotated with its last appearance
    time, the repaired ASN path and the path length.

    Parameters
    ----------
    src, dest : str
        Source and destination network-site names (matched exactly against
        the ``.keyword`` sub-fields).

    Returns
    -------
    pandas.DataFrame
        One row per anomalous path; empty if nothing matched.
    """
    dateFrom, dateTo = hp.defaultTimeRange(days=2)
    q = {
        "query": {
            "bool": {
                "must": [
                    {
                        "range": {
                            "to_date": {
                                "gte": dateFrom,
                                "lte": dateTo,
                                "format": "strict_date_optional_time"
                            }
                        }
                    },
                    {
                        "term": {
                            "event.keyword": "ASN path anomalies"
                        }
                    },
                    {
                        "term": {
                            "src_netsite.keyword": src
                        }
                    },
                    {
                        "term": {
                            "dest_netsite.keyword": dest
                        }
                    }
                ]
            }
        }
    }

    fields = ['ipv6', 'src_netsite', 'dest_netsite', 'last_appearance_path', 'repaired_asn_path', 'asn_list', 'paths']
    result = scan(client=hp.es, index='ps_traces_changes', query=q, source=fields)

    data = []
    for item in result:
        doc = item['_source']
        # Flatten: one output row per anomalous path in the document.
        for path in doc.get('paths', []):
            row = doc.copy()
            row['last_appearance_path'] = path['last_appearance_path']
            row['repaired_asn_path'] = path['repaired_asn_path']
            row['path_len'] = len(path['repaired_asn_path'])
            data.append(row)

    return pd.DataFrame(data)

0 comments on commit 57d9ff5

Please sign in to comment.