tests: Update SSL network tests to use SSLContext, and work on CPython.

Changes are:
- use ssl.SSLContext.wrap_socket instead of ssl.wrap_socket
- disable check_hostname and call load_default_certs() where appropriate,
  to get CPython to run the tests correctly
- pass socket.AF_INET to getaddrinfo and socket.socket(), to force IPv4
- change tests to use github.com instead of google.com, because certificate
  validation was failing with google.com

Signed-off-by: Damien George <damien@micropython.org>
pull/13181/head
Damien George 2023-12-12 17:17:22 +11:00
rodzic ef996d15b9
commit bba8a673d5
6 zmienionych plików z 61 dodań i 44 usunięć

Wyświetl plik

@ -27,11 +27,12 @@ def do_connect(peer_addr, tls, handshake):
print(" got", er.errno) print(" got", er.errno)
# wrap with ssl/tls if desired # wrap with ssl/tls if desired
if tls: if tls:
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
if hasattr(ssl_context, "check_hostname"):
ssl_context.check_hostname = False
try: try:
if sys.implementation.name == "micropython": s = ssl_context.wrap_socket(s, do_handshake_on_connect=handshake)
s = ssl.wrap_socket(s, do_handshake=handshake)
else:
s = ssl.wrap_socket(s, do_handshake_on_connect=handshake)
print("wrap: True") print("wrap: True")
except Exception as e: except Exception as e:
dp(e) dp(e)

Wyświetl plik

@ -1,12 +1,12 @@
# test that socket.connect() on a non-blocking socket raises EINPROGRESS # test that socket.connect() on a non-blocking socket raises EINPROGRESS
# and that an immediate write/send/read/recv does the right thing # and that an immediate write/send/read/recv does the right thing
import sys, errno, socket, ssl import sys, errno, select, socket, ssl
def test(addr, hostname, block=True): def test(addr, hostname, block=True):
print("---", hostname or addr) print("---", hostname)
s = socket.socket() s = socket.socket(socket.AF_INET)
s.setblocking(block) s.setblocking(block)
try: try:
s.connect(addr) s.connect(addr)
@ -16,11 +16,15 @@ def test(addr, hostname, block=True):
raise raise
print("EINPROGRESS") print("EINPROGRESS")
if sys.implementation.name != "micropython":
# in CPython we have to wait, otherwise wrap_socket is not happy
select.select([], [s], [])
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ssl_context.verify_mode = ssl.CERT_REQUIRED
try: try:
if sys.implementation.name == "micropython": s = ssl_context.wrap_socket(s, do_handshake_on_connect=block, server_hostname=hostname)
s = ssl.wrap_socket(s, do_handshake=block)
else:
s = ssl.wrap_socket(s, do_handshake_on_connect=block)
print("wrap: True") print("wrap: True")
except OSError: except OSError:
print("wrap: error") print("wrap: error")
@ -36,11 +40,11 @@ def test(addr, hostname, block=True):
if __name__ == "__main__": if __name__ == "__main__":
# connect to plain HTTP port, oops! # connect to plain HTTP port, oops!
addr = socket.getaddrinfo("micropython.org", 80)[0][-1] addr = socket.getaddrinfo("micropython.org", 80, socket.AF_INET)[0][-1]
test(addr, None) test(addr, "micropython.org")
# connect to plain HTTP port, oops! # connect to plain HTTP port, oops!
addr = socket.getaddrinfo("micropython.org", 80)[0][-1] addr = socket.getaddrinfo("micropython.org", 80, socket.AF_INET)[0][-1]
test(addr, None, False) test(addr, "micropython.org", False)
# connect to server with self-signed cert, oops! # connect to server with self-signed cert, oops!
addr = socket.getaddrinfo("test.mosquitto.org", 8883)[0][-1] addr = socket.getaddrinfo("test.mosquitto.org", 8883, socket.AF_INET)[0][-1]
test(addr, "test.mosquitto.org") test(addr, "test.mosquitto.org")

Wyświetl plik

