litepub
Thomas Sileo 2019-08-08 22:43:21 +02:00
rodzic 26a9483ebc
commit 549a430fc7
2 zmienionych plików z 32 dodań i 13 usunięć

Wyświetl plik

@ -21,18 +21,25 @@ from .activitypub import _has_type
from .errors import ActivityNotFoundError
from .errors import ActivityGoneError
from .key import Key
from typing import Tuple
logger = logging.getLogger(__name__)
def _build_signed_string(
signed_headers: str, method: str, path: str, headers: Any, body_digest: str
signed_headers: str,
method: str,
path: str,
headers: Any,
body_digest: Optional[str],
) -> str:
out = []
for signed_header in signed_headers.split(" "):
if signed_header == "(request-target)":
out.append("(request-target): " + method.lower() + " " + path)
elif signed_header == "digest":
if not body_digest:
raise ValueError("missing body digest for build signed string")
out.append("digest: " + body_digest)
else:
out.append(signed_header + ": " + headers[signed_header])
@ -56,7 +63,9 @@ def _verify_h(signed_string, signature, pubkey):
return signer.verify(digest, signature)
def _body_digest(body: str) -> str:
def _body_digest(body: Optional[str]) -> Optional[str]:
if not body:
return None
h = hashlib.new("sha256")
h.update(body) # type: ignore
return "SHA-256=" + base64.b64encode(h.digest()).decode("utf-8")
@ -82,11 +91,13 @@ def _get_public_key(key_id: str) -> Key:
return k
def verify_request(method: str, path: str, headers: Any, body: str) -> bool:
def verify_request(
method: str, path: str, headers: Any, body: Optional[str]
) -> Tuple[bool, str]:
hsig = _parse_sig_header(headers.get("Signature"))
if not hsig:
logger.debug("no signature in header")
return False
return False, ""
logger.debug(f"hsig={hsig}")
signed_string = _build_signed_string(
hsig["headers"], method, path, headers, _body_digest(body)
@ -96,9 +107,12 @@ def verify_request(method: str, path: str, headers: Any, body: str) -> bool:
k = _get_public_key(hsig["keyId"])
except (ActivityGoneError, ActivityNotFoundError):
logger.debug("cannot get public key")
return False
return False, hsig["keyId"]
return _verify_h(signed_string, base64.b64decode(hsig["signature"]), k.pubkey)
return (
_verify_h(signed_string, base64.b64decode(hsig["signature"]), k.pubkey),
hsig["keyId"],
)
class HTTPSigAuth(AuthBase):
@ -117,14 +131,19 @@ class HTTPSigAuth(AuthBase):
body = r.body.encode("utf-8")
except AttributeError:
pass
bh.update(body)
bodydigest = "SHA-256=" + base64.b64encode(bh.digest()).decode("utf-8")
date = datetime.utcnow().strftime("%a, %d %b %Y %H:%M:%S GMT")
new_headers = {"Date": date, "Host": host}
bodydigest = None
if body:
sigheaders = "(request-target) user-agent host date digest content-type"
bh.update(body)
bodydigest = "SHA-256=" + base64.b64encode(bh.digest()).decode("utf-8")
new_headers.update({"Digest": bodydigest})
else:
sigheaders = "(request-target) user-agent host date"
r.headers.update({"Digest": bodydigest, "Date": date, "Host": host})
sigheaders = "(request-target) user-agent host date digest content-type"
r.headers.update(new_headers)
to_be_signed = _build_signed_string(
sigheaders, r.method, r.path_url, r.headers, bodydigest

Wyświetl plik

@ -34,7 +34,7 @@ def test_httpsig():
resp.request.path_url,
resp.request.headers,
resp.request.body,
)
)[0]
@httpretty.activate
@ -56,4 +56,4 @@ def test_httpsig_key():
resp.request.path_url,
resp.request.headers,
resp.request.body,
)
)[0]