diff --git a/tools/mpremote/mpremote/commands.py b/tools/mpremote/mpremote/commands.py index 805fcd4fc0..9e05aca376 100644 --- a/tools/mpremote/mpremote/commands.py +++ b/tools/mpremote/mpremote/commands.py @@ -4,7 +4,8 @@ import tempfile import serial.tools.list_ports -from . import pyboardextended as pyboard +from .transport import TransportError +from .transport_serial import SerialTransport class CommandError(Exception): @@ -36,28 +37,28 @@ def do_connect(state, args=None): for p in sorted(serial.tools.list_ports.comports()): if p.vid is not None and p.pid is not None: try: - state.pyb = pyboard.PyboardExtended(p.device, baudrate=115200) + state.transport = SerialTransport(p.device, baudrate=115200) return - except pyboard.PyboardError as er: + except TransportError as er: if not er.args[0].startswith("failed to access"): raise er - raise pyboard.PyboardError("no device found") + raise TransportError("no device found") elif dev.startswith("id:"): # Search for a device with the given serial number. serial_number = dev[len("id:") :] dev = None for p in serial.tools.list_ports.comports(): if p.serial_number == serial_number: - state.pyb = pyboard.PyboardExtended(p.device, baudrate=115200) + state.transport = SerialTransport(p.device, baudrate=115200) return - raise pyboard.PyboardError("no device with serial number {}".format(serial_number)) + raise TransportError("no device with serial number {}".format(serial_number)) else: # Connect to the given device. if dev.startswith("port:"): dev = dev[len("port:") :] - state.pyb = pyboard.PyboardExtended(dev, baudrate=115200) + state.transport = SerialTransport(dev, baudrate=115200) return - except pyboard.PyboardError as er: + except TransportError as er: msg = er.args[0] if msg.startswith("failed to access"): msg += " (it may be in use by another program)" @@ -66,23 +67,23 @@ def do_connect(state, args=None): def do_disconnect(state, _args=None): - if not state.pyb: + if not state.transport: return try: - if state.pyb.mounted: - if not state.pyb.in_raw_repl: - state.pyb.enter_raw_repl(soft_reset=False) - state.pyb.umount_local() - if state.pyb.in_raw_repl: - state.pyb.exit_raw_repl() + if state.transport.mounted: + if not state.transport.in_raw_repl: + state.transport.enter_raw_repl(soft_reset=False) + state.transport.umount_local() + if state.transport.in_raw_repl: + state.transport.exit_raw_repl() except OSError: # Ignore any OSError exceptions when shutting down, eg: # - pyboard.filesystem_command will close the connection if it had an error # - umounting will fail if serial port disappeared pass - state.pyb.close() - state.pyb = None + state.transport.close() + state.transport = None state._auto_soft_reset = True @@ -136,16 +137,17 @@ def do_filesystem(state, args): raise CommandError("'cp -r' source files must be local") _list_recursive(src_files, path) known_dirs = {""} - state.pyb.exec_("import uos") + state.transport.exec_("import uos") for dir, file in src_files: dir_parts = dir.split("/") for i in range(len(dir_parts)): d = "/".join(dir_parts[: i + 1]) if d not in known_dirs: - state.pyb.exec_("try:\n uos.mkdir('%s')\nexcept OSError as e:\n print(e)" % d) + state.transport.exec_( + "try:\n uos.mkdir('%s')\nexcept OSError as e:\n print(e)" % d + ) known_dirs.add(d) - pyboard.filesystem_command( - state.pyb, + state.transport.filesystem_command( ["cp", "/".join((dir, file)), ":" + dir + "/"], progress_callback=show_progress_bar, verbose=verbose, @@ -154,8 +156,8 @@ def do_filesystem(state, args): if args.recursive: raise CommandError("'-r' only supported for 'cp'") try: - pyboard.filesystem_command( - state.pyb, [command] + paths, progress_callback=show_progress_bar, verbose=verbose + state.transport.filesystem_command( + [command] + paths, progress_callback=show_progress_bar, verbose=verbose ) except OSError as er: raise CommandError(er) @@ -166,17 +168,17 @@ def do_edit(state, args): state.did_action() if not os.getenv("EDITOR"): - raise pyboard.PyboardError("edit: $EDITOR not set") + raise TransportError("edit: $EDITOR not set") for src in args.files: src = src.lstrip(":") dest_fd, dest = tempfile.mkstemp(suffix=os.path.basename(src)) try: print("edit :%s" % (src,)) os.close(dest_fd) - state.pyb.fs_touch(src) - state.pyb.fs_get(src, dest, progress_callback=show_progress_bar) + state.transport.fs_touch(src) + state.transport.fs_get(src, dest, progress_callback=show_progress_bar) if os.system('%s "%s"' % (os.getenv("EDITOR"), dest)) == 0: - state.pyb.fs_put(dest, src, progress_callback=show_progress_bar) + state.transport.fs_put(dest, src, progress_callback=show_progress_bar) finally: os.unlink(dest) @@ -186,13 +188,15 @@ def _do_execbuffer(state, buf, follow): state.did_action() try: - state.pyb.exec_raw_no_follow(buf) + state.transport.exec_raw_no_follow(buf) if follow: - ret, ret_err = state.pyb.follow(timeout=None, data_consumer=pyboard.stdout_write_bytes) + ret, ret_err = state.transport.follow( + timeout=None, data_consumer=pyboard.stdout_write_bytes + ) if ret_err: pyboard.stdout_write_bytes(ret_err) sys.exit(1) - except pyboard.PyboardError as er: + except TransportError as er: print(er) sys.exit(1) except KeyboardInterrupt: @@ -221,13 +225,13 @@ def do_run(state, args): def do_mount(state, args): state.ensure_raw_repl() path = args.path[0] - state.pyb.mount_local(path, unsafe_links=args.unsafe_links) + state.transport.mount_local(path, unsafe_links=args.unsafe_links) print(f"Local directory {path} is mounted at /remote") def do_umount(state, path): state.ensure_raw_repl() - state.pyb.umount_local() + state.transport.umount_local() def do_resume(state, _args=None): diff --git a/tools/mpremote/mpremote/main.py b/tools/mpremote/mpremote/main.py index cb96369a53..6689266098 100644 --- a/tools/mpremote/mpremote/main.py +++ b/tools/mpremote/mpremote/main.py @@ -449,7 +449,7 @@ def do_command_expansion(args): class State: def __init__(self): - self.pyb = None + self.transport = None self._did_action = False self._auto_soft_reset = True @@ -460,20 +460,20 @@ class State: return not self._did_action def ensure_connected(self): - if self.pyb is None: + if self.transport is None: do_connect(self) def ensure_raw_repl(self, soft_reset=None): self.ensure_connected() soft_reset = self._auto_soft_reset if soft_reset is None else soft_reset - if soft_reset or not self.pyb.in_raw_repl: - self.pyb.enter_raw_repl(soft_reset=soft_reset) + if soft_reset or not self.transport.in_raw_repl: + self.transport.enter_raw_repl(soft_reset=soft_reset) self._auto_soft_reset = False def ensure_friendly_repl(self): self.ensure_connected() - if self.pyb.in_raw_repl: - self.pyb.exit_raw_repl() + if self.transport.in_raw_repl: + self.transport.exit_raw_repl() def main(): diff --git a/tools/mpremote/mpremote/mip.py b/tools/mpremote/mpremote/mip.py index 99ca9ff7e3..f42c7a0b42 100644 --- a/tools/mpremote/mpremote/mip.py +++ b/tools/mpremote/mpremote/mip.py @@ -16,7 +16,7 @@ _CHUNK_SIZE = 128 # This implements os.makedirs(os.dirname(path)) -def _ensure_path_exists(pyb, path): +def _ensure_path_exists(transport, path): import os split = path.split("/") @@ -29,8 +29,8 @@ def _ensure_path_exists(pyb, path): prefix = "" for i in range(len(split) - 1): prefix += split[i] - if not pyb.fs_exists(prefix): - pyb.fs_mkdir(prefix) + if not transport.fs_exists(prefix): + transport.fs_mkdir(prefix) prefix += "/" @@ -68,7 +68,7 @@ def _rewrite_url(url, branch=None): return url -def _download_file(pyb, url, dest): +def _download_file(transport, url, dest): try: with urllib.request.urlopen(url) as src: fd, path = tempfile.mkstemp() @@ -76,8 +76,8 @@ def _download_file(pyb, url, dest): print("Installing:", dest) with os.fdopen(fd, "wb") as f: _chunk(src, f.write, src.length) - _ensure_path_exists(pyb, dest) - pyb.fs_put(path, dest, progress_callback=show_progress_bar) + _ensure_path_exists(transport, dest) + transport.fs_put(path, dest, progress_callback=show_progress_bar) finally: os.unlink(path) except urllib.error.HTTPError as e: @@ -89,7 +89,7 @@ def _download_file(pyb, url, dest): raise CommandError(f"{e.reason} requesting {url}") -def _install_json(pyb, package_json_url, index, target, version, mpy): +def _install_json(transport, package_json_url, index, target, version, mpy): try: with urllib.request.urlopen(_rewrite_url(package_json_url, version)) as response: package_json = json.load(response) @@ -103,15 +103,15 @@ def _install_json(pyb, package_json_url, index, target, version, mpy): for target_path, short_hash in package_json.get("hashes", ()): fs_target_path = target + "/" + target_path file_url = f"{index}/file/{short_hash[:2]}/{short_hash}" - _download_file(pyb, file_url, fs_target_path) + _download_file(transport, file_url, fs_target_path) for target_path, url in package_json.get("urls", ()): fs_target_path = target + "/" + target_path - _download_file(pyb, _rewrite_url(url, version), fs_target_path) + _download_file(transport, _rewrite_url(url, version), fs_target_path) for dep, dep_version in package_json.get("deps", ()): - _install_package(pyb, dep, index, target, dep_version, mpy) + _install_package(transport, dep, index, target, dep_version, mpy) -def _install_package(pyb, package, index, target, version, mpy): +def _install_package(transport, package, index, target, version, mpy): if ( package.startswith("http://") or package.startswith("https://") @@ -120,7 +120,7 @@ def _install_package(pyb, package, index, target, version, mpy): if package.endswith(".py") or package.endswith(".mpy"): print(f"Downloading {package} to {target}") _download_file( - pyb, _rewrite_url(package, version), target + "/" + package.rsplit("/")[-1] + transport, _rewrite_url(package, version), target + "/" + package.rsplit("/")[-1] ) return else: @@ -136,14 +136,15 @@ def _install_package(pyb, package, index, target, version, mpy): mpy_version = "py" if mpy: - pyb.exec("import sys") + transport.exec("import sys") mpy_version = ( - int(pyb.eval("getattr(sys.implementation, '_mpy', 0) & 0xFF").decode()) or "py" + int(transport.eval("getattr(sys.implementation, '_mpy', 0) & 0xFF").decode()) + or "py" ) package = f"{index}/package/{mpy_version}/{package}/{version}.json" - _install_json(pyb, package, index, target, version, mpy) + _install_json(transport, package, index, target, version, mpy) def do_mip(state, args): @@ -163,9 +164,9 @@ def do_mip(state, args): args.index = _PACKAGE_INDEX if args.target is None: - state.pyb.exec("import sys") + state.transport.exec("import sys") lib_paths = ( - state.pyb.eval("'\\n'.join(p for p in sys.path if p.endswith('/lib'))") + state.transport.eval("'\\n'.join(p for p in sys.path if p.endswith('/lib'))") .decode() .split("\n") ) @@ -181,7 +182,12 @@ def do_mip(state, args): try: _install_package( - state.pyb, package, args.index.rstrip("/"), args.target, version, args.mpy + state.transport, + package, + args.index.rstrip("/"), + args.target, + version, + args.mpy, ) except CommandError: print("Package may be partially installed") diff --git a/tools/mpremote/mpremote/repl.py b/tools/mpremote/mpremote/repl.py index 239e267d75..d24a7774ac 100644 --- a/tools/mpremote/mpremote/repl.py +++ b/tools/mpremote/mpremote/repl.py @@ -1,45 +1,45 @@ from .console import Console, ConsolePosix -from . import pyboardextended as pyboard +from .transport import TransportError def do_repl_main_loop( state, console_in, console_out_write, *, escape_non_printable, code_to_inject, file_to_inject ): while True: - console_in.waitchar(state.pyb.serial) + console_in.waitchar(state.transport.serial) c = console_in.readchar() if c: if c in (b"\x1d", b"\x18"): # ctrl-] or ctrl-x, quit break elif c == b"\x04": # ctrl-D # special handling needed for ctrl-D if filesystem is mounted - state.pyb.write_ctrl_d(console_out_write) + state.transport.write_ctrl_d(console_out_write) elif c == b"\x0a" and code_to_inject is not None: # ctrl-j, inject code - state.pyb.serial.write(code_to_inject) + state.transport.serial.write(code_to_inject) elif c == b"\x0b" and file_to_inject is not None: # ctrl-k, inject script console_out_write(bytes("Injecting %s\r\n" % file_to_inject, "utf8")) - state.pyb.enter_raw_repl(soft_reset=False) + state.transport.enter_raw_repl(soft_reset=False) with open(file_to_inject, "rb") as f: pyfile = f.read() try: - state.pyb.exec_raw_no_follow(pyfile) - except pyboard.PyboardError as er: + state.transport.exec_raw_no_follow(pyfile) + except TransportError as er: console_out_write(b"Error:\r\n") console_out_write(er) - state.pyb.exit_raw_repl() + state.transport.exit_raw_repl() else: - state.pyb.serial.write(c) + state.transport.serial.write(c) try: - n = state.pyb.serial.inWaiting() + n = state.transport.serial.inWaiting() except OSError as er: if er.args[0] == 5: # IO error, device disappeared print("device disconnected") break if n > 0: - dev_data_in = state.pyb.serial.read(n) + dev_data_in = state.transport.serial.read(n) if dev_data_in is not None: if escape_non_printable: # Pass data through to the console, with escaping of non-printables. @@ -63,7 +63,7 @@ def do_repl(state, args): code_to_inject = args.inject_code file_to_inject = args.inject_file - print("Connected to MicroPython at %s" % state.pyb.device_name) + print("Connected to MicroPython at %s" % state.transport.device_name) print("Use Ctrl-] or Ctrl-x to exit this shell") if escape_non_printable: print("Escaping non-printable bytes/characters by printing their hex code") diff --git a/tools/mpremote/mpremote/transport.py b/tools/mpremote/mpremote/transport.py new file mode 100644 index 0000000000..6e9a77b2bb --- /dev/null +++ b/tools/mpremote/mpremote/transport.py @@ -0,0 +1,33 @@ +#!/usr/bin/env python3 +# +# This file is part of the MicroPython project, http://micropython.org/ +# +# The MIT License (MIT) +# +# Copyright (c) 2023 Jim Mussared +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + + +class TransportError(Exception): + pass + + +class Transport: + pass diff --git a/tools/mpremote/mpremote/pyboardextended.py b/tools/mpremote/mpremote/transport_serial.py similarity index 50% rename from tools/mpremote/mpremote/pyboardextended.py rename to tools/mpremote/mpremote/transport_serial.py index b9dbd1e3b1..84822fe69c 100644 --- a/tools/mpremote/mpremote/pyboardextended.py +++ b/tools/mpremote/mpremote/transport_serial.py @@ -1,14 +1,604 @@ -import io, os, re, struct, time +#!/usr/bin/env python3 +# +# This file is part of the MicroPython project, http://micropython.org/ +# +# The MIT License (MIT) +# +# Copyright (c) 2014-2021 Damien P. George +# Copyright (c) 2017 Paul Sokolovsky +# Copyright (c) 2023 Jim Mussared +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +# This is based on the serial-only parts of tools/pyboard.py, with Python 2 +# support removed, and is currently in the process of being refactored to +# support multiple transports (webrepl, socket, BLE, etc). At the moment, +# SerialTransport is just the old Pyboard+PyboardExtended class without any +# of this refactoring. The API is going to change significantly. + +# Once the API is stabilised, the idea is that mpremote can be used both +# as a command line tool and a library for interacting with devices. + +import ast, io, errno, os, re, struct, sys, time +from collections import namedtuple from errno import EPERM from .console import VT_ENABLED +from .transport import TransportError, Transport -try: - from .pyboard import Pyboard, PyboardError, stdout_write_bytes, filesystem_command -except ImportError: - import sys - sys.path.append(os.path.dirname(__file__) + "/../..") - from pyboard import Pyboard, PyboardError, stdout_write_bytes, filesystem_command +def stdout_write_bytes(b): + b = b.replace(b"\x04", b"") + sys.stdout.buffer.write(b) + sys.stdout.buffer.flush() + + +listdir_result = namedtuple("dir_result", ["name", "st_mode", "st_ino", "st_size"]) + + +def reraise_filesystem_error(e, info): + if len(e.args) >= 3: + if b"OSError" in e.args[2] and b"ENOENT" in e.args[2]: + raise FileNotFoundError(info) + raise + + +class SerialTransport(Transport): + def __init__(self, device, baudrate=115200, wait=0, exclusive=True): + self.in_raw_repl = False + self.use_raw_paste = True + self.device_name = device + self.mounted = False + + import serial + import serial.tools.list_ports + + # Set options, and exclusive if pyserial supports it + serial_kwargs = {"baudrate": baudrate, "interCharTimeout": 1} + if serial.__version__ >= "3.3": + serial_kwargs["exclusive"] = exclusive + + delayed = False + for attempt in range(wait + 1): + try: + if os.name == "nt": + self.serial = serial.Serial(**serial_kwargs) + self.serial.port = device + portinfo = list(serial.tools.list_ports.grep(device)) # type: ignore + if portinfo and portinfo[0].manufacturer != "Microsoft": + # ESP8266/ESP32 boards use RTS/CTS for flashing and boot mode selection. + # DTR False: to avoid using the reset button will hang the MCU in bootloader mode + # RTS False: to prevent pulses on rts on serial.close() that would POWERON_RESET an ESPxx + self.serial.dtr = False # DTR False = gpio0 High = Normal boot + self.serial.rts = False # RTS False = EN High = MCU enabled + self.serial.open() + else: + self.serial = serial.Serial(device, **serial_kwargs) + break + except OSError: + if wait == 0: + continue + if attempt == 0: + sys.stdout.write("Waiting {} seconds for pyboard ".format(wait)) + delayed = True + time.sleep(1) + sys.stdout.write(".") + sys.stdout.flush() + else: + if delayed: + print("") + raise TransportError("failed to access " + device) + if delayed: + print("") + + def close(self): + self.serial.close() + + def read_until(self, min_num_bytes, ending, timeout=10, data_consumer=None): + # if data_consumer is used then data is not accumulated and the ending must be 1 byte long + assert data_consumer is None or len(ending) == 1 + + data = self.serial.read(min_num_bytes) + if data_consumer: + data_consumer(data) + timeout_count = 0 + while True: + if data.endswith(ending): + break + elif self.serial.inWaiting() > 0: + new_data = self.serial.read(1) + if data_consumer: + data_consumer(new_data) + data = new_data + else: + data = data + new_data + timeout_count = 0 + else: + timeout_count += 1 + if timeout is not None and timeout_count >= 100 * timeout: + break + time.sleep(0.01) + return data + + def enter_raw_repl(self, soft_reset=True): + self.serial.write(b"\r\x03\x03") # ctrl-C twice: interrupt any running program + + # flush input (without relying on serial.flushInput()) + n = self.serial.inWaiting() + while n > 0: + self.serial.read(n) + n = self.serial.inWaiting() + + self.serial.write(b"\r\x01") # ctrl-A: enter raw REPL + + if soft_reset: + data = self.read_until(1, b"raw REPL; CTRL-B to exit\r\n>") + if not data.endswith(b"raw REPL; CTRL-B to exit\r\n>"): + print(data) + raise TransportError("could not enter raw repl") + + self.serial.write(b"\x04") # ctrl-D: soft reset + + # Waiting for "soft reboot" independently to "raw REPL" (done below) + # allows boot.py to print, which will show up after "soft reboot" + # and before "raw REPL". + data = self.read_until(1, b"soft reboot\r\n") + if not data.endswith(b"soft reboot\r\n"): + print(data) + raise TransportError("could not enter raw repl") + + data = self.read_until(1, b"raw REPL; CTRL-B to exit\r\n") + if not data.endswith(b"raw REPL; CTRL-B to exit\r\n"): + print(data) + raise TransportError("could not enter raw repl") + + self.in_raw_repl = True + + def exit_raw_repl(self): + self.serial.write(b"\r\x02") # ctrl-B: enter friendly REPL + self.in_raw_repl = False + + def follow(self, timeout, data_consumer=None): + # wait for normal output + data = self.read_until(1, b"\x04", timeout=timeout, data_consumer=data_consumer) + if not data.endswith(b"\x04"): + raise TransportError("timeout waiting for first EOF reception") + data = data[:-1] + + # wait for error output + data_err = self.read_until(1, b"\x04", timeout=timeout) + if not data_err.endswith(b"\x04"): + raise TransportError("timeout waiting for second EOF reception") + data_err = data_err[:-1] + + # return normal and error output + return data, data_err + + def raw_paste_write(self, command_bytes): + # Read initial header, with window size. + data = self.serial.read(2) + window_size = struct.unpack("") + if not data.endswith(b">"): + raise TransportError("could not enter raw repl") + + if self.use_raw_paste: + # Try to enter raw-paste mode. + self.serial.write(b"\x05A\x01") + data = self.serial.read(2) + if data == b"R\x00": + # Device understood raw-paste command but doesn't support it. + pass + elif data == b"R\x01": + # Device supports raw-paste mode, write out the command using this mode. + return self.raw_paste_write(command_bytes) + else: + # Device doesn't support raw-paste, fall back to normal raw REPL. + data = self.read_until(1, b"w REPL; CTRL-B to exit\r\n>") + if not data.endswith(b"w REPL; CTRL-B to exit\r\n>"): + print(data) + raise TransportError("could not enter raw repl") + # Don't try to use raw-paste mode again for this connection. + self.use_raw_paste = False + + # Write command using standard raw REPL, 256 bytes every 10ms. + for i in range(0, len(command_bytes), 256): + self.serial.write(command_bytes[i : min(i + 256, len(command_bytes))]) + time.sleep(0.01) + self.serial.write(b"\x04") + + # check if we could exec command + data = self.serial.read(2) + if data != b"OK": + raise TransportError("could not exec command (response: %r)" % data) + + def exec_raw(self, command, timeout=10, data_consumer=None): + self.exec_raw_no_follow(command) + return self.follow(timeout, data_consumer) + + def eval(self, expression, parse=False): + if parse: + ret = self.exec("print(repr({}))".format(expression)) + ret = ret.strip() + return ast.literal_eval(ret.decode()) + else: + ret = self.exec("print({})".format(expression)) + ret = ret.strip() + return ret + + def exec(self, command, data_consumer=None): + ret, ret_err = self.exec_raw(command, data_consumer=data_consumer) + if ret_err: + raise TransportError("exception", ret, ret_err) + return ret + + def execfile(self, filename): + with open(filename, "rb") as f: + pyfile = f.read() + return self.exec(pyfile) + + def fs_exists(self, src): + try: + self.exec("import uos\nuos.stat(%s)" % (("'%s'" % src) if src else "")) + return True + except TransportError: + return False + + def fs_ls(self, src): + cmd = ( + "import uos\nfor f in uos.ilistdir(%s):\n" + " print('{:12} {}{}'.format(f[3]if len(f)>3 else 0,f[0],'/'if f[1]&0x4000 else ''))" + % (("'%s'" % src) if src else "") + ) + self.exec(cmd, data_consumer=stdout_write_bytes) + + def fs_listdir(self, src=""): + buf = bytearray() + + def repr_consumer(b): + buf.extend(b.replace(b"\x04", b"")) + + cmd = "import uos\nfor f in uos.ilistdir(%s):\n" " print(repr(f), end=',')" % ( + ("'%s'" % src) if src else "" + ) + try: + buf.extend(b"[") + self.exec(cmd, data_consumer=repr_consumer) + buf.extend(b"]") + except TransportError as e: + reraise_filesystem_error(e, src) + + return [ + listdir_result(*f) if len(f) == 4 else listdir_result(*(f + (0,))) + for f in ast.literal_eval(buf.decode()) + ] + + def fs_stat(self, src): + try: + self.exec("import uos") + return os.stat_result(self.eval("uos.stat(%s)" % (("'%s'" % src)), parse=True)) + except TransportError as e: + reraise_filesystem_error(e, src) + + def fs_cat(self, src, chunk_size=256): + cmd = ( + "with open('%s') as f:\n while 1:\n" + " b=f.read(%u)\n if not b:break\n print(b,end='')" % (src, chunk_size) + ) + self.exec(cmd, data_consumer=stdout_write_bytes) + + def fs_readfile(self, src, chunk_size=256): + buf = bytearray() + + def repr_consumer(b): + buf.extend(b.replace(b"\x04", b"")) + + cmd = ( + "with open('%s', 'rb') as f:\n while 1:\n" + " b=f.read(%u)\n if not b:break\n print(b,end='')" % (src, chunk_size) + ) + try: + self.exec(cmd, data_consumer=repr_consumer) + except TransportError as e: + reraise_filesystem_error(e, src) + return ast.literal_eval(buf.decode()) + + def fs_writefile(self, dest, data, chunk_size=256): + self.exec("f=open('%s','wb')\nw=f.write" % dest) + while data: + chunk = data[:chunk_size] + self.exec("w(" + repr(chunk) + ")") + data = data[len(chunk) :] + self.exec("f.close()") + + def fs_cp(self, src, dest, chunk_size=256, progress_callback=None): + if progress_callback: + src_size = self.fs_stat(src).st_size + written = 0 + self.exec("fr=open('%s','rb')\nr=fr.read\nfw=open('%s','wb')\nw=fw.write" % (src, dest)) + while True: + data_len = int(self.exec("d=r(%u)\nw(d)\nprint(len(d))" % chunk_size)) + if not data_len: + break + if progress_callback: + written += data_len + progress_callback(written, src_size) + self.exec("fr.close()\nfw.close()") + + def fs_get(self, src, dest, chunk_size=256, progress_callback=None): + if progress_callback: + src_size = self.fs_stat(src).st_size + written = 0 + self.exec("f=open('%s','rb')\nr=f.read" % src) + with open(dest, "wb") as f: + while True: + data = bytearray() + self.exec("print(r(%u))" % chunk_size, data_consumer=lambda d: data.extend(d)) + assert data.endswith(b"\r\n\x04") + try: + data = ast.literal_eval(str(data[:-3], "ascii")) + if not isinstance(data, bytes): + raise ValueError("Not bytes") + except (UnicodeError, ValueError) as e: + raise TransportError("fs_get: Could not interpret received data: %s" % str(e)) + if not data: + break + f.write(data) + if progress_callback: + written += len(data) + progress_callback(written, src_size) + self.exec("f.close()") + + def fs_put(self, src, dest, chunk_size=256, progress_callback=None): + if progress_callback: + src_size = os.path.getsize(src) + written = 0 + self.exec("f=open('%s','wb')\nw=f.write" % dest) + with open(src, "rb") as f: + while True: + data = f.read(chunk_size) + if not data: + break + if sys.version_info < (3,): + self.exec("w(b" + repr(data) + ")") + else: + self.exec("w(" + repr(data) + ")") + if progress_callback: + written += len(data) + progress_callback(written, src_size) + self.exec("f.close()") + + def fs_mkdir(self, dir): + self.exec("import uos\nuos.mkdir('%s')" % dir) + + def fs_rmdir(self, dir): + self.exec("import uos\nuos.rmdir('%s')" % dir) + + def fs_rm(self, src): + self.exec("import uos\nuos.remove('%s')" % src) + + def fs_touch(self, src): + self.exec("f=open('%s','a')\nf.close()" % src) + + def filesystem_command(self, args, progress_callback=None, verbose=False): + def fname_remote(src): + if src.startswith(":"): + src = src[1:] + # Convert all path separators to "/", because that's what a remote device uses. + return src.replace(os.path.sep, "/") + + def fname_cp_dest(src, dest): + _, src = os.path.split(src) + if dest is None or dest == "": + dest = src + elif dest == ".": + dest = "./" + src + elif dest.endswith("/"): + dest += src + return dest + + cmd = args[0] + args = args[1:] + try: + if cmd == "cp": + srcs = args[:-1] + dest = args[-1] + if dest.startswith(":"): + op_remote_src = self.fs_cp + op_local_src = self.fs_put + else: + op_remote_src = self.fs_get + op_local_src = lambda src, dest, **_: __import__("shutil").copy(src, dest) + for src in srcs: + if verbose: + print("cp %s %s" % (src, dest)) + if src.startswith(":"): + op = op_remote_src + else: + op = op_local_src + src2 = fname_remote(src) + dest2 = fname_cp_dest(src2, fname_remote(dest)) + op(src2, dest2, progress_callback=progress_callback) + else: + ops = { + "cat": self.fs_cat, + "ls": self.fs_ls, + "mkdir": self.fs_mkdir, + "rm": self.fs_rm, + "rmdir": self.fs_rmdir, + "touch": self.fs_touch, + } + if cmd not in ops: + raise TransportError("'{}' is not a filesystem command".format(cmd)) + if cmd == "ls" and not args: + args = [""] + for src in args: + src = fname_remote(src) + if verbose: + print("%s :%s" % (cmd, src)) + ops[cmd](src) + except TransportError as er: + if len(er.args) > 1: + print(str(er.args[2], "ascii")) + else: + print(er) + self.exit_raw_repl() + self.close() + sys.exit(1) + + def mount_local(self, path, unsafe_links=False): + fout = self.serial + if self.eval('"RemoteFS" in globals()') == b"False": + self.exec(fs_hook_code) + self.exec("__mount()") + self.mounted = True + self.cmd = PyboardCommand(self.serial, fout, path, unsafe_links=unsafe_links) + self.serial = SerialIntercept(self.serial, self.cmd) + + def write_ctrl_d(self, out_callback): + self.serial.write(b"\x04") + if not self.mounted: + return + + # Read response from the device until it is quiet (with a timeout). + INITIAL_TIMEOUT = 0.5 + BANNER_TIMEOUT = 2 + QUIET_TIMEOUT = 0.1 + FULL_TIMEOUT = 5 + t_start = t_last_activity = time.monotonic() + data_all = b"" + soft_reboot_started = False + soft_reboot_banner = False + while True: + t = time.monotonic() + n = self.serial.inWaiting() + if n > 0: + data = self.serial.read(n) + out_callback(data) + data_all += data + t_last_activity = t + else: + if len(data_all) == 0: + if t - t_start > INITIAL_TIMEOUT: + return + else: + if t - t_start > FULL_TIMEOUT: + if soft_reboot_started: + break + return + + next_data_timeout = QUIET_TIMEOUT + + if not soft_reboot_started and data_all.find(b"MPY: soft reboot") != -1: + soft_reboot_started = True + + if soft_reboot_started and not soft_reboot_banner: + # Once soft reboot has been initiated, give some more time for the startup + # banner to be shown + if data_all.find(b"\nMicroPython ") != -1: + soft_reboot_banner = True + elif data_all.find(b"\nraw REPL; CTRL-B to exit\r\n") != -1: + soft_reboot_banner = True + else: + next_data_timeout = BANNER_TIMEOUT + + if t - t_last_activity > next_data_timeout: + break + + if not soft_reboot_started: + return + + if not soft_reboot_banner: + out_callback(b"Warning: Could not remount local filesystem\r\n") + return + + # Determine type of prompt + if data_all.endswith(b">"): + in_friendly_repl = False + prompt = b">" + else: + in_friendly_repl = True + prompt = data_all.rsplit(b"\r\n", 1)[-1] + + # Clear state while board remounts, it will be re-set once mounted. + self.mounted = False + self.serial = self.serial.orig_serial + + # Provide a message about the remount. + out_callback(bytes(f"\r\nRemount local directory {self.cmd.root} at /remote\r\n", "utf8")) + + # Enter raw REPL and re-mount the remote filesystem. + self.serial.write(b"\x01") + self.exec(fs_hook_code) + self.exec("__mount()") + self.mounted = True + + # Exit raw REPL if needed, and wait for the friendly REPL prompt. + if in_friendly_repl: + self.exit_raw_repl() + self.read_until(len(prompt), prompt) + out_callback(prompt) + self.serial = SerialIntercept(self.serial, self.cmd) + + def umount_local(self): + if self.mounted: + self.exec('uos.umount("/remote")') + self.mounted = False + self.serial = self.serial.orig_serial + fs_hook_cmds = { "CMD_STAT": 1, @@ -617,110 +1207,3 @@ class SerialIntercept: def write(self, buf): self.orig_serial.write(buf) - - -class PyboardExtended(Pyboard): - def __init__(self, dev, *args, **kwargs): - super().__init__(dev, *args, **kwargs) - self.device_name = dev - self.mounted = False - - def mount_local(self, path, unsafe_links=False): - fout = self.serial - if self.eval('"RemoteFS" in globals()') == b"False": - self.exec_(fs_hook_code) - self.exec_("__mount()") - self.mounted = True - self.cmd = PyboardCommand(self.serial, fout, path, unsafe_links=unsafe_links) - self.serial = SerialIntercept(self.serial, self.cmd) - - def write_ctrl_d(self, out_callback): - self.serial.write(b"\x04") - if not self.mounted: - return - - # Read response from the device until it is quiet (with a timeout). - INITIAL_TIMEOUT = 0.5 - BANNER_TIMEOUT = 2 - QUIET_TIMEOUT = 0.1 - FULL_TIMEOUT = 5 - t_start = t_last_activity = time.monotonic() - data_all = b"" - soft_reboot_started = False - soft_reboot_banner = False - while True: - t = time.monotonic() - n = self.serial.inWaiting() - if n > 0: - data = self.serial.read(n) - out_callback(data) - data_all += data - t_last_activity = t - else: - if len(data_all) == 0: - if t - t_start > INITIAL_TIMEOUT: - return - else: - if t - t_start > FULL_TIMEOUT: - if soft_reboot_started: - break - return - - next_data_timeout = QUIET_TIMEOUT - - if not soft_reboot_started and data_all.find(b"MPY: soft reboot") != -1: - soft_reboot_started = True - - if soft_reboot_started and not soft_reboot_banner: - # Once soft reboot has been initiated, give some more time for the startup - # banner to be shown - if data_all.find(b"\nMicroPython ") != -1: - soft_reboot_banner = True - elif data_all.find(b"\nraw REPL; CTRL-B to exit\r\n") != -1: - soft_reboot_banner = True - else: - next_data_timeout = BANNER_TIMEOUT - - if t - t_last_activity > next_data_timeout: - break - - if not soft_reboot_started: - return - - if not soft_reboot_banner: - out_callback(b"Warning: Could not remount local filesystem\r\n") - return - - # Determine type of prompt - if data_all.endswith(b">"): - in_friendly_repl = False - prompt = b">" - else: - in_friendly_repl = True - prompt = data_all.rsplit(b"\r\n", 1)[-1] - - # Clear state while board remounts, it will be re-set once mounted. - self.mounted = False - self.serial = self.serial.orig_serial - - # Provide a message about the remount. - out_callback(bytes(f"\r\nRemount local directory {self.cmd.root} at /remote\r\n", "utf8")) - - # Enter raw REPL and re-mount the remote filesystem. - self.serial.write(b"\x01") - self.exec_(fs_hook_code) - self.exec_("__mount()") - self.mounted = True - - # Exit raw REPL if needed, and wait for the friendly REPL prompt. - if in_friendly_repl: - self.exit_raw_repl() - self.read_until(len(prompt), prompt) - out_callback(prompt) - self.serial = SerialIntercept(self.serial, self.cmd) - - def umount_local(self): - if self.mounted: - self.exec_('uos.umount("/remote")') - self.mounted = False - self.serial = self.serial.orig_serial diff --git a/tools/mpremote/pyproject.toml b/tools/mpremote/pyproject.toml index 1b6c2173d2..b01385c3d5 100644 --- a/tools/mpremote/pyproject.toml +++ b/tools/mpremote/pyproject.toml @@ -44,11 +44,5 @@ raw-options = { root = "../..", version_scheme = "post-release" } [tool.hatch.build] packages = ["mpremote"] -# Also grab pyboard.py from /tools and add it to the package for both wheel and sdist. -[tool.hatch.build.force-include] -"../pyboard.py" = "mpremote/pyboard.py" - -# Workaround to allow `python -m build` to work. [tool.hatch.build.targets.sdist.force-include] -"../pyboard.py" = "mpremote/pyboard.py" "requirements.txt" = "requirements.txt"