Ivan Habunek 2023-06-29 12:17:51 +02:00
rodzic 167c74df84
commit 1f0cc20080
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: CDBD63C43A30BB95
9 zmienionych plików z 280 dodań i 183 usunięć

Wyświetl plik

@ -1,154 +0,0 @@
import re
from typing import Optional
from urllib.parse import urlparse
from uuid import uuid4
from toot import Context
from toot.ahttp import Response, request
from toot.exceptions import ConsoleError
from toot.utils import drop_empty_values, str_bool
async def find_account(ctx: Context, account_name: str):
if not account_name:
raise ConsoleError("Empty account name given")
normalized_name = account_name.lstrip("@").lower()
# Strip @<instance_name> from accounts on the local instance. The `acct`
# field in account object contains the qualified name for users of other
# instances, but only the username for users of the local instance. This is
# required in order to match the account name below.
if "@" in normalized_name:
[username, instance] = normalized_name.split("@", maxsplit=1)
if instance == ctx.app.instance:
normalized_name = username
response = await search(ctx, account_name, type="accounts", resolve=True)
accounts = response.json["accounts"]
for account in accounts:
if account["acct"].lower() == normalized_name:
return account
raise ConsoleError("Account not found")
# ------------------------------------------------------------------------------
# Accounts
# https://docs.joinmastodon.org/methods/accounts/
# ------------------------------------------------------------------------------
async def verify_credentials(ctx: Context) -> Response:
"""
Test to make sure that the user token works.
https://docs.joinmastodon.org/methods/accounts/#verify_credentials
"""
return await request(ctx, "GET", "/api/v1/accounts/verify_credentials")
# ------------------------------------------------------------------------------
# Search
# https://docs.joinmastodon.org/methods/search/
# ------------------------------------------------------------------------------
async def search(ctx: Context, query: str, resolve: bool = False, type: Optional[str] = None):
"""
Perform a search.
https://docs.joinmastodon.org/methods/search/#v2
"""
return await request(ctx, "GET", "/api/v2/search", params={
"q": query,
"resolve": str_bool(resolve),
"type": type
})
# ------------------------------------------------------------------------------
# Statuses
# https://docs.joinmastodon.org/methods/statuses/
# ------------------------------------------------------------------------------
async def post_status(
ctx: Context,
status,
visibility='public',
media_ids=None,
sensitive=False,
spoiler_text=None,
in_reply_to_id=None,
language=None,
scheduled_at=None,
content_type=None,
poll_options=None,
poll_expires_in=None,
poll_multiple=None,
poll_hide_totals=None,
):
"""
Publish a new status.
https://docs.joinmastodon.org/methods/statuses/#create
"""
# Idempotency key assures the same status is not posted multiple times
# if the request is retried.
headers = {"Idempotency-Key": uuid4().hex}
# Strip keys for which value is None
# Sending null values doesn't bother Mastodon, but it breaks Pleroma
data = drop_empty_values({
"status": status,
"media_ids": media_ids,
"visibility": visibility,
"sensitive": sensitive,
"in_reply_to_id": in_reply_to_id,
"language": language,
"scheduled_at": scheduled_at,
"content_type": content_type,
"spoiler_text": spoiler_text,
})
if poll_options:
data["poll"] = {
"options": poll_options,
"expires_in": poll_expires_in,
"multiple": poll_multiple,
"hide_totals": poll_hide_totals,
}
return await request(ctx, "POST", "/api/v1/statuses", json=data, headers=headers)
async def get_status(ctx: Context, status_id) -> Response:
url = f"/api/v1/statuses/{status_id}"
return await request(ctx, "GET", url)
async def get_status_context(ctx: Context, status_id) -> Response:
url = f"/api/v1/statuses/{status_id}/context"
return await request(ctx, "GET", url)
# Timelines
async def home_timeline_generator(ctx: Context, limit=20):
path = "/api/v1/timelines/home"
params = {"limit": limit}
return _timeline_generator(ctx, path, params)
async def _timeline_generator(ctx: Context, path: str, params=None):
while path:
response = await request(ctx, "GET", path, params=params)
yield response.json
path = _get_next_path(response.headers)
def _get_next_path(headers: dict):
"""Given timeline response headers, returns the path to the next batch"""
links = headers.get('Link', '')
matches = re.match('<([^>]+)>; rel="next"', links)
if matches:
parsed = urlparse(matches.group(1))
return "?".join([parsed.path, parsed.query])

