From 8b315ef0d82a31991def597906f7f24bd39c6e23 Mon Sep 17 00:00:00 2001 From: Jim Mussared Date: Thu, 29 Jun 2023 15:52:03 +1000 Subject: [PATCH] tests/extmod: Add deflate.DeflateIO tests. Signed-off-by: Jim Mussared --- tests/extmod/deflate_compress.py | 148 +++++++++++++++++++ tests/extmod/deflate_compress.py.exp | 41 ++++++ tests/extmod/deflate_decompress.py | 174 +++++++++++++++++++++++ tests/extmod/deflate_decompress.py.exp | 80 +++++++++++ tests/extmod/deflate_stream_error.py | 89 ++++++++++++ tests/extmod/deflate_stream_error.py.exp | 25 ++++ 6 files changed, 557 insertions(+) create mode 100644 tests/extmod/deflate_compress.py create mode 100644 tests/extmod/deflate_compress.py.exp create mode 100644 tests/extmod/deflate_decompress.py create mode 100644 tests/extmod/deflate_decompress.py.exp create mode 100644 tests/extmod/deflate_stream_error.py create mode 100644 tests/extmod/deflate_stream_error.py.exp diff --git a/tests/extmod/deflate_compress.py b/tests/extmod/deflate_compress.py new file mode 100644 index 0000000000..612af663e4 --- /dev/null +++ b/tests/extmod/deflate_compress.py @@ -0,0 +1,148 @@ +try: + # Check if deflate is available. + import deflate + import io +except ImportError: + print("SKIP") + raise SystemExit + +# Check if compression is enabled. +if not hasattr(deflate.DeflateIO, "write"): + print("SKIP") + raise SystemExit + +# Simple compression & decompression. +b = io.BytesIO() +g = deflate.DeflateIO(b, deflate.RAW) +data = b"micropython" +N = 10 +for i in range(N): + g.write(data) +g.close() +result_raw = b.getvalue() +print(len(result_raw) < len(data) * N) +b = io.BytesIO(result_raw) +g = deflate.DeflateIO(b, deflate.RAW) +print(g.read()) + +# Same, but using a context manager. +b = io.BytesIO() +with deflate.DeflateIO(b, deflate.RAW) as g: + for i in range(N): + g.write(data) +result_raw = b.getvalue() +print(len(result_raw) < len(data) * N) +b = io.BytesIO(result_raw) +with deflate.DeflateIO(b, deflate.RAW) as g: + print(g.read()) + +# Writing to a closed underlying stream. +b = io.BytesIO() +g = deflate.DeflateIO(b, deflate.RAW) +g.write(b"micropython") +b.close() +try: + g.write(b"micropython") +except ValueError: + print("ValueError") + +# Writing to a closed DeflateIO. +b = io.BytesIO() +g = deflate.DeflateIO(b, deflate.RAW) +g.write(b"micropython") +g.close() +try: + g.write(b"micropython") +except OSError: + print("OSError") + + +def decompress(data, *args): + buf = io.BytesIO(data) + with deflate.DeflateIO(buf, *args) as g: + return g.read() + + +def compress(data, *args): + b = io.BytesIO() + with deflate.DeflateIO(b, *args) as g: + g.write(data) + return b.getvalue() + + +def compress_error(data, *args): + try: + compress(data, *args) + except OSError: + print("OSError") + except ValueError: + print("ValueError") + + +# More test patterns. +PATTERNS_RAW = ( + (b"0", b"3\x00\x00"), + (b"a", b"K\x04\x00"), + (b"0" * 100, b"3\xa0\x03\x00\x00"), + ( + bytes(range(64)), + b"c`dbfaec\xe7\xe0\xe4\xe2\xe6\xe1\xe5\xe3\x17\x10\x14\x12\x16\x11\x15\x13\x97\x90\x94\x92\x96\x91\x95\x93WPTRVQUS\xd7\xd0\xd4\xd2\xd6\xd1\xd5\xd370426153\xb7\xb0\xb4\xb2\xb6\xb1\xb5\xb3\x07\x00", + ), +) +for unpacked, packed in PATTERNS_RAW: + print(compress(unpacked) == packed) + print(compress(unpacked, deflate.RAW) == packed) + +# Verify header and checksum format. +unpacked = b"hello" +packed = b"\xcbH\xcd\xc9\xc9\x07\x00" + + +def check_header(n, a, b): + if a == b: + print(n) + else: + print(n, a, b) + + +check_header("RAW", compress(unpacked, deflate.RAW), packed) +check_header( + "ZLIB(9)", compress(unpacked, deflate.ZLIB, 9), b"\x18\x95" + packed + b"\x06,\x02\x15" +) +check_header( + "ZLIB(15)", compress(unpacked, deflate.ZLIB, 15), b"\x78\x9c" + packed + b"\x06,\x02\x15" +) +check_header( + "GZIP", + compress(unpacked, deflate.GZIP, 9), + b"\x1f\x8b\x08\x00\x00\x00\x00\x00\x04\x03" + packed + b"\x86\xa6\x106\x05\x00\x00\x00", +) + +# Valid wbits values. +compress_error(unpacked, deflate.RAW, -1) +print(len(compress(unpacked, deflate.RAW, 0))) +compress_error(unpacked, deflate.RAW, 1) +compress_error(unpacked, deflate.RAW, 4) +for i in range(5, 16): + print(len(compress(unpacked, deflate.RAW, i))) +compress_error(unpacked, deflate.RAW, 16) + +# Invalid values for format. +compress_error(unpacked, -1) +compress_error(unpacked, 5) + +# Fill buf with a predictable pseudorandom sequence. +buf = bytearray(1024) +lfsr = 1 << 15 | 1 +for i in range(len(buf)): + bit = (lfsr ^ (lfsr >> 1) ^ (lfsr >> 3) ^ (lfsr >> 12)) & 1 + lfsr = (lfsr >> 1) | (bit << 15) + buf[i] = lfsr & 0xFF + +# Verify that compression improves as the window size increases. +prev_len = len(buf) +for wbits in range(5, 10): + result = compress(buf, deflate.RAW, wbits) + next_len = len(result) + print(next_len < prev_len and decompress(result, deflate.RAW, wbits) == buf) + prev_len = next_len diff --git a/tests/extmod/deflate_compress.py.exp b/tests/extmod/deflate_compress.py.exp new file mode 100644 index 0000000000..5da70f491b --- /dev/null +++ b/tests/extmod/deflate_compress.py.exp @@ -0,0 +1,41 @@ +True +b'micropythonmicropythonmicropythonmicropythonmicropythonmicropythonmicropythonmicropythonmicropythonmicropython' +True +b'micropythonmicropythonmicropythonmicropythonmicropythonmicropythonmicropythonmicropythonmicropythonmicropython' +ValueError +OSError +True +True +True +True +True +True +True +True +RAW +ZLIB(9) +ZLIB(15) +GZIP +ValueError +7 +ValueError +ValueError +7 +7 +7 +7 +7 +7 +7 +7 +7 +7 +7 +ValueError +ValueError +ValueError +False +True +True +True +True diff --git a/tests/extmod/deflate_decompress.py b/tests/extmod/deflate_decompress.py new file mode 100644 index 0000000000..29d3ec2d71 --- /dev/null +++ b/tests/extmod/deflate_decompress.py @@ -0,0 +1,174 @@ +try: + # Check if deflate is available. + import deflate + import io +except ImportError: + print("SKIP") + raise SystemExit + +# zlib.compress(b'micropython hello world hello world micropython', wbits=-9) +data_raw = b'\xcb\xcdL.\xca/\xa8,\xc9\xc8\xcfS\xc8H\xcd\xc9\xc9W(\xcf/\xcaIAa\xe7"\xd4\x00\x00' +# zlib.compress(b'micropython hello world hello world micropython', wbits=9) +data_zlib = b'\x18\x95\xcb\xcdL.\xca/\xa8,\xc9\xc8\xcfS\xc8H\xcd\xc9\xc9W(\xcf/\xcaIAa\xe7"\xd4\x00\x00\xbc\xfa\x12\x91' +# zlib.compress(b'micropython hello world hello world micropython', wbits=25) +data_gzip = b'\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\x03\xcb\xcdL.\xca/\xa8,\xc9\xc8\xcfS\xc8H\xcd\xc9\xc9W(\xcf/\xcaIAa\xe7"\xd4\x00\x00"\xeb\xc4\x98/\x00\x00\x00' + +# compress(b'hello' + bytearray(300) + b'hello', format=deflate.RAW, 5) +data_wbits_5 = b"\xcbH\xcd\xc9\xc9g\x18\xe9\x00\x08\x88\x95\xcfH\xcd\xc9\xc9\x07\x00" +# compress(b'hello' + bytearray(300) + b'hello', format=deflate.RAW, 6) +data_wbits_6 = b"\xcbH\xcd\xc9\xc9g\x18\xe9\x00\x08\x88\xd5\x9f\x91\x9a\x93\x93\x0f\x00" +# compress(b'hello' + bytearray(300) + b'hello', format=deflate.RAW, 8) +data_wbits_8 = b"\xcbH\xcd\xc9\xc9g\x18\xe9\x00\x08\x88\xf5\x7fFjNN>\x00" +# compress(b'hello' + bytearray(2000) + b'hello', format=deflate.RAW, 10) +data_wbits_10 = b"\xcbH\xcd\xc9\xc9g\x18\xe9\x00\x08Fz\x18\x00\xc3`\xa4'\x03`2\x18\xe99\x01\x98\x13Fz\xfe\x07\xe6\xff\x91\x9e\xff\x81\xf9\x7f\xa4\xe7\x7f`\xfe\x1f\xba\xf9?#5''\x1f\x00" + + +def decompress(data, *args): + buf = io.BytesIO(data) + with deflate.DeflateIO(buf, *args) as g: + return g.read() + + +def decompress_error(data, *args): + try: + decompress(data, *args) + except OSError: + print("OSError") + except EOFError: + print("EOFError") + except ValueError: + print("ValueError") + + +# Basic handling of format and detection. +print(decompress(data_raw, deflate.RAW)) +print(decompress(data_zlib, deflate.ZLIB)) +print(decompress(data_gzip, deflate.GZIP)) +print(decompress(data_zlib)) # detect zlib/gzip. +print(decompress(data_gzip)) # detect zlib/gzip. + +decompress_error(data_raw) # cannot detect zlib/gzip from raw stream +decompress_error(data_raw, deflate.ZLIB) +decompress_error(data_raw, deflate.GZIP) +decompress_error(data_zlib, deflate.RAW) +decompress_error(data_zlib, deflate.GZIP) +decompress_error(data_gzip, deflate.RAW) +decompress_error(data_gzip, deflate.ZLIB) + +# Invalid data stream. +decompress_error(b"abcef", deflate.RAW) + +# Invalid block type. final-block, block-type=3. +decompress_error(b"\x07", deflate.RAW) + +# Truncated stream. +decompress_error(data_raw[:10], deflate.RAW) + +# Partial reads. +buf = io.BytesIO(data_zlib) +with deflate.DeflateIO(buf) as g: + print(buf.seek(0, 1)) # verify stream is not read until first read of the DeflateIO stream. + print(g.read(1)) + print(buf.seek(0, 1)) # verify that only the minimal amount is read from the source + print(g.read(1)) + print(buf.seek(0, 1)) + print(g.read(2)) + print(buf.seek(0, 1)) + print(g.read()) + print(buf.seek(0, 1)) + print(g.read(1)) + print(buf.seek(0, 1)) + print(g.read()) + +# Invalid zlib checksum (+ length for gzip). Note: only checksum errors are +# currently detected, see the end of uzlib_uncompress_chksum(). +decompress_error(data_zlib[:-4] + b"\x00\x00\x00\x00") +decompress_error(data_gzip[:-8] + b"\x00\x00\x00\x00\x00\x00\x00\x00") +decompress_error(data_zlib[:-4] + b"\x00\x00\x00\x00", deflate.ZLIB) +decompress_error(data_gzip[:-8] + b"\x00\x00\x00\x00\x00\x00\x00\x00", deflate.GZIP) + +# Reading from a closed underlying stream. +b = io.BytesIO(data_raw) +g = deflate.DeflateIO(b, deflate.RAW) +g.read(4) +b.close() +try: + g.read(4) +except ValueError: + print("ValueError") + +# Reading from a closed DeflateIO. +b = io.BytesIO(data_raw) +g = deflate.DeflateIO(b, deflate.RAW) +g.read(4) +g.close() +try: + g.read(4) +except OSError: + print("OSError") + +# Gzip header with extra flags (FCOMMENT FNAME FEXTRA FHCRC) enabled. +data_gzip_header_extra = b"\x1f\x8b\x08\x1e}\x9a\x9bd\x02\x00\x00\x00\x00\x00\x00\xff\xcb\xcdL.\xca/\xa8,\xc9\xc8\xcf\x03\x00\xf2KF>\x0b\x00\x00\x00" +print(decompress(data_gzip_header_extra)) + +# Test patterns. +PATTERNS_ZLIB = [ + # Packed results produced by CPy's zlib.compress() + (b"0", b"x\x9c3\x00\x00\x001\x001"), + (b"a", b"x\x9cK\x04\x00\x00b\x00b"), + (b"0" * 100, b"x\x9c30\xa0=\x00\x00\xb3q\x12\xc1"), + ( + bytes(range(64)), + b"x\x9cc`dbfaec\xe7\xe0\xe4\xe2\xe6\xe1\xe5\xe3\x17\x10\x14\x12\x16\x11\x15\x13\x97\x90\x94\x92\x96\x91\x95\x93WPTRVQUS\xd7\xd0\xd4\xd2\xd6\xd1\xd5\xd370426153\xb7\xb0\xb4\xb2\xb6\xb1\xb5\xb3\x07\x00\xaa\xe0\x07\xe1", + ), + (b"hello", b"x\x01\x01\x05\x00\xfa\xffhello\x06,\x02\x15"), # compression level 0 + # adaptive/dynamic huffman tree + ( + b"13371813150|13764518736|12345678901", + b"x\x9c\x05\xc1\x81\x01\x000\x04\x04\xb1\x95\\\x1f\xcfn\x86o\x82d\x06Qq\xc8\x9d\xc5X}I}\x00\x951D>I}\x00\x951D>I}\x00\x951D>I}\x00\x951D", + b"x\x9c\x05\xc11\x01\x00\x00\x00\x010\x95\x14py\x84\x12C_\x9bR\x8cV\x8a\xd1J1Z)F\x1fw`\x089", + ), +] +for unpacked, packed in PATTERNS_ZLIB: + print(decompress(packed) == unpacked) + print(decompress(packed, deflate.ZLIB) == unpacked) + +# Older version's of CPython's zlib module still included the checksum and length (as if it were a zlib/gzip stream). +# Make sure there're no problem decompressing this. +data_raw_with_footer = data_raw + b"\x00\x00\x00\x00\x00\x00\x00\x00" +print(decompress(data_raw_with_footer, deflate.RAW)) + +# Valid wbits values. +decompress_error(data_wbits_5, deflate.RAW, -1) +print(len(decompress(data_wbits_5, deflate.RAW, 0))) +decompress_error(data_wbits_5, deflate.RAW, 1) +decompress_error(data_wbits_5, deflate.RAW, 4) +for i in range(5, 16): + print(len(decompress(data_wbits_5, deflate.RAW, i))) +decompress_error(data_wbits_5, deflate.RAW, 16) + +# Invalid values for format. +decompress_error(data_raw, -1) +decompress_error(data_raw, 5) + +# Data that requires a higher wbits value. +decompress_error(data_wbits_6, deflate.RAW, 5) +print(len(decompress(data_wbits_6, deflate.RAW, 6))) +print(len(decompress(data_wbits_6, deflate.RAW, 7))) +decompress_error(data_wbits_8, deflate.RAW, 7) +print(len(decompress(data_wbits_8, deflate.RAW, 8))) +print(len(decompress(data_wbits_8, deflate.RAW, 9))) +decompress_error(data_wbits_10, deflate.RAW) +decompress_error(data_wbits_10, deflate.RAW, 9) +print(len(decompress(data_wbits_10, deflate.RAW, 10))) + +# zlib header sets the size, so works with wbits unset or wbits >= 10. +data_wbits_10_zlib = b"(\x91\xcbH\xcd\xc9\xc9g\x18\xe9\x00\x08Fz\x18\x00\xc3`\xa4'\x03`2\x18\xe99\x01\x98\x13Fz\xfe\x07\xe6\xff\x91\x9e\xff\x81\xf9\x7f\xa4\xe7\x7f`\xfe\x1f\xba\xf9?#5''\x1f\x00[\xbc\x04)" +print(len(decompress(data_wbits_10_zlib, deflate.ZLIB))) +decompress_error(data_wbits_10_zlib, deflate.ZLIB, 9) +print(len(decompress(data_wbits_10_zlib, deflate.ZLIB, 10))) +print(len(decompress(data_wbits_10_zlib))) diff --git a/tests/extmod/deflate_decompress.py.exp b/tests/extmod/deflate_decompress.py.exp new file mode 100644 index 0000000000..381f2b0685 --- /dev/null +++ b/tests/extmod/deflate_decompress.py.exp @@ -0,0 +1,80 @@ +b'micropython hello world hello world micropython' +b'micropython hello world hello world micropython' +b'micropython hello world hello world micropython' +b'micropython hello world hello world micropython' +b'micropython hello world hello world micropython' +OSError +OSError +OSError +OSError +OSError +OSError +OSError +OSError +OSError +EOFError +0 +b'm' +4 +b'i' +5 +b'cr' +7 +b'opython hello world hello world micropython' +36 +b'' +36 +b'' +OSError +OSError +OSError +OSError +ValueError +OSError +b'micropython' +True +True +True +True +True +True +True +True +True +True +True +True +True +True +b'micropython hello world hello world micropython' +ValueError +310 +ValueError +ValueError +310 +310 +310 +310 +310 +310 +310 +310 +310 +310 +310 +ValueError +ValueError +ValueError +OSError +310 +310 +OSError +310 +310 +OSError +OSError +2010 +2010 +OSError +2010 +2010 diff --git a/tests/extmod/deflate_stream_error.py b/tests/extmod/deflate_stream_error.py new file mode 100644 index 0000000000..aee6b28033 --- /dev/null +++ b/tests/extmod/deflate_stream_error.py @@ -0,0 +1,89 @@ +# Test deflate module with stream errors. + +try: + # Check if deflate & IOBase are available. + import deflate, io + + io.IOBase +except (ImportError, AttributeError): + print("SKIP") + raise SystemExit + +# Check if compression is enabled. +if not hasattr(deflate.DeflateIO, "write"): + print("SKIP") + raise SystemExit + +formats = (deflate.RAW, deflate.ZLIB, deflate.GZIP) + +# Test error on read when decompressing. + + +class Stream(io.IOBase): + def readinto(self, buf): + print("Stream.readinto", len(buf)) + return -1 + + +try: + deflate.DeflateIO(Stream()).read() +except OSError as er: + print(repr(er)) + +# Test error on write when compressing. + + +class Stream(io.IOBase): + def write(self, buf): + print("Stream.write", buf) + return -1 + + +for format in formats: + try: + deflate.DeflateIO(Stream(), format).write("a") + except OSError as er: + print(repr(er)) + +# Test write after close. + + +class Stream(io.IOBase): + def write(self, buf): + print("Stream.write", buf) + return -1 + + def ioctl(self, cmd, arg): + print("Stream.ioctl", cmd, arg) + return 0 + + +try: + d = deflate.DeflateIO(Stream(), deflate.RAW, 0, True) + d.close() + d.write("a") +except OSError as er: + print(repr(er)) + +# Test error on write when closing. + + +class Stream(io.IOBase): + def __init__(self): + self.num_writes = 0 + + def write(self, buf): + print("Stream.write", buf) + if self.num_writes >= 4: + return -1 + self.num_writes += 1 + return len(buf) + + +for format in formats: + d = deflate.DeflateIO(Stream(), format) + d.write("a") + try: + d.close() + except OSError as er: + print(repr(er)) diff --git a/tests/extmod/deflate_stream_error.py.exp b/tests/extmod/deflate_stream_error.py.exp new file mode 100644 index 0000000000..4ec90d7997 --- /dev/null +++ b/tests/extmod/deflate_stream_error.py.exp @@ -0,0 +1,25 @@ +Stream.readinto 1 +OSError(1,) +Stream.write bytearray(b'K') +OSError(1,) +Stream.write bytearray(b'\x18\x95') +OSError(22,) +Stream.write bytearray(b'\x1f\x8b\x08\x00\x00\x00\x00\x00\x04\x03') +OSError(22,) +Stream.ioctl 4 0 +OSError(22,) +Stream.write bytearray(b'K') +Stream.write bytearray(b'\x04') +Stream.write bytearray(b'\x00') +Stream.write bytearray(b'\x18\x95') +Stream.write bytearray(b'K') +Stream.write bytearray(b'\x04') +Stream.write bytearray(b'\x00') +Stream.write bytearray(b'\x00b\x00b') +OSError(1,) +Stream.write bytearray(b'\x1f\x8b\x08\x00\x00\x00\x00\x00\x04\x03') +Stream.write bytearray(b'K') +Stream.write bytearray(b'\x04') +Stream.write bytearray(b'\x00') +Stream.write bytearray(b'C\xbe\xb7\xe8\x01\x00\x00\x00') +OSError(1,)