ATProto firehose: don't assume required fields, handle error reporting errors

for #978
in-reply-to-bridged
Ryan Barrett 2024-05-09 13:26:24 -07:00
rodzic 61e73d880f
commit 940c0e8cae
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 6BE31FDF4776E9D4
2 zmienionych plików z 38 dodań i 18 usunięć

Wyświetl plik

@ -13,11 +13,10 @@ import dag_json
from granary.bluesky import AT_URI_PATTERN
from lexrpc.client import Client
from oauth_dropins.webutil import util
from oauth_dropins.webutil.appengine_config import error_reporting_client
from oauth_dropins.webutil.appengine_info import DEBUG
from atproto import ATProto, Cursor
from common import add, create_task
from common import add, create_task, report_error
import models
from models import Object
@ -61,10 +60,9 @@ def subscribe(reconnect=True):
try:
_subscribe(atproto_dids=atproto_dids, bridged_dids=bridged_dids)
except BaseException as err:
logger.error(f'reporting error, atproto_firehose.subscribe: {err}')
if DEBUG:
raise
error_reporting_client.report_exception()
report_error(err)
if not reconnect:
return
@ -85,13 +83,13 @@ def _subscribe(atproto_dids=None, bridged_dids=None):
sub_cursor = cursor.cursor + 1 if cursor.cursor else None
for header, payload in client.com.atproto.sync.subscribeRepos(cursor=sub_cursor):
# parse header
if header['op'] == -1:
if header.get('op') == -1:
logger.warning(f'Got error from relay! {payload}')
continue
elif header['t'] == '#info':
elif header.get('t') == '#info':
logger.info(f'Got info from relay: {payload}')
continue
elif header['t'] != '#commit':
elif header.get('t') != '#commit':
continue
# parse payload
@ -102,17 +100,19 @@ def _subscribe(atproto_dids=None, bridged_dids=None):
continue
blocks = {}
if payload['blocks']:
_, blocks = read_car(payload['blocks'])
if block_bytes := payload.get('blocks'):
_, blocks = read_car(block_bytes)
blocks = {block.cid: block for block in blocks}
# detect records that reference an ATProto user, eg replies, likes,
# reposts, mentions
for p_op in payload['ops']:
op = Op(repo=repo, action=p_op['action'], path=p_op['path'],
seq=payload['seq'])
assert op.action and op.path, (op.action, op.path)
cid = p_op['cid']
for p_op in payload.get('ops', []):
op = Op(repo=repo, action=p_op.get('action'), path=p_op.get('path'),
seq=payload.get('seq'))
if not op.action or not op.path:
logger.info(
f'bad payload! seq {op.seq} has action {op.action} path {op.path}!')
continue
is_ours = repo in atproto_dids
if is_ours and op.action == 'delete':
@ -123,10 +123,11 @@ def _subscribe(atproto_dids=None, bridged_dids=None):
new_commits.put(op)
continue
cid = p_op.get('cid')
block = blocks.get(cid)
# our own commits are sometimes missing the record
# https://github.com/snarfed/bridgy-fed/issues/1016
if not block:
if not cid or not block:
continue
op = Op(*op[:-1], record=block.decoded)
@ -231,10 +232,9 @@ def handle(limit=None):
source_protocol=ATProto.LABEL, **record_kwarg)
create_task(queue='receive', obj=obj.key.urlsafe(), authed_as=op.repo)
except BaseException as err:
logger.error(f'reporting error, atproto_firehose.handle: {err}')
if DEBUG:
raise
error_reporting_client.report_exception()
report_error(err)
if util.now().replace(tzinfo=None) - cursor.updated > STORE_CURSOR_FREQ:
# it's been long enough, update our stored cursor

Wyświetl plik

@ -13,7 +13,7 @@ from Crypto.Util import number
from flask import abort, g, make_response, request
from google.protobuf.timestamp_pb2 import Timestamp
from oauth_dropins.webutil import util, webmention
from oauth_dropins.webutil.appengine_config import tasks_client
from oauth_dropins.webutil.appengine_config import error_reporting_client, tasks_client
from oauth_dropins.webutil import appengine_info
from oauth_dropins.webutil.appengine_info import DEBUG
from oauth_dropins.webutil import flask_util
@ -329,3 +329,23 @@ def email_me(msg):
util.send_email(smtp_host=SMTP_HOST, smtp_port=SMTP_PORT,
from_='scufflechuck@gmail.com', to='bridgy-fed@ryanb.org',
subject=util.ellipsize(msg), body=msg)
def report_error(msg, **kwargs):
"""Reports an error to StackDriver Error Reporting.
https://cloud.google.com/error-reporting/docs/reference/libraries#client-libraries-install-python
Duplicated in ``bridgy.util``.
Args:
msg (str)
"""
logger.error(f'reporting error: {msg}')
try:
error_reporting_client.report(msg, **kwargs)
except BaseException:
if not DEBUG:
logger.warning(f'Failed to report error to StackDriver! {msg} {kwargs}', exc_info=True)