Wyświetl plik

@ -14,7 +14,7 @@ logger = logging.getLogger(__name__)
Params = Dict[str, str]
Headers = Dict[str, str]
Json = Dict[str, Any]
Json = Any
@dataclass(frozen=True)

Wyświetl plik

Wyświetl plik

@ -0,0 +1,15 @@
"""
Accounts
https://docs.joinmastodon.org/methods/accounts/
"""
from toot import Context
from toot.ahttp import Response, request
async def verify_credentials(ctx: Context) -> Response:
"""
Test to make sure that the user token works.
https://docs.joinmastodon.org/methods/accounts/#verify_credentials
"""
return await request(ctx, "GET", "/api/v1/accounts/verify_credentials")

Wyświetl plik

@ -0,0 +1,48 @@
"""
Search
https://docs.joinmastodon.org/methods/search/
"""
from typing import Optional
from toot import Context
from toot.ahttp import request
from toot.exceptions import ConsoleError
from toot.utils import str_bool
async def search(ctx: Context, query: str, resolve: bool = False, type: Optional[str] = None):
"""
Perform a search.
https://docs.joinmastodon.org/methods/search/#v2
"""
return await request(ctx, "GET", "/api/v2/search", params={
"q": query,
"resolve": str_bool(resolve),
"type": type
})
async def find_account(ctx: Context, account_name: str):
if not account_name:
raise ConsoleError("Empty account name given")
normalized_name = account_name.lstrip("@").lower()
# Strip @<instance_name> from accounts on the local instance. The `acct`
# field in account object contains the qualified name for users of other
# instances, but only the username for users of the local instance. This is
# required in order to match the account name below.
if "@" in normalized_name:
[username, instance] = normalized_name.split("@", maxsplit=1)
if instance == ctx.app.instance:
normalized_name = username
response = await search(ctx, account_name, type="accounts", resolve=True)
accounts = response.json["accounts"]
for account in accounts:
if account["acct"].lower() == normalized_name:
return account
raise ConsoleError("Account not found")

Wyświetl plik

@ -0,0 +1,70 @@
"""
Statuses
https://docs.joinmastodon.org/methods/statuses/
"""
from uuid import uuid4
from toot import Context
from toot.ahttp import Response, request
from toot.utils import drop_empty_values
async def post(
ctx: Context,
status,
visibility='public',
media_ids=None,
sensitive=False,
spoiler_text=None,
in_reply_to_id=None,
language=None,
scheduled_at=None,
content_type=None,
poll_options=None,
poll_expires_in=None,
poll_multiple=None,
poll_hide_totals=None,
):
"""
Publish a new status.
https://docs.joinmastodon.org/methods/statuses/#create
"""
# Idempotency key assures the same status is not posted multiple times
# if the request is retried.
headers = {"Idempotency-Key": uuid4().hex}
# Strip keys for which value is None
# Sending null values doesn't bother Mastodon, but it breaks Pleroma
data = drop_empty_values({
"status": status,
"media_ids": media_ids,
"visibility": visibility,
"sensitive": sensitive,
"in_reply_to_id": in_reply_to_id,
"language": language,
"scheduled_at": scheduled_at,
"content_type": content_type,
"spoiler_text": spoiler_text,
})
if poll_options:
data["poll"] = {
"options": poll_options,
"expires_in": poll_expires_in,
"multiple": poll_multiple,
"hide_totals": poll_hide_totals,
}
return await request(ctx, "POST", "/api/v1/statuses", json=data, headers=headers)
async def get_status(ctx: Context, status_id) -> Response:
url = f"/api/v1/statuses/{status_id}"
return await request(ctx, "GET", url)
async def get_context(ctx: Context, status_id) -> Response:
url = f"/api/v1/statuses/{status_id}/context"
return await request(ctx, "GET", url)

Wyświetl plik

