Save this as commission.py:
#!/usr/bin/env python
# Author: Pavel Kirienko <pavel.kirienko@zubax.com>
"""
This utility is used for automatic commissioning of a new device for debugging purposes (not production).
It is designed to run without any external dependencies other than Python itself (batteries included, yay).
In order for the firmware to run, the device must possess a valid certificate of authenticity
(CoA, aka digital signature), which is verified by both the bootloader and the firmware itself.
This script will generate a valid CoA and then write it into the device along with the bootloader image.
After this is done, the device can be used for debugging the firmware (or anything, really).
This is similar to the signing process performed by DrWatson during the normal production run sans the testing.
The utility requires Internet connectivity for downloading the bootloader image and obtaining the CoA from
the licensing server. The latter may require access credentials; please contact the project management for those.
The credentials are provided via environment variables named ZUBAX_LICENSING_LOGIN and ZUBAX_LICENSING_PASSWORD.
Consider exporting these from your shell's rc file if the defaults don't work for you.
"""
from __future__ import annotations
import os
import ssl
import sys
import json
import base64
import logging
import pathlib
import tempfile
import argparse
import subprocess
import dataclasses
import urllib.request, urllib.error
from functools import partial
from typing import Optional
# --- Network defaults -------------------------------------------------------------------------------------------
# Bootloader image location used unless --bootloader-image is given on the command line.
DEFAULT_BOOTLOADER_BINARY_URI = (
    "https://files.zubax.com/products/com.zubax.telega/bootloader/"
    "com.zubax.telega.bootloader.bin"
)
# Login used for the licensing service unless ZUBAX_LICENSING_LOGIN is set in the environment.
DEFAULT_LICENSING_SERVICE_LOGIN = "ZubaxRobotics"
# Endpoint that the CoA (signature) generation request is posted to.
DEFAULT_LICENSING_SIGNATURE_GENERATION_ENDPOINT = "https://licensing.zubax.com/api/v1/signature/generate"

# --- Toolchain --------------------------------------------------------------------------------------------------
# Prepended to the name of every toolchain executable (ld, objcopy, gdb).
TOOLCHAIN_PREFIX = "arm-none-eabi-"

# --- Target memory map ------------------------------------------------------------------------------------------
UNIQUE_ID_SIZE = 12
UNIQUE_ID_ADDRESS = 0x1FFF7A10  # STM32F446

BOOTLOADER_ORIGIN = 0x08000000  # STM32
# The CoA occupies the last 256 bytes of the 48 KiB region that starts at the bootloader origin.
COA_OFFSET_FROM_BOOTLOADER_ORIGIN = 48 * 1024 - 256
# The application image begins right after that 48 KiB region.
APPLICATION_ORIGIN = BOOTLOADER_ORIGIN + 48 * 1024

_logger = logging.getLogger(__name__.strip("_"))
class GDBError(RuntimeError):
    """Raised when a toolchain/GDB command fails or a memory dump cannot be obtained from the target."""
