PicoWireless: move HTTP code to ppwhttp library

Creates a new ppwhttp library to hide the implementation detail of HTTP clients/servers from the examples.

Adds a new example - plasma_ws2812_http.py - showing how to expand rgb_http.py to use a WS2812 pixel strip.

Adds "secrets.py" and moves WIFI connection information there. ppwhttp will throw an error if it's missing.
pull/208/head
Phil Howard 2021-09-29 12:16:54 +01:00
rodzic 37209cd0c0
commit 9f07be90da
6 zmienionych plików z 349 dodań i 222 usunięć

Wyświetl plik

@ -0,0 +1 @@
secrets.py

Wyświetl plik

@ -1,106 +1,25 @@
import time
import picowireless
from micropython import const
WIFI_SSID = "Your SSID here!"
WIFI_PASS = "Your PSK here!"
try:
import ppwhttp
except ImportError:
raise RuntimeError("Cannot find ppwhttp. Have you copied ppwhttp.py to your Pico?")
CLOUDFLARE_DNS = (1, 1, 1, 1)
GOOGLE_DNS = (8, 8, 8, 8)
USE_DNS = CLOUDFLARE_DNS
TCP_MODE = const(0)
HTTP_REQUEST_DELAY = const(30)
HTTP_PORT = 80
HTTP_REQUEST_DELAY = const(60)
HTTP_REQUEST_PORT = const(80)
HTTP_REQUEST_HOST = "api.thingspeak.com"
HTTP_REQUEST_PATH = "/channels/1417/field/2/last.txt"
def connect(host_address, port, client_sock, timeout=1000):
picowireless.client_start(host_address, port, client_sock, TCP_MODE)
t_start = time.time()
timeout /= 1000.0
while time.time() - t_start < timeout:
state = picowireless.get_client_state(client_sock)
if state == 4:
return True
time.sleep(1.0)
return False
def http_request(client_sock, host_address, port, request_host, request_path, handler, timeout=5000):
print("Connecting to {1}.{2}.{3}.{4}:{0}...".format(port, *host_address))
if not connect(host_address, port, client_sock):
print("Connection failed!")
return False
print("Connected!")
http_request = """GET {} HTTP/1.1
Host: {}
Connection: close
""".format(request_path, request_host).replace("\n", "\r\n")
picowireless.send_data(client_sock, http_request)
t_start = time.time()
while True:
if time.time() - t_start > timeout:
picowireless.client_stop(client_sock)
print("HTTP request to {}:{} timed out...".format(host_address, port))
return False
avail_length = picowireless.avail_data(client_sock)
if avail_length > 0:
break
print("Got response: {} bytes".format(avail_length))
response = b""
while len(response) < avail_length:
data = picowireless.get_data_buf(client_sock)
response += data
response = response.decode("utf-8")
head, body = response.split("\r\n\r\n", 1)
dhead = {}
for line in head.split("\r\n")[1:]:
key, value = line.split(": ", 1)
dhead[key] = value
handler(dhead, body)
picowireless.client_stop(client_sock)
picowireless.init()
print("Connecting to {}...".format(WIFI_SSID))
picowireless.wifi_set_passphrase(WIFI_SSID, WIFI_PASS)
while True:
if picowireless.get_connection_status() == 3:
break
print("Connected!")
ppwhttp.start_wifi()
ppwhttp.set_dns(ppwhttp.GOOGLE_DNS)
# Get our own local IP!
my_ip = picowireless.get_ip_address()
my_ip = ppwhttp.get_ip_address()
print("Local IP: {}.{}.{}.{}".format(*my_ip))
# Resolve and cache the IP address
picowireless.set_dns(USE_DNS)
http_address = picowireless.get_host_by_name(HTTP_REQUEST_HOST)
print("Resolved {} to {}.{}.{}.{}".format(HTTP_REQUEST_HOST, *http_address))
client_sock = picowireless.get_socket()
def handler(head, body):
if head["Status"] == "200 OK":
@ -108,12 +27,12 @@ def handler(head, body):
r = int(color[0:2], 16)
g = int(color[2:4], 16)
b = int(color[4:6], 16)
picowireless.set_led(r, g, b)
ppwhttp.set_led(r, g, b)
print("Set LED to {} {} {}".format(r, g, b))
else:
print("Error: {}".format(head["Status"]))
while True:
http_request(client_sock, http_address, HTTP_PORT, HTTP_REQUEST_HOST, HTTP_REQUEST_PATH, handler)
time.sleep(60.0)
ppwhttp.http_request(HTTP_REQUEST_HOST, HTTP_REQUEST_PORT, HTTP_REQUEST_HOST, HTTP_REQUEST_PATH, handler)
time.sleep(HTTP_REQUEST_DELAY)