@ -0,0 +1,116 @@
"""
Timelines API
https://docs.joinmastodon.org/methods/timelines/
"""
import re
from typing import Mapping, Optional
from urllib.parse import quote, urlparse
from aiohttp import ClientSession
from toot import Context
from toot.ahttp import request
from toot.utils import str_bool
from toot.async_api.search import find_account
async def anon_public_timeline_generator(ctx, instance, local=False, limit=20):
path = '/api/v1/timelines/public'
params = {'local': str_bool(local), 'limit': limit}
return _anon_timeline_generator(ctx, instance, path, params)
async def anon_tag_timeline_generator(ctx, instance, hashtag, local=False, limit=20):
path = f"/api/v1/timelines/tag/{quote(hashtag)}"
params = {'local': str_bool(local), 'limit': limit}
return _anon_timeline_generator(ctx, instance, path, params)
async def home_timeline_generator(ctx: Context, limit=20):
path = "/api/v1/timelines/home"
params = {"limit": limit}
return _timeline_generator(ctx, path, params)
async def public_timeline_generator(ctx: Context, local=False, limit=20):
path = '/api/v1/timelines/public'
params = {'local': str_bool(local), 'limit': limit}
return _timeline_generator(ctx, path, params)
async def tag_timeline_generator(ctx: Context, hashtag, local=False, limit=20):
path = f"/api/v1/timelines/tag/{quote(hashtag)}"
params = {'local': str_bool(local), 'limit': limit}
return _timeline_generator(ctx, path, params)
async def bookmark_timeline_generator(ctx: Context, limit=20):
path = '/api/v1/bookmarks'
params = {'limit': limit}
return _timeline_generator(ctx, path, params)
async def notification_timeline_generator(ctx: Context, limit=20):
# exclude all but mentions and statuses
exclude_types = ["follow", "favourite", "reblog", "poll", "follow_request"]
params = {"exclude_types[]": exclude_types, "limit": limit}
return _notification_timeline_generator(ctx, "/api/v1/notifications", params)
async def conversation_timeline_generator(ctx: Context, limit=20):
path = "/api/v1/conversations"
params = {"limit": limit}
return _conversation_timeline_generator(ctx, path, params)
async def account_timeline_generator(ctx: Context, account_name: str, replies=False, reblogs=False, limit=20):
account = await find_account(ctx, account_name)
path = f"/api/v1/accounts/{account['id']}/statuses"
params = {"limit": limit, "exclude_replies": not replies, "exclude_reblogs": not reblogs}
return _timeline_generator(ctx, path, params)
async def list_timeline_generator(ctx: Context, list_id: str, limit: int = 20):
path = f"/api/v1/timelines/list/{list_id}"
return _timeline_generator(ctx, path, {"limit": limit})
async def _anon_timeline_generator(ctx: Context, instance: str, path: Optional[str], params=None):
# TODO: reuse anon session? remove base url from ctx.session?
async with ClientSession() as session:
ctx = Context(ctx.app, ctx.user, session)
while path:
response = await request(ctx, "GET", f"https://{instance}{path}", params=params)
yield response.json
path = _get_next_path(response.headers)
async def _timeline_generator(ctx: Context, path: Optional[str], params=None):
while path:
response = await request(ctx, "GET", path, params=params)
yield response.json
path = _get_next_path(response.headers)
async def _notification_timeline_generator(ctx: Context, path: Optional[str], params=None):
while path:
response = await request(ctx, "GET", path, params=params)
yield [n["status"] for n in response.json if n["status"]]
path = _get_next_path(response.headers)
async def _conversation_timeline_generator(ctx, path, params=None):
while path:
response = await request(ctx, "GET", path, params=params)
yield [c["last_status"] for c in response.json if c["last_status"]]
path = _get_next_path(response.headers)
def _get_next_path(headers: Mapping[str, str]) -> Optional[str]:
"""Given timeline response headers, returns the path to the next batch"""
links = headers.get('Link', '')
matches = re.match('<([^>]+)>; rel="next"', links)
if matches:
parsed = urlparse(matches.group(1))
return "?".join([parsed.path, parsed.query])

Wyświetl plik

