esp_prov : python coding style fixed as per conventions

pull/2847/head
Anurag Kar 2018-12-01 18:53:34 +05:30
rodzic 5e0d222188
commit 66a9762b2a
6 zmienionych plików z 148 dodań i 121 usunięć

Wyświetl plik

@ -78,7 +78,7 @@ select =
E275, # missing whitespace after keyword
E301, # expected 1 blank line, found 0
E302, # expected 2 blank lines, found 0
E303, # too many blank lines
E303, # too many blank lines
E304, # blank lines found after function decorator
E305, # expected 2 blank lines after end of function or class
E306, # expected 1 blank line before a nested definition
@ -166,9 +166,6 @@ exclude =
components/ulp/esp32ulp_mapgen.py,
components/wifi_provisioning/python/wifi_config_pb2.py,
components/wifi_provisioning/python/wifi_constants_pb2.py,
examples/provisioning/ble_prov/ble_prov_test.py,
examples/provisioning/softap_prov/softap_prov_test.py,
examples/provisioning/softap_prov/utils/wifi_tools.py,
tools/ci/apply_bot_filter.py,
tools/cmake/convert_to_cmake.py,
tools/esp_app_trace/apptrace_proc.py,
@ -180,7 +177,6 @@ exclude =
tools/esp_app_trace/pylibelf/types/__init__.py,
tools/esp_app_trace/pylibelf/util/__init__.py,
tools/esp_app_trace/pylibelf/util/syms/__init__.py,
tools/esp_prov/esp_prov.py,
tools/esp_prov/proto/__init__.py,
tools/esp_prov/prov/__init__.py,
tools/esp_prov/prov/custom_prov.py,
@ -190,7 +186,6 @@ exclude =
tools/esp_prov/security/security0.py,
tools/esp_prov/security/security1.py,
tools/esp_prov/transport/__init__.py,
tools/esp_prov/transport/ble_cli.py,
tools/esp_prov/transport/transport.py,
tools/esp_prov/transport/transport_ble.py,
tools/esp_prov/transport/transport_console.py,

Wyświetl plik