Wyświetl plik

@ -0,0 +1,82 @@
import time
import plasma
try:
import ppwhttp
except ImportError:
raise RuntimeError("Cannot find ppwhttp. Have you copied ppwhttp.py to your Pico?")
"""
This example uses the Plasma WS2812 LED library to drive a string of LEDs alongside the built-in RGB LED.
You should wire your LEDs to VBUS/GND and connect the data pin to pin 27 (unused by Pico Wireless).
"""
NUM_LEDS = 30 # Number of connected LEDs
LED_PIN = 27 # LED data pin (27 is unused by Pico Wireless)
LED_PIO = 0 # Hardware PIO (0 or 1)
LED_SM = 0 # PIO State-Machine (0 to 3)
r = 0
g = 0
b = 0
led_strip = plasma.WS2812(NUM_LEDS, LED_PIO, LED_SM, LED_PIN)
# Edit your routes here
# Nothing fancy is supported, just plain ol' URLs and GET/POST methods
@ppwhttp.route("/", methods=["GET", "POST"])
def get_home(method, url, data=None):
if method == "POST":
global r, g, b
r = int(data.get("r", 0))
g = int(data.get("g", 0))
b = int(data.get("b", 0))
ppwhttp.set_led(r, g, b)
for i in range(NUM_LEDS):
led_strip.set_rgb(i, r, g, b)
print("Set LED to {} {} {}".format(r, g, b))
return """<form method="post" action="/">
<input id="r" name="r" type="number" value="{r}" />
<input name="g" type="number" value="{g}" />
<input name="b" type="number" value="{b}" />
<input type="submit" value="Set LED" />
</form>""".format(r=r, g=g, b=b)
@ppwhttp.route("/test", methods="GET")
def get_test(method, url):
return "Hello World!"
ppwhttp.start_wifi()
led_strip.start()
server_sock = ppwhttp.start_server()
while True:
ppwhttp.handle_http_request(server_sock)
time.sleep(0.01)
# Whoa there! Did you know you could run the server polling loop
# on Pico's *other* core!? Here's how:
#
# import _thread
#
# def server_loop_forever():
# # Start a server and continuously poll for HTTP requests
# server_sock = ppwhttp.start_server()
# while True:
# ppwhttp.handle_http_request(server_sock)
# time.sleep(0.01)
#
# Handle the server polling loop on the other core!
# _thread.start_new_thread(server_loop_forever, ())
#
# # Your very own main loop for fun and profit!
# while True:
# print("Colour: {} {} {}".format(r, g, b))
# time.sleep(5.0)

Wyświetl plik

