ATProto firehose: subscribe at stored cursor + 1

for #978
in-reply-to-bridged
Ryan Barrett 2024-05-09 08:44:30 -07:00
rodzic a690bc0115
commit eea8779872
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 6BE31FDF4776E9D4
3 zmienionych plików z 6 dodań i 2 usunięć

Wyświetl plik

@ -88,6 +88,9 @@ class Cursor(StringIdModel):
Key id is ``[HOST] [XRPC]``, where ``[XRPC]`` is the NSID of the XRPC method
for the event stream. For example, `subscribeRepos` on the production relay
is ``bsky.network com.atproto.sync.subscribeRepos``.
``cursor`` is the latest sequence number that we know we've seen, so when we
re-subscribe to this event stream, we should send ``cursor + 1``.
"""
cursor = ndb.IntegerProperty()
created = ndb.DateTimeProperty(auto_now_add=True)

Wyświetl plik

@ -82,7 +82,8 @@ def _subscribe(atproto_dids=None, bridged_dids=None):
client = Client(f'https://{os.environ["BGS_HOST"]}')
for header, payload in client.com.atproto.sync.subscribeRepos(cursor=cursor.cursor):
sub_cursor = cursor.cursor + 1 if cursor.cursor else None
for header, payload in client.com.atproto.sync.subscribeRepos(cursor=sub_cursor):
# parse header
if header['op'] == -1:
logger.warning(f'Got error from relay! {payload}')

Wyświetl plik

@ -140,7 +140,7 @@ class ATProtoFirehoseSubscribeTest(TestCase):
subscribe(reconnect=False)
self.assertTrue(new_commits.empty())
self.assertEqual(
'https://bgs.local/xrpc/com.atproto.sync.subscribeRepos?cursor=444',
'https://bgs.local/xrpc/com.atproto.sync.subscribeRepos?cursor=445',
FakeWebsocketClient.url)
def test_non_commit(self):