From bba8a673d5ed6ad4404502c32dac003ad9d59bde Mon Sep 17 00:00:00 2001 From: Damien George Date: Tue, 12 Dec 2023 17:17:22 +1100 Subject: [PATCH] tests: Update SSL network tests to use SSLContext, and work on CPython. Changes are: - use ssl.SSLContext.wrap_socket instead of ssl.wrap_socket - disable check_hostname and call load_default_certs() where appropriate, to get CPython to run the tests correctly - pass socket.AF_INET to getaddrinfo and socket.socket(), to force IPv4 - change tests to use github.com instead of google.com, because certificate validation was failing with google.com Signed-off-by: Damien George --- tests/net_hosted/connect_nonblock_xfer.py | 9 +++---- tests/net_inet/ssl_errors.py | 28 ++++++++++++---------- tests/net_inet/test_tls_nonblock.py | 26 +++++++++++--------- tests/net_inet/test_tls_sites.py | 29 +++++++++++++++-------- tests/net_inet/test_tls_sites.py.exp | 5 ---- tests/net_inet/tls_text_errors.py | 8 +++++-- 6 files changed, 61 insertions(+), 44 deletions(-) delete mode 100644 tests/net_inet/test_tls_sites.py.exp diff --git a/tests/net_hosted/connect_nonblock_xfer.py b/tests/net_hosted/connect_nonblock_xfer.py index e669a5766c..dc4693cea6 100644 --- a/tests/net_hosted/connect_nonblock_xfer.py +++ b/tests/net_hosted/connect_nonblock_xfer.py @@ -27,11 +27,12 @@ def do_connect(peer_addr, tls, handshake): print(" got", er.errno) # wrap with ssl/tls if desired if tls: + ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + if hasattr(ssl_context, "check_hostname"): + ssl_context.check_hostname = False + try: - if sys.implementation.name == "micropython": - s = ssl.wrap_socket(s, do_handshake=handshake) - else: - s = ssl.wrap_socket(s, do_handshake_on_connect=handshake) + s = ssl_context.wrap_socket(s, do_handshake_on_connect=handshake) print("wrap: True") except Exception as e: dp(e) diff --git a/tests/net_inet/ssl_errors.py b/tests/net_inet/ssl_errors.py index 65f3637e9e..bc4e5910bc 100644 --- a/tests/net_inet/ssl_errors.py +++ b/tests/net_inet/ssl_errors.py @@ -1,12 +1,12 @@ # test that socket.connect() on a non-blocking socket raises EINPROGRESS # and that an immediate write/send/read/recv does the right thing -import sys, errno, socket, ssl +import sys, errno, select, socket, ssl def test(addr, hostname, block=True): - print("---", hostname or addr) - s = socket.socket() + print("---", hostname) + s = socket.socket(socket.AF_INET) s.setblocking(block) try: s.connect(addr) @@ -16,11 +16,15 @@ def test(addr, hostname, block=True): raise print("EINPROGRESS") + if sys.implementation.name != "micropython": + # in CPython we have to wait, otherwise wrap_socket is not happy + select.select([], [s], []) + + ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + ssl_context.verify_mode = ssl.CERT_REQUIRED + try: - if sys.implementation.name == "micropython": - s = ssl.wrap_socket(s, do_handshake=block) - else: - s = ssl.wrap_socket(s, do_handshake_on_connect=block) + s = ssl_context.wrap_socket(s, do_handshake_on_connect=block, server_hostname=hostname) print("wrap: True") except OSError: print("wrap: error") @@ -36,11 +40,11 @@ def test(addr, hostname, block=True): if __name__ == "__main__": # connect to plain HTTP port, oops! - addr = socket.getaddrinfo("micropython.org", 80)[0][-1] - test(addr, None) + addr = socket.getaddrinfo("micropython.org", 80, socket.AF_INET)[0][-1] + test(addr, "micropython.org") # connect to plain HTTP port, oops! - addr = socket.getaddrinfo("micropython.org", 80)[0][-1] - test(addr, None, False) + addr = socket.getaddrinfo("micropython.org", 80, socket.AF_INET)[0][-1] + test(addr, "micropython.org", False) # connect to server with self-signed cert, oops! - addr = socket.getaddrinfo("test.mosquitto.org", 8883)[0][-1] + addr = socket.getaddrinfo("test.mosquitto.org", 8883, socket.AF_INET)[0][-1] test(addr, "test.mosquitto.org") diff --git a/tests/net_inet/test_tls_nonblock.py b/tests/net_inet/test_tls_nonblock.py index 6378280a71..60af858b1f 100644 --- a/tests/net_inet/test_tls_nonblock.py +++ b/tests/net_inet/test_tls_nonblock.py @@ -2,12 +2,12 @@ import socket, ssl, errno, sys, time, select def test_one(site, opts): - ai = socket.getaddrinfo(site, 443) + ai = socket.getaddrinfo(site, 443, socket.AF_INET) addr = ai[0][-1] - print(addr) + print(site) # Connect the raw socket - s = socket.socket() + s = socket.socket(socket.AF_INET) s.setblocking(False) try: s.connect(addr) @@ -16,17 +16,22 @@ def test_one(site, opts): if e.errno != errno.EINPROGRESS: raise + # Create SSLContext. + ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + + # CPython compatibility: + # - disable check_hostname + # - load default system certificate chain + # - must wait for socket to be writable before calling wrap_socket if sys.implementation.name != "micropython": - # in CPython we have to wait, otherwise wrap_socket is not happy + ssl_context.check_hostname = False + ssl_context.load_default_certs() select.select([], [s], []) try: # Wrap with SSL try: - if sys.implementation.name == "micropython": - s = ssl.wrap_socket(s, do_handshake=False) - else: - s = ssl.wrap_socket(s, do_handshake_on_connect=False) + s = ssl_context.wrap_socket(s, do_handshake_on_connect=False) except OSError as e: if e.errno != errno.EINPROGRESS: raise @@ -87,8 +92,7 @@ def test_one(site, opts): SITES = [ - "google.com", - {"host": "www.google.com"}, + "www.github.com", "micropython.org", "pypi.org", {"host": "api.pushbullet.com", "sni": True}, @@ -105,7 +109,7 @@ def main(): test_one(site, opts) print(site, "ok") except Exception as e: - print(site, "error") + print(site, "error", e) print("DONE") diff --git a/tests/net_inet/test_tls_sites.py b/tests/net_inet/test_tls_sites.py index 4f457b3abc..d60f4872b6 100644 --- a/tests/net_inet/test_tls_sites.py +++ b/tests/net_inet/test_tls_sites.py @@ -1,24 +1,34 @@ +import sys +import select import socket import ssl -# CPython only supports server_hostname with SSLContext -if hasattr(ssl, "SSLContext"): - ssl = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) - def test_one(site, opts): - ai = socket.getaddrinfo(site, 443) + ai = socket.getaddrinfo(site, 443, socket.AF_INET) addr = ai[0][-1] - s = socket.socket() + s = socket.socket(socket.AF_INET) + + # Create SSLContext. + ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + + # CPython compatibility: + # - disable check_hostname + # - load default system certificate chain + # - must wait for socket to be writable before calling wrap_socket + if sys.implementation.name != "micropython": + ssl_context.check_hostname = False + ssl_context.load_default_certs() + select.select([], [s], []) try: s.connect(addr) if "sni" in opts: - s = ssl.wrap_socket(s, server_hostname=opts["host"]) + s = ssl_context.wrap_socket(s, server_hostname=opts["host"]) else: - s = ssl.wrap_socket(s) + s = ssl_context.wrap_socket(s) s.write(b"GET / HTTP/1.0\r\nHost: %s\r\n\r\n" % bytes(site, "latin")) resp = s.read(4096) @@ -31,8 +41,7 @@ def test_one(site, opts): SITES = [ - "google.com", - "www.google.com", + "www.github.com", "micropython.org", "pypi.org", {"host": "api.pushbullet.com", "sni": True}, diff --git a/tests/net_inet/test_tls_sites.py.exp b/tests/net_inet/test_tls_sites.py.exp deleted file mode 100644 index 09faa336a7..0000000000 --- a/tests/net_inet/test_tls_sites.py.exp +++ /dev/null @@ -1,5 +0,0 @@ -google.com ok -www.google.com ok -micropython.org ok -pypi.org ok -api.pushbullet.com ok diff --git a/tests/net_inet/tls_text_errors.py b/tests/net_inet/tls_text_errors.py index 498593bba2..034cc8d36e 100644 --- a/tests/net_inet/tls_text_errors.py +++ b/tests/net_inet/tls_text_errors.py @@ -1,13 +1,17 @@ # test that modtls produces a text error message -import socket, ssl, sys +import socket, ssl def test(addr): s = socket.socket() s.connect(addr) try: - s = ssl.wrap_socket(s) + ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + if hasattr(ssl_context, "check_hostname"): + # Disable hostname check on CPython. + ssl_context.check_hostname = False + s = ssl_context.wrap_socket(s) print("wrap: no exception") except OSError as e: # mbedtls produces "mbedtls -0x7200: SSL - An invalid SSL record was received"