@ -2,12 +2,12 @@ import socket, ssl, errno, sys, time, select
def test_one(site, opts): def test_one(site, opts):
ai = socket.getaddrinfo(site, 443) ai = socket.getaddrinfo(site, 443, socket.AF_INET)
addr = ai[0][-1] addr = ai[0][-1]
print(addr) print(site)
# Connect the raw socket # Connect the raw socket
s = socket.socket() s = socket.socket(socket.AF_INET)
s.setblocking(False) s.setblocking(False)
try: try:
s.connect(addr) s.connect(addr)
@ -16,17 +16,22 @@ def test_one(site, opts):
if e.errno != errno.EINPROGRESS: if e.errno != errno.EINPROGRESS:
raise raise
# Create SSLContext.
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
# CPython compatibility:
# - disable check_hostname
# - load default system certificate chain
# - must wait for socket to be writable before calling wrap_socket
if sys.implementation.name != "micropython": if sys.implementation.name != "micropython":
# in CPython we have to wait, otherwise wrap_socket is not happy ssl_context.check_hostname = False
ssl_context.load_default_certs()
select.select([], [s], []) select.select([], [s], [])
try: try:
# Wrap with SSL # Wrap with SSL
try: try:
if sys.implementation.name == "micropython": s = ssl_context.wrap_socket(s, do_handshake_on_connect=False)
s = ssl.wrap_socket(s, do_handshake=False)
else:
s = ssl.wrap_socket(s, do_handshake_on_connect=False)
except OSError as e: except OSError as e:
if e.errno != errno.EINPROGRESS: if e.errno != errno.EINPROGRESS:
raise raise
@ -87,8 +92,7 @@ def test_one(site, opts):
SITES = [ SITES = [
"google.com", "www.github.com",
{"host": "www.google.com"},
"micropython.org", "micropython.org",
"pypi.org", "pypi.org",
{"host": "api.pushbullet.com", "sni": True}, {"host": "api.pushbullet.com", "sni": True},
@ -105,7 +109,7 @@ def main():
test_one(site, opts) test_one(site, opts)
print(site, "ok") print(site, "ok")
except Exception as e: except Exception as e:
print(site, "error") print(site, "error", e)
print("DONE") print("DONE")

Wyświetl plik

@ -1,24 +1,34 @@
import sys
import select
import socket import socket
import ssl import ssl
# CPython only supports server_hostname with SSLContext
if hasattr(ssl, "SSLContext"):
ssl = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
def test_one(site, opts): def test_one(site, opts):
ai = socket.getaddrinfo(site, 443) ai = socket.getaddrinfo(site, 443, socket.AF_INET)
addr = ai[0][-1] addr = ai[0][-1]
s = socket.socket() s = socket.socket(socket.AF_INET)
# Create SSLContext.
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
# CPython compatibility:
# - disable check_hostname
# - load default system certificate chain
# - must wait for socket to be writable before calling wrap_socket
if sys.implementation.name != "micropython":
ssl_context.check_hostname = False
ssl_context.load_default_certs()
select.select([], [s], [])
try: try:
s.connect(addr) s.connect(addr)
if "sni" in opts: if "sni" in opts:
s = ssl.wrap_socket(s, server_hostname=opts["host"]) s = ssl_context.wrap_socket(s, server_hostname=opts["host"])
else: else:
s = ssl.wrap_socket(s) s = ssl_context.wrap_socket(s)
s.write(b"GET / HTTP/1.0\r\nHost: %s\r\n\r\n" % bytes(site, "latin")) s.write(b"GET / HTTP/1.0\r\nHost: %s\r\n\r\n" % bytes(site, "latin"))
resp = s.read(4096) resp = s.read(4096)
@ -31,8 +41,7 @@ def test_one(site, opts):
SITES = [ SITES = [
"google.com", "www.github.com",
"www.google.com",
"micropython.org", "micropython.org",
"pypi.org", "pypi.org",
{"host": "api.pushbullet.com", "sni": True}, {"host": "api.pushbullet.com", "sni": True},

Wyświetl plik

@ -1,5 +0,0 @@
google.com ok
www.google.com ok
micropython.org ok
pypi.org ok
api.pushbullet.com ok

Wyświetl plik

@ -1,13 +1,17 @@
# test that modtls produces a text error message # test that modtls produces a text error message
import socket, ssl, sys import socket, ssl
def test(addr): def test(addr):
s = socket.socket() s = socket.socket()
s.connect(addr) s.connect(addr)
try: try:
s = ssl.wrap_socket(s) ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
if hasattr(ssl_context, "check_hostname"):
# Disable hostname check on CPython.
ssl_context.check_hostname = False
s = ssl_context.wrap_socket(s)
print("wrap: no exception") print("wrap: no exception")
except OSError as e: except OSError as e:
# mbedtls produces "mbedtls -0x7200: SSL - An invalid SSL record was received" # mbedtls produces "mbedtls -0x7200: SSL - An invalid SSL record was received"