Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run ensure L2 flow dependently on both devices #285

Open
wants to merge 4 commits into
base: stable/yoga-m3
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 1 addition & 9 deletions octavia_f5/controller/statusmanager/status_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import futurist
import oslo_messaging as messaging
import prometheus_client as prometheus
import requests
from oslo_config import cfg
from oslo_db import exception as db_exc
from oslo_log import log as logging
Expand Down Expand Up @@ -192,14 +191,7 @@ def availability_check(self):
timeout = CONF.status_manager.failover_timeout

# Try reaching device
available = True
try:
requests.get(bigip.scheme + '://' + bigip.hostname, timeout=timeout, verify=False)
LOG.info('Found device with URL {}'.format(bigip.hostname))
except requests.exceptions.Timeout:
LOG.info('Device timed out, considering it unavailable. Timeout: {}s Hostname: {}'.format(
timeout, bigip.hostname))
available = False
available = bigip.is_available(timeout)

if self.bigip_status[bigip.hostname] != available:
# Update database entry
Expand Down
62 changes: 48 additions & 14 deletions octavia_f5/controller/worker/flows/f5_flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,34 +27,65 @@ def make_ensure_l2_flow(self, selfips: [network_models.Port], store: dict) -> fl
"""
Construct and return a flow to ensure complete L2 configuration for a new partition.
The flow assumes that no L2 objects exist yet for the network so nothing is cleaned up.
"""

