tests/extmod: Add deflate.DeflateIO tests.

Signed-off-by: Jim Mussared <jim.mussared@gmail.com>
pull/11905/head
Jim Mussared 2023-06-29 15:52:03 +10:00 zatwierdzone przez Damien George
rodzic 3533924c36
commit 8b315ef0d8
6 zmienionych plików z 557 dodań i 0 usunięć

Wyświetl plik

@ -0,0 +1,148 @@
try:
# Check if deflate is available.
import deflate
import io
except ImportError:
print("SKIP")
raise SystemExit
# Check if compression is enabled.
if not hasattr(deflate.DeflateIO, "write"):
print("SKIP")
raise SystemExit
# Simple compression & decompression.
b = io.BytesIO()
g = deflate.DeflateIO(b, deflate.RAW)
data = b"micropython"
N = 10
for i in range(N):
g.write(data)
g.close()
result_raw = b.getvalue()
print(len(result_raw) < len(data) * N)
b = io.BytesIO(result_raw)
g = deflate.DeflateIO(b, deflate.RAW)
print(g.read())
# Same, but using a context manager.
b = io.BytesIO()
with deflate.DeflateIO(b, deflate.RAW) as g:
for i in range(N):
g.write(data)
result_raw = b.getvalue()
print(len(result_raw) < len(data) * N)
b = io.BytesIO(result_raw)
with deflate.DeflateIO(b, deflate.RAW) as g:
print(g.read())
# Writing to a closed underlying stream.
b = io.BytesIO()
g = deflate.DeflateIO(b, deflate.RAW)
g.write(b"micropython")
b.close()
try:
g.write(b"micropython")
except ValueError:
print("ValueError")
# Writing to a closed DeflateIO.
b = io.BytesIO()
g = deflate.DeflateIO(b, deflate.RAW)
g.write(b"micropython")
g.close()
try:
g.write(b"micropython")
except OSError:
print("OSError")
def decompress(data, *args):
buf = io.BytesIO(data)
with deflate.DeflateIO(buf, *args) as g:
return g.read()
def compress(data, *args):
b = io.BytesIO()
with deflate.DeflateIO(b, *args) as g:
g.write(data)
return b.getvalue()
def compress_error(data, *args):
try:
compress(data, *args)
except OSError:
print("OSError")
except ValueError:
print("ValueError")
# More test patterns.
PATTERNS_RAW = (
(b"0", b"3\x00\x00"),
(b"a", b"K\x04\x00"),
(b"0" * 100, b"3\xa0\x03\x00\x00"),
(
bytes(range(64)),
b"c`dbfaec\xe7\xe0\xe4\xe2\xe6\xe1\xe5\xe3\x17\x10\x14\x12\x16\x11\x15\x13\x97\x90\x94\x92\x96\x91\x95\x93WPTRVQUS\xd7\xd0\xd4\xd2\xd6\xd1\xd5\xd370426153\xb7\xb0\xb4\xb2\xb6\xb1\xb5\xb3\x07\x00",
),
)
for unpacked, packed in PATTERNS_RAW:
print(compress(unpacked) == packed)
print(compress(unpacked, deflate.RAW) == packed)
# Verify header and checksum format.
unpacked = b"hello"
packed = b"\xcbH\xcd\xc9\xc9\x07\x00"
def check_header(n, a, b):
if a == b:
print(n)
else:
print(n, a, b)
check_header("RAW", compress(unpacked, deflate.RAW), packed)
check_header(
"ZLIB(9)", compress(unpacked, deflate.ZLIB, 9), b"\x18\x95" + packed + b"\x06,\x02\x15"
)
check_header(
"ZLIB(15)", compress(unpacked, deflate.ZLIB, 15), b"\x78\x9c" + packed + b"\x06,\x02\x15"
)
check_header(
"GZIP",
compress(unpacked, deflate.GZIP, 9),
b"\x1f\x8b\x08\x00\x00\x00\x00\x00\x04\x03" + packed + b"\x86\xa6\x106\x05\x00\x00\x00",
)
# Valid wbits values.
compress_error(unpacked, deflate.RAW, -1)
print(len(compress(unpacked, deflate.RAW, 0)))
compress_error(unpacked, deflate.RAW, 1)
compress_error(unpacked, deflate.RAW, 4)
for i in range(5, 16):
print(len(compress(unpacked, deflate.RAW, i)))
compress_error(unpacked, deflate.RAW, 16)
# Invalid values for format.
compress_error(unpacked, -1)
compress_error(unpacked, 5)
# Fill buf with a predictable pseudorandom sequence.
buf = bytearray(1024)
lfsr = 1 << 15 | 1
for i in range(len(buf)):
bit = (lfsr ^ (lfsr >> 1) ^ (lfsr >> 3) ^ (lfsr >> 12)) & 1
lfsr = (lfsr >> 1) | (bit << 15)
buf[i] = lfsr & 0xFF
# Verify that compression improves as the window size increases.
prev_len = len(buf)
for wbits in range(5, 10):
result = compress(buf, deflate.RAW, wbits)
next_len = len(result)
print(next_len < prev_len and decompress(result, deflate.RAW, wbits) == buf)
prev_len = next_len

