#!/usr/bin/env python3 # # This file is part of the MicroPython project, http://micropython.org/ # # The MIT License (MIT) # # Copyright (c) 2014-2021 Damien P. George # Copyright (c) 2017 Paul Sokolovsky # Copyright (c) 2023 Jim Mussared # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. # This is based on the serial-only parts of tools/pyboard.py, with Python 2 # support removed, and is currently in the process of being refactored to # support multiple transports (webrepl, socket, BLE, etc). At the moment, # SerialTransport is just the old Pyboard+PyboardExtended class without any # of this refactoring. The API is going to change significantly. # Once the API is stabilised, the idea is that mpremote can be used both # as a command line tool and a library for interacting with devices. import ast, io, errno, os, re, struct, sys, time from collections import namedtuple from errno import EPERM from .console import VT_ENABLED from .transport import TransportError, Transport def stdout_write_bytes(b): b = b.replace(b"\x04", b"") sys.stdout.buffer.write(b) sys.stdout.buffer.flush() listdir_result = namedtuple("dir_result", ["name", "st_mode", "st_ino", "st_size"]) def reraise_filesystem_error(e, info): if len(e.args) >= 3: if b"OSError" in e.args[2] and b"ENOENT" in e.args[2]: raise FileNotFoundError(info) raise class SerialTransport(Transport): def __init__(self, device, baudrate=115200, wait=0, exclusive=True): self.in_raw_repl = False self.use_raw_paste = True self.device_name = device self.mounted = False import serial import serial.tools.list_ports # Set options, and exclusive if pyserial supports it serial_kwargs = {"baudrate": baudrate, "interCharTimeout": 1} if serial.__version__ >= "3.3": serial_kwargs["exclusive"] = exclusive delayed = False for attempt in range(wait + 1): try: if device.startswith("rfc2217://"): self.serial = serial.serial_for_url(device, **serial_kwargs) elif os.name == "nt": self.serial = serial.Serial(**serial_kwargs) self.serial.port = device portinfo = list(serial.tools.list_ports.grep(device)) # type: ignore if portinfo and portinfo[0].manufacturer != "Microsoft": # ESP8266/ESP32 boards use RTS/CTS for flashing and boot mode selection. # DTR False: to avoid using the reset button will hang the MCU in bootloader mode # RTS False: to prevent pulses on rts on serial.close() that would POWERON_RESET an ESPxx self.serial.dtr = False # DTR False = gpio0 High = Normal boot self.serial.rts = False # RTS False = EN High = MCU enabled self.serial.open() else: self.serial = serial.Serial(device, **serial_kwargs) break except OSError: if wait == 0: continue if attempt == 0: sys.stdout.write("Waiting {} seconds for pyboard ".format(wait)) delayed = True time.sleep(1) sys.stdout.write(".") sys.stdout.flush() else: if delayed: print("") raise TransportError("failed to access " + device) if delayed: print("") def close(self): self.serial.close() def read_until(self, min_num_bytes, ending, timeout=10, data_consumer=None): # if data_consumer is used then data is not accumulated and the ending must be 1 byte long assert data_consumer is None or len(ending) == 1 data = self.serial.read(min_num_bytes) if data_consumer: data_consumer(data) timeout_count = 0 while True: if data.endswith(ending): break elif self.serial.inWaiting() > 0: new_data = self.serial.read(1) if data_consumer: data_consumer(new_data) data = new_data else: data = data + new_data timeout_count = 0 else: timeout_count += 1 if timeout is not None and timeout_count >= 100 * timeout: break time.sleep(0.01) return data def enter_raw_repl(self, soft_reset=True): self.serial.write(b"\r\x03\x03") # ctrl-C twice: interrupt any running program # flush input (without relying on serial.flushInput()) n = self.serial.inWaiting() while n > 0: self.serial.read(n) n = self.serial.inWaiting() self.serial.write(b"\r\x01") # ctrl-A: enter raw REPL if soft_reset: data = self.read_until(1, b"raw REPL; CTRL-B to exit\r\n>") if not data.endswith(b"raw REPL; CTRL-B to exit\r\n>"): print(data) raise TransportError("could not enter raw repl") self.serial.write(b"\x04") # ctrl-D: soft reset # Waiting for "soft reboot" independently to "raw REPL" (done below) # allows boot.py to print, which will show up after "soft reboot" # and before "raw REPL". data = self.read_until(1, b"soft reboot\r\n") if not data.endswith(b"soft reboot\r\n"): print(data) raise TransportError("could not enter raw repl") data = self.read_until(1, b"raw REPL; CTRL-B to exit\r\n") if not data.endswith(b"raw REPL; CTRL-B to exit\r\n"): print(data) raise TransportError("could not enter raw repl") self.in_raw_repl = True def exit_raw_repl(self): self.serial.write(b"\r\x02") # ctrl-B: enter friendly REPL self.in_raw_repl = False def follow(self, timeout, data_consumer=None): # wait for normal output data = self.read_until(1, b"\x04", timeout=timeout, data_consumer=data_consumer) if not data.endswith(b"\x04"): raise TransportError("timeout waiting for first EOF reception") data = data[:-1] # wait for error output data_err = self.read_until(1, b"\x04", timeout=timeout) if not data_err.endswith(b"\x04"): raise TransportError("timeout waiting for second EOF reception") data_err = data_err[:-1] # return normal and error output return data, data_err def raw_paste_write(self, command_bytes): # Read initial header, with window size. data = self.serial.read(2) window_size = struct.unpack("") if not data.endswith(b">"): raise TransportError("could not enter raw repl") if self.use_raw_paste: # Try to enter raw-paste mode. self.serial.write(b"\x05A\x01") data = self.serial.read(2) if data == b"R\x00": # Device understood raw-paste command but doesn't support it. pass elif data == b"R\x01": # Device supports raw-paste mode, write out the command using this mode. return self.raw_paste_write(command_bytes) else: # Device doesn't support raw-paste, fall back to normal raw REPL. data = self.read_until(1, b"w REPL; CTRL-B to exit\r\n>") if not data.endswith(b"w REPL; CTRL-B to exit\r\n>"): print(data) raise TransportError("could not enter raw repl") # Don't try to use raw-paste mode again for this connection. self.use_raw_paste = False # Write command using standard raw REPL, 256 bytes every 10ms. for i in range(0, len(command_bytes), 256): self.serial.write(command_bytes[i : min(i + 256, len(command_bytes))]) time.sleep(0.01) self.serial.write(b"\x04") # check if we could exec command data = self.serial.read(2) if data != b"OK": raise TransportError("could not exec command (response: %r)" % data) def exec_raw(self, command, timeout=10, data_consumer=None): self.exec_raw_no_follow(command) return self.follow(timeout, data_consumer) def eval(self, expression, parse=False): if parse: ret = self.exec("print(repr({}))".format(expression)) ret = ret.strip() return ast.literal_eval(ret.decode()) else: ret = self.exec("print({})".format(expression)) ret = ret.strip() return ret def exec(self, command, data_consumer=None): ret, ret_err = self.exec_raw(command, data_consumer=data_consumer) if ret_err: raise TransportError("exception", ret, ret_err) return ret def execfile(self, filename): with open(filename, "rb") as f: pyfile = f.read() return self.exec(pyfile) def fs_exists(self, src): try: self.exec("import os\nos.stat(%s)" % (("'%s'" % src) if src else "")) return True except TransportError: return False def fs_ls(self, src): cmd = ( "import os\nfor f in os.ilistdir(%s):\n" " print('{:12} {}{}'.format(f[3]if len(f)>3 else 0,f[0],'/'if f[1]&0x4000 else ''))" % (("'%s'" % src) if src else "") ) self.exec(cmd, data_consumer=stdout_write_bytes) def fs_listdir(self, src=""): buf = bytearray() def repr_consumer(b): buf.extend(b.replace(b"\x04", b"")) cmd = "import os\nfor f in os.ilistdir(%s):\n" " print(repr(f), end=',')" % ( ("'%s'" % src) if src else "" ) try: buf.extend(b"[") self.exec(cmd, data_consumer=repr_consumer) buf.extend(b"]") except TransportError as e: reraise_filesystem_error(e, src) return [ listdir_result(*f) if len(f) == 4 else listdir_result(*(f + (0,))) for f in ast.literal_eval(buf.decode()) ] def fs_stat(self, src): try: self.exec("import os") return os.stat_result(self.eval("os.stat(%s)" % ("'%s'" % src), parse=True)) except TransportError as e: reraise_filesystem_error(e, src) def fs_cat(self, src, chunk_size=256): cmd = ( "with open('%s') as f:\n while 1:\n" " b=f.read(%u)\n if not b:break\n print(b,end='')" % (src, chunk_size) ) self.exec(cmd, data_consumer=stdout_write_bytes) def fs_readfile(self, src, chunk_size=256): buf = bytearray() def repr_consumer(b): buf.extend(b.replace(b"\x04", b"")) cmd = ( "with open('%s', 'rb') as f:\n while 1:\n" " b=f.read(%u)\n if not b:break\n print(b,end='')" % (src, chunk_size) ) try: self.exec(cmd, data_consumer=repr_consumer) except TransportError as e: reraise_filesystem_error(e, src) return ast.literal_eval(buf.decode()) def fs_writefile(self, dest, data, chunk_size=256): self.exec("f=open('%s','wb')\nw=f.write" % dest) while data: chunk = data[:chunk_size] self.exec("w(" + repr(chunk) + ")") data = data[len(chunk) :] self.exec("f.close()") def fs_cp(self, src, dest, chunk_size=256, progress_callback=None): if progress_callback: src_size = self.fs_stat(src).st_size written = 0 self.exec("fr=open('%s','rb')\nr=fr.read\nfw=open('%s','wb')\nw=fw.write" % (src, dest)) while True: data_len = int(self.exec("d=r(%u)\nw(d)\nprint(len(d))" % chunk_size)) if not data_len: break if progress_callback: written += data_len progress_callback(written, src_size) self.exec("fr.close()\nfw.close()") def fs_get(self, src, dest, chunk_size=256, progress_callback=None): if progress_callback: src_size = self.fs_stat(src).st_size written = 0 self.exec("f=open('%s','rb')\nr=f.read" % src) with open(dest, "wb") as f: while True: data = bytearray() self.exec("print(r(%u))" % chunk_size, data_consumer=lambda d: data.extend(d)) assert data.endswith(b"\r\n\x04") try: data = ast.literal_eval(str(data[:-3], "ascii")) if not isinstance(data, bytes): raise ValueError("Not bytes") except (UnicodeError, ValueError) as e: raise TransportError("fs_get: Could not interpret received data: %s" % str(e)) if not data: break f.write(data) if progress_callback: written += len(data) progress_callback(written, src_size) self.exec("f.close()") def fs_put(self, src, dest, chunk_size=256, progress_callback=None): if progress_callback: src_size = os.path.getsize(src) written = 0 self.exec("f=open('%s','wb')\nw=f.write" % dest) with open(src, "rb") as f: while True: data = f.read(chunk_size) if not data: break if sys.version_info < (3,): self.exec("w(b" + repr(data) + ")") else: self.exec("w(" + repr(data) + ")") if progress_callback: written += len(data) progress_callback(written, src_size) self.exec("f.close()") def fs_mkdir(self, dir): self.exec("import os\nos.mkdir('%s')" % dir) def fs_rmdir(self, dir): self.exec("import os\nos.rmdir('%s')" % dir) def fs_rm(self, src): self.exec("import os\nos.remove('%s')" % src) def fs_touch(self, src): self.exec("f=open('%s','a')\nf.close()" % src) def filesystem_command(self, args, progress_callback=None, verbose=False): def fname_remote(src): if src.startswith(":"): src = src[1:] # Convert all path separators to "/", because that's what a remote device uses. return src.replace(os.path.sep, "/") def fname_cp_dest(src, dest): _, src = os.path.split(src) if dest is None or dest == "": dest = src elif dest == ".": dest = "./" + src elif dest.endswith("/"): dest += src return dest cmd = args[0] args = args[1:] try: if cmd == "cp": srcs = args[:-1] dest = args[-1] if dest.startswith(":"): op_remote_src = self.fs_cp op_local_src = self.fs_put else: op_remote_src = self.fs_get op_local_src = lambda src, dest, **_: __import__("shutil").copy(src, dest) for src in srcs: if verbose: print("cp %s %s" % (src, dest)) if src.startswith(":"): op = op_remote_src else: op = op_local_src src2 = fname_remote(src) dest2 = fname_cp_dest(src2, fname_remote(dest)) op(src2, dest2, progress_callback=progress_callback) else: ops = { "cat": self.fs_cat, "ls": self.fs_ls, "mkdir": self.fs_mkdir, "rm": self.fs_rm, "rmdir": self.fs_rmdir, "touch": self.fs_touch, } if cmd not in ops: raise TransportError("'{}' is not a filesystem command".format(cmd)) if cmd == "ls" and not args: args = [""] for src in args: src = fname_remote(src) if verbose: print("%s :%s" % (cmd, src)) ops[cmd](src) except TransportError as er: if len(er.args) > 1: print(str(er.args[2], "ascii")) else: print(er) self.exit_raw_repl() self.close() sys.exit(1) def mount_local(self, path, unsafe_links=False): fout = self.serial if self.eval('"RemoteFS" in globals()') == b"False": self.exec(fs_hook_code) self.exec("__mount()") self.mounted = True self.cmd = PyboardCommand(self.serial, fout, path, unsafe_links=unsafe_links) self.serial = SerialIntercept(self.serial, self.cmd) def write_ctrl_d(self, out_callback): self.serial.write(b"\x04") if not self.mounted: return # Read response from the device until it is quiet (with a timeout). INITIAL_TIMEOUT = 0.5 BANNER_TIMEOUT = 2 QUIET_TIMEOUT = 0.1 FULL_TIMEOUT = 5 t_start = t_last_activity = time.monotonic() data_all = b"" soft_reboot_started = False soft_reboot_banner = False while True: t = time.monotonic() n = self.serial.inWaiting() if n > 0: data = self.serial.read(n) out_callback(data) data_all += data t_last_activity = t else: if len(data_all) == 0: if t - t_start > INITIAL_TIMEOUT: return else: if t - t_start > FULL_TIMEOUT: if soft_reboot_started: break return next_data_timeout = QUIET_TIMEOUT if not soft_reboot_started and data_all.find(b"MPY: soft reboot") != -1: soft_reboot_started = True if soft_reboot_started and not soft_reboot_banner: # Once soft reboot has been initiated, give some more time for the startup # banner to be shown if data_all.find(b"\nMicroPython ") != -1: soft_reboot_banner = True elif data_all.find(b"\nraw REPL; CTRL-B to exit\r\n") != -1: soft_reboot_banner = True else: next_data_timeout = BANNER_TIMEOUT if t - t_last_activity > next_data_timeout: break if not soft_reboot_started: return if not soft_reboot_banner: out_callback(b"Warning: Could not remount local filesystem\r\n") return # Determine type of prompt if data_all.endswith(b">"): in_friendly_repl = False prompt = b">" else: in_friendly_repl = True prompt = data_all.rsplit(b"\r\n", 1)[-1] # Clear state while board remounts, it will be re-set once mounted. self.mounted = False self.serial = self.serial.orig_serial # Provide a message about the remount. out_callback(bytes(f"\r\nRemount local directory {self.cmd.root} at /remote\r\n", "utf8")) # Enter raw REPL and re-mount the remote filesystem. self.serial.write(b"\x01") self.exec(fs_hook_code) self.exec("__mount()") self.mounted = True # Exit raw REPL if needed, and wait for the friendly REPL prompt. if in_friendly_repl: self.exit_raw_repl() self.read_until(len(prompt), prompt) out_callback(prompt) self.serial = SerialIntercept(self.serial, self.cmd) def umount_local(self): if self.mounted: self.exec('os.umount("/remote")') self.mounted = False self.serial = self.serial.orig_serial fs_hook_cmds = { "CMD_STAT": 1, "CMD_ILISTDIR_START": 2, "CMD_ILISTDIR_NEXT": 3, "CMD_OPEN": 4, "CMD_CLOSE": 5, "CMD_READ": 6, "CMD_WRITE": 7, "CMD_SEEK": 8, "CMD_REMOVE": 9, "CMD_RENAME": 10, "CMD_MKDIR": 11, "CMD_RMDIR": 12, } fs_hook_code = """\ import os, io, struct, micropython SEEK_SET = 0 class RemoteCommand: def __init__(self): import select, sys self.buf4 = bytearray(4) self.fout = sys.stdout.buffer self.fin = sys.stdin.buffer self.poller = select.poll() self.poller.register(self.fin, select.POLLIN) def poll_in(self): for _ in self.poller.ipoll(1000): return self.end() raise Exception('timeout waiting for remote') def rd(self, n): buf = bytearray(n) self.rd_into(buf, n) return buf def rd_into(self, buf, n): # implement reading with a timeout in case other side disappears if n == 0: return self.poll_in() r = self.fin.readinto(buf, n) if r < n: mv = memoryview(buf) while r < n: self.poll_in() r += self.fin.readinto(mv[r:], n - r) def begin(self, type): micropython.kbd_intr(-1) buf4 = self.buf4 buf4[0] = 0x18 buf4[1] = type self.fout.write(buf4, 2) # Wait for sync byte 0x18, but don't get stuck forever for i in range(30): self.poller.poll(1000) self.fin.readinto(buf4, 1) if buf4[0] == 0x18: break def end(self): micropython.kbd_intr(3) def rd_s8(self): self.rd_into(self.buf4, 1) n = self.buf4[0] if n & 0x80: n -= 0x100 return n def rd_s32(self): buf4 = self.buf4 self.rd_into(buf4, 4) n = buf4[0] | buf4[1] << 8 | buf4[2] << 16 | buf4[3] << 24 if buf4[3] & 0x80: n -= 0x100000000 return n def rd_u32(self): buf4 = self.buf4 self.rd_into(buf4, 4) return buf4[0] | buf4[1] << 8 | buf4[2] << 16 | buf4[3] << 24 def rd_bytes(self, buf): # TODO if n is large (eg >256) then we may miss bytes on stdin n = self.rd_s32() if buf is None: ret = buf = bytearray(n) else: ret = n self.rd_into(buf, n) return ret def rd_str(self): n = self.rd_s32() if n == 0: return '' else: return str(self.rd(n), 'utf8') def wr_s8(self, i): self.buf4[0] = i self.fout.write(self.buf4, 1) def wr_s32(self, i): struct.pack_into(' {len(buf)}") def do_seek(self): fd = self.rd_s8() n = self.rd_s32() whence = self.rd_s8() # self.log_cmd(f"seek {fd} {n}") try: n = self.data_files[fd][0].seek(n, whence) except io.UnsupportedOperation: n = -1 self.wr_s32(n) def do_write(self): fd = self.rd_s8() buf = self.rd_bytes() if self.data_files[fd][1]: buf = str(buf, "utf8") n = self.data_files[fd][0].write(buf) self.wr_s32(n) # self.log_cmd(f"write {fd} {len(buf)} -> {n}") def do_remove(self): path = self.root + self.rd_str() # self.log_cmd(f"remove {path}") try: self.path_check(path) os.remove(path) ret = 0 except OSError as er: ret = -abs(er.errno) self.wr_s32(ret) def do_rename(self): old = self.root + self.rd_str() new = self.root + self.rd_str() # self.log_cmd(f"rename {old} {new}") try: self.path_check(old) self.path_check(new) os.rename(old, new) ret = 0 except OSError as er: ret = -abs(er.errno) self.wr_s32(ret) def do_mkdir(self): path = self.root + self.rd_str() # self.log_cmd(f"mkdir {path}") try: self.path_check(path) os.mkdir(path) ret = 0 except OSError as er: ret = -abs(er.errno) self.wr_s32(ret) def do_rmdir(self): path = self.root + self.rd_str() # self.log_cmd(f"rmdir {path}") try: self.path_check(path) os.rmdir(path) ret = 0 except OSError as er: ret = -abs(er.errno) self.wr_s32(ret) cmd_table = { fs_hook_cmds["CMD_STAT"]: do_stat, fs_hook_cmds["CMD_ILISTDIR_START"]: do_ilistdir_start, fs_hook_cmds["CMD_ILISTDIR_NEXT"]: do_ilistdir_next, fs_hook_cmds["CMD_OPEN"]: do_open, fs_hook_cmds["CMD_CLOSE"]: do_close, fs_hook_cmds["CMD_READ"]: do_read, fs_hook_cmds["CMD_WRITE"]: do_write, fs_hook_cmds["CMD_SEEK"]: do_seek, fs_hook_cmds["CMD_REMOVE"]: do_remove, fs_hook_cmds["CMD_RENAME"]: do_rename, fs_hook_cmds["CMD_MKDIR"]: do_mkdir, fs_hook_cmds["CMD_RMDIR"]: do_rmdir, } class SerialIntercept: def __init__(self, serial, cmd): self.orig_serial = serial self.cmd = cmd self.buf = b"" self.orig_serial.timeout = 5.0 def _check_input(self, blocking): if blocking or self.orig_serial.inWaiting() > 0: c = self.orig_serial.read(1) if c == b"\x18": # a special command c = self.orig_serial.read(1)[0] self.orig_serial.write(b"\x18") # Acknowledge command PyboardCommand.cmd_table[c](self.cmd) elif not VT_ENABLED and c == b"\x1b": # ESC code, ignore these on windows esctype = self.orig_serial.read(1) if esctype == b"[": # CSI while not (0x40 < self.orig_serial.read(1)[0] < 0x7E): # Looking for "final byte" of escape sequence pass else: self.buf += c @property def fd(self): return self.orig_serial.fd def close(self): self.orig_serial.close() def inWaiting(self): self._check_input(False) return len(self.buf) def read(self, n): while len(self.buf) < n: self._check_input(True) out = self.buf[:n] self.buf = self.buf[n:] return out def write(self, buf): self.orig_serial.write(buf)