Merge pull request #374 from ihabunek/entities2

Implement data classes for API entities
aiohttp
Ivan Habunek 2023-06-27 07:54:30 +02:00 zatwierdzone przez GitHub
commit 835f789145
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 4AEE18F83AFDEB23
7 zmienionych plików z 628 dodań i 59 usunięć

Wyświetl plik

@ -25,7 +25,7 @@ jobs:
pytest
- name: Validate minimum required version
run: |
vermin --target=3.6 --no-tips .
vermin --target=3.7 --no-tips .
- name: Check style
run: |
flake8

Wyświetl plik

@ -10,7 +10,7 @@ publish :
test:
pytest -v
flake8
vermin --target=3.6 --no-tips --violations --exclude-regex venv/.* .
vermin --target=3.7 --no-tips --violations --exclude-regex venv/.* .
coverage:
coverage erase

Wyświetl plik

@ -161,6 +161,7 @@ def test_timeline_with_re(mock_get, monkeypatch, capsys):
'acct': 'fz'
},
'reblog': {
'created_at': '2017-04-12T15:53:18.174Z',
'account': {
'display_name': 'Johnny Cash',
'acct': 'jc'
@ -179,8 +180,8 @@ def test_timeline_with_re(mock_get, monkeypatch, capsys):
out, err = capsys.readouterr()
lines = uncolorize(out).split("\n")
assert "Frank Zappa" in lines[1]
assert "@fz" in lines[1]
assert "Johnny Cash" in lines[1]
assert "@jc" in lines[1]
assert "2017-04-12 15:53 UTC" in lines[1]
assert (
@ -188,7 +189,7 @@ def test_timeline_with_re(mock_get, monkeypatch, capsys):
"exact mathematical design, but\nwhat's missing is the eyebrows." in out)
assert "111111111111111111" in lines[-3]
assert "Reblogged @jc" in lines[-3]
assert "@fz boosted" in lines[-3]
assert err == ""

Wyświetl plik

@ -6,6 +6,7 @@ from datetime import datetime, timedelta, timezone
from time import sleep, time
from toot import api, config, __version__
from toot.auth import login_interactive, login_browser_interactive, create_app_interactive
from toot.entities import Instance, Notification, Status, from_dict
from toot.exceptions import ApiError, ConsoleError
from toot.output import (print_lists, print_out, print_instance, print_account, print_acct_list,
print_search_results, print_timeline, print_notifications, print_tag_list,
@ -56,7 +57,8 @@ def timeline(app, user, args, generator=None):
if args.reverse:
items = reversed(items)
print_timeline(items)
statuses = [from_dict(Status, item) for item in items]
print_timeline(statuses)
if args.once or not sys.stdout.isatty():
break
@ -78,7 +80,8 @@ def thread(app, user, args):
for item in context['descendants']:
thread.append(item)
print_timeline(thread)
statuses = [from_dict(Status, s) for s in thread]
print_timeline(statuses)
def post(app, user, args):
@ -515,6 +518,7 @@ def instance(app, user, args):
try:
instance = api.get_instance(base_url)
instance = from_dict(Instance, instance)
print_instance(instance)
except ApiError:
raise ConsoleError(
@ -542,6 +546,7 @@ def notifications(app, user, args):
if args.reverse:
notifications = reversed(notifications)
notifications = [from_dict(Notification, n) for n in notifications]
print_notifications(notifications)

416
toot/entities.py 100644
Wyświetl plik

@ -0,0 +1,416 @@
"""
Dataclasses which represent entities returned by the Mastodon API.
"""
import dataclasses
from dataclasses import dataclass, is_dataclass
from datetime import date, datetime
from typing import Dict, List, Optional, Type, TypeVar, Union
from typing import get_type_hints
from toot.typing_compat import get_args, get_origin
from toot.utils import get_text
@dataclass
class AccountField:
"""
https://docs.joinmastodon.org/entities/Account/#Field
"""
name: str
value: str
verified_at: Optional[datetime]
@dataclass
class CustomEmoji:
"""
https://docs.joinmastodon.org/entities/CustomEmoji/
"""
shortcode: str
url: str
static_url: str
visible_in_picker: bool
category: str
@dataclass
class Account:
"""
https://docs.joinmastodon.org/entities/Account/
"""
id: str
username: str
acct: str
url: str
display_name: str
note: str
avatar: str
avatar_static: str
header: str
header_static: str
locked: bool
fields: List[AccountField]
emojis: List[CustomEmoji]
bot: bool
group: bool
discoverable: Optional[bool]
noindex: Optional[bool]
moved: Optional["Account"]
suspended: Optional[bool]
limited: Optional[bool]
created_at: datetime
last_status_at: Optional[date]
statuses_count: int
followers_count: int
following_count: int
@property
def note_plaintext(self) -> str:
return get_text(self.note)
@dataclass
class Application:
"""
https://docs.joinmastodon.org/entities/Status/#application
"""
name: str
website: Optional[str]
@dataclass
class MediaAttachment:
"""
https://docs.joinmastodon.org/entities/MediaAttachment/
"""
id: str
type: str
url: str
preview_url: str
remote_url: Optional[str]
meta: dict
description: str
blurhash: str
@dataclass
class StatusMention:
"""
https://docs.joinmastodon.org/entities/Status/#Mention
"""
id: str
username: str
url: str
acct: str
@dataclass
class StatusTag:
"""
https://docs.joinmastodon.org/entities/Status/#Tag
"""
name: str
url: str
@dataclass
class PollOption:
"""
https://docs.joinmastodon.org/entities/Poll/#Option
"""
title: str
votes_count: Optional[int]
@dataclass
class Poll:
"""
https://docs.joinmastodon.org/entities/Poll/
"""
id: str
expires_at: Optional[datetime]
expired: bool
multiple: bool
votes_count: int
voters_count: Optional[int]
options: List[PollOption]
emojis: List[CustomEmoji]
voted: Optional[bool]
own_votes: Optional[List[int]]
@dataclass
class PreviewCard:
"""
https://docs.joinmastodon.org/entities/PreviewCard/
"""
url: str
title: str
description: str
type: str
author_name: str
author_url: str
provider_name: str
provider_url: str
html: str
width: int
height: int
image: Optional[str]
embed_url: str
blurhash: Optional[str]
@dataclass
class FilterKeyword:
"""
https://docs.joinmastodon.org/entities/FilterKeyword/
"""
id: str
keyword: str
whole_word: str
@dataclass
class FilterStatus:
"""
https://docs.joinmastodon.org/entities/FilterStatus/
"""
id: str
status_id: str
@dataclass
class Filter:
"""
https://docs.joinmastodon.org/entities/Filter/
"""
id: str
title: str
context: List[str]
expires_at: Optional[datetime]
filter_action: str
keywords: List[FilterKeyword]
statuses: List[FilterStatus]
@dataclass
class FilterResult:
"""
https://docs.joinmastodon.org/entities/FilterResult/
"""
filter: Filter
keyword_matches: Optional[List[str]]
status_matches: Optional[str]
@dataclass
class Status:
"""
https://docs.joinmastodon.org/entities/Status/
"""
id: str
uri: str
created_at: datetime
account: Account
content: str
visibility: str
sensitive: bool
spoiler_text: str
media_attachments: List[MediaAttachment]
application: Optional[Application]
mentions: List[StatusMention]
tags: List[StatusTag]
emojis: List[CustomEmoji]
reblogs_count: int
favourites_count: int
replies_count: int
url: Optional[str]
in_reply_to_id: Optional[str]
in_reply_to_account_id: Optional[str]
reblog: Optional["Status"]
poll: Optional[Poll]
card: Optional[PreviewCard]
language: Optional[str]
text: Optional[str]
edited_at: Optional[datetime]
favourited: Optional[bool]
reblogged: Optional[bool]
muted: Optional[bool]
bookmarked: Optional[bool]
pinned: Optional[bool]
filtered: Optional[List[FilterResult]]
@property
def original(self) -> "Status":
return self.reblog or self
@dataclass
class Report:
"""
https://docs.joinmastodon.org/entities/Report/
"""
id: str
action_taken: bool
action_taken_at: Optional[datetime]
category: str
comment: str
forwarded: bool
created_at: datetime
status_ids: Optional[List[str]]
rule_ids: Optional[List[str]]
target_account: Account
@dataclass
class Notification:
"""
https://docs.joinmastodon.org/entities/Notification/
"""
id: str
type: str
created_at: datetime
account: Account
status: Optional[Status]
report: Optional[Report]
@dataclass
class InstanceUrls:
streaming_api: str
@dataclass
class InstanceStats:
user_count: int
status_count: int
domain_count: int
@dataclass
class InstanceConfigurationStatuses:
max_characters: int
max_media_attachments: int
characters_reserved_per_url: int
@dataclass
class InstanceConfigurationMediaAttachments:
supported_mime_types: List[str]
image_size_limit: int
image_matrix_limit: int
video_size_limit: int
video_frame_rate_limit: int
video_matrix_limit: int
@dataclass
class InstanceConfigurationPolls:
max_options: int
max_characters_per_option: int
min_expiration: int
max_expiration: int
@dataclass
class InstanceConfiguration:
"""
https://docs.joinmastodon.org/entities/V1_Instance/#configuration
"""
statuses: InstanceConfigurationStatuses
media_attachments: InstanceConfigurationMediaAttachments
polls: InstanceConfigurationPolls
@dataclass
class Rule:
"""
https://docs.joinmastodon.org/entities/Rule/
"""
id: str
text: str
@dataclass
class Instance:
"""
https://docs.joinmastodon.org/entities/V1_Instance/
"""
uri: str
title: str
short_description: str
description: str
email: str
version: str
urls: InstanceUrls
stats: InstanceStats
thumbnail: Optional[str]
languages: List[str]
registrations: bool
approval_required: bool
invites_enabled: bool
configuration: InstanceConfiguration
contact_account: Optional[Account]
rules: List[Rule]
# Generic data class instance
T = TypeVar("T")
def from_dict(cls: Type[T], data: Dict) -> T:
"""Convert a nested dict into an instance of `cls`."""
def _fields():
hints = get_type_hints(cls)
for field in dataclasses.fields(cls):
field_type = _prune_optional(hints[field.name])
default_value = _get_default_value(field)
value = data.get(field.name, default_value)
yield field.name, _convert(field_type, value)
return cls(**dict(_fields()))
def _get_default_value(field):
if field.default is not dataclasses.MISSING:
return field.default
if field.default_factory is not dataclasses.MISSING:
return field.default_factory()
return None
def _convert(field_type, value):
if value is None:
return None
if field_type in [str, int, bool, dict]:
return value
if field_type == datetime:
return datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%f%z")
if field_type == date:
return date.fromisoformat(value)
if get_origin(field_type) == list:
(inner_type,) = get_args(field_type)
return [_convert(inner_type, x) for x in value]
if is_dataclass(field_type):
return from_dict(field_type, value)
raise ValueError(f"Not implemented for type '{field_type}'")
def _prune_optional(field_type):
"""For `Optional[<type>]` returns the encapsulated `<type>`."""
if get_origin(field_type) == Union:
args = get_args(field_type)
if len(args) == 2 and args[1] == type(None): # noqa
return args[0]
return field_type

Wyświetl plik

@ -3,12 +3,11 @@ import re
import sys
import textwrap
from typing import List
from wcwidth import wcswidth
from toot.tui.utils import parse_datetime
from toot.entities import Instance, Notification, Poll, Status
from toot.utils import get_text, parse_html
from toot.wcstring import wc_wrap
from typing import List
from wcwidth import wcswidth
STYLES = {
@ -136,25 +135,23 @@ def print_err(*args, **kwargs):
print(*args, file=sys.stderr, **kwargs)
def print_instance(instance):
print_out(f"<green>{instance['title']}</green>")
print_out(f"<blue>{instance['uri']}</blue>")
print_out(f"running Mastodon {instance['version']}")
def print_instance(instance: Instance):
print_out(f"<green>{instance.title}</green>")
print_out(f"<blue>{instance.uri}</blue>")
print_out(f"running Mastodon {instance.version}")
print_out()
description = instance.get("description")
if description:
for paragraph in re.split(r"[\r\n]+", description.strip()):
if instance.description:
for paragraph in re.split(r"[\r\n]+", instance.description.strip()):
paragraph = get_text(paragraph)
print_out(textwrap.fill(paragraph, width=80))
print_out()
rules = instance.get("rules")
if rules:
if instance.rules:
print_out("Rules:")
for ordinal, rule in enumerate(rules):
for ordinal, rule in enumerate(instance.rules):
ordinal = f"{ordinal + 1}."
lines = textwrap.wrap(rule["text"], 80 - len(ordinal))
lines = textwrap.wrap(rule.text, 80 - len(ordinal))
first = True
for line in lines:
if first:
@ -162,6 +159,11 @@ def print_instance(instance):
first = False
else:
print_out(f"{' ' * len(ordinal)} {line}")
print_out()
contact = instance.contact_account
if contact:
print_out(f"Contact: {contact.display_name} @{contact.acct}")
def print_account(account):
@ -269,20 +271,18 @@ def print_search_results(results):
print_out("<yellow>Nothing found</yellow>")
def print_status(status, width):
reblog = status['reblog']
content = reblog['content'] if reblog else status['content']
media_attachments = reblog['media_attachments'] if reblog else status['media_attachments']
in_reply_to = status['in_reply_to_id']
poll = reblog.get('poll') if reblog else status.get('poll')
def print_status(status: Status, width: int):
status_id = status.id
in_reply_to_id = status.in_reply_to_id
reblogged_by = status.account if status.reblog else None
time = parse_datetime(status['created_at'])
time = time.strftime('%Y-%m-%d %H:%M %Z')
status = status.original
username = "@" + status['account']['acct']
time = status.created_at.strftime('%Y-%m-%d %H:%M %Z')
username = "@" + status.account.acct
spacing = width - wcswidth(username) - wcswidth(time) - 2
display_name = status['account']['display_name']
display_name = status.account.display_name
if display_name:
spacing -= wcswidth(display_name) + 1
@ -294,23 +294,24 @@ def print_status(status, width):
)
print_out("")
print_html(content, width)
print_html(status.content, width)
if media_attachments:
if status.media_attachments:
print_out("\nMedia:")
for attachment in media_attachments:
url = attachment["url"]
for attachment in status.media_attachments:
url = attachment.url
for line in wc_wrap(url, width):
print_out(line)
if poll:
print_poll(poll)
if status.poll:
print_poll(status.poll)
print_out()
print_out(
f"ID <yellow>{status['id']}</yellow> ",
f"↲ In reply to <yellow>{in_reply_to}</yellow> " if in_reply_to else "",
f"Reblogged <blue>@{reblog['account']['acct']}</blue> " if reblog else "",
f"ID <yellow>{status_id}</yellow> ",
f"↲ In reply to <yellow>{in_reply_to_id}</yellow> " if in_reply_to_id else "",
f"<blue>@{reblogged_by.acct}</blue> boosted " if reblogged_by else "",
)
@ -325,33 +326,33 @@ def print_html(text, width=80):
first = False
def print_poll(poll):
def print_poll(poll: Poll):
print_out()
for idx, option in enumerate(poll["options"]):
perc = (round(100 * option["votes_count"] / poll["votes_count"])
if poll["votes_count"] else 0)
for idx, option in enumerate(poll.options):
perc = (round(100 * option.votes_count / poll.votes_count)
if poll.votes_count and option.votes_count is not None else 0)
if poll["voted"] and poll["own_votes"] and idx in poll["own_votes"]:
if poll.voted and poll.own_votes and idx in poll.own_votes:
voted_for = " <yellow>✓</yellow>"
else:
voted_for = ""
print_out(f'{option["title"]} - {perc}% {voted_for}')
print_out(f'{option.title} - {perc}% {voted_for}')
poll_footer = f'Poll · {poll["votes_count"]} votes'
poll_footer = f'Poll · {poll.votes_count} votes'
if poll["expired"]:
if poll.expired:
poll_footer += " · Closed"
if poll["expires_at"]:
expires_at = parse_datetime(poll["expires_at"]).strftime("%Y-%m-%d %H:%M")
if poll.expires_at:
expires_at = poll.expires_at.strftime("%Y-%m-%d %H:%M")
poll_footer += f" · Closes on {expires_at}"
print_out()
print_out(poll_footer)
def print_timeline(items, width=100):
def print_timeline(items: List[Status], width=100):
print_out("" * width)
for item in items:
print_status(item, width)
@ -366,20 +367,19 @@ notification_msgs = {
}
def print_notification(notification, width=100):
account = "{display_name} @{acct}".format(**notification["account"])
msg = notification_msgs.get(notification["type"])
def print_notification(notification: Notification, width=100):
account = f"{notification.account.display_name} @{notification.account.acct}"
msg = notification_msgs.get(notification.type)
if msg is None:
return
print_out("" * width)
print_out(msg.format(account=account))
status = notification.get("status")
if status is not None:
print_status(status, width)
if notification.status:
print_status(notification.status, width)
def print_notifications(notifications, width=100):
def print_notifications(notifications: List[Notification], width=100):
for notification in notifications:
print_notification(notification)
print_out("" * width)

Wyświetl plik

@ -0,0 +1,147 @@
# Taken from https://github.com/rossmacarthur/typing-compat/
# TODO: Remove once the minimum python version is increased to 3.8
#
# Licensed under the MIT license
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
# flake8: noqa
import collections
import typing
__all__ = ['get_args', 'get_origin']
__title__ = 'typing-compat'
__version__ = '0.1.0'
__url__ = 'https://github.com/rossmacarthur/typing-compat'
__author__ = 'Ross MacArthur'
__author_email__ = 'ross@macarthur.io'
__description__ = 'Python typing compatibility library'
try:
# Python >=3.8 should have these functions already
from typing import get_args as _get_args # novermin
from typing import get_origin as _get_origin # novermin
except ImportError:
if hasattr(typing, '_GenericAlias'): # Python 3.7
def _get_origin(tp):
"""Copied from the Python 3.8 typing module"""
if isinstance(tp, typing._GenericAlias):
return tp.__origin__
if tp is typing.Generic:
return typing.Generic
return None
def _get_args(tp):
"""Copied from the Python 3.8 typing module"""
if isinstance(tp, typing._GenericAlias):
res = tp.__args__
if (
get_origin(tp) is collections.abc.Callable
and res[0] is not Ellipsis
):
res = (list(res[:-1]), res[-1])
return res
return ()
else: # Python <3.7
def _resolve_via_mro(tp):
if hasattr(tp, '__mro__'):
for t in tp.__mro__:
if t.__module__ in ('builtins', '__builtin__') and t is not object:
return t
return tp
def _get_origin(tp):
"""Emulate the behaviour of Python 3.8 typing module"""
if isinstance(tp, typing._ClassVar):
return typing.ClassVar
elif isinstance(tp, typing._Union):
return typing.Union
elif isinstance(tp, typing.GenericMeta):
if hasattr(tp, '_gorg'):
return _resolve_via_mro(tp._gorg)
else:
while tp.__origin__ is not None:
tp = tp.__origin__
return _resolve_via_mro(tp)
elif hasattr(typing, '_Literal') and isinstance(tp, typing._Literal): # novermin
return typing.Literal # novermin
def _normalize_arg(args):
if isinstance(args, tuple) and len(args) > 1:
base, rest = args[0], tuple(_normalize_arg(arg) for arg in args[1:])
if isinstance(base, typing.CallableMeta):
return typing.Callable[list(rest[:-1]), rest[-1]]
elif isinstance(base, (typing.GenericMeta, typing._Union)):
return base[rest]
return args
def _get_args(tp):
"""Emulate the behaviour of Python 3.8 typing module"""
if isinstance(tp, typing._ClassVar):
return (tp.__type__,)
elif hasattr(tp, '_subs_tree'):
tree = tp._subs_tree()
if isinstance(tree, tuple) and len(tree) > 1:
if isinstance(tree[0], typing.CallableMeta) and len(tree) == 2:
return ([], _normalize_arg(tree[1]))
return tuple(_normalize_arg(arg) for arg in tree[1:])
return ()
def get_origin(tp):
"""
Get the unsubscripted version of a type.
This supports generic types, Callable, Tuple, Union, Literal, Final and
ClassVar. Returns None for unsupported types.
Examples:
get_origin(Literal[42]) is Literal
get_origin(int) is None
get_origin(ClassVar[int]) is ClassVar
get_origin(Generic) is Generic
get_origin(Generic[T]) is Generic
get_origin(Union[T, int]) is Union
get_origin(List[Tuple[T, T]][int]) == list
"""
return _get_origin(tp)
def get_args(tp):
"""
Get type arguments with all substitutions performed.
For unions, basic simplifications used by Union constructor are performed.
Examples:
get_args(Dict[str, int]) == (str, int)
get_args(int) == ()
get_args(Union[int, Union[T, int], str][int]) == (int, str)
get_args(Union[int, Tuple[T, int]][str]) == (int, Tuple[str, int])
get_args(Callable[[], T][int]) == ([], int)
"""
return _get_args(tp)