Wyświetl plik

@ -0,0 +1,41 @@
True
b'micropythonmicropythonmicropythonmicropythonmicropythonmicropythonmicropythonmicropythonmicropythonmicropython'
True
b'micropythonmicropythonmicropythonmicropythonmicropythonmicropythonmicropythonmicropythonmicropythonmicropython'
ValueError
OSError
True
True
True
True
True
True
True
True
RAW
ZLIB(9)
ZLIB(15)
GZIP
ValueError
7
ValueError
ValueError
7
7
7
7
7
7
7
7
7
7
7
ValueError
ValueError
ValueError
False
True
True
True
True

Wyświetl plik

@ -0,0 +1,174 @@
try:
# Check if deflate is available.
import deflate
import io
except ImportError:
print("SKIP")
raise SystemExit
# zlib.compress(b'micropython hello world hello world micropython', wbits=-9)
data_raw = b'\xcb\xcdL.\xca/\xa8,\xc9\xc8\xcfS\xc8H\xcd\xc9\xc9W(\xcf/\xcaIAa\xe7"\xd4\x00\x00'
# zlib.compress(b'micropython hello world hello world micropython', wbits=9)
data_zlib = b'\x18\x95\xcb\xcdL.\xca/\xa8,\xc9\xc8\xcfS\xc8H\xcd\xc9\xc9W(\xcf/\xcaIAa\xe7"\xd4\x00\x00\xbc\xfa\x12\x91'
# zlib.compress(b'micropython hello world hello world micropython', wbits=25)
data_gzip = b'\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\x03\xcb\xcdL.\xca/\xa8,\xc9\xc8\xcfS\xc8H\xcd\xc9\xc9W(\xcf/\xcaIAa\xe7"\xd4\x00\x00"\xeb\xc4\x98/\x00\x00\x00'
# compress(b'hello' + bytearray(300) + b'hello', format=deflate.RAW, 5)
data_wbits_5 = b"\xcbH\xcd\xc9\xc9g\x18\xe9\x00\x08\x88\x95\xcfH\xcd\xc9\xc9\x07\x00"
# compress(b'hello' + bytearray(300) + b'hello', format=deflate.RAW, 6)
data_wbits_6 = b"\xcbH\xcd\xc9\xc9g\x18\xe9\x00\x08\x88\xd5\x9f\x91\x9a\x93\x93\x0f\x00"
# compress(b'hello' + bytearray(300) + b'hello', format=deflate.RAW, 8)
data_wbits_8 = b"\xcbH\xcd\xc9\xc9g\x18\xe9\x00\x08\x88\xf5\x7fFjNN>\x00"
# compress(b'hello' + bytearray(2000) + b'hello', format=deflate.RAW, 10)
data_wbits_10 = b"\xcbH\xcd\xc9\xc9g\x18\xe9\x00\x08Fz\x18\x00\xc3`\xa4'\x03`2\x18\xe99\x01\x98\x13Fz\xfe\x07\xe6\xff\x91\x9e\xff\x81\xf9\x7f\xa4\xe7\x7f`\xfe\x1f\xba\xf9?#5''\x1f\x00"
def decompress(data, *args):
buf = io.BytesIO(data)
with deflate.DeflateIO(buf, *args) as g:
return g.read()
def decompress_error(data, *args):
try:
decompress(data, *args)
except OSError:
print("OSError")
except EOFError:
print("EOFError")
except ValueError:
print("ValueError")
# Basic handling of format and detection.
print(decompress(data_raw, deflate.RAW))
print(decompress(data_zlib, deflate.ZLIB))
print(decompress(data_gzip, deflate.GZIP))
print(decompress(data_zlib)) # detect zlib/gzip.
print(decompress(data_gzip)) # detect zlib/gzip.
decompress_error(data_raw) # cannot detect zlib/gzip from raw stream
decompress_error(data_raw, deflate.ZLIB)
decompress_error(data_raw, deflate.GZIP)
decompress_error(data_zlib, deflate.RAW)
decompress_error(data_zlib, deflate.GZIP)
decompress_error(data_gzip, deflate.RAW)
decompress_error(data_gzip, deflate.ZLIB)
# Invalid data stream.
decompress_error(b"abcef", deflate.RAW)
# Invalid block type. final-block, block-type=3.
decompress_error(b"\x07", deflate.RAW)
# Truncated stream.
decompress_error(data_raw[:10], deflate.RAW)
# Partial reads.
buf = io.BytesIO(data_zlib)
with deflate.DeflateIO(buf) as g:
print(buf.seek(0, 1)) # verify stream is not read until first read of the DeflateIO stream.
print(g.read(1))
print(buf.seek(0, 1)) # verify that only the minimal amount is read from the source
print(g.read(1))
print(buf.seek(0, 1))
print(g.read(2))
print(buf.seek(0, 1))
print(g.read())
print(buf.seek(0, 1))
print(g.read(1))
print(buf.seek(0, 1))
print(g.read())
# Invalid zlib checksum (+ length for gzip). Note: only checksum errors are
# currently detected, see the end of uzlib_uncompress_chksum().
decompress_error(data_zlib[:-4] + b"\x00\x00\x00\x00")
decompress_error(data_gzip[:-8] + b"\x00\x00\x00\x00\x00\x00\x00\x00")
decompress_error(data_zlib[:-4] + b"\x00\x00\x00\x00", deflate.ZLIB)
decompress_error(data_gzip[:-8] + b"\x00\x00\x00\x00\x00\x00\x00\x00", deflate.GZIP)
# Reading from a closed underlying stream.
b = io.BytesIO(data_raw)
g = deflate.DeflateIO(b, deflate.RAW)
g.read(4)
b.close()
try:
g.read(4)
except ValueError:
print("ValueError")
# Reading from a closed DeflateIO.
b = io.BytesIO(data_raw)
g = deflate.DeflateIO(b, deflate.RAW)
g.read(4)
g.close()
try:
g.read(4)
except OSError:
print("OSError")
# Gzip header with extra flags (FCOMMENT FNAME FEXTRA FHCRC) enabled.
data_gzip_header_extra = b"\x1f\x8b\x08\x1e}\x9a\x9bd\x02\x00\x00\x00\x00\x00\x00\xff\xcb\xcdL.\xca/\xa8,\xc9\xc8\xcf\x03\x00\xf2KF>\x0b\x00\x00\x00"
print(decompress(data_gzip_header_extra))
# Test patterns.
PATTERNS_ZLIB = [
# Packed results produced by CPy's zlib.compress()
(b"0", b"x\x9c3\x00\x00\x001\x001"),
(b"a", b"x\x9cK\x04\x00\x00b\x00b"),
(b"0" * 100, b"x\x9c30\xa0=\x00\x00\xb3q\x12\xc1"),
(
bytes(range(64)),
b"x\x9cc`dbfaec\xe7\xe0\xe4\xe2\xe6\xe1\xe5\xe3\x17\x10\x14\x12\x16\x11\x15\x13\x97\x90\x94\x92\x96\x91\x95\x93WPTRVQUS\xd7\xd0\xd4\xd2\xd6\xd1\xd5\xd370426153\xb7\xb0\xb4\xb2\xb6\xb1\xb5\xb3\x07\x00\xaa\xe0\x07\xe1",
),
(b"hello", b"x\x01\x01\x05\x00\xfa\xffhello\x06,\x02\x15"), # compression level 0
# adaptive/dynamic huffman tree
(
b"13371813150|13764518736|12345678901",
b"x\x9c\x05\xc1\x81\x01\x000\x04\x04\xb1\x95\\\x1f\xcfn\x86o\x82d\x06Qq\xc8\x9d\xc5X}<e\xb5g\x83\x0f\x89X\x07\xab",
),
# dynamic Huffman tree with "case 17" (repeat code for 3-10 times)
(
b">I}\x00\x951D>I}\x00\x951D>I}\x00\x951D>I}\x00\x951D",
b"x\x9c\x05\xc11\x01\x00\x00\x00\x010\x95\x14py\x84\x12C_\x9bR\x8cV\x8a\xd1J1Z)F\x1fw`\x089",
),
]
for unpacked, packed in PATTERNS_ZLIB:
print(decompress(packed) == unpacked)
print(decompress(packed, deflate.ZLIB) == unpacked)
# Older version's of CPython's zlib module still included the checksum and length (as if it were a zlib/gzip stream).
# Make sure there're no problem decompressing this.
data_raw_with_footer = data_raw + b"\x00\x00\x00\x00\x00\x00\x00\x00"
print(decompress(data_raw_with_footer, deflate.RAW))
# Valid wbits values.
decompress_error(data_wbits_5, deflate.RAW, -1)
print(len(decompress(data_wbits_5, deflate.RAW, 0)))
decompress_error(data_wbits_5, deflate.RAW, 1)
decompress_error(data_wbits_5, deflate.RAW, 4)
for i in range(5, 16):
print(len(decompress(data_wbits_5, deflate.RAW, i)))
decompress_error(data_wbits_5, deflate.RAW, 16)
# Invalid values for format.
decompress_error(data_raw, -1)
decompress_error(data_raw, 5)
# Data that requires a higher wbits value.
decompress_error(data_wbits_6, deflate.RAW, 5)
print(len(decompress(data_wbits_6, deflate.RAW, 6)))
print(len(decompress(data_wbits_6, deflate.RAW, 7)))
decompress_error(data_wbits_8, deflate.RAW, 7)
print(len(decompress(data_wbits_8, deflate.RAW, 8)))
print(len(decompress(data_wbits_8, deflate.RAW, 9)))
decompress_error(data_wbits_10, deflate.RAW)
decompress_error(data_wbits_10, deflate.RAW, 9)
print(len(decompress(data_wbits_10, deflate.RAW, 10)))
# zlib header sets the size, so works with wbits unset or wbits >= 10.
data_wbits_10_zlib = b"(\x91\xcbH\xcd\xc9\xc9g\x18\xe9\x00\x08Fz\x18\x00\xc3`\xa4'\x03`2\x18\xe99\x01\x98\x13Fz\xfe\x07\xe6\xff\x91\x9e\xff\x81\xf9\x7f\xa4\xe7\x7f`\xfe\x1f\xba\xf9?#5''\x1f\x00[\xbc\x04)"
print(len(decompress(data_wbits_10_zlib, deflate.ZLIB)))
decompress_error(data_wbits_10_zlib, deflate.ZLIB, 9)
print(len(decompress(data_wbits_10_zlib, deflate.ZLIB, 10)))
print(len(decompress(data_wbits_10_zlib)))

