kopia lustrzana https://github.com/micropython/micropython-lib
pathlib: Add initial pathlib implementation.
This adds most of the common functionality of pathlib.Path. The glob functionality could use some work; currently it only supports a single "*" wildcard; however, this is the vast majority of common use-cases and it won't fail silently if non-supported glob patterns are provided.pull/507/head
rodzic
d1aaec7174
commit
0051a5ef50
|
@ -0,0 +1,3 @@
|
|||
metadata(version="0.0.1")
|
||||
|
||||
module("pathlib.py")
|
|
@ -0,0 +1,207 @@
|
|||
import errno
|
||||
import os
|
||||
|
||||
from micropython import const
|
||||
|
||||
_SEP = const("/")
|
||||
|
||||
|
||||
def _mode_if_exists(path):
|
||||
try:
|
||||
return os.stat(path)[0]
|
||||
except OSError as e:
|
||||
if e.errno == errno.ENOENT:
|
||||
return 0
|
||||
raise e
|
||||
|
||||
|
||||
def _clean_segment(segment):
|
||||
segment = str(segment)
|
||||
if not segment:
|
||||
return "."
|
||||
segment = segment.rstrip(_SEP)
|
||||
if not segment:
|
||||
return _SEP
|
||||
while True:
|
||||
no_double = segment.replace(_SEP + _SEP, _SEP)
|
||||
if no_double == segment:
|
||||
break
|
||||
segment = no_double
|
||||
return segment
|
||||
|
||||
|
||||
class Path:
|
||||
def __init__(self, *segments):
|
||||
segments_cleaned = []
|
||||
for segment in segments:
|
||||
segment = _clean_segment(segment)
|
||||
if segment[0] == _SEP:
|
||||
segments_cleaned = [segment]
|
||||
elif segment == ".":
|
||||
continue
|
||||
else:
|
||||
segments_cleaned.append(segment)
|
||||
|
||||
self._path = _clean_segment(_SEP.join(segments_cleaned))
|
||||
|
||||
def __truediv__(self, other):
|
||||
return Path(self._path, str(other))
|
||||
|
||||
def __repr__(self):
|
||||
return f'{type(self).__name__}("{self._path}")'
|
||||
|
||||
def __str__(self):
|
||||
return self._path
|
||||
|
||||
def __eq__(self, other):
|
||||
return self.absolute() == Path(other).absolute()
|
||||
|
||||
def absolute(self):
|
||||
path = self._path
|
||||
cwd = os.getcwd()
|
||||
if not path or path == ".":
|
||||
return cwd
|
||||
if path[0] == _SEP:
|
||||
return path
|
||||
return _SEP + path if cwd == _SEP else cwd + _SEP + path
|
||||
|
||||
def resolve(self):
|
||||
return self.absolute()
|
||||
|
||||
def open(self, mode="r", encoding=None):
|
||||
return open(self._path, mode, encoding=encoding)
|
||||
|
||||
def exists(self):
|
||||
return bool(_mode_if_exists(self._path))
|
||||
|
||||
def mkdir(self, parents=False, exist_ok=False):
|
||||
try:
|
||||
os.mkdir(self._path)
|
||||
return
|
||||
except OSError as e:
|
||||
if e.errno == errno.EEXIST and exist_ok:
|
||||
return
|
||||
elif e.errno == errno.ENOENT and parents:
|
||||
pass # handled below
|
||||
else:
|
||||
raise e
|
||||
|
||||
segments = self._path.split(_SEP)
|
||||
progressive_path = ""
|
||||
if segments[0] == "":
|
||||
segments = segments[1:]
|
||||
progressive_path = _SEP
|
||||
for segment in segments:
|
||||
progressive_path += _SEP + segment
|
||||
try:
|
||||
os.mkdir(progressive_path)
|
||||
except OSError as e:
|
||||
if e.errno != errno.EEXIST:
|
||||
raise e
|
||||
|
||||
def is_dir(self):
|
||||
return bool(_mode_if_exists(self._path) & 0x4000)
|
||||
|
||||
def is_file(self):
|
||||
return bool(_mode_if_exists(self._path) & 0x8000)
|
||||
|
||||
def _glob(self, path, pattern, recursive):
|
||||
# Currently only supports a single "*" pattern.
|
||||
n_wildcards = pattern.count("*")
|
||||
n_single_wildcards = pattern.count("?")
|
||||
|
||||
if n_single_wildcards:
|
||||
raise NotImplementedError("? single wildcards not implemented.")
|
||||
|
||||
if n_wildcards == 0:
|
||||
raise ValueError
|
||||
elif n_wildcards > 1:
|
||||
raise NotImplementedError("Multiple * wildcards not implemented.")
|
||||
|
||||
prefix, suffix = pattern.split("*")
|
||||
|
||||
for name, mode, *_ in os.ilistdir(path):
|
||||
full_path = path + _SEP + name
|
||||
if name.startswith(prefix) and name.endswith(suffix):
|
||||
yield full_path
|
||||
if recursive and mode & 0x4000: # is_dir
|
||||
yield from self._glob(full_path, pattern, recursive=recursive)
|
||||
|
||||
def glob(self, pattern):
|
||||
"""Iterate over this subtree and yield all existing files (of any
|
||||
kind, including directories) matching the given relative pattern.
|
||||
|
||||
Currently only supports a single "*" pattern.
|
||||
"""
|
||||
return self._glob(self._path, pattern, recursive=False)
|
||||
|
||||
def rglob(self, pattern):
|
||||
return self._glob(self._path, pattern, recursive=True)
|
||||
|
||||
def stat(self):
|
||||
return os.stat(self._path)
|
||||
|
||||
def read_bytes(self):
|
||||
with open(self._path, "rb") as f:
|
||||
return f.read()
|
||||
|
||||
def read_text(self, encoding=None):
|
||||
with open(self._path, "r", encoding=encoding) as f:
|
||||
return f.read()
|
||||
|
||||
def rename(self, target):
|
||||
os.rename(self._path, target)
|
||||
|
||||
def rmdir(self):
|
||||
os.rmdir(self._path)
|
||||
|
||||
def touch(self, exist_ok=True):
|
||||
if self.exists():
|
||||
if exist_ok:
|
||||
return # TODO: should update timestamp
|
||||
else:
|
||||
# In lieue of FileExistsError
|
||||
raise OSError(errno.EEXIST)
|
||||
with open(self._path, "w"):
|
||||
pass
|
||||
|
||||
def unlink(self, missing_ok=False):
|
||||
try:
|
||||
os.unlink(self._path)
|
||||
except OSError as e:
|
||||
if not (missing_ok and e.errno == errno.ENOENT):
|
||||
raise e
|
||||
|
||||
def write_bytes(self, data):
|
||||
with open(self._path, "wb") as f:
|
||||
f.write(data)
|
||||
|
||||
def write_text(self, data, encoding=None):
|
||||
with open(self._path, "w", encoding=encoding) as f:
|
||||
f.write(data)
|
||||
|
||||
def with_suffix(self, suffix):
|
||||
index = -len(self.suffix) or None
|
||||
return Path(self._path[:index] + suffix)
|
||||
|
||||
@property
|
||||
def stem(self):
|
||||
return self.name.rsplit(".", 1)[0]
|
||||
|
||||
@property
|
||||
def parent(self):
|
||||
tokens = self._path.rsplit(_SEP, 1)
|
||||
if len(tokens) == 2:
|
||||
if not tokens[0]:
|
||||
tokens[0] = _SEP
|
||||
return Path(tokens[0])
|
||||
return Path(".")
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
return self._path.rsplit(_SEP, 1)[-1]
|
||||
|
||||
@property
|
||||
def suffix(self):
|
||||
elems = self._path.rsplit(".", 1)
|
||||
return "" if len(elems) == 1 else "." + elems[1]
|
|
@ -0,0 +1,324 @@
|
|||
import os
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
from tempfile import TemporaryDirectory
|
||||
|
||||
|
||||
def _isgenerator(x):
|
||||
return isinstance(x, type((lambda: (yield))()))
|
||||
|
||||
|
||||
class TestPathlib(unittest.TestCase):
|
||||
def assertExists(self, fn):
|
||||
os.stat(fn)
|
||||
|
||||
def assertNotExists(self, fn):
|
||||
with self.assertRaises(OSError):
|
||||
os.stat(fn)
|
||||
|
||||
def setUp(self):
|
||||
self._tmp_path_obj = TemporaryDirectory()
|
||||
self.tmp_path = self._tmp_path_obj.name
|
||||
|
||||
def tearDown(self):
|
||||
self._tmp_path_obj.cleanup()
|
||||
|
||||
def test_init_single_segment(self):
|
||||
path = Path("foo")
|
||||
self.assertTrue(path._path == "foo")
|
||||
|
||||
path = Path("foo/")
|
||||
self.assertTrue(path._path == "foo")
|
||||
|
||||
path = Path("/foo")
|
||||
self.assertTrue(path._path == "/foo")
|
||||
|
||||
path = Path("/////foo")
|
||||
self.assertTrue(path._path == "/foo")
|
||||
|
||||
path = Path("")
|
||||
self.assertTrue(path._path == ".")
|
||||
|
||||
def test_init_multiple_segment(self):
|
||||
path = Path("foo", "bar")
|
||||
self.assertTrue(path._path == "foo/bar")
|
||||
|
||||
path = Path("foo/", "bar")
|
||||
self.assertTrue(path._path == "foo/bar")
|
||||
|
||||
path = Path("/foo", "bar")
|
||||
self.assertTrue(path._path == "/foo/bar")
|
||||
|
||||
path = Path("/foo", "", "bar")
|
||||
self.assertTrue(path._path == "/foo/bar")
|
||||
|
||||
path = Path("/foo/", "", "/bar/")
|
||||
self.assertTrue(path._path == "/bar")
|
||||
|
||||
path = Path("", "")
|
||||
self.assertTrue(path._path == ".")
|
||||
|
||||
def test_truediv_join_str(self):
|
||||
actual = Path("foo") / "bar"
|
||||
self.assertTrue(actual == Path("foo/bar"))
|
||||
|
||||
def test_truediv_join_path(self):
|
||||
actual = Path("foo") / Path("bar")
|
||||
self.assertTrue(actual == Path("foo/bar"))
|
||||
|
||||
actual = Path("foo") / Path("/bar")
|
||||
self.assertTrue(actual == "/bar")
|
||||
|
||||
def test_eq_and_absolute(self):
|
||||
self.assertTrue(Path("") == Path("."))
|
||||
self.assertTrue(Path("foo") == Path(os.getcwd(), "foo"))
|
||||
self.assertTrue(Path("foo") == "foo")
|
||||
self.assertTrue(Path("foo") == os.getcwd() + "/foo")
|
||||
|
||||
self.assertTrue(Path("foo") != Path("bar"))
|
||||
self.assertTrue(Path(".") != Path("/"))
|
||||
|
||||
def test_open(self):
|
||||
fn = self.tmp_path + "/foo.txt"
|
||||
path = Path(fn)
|
||||
|
||||
with open(fn, "w") as f:
|
||||
f.write("file contents")
|
||||
|
||||
with path.open("r") as f:
|
||||
actual = f.read()
|
||||
|
||||
self.assertTrue(actual == "file contents")
|
||||
|
||||
def test_exists(self):
|
||||
fn = self.tmp_path + "/foo.txt"
|
||||
|
||||
path = Path(str(fn))
|
||||
self.assertTrue(not path.exists())
|
||||
|
||||
with open(fn, "w"):
|
||||
pass
|
||||
|
||||
self.assertTrue(path.exists())
|
||||
|
||||
def test_mkdir(self):
|
||||
target = self.tmp_path + "/foo/bar/baz"
|
||||
path = Path(target)
|
||||
|
||||
with self.assertRaises(OSError):
|
||||
path.mkdir()
|
||||
|
||||
with self.assertRaises(OSError):
|
||||
path.mkdir(exist_ok=True)
|
||||
|
||||
path.mkdir(parents=True)
|
||||
self.assertExists(target)
|
||||
|
||||
with self.assertRaises(OSError):
|
||||
path.mkdir(exist_ok=False)
|
||||
|
||||
path.mkdir(exist_ok=True)
|
||||
|
||||
def test_is_dir(self):
|
||||
target = self.tmp_path
|
||||
path = Path(target)
|
||||
self.assertTrue(path.is_dir())
|
||||
|
||||
target = self.tmp_path + "/foo"
|
||||
path = Path(target)
|
||||
self.assertTrue(not path.is_dir())
|
||||
os.mkdir(target)
|
||||
self.assertTrue(path.is_dir())
|
||||
|
||||
target = self.tmp_path + "/bar.txt"
|
||||
path = Path(target)
|
||||
self.assertTrue(not path.is_dir())
|
||||
with open(target, "w"):
|
||||
pass
|
||||
self.assertTrue(not path.is_dir())
|
||||
|
||||
def test_is_file(self):
|
||||
target = self.tmp_path
|
||||
path = Path(target)
|
||||
self.assertTrue(not path.is_file())
|
||||
|
||||
target = self.tmp_path + "/bar.txt"
|
||||
path = Path(target)
|
||||
self.assertTrue(not path.is_file())
|
||||
with open(target, "w"):
|
||||
pass
|
||||
self.assertTrue(path.is_file())
|
||||
|
||||
def test_glob(self):
|
||||
foo_txt = self.tmp_path + "/foo.txt"
|
||||
with open(foo_txt, "w"):
|
||||
pass
|
||||
bar_txt = self.tmp_path + "/bar.txt"
|
||||
with open(bar_txt, "w"):
|
||||
pass
|
||||
baz_bin = self.tmp_path + "/baz.bin"
|
||||
with open(baz_bin, "w"):
|
||||
pass
|
||||
|
||||
path = Path(self.tmp_path)
|
||||
glob_gen = path.glob("*.txt")
|
||||
self.assertTrue(_isgenerator(glob_gen))
|
||||
|
||||
res = [str(x) for x in glob_gen]
|
||||
self.assertTrue(len(res) == 2)
|
||||
self.assertTrue(foo_txt in res)
|
||||
self.assertTrue(bar_txt in res)
|
||||
|
||||
def test_rglob(self):
|
||||
foo_txt = self.tmp_path + "/foo.txt"
|
||||
with open(foo_txt, "w"):
|
||||
pass
|
||||
bar_txt = self.tmp_path + "/bar.txt"
|
||||
with open(bar_txt, "w"):
|
||||
pass
|
||||
baz_bin = self.tmp_path + "/baz.bin"
|
||||
with open(baz_bin, "w"):
|
||||
pass
|
||||
|
||||
boop_folder = self.tmp_path + "/boop"
|
||||
os.mkdir(boop_folder)
|
||||
bap_txt = self.tmp_path + "/boop/bap.txt"
|
||||
with open(bap_txt, "w"):
|
||||
pass
|
||||
|
||||
path = Path(self.tmp_path)
|
||||
glob_gen = path.rglob("*.txt")
|
||||
self.assertTrue(_isgenerator(glob_gen))
|
||||
|
||||
res = [str(x) for x in glob_gen]
|
||||
self.assertTrue(len(res) == 3)
|
||||
self.assertTrue(foo_txt in res)
|
||||
self.assertTrue(bar_txt in res)
|
||||
self.assertTrue(bap_txt in res)
|
||||
|
||||
def test_stat(self):
|
||||
expected = os.stat(self.tmp_path)
|
||||
path = Path(self.tmp_path)
|
||||
actual = path.stat()
|
||||
self.assertTrue(expected == actual)
|
||||
|
||||
def test_rmdir(self):
|
||||
target = self.tmp_path + "/foo"
|
||||
path = Path(target)
|
||||
|
||||
with self.assertRaises(OSError):
|
||||
# Doesn't exist
|
||||
path.rmdir()
|
||||
|
||||
os.mkdir(target)
|
||||
self.assertExists(target)
|
||||
path.rmdir()
|
||||
self.assertNotExists(target)
|
||||
|
||||
os.mkdir(target)
|
||||
with open(target + "/bar.txt", "w"):
|
||||
pass
|
||||
|
||||
with self.assertRaises(OSError):
|
||||
# Cannot rmdir; contains file.
|
||||
path.rmdir()
|
||||
|
||||
def test_touch(self):
|
||||
target = self.tmp_path + "/foo.txt"
|
||||
|
||||
path = Path(target)
|
||||
path.touch()
|
||||
self.assertExists(target)
|
||||
|
||||
path.touch() # touching existing file is fine
|
||||
self.assertExists(target)
|
||||
|
||||
# Technically should be FileExistsError,
|
||||
# but thats not builtin to micropython
|
||||
with self.assertRaises(OSError):
|
||||
path.touch(exist_ok=False)
|
||||
|
||||
path = Path(self.tmp_path + "/bar/baz.txt")
|
||||
with self.assertRaises(OSError):
|
||||
# Parent directory does not exist
|
||||
path.touch()
|
||||
|
||||
def test_unlink(self):
|
||||
target = self.tmp_path + "/foo.txt"
|
||||
|
||||
path = Path(target)
|
||||
with self.assertRaises(OSError):
|
||||
# File does not exist
|
||||
path.unlink()
|
||||
|
||||
with open(target, "w"):
|
||||
pass
|
||||
|
||||
self.assertExists(target)
|
||||
path.unlink()
|
||||
self.assertNotExists(target)
|
||||
|
||||
path = Path(self.tmp_path)
|
||||
with self.assertRaises(OSError):
|
||||
# File does not exist
|
||||
path.unlink()
|
||||
|
||||
def test_write_bytes(self):
|
||||
target = self.tmp_path + "/foo.bin"
|
||||
path = Path(target)
|
||||
path.write_bytes(b"test byte data")
|
||||
with open(target, "rb") as f:
|
||||
actual = f.read()
|
||||
self.assertTrue(actual == b"test byte data")
|
||||
|
||||
def test_write_text(self):
|
||||
target = self.tmp_path + "/foo.txt"
|
||||
path = Path(target)
|
||||
path.write_text("test string")
|
||||
with open(target, "r") as f:
|
||||
actual = f.read()
|
||||
self.assertTrue(actual == "test string")
|
||||
|
||||
def test_read_bytes(self):
|
||||
target = self.tmp_path + "/foo.bin"
|
||||
with open(target, "wb") as f:
|
||||
f.write(b"test byte data")
|
||||
|
||||
path = Path(target)
|
||||
actual = path.read_bytes()
|
||||
self.assertTrue(actual == b"test byte data")
|
||||
|
||||
def test_read_text(self):
|
||||
target = self.tmp_path + "/foo.bin"
|
||||
with open(target, "w") as f:
|
||||
f.write("test string")
|
||||
|
||||
path = Path(target)
|
||||
actual = path.read_text()
|
||||
self.assertTrue(actual == "test string")
|
||||
|
||||
def test_stem(self):
|
||||
self.assertTrue(Path("foo/test").stem == "test")
|
||||
self.assertTrue(Path("foo/bar.bin").stem == "bar")
|
||||
self.assertTrue(Path("").stem == "")
|
||||
|
||||
def test_name(self):
|
||||
self.assertTrue(Path("foo/test").name == "test")
|
||||
self.assertTrue(Path("foo/bar.bin").name == "bar.bin")
|
||||
|
||||
def test_parent(self):
|
||||
self.assertTrue(Path("foo/test").parent == Path("foo"))
|
||||
self.assertTrue(Path("foo/bar.bin").parent == Path("foo"))
|
||||
self.assertTrue(Path("bar.bin").parent == Path("."))
|
||||
self.assertTrue(Path(".").parent == Path("."))
|
||||
self.assertTrue(Path("/").parent == Path("/"))
|
||||
|
||||
def test_suffix(self):
|
||||
self.assertTrue(Path("foo/test").suffix == "")
|
||||
self.assertTrue(Path("foo/bar.bin").suffix == ".bin")
|
||||
self.assertTrue(Path("bar.txt").suffix == ".txt")
|
||||
|
||||
def test_with_suffix(self):
|
||||
self.assertTrue(Path("foo/test").with_suffix(".tar") == Path("foo/test.tar"))
|
||||
self.assertTrue(Path("foo/bar.bin").with_suffix(".txt") == Path("foo/bar.txt"))
|
||||
self.assertTrue(Path("bar.txt").with_suffix("") == Path("bar"))
|
Ładowanie…
Reference in New Issue