class GDBInterface:
    """
    Reads and writes target memory by driving the GNU toolchain (ld, objcopy, gdb) against a GDB remote port.

    Every toolchain executable name is prefixed with ``toolchain_prefix`` (e.g., "arm-none-eabi-") and is run
    through the shell. Intermediate files live in a temporary directory that is removed when the operation ends.
    """

    def __init__(self, port_name: str, toolchain_prefix: str):
        """
        :param port_name:        Argument for GDB's "target extended-remote", e.g., a serial port path.
        :param toolchain_prefix: String prepended to every toolchain command line.
        """
        self._port_name: str = port_name
        self._toolchain_prefix: str = toolchain_prefix

    def _check_port(self) -> None:
        """
        Ensure that the GDB port can be opened.

        :raises FileNotFoundError: If the port does not exist, with a hint that the debugger may be disconnected.
        """
        try:
            with open(self._port_name):
                pass
        except FileNotFoundError:
            raise FileNotFoundError(f"GDB port {self._port_name!r} does not exist; debugger disconnected?") from None

    def _run_tc(self, fmt: str, *a: object) -> None:
        """
        Run one toolchain command through the shell.

        The command line is ``toolchain_prefix + fmt`` with the positional arguments substituted via %-formatting.
        String arguments are shell-quoted so that paths containing whitespace or shell metacharacters reach the
        tool as a single word.

        :raises FileNotFoundError: If the GDB port does not exist.
        :raises GDBError:          If the command exits with a non-zero status.
        """
        import shlex  # Local import: only needed here.

        self._check_port()
        cmd = self._toolchain_prefix + fmt
        if a:
            # shlex.quote() targets POSIX shells only, so arguments are left verbatim elsewhere.
            if os.name == "posix":
                a = tuple(shlex.quote(x) if isinstance(x, str) else x for x in a)
            cmd %= a
        p = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        # Tool output is only logged, so undecodable bytes are replaced rather than raising.
        if p.stdout:
            _logger.debug("Toolchain call stdout:\n%s", p.stdout.decode(errors="replace"))
        if p.stderr:
            _logger.debug("Toolchain call stderr:\n%s", p.stderr.decode(errors="replace"))
        if p.returncode != 0:
            raise GDBError(f"GDB command {cmd!r} failed with exit code {p.returncode}")

    def _write_gdb_script(self, path: str, *commands: str) -> None:
        """Write a GDB script that attaches to the target, runs the given commands, then kills and quits."""
        lines = [
            "target extended-remote %s" % self._port_name,
            "mon swdp_scan",
            "attach 1",
            "set mem inaccessible-by-default off",
            *commands,
            "kill",
            "quit 0",
        ]
        with open(path, "w") as f:
            f.write("\n".join(lines))

    def write_memory(self, start_address: int, data: bytes) -> None:
        """
        Write ``data`` to the target starting at ``start_address`` using GDB's "load" command.

        :raises FileNotFoundError: If the GDB port does not exist.
        :raises GDBError:          If any of the toolchain commands fails.
        """
        with tempfile.TemporaryDirectory("-drwatson") as tmpdir:
            _logger.debug("Executable scratchpad directory: %r", tmpdir)
            fn = partial(os.path.join, tmpdir)
            # Generating ELF from the provided data.
            # We can't just use "restore" (which would have been so much easier) because it doesn't support writing
            # to flash, failing with message "Writing to flash memory forbidden in this context".
            with open(fn("fw.bin"), "wb") as f:
                f.write(data)
            with open(fn("link.ld"), "w") as f:
                f.write("SECTIONS { . = %s; .text : { *(.text) } }" % start_address)
            self._run_tc("ld -b binary -r -o %s %s", fn("tmp.elf"), fn("fw.bin"))
            self._run_tc(
                "objcopy --rename-section .data=.text --set-section-flags .data=alloc,code,load %s", fn("tmp.elf")
            )
            self._run_tc("ld %s -T %s -o %s", fn("tmp.elf"), fn("link.ld"), fn("output.elf"))
            # Loading the ELF onto the target.
            self._write_gdb_script(fn("script.gdb"), "load")
            self._run_tc("gdb %s --batch -x %s -return-child-result", fn("output.elf"), fn("script.gdb"))

    def read_memory(self, start_address: int, size: int) -> bytes:
        """
        Read ``size`` bytes of target memory starting at ``start_address``.

        :raises FileNotFoundError: If the GDB port does not exist.
        :raises GDBError:          If GDB fails, produces no dump file, or the dump has an unexpected size.
        """
        with tempfile.TemporaryDirectory("-drwatson") as tmpdir:
            fn = partial(os.path.join, tmpdir)
            self._write_gdb_script(
                fn("script.gdb"),
                "dump bin memory %s 0x%x 0x%x" % (fn("dump.bin"), start_address, start_address + size),
            )
            self._run_tc("gdb --batch -x %s", fn("script.gdb"))
            try:
                # The context manager closes the dump before the temporary directory is removed.
                with open(fn("dump.bin"), "rb") as f:
                    data = f.read()
            except FileNotFoundError:
                raise GDBError("Could not read memory. Is the interface connected?") from None
            if len(data) != size:
                raise GDBError(f"Memory dump size mismatch: expected to read {size} bytes, got {len(data)} bytes")
            return data
@dataclasses.dataclass(frozen=True)
class Parameters:
    """Immutable set of inputs for the commissioning procedure, assembled from the CLI and the environment."""

    # URI the bootloader binary image is downloaded from.
    bootloader_uri: str

    # URI of the application binary image; an empty string means that no application is written.
    application_uri: str

    # Port handed over to GDB's "target extended-remote".
    gdb_port: str

    # Login for the licensing service (sent using HTTP basic authentication).
    licensing_login: str

    # Password for the licensing service; None is sent as an empty password.
    licensing_password: Optional[str]
def _download(uri: str, ssl_ctx: ssl.SSLContext, timeout: float = 5) -> bytes:
    """Return the body of the resource at ``uri``; the response is closed before returning."""
    with urllib.request.urlopen(uri, timeout=timeout, context=ssl_ctx) as resp:
        data = resp.read()
    assert isinstance(data, bytes)  # Type narrowing only.
    return data


