Enhanced WiFi setup

devel
Hansi, dl9rdz 2024-01-07 22:31:03 +00:00
rodzic 1081fc6373
commit d5393ca8eb
8 zmienionych plików z 774 dodań i 80 usunięć

Wyświetl plik

@ -1293,9 +1293,19 @@ void SetupAsyncServer() {
request->send(200, "application/gpx+xml", sendGPX(request));
else {
// TODO: set correct type for .js
request->send(SPIFFS, url, "text/html");
// Caching is an important work-around for a bug somewhere in the network stack that causes corrupt replies
// with platform-espressif32 (some TCP segments simply get lost before being sent, so reply header and parts of data is missing)
// This happens with concurrent requests, notably if a browser fetches rdz.js and cfg.js concurrently for config.html
// With the cache, rdz.js is likely already in the cache...0
Serial.printf("URL is %s\n", url.c_str());
//request->send(404);
AsyncWebServerResponse *response = request->beginResponse(SPIFFS, url, "text/html");
if(response) {
response->addHeader("Cache-Control", "max-age=900");
request->send(response);
} else {
request->send(404);
}
}
}
});
@ -2173,7 +2183,7 @@ void loopDecoder() {
}
void setCurrentDisplay(int value) {
Serial.printf("setCurrentDisplay: setting index %d, entry %d\b", value, sonde.config.display[value]);
Serial.printf("setCurrentDisplay: setting index %d, entry %d\n", value, sonde.config.display[value]);
currentDisplay = sonde.config.display[value];
}
@ -2320,6 +2330,7 @@ void WiFiEvent(WiFiEvent_t event)
wifi_state = WIFI_DISABLED;
WiFi.disconnect(true);
}
WiFi.mode(WIFI_MODE_NULL);
break;
case SYSTEM_EVENT_STA_AUTHMODE_CHANGE:
Serial.println("Authentication mode of access point has changed");
@ -2422,17 +2433,28 @@ void wifiConnect(int16_t res) {
}
}
void wifiConnectDirect(int16_t index) {
Serial.println("AP mode 4: trying direct reconnect");
WiFi.begin(fetchWifiSSID(index), fetchWifiPw(index));
wifi_state = WIFI_CONNECT;
}
static int wifi_cto;
void loopWifiBackground() {
// Serial.printf("WifiBackground: state %d\n", wifi_state);
Serial.printf("WifiBackground: state %d\n", wifi_state);
// handle Wifi station mode in background
if (sonde.config.wifi == 0 || sonde.config.wifi == 2) return; // nothing to do if disabled or access point mode
if (wifi_state == WIFI_DISABLED) { // stopped => start can
wifi_state = WIFI_SCAN;
Serial.println("WiFi start scan");
WiFi.scanNetworks(true); // scan in async mode
if (sonde.config.wifi == 4) { // direct connect to first network, supports hidden SSID
wifiConnectDirect(1);
wifi_cto = 0;
} else {
Serial.println("WiFi start scan");
wifi_state = WIFI_SCAN;
WiFi.scanNetworks(true); // scan in async mode
}
} else if (wifi_state == WIFI_SCAN) {
int16_t res = WiFi.scanComplete();
if (res == 0 || res == WIFI_SCAN_FAILED) {
@ -2463,6 +2485,7 @@ void loopWifiBackground() {
WiFi.disconnect(true);
}
} else if (wifi_state == WIFI_CONNECTED) {
Serial.printf("status: %d\n", ((WiFiSTAClass)WiFi).status());
if (!WiFi.isConnected()) {
sonde.setIP("", false);
sonde.updateDisplayIP();
@ -2470,7 +2493,7 @@ void loopWifiBackground() {
wifi_state = WIFI_DISABLED; // restart scan
enableNetwork(false);
WiFi.disconnect(true);
}
} else Serial.println("WiFi still connected");
}
}
@ -2526,84 +2549,92 @@ void loopTouchCalib() {
// Wifi modes
// 0: disabled. directly start initial mode (spectrum or scanner)
// 1: station mode in background. directly start initial mode (spectrum or scanner)
// 2: access point mode in background. directly start initial mode (spectrum or scanner)
// 1: Station mode, new version: start with synchronous WiFi scan, then
// - if button was pressed, switch to AP mode
// - if connect successful, all good
// - otherwise, continue with station mode in background
// 2: access point mode (wait for clients in background)
// 3: traditional sync. WifiScan. Tries to connect to a network, in case of failure activates AP.
// Mode 3 shows more debug information on serial port and display.
// 4: Station mode/hidden AP: same as 1, but instead of scan, just call espressif method to connect (will connect to hidden AP as well
#define MAXWIFIDELAY 40
static const char* _scan[2] = {"/", "\\"};
void loopWifiScan() {
if (sonde.config.wifi == 0) { // no Wifi
wifi_state = WIFI_DISABLED;
getKeyPressEvent(); // Clear any old events
WiFi.disconnect(true);
wifi_state = WIFI_DISABLED;
disp.rdis->setFont(FONT_SMALL);
uint8_t dispw, disph, dispxs, dispys;
disp.rdis->getDispSize(&disph, &dispw, &dispxs, &dispys);
int lastl = (disph / dispys - 2) * dispys;
int cnt = 0;
char abort = 0; // abort on keypress
switch(sonde.config.wifi) {
case 0: // no WiFi
initialMode();
return;
}
if (sonde.config.wifi == 1) { // station mode, setup in background
wifi_state = WIFI_DISABLED; // will start scanning in wifiLoopBackgroiund
initialMode();
return;
}
if (sonde.config.wifi == 2) { // AP mode, setup in background
case 2: // AP mode, setup in background
startAP();
enableNetwork(true);
initialMode();
return;
}
// wifi==3 => original mode with non-async wifi setup
disp.rdis->setFont(FONT_SMALL);
disp.rdis->drawString(0, 0, "WiFi Scan...");
uint8_t dispw, disph, dispxs, dispys;
disp.rdis->getDispSize(&disph, &dispw, &dispxs, &dispys);
case 4: // direct connect without scan, only first item in network list
// Mode STN/DIRECT[4]: Connect directly (supports hidden AP)
{
disp.rdis->drawString(0, 0, "WiFi Connect...");
const char *ssid = fetchWifiSSID(1);
WiFi.mode(WIFI_STA);
WiFi.begin( ssid, fetchWifiPw(1) );
disp.rdis->drawString(0, dispys * 2, ssid);
}
break;
case 1: // STATION mode (continue in BG if no connection)
case 3: // old AUTO mode (change to AP if no connection)
// Mode STATION[1] or SETUP[3]: Scan for networks;
disp.rdis->drawString(0, 0, "WiFi Scan...");
int line = 0;
int index = -1;
WiFi.mode(WIFI_STA);
int n = WiFi.scanNetworks();
for (int i = 0; i < n; i++) {
String ssid = WiFi.SSID(i);
disp.rdis->drawString(0, dispys * (1 + line), ssid.c_str());
line = (line + 1) % (disph / dispys);
String mac = WiFi.BSSIDstr(i);
String encryptionTypeDescription = translateEncryptionType(WiFi.encryptionType(i));
Serial.printf("Network %s: RSSI %d, MAC %s, enc: %s\n", ssid.c_str(), WiFi.RSSI(i), mac.c_str(), encryptionTypeDescription.c_str());
int curidx = fetchWifiIndex(ssid.c_str());
if (curidx >= 0 && index == -1) {
index = curidx;
Serial.printf("Match found at scan entry %d, config network %d\n", i, index);
}
}
if (index >= 0) { // some network was found
Serial.print("Connecting to: "); Serial.print(fetchWifiSSID(index));
Serial.print(" with password "); Serial.println(fetchWifiPw(index));
int line = 0;
int cnt = 0;
WiFi.disconnect(true);
WiFi.mode(WIFI_STA);
int index = -1;
int n = WiFi.scanNetworks();
for (int i = 0; i < n; i++) {
String ssid = WiFi.SSID(i);
disp.rdis->drawString(0, dispys * (1 + line), ssid.c_str());
line = (line + 1) % (disph / dispys);
String mac = WiFi.BSSIDstr(i);
String encryptionTypeDescription = translateEncryptionType(WiFi.encryptionType(i));
Serial.printf("Network %s: RSSI %d, MAC %s, enc: %s\n", ssid.c_str(), WiFi.RSSI(i), mac.c_str(), encryptionTypeDescription.c_str());
int curidx = fetchWifiIndex(ssid.c_str());
if (curidx >= 0 && index == -1) {
index = curidx;
Serial.printf("Match found at scan entry %d, config network %d\n", i, index);
disp.rdis->drawString(0, lastl, "Conn:");
disp.rdis->drawString(6 * dispxs, lastl, fetchWifiSSID(index));
WiFi.begin(fetchWifiSSID(index), fetchWifiPw(index));
} else {
abort = 2; // no network found in scan => abort right away
}
}
int lastl = (disph / dispys - 2) * dispys;
if (index >= 0) { // some network was found
Serial.print("Connecting to: "); Serial.print(fetchWifiSSID(index));
Serial.print(" with password "); Serial.println(fetchWifiPw(index));
disp.rdis->drawString(0, lastl, "Conn:");
disp.rdis->drawString(6 * dispxs, lastl, fetchWifiSSID(index));
WiFi.begin(fetchWifiSSID(index), fetchWifiPw(index));
while (WiFi.status() != WL_CONNECTED && cnt < MAXWIFIDELAY) {
delay(500);
Serial.print(".");
disp.rdis->drawString(15 * dispxs, lastl + dispys, _scan[cnt & 1]);
cnt++;
}
while (WiFi.status() != WL_CONNECTED && cnt < MAXWIFIDELAY && !abort) {
delay(500);
Serial.print(".");
disp.rdis->drawString(15 * dispxs, lastl + dispys, _scan[cnt & 1]);
cnt++;
handlePMUirq(); // Needed to react to PMU chip button
abort = (getKeyPressEvent() != EVT_NONE);
}
if (index < 0 || cnt >= MAXWIFIDELAY) { // no network found, or connect not successful
WiFi.disconnect(true);
delay(1000);
startAP();
IPAddress myIP = WiFi.softAPIP();
Serial.print("AP IP address: ");
Serial.println(myIP);
disp.rdis->drawString(0, lastl, "AP: ");
disp.rdis->drawString(6 * dispxs, lastl + 1, networks[0].id.c_str());
delay(3000);
} else {
Serial.println("");
Serial.println("WiFi connected");
Serial.println("IP address: ");
// We reach this point for mode 1, 3, and 4
// If connected (in any case) => all good, download eph if needed, all up and running
// Otherwise, If key was pressed, switch to AP mode
// Otherwise, if mode is 3 (old AUTO), switch to AP mode
// Otherwise, no network yet, keep trying to activate network in background (loopWiFiBackground)
if(WiFi.status() == WL_CONNECTED) {
Serial.println("\nWiFi connected\nIP address:");
String localIPstr = WiFi.localIP().toString();
Serial.println(localIPstr);
sonde.setIP(localIPstr.c_str(), false);
@ -2620,9 +2651,21 @@ void loopWifiScan() {
get_eph("/brdc");
}
#endif
enableNetwork(true);
delay(3000);
}
else if(sonde.config.wifi == 3 || abort==1 ) {
WiFi.disconnect(true);
delay(1000);
startAP();
IPAddress myIP = WiFi.softAPIP();
Serial.print("AP IP address: ");
Serial.println(myIP);
disp.rdis->drawString(0, lastl, "AP: ");
disp.rdis->drawString(6 * dispxs, lastl + 1, networks[0].id.c_str());
enableNetwork(true);
delay(3000);
}
enableNetwork(true);
initialMode();
}

Wyświetl plik

@ -3,7 +3,7 @@
#include "SX1278FSK.h"
#include "Sonde.h"
#define DECODERBASE_DEBUG 1
#define DECODERBASE_DEBUG 0
#if DECODERBASE_DEBUG
#define DBG(x) x

Wyświetl plik

@ -1,4 +1,4 @@
const char *version_name = "rdzTTGOsonde";
const char *version_id = "devel20231212";
const char *version_id = "devel20240107";
const int SPIFFS_MAJOR=2;
const int SPIFFS_MINOR=17;

Wyświetl plik

@ -0,0 +1,37 @@
We currently use https://github.com/platformio/platform-espressif32.git#v6.3.1 for platformIO builds
When upgrading to v6.4.0, there is a problem:
If a web browser sends concurrent requests (mainly reproducible with the "config" tab, where the config.html loads two files, rdz.js and cfg.js from SPIFFS),
one of the two often gets garbled (TCP response is missing its first 4 frames)
bad: two TCP packets with 1490 and 1094 bytes (data: 1436 and 1040 bytes)
good: TCP packets with 1490 (5x) and 1094 bytes (data: 1436 and 1040 bytes)
Somehow something seems to go wrong internally in the TCP stack. Not reproducible with a minimalistic example, but happens almost always with the full code base.
Two mitigations:
- Do not upgrade to v6.4.0
- Avoid double request by browser. For this, cache control has been added (useful anyway). Implication: After an upgrade, browser might still use old rdz.js and cfg.js for 15 Minutes.
**
In v6.5.0 there seems to be another issue: In AP mode the client does no longer get an IP address -- needs to be checked
**
Tested with 6.3.1:
Sometimes, when a AP is turned off (for < 1s) and turned on again, the WiFi produces a Disconnect event (#5) but no Clients cleared event (#3)
In that case, the WiFi state remains connected (as in WiFi.isConnected()), so network is not working, but never reestablished.
fix: added WiFi.mode(0) when event #5 is received.
Still, for each reconnect, there apparently is a memory leak. Heap goes down by 40 bytes for each reconnect.

Wyświetl plik

@ -0,0 +1,40 @@
#!/usr/bin/env python3
import socket
import sys
import time
# Connect the socket to the port where the server is listening
server_address = ('192.168.4.1', 80)
# Create a TCP/IP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(server_address)
# Create a TCP/IP socket
sock2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock2.connect(server_address)
# send http request
req = '''GET /{} HTTP/1.1
Host: 192.168.4.1
User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:121.0) Gecko/20100101 Firefox/121.0
Accept: */*
Accept-Language: en-US,en;q=0.5
Accept-Encoding: gzip, deflate
DNT: 1
Connection: keep-alive
Referer: http://192.168.4.1/qrg.html
'''
sock.sendall(req.format("rdz.js").encode())
sock2.sendall(req.format("cfg.js").encode())
time.sleep(0.5)
sock.close()
sock2.close()

Wyświetl plik

@ -10,7 +10,6 @@
[platformio]
src_dir = RX_FSK
;# lib_dir = RX_FSK
data_dir = RX_FSK/data
[extra]
@ -19,15 +18,13 @@ lib_deps_builtin =
lib_deps_external =
olikraus/U8g2 @ ^2.35.8
stevemarple/MicroNMEA @ ^2.0.5
; This is an old version with regex support unconditionally compiled in (adds >100k of code)
; me-no-dev/ESP Async WebServer @ ^1.2.3
https://github.com/me-no-dev/ESPAsyncWebServer/archive/refs/heads/master.zip
; https://github.com/moononournation/Arduino_GFX#v1.1.5
https://github.com/moononournation/Arduino_GFX#v1.2.9
https://github.com/dx168b/async-mqtt-client
[env:ttgo-lora32]
platform = https://github.com/platformio/platform-espressif32.git#v6.4.0
# Issues with 6.4.0 (TCP corruption) and 6.5.0 (no DHCP response from AP) need to be investigated before upgrading further.
platform = https://github.com/platformio/platform-espressif32.git#v6.3.0
board = ttgo-lora32-v1
framework = arduino
monitor_speed = 115200

Wyświetl plik

@ -0,0 +1,577 @@
#!/usr/bin/env python
#
# ESP32 partition table generation tool
#
# Converts partition tables to/from CSV and binary formats.
#
# See https://docs.espressif.com/projects/esp-idf/en/latest/api-guides/partition-tables.html
# for explanation of partition table structure and uses.
#
# SPDX-FileCopyrightText: 2016-2021 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
from __future__ import division, print_function, unicode_literals
import argparse
import binascii
import errno
import hashlib
import os
import re
import struct
import sys
MAX_PARTITION_LENGTH = 0xC00 # 3K for partition data (96 entries) leaves 1K in a 4K sector for signature
MD5_PARTITION_BEGIN = b'\xEB\xEB' + b'\xFF' * 14 # The first 2 bytes are like magic numbers for MD5 sum
PARTITION_TABLE_SIZE = 0x1000 # Size of partition table
MIN_PARTITION_SUBTYPE_APP_OTA = 0x10
NUM_PARTITION_SUBTYPE_APP_OTA = 16
__version__ = '1.2'
APP_TYPE = 0x00
DATA_TYPE = 0x01
TYPES = {
'app': APP_TYPE,
'data': DATA_TYPE,
}
def get_ptype_as_int(ptype):
""" Convert a string which might be numeric or the name of a partition type to an integer """
try:
return TYPES[ptype]
except KeyError:
try:
return int(ptype, 0)
except TypeError:
return ptype
# Keep this map in sync with esp_partition_subtype_t enum in esp_partition.h
SUBTYPES = {
APP_TYPE: {
'factory': 0x00,
'test': 0x20,
},
DATA_TYPE: {
'ota': 0x00,
'phy': 0x01,
'nvs': 0x02,
'coredump': 0x03,
'nvs_keys': 0x04,
'efuse': 0x05,
'undefined': 0x06,
'esphttpd': 0x80,
'fat': 0x81,
'spiffs': 0x82,
},
}
def get_subtype_as_int(ptype, subtype):
""" Convert a string which might be numeric or the name of a partition subtype to an integer """
try:
return SUBTYPES[get_ptype_as_int(ptype)][subtype]
except KeyError:
try:
return int(subtype, 0)
except TypeError:
return subtype
ALIGNMENT = {
APP_TYPE: 0x10000,
DATA_TYPE: 0x4,
}
STRICT_DATA_ALIGNMENT = 0x1000
def get_alignment_for_type(ptype):
return ALIGNMENT.get(ptype, ALIGNMENT[DATA_TYPE])
quiet = False
md5sum = True
secure = False
offset_part_table = 0
def status(msg):
""" Print status message to stderr """
if not quiet:
critical(msg)
def critical(msg):
""" Print critical message to stderr """
sys.stderr.write(msg)
sys.stderr.write('\n')
class PartitionTable(list):
def __init__(self):
super(PartitionTable, self).__init__(self)
@classmethod
def from_file(cls, f):
data = f.read()
data_is_binary = data[0:2] == PartitionDefinition.MAGIC_BYTES
if data_is_binary:
status('Parsing binary partition input...')
return cls.from_binary(data), True
data = data.decode()
status('Parsing CSV input...')
return cls.from_csv(data), False
@classmethod
def from_csv(cls, csv_contents):
res = PartitionTable()
lines = csv_contents.splitlines()
def expand_vars(f):
f = os.path.expandvars(f)
m = re.match(r'(?<!\\)\$([A-Za-z_][A-Za-z0-9_]*)', f)
if m:
raise InputError("unknown variable '%s'" % m.group(1))
return f
for line_no in range(len(lines)):
line = expand_vars(lines[line_no]).strip()
if line.startswith('#') or len(line) == 0:
continue
try:
res.append(PartitionDefinition.from_csv(line, line_no + 1))
except InputError as err:
raise InputError('Error at line %d: %s' % (line_no + 1, err))
except Exception:
critical('Unexpected error parsing CSV line %d: %s' % (line_no + 1, line))
raise
# fix up missing offsets & negative sizes
last_end = offset_part_table + PARTITION_TABLE_SIZE # first offset after partition table
for e in res:
if e.offset is not None and e.offset < last_end:
if e == res[0]:
raise InputError('CSV Error: First partition offset 0x%x overlaps end of partition table 0x%x'
% (e.offset, last_end))
else:
raise InputError('CSV Error: Partitions overlap. Partition at line %d sets offset 0x%x. Previous partition ends 0x%x'
% (e.line_no, e.offset, last_end))
if e.offset is None:
pad_to = get_alignment_for_type(e.type)
if last_end % pad_to != 0:
last_end += pad_to - (last_end % pad_to)
e.offset = last_end
if e.size < 0:
e.size = -e.size - e.offset
last_end = e.offset + e.size
return res
def __getitem__(self, item):
""" Allow partition table access via name as well as by
numeric index. """
if isinstance(item, str):
for x in self:
if x.name == item:
return x
raise ValueError("No partition entry named '%s'" % item)
else:
return super(PartitionTable, self).__getitem__(item)
def find_by_type(self, ptype, subtype):
""" Return a partition by type & subtype, returns
None if not found """
# convert ptype & subtypes names (if supplied this way) to integer values
ptype = get_ptype_as_int(ptype)
subtype = get_subtype_as_int(ptype, subtype)
for p in self:
if p.type == ptype and p.subtype == subtype:
yield p
return
def find_by_name(self, name):
for p in self:
if p.name == name:
return p
return None
def verify(self):
# verify each partition individually
for p in self:
p.verify()
# check on duplicate name
names = [p.name for p in self]
duplicates = set(n for n in names if names.count(n) > 1)
# print sorted duplicate partitions by name
if len(duplicates) != 0:
critical('A list of partitions that have the same name:')
for p in sorted(self, key=lambda x:x.name):
if len(duplicates.intersection([p.name])) != 0:
critical('%s' % (p.to_csv()))
raise InputError('Partition names must be unique')
# check for overlaps
last = None
for p in sorted(self, key=lambda x:x.offset):
if p.offset < offset_part_table + PARTITION_TABLE_SIZE:
raise InputError('Partition offset 0x%x is below 0x%x' % (p.offset, offset_part_table + PARTITION_TABLE_SIZE))
if last is not None and p.offset < last.offset + last.size:
raise InputError('Partition at 0x%x overlaps 0x%x-0x%x' % (p.offset, last.offset, last.offset + last.size - 1))
last = p
# check that otadata should be unique
otadata_duplicates = [p for p in self if p.type == TYPES['data'] and p.subtype == SUBTYPES[DATA_TYPE]['ota']]
if len(otadata_duplicates) > 1:
for p in otadata_duplicates:
critical('%s' % (p.to_csv()))
raise InputError('Found multiple otadata partitions. Only one partition can be defined with type="data"(1) and subtype="ota"(0).')
if len(otadata_duplicates) == 1 and otadata_duplicates[0].size != 0x2000:
p = otadata_duplicates[0]
critical('%s' % (p.to_csv()))
raise InputError('otadata partition must have size = 0x2000')
def flash_size(self):
""" Return the size that partitions will occupy in flash
(ie the offset the last partition ends at)
"""
try:
last = sorted(self, reverse=True)[0]
except IndexError:
return 0 # empty table!
return last.offset + last.size
def verify_size_fits(self, flash_size_bytes: int) -> None:
""" Check that partition table fits into the given flash size.
Raises InputError otherwise.
"""
table_size = self.flash_size()
if flash_size_bytes < table_size:
mb = 1024 * 1024
raise InputError('Partitions tables occupies %.1fMB of flash (%d bytes) which does not fit in configured '
"flash size %dMB. Change the flash size in menuconfig under the 'Serial Flasher Config' menu." %
(table_size / mb, table_size, flash_size_bytes / mb))
@classmethod
def from_binary(cls, b):
md5 = hashlib.md5()
result = cls()
for o in range(0,len(b),32):
data = b[o:o + 32]
if len(data) != 32:
raise InputError('Partition table length must be a multiple of 32 bytes')
if data == b'\xFF' * 32:
return result # got end marker
if md5sum and data[:2] == MD5_PARTITION_BEGIN[:2]: # check only the magic number part
if data[16:] == md5.digest():
continue # the next iteration will check for the end marker
else:
raise InputError("MD5 checksums don't match! (computed: 0x%s, parsed: 0x%s)" % (md5.hexdigest(), binascii.hexlify(data[16:])))
else:
md5.update(data)
result.append(PartitionDefinition.from_binary(data))
raise InputError('Partition table is missing an end-of-table marker')
def to_binary(self):
result = b''.join(e.to_binary() for e in self)
if md5sum:
result += MD5_PARTITION_BEGIN + hashlib.md5(result).digest()
if len(result) >= MAX_PARTITION_LENGTH:
raise InputError('Binary partition table length (%d) longer than max' % len(result))
result += b'\xFF' * (MAX_PARTITION_LENGTH - len(result)) # pad the sector, for signing
return result
def to_csv(self, simple_formatting=False):
rows = ['# ESP-IDF Partition Table',
'# Name, Type, SubType, Offset, Size, Flags']
rows += [x.to_csv(simple_formatting) for x in self]
return '\n'.join(rows) + '\n'
class PartitionDefinition(object):
MAGIC_BYTES = b'\xAA\x50'
# dictionary maps flag name (as used in CSV flags list, property name)
# to bit set in flags words in binary format
FLAGS = {
'encrypted': 0
}
# add subtypes for the 16 OTA slot values ("ota_XX, etc.")
for ota_slot in range(NUM_PARTITION_SUBTYPE_APP_OTA):
SUBTYPES[TYPES['app']]['ota_%d' % ota_slot] = MIN_PARTITION_SUBTYPE_APP_OTA + ota_slot
def __init__(self):
self.name = ''
self.type = None
self.subtype = None
self.offset = None
self.size = None
self.encrypted = False
@classmethod
def from_csv(cls, line, line_no):
""" Parse a line from the CSV """
line_w_defaults = line + ',,,,' # lazy way to support default fields
fields = [f.strip() for f in line_w_defaults.split(',')]
res = PartitionDefinition()
res.line_no = line_no
res.name = fields[0]
res.type = res.parse_type(fields[1])
res.subtype = res.parse_subtype(fields[2])
res.offset = res.parse_address(fields[3])
res.size = res.parse_address(fields[4])
if res.size is None:
raise InputError("Size field can't be empty")
flags = fields[5].split(':')
for flag in flags:
if flag in cls.FLAGS:
setattr(res, flag, True)
elif len(flag) > 0:
raise InputError("CSV flag column contains unknown flag '%s'" % (flag))
return res
def __eq__(self, other):
return self.name == other.name and self.type == other.type \
and self.subtype == other.subtype and self.offset == other.offset \
and self.size == other.size
def __repr__(self):
def maybe_hex(x):
return '0x%x' % x if x is not None else 'None'
return "PartitionDefinition('%s', 0x%x, 0x%x, %s, %s)" % (self.name, self.type, self.subtype or 0,
maybe_hex(self.offset), maybe_hex(self.size))
def __str__(self):
return "Part '%s' %d/%d @ 0x%x size 0x%x" % (self.name, self.type, self.subtype, self.offset or -1, self.size or -1)
def __cmp__(self, other):
return self.offset - other.offset
def __lt__(self, other):
return self.offset < other.offset
def __gt__(self, other):
return self.offset > other.offset
def __le__(self, other):
return self.offset <= other.offset
def __ge__(self, other):
return self.offset >= other.offset
def parse_type(self, strval):
if strval == '':
raise InputError("Field 'type' can't be left empty.")
return parse_int(strval, TYPES)
def parse_subtype(self, strval):
if strval == '':
if self.type == TYPES['app']:
raise InputError('App partition cannot have an empty subtype')
return SUBTYPES[DATA_TYPE]['undefined']
return parse_int(strval, SUBTYPES.get(self.type, {}))
def parse_address(self, strval):
if strval == '':
return None # PartitionTable will fill in default
return parse_int(strval)
def verify(self):
if self.type is None:
raise ValidationError(self, 'Type field is not set')
if self.subtype is None:
raise ValidationError(self, 'Subtype field is not set')
if self.offset is None:
raise ValidationError(self, 'Offset field is not set')
align = get_alignment_for_type(self.type)
if self.offset % align:
raise ValidationError(self, 'Offset 0x%x is not aligned to 0x%x' % (self.offset, align))
# The alignment requirement for non-app partition is 4 bytes, but it should be 4 kB.
# Print a warning for now, make it an error in IDF 5.0 (IDF-3742).
if self.type != APP_TYPE and self.offset % STRICT_DATA_ALIGNMENT:
critical('WARNING: Partition %s not aligned to 0x%x.'
'This is deprecated and will be considered an error in the future release.' % (self.name, STRICT_DATA_ALIGNMENT))
if self.size % align and secure and self.type == APP_TYPE:
raise ValidationError(self, 'Size 0x%x is not aligned to 0x%x' % (self.size, align))
if self.size is None:
raise ValidationError(self, 'Size field is not set')
if self.name in TYPES and TYPES.get(self.name, '') != self.type:
critical("WARNING: Partition has name '%s' which is a partition type, but does not match this partition's "
'type (0x%x). Mistake in partition table?' % (self.name, self.type))
all_subtype_names = []
for names in (t.keys() for t in SUBTYPES.values()):
all_subtype_names += names
if self.name in all_subtype_names and SUBTYPES.get(self.type, {}).get(self.name, '') != self.subtype:
critical("WARNING: Partition has name '%s' which is a partition subtype, but this partition has "
'non-matching type 0x%x and subtype 0x%x. Mistake in partition table?' % (self.name, self.type, self.subtype))
STRUCT_FORMAT = b'<2sBBLL16sL'
@classmethod
def from_binary(cls, b):
if len(b) != 32:
raise InputError('Partition definition length must be exactly 32 bytes. Got %d bytes.' % len(b))
res = cls()
(magic, res.type, res.subtype, res.offset,
res.size, res.name, flags) = struct.unpack(cls.STRUCT_FORMAT, b)
if b'\x00' in res.name: # strip null byte padding from name string
res.name = res.name[:res.name.index(b'\x00')]
res.name = res.name.decode()
if magic != cls.MAGIC_BYTES:
raise InputError('Invalid magic bytes (%r) for partition definition' % magic)
for flag,bit in cls.FLAGS.items():
if flags & (1 << bit):
setattr(res, flag, True)
flags &= ~(1 << bit)
if flags != 0:
critical('WARNING: Partition definition had unknown flag(s) 0x%08x. Newer binary format?' % flags)
return res
def get_flags_list(self):
return [flag for flag in self.FLAGS.keys() if getattr(self, flag)]
def to_binary(self):
flags = sum((1 << self.FLAGS[flag]) for flag in self.get_flags_list())
return struct.pack(self.STRUCT_FORMAT,
self.MAGIC_BYTES,
self.type, self.subtype,
self.offset, self.size,
self.name.encode(),
flags)
def to_csv(self, simple_formatting=False):
def addr_format(a, include_sizes):
if not simple_formatting and include_sizes:
for (val, suffix) in [(0x100000, 'M'), (0x400, 'K')]:
if a % val == 0:
return '%d%s' % (a // val, suffix)
return '0x%x' % a
def lookup_keyword(t, keywords):
for k,v in keywords.items():
if simple_formatting is False and t == v:
return k
return '%d' % t
def generate_text_flags():
""" colon-delimited list of flags """
return ':'.join(self.get_flags_list())
return ','.join([self.name,
lookup_keyword(self.type, TYPES),
lookup_keyword(self.subtype, SUBTYPES.get(self.type, {})),
addr_format(self.offset, False),
addr_format(self.size, True),
generate_text_flags()])
def parse_int(v, keywords={}):
"""Generic parser for integer fields - int(x,0) with provision for
k/m/K/M suffixes and 'keyword' value lookup.
"""
try:
for letter, multiplier in [('k', 1024), ('m', 1024 * 1024)]:
if v.lower().endswith(letter):
return parse_int(v[:-1], keywords) * multiplier
return int(v, 0)
except ValueError:
if len(keywords) == 0:
raise InputError('Invalid field value %s' % v)
try:
return keywords[v.lower()]
except KeyError:
raise InputError("Value '%s' is not valid. Known keywords: %s" % (v, ', '.join(keywords)))
def main():
global quiet
global md5sum
global offset_part_table
global secure
parser = argparse.ArgumentParser(description='ESP32 partition table utility')
parser.add_argument('--flash-size', help='Optional flash size limit, checks partition table fits in flash',
nargs='?', choices=['1MB', '2MB', '4MB', '8MB', '16MB', '32MB', '64MB', '128MB'])
parser.add_argument('--disable-md5sum', help='Disable md5 checksum for the partition table', default=False, action='store_true')
parser.add_argument('--no-verify', help="Don't verify partition table fields", action='store_true')
parser.add_argument('--verify', '-v', help='Verify partition table fields (deprecated, this behaviour is '
'enabled by default and this flag does nothing.', action='store_true')
parser.add_argument('--quiet', '-q', help="Don't print non-critical status messages to stderr", action='store_true')
parser.add_argument('--offset', '-o', help='Set offset partition table', default='0x8000')
parser.add_argument('--secure', help='Require app partitions to be suitable for secure boot', action='store_true')
parser.add_argument('input', help='Path to CSV or binary file to parse.', type=argparse.FileType('rb'))
parser.add_argument('output', help='Path to output converted binary or CSV file. Will use stdout if omitted.',
nargs='?', default='-')
args = parser.parse_args()
quiet = args.quiet
md5sum = not args.disable_md5sum
secure = args.secure
offset_part_table = int(args.offset, 0)
table, input_is_binary = PartitionTable.from_file(args.input)
if not args.no_verify:
status('Verifying table...')
table.verify()
if args.flash_size:
size_mb = int(args.flash_size.replace('MB', ''))
table.verify_size_fits(size_mb * 1024 * 1024)
# Make sure that the output directory is created
output_dir = os.path.abspath(os.path.dirname(args.output))
if not os.path.exists(output_dir):
try:
os.makedirs(output_dir)
except OSError as exc:
if exc.errno != errno.EEXIST:
raise
if input_is_binary:
output = table.to_csv()
with sys.stdout if args.output == '-' else open(args.output, 'w') as f:
f.write(output)
else:
output = table.to_binary()
try:
stdout_binary = sys.stdout.buffer # Python 3
except AttributeError:
stdout_binary = sys.stdout
with stdout_binary if args.output == '-' else open(args.output, 'wb') as f:
f.write(output)
class InputError(RuntimeError):
def __init__(self, e):
super(InputError, self).__init__(e)
class ValidationError(InputError):
def __init__(self, partition, message):
super(ValidationError, self).__init__(
'Partition %s invalid: %s' % (partition.name, message))
if __name__ == '__main__':
try:
main()
except InputError as e:
print(e, file=sys.stderr)
sys.exit(2)