@ -15,32 +15,31 @@
# limitations under the License.
from __future__ import print_function
import imp
import re
import os
import sys
import time
# This environment variable is expected on the host machine
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
try:
import IDF
except ImportError:
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import IDF
# When running on local machine execute the following before running this script
# > export TEST_FW_PATH='~/esp/esp-idf/tools/tiny-test-fw'
# > make print_flash_cmd | tail -n 1 > build/download.config
# > make app bootloader
import TinyFW
import IDF
# Import esp_prov tool
idf_path = os.environ['IDF_PATH']
esp_prov = imp.load_source("esp_prov", idf_path + "/tools/esp_prov/esp_prov.py")
try:
import esp_prov
except ImportError:
esp_prov_path = os.getenv("IDF_PATH") + "/tools/esp_prov"
if esp_prov_path and esp_prov_path not in sys.path:
sys.path.insert(0, esp_prov_path)
import esp_prov
# Have esp_prov throw exception
esp_prov.config_throw_except = True
@IDF.idf_example_test(env_tag="Example_WIFI_BT")
def test_examples_provisioning_ble(env, extra_data):
# Acquire DUT
@ -49,8 +48,8 @@ def test_examples_provisioning_ble(env, extra_data):
# Get binary file
binary_file = os.path.join(dut1.app.binary_path, "ble_prov.bin")
bin_size = os.path.getsize(binary_file)
IDF.log_performance("ble_prov_bin_size", "{}KB".format(bin_size//1024))
IDF.check_performance("ble_prov_bin_size", bin_size//1024)
IDF.log_performance("ble_prov_bin_size", "{}KB".format(bin_size // 1024))
IDF.check_performance("ble_prov_bin_size", bin_size // 1024)
# Upload binary and start testing
dut1.start_app()
@ -73,12 +72,12 @@ def test_examples_provisioning_ble(env, extra_data):
print("Getting security")
security = esp_prov.get_security(secver, pop, verbose)
if security == None:
if security is None:
raise RuntimeError("Failed to get security")
print("Getting transport")
transport = esp_prov.get_transport(provmode, None, devname)
if transport == None:
if transport is None:
raise RuntimeError("Failed to get transport")
print("Verifying protocol version")
@ -112,5 +111,6 @@ def test_examples_provisioning_ble(env, extra_data):
if not success:
raise RuntimeError("Provisioning failed")
if __name__ == '__main__':
test_examples_provisioning_ble()

Wyświetl plik

@ -15,33 +15,39 @@
# limitations under the License.
from __future__ import print_function
import imp
import re
import os
import sys
import time
# This environment variable is expected on the host machine
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
try:
import IDF
except ImportError:
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import IDF
# When running on local machine execute the following before running this script
# > export TEST_FW_PATH='~/esp/esp-idf/tools/tiny-test-fw'
# > make print_flash_cmd | tail -n 1 > build/download.config
# > make app bootloader
try:
import esp_prov
except ImportError:
esp_prov_path = os.getenv("IDF_PATH") + "/tools/esp_prov"
if esp_prov_path and esp_prov_path not in sys.path:
sys.path.insert(0, esp_prov_path)
import esp_prov
import TinyFW
import IDF
# Import esp_prov tool
idf_path = os.environ['IDF_PATH']
esp_prov = imp.load_source("esp_prov", idf_path + "/tools/esp_prov/esp_prov.py")
wifi_tools = imp.load_source("wifi_tools", idf_path + "/examples/provisioning/softap_prov/utils/wifi_tools.py")
try:
import wifi_tools
except ImportError:
wifi_tools_path = os.getenv("IDF_PATH") + "/examples/provisioning/softap_prov/utils"
if wifi_tools_path and wifi_tools_path not in sys.path:
sys.path.insert(0, wifi_tools_path)
import wifi_tools
# Have esp_prov throw exception
esp_prov.config_throw_except = True
@IDF.idf_example_test(env_tag="Example_WIFI_BT")
def test_examples_provisioning_softap(env, extra_data):
# Acquire DUT
@ -50,8 +56,8 @@ def test_examples_provisioning_softap(env, extra_data):
# Get binary file
binary_file = os.path.join(dut1.app.binary_path, "softap_prov.bin")
bin_size = os.path.getsize(binary_file)
IDF.log_performance("softap_prov_bin_size", "{}KB".format(bin_size//1024))
IDF.check_performance("softap_prov_bin_size", bin_size//1024)
IDF.log_performance("softap_prov_bin_size", "{}KB".format(bin_size // 1024))
IDF.check_performance("softap_prov_bin_size", bin_size // 1024)
# Upload binary and start testing
dut1.start_app()
@ -62,13 +68,13 @@ def test_examples_provisioning_softap(env, extra_data):
[ssid, password] = dut1.expect(re.compile(r"SoftAP Provisioning started with SSID '(\S+)', Password '(\S+)'"), timeout=30)
iface = wifi_tools.get_wiface_name()
if iface == None:
if iface is None:
raise RuntimeError("Failed to get Wi-Fi interface on host")
print("Interface name : " + iface)
print("SoftAP SSID : " + ssid)
print("SoftAP Password : " + password)
ctrl = wifi_tools.wpa_cli(iface, reset_on_exit = True)
ctrl = wifi_tools.wpa_cli(iface, reset_on_exit=True)
print("Connecting to DUT SoftAP...")
ip = ctrl.connect(ssid, password)
got_ip = dut1.expect(re.compile(r"softAP assign IP to station,IP is: (\d+.\d+.\d+.\d+)"), timeout=30)[0]
@ -84,16 +90,16 @@ def test_examples_provisioning_softap(env, extra_data):
provmode = "softap"
ap_ssid = "myssid"
ap_password = "mypassword"
softap_endpoint = ip.split('.')[0] + "." + ip.split('.')[1]+ "." + ip.split('.')[2] + ".1:80"
softap_endpoint = ip.split('.')[0] + "." + ip.split('.')[1] + "." + ip.split('.')[2] + ".1:80"
print("Getting security")
security = esp_prov.get_security(secver, pop, verbose)
if security == None:
if security is None:
raise RuntimeError("Failed to get security")
print("Getting transport")
transport = esp_prov.get_transport(provmode, softap_endpoint, None)
if transport == None:
if transport is None:
raise RuntimeError("Failed to get transport")
print("Verifying protocol version")
@ -127,5 +133,6 @@ def test_examples_provisioning_softap(env, extra_data):
if not success:
raise RuntimeError("Provisioning failed")
if __name__ == '__main__':
test_examples_provisioning_softap()

Wyświetl plik

@ -18,12 +18,14 @@ import dbus.mainloop.glib
import netifaces
import time
def get_wiface_name():
for iface in netifaces.interfaces():
if iface.startswith('w'):
return iface
return None
def get_wiface_IPv4(iface):
try:
[info] = netifaces.ifaddresses(iface)[netifaces.AF_INET]
@ -31,8 +33,9 @@ def get_wiface_IPv4(iface):
except KeyError:
return None
class wpa_cli:
def __init__(self, iface, reset_on_exit = False):
def __init__(self, iface, reset_on_exit=False):
self.iface_name = iface
self.iface_obj = None
self.iface_ifc = None
@ -43,26 +46,27 @@ class wpa_cli:
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
bus = dbus.SystemBus()
service = dbus.Interface(bus.get_object("fi.w1.wpa_supplicant1", "/fi/w1/wpa_supplicant1"), "fi.w1.wpa_supplicant1")
paths = service.Get("fi.w1.wpa_supplicant1", "Interfaces", dbus_interface='org.freedesktop.DBus.Properties')
service = dbus.Interface(bus.get_object("fi.w1.wpa_supplicant1", "/fi/w1/wpa_supplicant1"),
"fi.w1.wpa_supplicant1")
iface_path = service.GetInterface(self.iface_name)
self.iface_obj = bus.get_object("fi.w1.wpa_supplicant1", iface_path)
self.iface_ifc = dbus.Interface(self.iface_obj, "fi.w1.wpa_supplicant1.Interface")
if self.iface_ifc == None:
if self.iface_ifc is None:
raise RuntimeError('supplicant : Failed to fetch interface')
self.old_network = self.iface_obj.Get("fi.w1.wpa_supplicant1.Interface", "CurrentNetwork", dbus_interface='org.freedesktop.DBus.Properties')
self.old_network = self.iface_obj.Get("fi.w1.wpa_supplicant1.Interface", "CurrentNetwork",
dbus_interface='org.freedesktop.DBus.Properties')
if self.old_network == '/':
self.old_network = None
else:
self.connected = True
def connect(self, ssid, password):
if self.connected == True:
if self.connected is True:
self.iface_ifc.Disconnect()
self.connected = False
if self.new_network != None:
if self.new_network is not None:
self.iface_ifc.RemoveNetwork(self.new_network)
self.new_network = self.iface_ifc.AddNetwork({"ssid": ssid, "psk": password})
@ -73,7 +77,7 @@ class wpa_cli:
while retry > 0:
time.sleep(5)
ip = get_wiface_IPv4(self.iface_name)
if ip != None:
if ip is not None:
self.connected = True
return ip
retry -= 1
@ -82,17 +86,17 @@ class wpa_cli:
raise RuntimeError('wpa_cli : Connection failed')
def reset(self):
if self.iface_ifc != None:
if self.connected == True:
if self.iface_ifc is not None:
if self.connected is True:
self.iface_ifc.Disconnect()
self.connected = False
if self.new_network != None:
if self.new_network is not None:
self.iface_ifc.RemoveNetwork(self.new_network)
self.new_network = None
if self.old_network != None:
if self.old_network is not None:
self.iface_ifc.SelectNetwork(self.old_network)
self.old_network = None
def __del__(self):
if self.reset_on_exit == True:
if self.reset_on_exit is True:
self.reset()

Wyświetl plik

@ -21,23 +21,31 @@ import time
import os
import sys
idf_path = os.environ['IDF_PATH']
sys.path.insert(0, idf_path + "/components/protocomm/python")
sys.path.insert(1, idf_path + "/tools/esp_prov")
try:
import security
import transport
import prov
import security
import transport
import prov
except ImportError:
idf_path = os.environ['IDF_PATH']
sys.path.insert(0, idf_path + "/components/protocomm/python")
sys.path.insert(1, idf_path + "/tools/esp_prov")
import security
import transport
import prov
# Set this to true to allow exceptions to be thrown
config_throw_except = False
def on_except(err):
if config_throw_except:
raise RuntimeError(err)
else:
print(err)
def get_security(secver, pop=None, verbose=False):
if secver == 1:
return security.Security1(pop, verbose)
@ -45,19 +53,19 @@ def get_security(secver, pop=None, verbose=False):
return security.Security0(verbose)
return None
def get_transport(sel_transport, softap_endpoint=None, ble_devname=None):
try:
tp = None
if (sel_transport == 'softap'):
tp = transport.Transport_Softap(softap_endpoint)
elif (sel_transport == 'ble'):
tp = transport.Transport_BLE(devname = ble_devname,
service_uuid = '0000ffff-0000-1000-8000-00805f9b34fb',
nu_lookup = {
'prov-session': 'ff51',
'prov-config' : 'ff52',
'proto-ver' : 'ff53'
})
tp = transport.Transport_BLE(devname=ble_devname,
service_uuid='0000ffff-0000-1000-8000-00805f9b34fb',
nu_lookup={'prov-session': 'ff51',
'prov-config': 'ff52',
'proto-ver': 'ff53'
})
elif (sel_transport == 'console'):
tp = transport.Transport_Console()
return tp
@ -65,6 +73,7 @@ def get_transport(sel_transport, softap_endpoint=None, ble_devname=None):
on_except(e)
return None
def version_match(tp, protover):
try:
response = tp.send_data('proto-ver', protover)
@ -75,21 +84,23 @@ def version_match(tp, protover):
on_except(e)
return None
def establish_session(tp, sec):
try:
response = None
while True:
request = sec.security_session(response)
if request == None:
if request is None:
break
response = tp.send_data('prov-session', request)
if (response == None):
if (response is None):
return False
return True
except RuntimeError as e:
on_except(e)
return None
def custom_config(tp, sec, custom_info, custom_ver):
try:
message = prov.custom_config_request(sec, custom_info, custom_ver)
@ -99,6 +110,7 @@ def custom_config(tp, sec, custom_info, custom_ver):
on_except(e)
return None
def send_wifi_config(tp, sec, ssid, passphrase):
try:
message = prov.config_set_config_request(sec, ssid, passphrase)
@ -108,6 +120,7 @@ def send_wifi_config(tp, sec, ssid, passphrase):
on_except(e)
return None
def apply_wifi_config(tp, sec):
try:
message = prov.config_apply_config_request(sec)
@ -117,6 +130,7 @@ def apply_wifi_config(tp, sec):
on_except(e)
return None
def get_wifi_config(tp, sec):
try:
message = prov.config_get_status_request(sec)
@ -126,80 +140,80 @@ def get_wifi_config(tp, sec):
on_except(e)
return None
if __name__ == '__main__':
parser = argparse.ArgumentParser(description="Generate ESP prov payload")
parser.add_argument("--ssid", dest = 'ssid', type = str,
help = "SSID of Wi-Fi Network", required = True)
parser.add_argument("--passphrase", dest = 'passphrase', type = str,
help = "Passphrase of Wi-Fi network", default = '')
parser.add_argument("--ssid", dest='ssid', type=str,
help="SSID of Wi-Fi Network", required=True)
parser.add_argument("--passphrase", dest='passphrase', type=str,
help="Passphrase of Wi-Fi network", default='')
parser.add_argument("--sec_ver", dest = 'secver', type = int,
help = "Security scheme version", default = 1)
parser.add_argument("--proto_ver", dest = 'protover', type = str,
help = "Protocol version", default = 'V0.1')
parser.add_argument("--pop", dest = 'pop', type = str,
help = "Proof of possession", default = '')
parser.add_argument("--sec_ver", dest='secver', type=int,
help="Security scheme version", default=1)
parser.add_argument("--proto_ver", dest='protover', type=str,
help="Protocol version", default='V0.1')
parser.add_argument("--pop", dest='pop', type=str,
help="Proof of possession", default='')
parser.add_argument("--softap_endpoint", dest = 'softap_endpoint', type = str,
help = "<softap_ip:port>, http(s):// shouldn't be included", default = '192.168.4.1:80')
parser.add_argument("--softap_endpoint", dest='softap_endpoint', type=str,
help="<softap_ip:port>, http(s):// shouldn't be included", default='192.168.4.1:80')
parser.add_argument("--ble_devname", dest = 'ble_devname', type = str,
help = "BLE Device Name", default = '')
parser.add_argument("--ble_devname", dest='ble_devname', type=str,
help="BLE Device Name", default='')
parser.add_argument("--transport", dest = 'provmode', type = str,
help = "provisioning mode i.e console or softap or ble", default = 'softap')
parser.add_argument("--transport", dest='provmode', type=str,
help="provisioning mode i.e console or softap or ble", default='softap')
parser.add_argument("--custom_config", help="Provision Custom Configuration",
action = "store_true")
parser.add_argument("--custom_info", dest = 'custom_info', type = str,
help = "Custom Config Info String", default = '<some custom info string>')
parser.add_argument("--custom_ver", dest = 'custom_ver', type = int,
help = "Custom Config Version Number", default = 2)
action="store_true")
parser.add_argument("--custom_info", dest='custom_info', type=str,
help="Custom Config Info String", default='<some custom info string>')
parser.add_argument("--custom_ver", dest='custom_ver', type=int,
help="Custom Config Version Number", default=2)
parser.add_argument("-v","--verbose", help = "increase output verbosity",
action = "store_true")
parser.add_argument("-v","--verbose", help="increase output verbosity", action="store_true")
args = parser.parse_args()
print("==== Esp_Prov Version: " + args.protover + " ====")
security = get_security(args.secver, args.pop, args.verbose)
if security == None:
obj_security = get_security(args.secver, args.pop, args.verbose)
if obj_security is None:
print("---- Invalid Security Version ----")
exit(1)
transport = get_transport(args.provmode, args.softap_endpoint, args.ble_devname)
if transport == None:
obj_transport = get_transport(args.provmode, args.softap_endpoint, args.ble_devname)
if obj_transport is None:
print("---- Invalid provisioning mode ----")
exit(2)
print("\n==== Verifying protocol version ====")
if not version_match(transport, args.protover):
if not version_match(obj_transport, args.protover):
print("---- Error in protocol version matching ----")
exit(3)
print("==== Verified protocol version successfully ====")
print("\n==== Starting Session ====")
if not establish_session(transport, security):
if not establish_session(obj_transport, obj_security):
print("---- Error in establishing session ----")
exit(4)
print("==== Session Established ====")
if args.custom_config:
print("\n==== Sending Custom config to esp32 ====")
if not custom_config(transport, security, args.custom_info, args.custom_ver):
if not custom_config(obj_transport, obj_security, args.custom_info, args.custom_ver):
print("---- Error in custom config ----")
exit(5)
print("==== Custom config sent successfully ====")
print("\n==== Sending Wi-Fi credential to esp32 ====")
if not send_wifi_config(transport, security, args.ssid, args.passphrase):
if not send_wifi_config(obj_transport, obj_security, args.ssid, args.passphrase):
print("---- Error in send Wi-Fi config ----")
exit(6)
print("==== Wi-Fi Credentials sent successfully ====")
print("\n==== Applying config to esp32 ====")
if not apply_wifi_config(transport, security):
if not apply_wifi_config(obj_transport, obj_security):
print("---- Error in apply Wi-Fi config ----")
exit(7)
print("==== Apply config sent successfully ====")
@ -207,7 +221,7 @@ if __name__ == '__main__':
while True:
time.sleep(5)
print("\n==== Wi-Fi connection state ====")
ret = get_wifi_config(transport, security)
ret = get_wifi_config(obj_transport, obj_security)
if (ret == 1):
continue
elif (ret == 0):

Wyświetl plik

@ -22,6 +22,7 @@ import utils
fallback = True
# Check if platform is Linux and required packages are installed
# else fallback to console mode
if platform.system() == 'Linux':
@ -30,10 +31,12 @@ if platform.system() == 'Linux':
import dbus.mainloop.glib
import time
fallback = False
except:
except ImportError:
pass
#--------------------------------------------------------------------
# --------------------------------------------------------------------
# BLE client (Linux Only) using Bluez and DBus
class BLE_Bluez_Client:
@ -52,13 +55,13 @@ class BLE_Bluez_Client:
for path, interfaces in objects.items():
adapter = interfaces.get("org.bluez.Adapter1")
if adapter != None:
if adapter is not None:
if path.endswith(iface):
self.adapter = dbus.Interface(bus.get_object("org.bluez", path), "org.bluez.Adapter1")
self.adapter_props = dbus.Interface(bus.get_object("org.bluez", path), "org.freedesktop.DBus.Properties")
break
if self.adapter == None:
if self.adapter is None:
raise RuntimeError("Bluetooth adapter not found")
self.adapter_props.Set("org.bluez.Adapter1", "Powered", dbus.Boolean(1))
@ -67,7 +70,7 @@ class BLE_Bluez_Client:
retry = 10
while (retry > 0):
try:
if self.device == None:
if self.device is None:
print("Connecting...")
# Wait for device to be discovered
time.sleep(5)
@ -98,7 +101,7 @@ class BLE_Bluez_Client:
dev_path = path
break
if dev_path == None:
if dev_path is None:
raise RuntimeError("BLE device not found")
try:
@ -120,12 +123,12 @@ class BLE_Bluez_Client:
if path.startswith(self.device.object_path):
service = bus.get_object("org.bluez", path)
uuid = service.Get('org.bluez.GattService1', 'UUID',
dbus_interface='org.freedesktop.DBus.Properties')
dbus_interface='org.freedesktop.DBus.Properties')
if uuid == self.srv_uuid:
srv_path = path
break
if srv_path == None:
if srv_path is None:
self.device.Disconnect(dbus_interface='org.bluez.Device1')
self.device = None
raise RuntimeError("Provisioning service not found")
@ -137,7 +140,7 @@ class BLE_Bluez_Client:
if path.startswith(srv_path):
chrc = bus.get_object("org.bluez", path)
uuid = chrc.Get('org.bluez.GattCharacteristic1', 'UUID',
dbus_interface='org.freedesktop.DBus.Properties')
dbus_interface='org.freedesktop.DBus.Properties')
self.characteristics[uuid] = chrc
def has_characteristic(self, uuid):
@ -162,7 +165,9 @@ class BLE_Bluez_Client:
path.WriteValue([ord(c) for c in data], {}, dbus_interface='org.bluez.GattCharacteristic1')
return ''.join(chr(b) for b in path.ReadValue({}, dbus_interface='org.bluez.GattCharacteristic1'))
#--------------------------------------------------------------------
# --------------------------------------------------------------------
# Console based BLE client for Cross Platform support
class BLE_Console_Client:
@ -196,7 +201,9 @@ class BLE_Console_Client:
resp = input("\t<< ")
return utils.hexstr_to_str(resp)
#--------------------------------------------------------------------
# --------------------------------------------------------------------
# Function to get client instance depending upon platform
def get_client():