tools: Correct coding style of idf_monitor

pull/2770/head
Roland Dobai 2018-11-20 14:10:35 +01:00
rodzic 9273de8b43
commit 1b464d23a9
2 zmienionych plików z 47 dodań i 37 usunięć

Wyświetl plik

@ -125,7 +125,6 @@ exclude =
tools/esp_prov/utils/convenience.py, tools/esp_prov/utils/convenience.py,
tools/gen_esp_err_to_name.py, tools/gen_esp_err_to_name.py,
tools/idf.py, tools/idf.py,
tools/idf_monitor.py,
tools/idf_size.py, tools/idf_size.py,
tools/kconfig_new/confgen.py, tools/kconfig_new/confgen.py,
tools/kconfig_new/confserver.py, tools/kconfig_new/confserver.py,

Wyświetl plik

@ -29,8 +29,6 @@
# #
from __future__ import print_function, division from __future__ import print_function, division
from __future__ import unicode_literals from __future__ import unicode_literals
from future import standard_library
standard_library.install_aliases()
from builtins import chr from builtins import chr
from builtins import object from builtins import object
from builtins import bytes from builtins import bytes
@ -71,16 +69,20 @@ ANSI_RED = '\033[1;31m'
ANSI_YELLOW = '\033[0;33m' ANSI_YELLOW = '\033[0;33m'
ANSI_NORMAL = '\033[0m' ANSI_NORMAL = '\033[0m'
def color_print(message, color): def color_print(message, color):
""" Print a message to stderr with colored highlighting """ """ Print a message to stderr with colored highlighting """
sys.stderr.write("%s%s%s\n" % (color, message, ANSI_NORMAL)) sys.stderr.write("%s%s%s\n" % (color, message, ANSI_NORMAL))
def yellow_print(message): def yellow_print(message):
color_print(message, ANSI_YELLOW) color_print(message, ANSI_YELLOW)
def red_print(message): def red_print(message):
color_print(message, ANSI_RED) color_print(message, ANSI_RED)
__version__ = "1.1" __version__ = "1.1"
# Tags for tuples in queues # Tags for tuples in queues
@ -95,6 +97,7 @@ DEFAULT_TOOLCHAIN_PREFIX = "xtensa-esp32-elf-"
DEFAULT_PRINT_FILTER = "" DEFAULT_PRINT_FILTER = ""
class StoppableThread(object): class StoppableThread(object):
""" """
Provide a Thread-like class which can be 'cancelled' via a subclass-provided Provide a Thread-like class which can be 'cancelled' via a subclass-provided
@ -120,10 +123,10 @@ class StoppableThread(object):
self._thread.start() self._thread.start()
def _cancel(self): def _cancel(self):
pass # override to provide cancellation functionality pass # override to provide cancellation functionality
def run(self): def run(self):
pass # override for the main thread behaviour pass # override for the main thread behaviour
def _run_outer(self): def _run_outer(self):
try: try:
@ -138,6 +141,7 @@ class StoppableThread(object):
self._cancel() self._cancel()
old_thread.join() old_thread.join()
class ConsoleReader(StoppableThread): class ConsoleReader(StoppableThread):
""" Read input keys from the console and push them to the queue, """ Read input keys from the console and push them to the queue,
until stopped. until stopped.
@ -192,9 +196,11 @@ class ConsoleReader(StoppableThread):
# TODO: introduce some workaround to make it work there. # TODO: introduce some workaround to make it work there.
# #
# Note: This would throw exception in testing mode when the stdin is connected to PTY. # Note: This would throw exception in testing mode when the stdin is connected to PTY.
import fcntl, termios import fcntl
import termios
fcntl.ioctl(self.console.fd, termios.TIOCSTI, b'\0') fcntl.ioctl(self.console.fd, termios.TIOCSTI, b'\0')
class SerialReader(StoppableThread): class SerialReader(StoppableThread):
""" Read serial data from the serial port and push to the """ Read serial data from the serial port and push to the
event queue, until stopped. event queue, until stopped.
@ -227,9 +233,10 @@ class SerialReader(StoppableThread):
if hasattr(self.serial, 'cancel_read'): if hasattr(self.serial, 'cancel_read'):
try: try:
self.serial.cancel_read() self.serial.cancel_read()
except: except Exception:
pass pass
class LineMatcher(object): class LineMatcher(object):
""" """
Assembles a dictionary of filtering rules based on the --print_filter Assembles a dictionary of filtering rules based on the --print_filter
@ -244,14 +251,14 @@ class LineMatcher(object):
LEVEL_V = 5 LEVEL_V = 5
level = {'N': LEVEL_N, 'E': LEVEL_E, 'W': LEVEL_W, 'I': LEVEL_I, 'D': LEVEL_D, level = {'N': LEVEL_N, 'E': LEVEL_E, 'W': LEVEL_W, 'I': LEVEL_I, 'D': LEVEL_D,
'V': LEVEL_V, '*': LEVEL_V, '': LEVEL_V} 'V': LEVEL_V, '*': LEVEL_V, '': LEVEL_V}
def __init__(self, print_filter): def __init__(self, print_filter):
self._dict = dict() self._dict = dict()
self._re = re.compile(r'^(?:\033\[[01];?[0-9]+m?)?([EWIDV]) \([0-9]+\) ([^:]+): ') self._re = re.compile(r'^(?:\033\[[01];?[0-9]+m?)?([EWIDV]) \([0-9]+\) ([^:]+): ')
items = print_filter.split() items = print_filter.split()
if len(items) == 0: if len(items) == 0:
self._dict["*"] = self.LEVEL_V # default is to print everything self._dict["*"] = self.LEVEL_V # default is to print everything
for f in items: for f in items:
s = f.split(r':') s = f.split(r':')
if len(s) == 1: if len(s) == 1:
@ -267,6 +274,7 @@ class LineMatcher(object):
else: else:
raise ValueError('Missing ":" in filter ' + f) raise ValueError('Missing ":" in filter ' + f)
self._dict[s[0]] = lev self._dict[s[0]] = lev
def match(self, line): def match(self, line):
try: try:
m = self._re.search(line) m = self._re.search(line)
@ -282,12 +290,14 @@ class LineMatcher(object):
# We need something more than "*.N" for printing. # We need something more than "*.N" for printing.
return self._dict.get("*", self.LEVEL_N) > self.LEVEL_N return self._dict.get("*", self.LEVEL_N) > self.LEVEL_N
class SerialStopException(Exception): class SerialStopException(Exception):
""" """
This exception is used for stopping the IDF monitor in testing mode. This exception is used for stopping the IDF monitor in testing mode.
""" """
pass pass
class Monitor(object): class Monitor(object):
""" """
Monitor application main class. Monitor application main class.
@ -316,7 +326,7 @@ class Monitor(object):
self.console.getkey = types.MethodType(getkey_patched, self.console) self.console.getkey = types.MethodType(getkey_patched, self.console)
socket_mode = serial_instance.port.startswith("socket://") # testing hook - data from serial can make exit the monitor socket_mode = serial_instance.port.startswith("socket://") # testing hook - data from serial can make exit the monitor
self.serial = serial_instance self.serial = serial_instance
self.console_reader = ConsoleReader(self.console, self.event_queue, socket_mode) self.console_reader = ConsoleReader(self.console, self.event_queue, socket_mode)
self.serial_reader = SerialReader(self.serial, self.event_queue) self.serial_reader = SerialReader(self.serial, self.event_queue)
@ -331,8 +341,8 @@ class Monitor(object):
self.translate_eol = { self.translate_eol = {
"CRLF": lambda c: c.replace("\n", "\r\n"), "CRLF": lambda c: c.replace("\n", "\r\n"),
"CR": lambda c: c.replace("\n", "\r"), "CR": lambda c: c.replace("\n", "\r"),
"LF": lambda c: c.replace("\r", "\n"), "LF": lambda c: c.replace("\r", "\n"),
}[eol] }[eol]
# internal state # internal state
@ -381,7 +391,7 @@ class Monitor(object):
# Cancelling _invoke_processing_last_line_timer is not # Cancelling _invoke_processing_last_line_timer is not
# important here because receiving empty data doesn't matter. # important here because receiving empty data doesn't matter.
self._invoke_processing_last_line_timer = None self._invoke_processing_last_line_timer = None
except: except Exception:
pass pass
sys.stderr.write(ANSI_NORMAL + "\n") sys.stderr.write(ANSI_NORMAL + "\n")
@ -399,9 +409,9 @@ class Monitor(object):
key = self.translate_eol(key) key = self.translate_eol(key)
self.serial.write(codecs.encode(key)) self.serial.write(codecs.encode(key))
except serial.SerialException: except serial.SerialException:
pass # this shouldn't happen, but sometimes port has closed in serial thread pass # this shouldn't happen, but sometimes port has closed in serial thread
except UnicodeEncodeError: except UnicodeEncodeError:
pass # this can happen if a non-ascii character was passed, ignoring pass # this can happen if a non-ascii character was passed, ignoring
def handle_serial_input(self, data, finalize_line=False): def handle_serial_input(self, data, finalize_line=False):
sp = data.split(b'\n') sp = data.split(b'\n')
@ -427,7 +437,7 @@ class Monitor(object):
# to make a decision. # to make a decision.
if self._last_line_part != b"": if self._last_line_part != b"":
if self._force_line_print or (finalize_line and self._line_matcher.match(self._last_line_part.decode(errors="ignore"))): if self._force_line_print or (finalize_line and self._line_matcher.match(self._last_line_part.decode(errors="ignore"))):
self._force_line_print = True; self._force_line_print = True
if self._output_enabled: if self._output_enabled:
self.console.write_bytes(self._last_line_part) self.console.write_bytes(self._last_line_part)
self.handle_possible_pc_address_in_line(self._last_line_part) self.handle_possible_pc_address_in_line(self._last_line_part)
@ -454,7 +464,7 @@ class Monitor(object):
def handle_menu_key(self, c): def handle_menu_key(self, c):
if c == self.exit_key or c == self.menu_key: # send verbatim if c == self.exit_key or c == self.menu_key: # send verbatim
self.serial.write(codecs.encode(c)) self.serial.write(codecs.encode(c))
elif c in [ CTRL_H, 'h', 'H', '?' ]: elif c in [CTRL_H, 'h', 'H', '?']:
red_print(self.get_help_text()) red_print(self.get_help_text())
elif c == CTRL_R: # Reset device via RTS elif c == CTRL_R: # Reset device via RTS
self.serial.setRTS(True) self.serial.setRTS(True)
@ -472,10 +482,10 @@ class Monitor(object):
# to fast trigger pause without press menu key # to fast trigger pause without press menu key
self.serial.setDTR(False) # IO0=HIGH self.serial.setDTR(False) # IO0=HIGH
self.serial.setRTS(True) # EN=LOW, chip in reset self.serial.setRTS(True) # EN=LOW, chip in reset
time.sleep(1.3) # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.1 time.sleep(1.3) # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.1
self.serial.setDTR(True) # IO0=LOW self.serial.setDTR(True) # IO0=LOW
self.serial.setRTS(False) # EN=HIGH, chip out of reset self.serial.setRTS(False) # EN=HIGH, chip out of reset
time.sleep(0.45) # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.05 time.sleep(0.45) # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.05
self.serial.setDTR(False) # IO0=HIGH, done self.serial.setDTR(False) # IO0=HIGH, done
else: else:
red_print('--- unknown menu character {} --'.format(key_description(c))) red_print('--- unknown menu character {} --'.format(key_description(c)))
@ -502,7 +512,7 @@ class Monitor(object):
makecmd=key_description(CTRL_F), makecmd=key_description(CTRL_F),
appmake=key_description(CTRL_A), appmake=key_description(CTRL_A),
output=key_description(CTRL_Y), output=key_description(CTRL_Y),
pause=key_description(CTRL_P) ) pause=key_description(CTRL_P))
def __enter__(self): def __enter__(self):
""" Use 'with self' to temporarily disable monitoring behaviour """ """ Use 'with self' to temporarily disable monitoring behaviour """
@ -525,7 +535,7 @@ class Monitor(object):
--- Press any other key to resume monitor (resets target).""".format(reason, --- Press any other key to resume monitor (resets target).""".format(reason,
key_description(self.exit_key), key_description(self.exit_key),
key_description(CTRL_F), key_description(CTRL_F),
key_description(CTRL_A) )) key_description(CTRL_A)))
k = CTRL_T # ignore CTRL-T here, so people can muscle-memory Ctrl-T Ctrl-F, etc. k = CTRL_T # ignore CTRL-T here, so people can muscle-memory Ctrl-T Ctrl-F, etc.
while k == CTRL_T: while k == CTRL_T:
k = self.console.getkey() k = self.console.getkey()
@ -533,16 +543,16 @@ class Monitor(object):
self.console.cleanup() self.console.cleanup()
if k == self.exit_key: if k == self.exit_key:
self.event_queue.put((TAG_KEY, k)) self.event_queue.put((TAG_KEY, k))
elif k in [ CTRL_F, CTRL_A ]: elif k in [CTRL_F, CTRL_A]:
self.event_queue.put((TAG_KEY, self.menu_key)) self.event_queue.put((TAG_KEY, self.menu_key))
self.event_queue.put((TAG_KEY, k)) self.event_queue.put((TAG_KEY, k))
def run_make(self, target): def run_make(self, target):
with self: with self:
if isinstance(self.make, list): if isinstance(self.make, list):
popen_args = self.make + [ target ] popen_args = self.make + [target]
else: else:
popen_args = [ self.make, target ] popen_args = [self.make, target]
yellow_print("Running %s..." % " ".join(popen_args)) yellow_print("Running %s..." % " ".join(popen_args))
p = subprocess.Popen(popen_args) p = subprocess.Popen(popen_args)
try: try:
@ -559,13 +569,13 @@ class Monitor(object):
["%saddr2line" % self.toolchain_prefix, ["%saddr2line" % self.toolchain_prefix,
"-pfiaC", "-e", self.elf_file, pc_addr], "-pfiaC", "-e", self.elf_file, pc_addr],
cwd=".") cwd=".")
if not b"?? ??:0" in translation: if b"?? ??:0" not in translation:
yellow_print(translation.decode()) yellow_print(translation.decode())
def check_gdbstub_trigger(self, line): def check_gdbstub_trigger(self, line):
line = self._gdb_buffer + line line = self._gdb_buffer + line
self._gdb_buffer = b"" self._gdb_buffer = b""
m = re.search(b"\\$(T..)#(..)", line) # look for a gdb "reason" for a break m = re.search(b"\\$(T..)#(..)", line) # look for a gdb "reason" for a break
if m is not None: if m is not None:
try: try:
chsum = sum(ord(bytes([p])) for p in m.group(1)) & 0xFF chsum = sum(ord(bytes([p])) for p in m.group(1)) & 0xFF
@ -577,16 +587,15 @@ class Monitor(object):
else: else:
red_print("Malformed gdb message... calculated checksum %02x received %02x" % (chsum, calc_chsum)) red_print("Malformed gdb message... calculated checksum %02x received %02x" % (chsum, calc_chsum))
def run_gdb(self): def run_gdb(self):
with self: # disable console control with self: # disable console control
sys.stderr.write(ANSI_NORMAL) sys.stderr.write(ANSI_NORMAL)
try: try:
process = subprocess.Popen(["%sgdb" % self.toolchain_prefix, process = subprocess.Popen(["%sgdb" % self.toolchain_prefix,
"-ex", "set serial baud %d" % self.serial.baudrate, "-ex", "set serial baud %d" % self.serial.baudrate,
"-ex", "target remote %s" % self.serial.port, "-ex", "target remote %s" % self.serial.port,
"-ex", "interrupt", # monitor has already parsed the first 'reason' command, need a second "-ex", "interrupt", # monitor has already parsed the first 'reason' command, need a second
self.elf_file], cwd=".") self.elf_file], cwd=".")
process.wait() process.wait()
except KeyboardInterrupt: except KeyboardInterrupt:
pass # happens on Windows, maybe other OSes pass # happens on Windows, maybe other OSes
@ -594,12 +603,12 @@ class Monitor(object):
try: try:
# on Linux, maybe other OSes, gdb sometimes seems to be alive even after wait() returns... # on Linux, maybe other OSes, gdb sometimes seems to be alive even after wait() returns...
process.terminate() process.terminate()
except: except Exception:
pass pass
try: try:
# also on Linux, maybe other OSes, gdb sometimes exits uncleanly and breaks the tty mode # also on Linux, maybe other OSes, gdb sometimes exits uncleanly and breaks the tty mode
subprocess.call(["stty", "sane"]) subprocess.call(["stty", "sane"])
except: except Exception:
pass # don't care if there's no stty, we tried... pass # don't care if there's no stty, we tried...
self.prompt_next_action("gdb exited") self.prompt_next_action("gdb exited")
@ -610,6 +619,7 @@ class Monitor(object):
self._output_enabled = not self._output_enabled self._output_enabled = not self._output_enabled
yellow_print("\nToggle output display: {}, Type Ctrl-T Ctrl-Y to show/disable output again.".format(self._output_enabled)) yellow_print("\nToggle output display: {}, Type Ctrl-T Ctrl-Y to show/disable output again.".format(self._output_enabled))
def main(): def main():
parser = argparse.ArgumentParser("idf_monitor - a serial output monitor for esp-idf") parser = argparse.ArgumentParser("idf_monitor - a serial output monitor for esp-idf")
@ -690,6 +700,7 @@ def main():
monitor.main_loop() monitor.main_loop()
if os.name == 'nt': if os.name == 'nt':
# Windows console stuff # Windows console stuff
@ -704,7 +715,7 @@ if os.name == 'nt':
RE_ANSI_COLOR = re.compile(b'\033\\[([01]);3([0-7])m') RE_ANSI_COLOR = re.compile(b'\033\\[([01]);3([0-7])m')
# list mapping the 8 ANSI colors (the indexes) to Windows Console colors # list mapping the 8 ANSI colors (the indexes) to Windows Console colors
ANSI_TO_WINDOWS_COLOR = [ 0, 4, 2, 6, 1, 5, 3, 7 ] ANSI_TO_WINDOWS_COLOR = [0, 4, 2, 6, 1, 5, 3, 7]
GetStdHandle = ctypes.windll.kernel32.GetStdHandle GetStdHandle = ctypes.windll.kernel32.GetStdHandle
SetConsoleTextAttribute = ctypes.windll.kernel32.SetConsoleTextAttribute SetConsoleTextAttribute = ctypes.windll.kernel32.SetConsoleTextAttribute
@ -745,10 +756,10 @@ if os.name == 'nt':
data = bytearray(data, 'utf-8') data = bytearray(data, 'utf-8')
for b in data: for b in data:
b = bytes([b]) b = bytes([b])
l = len(self.matched) length = len(self.matched)
if b == b'\033': # ESC if b == b'\033': # ESC
self.matched = b self.matched = b
elif (l == 1 and b == b'[') or (1 < l < 7): elif (length == 1 and b == b'[') or (1 < length < 7):
self.matched += b self.matched += b
if self.matched == ANSI_NORMAL.encode('latin-1'): # reset console if self.matched == ANSI_NORMAL.encode('latin-1'): # reset console
# Flush is required only with Python3 - switching color before it is printed would mess up the console # Flush is required only with Python3 - switching color before it is printed would mess up the console
@ -765,7 +776,7 @@ if os.name == 'nt':
self.flush() self.flush()
SetConsoleTextAttribute(self.handle, color) SetConsoleTextAttribute(self.handle, color)
else: else:
self._output_write(self.matched) # not an ANSI color code, display verbatim self._output_write(self.matched) # not an ANSI color code, display verbatim
self.matched = b'' self.matched = b''
else: else:
self._output_write(b) self._output_write(b)