def execute(par: Parameters) -> None:
    """
    Commission the device: install the bootloader together with a CoA and, if requested, the application.

    The steps are:

    1. Read the unique-ID of the device via GDB.
    2. Download the bootloader image.
    3. Request the CoA for this unique-ID from the licensing service. If that fails, reuse the CoA that is
       already stored on the device, provided that the CoA area is not blank (all 0xFF or all 0x00).
    4. Write bootloader+CoA at the bootloader origin and verify by reading it back.
    5. If ``par.application_uri`` is non-empty, download, write, and verify the application image as well.

    :raises RuntimeError: If the bootloader does not fit below the CoA, if no CoA is available, or if a
                          read-back verification fails.
    :raises GDBError:     If a debugger operation fails.
    """
    _logger.info("Reading the unique-ID via GDB using port %r", par.gdb_port)
    iface = GDBInterface(par.gdb_port, TOOLCHAIN_PREFIX)
    uid = iface.read_memory(UNIQUE_ID_ADDRESS, UNIQUE_ID_SIZE).ljust(16, b"\0")

    # Lower the SSL security level as suggested by Clyde to work around the SSL version issue on the server.
    # Remove this after the server is updated. See https://zubaxrobotics.slack.com/archives/CM268KFT4/p1640294820163400
    ssl_ctx = ssl.create_default_context()
    ssl_ctx.set_ciphers("DEFAULT@SECLEVEL=1")

    _logger.debug("Downloading bootloader image from %r", par.bootloader_uri)
    bootloader_image = _download(par.bootloader_uri, ssl_ctx)
    _logger.info("Downloaded bootloader image of %d bytes from %r", len(bootloader_image), par.bootloader_uri)
    # An explicit check (rather than an assert, which vanishes under -O): an oversized bootloader would not be
    # padded below and would push the CoA away from its expected offset.
    if len(bootloader_image) > COA_OFFSET_FROM_BOOTLOADER_ORIGIN:
        raise RuntimeError(
            f"Bootloader image is too large: {len(bootloader_image)} bytes, "
            f"but at most {COA_OFFSET_FROM_BOOTLOADER_ORIGIN} bytes fit below the CoA"
        )

    _logger.info("Requesting the CoA from the licensing service for UID %s", uid.hex())
    request_args = {
        "unique_id": uid.hex(),
        "product_name": "com.zubax.telega",
    }
    credentials = f"{par.licensing_login}:{par.licensing_password or ''}".encode()
    req = urllib.request.Request(
        DEFAULT_LICENSING_SIGNATURE_GENERATION_ENDPOINT,
        data=json.dumps(request_args).encode(),
        headers={
            "Content-Type": "application/json",
            "Authorization": "Basic " + base64.b64encode(credentials).decode(),
        },
        unverifiable=True,
    )
    # The Authorization header is trivially reversible to the password, so it is kept out of the log.
    loggable_headers = {k: ("<redacted>" if k.lower() == "authorization" else v) for k, v in req.headers.items()}
    _logger.debug("Licensing service request: url=%r headers=%r data=%r", req.full_url, loggable_headers, req.data)
    try:
        with urllib.request.urlopen(req, context=ssl_ctx) as f:
            response = json.loads(f.read().decode())
    except urllib.error.URLError as e:  # HTTPError is a subclass of URLError.
        _logger.warning("Could not obtain CoA from the licensing service, hopefully we can manage without it: %s", e)
        _logger.info("Checking if the device has a CoA already...")
        coa = iface.read_memory(BOOTLOADER_ORIGIN + COA_OFFSET_FROM_BOOTLOADER_ORIGIN, 256)
        # A CoA area consisting solely of 0xFF or solely of 0x00 is considered blank.
        if not coa.replace(b"\xFF", b"") or not coa.replace(b"\x00", b""):
            raise RuntimeError(
                "Could not obtain CoA from the licensing server and the device does not have one either"
            ) from e
        _logger.info("Existing CoA installed on the device: %s", coa.hex())
    else:
        _logger.debug("Licensing service response: %r", response)
        coa = bytes.fromhex(response["signature"])
        _logger.info("CoA provided by the licensing service (new=%s): %s", response["new"], coa.hex())

    # The gap between the end of the bootloader and the CoA is filled with 0xBA.
    rom_image = bootloader_image.ljust(COA_OFFSET_FROM_BOOTLOADER_ORIGIN, b"\xBA") + coa
    if len(rom_image) > (APPLICATION_ORIGIN - BOOTLOADER_ORIGIN):
        raise RuntimeError(
            f"Bootloader+CoA image of {len(rom_image)} bytes does not fit into "
            f"{APPLICATION_ORIGIN - BOOTLOADER_ORIGIN} bytes available below the application"
        )
    _logger.info("Writing %.3f KiB (bootloader+CoA) at 0x%08x...", len(rom_image) / 1024, BOOTLOADER_ORIGIN)
    iface.write_memory(BOOTLOADER_ORIGIN, rom_image)
    _logger.debug("Verifying...")
    if iface.read_memory(BOOTLOADER_ORIGIN, len(rom_image)) != rom_image:
        raise RuntimeError("Bootloader write verification failed; maybe the device is damaged?")
    _logger.info("The bootloader and the CoA have been installed successfully")

    if not par.application_uri:
        _logger.debug("Application write was not requested")
        return

    _logger.info("Application write requested. Image URI: %s", par.application_uri)
    application_image = _download(par.application_uri, ssl_ctx)
    _logger.info(
        "Downloaded application image of %.3f KiB from %r; writing...",
        len(application_image) / 1024,
        par.application_uri,
    )
    iface.write_memory(APPLICATION_ORIGIN, application_image)
    _logger.debug("Verifying...")
    if iface.read_memory(APPLICATION_ORIGIN, len(application_image)) != application_image:
        raise RuntimeError("Application write verification failed; maybe the device is damaged?")
    _logger.info("Application written successfully")
