Skip to content

Commit

Permalink
Merge pull request #13 from bitcraze/jonasdn/crazyswarm
Browse files Browse the repository at this point in the history
Add testing of swarms using Crazyswarm project
  • Loading branch information
jonasdn authored Jul 8, 2021
2 parents aed12da + 3a93115 commit 0c74b8b
Show file tree
Hide file tree
Showing 10 changed files with 409 additions and 47 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/crazylab-malmö.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ jobs:
run: python3 management/program.py --file nightly/firmware-cf2-nightly.zip

- name: Run test suite
run: pytest --verbose --junit-xml $CRAZY_SITE-$(date +%s).xml
run: pytest --verbose --junit-xml $CRAZY_SITE-$(date +%s).xml tests/QA

- name: Checkout Crazyflie python library
uses: actions/checkout@v2
Expand Down
79 changes: 79 additions & 0 deletions .github/workflows/crazyswarm-malmö.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
name: Crazyswarm test 🐝

# Controls when the action will run.
on:
workflow_dispatch:
schedule:
- cron: '0 4 * * *'

jobs:
swarm_at_crazylab:
runs-on: self-hosted
container:
image: ros:noetic
options: --privileged

env:
CSW_PYTHON: python3
CRAZYSWARM_PATH: crazyswarm
CRAZYSWARM_YAML: crazylab-malmö.yaml

steps:
- name: Update apt sources
run: |
echo "deb http://ports.ubuntu.com/ubuntu-ports focal main restricted" >> /etc/apt/sources.list
sudo apt-get update
- name: Install Crazyswarm dependencies
run: |
sudo apt-get install -y git swig libpython3-dev python3-numpy \
python3-yaml python3-matplotlib python3-pytest python3-scipy \
libpcl-dev libusb-1.0-0-dev sdcc ros-noetic-vrpn gcc-arm-none-eabi \
ros-noetic-tf ros-noetic-tf-conversions python3-toml python3-pip
- name: Check out sources
uses: actions/checkout@v2

- name: Download latest firmware files
with:
repo: bitcraze/crazyflie-release
workflow: nightly.yml
uses: dawidd6/action-download-artifact@v2.14.0

- name: Check out sources
uses: actions/checkout@v2

- name: Download latest firmware files
with:
repo: bitcraze/crazyflie-release
workflow: nightly.yml
uses: dawidd6/action-download-artifact@v2.14.0

- name: Checkout Crazyswarm
uses: actions/checkout@v2
with:
repository: USC-ACTLab/crazyswarm
path: crazyswarm

- name: Build Crazyswarm
run: |
source /opt/ros/noetic/setup.bash
cd crazyswarm
./build.sh
shell: bash

- name: Install Crazyflie python library
run: pip3 install git+https://github.com/bitcraze/crazyflie-lib-python.git@master

- name: Upgrade swarm to latest firmware
run: |
source crazyswarm/ros_ws/devel/setup.bash
python3 management/program_swarm.py nightly/firmware-tag-nightly.zip --sim
shell: bash

- name: Run Crazyswarm tests
run: |
source crazyswarm/ros_ws/devel/setup.bash
roslaunch swarms/crazylab-malmö.launch &
python3 -m pytest --verbose tests/crazyswarm
shell: bash
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ repos:
rev: 7192665e31cea6ace58a71e086c7248d7e5610c2
hooks:
- id: flake8
args: [--ignore=E501]
25 changes: 23 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ See the file `sites/single-cf.toml` for the site file format to define new test

To run the test for a single Crazyflie, run:
```
CRAZY_SITE=single-cf pytest --verbose
CRAZY_SITE=single-cf pytest --verbose tests/QA
```

If you have defined your own site, then change the `CRAZY_SITE` environment
Expand All @@ -32,4 +32,25 @@ management/program.py - Flash a firmware file to devices in site
management/bootloader_addresses.py - List all devices bootloader addresses
management/recover.py - Attempt to recover one or all device(s)
from bootloader mode
```
management/program_swarm.py - Flash a firmware file to devices in a swarm
```

