little-boxes/little_boxes/httpsig.py

146 wiersze
4.3 KiB
Python
Czysty Zwykły widok Historia

2018-06-13 19:42:03 +00:00
"""Implements HTTP signature for Flask requests.
Mastodon instances won't accept requests that are not signed using this scheme.
"""
import base64
import hashlib
import logging
2018-06-15 22:27:49 +00:00
from datetime import datetime
from typing import Any
from typing import Dict
from typing import Optional
from urllib.parse import urlparse
2018-06-13 19:42:03 +00:00
2018-06-15 22:27:49 +00:00
from Crypto.Hash import SHA256
from Crypto.Signature import PKCS1_v1_5
2018-06-13 19:42:03 +00:00
from requests.auth import AuthBase
2018-06-16 09:17:15 +00:00
from .activitypub import get_backend
2019-04-23 22:25:45 +00:00
from .activitypub import _has_type
2018-08-28 20:15:31 +00:00
from .errors import ActivityNotFoundError
from .errors import ActivityGoneError
2018-06-16 09:17:15 +00:00
from .key import Key
2018-06-13 19:42:03 +00:00
2018-06-16 09:17:15 +00:00
logger = logging.getLogger(__name__)
2018-06-15 22:27:49 +00:00
2018-06-13 19:42:03 +00:00
def _build_signed_string(
    signed_headers: str, method: str, path: str, headers: Any, body_digest: str
) -> str:
    """Build the newline-joined string that gets signed or verified.

    Args:
        signed_headers: Space-separated list of header names, as carried in
            the ``headers`` parameter of the ``Signature`` header.
        method: HTTP method; it is lowercased in the ``(request-target)`` line.
        path: Request path used in the ``(request-target)`` line.
        headers: Mapping the remaining header values are read from.
        body_digest: Digest value emitted for the ``digest`` entry, used
            instead of whatever ``headers`` contains for that name.

    Returns:
        One ``name: value`` line per entry of ``signed_headers``, in order,
        joined with ``"\\n"``.

    Raises:
        KeyError: If a listed header is absent from ``headers``.
    """
    out = []
    for signed_header in signed_headers.split(" "):
        # Names are matched and emitted lowercased, so a "Digest" entry still
        # uses the supplied body_digest rather than the value in `headers`.
        name = signed_header.lower()
        if name == "(request-target)":
            out.append("(request-target): " + method.lower() + " " + path)
        elif name == "digest":
            out.append("digest: " + body_digest)
        else:
            # Look up with the token as given, so plain dict callers keep
            # working exactly as before.
            out.append(name + ": " + headers[signed_header])
    return "\n".join(out)
def _parse_sig_header(val: Optional[str]) -> Optional[Dict[str, str]]:
    """Parse the value of a ``Signature`` header into a dict of parameters.

    Args:
        val: Raw header value, e.g. ``keyId="...",algorithm="...",...``.

    Returns:
        ``None`` when ``val`` is empty or ``None``; otherwise a dict mapping
        each parameter name to its value with surrounding double quotes
        removed.

    Raises:
        ValueError: If a non-empty parameter contains no ``=``.
    """
    if not val:
        return None
    out: Dict[str, str] = {}
    # NOTE(review): splitting on "," cannot handle a quoted value that itself
    # contains a comma; this matches the previous behaviour.
    for data in val.split(","):
        data = data.strip()
        if not data:
            # Tolerate stray or trailing commas.
            continue
        k, v = data.split("=", 1)
        k = k.strip()
        v = v.strip()
        # Only strip the quotes when they are really there, so unquoted
        # values are not truncated.
        if len(v) >= 2 and v[0] == '"' and v[-1] == '"':
            v = v[1:-1]
        out[k] = v
    return out
def _verify_h(signed_string, signature, pubkey):
    """Check ``signature`` against the SHA-256 hash of ``signed_string``.

    The string is UTF-8 encoded, hashed with ``SHA256`` and handed to a
    ``PKCS1_v1_5`` verifier created from ``pubkey``; the verifier's result is
    returned unchanged.
    """
    payload = signed_string.encode("utf-8")
    hashed = SHA256.new()
    hashed.update(payload)
    verifier = PKCS1_v1_5.new(pubkey)
    return verifier.verify(hashed, signature)
2018-06-16 09:17:15 +00:00
def _body_digest(body: Any) -> str:
    """Return the ``Digest`` header value for a request body.

    Args:
        body: The request body, either ``bytes`` or ``str``. A ``str`` is
            encoded as UTF-8 before hashing.

    Returns:
        ``"SHA-256="`` followed by the base64-encoded SHA-256 digest.
    """
    if isinstance(body, str):
        # hashlib only accepts bytes-like objects.
        body = body.encode("utf-8")
    h = hashlib.new("sha256")
    h.update(body)
    return "SHA-256=" + base64.b64encode(h.digest()).decode("utf-8")
