chapeau/kepi/validate.py

116 wiersze
3.1 KiB
Python

import json
import httpsig
import logging
import kepi
import kepi.fetch
# Somewhat based on bowler. Will merge things when the dust settles.
logger = logging.getLogger('kepi')

# Fields validate() reads unconditionally when building the arguments
# for the signature verifier. If any is absent the message cannot be
# verified, so validation fails instead of raising KeyError.
_REQUIRED_FIELDS = ('content-type', 'date', 'signature', 'host', 'path')


def validate(
        message,
        ):
    """
    Validates a message.

    "message" is a mapping of field names to values; names are
    matched case-insensitively. It must contain "body", which may be
    bytes (decoded as UTF-8), a str holding JSON, or an already-parsed
    dict. To get as far as signature verification it must also
    contain "content-type", "date", "signature", "host" and "path";
    "digest" is optional.

    The actor named in the body is looked up with kepi.fetch.fetch(),
    and the message's HTTP signature is then checked against that
    actor's public key.

    Returns True if the message is acceptable, and False if it is not
    (undecodable or malformed body, actor could not be fetched,
    missing fields, unusable public key, or bad signature).

    Raises ValueError if there is no "body" field at all, and
    TypeError if the body is not bytes, dict, or str.
    """

    # Make sure this is a real dict, with lower-cased keys.
    # httpsig.utils.CaseInsensitiveDict doesn't
    # reimplement get(), which causes confusion.
    message = {k.lower(): v for k, v in message.items()}

    if 'body' not in message:
        raise ValueError("message must contain a body")

    raw_body = message['body']

    if isinstance(raw_body, bytes):
        try:
            # XXX It might not be UTF-8; we have to check the headers
            raw_body = str(raw_body, encoding='UTF-8')
        except UnicodeDecodeError as ude:
            logger.info("Failed validation: invalid encoding: %s",
                        ude)
            return False
        message['body'] = raw_body

    if isinstance(raw_body, dict):
        body = raw_body
    elif isinstance(raw_body, str):
        try:
            body = json.loads(raw_body)
        except ValueError as ve:
            # json.JSONDecodeError is a subclass of ValueError.
            logger.info("Failed validation: body is not valid JSON: %s",
                        ve)
            return False
    else:
        raise TypeError(
            "message body must be bytes, dict, or str: %r" % (raw_body,))

    if not isinstance(body, dict):
        # Valid JSON, but not an object; we can't read fields from it.
        logger.info("Failed validation: body is not a JSON object")
        return False

    logger.debug('Begin validation of: %s', message)

    actor_id = body.get('actor')
    actor = kepi.fetch.fetch(
        actor_id,
    )

    if not actor['success']:
        # A Delete of an actor by that same actor is expected to come
        # from an actor we can no longer fetch (410 Gone).
        # Requiring actor_id to be set stops a body with neither
        # "actor" nor "object" from matching None == None.
        if (
                actor['problem'] == 'status-code' and
                actor['status-code'] == 410 and
                body.get('type') == 'Delete' and
                actor_id is not None and
                actor_id == body.get('object')
                ):
            logger.info("actor is Gone, but that's okay")
            return True
        else:
            logger.info("Couldn't discover remote actor.")
            return False

    missing = [f for f in _REQUIRED_FIELDS if f not in message]
    if missing:
        logger.info('Validation failed: message lacks required fields: %s',
                    ', '.join(missing))
        return False

    logger.debug('%s: message signature is: %s',
                 message, message['signature'])
    logger.debug('%s: message body is: %s',
                 message, message['body'])
    logger.debug('%s: actor details are: %s',
                 message, actor)

    # XXX key used to sign must "_obviously_belong_to" the actor

    try:
        public_key_pem = actor['publicKey']['publicKeyPem']
    except (KeyError, TypeError) as e:
        # KeyError: the actor has no publicKey, or it has no PEM.
        # TypeError: publicKey is present but isn't a mapping.
        logger.info('%s: actor has an invalid public key (%s); dropping message',
                    message, e,
                    )
        return False

    hv_args = {
        'headers': {
            'Content-Type': message['content-type'],
            'Date': message['date'],
            'Signature': message['signature'],
            'Host': message['host'],
            'Digest': message.get('digest', ''),
        },
        'secret': public_key_pem,
        'method': 'POST',
        'path': message['path'],
        'host': message['host'],
        'sign_header': 'Signature',
    }

    logger.debug("Verifying: %s", hv_args)
    logger.debug("  ('secret' is not secret: it's the public key)")

    # NOTE(review): httpsig may raise rather than return False when the
    # Signature header is malformed -- confirm, and handle it here if so.
    hv = httpsig.HeaderVerifier(
        **hv_args
    )

    if not hv.verify():
        logger.info('Validation failed: spoofing attempt')
        return False

    logger.info('Validation passed!')
    return True