## Testing with Crazyswarm
It also possible to test using the [Crazyswarm](https://github.com/USC-ACTLab/crazyswarm) project.
You will need to specify your swarm in `swarms/name.yaml` and a [ROS](https://www.ros.org/) launch file in `swarms/name.launch` you can check the `swarms/crazylab-malmö.[yaml|launch]` files for inspiration.

To run the Crazyswarm tests or flash firmware files to a swarm you need to define some environment variables:

```
$ export CRAZYSWARM_PATH=[path to checked out crazyswarm source]
$ export CRAZYSWARM_YAML=[name_of_your.yaml]
$ source $CRAZYSWARM_PATH/ros_ws/devel/setup.bash
$ roslaunch [path to your launch file] &
# To flash all devices in swarm
$ python3 management/program_swarm.py [firmware file]
# To run the crazyswarm tests
$ python3 -m pytest tests/crazyswarm
```
95 changes: 94 additions & 1 deletion conftest.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import pytest
import binascii
import os
import time
import toml
import glob
import struct
import sys

from typing import Callable
from typing import List
from typing import NoReturn
from typing import Optional

Expand Down Expand Up @@ -134,7 +138,43 @@ def test_setup(request):
fix.device.cf.close_link()


def get_devices():
def get_bl_address(dev: BCDevice) -> str:
'''
Send the BOOTLOADER_CMD_RESET_INIT command to the NRF firmware
and receive the bootloader radio address in the response
'''
address = None
link = cflib.crtp.get_link_driver(dev.link_uri)
if link is None:
return None

# 0xFF => BOOTLOADER CMD
# 0xFE => To the NRF firmware
# 0xFF => BOOTLOADER_CMD_RESET_INIT (to get bl address)
pk = CRTPPacket(0xFF, [0xFE, 0xFF])
link.send_packet(pk)

timeout = 5 # seconds
ts = time.time()
while time.time() - ts < timeout:
pk = link.receive_packet(2)
if pk is None:
continue

# Header 0xFF means port is 0xF ((header & 0xF0) >> 4)) and channel
# is 0x3 (header & 0x03).
if pk.port == 0xF and pk.channel == 0x3 and len(pk.data) > 3:
# 0xFE is NRF target id, 0xFF is BOOTLOADER_CMD_RESET_INIT
if struct.unpack('<BB', pk.data[0:2]) != (0xFE, 0xFF):
continue
address = 'B1' + binascii.hexlify(pk.data[2:6][::-1]).upper().decode('utf8') # noqa
break

link.close()
return address


def get_devices() -> List[BCDevice]:
devices = list()

site = os.getenv('CRAZY_SITE')
Expand All @@ -154,6 +194,59 @@ def get_devices():
return devices


def get_swarm() -> List[BCDevice]:
'''
Given a path to the Crazyswarm project source and path in the
CRAZYSWARM_PATH environment variable and a path to a YAML file defining
a swarm in CRAZYSWARM_YAML return a list of BCDevice.
'''
devices = list()

try:
crazyswarm_path = os.environ['CRAZYSWARM_PATH']
sys.path.append(os.path.join(
crazyswarm_path,
'ros_ws/src/crazyswarm/scripts'
))
sys.path.append(os.path.join(
crazyswarm_path,
'ros_ws/src/crazyflie_ros'
))
from pycrazyswarm import Crazyswarm

crazyflies_yaml = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
'swarms',
os.environ['CRAZYSWARM_YAML']
)
cs = Crazyswarm(crazyflies_yaml=crazyflies_yaml)

for cf in cs.allcfs.crazyflies:
address = 'E7E7E7E7{:X}'.format(cf.id)

# get URI from address using scan
found = cflib.crtp.scan_interfaces(int(address, 16))
if not found:
raise Exception(f'No device found @ {address}!')

dev = BCDevice(
name=f'swarm-{cf.id}',
device={
'radio': found[0][0],
'bootloader_radio': None,
}
)
devices.append(dev)
except KeyError as err:
print('CRAZYSWARM_PATH or CRAZYSWARM_YAML not set', file=sys.stderr)
raise err
except ImportError as err:
print('Failed to import pycrazyswarm', file=sys.stderr)
raise err

return devices


class Requirements(dict):
_instance = None

Expand Down
44 changes: 1 addition & 43 deletions management/bootloader_addresses.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,9 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import binascii
import cflib
import logging
import os
import struct
import sys
import time

from cflib.crtp.crtpstack import CRTPPacket

#
# This is to make it possible to import from conftest
Expand All @@ -28,47 +22,11 @@
parentdir = os.path.join(currentdir, '..')
sys.path.append(parentdir)

from conftest import BCDevice, get_devices # noqa
from conftest import BCDevice, get_devices, get_bl_address # noqa

logger = logging.getLogger(__name__)


def get_bl_address(dev: BCDevice) -> str:
'''
Send the BOOTLOADER_CMD_RESET_INIT command to the NRF firmware
and receive the bootloader radio address in the response
'''
address = None
link = cflib.crtp.get_link_driver(dev.link_uri)
if link is None:
return None

# 0xFF => BOOTLOADER CMD
# 0xFE => To the NRF firmware
# 0xFF => BOOTLOADER_CMD_RESET_INIT (to get bl address)
pk = CRTPPacket(0xFF, [0xFE, 0xFF])
link.send_packet(pk)

timeout = 5 # seconds
ts = time.time()
while time.time() - ts < timeout:
pk = link.receive_packet(2)
if pk is None:
continue

# Header 0xFF means port is 0xF ((header & 0xF0) >> 4)) and channel
# is 0x3 (header & 0x03).
if pk.port == 0xF and pk.channel == 0x3 and len(pk.data) > 3:
# 0xFE is NRF target id, 0xFF is BOOTLOADER_CMD_RESET_INIT
if struct.unpack('<BB', pk.data[0:2]) != (0xFE, 0xFF):
continue
address = 'B1' + binascii.hexlify(pk.data[2:6][::-1]).upper().decode('utf8')
break

link.close()
return address


def list_addresses():
for dev in get_devices():
address = get_bl_address(dev)
Expand Down
64 changes: 64 additions & 0 deletions management/program_swarm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Copyright (C) 2021 Bitcraze AB
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, in version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
import os
import sys
import traceback

from pathlib import Path

import cflib.crtp

# Initiate the low level drivers
cflib.crtp.init_drivers()

#
# This is to make it possible to import from conftest
#
currentdir = os.path.dirname(os.path.realpath(__file__))
parentdir = os.path.join(currentdir, '..')
sys.path.append(parentdir)

from conftest import get_swarm # noqa

logger = logging.getLogger(__name__)

current_frame = 0


def progress_cb(msg: str, percent: int):
global current_frame
frames = ['◢', '◣', '◤', '◥']
frame = frames[current_frame % 4]

print('{} {}% {}'.format(frame, percent, msg), end='\r')
current_frame += 1


def program_swarm(fw_file: Path) -> bool:
for dev in get_swarm():
try:
print('Programming device: {}'.format(dev))
dev.flash(fw_file, progress_cb)
except Exception as err:
print('Programming failed: {}'.format(str(err)), file=sys.stderr)
traceback.print_exc()
return False

return True


if __name__ == "__main__":
if not program_swarm(sys.argv[1]):
sys.exit(1)
Loading

0 comments on commit 0c74b8b

Please sign in to comment.