diff --git a/.flake8 b/.flake8 index 72a206ab99..db94eed6fe 100644 --- a/.flake8 +++ b/.flake8 @@ -125,7 +125,6 @@ exclude = tools/esp_prov/utils/convenience.py, tools/gen_esp_err_to_name.py, tools/idf.py, - tools/idf_monitor.py, tools/idf_size.py, tools/kconfig_new/confgen.py, tools/kconfig_new/confserver.py, diff --git a/tools/idf_monitor.py b/tools/idf_monitor.py index 92ef71d96b..6b1706c305 100755 --- a/tools/idf_monitor.py +++ b/tools/idf_monitor.py @@ -29,8 +29,6 @@ # from __future__ import print_function, division from __future__ import unicode_literals -from future import standard_library -standard_library.install_aliases() from builtins import chr from builtins import object from builtins import bytes @@ -71,16 +69,20 @@ ANSI_RED = '\033[1;31m' ANSI_YELLOW = '\033[0;33m' ANSI_NORMAL = '\033[0m' + def color_print(message, color): """ Print a message to stderr with colored highlighting """ sys.stderr.write("%s%s%s\n" % (color, message, ANSI_NORMAL)) + def yellow_print(message): color_print(message, ANSI_YELLOW) + def red_print(message): color_print(message, ANSI_RED) + __version__ = "1.1" # Tags for tuples in queues @@ -95,6 +97,7 @@ DEFAULT_TOOLCHAIN_PREFIX = "xtensa-esp32-elf-" DEFAULT_PRINT_FILTER = "" + class StoppableThread(object): """ Provide a Thread-like class which can be 'cancelled' via a subclass-provided @@ -120,10 +123,10 @@ class StoppableThread(object): self._thread.start() def _cancel(self): - pass # override to provide cancellation functionality + pass # override to provide cancellation functionality def run(self): - pass # override for the main thread behaviour + pass # override for the main thread behaviour def _run_outer(self): try: @@ -138,6 +141,7 @@ class StoppableThread(object): self._cancel() old_thread.join() + class ConsoleReader(StoppableThread): """ Read input keys from the console and push them to the queue, until stopped. @@ -192,9 +196,11 @@ class ConsoleReader(StoppableThread): # TODO: introduce some workaround to make it work there. # # Note: This would throw exception in testing mode when the stdin is connected to PTY. - import fcntl, termios + import fcntl + import termios fcntl.ioctl(self.console.fd, termios.TIOCSTI, b'\0') + class SerialReader(StoppableThread): """ Read serial data from the serial port and push to the event queue, until stopped. @@ -227,9 +233,10 @@ class SerialReader(StoppableThread): if hasattr(self.serial, 'cancel_read'): try: self.serial.cancel_read() - except: + except Exception: pass + class LineMatcher(object): """ Assembles a dictionary of filtering rules based on the --print_filter @@ -244,14 +251,14 @@ class LineMatcher(object): LEVEL_V = 5 level = {'N': LEVEL_N, 'E': LEVEL_E, 'W': LEVEL_W, 'I': LEVEL_I, 'D': LEVEL_D, - 'V': LEVEL_V, '*': LEVEL_V, '': LEVEL_V} + 'V': LEVEL_V, '*': LEVEL_V, '': LEVEL_V} def __init__(self, print_filter): self._dict = dict() self._re = re.compile(r'^(?:\033\[[01];?[0-9]+m?)?([EWIDV]) \([0-9]+\) ([^:]+): ') items = print_filter.split() if len(items) == 0: - self._dict["*"] = self.LEVEL_V # default is to print everything + self._dict["*"] = self.LEVEL_V # default is to print everything for f in items: s = f.split(r':') if len(s) == 1: @@ -267,6 +274,7 @@ class LineMatcher(object): else: raise ValueError('Missing ":" in filter ' + f) self._dict[s[0]] = lev + def match(self, line): try: m = self._re.search(line) @@ -282,12 +290,14 @@ class LineMatcher(object): # We need something more than "*.N" for printing. return self._dict.get("*", self.LEVEL_N) > self.LEVEL_N + class SerialStopException(Exception): """ This exception is used for stopping the IDF monitor in testing mode. """ pass + class Monitor(object): """ Monitor application main class. @@ -316,7 +326,7 @@ class Monitor(object): self.console.getkey = types.MethodType(getkey_patched, self.console) - socket_mode = serial_instance.port.startswith("socket://") # testing hook - data from serial can make exit the monitor + socket_mode = serial_instance.port.startswith("socket://") # testing hook - data from serial can make exit the monitor self.serial = serial_instance self.console_reader = ConsoleReader(self.console, self.event_queue, socket_mode) self.serial_reader = SerialReader(self.serial, self.event_queue) @@ -331,8 +341,8 @@ class Monitor(object): self.translate_eol = { "CRLF": lambda c: c.replace("\n", "\r\n"), - "CR": lambda c: c.replace("\n", "\r"), - "LF": lambda c: c.replace("\r", "\n"), + "CR": lambda c: c.replace("\n", "\r"), + "LF": lambda c: c.replace("\r", "\n"), }[eol] # internal state @@ -381,7 +391,7 @@ class Monitor(object): # Cancelling _invoke_processing_last_line_timer is not # important here because receiving empty data doesn't matter. self._invoke_processing_last_line_timer = None - except: + except Exception: pass sys.stderr.write(ANSI_NORMAL + "\n") @@ -399,9 +409,9 @@ class Monitor(object): key = self.translate_eol(key) self.serial.write(codecs.encode(key)) except serial.SerialException: - pass # this shouldn't happen, but sometimes port has closed in serial thread + pass # this shouldn't happen, but sometimes port has closed in serial thread except UnicodeEncodeError: - pass # this can happen if a non-ascii character was passed, ignoring + pass # this can happen if a non-ascii character was passed, ignoring def handle_serial_input(self, data, finalize_line=False): sp = data.split(b'\n') @@ -427,7 +437,7 @@ class Monitor(object): # to make a decision. if self._last_line_part != b"": if self._force_line_print or (finalize_line and self._line_matcher.match(self._last_line_part.decode(errors="ignore"))): - self._force_line_print = True; + self._force_line_print = True if self._output_enabled: self.console.write_bytes(self._last_line_part) self.handle_possible_pc_address_in_line(self._last_line_part) @@ -454,7 +464,7 @@ class Monitor(object): def handle_menu_key(self, c): if c == self.exit_key or c == self.menu_key: # send verbatim self.serial.write(codecs.encode(c)) - elif c in [ CTRL_H, 'h', 'H', '?' ]: + elif c in [CTRL_H, 'h', 'H', '?']: red_print(self.get_help_text()) elif c == CTRL_R: # Reset device via RTS self.serial.setRTS(True) @@ -472,10 +482,10 @@ class Monitor(object): # to fast trigger pause without press menu key self.serial.setDTR(False) # IO0=HIGH self.serial.setRTS(True) # EN=LOW, chip in reset - time.sleep(1.3) # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.1 + time.sleep(1.3) # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.1 self.serial.setDTR(True) # IO0=LOW self.serial.setRTS(False) # EN=HIGH, chip out of reset - time.sleep(0.45) # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.05 + time.sleep(0.45) # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.05 self.serial.setDTR(False) # IO0=HIGH, done else: red_print('--- unknown menu character {} --'.format(key_description(c))) @@ -502,7 +512,7 @@ class Monitor(object): makecmd=key_description(CTRL_F), appmake=key_description(CTRL_A), output=key_description(CTRL_Y), - pause=key_description(CTRL_P) ) + pause=key_description(CTRL_P)) def __enter__(self): """ Use 'with self' to temporarily disable monitoring behaviour """ @@ -525,7 +535,7 @@ class Monitor(object): --- Press any other key to resume monitor (resets target).""".format(reason, key_description(self.exit_key), key_description(CTRL_F), - key_description(CTRL_A) )) + key_description(CTRL_A))) k = CTRL_T # ignore CTRL-T here, so people can muscle-memory Ctrl-T Ctrl-F, etc. while k == CTRL_T: k = self.console.getkey() @@ -533,16 +543,16 @@ class Monitor(object): self.console.cleanup() if k == self.exit_key: self.event_queue.put((TAG_KEY, k)) - elif k in [ CTRL_F, CTRL_A ]: + elif k in [CTRL_F, CTRL_A]: self.event_queue.put((TAG_KEY, self.menu_key)) self.event_queue.put((TAG_KEY, k)) def run_make(self, target): with self: if isinstance(self.make, list): - popen_args = self.make + [ target ] + popen_args = self.make + [target] else: - popen_args = [ self.make, target ] + popen_args = [self.make, target] yellow_print("Running %s..." % " ".join(popen_args)) p = subprocess.Popen(popen_args) try: @@ -559,13 +569,13 @@ class Monitor(object): ["%saddr2line" % self.toolchain_prefix, "-pfiaC", "-e", self.elf_file, pc_addr], cwd=".") - if not b"?? ??:0" in translation: + if b"?? ??:0" not in translation: yellow_print(translation.decode()) def check_gdbstub_trigger(self, line): line = self._gdb_buffer + line self._gdb_buffer = b"" - m = re.search(b"\\$(T..)#(..)", line) # look for a gdb "reason" for a break + m = re.search(b"\\$(T..)#(..)", line) # look for a gdb "reason" for a break if m is not None: try: chsum = sum(ord(bytes([p])) for p in m.group(1)) & 0xFF @@ -577,16 +587,15 @@ class Monitor(object): else: red_print("Malformed gdb message... calculated checksum %02x received %02x" % (chsum, calc_chsum)) - def run_gdb(self): with self: # disable console control sys.stderr.write(ANSI_NORMAL) try: process = subprocess.Popen(["%sgdb" % self.toolchain_prefix, - "-ex", "set serial baud %d" % self.serial.baudrate, - "-ex", "target remote %s" % self.serial.port, - "-ex", "interrupt", # monitor has already parsed the first 'reason' command, need a second - self.elf_file], cwd=".") + "-ex", "set serial baud %d" % self.serial.baudrate, + "-ex", "target remote %s" % self.serial.port, + "-ex", "interrupt", # monitor has already parsed the first 'reason' command, need a second + self.elf_file], cwd=".") process.wait() except KeyboardInterrupt: pass # happens on Windows, maybe other OSes @@ -594,12 +603,12 @@ class Monitor(object): try: # on Linux, maybe other OSes, gdb sometimes seems to be alive even after wait() returns... process.terminate() - except: + except Exception: pass try: # also on Linux, maybe other OSes, gdb sometimes exits uncleanly and breaks the tty mode subprocess.call(["stty", "sane"]) - except: + except Exception: pass # don't care if there's no stty, we tried... self.prompt_next_action("gdb exited") @@ -610,6 +619,7 @@ class Monitor(object): self._output_enabled = not self._output_enabled yellow_print("\nToggle output display: {}, Type Ctrl-T Ctrl-Y to show/disable output again.".format(self._output_enabled)) + def main(): parser = argparse.ArgumentParser("idf_monitor - a serial output monitor for esp-idf") @@ -690,6 +700,7 @@ def main(): monitor.main_loop() + if os.name == 'nt': # Windows console stuff @@ -704,7 +715,7 @@ if os.name == 'nt': RE_ANSI_COLOR = re.compile(b'\033\\[([01]);3([0-7])m') # list mapping the 8 ANSI colors (the indexes) to Windows Console colors - ANSI_TO_WINDOWS_COLOR = [ 0, 4, 2, 6, 1, 5, 3, 7 ] + ANSI_TO_WINDOWS_COLOR = [0, 4, 2, 6, 1, 5, 3, 7] GetStdHandle = ctypes.windll.kernel32.GetStdHandle SetConsoleTextAttribute = ctypes.windll.kernel32.SetConsoleTextAttribute @@ -745,10 +756,10 @@ if os.name == 'nt': data = bytearray(data, 'utf-8') for b in data: b = bytes([b]) - l = len(self.matched) + length = len(self.matched) if b == b'\033': # ESC self.matched = b - elif (l == 1 and b == b'[') or (1 < l < 7): + elif (length == 1 and b == b'[') or (1 < length < 7): self.matched += b if self.matched == ANSI_NORMAL.encode('latin-1'): # reset console # Flush is required only with Python3 - switching color before it is printed would mess up the console @@ -765,7 +776,7 @@ if os.name == 'nt': self.flush() SetConsoleTextAttribute(self.handle, color) else: - self._output_write(self.matched) # not an ANSI color code, display verbatim + self._output_write(self.matched) # not an ANSI color code, display verbatim self.matched = b'' else: self._output_write(b)