def _location_to_uri(location: str) -> str:
    """Return ``location`` as is if it already contains a URI scheme, else convert the path to a file:// URI."""
    if "://" in location:
        return location
    return "file://" + urllib.request.pathname2url(os.path.abspath(location))


def _init() -> Parameters:
    """
    Parse the command line, configure logging, and build the parameters for :func:`execute`.

    If no GDB port is given, /dev/serial/by-id/ is searched for a Black Magic Probe; exactly one match is
    required. Image locations that are not URIs are treated as local paths and converted to file:// URIs.
    The licensing credentials are taken from ZUBAX_LICENSING_LOGIN and ZUBAX_LICENSING_PASSWORD.

    :raises RuntimeError: If the GDB port is not specified and auto-detection finds none or more than one.
    """
    parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter, description=__doc__)
    parser.add_argument(
        "--gdb-port",
        "-g",
        metavar="LOCATION",
        help="GDB port to use with 'target extended-remote'; e.g., /dev/ttyACM0. "
        "If not specified, the correct port will be auto-detected, unless there is more than one available.",
    )
    parser.add_argument(
        "--bootloader-image",
        "-b",
        metavar="LOCATION",
        default=DEFAULT_BOOTLOADER_BINARY_URI,
        help="Path or URI of the bootloader binary image (*.bin). Default: %(default)s",
    )
    parser.add_argument(
        "--application-image",
        "-a",
        metavar="LOCATION",
        help="Path or URI of the application binary image (*.app.bin). "
        "If not specified, only the bootloader will be written, which is what you need most of the times.",
    )
    parser.add_argument(
        "--verbose",
        "-v",
        action="count",
        default=0,
        help="Enable verbose output. Twice for extra verbosity.",
    )
    args = parser.parse_args()

    logging.basicConfig(
        stream=sys.stderr,
        format="%(asctime)s %(levelname)-5.5s %(name)s: %(message)s",
        level={
            0: logging.WARNING,
            1: logging.INFO,
        }.get(args.verbose, logging.DEBUG),
    )
    _logger.debug("CLI arguments: %r", args)

    gdb_port = args.gdb_port
    if not gdb_port:
        options = list(pathlib.Path("/dev/serial/by-id/").glob("usb*Black*Magic*Probe*if00"))
        if not options:
            raise RuntimeError("Debugger port is not specified explicitly and automatic search returned no matches")
        if len(options) > 1:
            raise RuntimeError(f"Not sure which debugger to use, please specify one explicitly: {options}")
        gdb_port = str(options[0].resolve())

    par = Parameters(
        bootloader_uri=_location_to_uri(str(args.bootloader_image)),
        application_uri=_location_to_uri(str(args.application_image)) if args.application_image else "",
        gdb_port=str(gdb_port),
        licensing_login=os.environ.get("ZUBAX_LICENSING_LOGIN", DEFAULT_LICENSING_SERVICE_LOGIN),
        licensing_password=os.environ.get("ZUBAX_LICENSING_PASSWORD"),
    )
    # The password is masked so that verbose logs can be shared safely.
    loggable = dataclasses.replace(par, licensing_password="<redacted>") if par.licensing_password else par
    _logger.debug("Using parameters: %r", loggable)
    return par
if __name__ == "__main__":
    # Exit status: 1 on failure, 2 if interrupted by the user.
    # sys.exit() is used because the bare exit() helper is injected by the site module and may be unavailable.
    try:
        execute(_init())
    except KeyboardInterrupt:
        _logger.warning("Interrupted")
        sys.exit(2)
    except Exception as ex:
        # The traceback is shown only at debug verbosity; the one-line summary is always shown.
        _logger.debug("Unhandled exception: %r", ex, exc_info=True)
        _logger.critical("Failure (use --help to get usage info): %s: %s", type(ex).__name__, str(ex) or repr(ex))
        sys.exit(1)
Run it like this to update both the bootloader and the application while preserving the device signature:
./commission.py --application-image=https://files.zubax.com/products/com.zubax.telega/com.zubax.telega-1-0.4.8783aada.application.bin
Use --help to get help. You will need a Zubax Dronecode Probe and the ARM GNU Toolchain.