Wyświetl plik

@ -0,0 +1,80 @@
b'micropython hello world hello world micropython'
b'micropython hello world hello world micropython'
b'micropython hello world hello world micropython'
b'micropython hello world hello world micropython'
b'micropython hello world hello world micropython'
OSError
OSError
OSError
OSError
OSError
OSError
OSError
OSError
OSError
EOFError
0
b'm'
4
b'i'
5
b'cr'
7
b'opython hello world hello world micropython'
36
b''
36
b''
OSError
OSError
OSError
OSError
ValueError
OSError
b'micropython'
True
True
True
True
True
True
True
True
True
True
True
True
True
True
b'micropython hello world hello world micropython'
ValueError
310
ValueError
ValueError
310
310
310
310
310
310
310
310
310
310
310
ValueError
ValueError
ValueError
OSError
310
310
OSError
310
310
OSError
OSError
2010
2010
OSError
2010
2010

Wyświetl plik

@ -0,0 +1,89 @@
# Test deflate module with stream errors.
try:
# Check if deflate & IOBase are available.
import deflate, io
io.IOBase
except (ImportError, AttributeError):
print("SKIP")
raise SystemExit
# Check if compression is enabled.
if not hasattr(deflate.DeflateIO, "write"):
print("SKIP")
raise SystemExit
formats = (deflate.RAW, deflate.ZLIB, deflate.GZIP)
# Test error on read when decompressing.
class Stream(io.IOBase):
def readinto(self, buf):
print("Stream.readinto", len(buf))
return -1
try:
deflate.DeflateIO(Stream()).read()
except OSError as er:
print(repr(er))
# Test error on write when compressing.
class Stream(io.IOBase):
def write(self, buf):
print("Stream.write", buf)
return -1
for format in formats:
try:
deflate.DeflateIO(Stream(), format).write("a")
except OSError as er:
print(repr(er))
# Test write after close.
class Stream(io.IOBase):
def write(self, buf):
print("Stream.write", buf)
return -1
def ioctl(self, cmd, arg):
print("Stream.ioctl", cmd, arg)
return 0
try:
d = deflate.DeflateIO(Stream(), deflate.RAW, 0, True)
d.close()
d.write("a")
except OSError as er:
print(repr(er))
# Test error on write when closing.
class Stream(io.IOBase):
def __init__(self):
self.num_writes = 0
def write(self, buf):
print("Stream.write", buf)
if self.num_writes >= 4:
return -1
self.num_writes += 1
return len(buf)
for format in formats:
d = deflate.DeflateIO(Stream(), format)
d.write("a")
try:
d.close()
except OSError as er:
print(repr(er))

Wyświetl plik

@ -0,0 +1,25 @@
Stream.readinto 1
OSError(1,)
Stream.write bytearray(b'K')
OSError(1,)
Stream.write bytearray(b'\x18\x95')
OSError(22,)
Stream.write bytearray(b'\x1f\x8b\x08\x00\x00\x00\x00\x00\x04\x03')
OSError(22,)
Stream.ioctl 4 0
OSError(22,)
Stream.write bytearray(b'K')
Stream.write bytearray(b'\x04')
Stream.write bytearray(b'\x00')
Stream.write bytearray(b'\x18\x95')
Stream.write bytearray(b'K')
Stream.write bytearray(b'\x04')
Stream.write bytearray(b'\x00')
Stream.write bytearray(b'\x00b\x00b')
OSError(1,)
Stream.write bytearray(b'\x1f\x8b\x08\x00\x00\x00\x00\x00\x04\x03')
Stream.write bytearray(b'K')
Stream.write bytearray(b'\x04')
Stream.write bytearray(b'\x00')
Stream.write bytearray(b'C\xbe\xb7\xe8\x01\x00\x00\x00')
OSError(1,)