@ -5,7 +5,8 @@ import platform
from datetime import datetime, timedelta, timezone
from time import sleep, time
from toot import api, aapi, config, __version__, Context
from toot import api, config, __version__, Context
from toot.async_api import timelines, statuses, accounts, search as search_api
from toot.auth import login_interactive, login_browser_interactive, create_app_interactive
from toot.entities import Instance, Notification, Status, from_dict
from toot.exceptions import ApiError, ConsoleError
@ -16,7 +17,7 @@ from toot.tui.utils import parse_datetime
from toot.utils import args_get_instance, delete_tmp_status_file, editor_input, multiline_input, EOF_KEY
async def get_timeline_generator(ctx, args):
async def get_timeline_generator(ctx: Context, args):
if len([arg for arg in [args.tag, args.list, args.public, args.account] if arg]) > 1:
raise ConsoleError("Only one of --public, --tag, --account, or --list can be used at one time.")
@ -26,23 +27,22 @@ async def get_timeline_generator(ctx, args):
if args.instance and not (args.public or args.tag):
raise ConsoleError("The --instance option is only valid alongside --public or --tag.")
return await aapi.home_timeline_generator(ctx, limit=args.count)
# if args.public:
# if args.instance:
# return api.anon_public_timeline_generator(args.instance, local=args.local, limit=args.count)
# else:
# return api.public_timeline_generator(app, user, local=args.local, limit=args.count)
# elif args.tag:
# if args.instance:
# return api.anon_tag_timeline_generator(args.instance, args.tag, limit=args.count)
# else:
# return api.tag_timeline_generator(app, user, args.tag, local=args.local, limit=args.count)
# elif args.account:
# return api.account_timeline_generator(app, user, args.account, limit=args.count)
# elif args.list:
# return api.timeline_list_generator(app, user, args.list, limit=args.count)
# else:
# return api.home_timeline_generator(app, user, limit=args.count)
if args.public:
if args.instance:
return await timelines.anon_public_timeline_generator(ctx, args.instance, local=args.local, limit=args.count)
else:
return await timelines.public_timeline_generator(ctx, local=args.local, limit=args.count)
elif args.tag:
if args.instance:
return await timelines.anon_tag_timeline_generator(ctx, args.instance, args.tag, limit=args.count)
else:
return await timelines.tag_timeline_generator(ctx, args.tag, local=args.local, limit=args.count)
elif args.account:
return await timelines.account_timeline_generator(ctx, args.account, limit=args.count)
elif args.list:
return await timelines.list_timeline_generator(ctx, args.list, limit=args.count)
else:
return await timelines.home_timeline_generator(ctx, limit=args.count)
async def timeline(ctx: Context, args, generator=None):
@ -66,12 +66,12 @@ async def timeline(ctx: Context, args, generator=None):
async def thread(ctx: Context, args):
if args.json:
context_response = await aapi.get_status_context(ctx, args.status_id)
context_response = await statuses.get_context(ctx, args.status_id)
print_out(context_response.body)
else:
status_response, context_response = await asyncio.gather(
aapi.get_status(ctx, args.status_id),
aapi.get_status_context(ctx, args.status_id),
statuses.get_status(ctx, args.status_id),
statuses.get_context(ctx, args.status_id),
)
status = status_response.json
context = context_response.json
@ -84,8 +84,7 @@ async def thread(ctx: Context, args):
for item in context["descendants"]:
thread.append(item)
statuses = [from_dict(Status, s) for s in thread]
print_timeline(statuses)
print_timeline([from_dict(Status, s) for s in thread])
async def post(ctx, args):
@ -105,7 +104,7 @@ async def post(ctx, args):
if not status_text and not media_ids:
raise ConsoleError("You must specify either text or media to post.")
response = await aapi.post_status(
response = await statuses.post(
ctx,
status_text,
visibility=args.visibility,
@ -510,16 +509,15 @@ def unblock(app, user, args):
async def whoami(ctx: Context, args):
response = await aapi.verify_credentials(ctx)
response = await accounts.verify_credentials(ctx)
if args.json:
print_out(response.body)
else:
print(response.json)
print_account(response.json)
async def whois(ctx: Context, args):
account = await aapi.find_account(ctx, args.account)
account = await search_api.find_account(ctx, args.account)
print_account(account)

Wyświetl plik

@ -936,6 +936,8 @@ def make_session(app: App, user: User, debug: bool) -> ClientSession:
def main():
from datetime import datetime
start = datetime.now()
# Enable debug logging if --debug is in args
if "--debug" in sys.argv:
filename = os.getenv("TOOT_LOG_FILE")
@ -963,3 +965,5 @@ def main():
sys.exit(1)
except KeyboardInterrupt:
pass
finally:
print(datetime.now() - start)