@ -0,0 +1,240 @@
"""Pimoroni Pico Wireless HTTP
A super-simple HTTP server library for Pico Wireless.
"""
import time
import picowireless
from micropython import const
try:
from secrets import WIFI_SSID, WIFI_PASS
except ImportError:
WIFI_SSID = None
WIFI_PASS = None
TCP_CLOSED = const(0)
TCP_LISTEN = const(1)
CLOUDFLARE_DNS = (1, 1, 1, 1)
GOOGLE_DNS = (8, 8, 8, 8)
DEFAULT_HTTP_PORT = const(80)
routes = {}
sockets = []
hosts = {}
def set_led(r, g, b):
"""Set """
picowireless.set_led(r, g, b)
def get_socket(force_new=False):
global sockets
if force_new or len(sockets) == 0:
socket = picowireless.get_socket()
sockets.append(socket)
return socket
return sockets[0]
def get_ip_address():
return picowireless.get_ip_address()
def set_dns(dns):
picowireless.set_dns(dns)
def get_host_by_name(hostname, no_cache=False):
# Already an IP
if type(hostname) is tuple and len(hostname) == 4:
return hostname
# Get from cached hosts
if hostname in hosts and not no_cache:
return hosts[hostname]
ip = picowireless.get_host_by_name(hostname)
hosts[hostname] = ip
return ip
def start_wifi(wifi_ssid=WIFI_SSID, wifi_pass=WIFI_PASS):
if wifi_ssid is None or wifi_pass is None:
raise RuntimeError("WiFi SSID/PASS required. Set them in secrets.py and copy it to your Pico, or pass them as arguments.")
picowireless.init()
print("Connecting to {}...".format(wifi_ssid))
picowireless.wifi_set_passphrase(wifi_ssid, wifi_pass)
while True:
if picowireless.get_connection_status() == 3:
break
print("Connected!")
def start_server(http_port=DEFAULT_HTTP_PORT, timeout=5000):
my_ip = picowireless.get_ip_address()
print("Starting server...")
server_sock = picowireless.get_socket()
picowireless.server_start(http_port, server_sock, 0)
t_start = time.ticks_ms()
while time.ticks_ms() - t_start < timeout:
state = picowireless.get_server_state(server_sock)
if state == TCP_LISTEN:
print("Server listening on {1}.{2}.{3}.{4}:{0}".format(http_port, *my_ip))
return server_sock
return None
def connect_to_server(host_address, port, client_sock, timeout=5000):
picowireless.client_start(host_address, port, client_sock, TCP_CLOSED)
t_start = time.ticks_ms()
while time.ticks_ms() - t_start < timeout:
state = picowireless.get_client_state(client_sock)
if state == 4:
return True
time.sleep(1.0)
return False
def http_request(host_address, port, request_host, request_path, handler, timeout=5000, client_sock=None):
if client_sock is None:
client_sock = get_socket()
host_address = get_host_by_name(host_address)
print("Connecting to {1}.{2}.{3}.{4}:{0}...".format(port, *host_address))
if not connect_to_server(host_address, port, client_sock):
print("Connection failed!")
return False
print("Connected!")
http_request = """GET {} HTTP/1.1
Host: {}
Connection: close
""".format(request_path, request_host).replace("\n", "\r\n")
picowireless.send_data(client_sock, http_request)
t_start = time.ticks_ms()
while True:
if time.ticks_ms() - t_start > timeout:
picowireless.client_stop(client_sock)
print("HTTP request to {}:{} timed out...".format(host_address, port))
return False
avail_length = picowireless.avail_data(client_sock)
if avail_length > 0:
break
print("Got response: {} bytes".format(avail_length))
response = b""
while len(response) < avail_length:
data = picowireless.get_data_buf(client_sock)
response += data
response = response.decode("utf-8")
head, body = response.split("\r\n\r\n", 1)
dhead = {}
for line in head.split("\r\n")[1:]:
key, value = line.split(": ", 1)
dhead[key] = value
handler(dhead, body)
picowireless.client_stop(client_sock)
def handle_http_request(server_sock, timeout=5000):
t_start = time.ticks_ms()
client_sock = picowireless.avail_server(server_sock)
if client_sock in [server_sock, 255, -1]:
return False
print("Client connected!")
avail_length = picowireless.avail_data(client_sock)
if avail_length == 0:
picowireless.client_stop(client_sock)
return False
request = b""
while len(request) < avail_length:
data = picowireless.get_data_buf(client_sock)
request += data
if time.ticks_ms() - t_start > timeout:
print("Client timed out getting data!")
picowireless.client_stop(client_sock)
return False
request = request.decode("utf-8")
if len(request) > 0:
head, body = request.split("\r\n\r\n", 1)
dhead = {}
for line in head.split("\r\n")[1:]:
key, value = line.split(": ", 1)
dhead[key] = value
method, url, _ = head.split("\r\n", 1)[0].split(" ")
print("Serving {} on {}...".format(method, url))
response = None
# Dispatch the request to the relevant route
if url in routes and method in routes[url] and callable(routes[url][method]):
if method == "POST":
data = {}
for var in body.split("&"):
key, value = var.split("=")
data[key] = value
response = routes[url][method](method, url, data)
else:
response = routes[url][method](method, url)
if response is not None:
response = "HTTP/1.1 200 OK\r\nContent-Length: {}\r\nContent-Type: text/html\r\n\r\n".format(len(response)) + response
picowireless.send_data(client_sock, response)
picowireless.client_stop(client_sock)
print("Success! Sending 200 OK")
return True
else:
picowireless.send_data(client_sock, "HTTP/1.1 501 Not Implemented\r\nContent-Length: 19\r\n\r\n501 Not Implemented")
picowireless.client_stop(client_sock)
print("Unhandled Request! Sending 501 OK")
return False
def route(url, methods="GET"):
if type(methods) is str:
methods = [methods]
def decorate(handler):
for method in methods:
if url not in routes:
routes[url] = {}
routes[url][method] = handler
return decorate

Wyświetl plik