2018-06-16 09:17:15 +00:00
def _get_public_key(key_id: str) -> Key:
    """Fetch the object at ``key_id`` and build a ``Key`` from it.

    The fetched object is either a standalone ``Key`` object (carrying
    ``owner``, ``id`` and ``publicKeyPem``) or an object embedding the key
    under ``publicKey``.

    Args:
        key_id: IRI of the key, as found in the ``keyId`` signature parameter.

    Returns:
        A ``Key`` with the public key loaded.

    Raises:
        ValueError: If the ID of the fetched key differs from ``key_id``.
    """
    actor = get_backend().fetch_iri(key_id)
    if _has_type(actor["type"], "Key"):
        # The Key is not embedded in the Person
        k = Key(actor["owner"], actor["id"])
        k.load_pub(actor["publicKeyPem"])
    else:
        k = Key(actor["id"], actor["publicKey"]["id"])
        k.load_pub(actor["publicKey"]["publicKeyPem"])

    # Ensure the right key was fetched
    fetched_key_id = k.key_id()
    if key_id != fetched_key_id:
        # Report the ID that was actually compared; a standalone Key object
        # has no "publicKey" entry to read the ID from.
        raise ValueError(
            f"failed to fetch requested key {key_id}: got {fetched_key_id}"
        )

    return k
def verify_request(method: str, path: str, headers: Any, body: str) -> bool:
    """Verify the HTTP signature of an incoming request.

    Args:
        method: HTTP method of the request.
        path: Request path, used for the ``(request-target)`` line.
        headers: Request headers; must support ``.get()`` and item access.
        body: Request body, passed unchanged to ``_body_digest``.

    Returns:
        ``False`` when the ``Signature`` header is absent, malformed or
        incomplete, when a signed header is missing from the request, when
        the signature is not valid base64, or when the public key cannot be
        fetched. Otherwise the result of ``_verify_h``.
    """
    try:
        hsig = _parse_sig_header(headers.get("Signature"))
    except ValueError:
        logger.debug("malformed signature header")
        return False
    if not hsig:
        logger.debug("no signature in header")
        return False
    logger.debug("hsig=%s", hsig)

    # The header comes from the remote side: reject incomplete ones instead
    # of failing with a KeyError further down.
    missing = [p for p in ("keyId", "headers", "signature") if p not in hsig]
    if missing:
        logger.debug("signature header is missing %s", missing)
        return False

    try:
        signed_string = _build_signed_string(
            hsig["headers"], method, path, headers, _body_digest(body)
        )
    except KeyError:
        logger.debug("a signed header is missing from the request")
        return False

    try:
        # binascii.Error (bad padding) is a subclass of ValueError.
        signature = base64.b64decode(hsig["signature"])
    except ValueError:
        logger.debug("signature is not valid base64")
        return False

    try:
        k = _get_public_key(hsig["keyId"])
    except (ActivityGoneError, ActivityNotFoundError):
        logger.debug("cannot get public key")
        return False

    return _verify_h(signed_string, signature, k.pubkey)
2018-06-13 19:42:03 +00:00
class HTTPSigAuth(AuthBase):
    """Requests auth plugin for signing requests on the fly."""

    def __init__(self, key: Key) -> None:
        """Store the key whose private part is used to sign requests."""
        self.key = key

    def __call__(self, r):
        """Add ``Digest``, ``Date``, ``Host`` and ``Signature`` headers to ``r``.

        The signature covers ``(request-target)``, ``user-agent``, ``host``,
        ``date``, ``digest`` and ``content-type``, so ``r.headers`` must
        already contain ``User-Agent`` and ``Content-Type``.

        Returns:
            The same request object, with the extra headers set.
        """
        # Imported locally so the module-level import list is left untouched.
        from email.utils import formatdate

        key_id = self.key.key_id()
        logger.info("keyid=%s", key_id)
        host = urlparse(r.url).netloc

        body = r.body
        if body is None:
            # A request without a body is hashed as an empty body instead of
            # failing inside hashlib.
            body = b""
        elif isinstance(body, str):
            body = body.encode("utf-8")
        bodydigest = _body_digest(body)

        # formatdate() always emits English day/month names, unlike
        # strftime("%a ... %b"), which follows the process locale.
        date = formatdate(usegmt=True)

        r.headers.update({"Digest": bodydigest, "Date": date, "Host": host})

        sigheaders = "(request-target) user-agent host date digest content-type"
        to_be_signed = _build_signed_string(
            sigheaders, r.method, r.path_url, r.headers, bodydigest
        )

        signer = PKCS1_v1_5.new(self.key.privkey)
        digest = SHA256.new()
        digest.update(to_be_signed.encode("utf-8"))
        sig = base64.b64encode(signer.sign(digest)).decode("utf-8")

        headers = {
            "Signature": f'keyId="{key_id}",algorithm="rsa-sha256",headers="{sigheaders}",signature="{sig}"'
        }
        logger.debug("signed request headers=%s", headers)
        r.headers.update(headers)

        return r