web feed fetching: add next poll task

use average of time between posts in feed as ETA

for #550
pull/777/head
Ryan Barrett 2024-01-01 14:47:03 -08:00
rodzic 9347651349
commit beb865bc07
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 6BE31FDF4776E9D4
4 zmienionych plików z 64 dodań i 22 usunięć

Wyświetl plik

@ -11,6 +11,7 @@ from urllib.parse import urljoin
import cachetools
from Crypto.Util import number
from flask import abort, g, make_response, request
from google.protobuf.timestamp_pb2 import Timestamp
from oauth_dropins.webutil import util, webmention
from oauth_dropins.webutil.appengine_config import tasks_client
from oauth_dropins.webutil import appengine_info
@ -256,7 +257,7 @@ def add(seq, val):
seq.append(val)
def create_task(queue, **params):
def create_task(queue, delay=None, **params):
"""Adds a Cloud Tasks task.
If running in a local server, runs the task handler inline instead of
@ -264,6 +265,7 @@ def create_task(queue, **params):
Args:
queue (str): queue name
delay (:class:`datetime.timedelta`): optional, used as task ETA (from now)
params: form-encoded and included in the task request body
Returns:
@ -286,17 +288,20 @@ def create_task(queue, **params):
# .match(path, method='POST')
# return app.view_functions[endpoint](**args)
task = tasks_client.create_task(
parent=tasks_client.queue_path(appengine_info.APP_ID,
TASKS_LOCATION, queue),
task={
'app_engine_http_request': {
'http_method': 'POST',
'relative_uri': path,
'body': urllib.parse.urlencode(sorted(params.items())).encode(),
'headers': {'Content-Type': 'application/x-www-form-urlencoded'},
},
})
task = {
'app_engine_http_request': {
'http_method': 'POST',
'relative_uri': path,
'body': urllib.parse.urlencode(sorted(params.items())).encode(),
'headers': {'Content-Type': 'application/x-www-form-urlencoded'},
},
}
if delay:
eta_seconds = int(util.to_utc_timestamp(util.now()) + delay.total_seconds())
task['schedule_time'] = Timestamp(seconds=eta_seconds)
parent = tasks_client.queue_path(appengine_info.APP_ID, TASKS_LOCATION, queue)
task = tasks_client.create_task(parent=parent, task=task)
msg = f'Added {queue} task {task.name} : {params}'
logger.info(msg)
return msg, 202

Wyświetl plik

@ -9,7 +9,7 @@ from google.cloud import ndb
from granary import as1, as2, atom, microformats2, rss
from oauth_dropins.webutil import util
from oauth_dropins.webutil import appengine_info
from oauth_dropins.webutil.testutil import NOW, requests_response
from oauth_dropins.webutil.testutil import NOW, NOW_SECONDS, requests_response
from oauth_dropins.webutil.util import json_dumps, json_loads
import requests
from werkzeug.exceptions import BadGateway, BadRequest
@ -1876,6 +1876,10 @@ class WebTest(TestCase):
obj=obj.key.urlsafe(),
authed_as='user.com')
expected_eta = NOW_SECONDS + web.MIN_FEED_POLL_PERIOD.total_seconds()
self.assert_task(mock_create_task, 'poll-feed', '/queue/poll-feed',
domain='user.com', eta_seconds=expected_eta)
@patch('oauth_dropins.webutil.appengine_config.tasks_client.create_task')
def test_poll_feed_rss(self, mock_create_task, mock_get, _):
common.RUN_TASKS_INLINE = False
@ -1948,6 +1952,11 @@ class WebTest(TestCase):
obj=obj.key.urlsafe(),
authed_as='user.com')
# delay is average of 1d and 3d between posts
expected_eta = NOW_SECONDS + timedelta(days=2).total_seconds()
self.assert_task(mock_create_task, 'poll-feed', '/queue/poll-feed',
domain='user.com', eta_seconds=expected_eta)
def test_superfeedr_notify_no_user(self, *_):
orig_count = Object.query().count()

Wyświetl plik

@ -17,6 +17,7 @@ from bs4 import MarkupResemblesLocatorWarning
import dag_cbor.random
from flask import g
from google.cloud import ndb
from google.protobuf.timestamp_pb2 import Timestamp
from granary import as2
from granary.tests.test_as1 import (
ACTOR,
@ -475,17 +476,21 @@ class TestCase(unittest.TestCase, testutil.Asserts):
return got
def assert_task(self, mock_create_task, queue, path, **params):
def assert_task(self, mock_create_task, queue, path, eta_seconds=None, **params):
expected = {
'app_engine_http_request': {
'http_method': 'POST',
'relative_uri': path,
'body': urlencode(sorted(params.items())).encode(),
'headers': {'Content-Type': 'application/x-www-form-urlencoded'},
},
}
if eta_seconds:
expected['schedule_time'] = Timestamp(seconds=int(eta_seconds))
mock_create_task.assert_any_call(
parent=f'projects/{appengine_info.APP_ID}/locations/{TASKS_LOCATION}/queues/{queue}',
task={
'app_engine_http_request': {
'http_method': 'POST',
'relative_uri': path,
'body': urlencode(sorted(params.items())).encode(),
'headers': {'Content-Type': 'application/x-www-form-urlencoded'},
},
},
task=expected,
)

23
web.py
Wyświetl plik

@ -3,6 +3,7 @@ from datetime import timedelta, timezone
import difflib
import logging
import re
import statistics
import urllib.parse
from urllib.parse import quote, urlencode, urljoin, urlparse
@ -63,6 +64,9 @@ FEED_TYPES = {
rss.CONTENT_TYPE.split(';')[0]: 'rss',
}
MIN_FEED_POLL_PERIOD = timedelta(hours=2)
MAX_FEED_POLL_PERIOD = timedelta(weeks=1)
def is_valid_domain(domain):
"""Returns True if this is a valid domain we can use, False otherwise.
@ -723,9 +727,17 @@ def poll_feed_task():
return msg
# create Objects and receive tasks
published_last = None
published_deltas = [] # timedeltas between entry published times
for i, activity in enumerate(activities):
logger.info(f'Converted to AS1: {json_dumps(activity, indent=2)}')
published = activity.get('object', {}).get('published')
if published and published_last:
published_deltas.append(
util.parse_iso8601(published) - util.parse_iso8601(published_last))
published_last = published
id = Object(our_as1=activity).as1.get('id')
if not id:
logger.warning('No id or URL!')
@ -738,8 +750,19 @@ def poll_feed_task():
common.create_task(queue='receive', obj=obj.key.urlsafe(),
authed_as=user.key.id())
# create next poll task
if published_deltas:
seconds = statistics.mean(t.total_seconds() for t in published_deltas)
delay = max(min(timedelta(seconds=seconds), MAX_FEED_POLL_PERIOD),
MIN_FEED_POLL_PERIOD)
else:
# TODO
delay = MIN_FEED_POLL_PERIOD
common.create_task(queue='poll-feed', domain=user.key.id(), delay=delay)
return 'OK'
# generate/check per-user token for auth?
# or https://documentation.superfeedr.com/subscribers.html#http-authentication ?
@app.post(f'/superfeedr/notify/<regex("{DOMAIN_RE}"):domain>')