ATProto firehose: implement deletes

for #978
in-reply-to-bridged
Ryan Barrett 2024-05-08 13:26:36 -07:00
rodzic 5c77f02dba
commit 7867326ebe
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 6BE31FDF4776E9D4
2 zmienionych plików z 45 dodań i 19 usunięć

Wyświetl plik

@ -50,31 +50,41 @@ def subscribe():
elif header['t'] != '#commit':
continue
_, blocks = read_car(payload['blocks'])
blocks = {block.cid: block for block in blocks}
repo = payload.get('repo')
if repo in our_bridged_dids: # from a Bridgy Fed non-Bluesky user; ignore
# logger.info(f'Ignoring record from our non-ATProto bridged user {repo}')
continue
blocks = {}
if payload['blocks']:
_, blocks = read_car(payload['blocks'])
blocks = {block.cid: block for block in blocks}
# detect records that reference an ATProto user, eg replies, likes,
# reposts, mentions
for op in payload['ops']:
action = op['action']
cid = op['cid']
path = op['path']
assert action, cid # TODO: more graceful
cid = op['cid']
assert action, path
is_ours = repo in our_atproto_dids
if is_ours and action == 'delete':
logger.info(f'Got delete from our ATProto user: {repo} {path}')
new_commits.put(('delete', path))
continue
block = blocks.get(op['cid'])
if not block: # our own commits are sometimes missing the record (?!?)
# our own commits are sometimes missing the record
# https://github.com/snarfed/bridgy-fed/issues/1016
if not block:
continue
record = block.decoded
type = record.get('$type')
if not type:
print('missing $type!', action, cid)
print(dag_json.encode(record).decode())
logger.warning('commit record missing $type! {action} {cid}')
logger.warning(dag_json.encode(record).decode())
continue
if repo in our_atproto_dids:

Wyświetl plik

@ -48,8 +48,11 @@ class FakeWebsocketClient:
def setup_receive(cls, record, path='app.bsky.feed.post/abc123',
action='create', repo='did:plc:user'):
cid = CID.decode('bafkreicqpqncshdd27sgztqgzocd3zhhqnnsv6slvzhs5uz6f57cq6lmtq')
block = Block(decoded=record)
block_bytes = write_car([cid], [block])
if action == 'delete':
block_bytes = b''
else:
block = Block(decoded=record)
block_bytes = write_car([cid], [block])
cls.to_receive = [({
'op': 1,
@ -59,7 +62,7 @@ class FakeWebsocketClient:
'commit': cid,
'ops': [{
'action': action,
'cid': block.cid,
'cid': None if action == 'delete' else block.cid,
'path': path,
}],
'prev': None,
@ -85,14 +88,14 @@ class ATProtoFirehoseSubscribeTest(TestCase):
'eefake:alice', cls=ExplicitEnableFake,
copies=[Target(protocol='atproto', uri='did:alice')])
def assert_enqueues(self, record):
FakeWebsocketClient.setup_receive(record)
def assert_enqueues(self, record=None, expected=None, action='create', **kwargs):
FakeWebsocketClient.setup_receive(record, action=action, **kwargs)
subscribe()
self.assertEqual(('create', record), new_commits.get())
self.assertEqual((action, expected or record), new_commits.get())
self.assertTrue(new_commits.empty())
def assert_doesnt_enqueue(self, record):
FakeWebsocketClient.setup_receive(record)
def assert_doesnt_enqueue(self, record=None, action='create', **kwargs):
FakeWebsocketClient.setup_receive(record, action=action, **kwargs)
subscribe()
self.assertTrue(new_commits.empty())
@ -128,12 +131,12 @@ class ATProtoFirehoseSubscribeTest(TestCase):
user = self.make_user('did:plc:user', cls=ATProto,
enabled_protocols=['eefake'],
obj_bsky=ACTOR_PROFILE_BSKY)
self.assert_enqueues(POST_BSKY)
self.assert_enqueues(POST_BSKY, repo='did:plc:user')
def test_post_by_other(self):
self.store_object(id='did:plc:eve', raw={**DID_DOC, 'id': 'did:plc:eve'})
self.make_user('did:plc:eve', cls=ATProto, enabled_protocols=['eefake'])
self.assert_doesnt_enqueue(POST_BSKY)
self.assert_doesnt_enqueue(POST_BSKY, repo='did:plc:user')
def test_reply_direct_to_our_user(self):
self.assert_enqueues({
@ -279,10 +282,23 @@ class ATProtoFirehoseSubscribeTest(TestCase):
'subject': 'did:eve',
})
def test_delete_by_our_atproto_user(self):
self.store_object(id='did:plc:user', raw=DID_DOC)
user = self.make_user('did:plc:user', cls=ATProto,
enabled_protocols=['eefake'],
obj_bsky=ACTOR_PROFILE_BSKY)
path = 'app.bsky.feed.post/abc123'
self.assert_enqueues(expected=path, path=path, action='delete')
def test_delete_by_other(self):
self.assert_doesnt_enqueue(action='delete')
@skip
class ATProtoFirehoseHandleTest(TestCase):
@skip
def test_handle(self):
new_commits.put('create')
at_uri = 'at://did:plc:user/app.bsky.feed.like/abc123'
user_key = ATProto(id='did:plc:user').key
obj = self.assert_object(at_uri, bsky=LIKE_BSKY, status='new',