We have to inject all required variables into each flow/task because these flows will
be run as part of a graph flow, and its storage contains identically-named variables
for the two F5 devices, so their names would overlap. Also, in a graph flow every
subflow/task must have a unique name, which is why we append the BigIP hostname.
"""
bigip_hostname = store["bigip"].hostname
# make SelfIP creation subflow
ensure_selfips_subflow = unordered_flow.Flow('ensure-selfips-subflow')
ensure_selfips_subflow = unordered_flow.Flow(
f'ensure-selfips-subflow-{bigip_hostname}')
for selfip_port in selfips:
ensure_selfip_task = f5_tasks.EnsureSelfIP(
name=f"ensure-selfip-{selfip_port.id}", inject={'port': selfip_port})
name=f'ensure-selfip-{bigip_hostname}-{selfip_port.id}',
inject={
'port': selfip_port,
**store
}
)
ensure_selfips_subflow.add(ensure_selfip_task)

# create subnet routes for all subnets that don't have a SelfIP
network = store['network']
subnets_to_create_routes_for = [subnet for subnet in network.subnets
if not f5_tasks.selfip_for_subnet_exists(subnet, selfips)]
ensure_subnet_routes_subflow = unordered_flow.Flow('ensure-subnet-routes-subflow')
ensure_subnet_routes_subflow = unordered_flow.Flow(
f'ensure-subnet-routes-subflow-{bigip_hostname}')

# make subnet route creation subflow
for subnet_id in subnets_to_create_routes_for:
subnet_route_name = f5_tasks.get_subnet_route_name(network.id, subnet_id)
ensure_subnet_route_task = f5_tasks.EnsureSubnetRoute(name=f"ensure-subnet-route-{subnet_route_name}",
inject={'subnet_id': subnet_id})
ensure_subnet_route_task = f5_tasks.EnsureSubnetRoute(
name=f'ensure-subnet-route-{bigip_hostname}-{subnet_route_name}',
inject={
'subnet_id': subnet_id,
**store
}
)
ensure_subnet_routes_subflow.add(ensure_subnet_route_task)

ensure_route_domain = f5_tasks.EnsureRouteDomain()
ensure_default_route = f5_tasks.EnsureDefaultRoute()
ensure_vlan = f5_tasks.EnsureVLAN()

ensure_l2_flow = linear_flow.Flow('ensure-l2-flow')
ensure_l2_flow.add(ensure_vlan,
get_existing_route_domain = f5_tasks.GetExistingRouteDomain(
name=f'get-existing-route-domain-{bigip_hostname}',
inject=store)
ensure_route_domain = f5_tasks.EnsureRouteDomain(
name=f'ensure-route-domain-{bigip_hostname}',
inject=store)
ensure_default_route = f5_tasks.EnsureDefaultRoute(
name=f'ensure-default-route-{bigip_hostname}',
inject=store)
get_existing_vlan = f5_tasks.GetExistingVLAN(
name=f'get-existing-vlan-{bigip_hostname}',
inject=store)
ensure_vlan = f5_tasks.EnsureVLAN(
name=f'ensure-vlan-{bigip_hostname}',
inject=store)

ensure_l2_flow = linear_flow.Flow(f'ensure-l2-flow-{bigip_hostname}')
ensure_l2_flow.add(get_existing_vlan,
ensure_vlan,
get_existing_route_domain,
ensure_route_domain,
# SelfIPs must be present for routes to work
ensure_selfips_subflow,
Expand Down Expand Up @@ -184,7 +215,8 @@ def make_ensure_selfips_and_subnet_routes_flow(self, needed_selfips, subnets_tha
ensure_selfips_subflow = unordered_flow.Flow('ensure-selfips-subflow')
for selfip_port in selfips_to_create:
ensure_selfip_task = f5_tasks.EnsureSelfIP(
name=f"ensure-selfip-{selfip_port.id}", inject={'port': selfip_port})
name=f'ensure-selfip-{store["bigip"].hostname}-{selfip_port.id}',
inject={'port': selfip_port})
ensure_selfips_subflow.add(ensure_selfip_task)

# find subnet routes for subnets that need them but don't have any yet
Expand Down Expand Up @@ -226,12 +258,14 @@ def make_get_existing_selfips_and_subnet_routes_flow(self) -> flow.Flow:
return get_existing_sip_sr_flow

def make_ensure_vcmp_l2_flow(self) -> flow.Flow:
get_existing_vlan = f5_tasks.GetExistingVLAN()
ensure_vlan = f5_tasks.EnsureVLAN()
ensure_vlan_interface = f5_tasks.EnsureVLANInterface()
ensure_guest_vlan = f5_tasks.EnsureGuestVLAN()

ensure_vcmp_l2_flow = linear_flow.Flow('ensure-vcmp-l2-flow')
ensure_vcmp_l2_flow.add(ensure_vlan,
ensure_vcmp_l2_flow.add(get_existing_vlan,
ensure_vlan,
ensure_vlan_interface,
ensure_guest_vlan)
return ensure_vcmp_l2_flow
Expand Down
54 changes: 37 additions & 17 deletions octavia_f5/controller/worker/l2_sync_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from oslo_config import cfg
from oslo_log import log as logging
from taskflow.listeners import logging as tf_logging
from taskflow.patterns import graph_flow

from octavia.common import data_models as octavia_models
from octavia.common.base_taskflow import BaseTaskFlowEngine
Expand Down Expand Up @@ -83,21 +84,30 @@ def failover(self):
for bigip in self._bigips:
bigip.update_status()

def _do_ensure_l2_flow(self, selfips: [network_models.Port], store: dict):

# get existing SelfIPs and subnet routes - they are needed to determine,
# which ones have to be created and which already exist
e = self.taskflow_load(self._f5flows.make_get_existing_selfips_and_subnet_routes_flow(), store=store)
with tf_logging.LoggingListener(e, log=LOG):
e.run()

# info about existing SelfIPs and subnet routes could be needed for either
# flow construction or in the tasks themselves, or both
store['existing_selfips'] = e.storage.get('get-existing-selfips')
store['existing_subnet_routes'] = e.storage.get('get-existing-subnet-routes')

ensure_l2_flow = self._f5flows.make_ensure_l2_flow(selfips, store=store)
e = self.taskflow_load(ensure_l2_flow, store=store)
def _do_ensure_l2_flow(self, data: dict):
ensure_l2_flow = graph_flow.Flow('ensure-l2-flow-from-all-devices')
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see the point in using a graph flow here. The way I understand it, a graph flow is for throwing multiple tasks/subflows into a bucket, which are then resolved according to their interdependencies. However, here we only have two subflows, one for each BigIP device. As far as I can see those can be parallelized, so an unordered_flow would suffice. Or is there another good reason to use graph_flow?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main difference is that you can run the flows on both devices in parallel, but if one of them fails, the other one will also be reverted. You could achieve that with an unordered flow only by putting all tasks for both devices into a single unordered flow, and then they would run one by one for a really long time.

for flow_data in data.values():
# get existing SelfIPs and subnet routes - they are needed to determine,
# which ones have to be created and which already exist
e = self.taskflow_load(self._f5flows.make_get_existing_selfips_and_subnet_routes_flow(),
store=flow_data['store'])
with tf_logging.LoggingListener(e, log=LOG):
e.run()

# info about existing SelfIPs and subnet routes could be needed for either
# flow construction or in the tasks themselves, or both
flow_data['store']['existing_selfips'] = e.storage.get('get-existing-selfips')
flow_data['store']['existing_subnet_routes'] = e.storage.get('get-existing-subnet-routes')

ensure_l2_flow.add(
self._f5flows.make_ensure_l2_flow(
flow_data['selfips'], store=flow_data['store']))

# We have to inject all required variables into each flow/task because these flows will
# be run as part of a graph flow, and its storage contains identically-named variables
# for the two F5 devices, so their names would overlap. Also, in a graph flow every
# subflow/task must have a unique name, which is why we append the BigIP hostname.
e = self.taskflow_load(ensure_l2_flow)
with tf_logging.DynamicLoggingListener(e, log=LOG):
e.run()

Expand Down Expand Up @@ -180,14 +190,24 @@ def ensure_l2_flow(self, selfips: [network_models.Port], network_id: str, device

# run l2 flow for all devices in parallel
fs = {}
ensure_l2_flow_data = {}
for bigip in self._bigips:
if device and bigip.hostname != device:
continue

# Check if the device is available via a simple GET request
if not bigip.is_available(timeout=CONF.status_manager.failover_timeout):
continue

selfips_for_host = [selfip for selfip in selfips if bigip.hostname in selfip.name]
subnet_ids = set(sip.fixed_ips[0].subnet_id for sip in selfips_for_host)
store = {'bigip': bigip, 'network': network, 'subnet_id': subnet_ids.pop()}
fs[self.executor.submit(self._do_ensure_l2_flow, selfips=selfips_for_host, store=store)] = bigip
ensure_l2_flow_data[bigip.hostname] = {
'store': {'bigip': bigip, 'network': network, 'subnet_id': subnet_ids.pop()},
'selfips': selfips_for_host,
}
fs[self.executor.submit(
self._do_ensure_l2_flow,
data=ensure_l2_flow_data)] = ','.join(ensure_l2_flow_data.keys())

# run VCMP l2 flow for all VCMPs in parallel
for vcmp in self._vcmps:
Expand Down
103 changes: 79 additions & 24 deletions octavia_f5/controller/worker/tasks/f5_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ class EnsureVLAN(task.Task):
)
def execute(self,
bigip: bigip_restclient.BigIPRestClient,
network: f5_network_models.Network):

network: f5_network_models.Network,
existing_vlan: dict):
vlan = {
'name': f'vlan-{network.vlan_id}',
'tag': network.vlan_id,
Expand All @@ -64,22 +64,34 @@ def execute(self,
'syncacheThreshold': CONF.networking.syncache_threshold
}

device_response = bigip.get(path=f"/mgmt/tm/net/vlan/~Common~{vlan['name']}?expandSubcollections=true")
# Create vlan if not existing
if device_response.status_code == 404:
if existing_vlan is None:
res = bigip.post(path='/mgmt/tm/net/vlan', json=vlan)
res.raise_for_status()
return res.json()

device_vlan = device_response.json()
if not vlan.items() <= device_vlan.items():
if not vlan.items() <= existing_vlan.items():
res = bigip.patch(path=f"/mgmt/tm/net/vlan/~Common~{vlan['name']}",
json=vlan)
res.raise_for_status()
return res.json()

# No Changes needed
return device_vlan
return existing_vlan

@decorators.RaisesIControlRestError()
def revert(self, network: f5_network_models.Network,
bigip: bigip_restclient.BigIPRestClient,
existing_vlan, *args, **kwargs):
if existing_vlan is not None:
LOG.warning(f"Reverting EnsureVLAN: Not deleting VLAN, since it existed before "
f"the task was run: {existing_vlan}")
return
res = bigip.delete(path=f"/mgmt/tm/net/vlan/~Common~vlan-{network.vlan_id}")
if not res.ok:
LOG.warning("Reverting EnsureVLAN: Failed removing VLAN on the device %s for "
"vlan_id=%s: %s", bigip.hostname, network.vlan_id, res.content)
res.raise_for_status()


class EnsureVLANInterface(task.Task):
Expand Down Expand Up @@ -158,29 +170,49 @@ class EnsureRouteDomain(task.Task):
stop=tenacity.stop_after_attempt(3)
)
def execute(self, network: f5_network_models.Network,
bigip: bigip_restclient.BigIPRestClient):

bigip: bigip_restclient.BigIPRestClient,
existing_route_domain: dict):
vlans = [f"/Common/vlan-{network.vlan_id}"]
rd = {'name': f"vlan-{network.vlan_id}", 'vlans': vlans, 'id': network.vlan_id}

device_response = bigip.get(path=f"/mgmt/tm/net/route-domain/{rd['name']}")
if device_response.status_code == 404:
path = f"/mgmt/tm/net/route-domain/net-{network.id}"
device_response = bigip.get(path=path)

# Create route_domain if not existing
if device_response.status_code == 404:
if existing_route_domain is None:
res = bigip.post(path='/mgmt/tm/net/route-domain', json=rd)
res.raise_for_status()
return res.json()

device_rd = device_response.json()
if device_rd.get('vlans', []) != vlans:
res = bigip.patch(path=f"/mgmt/tm/net/route-domain/{device_rd['fullPath']}",
if existing_route_domain.get('vlans', []) != vlans:
res = bigip.patch(path=f"/mgmt/tm/net/route-domain/{existing_route_domain['fullPath']}",
json={'vlans': vlans})
res.raise_for_status()
return res.json()
return device_rd

return existing_route_domain

@decorators.RaisesIControlRestError()
def revert(self, network: f5_network_models.Network,
bigip: bigip_restclient.BigIPRestClient,
existing_route_domain, *args, **kwargs):
paths = [
f"/mgmt/tm/net/route-domain/vlan-{network.vlan_id}",
f"/mgmt/tm/net/route-domain/net-{network.id}"
]
if existing_route_domain is not None:
LOG.warning(f"Reverting EnsureRouteDomain: Not deleting RouteDomain, since it existed before "
f"the task was run: {existing_route_domain}")
return

res = None
for path in paths:
if bigip.get(path=path).ok:
res = bigip.delete(path=path)
break

if res and not res.ok:
LOG.warning("Reverting EnsureRouteDomain: Failed removing route domain on the device %s "
"for network_id=%s vlan_id=%s: %s",
bigip.hostname, network.id, network.vlan_id, res.content)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please reraise the error, like this: res.raise_for_status().

res.raise_for_status()


class EnsureSelfIP(task.Task):
Expand All @@ -190,7 +222,6 @@ class EnsureSelfIP(task.Task):
def execute(self, bigip: bigip_restclient.BigIPRestClient,
network: f5_network_models.Network,
port: network_models.Port):

# payload
name = f"port-{port.id}"
vlan = f"/Common/vlan-{network.vlan_id}"
Expand Down Expand Up @@ -225,7 +256,6 @@ def revert(self, port: network_models.Port,
bigip: bigip_restclient.BigIPRestClient,
existing_selfips, *args, **kwargs):
selfip_name = f"port-{port.id}"

# don't remove the SelfIP if it existed before this task was executed
if port.id in [p['port_id'] for p in existing_selfips]:
LOG.warning("Reverting EnsureSelfIP: Not deleting SelfIP, since it existed before the task was run: "
Expand All @@ -241,6 +271,34 @@ def revert(self, port: network_models.Port,
device_response.raise_for_status()


class GetExistingVLAN(task.Task):
default_provides = 'existing_vlan'

@decorators.RaisesIControlRestError()
def execute(self, bigip: bigip_restclient.BigIPRestClient,
network: f5_network_models.Network):
device_response = bigip.get(path=f"/mgmt/tm/net/vlan/~Common~vlan-{network.vlan_id}?expandSubcollections=true")
if device_response.status_code == 404:
return None
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please either reraise any error (device_response.raise_for_status()) or remove the RaisesIControlRestError annotation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do not need to do that here, because if the resource does not exist, that is fine — we will create it in the next task.

return device_response.json()


class GetExistingRouteDomain(task.Task):
default_provides = 'existing_route_domain'

@decorators.RaisesIControlRestError()
def execute(self, bigip: bigip_restclient.BigIPRestClient,
network: f5_network_models.Network):
device_response = bigip.get(path=f"/mgmt/tm/net/route-domain/vlan-{network.vlan_id}")
if device_response.status_code == 404:
path = f"/mgmt/tm/net/route-domain/net-{network.id}"
device_response = bigip.get(path=path)

if device_response.status_code == 404:
return None
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please either reraise any error (device_response.raise_for_status()) or remove the RaisesIControlRestError annotation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do not need to do that here, because if the resource does not exist, that is fine — we will create it in the next task.

return device_response.json()


class GetExistingSelfIPsForVLAN(task.Task):
default_provides = 'existing_selfips'

Expand Down Expand Up @@ -289,7 +347,6 @@ class EnsureDefaultRoute(task.Task):
def execute(self, bigip: bigip_restclient.BigIPRestClient,
subnet_id: str,
network: f5_network_models.Network):

if CONF.networking.route_on_active and not bigip.is_active:
# Skip passive device if route_on_active is enabled
return None
Expand Down Expand Up @@ -334,7 +391,6 @@ class EnsureSubnetRoute(task.Task):
def execute(self, bigip: bigip_restclient.BigIPRestClient,
network: f5_network_models.Network,
subnet_id):

# Skip passive device if route_on_active is enabled
if CONF.networking.route_on_active and not bigip.is_active:
return
Expand Down Expand Up @@ -374,7 +430,6 @@ def revert(self, bigip: bigip_restclient.BigIPRestClient,
subnet_id, existing_subnet_routes,
*args, **kwargs):
subnet_route_name = get_subnet_route_name(network.id, subnet_id)

# Don't remove the route if it existed before this task was executed
if subnet_route_name in [r['name'] for r in existing_subnet_routes]:
LOG.warning("Reverting EnsureSubnetRoute: Not deleting route, since it existed before the task was run: "
Expand Down
Loading
Loading