ATProto: first pass at polling posts (timelines)

for #694
pull/729/head
Ryan Barrett 2023-11-14 12:30:14 -08:00
rodzic 9c5adab4f4
commit a5b5078729
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 6BE31FDF4776E9D4
3 zmienionych plików z 159 dodań i 3 usunięć

Wyświetl plik

@ -415,9 +415,13 @@ def poll_notifications():
Uses the ``listNotifications`` endpoint, which is intended for end users. 🤷
https://github.com/bluesky-social/atproto/discussions/1538
TODO: unify with poll_posts
"""
repos = {r.key.id(): r for r in AtpRepo.query()}
logger.info(f'Got {len(repos)} repos')
if not repos:
return
# TODO: switch from atproto_did to copies
users = itertools.chain(*(cls.query(cls.atproto_did.IN(list(repos)))
@ -459,3 +463,62 @@ def poll_notifications():
# User yet.
return 'OK'
# URL route is registered in hub.py
def poll_posts():
"""Fetches and enqueueus new posts from the AppView for our users.
Uses the ``getTimeline`` endpoint, which is intended for end users. 🤷
TODO: unify with poll_notifications
"""
repos = {r.key.id(): r for r in AtpRepo.query()}
logger.info(f'Got {len(repos)} repos')
if not repos:
return
# TODO: switch from atproto_did to copies
users = itertools.chain(*(cls.query(cls.atproto_did.IN(list(repos)))
for cls in set(PROTOCOLS.values())
if cls and cls != ATProto))
# TODO: convert to Session for connection pipelining!
client = Client(f'https://{os.environ["APPVIEW_HOST"]}',
headers={'User-Agent': USER_AGENT})
for user in users:
logging.debug(f'Fetching notifs for {user.key.id()}')
# TODO: store and use cursor
# seenAt would be easier, but they don't support it yet
# https://github.com/bluesky-social/atproto/issues/1636
repo = repos[user.atproto_did]
client.session['accessJwt'] = service_jwt(os.environ['APPVIEW_HOST'],
repo_did=user.atproto_did,
privkey=repo.signing_key)
resp = client.app.bsky.feed.getTimeline()
for item in resp['feed']:
uri = item['post']['uri']
logger.debug(f'Got {uri}: {json_dumps(item, indent=2)}')
# TODO: handle reposts once we have a URI for them
# https://github.com/bluesky-social/atproto/issues/1811
#
# TODO: verify sig. skipping this for now because we're getting
# these from the AppView, which is trusted, specifically we expect
# the BGS and/or the AppView already checked sigs.
obj = Object.get_or_create(id=uri, bsky=item['post'],
source_protocol=ATProto.ABBREV)
if not obj.status:
obj.status = 'new'
obj.add('feed', user.key)
obj.put()
common.create_task(queue='receive', obj=obj.key.urlsafe(),
authed_as=user.atproto_did)
# note that we don't pass a user param above. it's the acting user,
# which is different for every notif, and may not actually have a BF
# User yet.
return 'OK'

4
hub.py
Wyświetl plik

@ -70,6 +70,10 @@ app.add_url_rule('/queue/atproto-poll-notifs',
view_func=atproto.poll_notifications,
methods=['POST'])
app.add_url_rule('/queue/atproto-poll-posts',
view_func=atproto.poll_posts,
methods=['POST'])
@app.post('/queue/atproto-commit')
@flask_util.cloud_tasks_only
def atproto_commit():

Wyświetl plik

@ -3,7 +3,7 @@ import base64
import copy
import logging
from unittest import skip
from unittest.mock import call, MagicMock, patch
from unittest.mock import ANY, call, MagicMock, patch
from arroba.datastore_storage import AtpBlock, AtpRemoteBlob, AtpRepo, DatastoreStorage
from arroba.did import encode_did_key
@ -704,8 +704,8 @@ class ATProtoTest(TestCase):
@patch('requests.get')
def test_poll_notifications(self, mock_get, mock_create_task):
user_a = self.make_user(id='fake:user-a', cls=Fake, atproto_did=f'did:plc:a')
user_b = self.make_user(id='fake:user-c', cls=Fake, atproto_did=f'did:plc:b')
user_c = self.make_user(id='fake:user-b', cls=Fake, atproto_did=f'did:plc:c')
user_b = self.make_user(id='fake:user-b', cls=Fake, atproto_did=f'did:plc:b')
user_c = self.make_user(id='fake:user-c', cls=Fake, atproto_did=f'did:plc:c')
Repo.create(self.storage, 'did:plc:a', signing_key=ATPROTO_KEY)
Repo.create(self.storage, 'did:plc:c', signing_key=ATPROTO_KEY)
@ -809,3 +809,92 @@ class ATProtoTest(TestCase):
self.assertEqual(follow, follow_obj.bsky)
self.assert_task(mock_create_task, 'receive', '/queue/receive',
obj=follow_obj.key.urlsafe(), authed_as='did:plc:a')
@patch.object(tasks_client, 'create_task', return_value=Task(name='my task'))
@patch('requests.get')
def test_poll_posts(self, mock_get, mock_create_task):
user_a = self.make_user(id='fake:user-a', cls=Fake, atproto_did=f'did:plc:a')
user_b = self.make_user(id='fake:user-b', cls=Fake, atproto_did=f'did:plc:b')
user_c = self.make_user(id='fake:user-c', cls=Fake, atproto_did=f'did:plc:c')
Repo.create(self.storage, 'did:plc:a', signing_key=ATPROTO_KEY)
Repo.create(self.storage, 'did:plc:b', signing_key=ATPROTO_KEY)
Repo.create(self.storage, 'did:plc:c', signing_key=ATPROTO_KEY)
post_view = {
'$type': 'app.bsky.feed.defs#postView',
'uri': 'at://did:web:alice.com/app.bsky.feed.post/123',
'cid': 'TODO',
'record': {
'$type': 'app.bsky.feed.post',
'text': 'My original post',
'createdAt': '2007-07-07T03:04:05',
},
'author': {
'$type': 'app.bsky.actor.defs#profileViewBasic',
'did': 'did:web:alice.com',
'handle': 'alice.com',
},
}
mock_get.side_effect = [
requests_response({
'cursor': '...',
'feed': [{
'$type': 'app.bsky.feed.defs#feedViewPost',
'post': post_view,
}],
}),
requests_response({
**DID_DOC,
'id': 'did:plc:alice.com',
}),
requests_response({
'cursor': '...',
'feed': [],
}),
requests_response({
'cursor': '...',
'feed': [{
'$type': 'app.bsky.feed.defs#feedViewPost',
'post': post_view,
'reason': {
'$type': 'app.bsky.feed.defs#reasonRepost',
'by': {
'$type': 'app.bsky.actor.defs#profileViewBasic',
'did': 'did:web:bob.com',
'handle': 'bob.com',
},
'indexedAt': '2022-01-02T03:04:05+00:00',
},
}],
}),
]
resp = self.post('/queue/atproto-poll-posts', client=hub.app.test_client())
self.assertEqual(200, resp.status_code)
get_timeline = call(
'https://api.bsky-sandbox.dev/xrpc/app.bsky.feed.getTimeline',
json=None,
headers={
'Content-Type': 'application/json',
'User-Agent': common.USER_AGENT,
'Authorization': ANY,
})
self.assertEqual([
get_timeline,
self.req('https://alice.com/.well-known/did.json'),
get_timeline,
get_timeline,
], mock_get.call_args_list)
post_obj = Object.get_by_id('at://did:web:alice.com/app.bsky.feed.post/123')
self.assertEqual(post_view, post_obj.bsky)
self.assert_task(mock_create_task, 'receive', '/queue/receive',
obj=post_obj.key.urlsafe(), authed_as='did:plc:a')
# TODO
# repost_obj = Object.get_by_id('at://did:plc:d/app.bsky.feed.post/456')
# self.assertEqual(repost, repost_obj.bsky)
# self.assert_task(mock_create_task, 'receive', '/queue/receive',
# obj=repost_obj.key.urlsafe(), authed_as='did:plc:eve')