@ -1,143 +1,26 @@
import time
import picowireless
from micropython import const
WIFI_SSID = "your SSID here!"
WIFI_PASS = "Your PSK here!"
try:
import ppwhttp
except ImportError:
raise RuntimeError("Cannot find ppwhttp. Have you copied ppwhttp.py to your Pico?")
TCP_CLOSED = 0
TCP_LISTEN = 1
CLOUDFLARE_DNS = (1, 1, 1, 1)
GOOGLE_DNS = (8, 8, 8, 8)
TCP_MODE = const(0)
HTTP_REQUEST_DELAY = const(30)
HTTP_PORT = 80
routes = {}
r = 0
g = 0
b = 0
def start_wifi():
picowireless.init()
print("Connecting to {}...".format(WIFI_SSID))
picowireless.wifi_set_passphrase(WIFI_SSID, WIFI_PASS)
while True:
if picowireless.get_connection_status() == 3:
break
print("Connected!")
def start_server(http_port, timeout=1.0):
my_ip = picowireless.get_ip_address()
print("Starting server...")
server_sock = picowireless.get_socket()
picowireless.server_start(http_port, server_sock, 0)
t_start = time.time()
while time.time() - t_start < timeout:
state = picowireless.get_server_state(server_sock)
if state == TCP_LISTEN:
print("Server listening on {1}.{2}.{3}.{4}:{0}".format(http_port, *my_ip))
return server_sock
return None
def handle_http_request(server_sock, timeout=1.0):
t_start = time.time()
client_sock = picowireless.avail_server(server_sock)
if client_sock in [server_sock, 255, -1]:
return False
print("Client connected!")
avail_length = picowireless.avail_data(client_sock)
if avail_length == 0:
picowireless.client_stop(client_sock)
return False
request = b""
while len(request) < avail_length:
data = picowireless.get_data_buf(client_sock)
request += data
if time.time() - t_start > timeout:
print("Client timed out getting data!")
picowireless.client_stop(client_sock)
return False
request = request.decode("utf-8")
if len(request) > 0:
head, body = request.split("\r\n\r\n", 1)
dhead = {}
for line in head.split("\r\n")[1:]:
key, value = line.split(": ", 1)
dhead[key] = value
method, url, _ = head.split("\r\n", 1)[0].split(" ")
print("Serving {} on {}...".format(method, url))
response = None
# Dispatch the request to the relevant route
if url in routes and method in routes[url] and callable(routes[url][method]):
if method == "POST":
data = {}
for var in body.split("&"):
key, value = var.split("=")
data[key] = value
response = routes[url][method](method, url, data)
else:
response = routes[url][method](method, url)
if response is not None:
response = "HTTP/1.1 200 OK\r\nContent-Length: {}\r\nContent-Type: text/html\r\n\r\n".format(len(response)) + response
picowireless.send_data(client_sock, response)
picowireless.client_stop(client_sock)
print("Success! Sending 200 OK")
return True
else:
picowireless.send_data(client_sock, "HTTP/1.1 501 Not Implemented\r\nContent-Length: 19\r\n\r\n501 Not Implemented")
picowireless.client_stop(client_sock)
print("Unhandled Request! Sending 501 OK")
return False
def route(url, methods="GET"):
if type(methods) is str:
methods = [methods]
def decorate(handler):
for method in methods:
if url not in routes:
routes[url] = {}
routes[url][method] = handler
return decorate
# Edit your routes here
# Nothing fancy is supported, just plain ol' URLs and GET/POST methods
@route("/", methods=["GET", "POST"])
@ppwhttp.route("/", methods=["GET", "POST"])
def get_home(method, url, data=None):
if method == "POST":
global r, g, b
r = int(data.get("r", 0))
g = int(data.get("g", 0))
b = int(data.get("b", 0))
picowireless.set_led(r, g, b)
ppwhttp.set_led(r, g, b)
print("Set LED to {} {} {}".format(r, g, b))
return """<form method="post" action="/">
@ -148,16 +31,16 @@ def get_home(method, url, data=None):
</form>""".format(r=r, g=g, b=b)
@route("/test", methods="GET")
@ppwhttp.route("/test", methods="GET")
def get_test(method, url):
return "Hello World!"
start_wifi()
ppwhttp.start_wifi()
server_sock = start_server(HTTP_PORT)
server_sock = ppwhttp.start_server()
while True:
handle_http_request(server_sock)
ppwhttp.handle_http_request(server_sock)
time.sleep(0.01)
@ -168,9 +51,9 @@ while True:
#
# def server_loop_forever():
# # Start a server and continuously poll for HTTP requests
# server_sock = start_server(HTTP_PORT)
# server_sock = ppwhttp.start_server()
# while True:
# handle_http_request(server_sock)
# ppwhttp.handle_http_request(server_sock)
# time.sleep(0.01)
#
# Handle the server polling loop on the other core!

Wyświetl plik

@ -0,0 +1,2 @@
WIFI_SSID = "your SSID here!"
WIFI_PASS = "Your PSK here!"