Merge remote-tracking branch 'upstream/master' into images

pull/430/head
Daniel Schwarz 2023-11-30 23:42:23 -05:00
commit 7a7f4d4679
44 zmienionych plików z 1690 dodań i 720 usunięć

Wyświetl plik

@ -1,4 +1,4 @@
[flake8]
exclude=build,tests,tmp,venv,toot/tui/scroll.py
ignore=E128,W503
ignore=E128,W503,W504
max-line-length=120

Wyświetl plik

@ -7,7 +7,7 @@ jobs:
runs-on: ubuntu-22.04
strategy:
matrix:
python-version: ["3.7", "3.8", "3.9", "3.10", "3.11"]
python-version: ["3.7", "3.8", "3.9", "3.10", "3.11", "3.12"]
steps:
- uses: actions/checkout@v3
@ -18,14 +18,13 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -e .
pip install -r requirements-test.txt
pip install -e ".[test,richtext]"
- name: Run tests
run: |
pytest
- name: Validate minimum required version
run: |
vermin --target=3.7 --no-tips .
vermin toot
- name: Check style
run: |
flake8

Wyświetl plik

@ -1,13 +0,0 @@
language: python
python:
- "3.4"
- "3.5"
- "3.6"
- "3.7"
- "nightly"
install:
- pip install -e .
script: make test

4
.vermin 100644
Wyświetl plik

@ -0,0 +1,4 @@
[vermin]
only_show_violations = yes
show_tips = no
targets = 3.7

Wyświetl plik

@ -3,6 +3,31 @@ Changelog
<!-- Do not edit. This file is automatically generated from changelog.yaml.-->
**0.39.0 (2023-11-23)**
* Add `--json` option to many commands, this makes them print the JSON data
returned by the server instead of human-readable data. Useful for scripting.
* TUI: Make media viewer configurable in settings, see:
https://toot.bezdomni.net/settings.html#tui-view-images
* TUI: Add rich text rendering (thanks Dan Schwarz)
**0.38.2 (2023-11-16)**
* Fix compatibility with Pleroma (#399, thanks Sandra Snan)
* Fix language documentation (thanks Sandra Snan)
**0.38.1 (2023-07-25)**
* Fix relative datetimes option in TUI
**0.38.0 (2023-07-25)**
* Add `toot muted` and `toot blocked` commands (thanks Florian Obser)
* Add settings file, allows setting common options, defining defaults for
command arguments, and the TUI palette
* TUI: Remap shortcuts so they don't override HJKL used for navigation (thanks
Dan Schwarz)
**0.37.0 (2023-06-28)**
* **BREAKING:** Require Python 3.7+

Wyświetl plik

@ -77,8 +77,9 @@ pip install -r requirements-dev.txt
pip install -r requirements-test.txt
```
While the virtual env is active, running `toot` will execute the one you checked
out. This allows you to make changes and test them.
While the virtual env is active, you can run `./_env/bin/toot` to
execute the one you checked out. This allows you to make changes and
test them.
#### Crafting good commits

Wyświetl plik

@ -10,7 +10,7 @@ publish :
test:
pytest -v
flake8
vermin --target=3.7 --no-tips --violations --exclude-regex venv/.* .
vermin toot
coverage:
coverage erase

Wyświetl plik

@ -1,3 +1,28 @@
0.39.0:
date: 2023-11-23
changes:
- "Add `--json` option to many commands, this makes them print the JSON data returned by the server instead of human-readable data. Useful for scripting."
- "TUI: Make media viewer configurable in settings, see: https://toot.bezdomni.net/settings.html#tui-view-images"
- "TUI: Add rich text rendering (thanks Dan Schwarz)"
0.38.2:
date: 2023-11-16
changes:
- "Fix compatibility with Pleroma (#399, thanks Sandra Snan)"
- "Fix language documentation (thanks Sandra Snan)"
0.38.1:
date: 2023-07-25
changes:
- "Fix relative datetimes option in TUI"
0.38.0:
date: 2023-07-25
changes:
- "Add `toot muted` and `toot blocked` commands (thanks Florian Obser)"
- "Add settings file, allows setting common options, defining defaults for command arguments, and the TUI palette"
- "TUI: Remap shortcuts so they don't override HJKL used for navigation (thanks Dan Schwarz)"
0.37.0:
date: 2023-06-28
changes:

Wyświetl plik

@ -21,10 +21,10 @@ through the specified server.
For example:
.. code-block:: sh
export HTTPS_PROXY="http://1.2.3.4:5678"
toot login --instance mastodon.social
```sh
export HTTPS_PROXY="http://1.2.3.4:5678"
toot login --instance mastodon.social
```
**NB:** This feature is provided by
[requests](http://docs.python-requests.org/en/master/user/advanced/#proxies>)

Wyświetl plik

@ -3,6 +3,31 @@ Changelog
<!-- Do not edit. This file is automatically generated from changelog.yaml.-->
**0.39.0 (2023-11-23)**
* Add `--json` option to many commands, this makes them print the JSON data
returned by the server instead of human-readable data. Useful for scripting.
* TUI: Make media viewer configurable in settings, see:
https://toot.bezdomni.net/settings.html#tui-view-images
* TUI: Add rich text rendering (thanks Dan Schwarz)
**0.38.2 (2023-11-16)**
* Fix compatibility with Pleroma (#399, thanks Sandra Snan)
* Fix language documentation (thanks Sandra Snan)
**0.38.1 (2023-07-25)**
* Fix relative datetimes option in TUI
**0.38.0 (2023-07-25)**
* Add `toot muted` and `toot blocked` commands (thanks Florian Obser)
* Add settings file, allows setting common options, defining defaults for
command arguments, and the TUI palette
* TUI: Remap shortcuts so they don't override HJKL used for navigation (thanks
Dan Schwarz)
**0.37.0 (2023-06-28)**
* **BREAKING:** Require Python 3.7+

Wyświetl plik

@ -39,14 +39,88 @@ quiet = false
## Overriding command defaults
Defaults for command arguments can be override by specifying a `[command.<name>]` section.
Defaults for command arguments can be override by specifying a `[commands.<name>]` section.
For example, to override `toot post`.
```toml
[command.post]
[commands.post]
editor = "vim"
sensitive = true
visibility = "unlisted"
scheduled_in = "30 minutes"
```
## TUI view images
> Introduced in toot 0.39.0
You can view images in a toot using an external program by setting the
`tui.media_viewer` option to your desired image viewer. When a toot is focused,
pressing `m` will launch the specified executable giving one or more URLs as
arguments. This works well with image viewers like `feh` which accept URLs as
arguments.
```toml
[tui]
media_viewer = "feh"
```
## TUI color palette
TUI uses Urwid which provides several color modes. See
[Urwid documentation](https://urwid.org/manual/displayattributes.html)
for more details.
By default, TUI operates in 16-color mode which can be changed by setting the
`color` setting in the `[tui]` section to one of the following values:
* `1` (monochrome)
* `16` (default)
* `88`
* `256`
* `16777216` (24 bit)
TUI defines a list of colors which can be customized, currently they can be seen
[in the source code](https://github.com/ihabunek/toot/blob/master/toot/tui/constants.py). They can be overriden in the `[tui.palette]` section.
Each color is defined as a list of upto 5 values:
* foreground color (16 color mode)
* background color (16 color mode)
* monochrome color (monochrome mode)
* foreground color (high-color mode)
* background color (high-color mode)
Any colors which are not used by your desired color mode can be skipped or set
to an empty string.
For example, to change the button colors in 16 color mode:
```toml
[tui.palette]
button = ["dark red,bold", ""]
button_focused = ["light gray", "green"]
```
In monochrome mode:
```toml
[tui]
colors = 1
[tui.palette]
button = ["", "", "bold"]
button_focused = ["", "", "italics"]
```
In 256 color mode:
```toml
[tui]
colors = 256
[tui.palette]
button = ["", "", "", "#aaa", "#bbb"]
button_focused = ["", "", "", "#aaa", "#bbb"]
```

Wyświetl plik

@ -1,8 +0,0 @@
coverage
keyring
pyxdg
pyyaml
sphinx
sphinx-autobuild
twine
wheel

Wyświetl plik

@ -1,5 +0,0 @@
flake8
psycopg2-binary
pytest
pytest-xdist[psutil]
vermin

Wyświetl plik

@ -1,6 +0,0 @@
requests>=2.13,<3.0
beautifulsoup4>=4.5.0,<5.0
wcwidth>=0.1.7
urwid>=2.0.0,<3.0
pillow>=9.5.0
term-image==0.7.0

Wyświetl plik

@ -12,7 +12,7 @@ and blocking accounts and other actions.
setup(
name='toot',
version='0.37.0',
version='0.39.0',
description='Mastodon CLI client',
long_description=long_description.strip(),
author='Ivan Habunek',
@ -31,7 +31,7 @@ setup(
'License :: OSI Approved :: GNU General Public License v3 (GPLv3)',
'Programming Language :: Python :: 3',
],
packages=['toot', 'toot.tui', 'toot.utils'],
packages=['toot', 'toot.tui', 'toot.tui.richtext', 'toot.utils'],
python_requires=">=3.7",
install_requires=[
"requests>=2.13,<3.0",
@ -42,6 +42,26 @@ setup(
"pillow>=9.5.0",
"term-image==0.7.0",
],
extras_require={
# Required to display rich text in the TUI
"richtext": [
"urwidgets>=0.1,<0.2"
],
"dev": [
"coverage",
"pyyaml",
"twine",
"wheel",
],
"test": [
"flake8",
"psycopg2-binary",
"pytest",
"pytest-xdist[psutil]",
"setuptools",
"vermin",
],
},
entry_points={
'console_scripts': [
'toot=toot.console:main',

Wyświetl plik

@ -13,6 +13,7 @@ export TOOT_TEST_DATABASE_DSN="dbname=mastodon_development"
```
"""
import json
import re
import os
import psycopg2
@ -41,7 +42,7 @@ ASSETS_DIR = str(Path(__file__).parent.parent / "assets")
def create_app(base_url):
instance = api.get_instance(base_url)
instance = api.get_instance(base_url).json()
response = api.create_app(base_url)
return App(instance["uri"], base_url, response["client_id"], response["client_secret"])
@ -94,6 +95,16 @@ def friend(app):
return register_account(app)
@pytest.fixture(scope="session")
def user_id(app, user):
return api.find_account(app, user, user.username)["id"]
@pytest.fixture(scope="session")
def friend_id(app, user, friend):
return api.find_account(app, user, friend.username)["id"]
@pytest.fixture
def run(app, user, capsys):
def _run(command, *params, as_user=None):
@ -110,6 +121,14 @@ def run(app, user, capsys):
return _run
@pytest.fixture
def run_json(run):
def _run_json(command, *params):
out = run(command, *params)
return json.loads(out)
return _run_json
@pytest.fixture
def run_anon(capsys):
def _run(command, *params):

Wyświetl plik

@ -1,12 +1,22 @@
import json
from toot import App, User, api
from toot.entities import Account, Relationship, from_dict
def test_whoami(user, run):
def test_whoami(user: User, run):
out = run("whoami")
# TODO: test other fields once updating account is supported
assert f"@{user.username}" in out
def test_whois(app, friend, run):
def test_whoami_json(user: User, run):
out = run("whoami", "--json")
account = from_dict(Account, json.loads(out))
assert account.username == user.username
def test_whois(app: App, friend: User, run):
variants = [
friend.username,
f"@{friend.username}",
@ -17,3 +27,192 @@ def test_whois(app, friend, run):
for username in variants:
out = run("whois", username)
assert f"@{friend.username}" in out
def test_following(app: App, user: User, friend: User, friend_id, run):
# Make sure we're not initally following friend
api.unfollow(app, user, friend_id)
out = run("following", user.username)
assert out == ""
out = run("follow", friend.username)
assert out == f"✓ You are now following {friend.username}"
out = run("following", user.username)
assert friend.username in out
# If no account is given defaults to logged in user
out = run("following")
assert friend.username in out
out = run("unfollow", friend.username)
assert out == f"✓ You are no longer following {friend.username}"
out = run("following", user.username)
assert out == ""
def test_following_case_insensitive(user: User, friend: User, run):
assert friend.username != friend.username.upper()
out = run("follow", friend.username.upper())
assert out == f"✓ You are now following {friend.username.upper()}"
def test_following_not_found(run):
out = run("follow", "bananaman")
assert out == "Account not found"
out = run("unfollow", "bananaman")
assert out == "Account not found"
def test_following_json(app: App, user: User, friend: User, user_id, friend_id, run_json):
# Make sure we're not initally following friend
api.unfollow(app, user, friend_id)
result = run_json("following", user.username, "--json")
assert result == []
result = run_json("followers", friend.username, "--json")
assert result == []
result = run_json("follow", friend.username, "--json")
relationship = from_dict(Relationship, result)
assert relationship.id == friend_id
assert relationship.following is True
[result] = run_json("following", user.username, "--json")
relationship = from_dict(Relationship, result)
assert relationship.id == friend_id
# If no account is given defaults to logged in user
[result] = run_json("following", user.username, "--json")
relationship = from_dict(Relationship, result)
assert relationship.id == friend_id
[result] = run_json("followers", friend.username, "--json")
assert result["id"] == user_id
result = run_json("unfollow", friend.username, "--json")
assert result["id"] == friend_id
assert result["following"] is False
result = run_json("following", user.username, "--json")
assert result == []
result = run_json("followers", friend.username, "--json")
assert result == []
def test_mute(app, user, friend, friend_id, run):
# Make sure we're not initially muting friend
api.unmute(app, user, friend_id)
out = run("muted")
assert out == "No accounts muted"
out = run("mute", friend.username)
assert out == f"✓ You have muted {friend.username}"
out = run("muted")
assert friend.username in out
out = run("unmute", friend.username)
assert out == f"{friend.username} is no longer muted"
out = run("muted")
assert out == "No accounts muted"
def test_mute_case_insensitive(friend: User, run):
out = run("mute", friend.username.upper())
assert out == f"✓ You have muted {friend.username.upper()}"
def test_mute_not_found(run):
out = run("mute", "doesnotexistperson")
assert out == f"Account not found"
out = run("unmute", "doesnotexistperson")
assert out == f"Account not found"
def test_mute_json(app: App, user: User, friend: User, run_json, friend_id):
# Make sure we're not initially muting friend
api.unmute(app, user, friend_id)
result = run_json("muted", "--json")
assert result == []
result = run_json("mute", friend.username, "--json")
relationship = from_dict(Relationship, result)
assert relationship.id == friend_id
assert relationship.muting is True
[result] = run_json("muted", "--json")
account = from_dict(Account, result)
assert account.id == friend_id
result = run_json("unmute", friend.username, "--json")
relationship = from_dict(Relationship, result)
assert relationship.id == friend_id
assert relationship.muting is False
result = run_json("muted", "--json")
assert result == []
def test_block(app, user, friend, friend_id, run):
# Make sure we're not initially blocking friend
api.unblock(app, user, friend_id)
out = run("blocked")
assert out == "No accounts blocked"
out = run("block", friend.username)
assert out == f"✓ You are now blocking {friend.username}"
out = run("blocked")
assert friend.username in out
out = run("unblock", friend.username)
assert out == f"{friend.username} is no longer blocked"
out = run("blocked")
assert out == "No accounts blocked"
def test_block_case_insensitive(friend: User, run):
out = run("block", friend.username.upper())
assert out == f"✓ You are now blocking {friend.username.upper()}"
def test_block_not_found(run):
out = run("block", "doesnotexistperson")
assert out == f"Account not found"
def test_block_json(app: App, user: User, friend: User, run_json, friend_id):
# Make sure we're not initially blocking friend
api.unblock(app, user, friend_id)
result = run_json("blocked", "--json")
assert result == []
result = run_json("block", friend.username, "--json")
relationship = from_dict(Relationship, result)
assert relationship.id == friend_id
assert relationship.blocking is True
[result] = run_json("blocked", "--json")
account = from_dict(Account, result)
assert account.id == friend_id
result = run_json("unblock", friend.username, "--json")
relationship = from_dict(Relationship, result)
assert relationship.id == friend_id
assert relationship.blocking is False
result = run_json("blocked", "--json")
assert result == []

Wyświetl plik

@ -1,5 +1,6 @@
from tests.integration.conftest import TRUMPET
from toot import api
from toot.entities import Account, from_dict
from toot.utils import get_text
@ -12,10 +13,17 @@ def test_update_account_display_name(run, app, user):
out = run("update_account", "--display-name", "elwood")
assert out == "✓ Account updated"
account = api.verify_credentials(app, user)
account = api.verify_credentials(app, user).json()
assert account["display_name"] == "elwood"
def test_update_account_json(run_json, app, user):
out = run_json("update_account", "--display-name", "elwood", "--json")
account = from_dict(Account, out)
assert account.acct == user.username
assert account.display_name == "elwood"
def test_update_account_note(run, app, user):
note = ("It's 106 miles to Chicago, we got a full tank of gas, half a pack "
"of cigarettes, it's dark... and we're wearing sunglasses.")
@ -23,7 +31,7 @@ def test_update_account_note(run, app, user):
out = run("update_account", "--note", note)
assert out == "✓ Account updated"
account = api.verify_credentials(app, user)
account = api.verify_credentials(app, user).json()
assert get_text(account["note"]) == note
@ -31,7 +39,7 @@ def test_update_account_language(run, app, user):
out = run("update_account", "--language", "hr")
assert out == "✓ Account updated"
account = api.verify_credentials(app, user)
account = api.verify_credentials(app, user).json()
assert account["source"]["language"] == "hr"
@ -39,29 +47,29 @@ def test_update_account_privacy(run, app, user):
out = run("update_account", "--privacy", "private")
assert out == "✓ Account updated"
account = api.verify_credentials(app, user)
account = api.verify_credentials(app, user).json()
assert account["source"]["privacy"] == "private"
def test_update_account_avatar(run, app, user):
account = api.verify_credentials(app, user)
account = api.verify_credentials(app, user).json()
old_value = account["avatar"]
out = run("update_account", "--avatar", TRUMPET)
assert out == "✓ Account updated"
account = api.verify_credentials(app, user)
account = api.verify_credentials(app, user).json()
assert account["avatar"] != old_value
def test_update_account_header(run, app, user):
account = api.verify_credentials(app, user)
account = api.verify_credentials(app, user).json()
old_value = account["header"]
out = run("update_account", "--header", TRUMPET)
assert out == "✓ Account updated"
account = api.verify_credentials(app, user)
account = api.verify_credentials(app, user).json()
assert account["header"] != old_value
@ -69,13 +77,13 @@ def test_update_account_locked(run, app, user):
out = run("update_account", "--locked")
assert out == "✓ Account updated"
account = api.verify_credentials(app, user)
account = api.verify_credentials(app, user).json()
assert account["locked"] is True
out = run("update_account", "--no-locked")
assert out == "✓ Account updated"
account = api.verify_credentials(app, user)
account = api.verify_credentials(app, user).json()
assert account["locked"] is False
@ -83,13 +91,13 @@ def test_update_account_bot(run, app, user):
out = run("update_account", "--bot")
assert out == "✓ Account updated"
account = api.verify_credentials(app, user)
account = api.verify_credentials(app, user).json()
assert account["bot"] is True
out = run("update_account", "--no-bot")
assert out == "✓ Account updated"
account = api.verify_credentials(app, user)
account = api.verify_credentials(app, user).json()
assert account["bot"] is False
@ -97,13 +105,13 @@ def test_update_account_discoverable(run, app, user):
out = run("update_account", "--discoverable")
assert out == "✓ Account updated"
account = api.verify_credentials(app, user)
account = api.verify_credentials(app, user).json()
assert account["discoverable"] is True
out = run("update_account", "--no-discoverable")
assert out == "✓ Account updated"
account = api.verify_credentials(app, user)
account = api.verify_credentials(app, user).json()
assert account["discoverable"] is False
@ -111,11 +119,11 @@ def test_update_account_sensitive(run, app, user):
out = run("update_account", "--sensitive")
assert out == "✓ Account updated"
account = api.verify_credentials(app, user)
account = api.verify_credentials(app, user).json()
assert account["source"]["sensitive"] is True
out = run("update_account", "--no-sensitive")
assert out == "✓ Account updated"
account = api.verify_credentials(app, user)
account = api.verify_credentials(app, user).json()
assert account["source"]["sensitive"] is False

Wyświetl plik

@ -1,3 +1,4 @@
import json
import re
import uuid
@ -14,7 +15,7 @@ def test_post(app, user, run):
out = run("post", text)
status_id = posted_status_id(out)
status = api.fetch_status(app, user, status_id)
status = api.fetch_status(app, user, status_id).json()
assert text == get_text(status["content"])
assert status["visibility"] == "public"
assert status["sensitive"] is False
@ -27,11 +28,23 @@ def test_post(app, user, run):
assert status["application"]["website"] == CLIENT_WEBSITE
def test_post_json(run):
content = "i wish i was a #lumberjack"
out = run("post", content, "--json")
status = json.loads(out)
assert get_text(status["content"]) == content
assert status["visibility"] == "public"
assert status["sensitive"] is False
assert status["spoiler_text"] == ""
assert status["poll"] is None
def test_post_visibility(app, user, run):
for visibility in ["public", "unlisted", "private", "direct"]:
out = run("post", "foo", "--visibility", visibility)
status_id = posted_status_id(out)
status = api.fetch_status(app, user, status_id)
status = api.fetch_status(app, user, status_id).json()
assert status["visibility"] == visibility
@ -92,7 +105,7 @@ def test_post_poll(app, user, run):
status_id = posted_status_id(out)
status = api.fetch_status(app, user, status_id)
status = api.fetch_status(app, user, status_id).json()
assert status["poll"]["expired"] is False
assert status["poll"]["multiple"] is False
assert status["poll"]["options"] == [
@ -121,7 +134,7 @@ def test_post_poll_multiple(app, user, run):
status_id = posted_status_id(out)
status = api.fetch_status(app, user, status_id)
status = api.fetch_status(app, user, status_id).json()
assert status["poll"]["multiple"] is True
@ -137,7 +150,7 @@ def test_post_poll_expires_in(app, user, run):
status_id = posted_status_id(out)
status = api.fetch_status(app, user, status_id)
status = api.fetch_status(app, user, status_id).json()
actual = datetime.strptime(status["poll"]["expires_at"], "%Y-%m-%dT%H:%M:%S.%f%z")
expected = datetime.now(timezone.utc) + timedelta(hours=8)
delta = actual - expected
@ -156,7 +169,7 @@ def test_post_poll_hide_totals(app, user, run):
status_id = posted_status_id(out)
status = api.fetch_status(app, user, status_id)
status = api.fetch_status(app, user, status_id).json()
# votes_count is None when totals are hidden
assert status["poll"]["options"] == [
@ -168,12 +181,12 @@ def test_post_poll_hide_totals(app, user, run):
def test_post_language(app, user, run):
out = run("post", "test", "--language", "hr")
status_id = posted_status_id(out)
status = api.fetch_status(app, user, status_id)
status = api.fetch_status(app, user, status_id).json()
assert status["language"] == "hr"
out = run("post", "test", "--language", "zh")
status_id = posted_status_id(out)
status = api.fetch_status(app, user, status_id)
status = api.fetch_status(app, user, status_id).json()
assert status["language"] == "zh"
@ -190,7 +203,7 @@ def test_media_thumbnail(app, user, run):
)
status_id = posted_status_id(out)
status = api.fetch_status(app, user, status_id)
status = api.fetch_status(app, user, status_id).json()
[media] = status["media_attachments"]
assert media["description"] == "foo"
@ -228,7 +241,7 @@ def test_media_attachments(app, user, run):
)
status_id = posted_status_id(out)
status = api.fetch_status(app, user, status_id)
status = api.fetch_status(app, user, status_id).json()
[a1, a2, a3, a4] = status["media_attachments"]
@ -257,7 +270,7 @@ def test_media_attachment_without_text(mock_read, mock_ml, app, user, run):
out = run("post", "--media", media_path)
status_id = posted_status_id(out)
status = api.fetch_status(app, user, status_id)
status = api.fetch_status(app, user, status_id).json()
assert status["content"] == ""
[attachment] = status["media_attachments"]
@ -269,11 +282,11 @@ def test_media_attachment_without_text(mock_read, mock_ml, app, user, run):
def test_reply_thread(app, user, friend, run):
status = api.post_status(app, friend, "This is the status")
status = api.post_status(app, friend, "This is the status").json()
out = run("post", "--reply-to", status["id"], "This is the reply")
status_id = posted_status_id(out)
reply = api.fetch_status(app, user, status_id)
reply = api.fetch_status(app, user, status_id).json()
assert reply["in_reply_to_id"] == status["id"]

Wyświetl plik

@ -1,9 +1,12 @@
import re
from uuid import uuid4
import json
from pprint import pprint
import pytest
import re
from toot import api
from toot.entities import Account, from_dict_list
from toot.exceptions import ConsoleError
from uuid import uuid4
def test_instance(app, run):
@ -13,6 +16,14 @@ def test_instance(app, run):
assert "running Mastodon" in out
def test_instance_json(app, run):
out = run("instance", "--json")
data = json.loads(out)
assert data["title"] is not None
assert data["description"] is not None
assert data["version"] is not None
def test_instance_anon(app, run_anon, base_url):
out = run_anon("instance", base_url)
assert "Mastodon" in out
@ -49,6 +60,12 @@ def test_search_account(friend, run):
assert out == f"Accounts:\n* @{friend.username}"
def test_search_account_json(friend, run_json):
out = run_json("search", friend.username, "--json")
[account] = from_dict_list(Account, out["accounts"])
assert account.acct == friend.username
def test_search_hashtag(app, user, run):
api.post_status(app, user, "#hashtag_x")
api.post_status(app, user, "#hashtag_y")
@ -58,6 +75,19 @@ def test_search_hashtag(app, user, run):
assert out == "Hashtags:\n#hashtag_x, #hashtag_y, #hashtag_z"
def test_search_hashtag_json(app, user, run_json):
api.post_status(app, user, "#hashtag_x")
api.post_status(app, user, "#hashtag_y")
api.post_status(app, user, "#hashtag_z")
out = run_json("search", "#hashtag", "--json")
[h1, h2, h3] = sorted(out["hashtags"], key=lambda h: h["name"])
assert h1["name"] == "hashtag_x"
assert h2["name"] == "hashtag_y"
assert h3["name"] == "hashtag_z"
def test_tags(run, base_url):
out = run("tags_followed")
assert out == "You're not following any hashtags."
@ -86,7 +116,7 @@ def test_tags(run, base_url):
def test_status(app, user, run):
uuid = str(uuid4())
response = api.post_status(app, user, uuid)
response = api.post_status(app, user, uuid).json()
out = run("status", response["id"])
assert uuid in out
@ -96,9 +126,9 @@ def test_status(app, user, run):
def test_thread(app, user, run):
uuid = str(uuid4())
s1 = api.post_status(app, user, uuid + "1")
s2 = api.post_status(app, user, uuid + "2", in_reply_to_id=s1["id"])
s3 = api.post_status(app, user, uuid + "3", in_reply_to_id=s2["id"])
s1 = api.post_status(app, user, uuid + "1").json()
s2 = api.post_status(app, user, uuid + "2", in_reply_to_id=s1["id"]).json()
s3 = api.post_status(app, user, uuid + "3", in_reply_to_id=s2["id"]).json()
for status in [s1, s2, s3]:
out = run("thread", status["id"])

Wyświetl plik

@ -1,3 +1,4 @@
import json
import time
import pytest
@ -5,8 +6,8 @@ from toot import api
from toot.exceptions import NotFoundError
def test_delete_status(app, user, run):
status = api.post_status(app, user, "foo")
def test_delete(app, user, run):
status = api.post_status(app, user, "foo").json()
out = run("delete", status["id"])
assert out == "✓ Status deleted"
@ -15,14 +16,25 @@ def test_delete_status(app, user, run):
api.fetch_status(app, user, status["id"])
def test_delete_json(app, user, run):
status = api.post_status(app, user, "foo").json()
out = run("delete", status["id"], "--json")
result = json.loads(out)
assert result["id"] == status["id"]
with pytest.raises(NotFoundError):
api.fetch_status(app, user, status["id"])
def test_favourite(app, user, run):
status = api.post_status(app, user, "foo")
status = api.post_status(app, user, "foo").json()
assert not status["favourited"]
out = run("favourite", status["id"])
assert out == "✓ Status favourited"
status = api.fetch_status(app, user, status["id"])
status = api.fetch_status(app, user, status["id"]).json()
assert status["favourited"]
out = run("unfavourite", status["id"])
@ -31,18 +43,35 @@ def test_favourite(app, user, run):
# A short delay is required before the server returns new data
time.sleep(0.1)
status = api.fetch_status(app, user, status["id"])
status = api.fetch_status(app, user, status["id"]).json()
assert not status["favourited"]
def test_favourite_json(app, user, run):
status = api.post_status(app, user, "foo").json()
assert not status["favourited"]
out = run("favourite", status["id"], "--json")
result = json.loads(out)
assert result["id"] == status["id"]
assert result["favourited"] is True
out = run("unfavourite", status["id"], "--json")
result = json.loads(out)
assert result["id"] == status["id"]
assert result["favourited"] is False
def test_reblog(app, user, run):
status = api.post_status(app, user, "foo")
status = api.post_status(app, user, "foo").json()
assert not status["reblogged"]
out = run("reblog", status["id"])
assert out == "✓ Status reblogged"
status = api.fetch_status(app, user, status["id"])
status = api.fetch_status(app, user, status["id"]).json()
assert status["reblogged"]
out = run("reblogged_by", status["id"])
@ -51,39 +80,94 @@ def test_reblog(app, user, run):
out = run("unreblog", status["id"])
assert out == "✓ Status unreblogged"
status = api.fetch_status(app, user, status["id"])
status = api.fetch_status(app, user, status["id"]).json()
assert not status["reblogged"]
def test_reblog_json(app, user, run):
status = api.post_status(app, user, "foo").json()
assert not status["reblogged"]
out = run("reblog", status["id"], "--json")
result = json.loads(out)
assert result["reblogged"] is True
assert result["reblog"]["id"] == status["id"]
out = run("reblogged_by", status["id"], "--json")
[reblog] = json.loads(out)
assert reblog["acct"] == user.username
out = run("unreblog", status["id"], "--json")
result = json.loads(out)
assert result["reblogged"] is False
assert result["reblog"] is None
def test_pin(app, user, run):
status = api.post_status(app, user, "foo")
status = api.post_status(app, user, "foo").json()
assert not status["pinned"]
out = run("pin", status["id"])
assert out == "✓ Status pinned"
status = api.fetch_status(app, user, status["id"])
status = api.fetch_status(app, user, status["id"]).json()
assert status["pinned"]
out = run("unpin", status["id"])
assert out == "✓ Status unpinned"
status = api.fetch_status(app, user, status["id"])
status = api.fetch_status(app, user, status["id"]).json()
assert not status["pinned"]
def test_pin_json(app, user, run):
status = api.post_status(app, user, "foo").json()
assert not status["pinned"]
out = run("pin", status["id"], "--json")
result = json.loads(out)
assert result["pinned"] is True
assert result["id"] == status["id"]
out = run("unpin", status["id"], "--json")
result = json.loads(out)
assert result["pinned"] is False
assert result["id"] == status["id"]
def test_bookmark(app, user, run):
status = api.post_status(app, user, "foo")
status = api.post_status(app, user, "foo").json()
assert not status["bookmarked"]
out = run("bookmark", status["id"])
assert out == "✓ Status bookmarked"
status = api.fetch_status(app, user, status["id"])
status = api.fetch_status(app, user, status["id"]).json()
assert status["bookmarked"]
out = run("unbookmark", status["id"])
assert out == "✓ Status unbookmarked"
status = api.fetch_status(app, user, status["id"])
status = api.fetch_status(app, user, status["id"]).json()
assert not status["bookmarked"]
def test_bookmark_json(app, user, run):
status = api.post_status(app, user, "foo").json()
assert not status["bookmarked"]
out = run("bookmark", status["id"], "--json")
result = json.loads(out)
assert result["id"] == status["id"]
assert result["bookmarked"] is True
out = run("unbookmark", status["id"], "--json")
result = json.loads(out)
assert result["id"] == status["id"]
assert result["bookmarked"] is False

Wyświetl plik

@ -1,60 +0,0 @@
from toot import App, User, api, config, auth
from tests.utils import retval
def test_register_app(monkeypatch):
app_data = {'id': 100, 'client_id': 'cid', 'client_secret': 'cs'}
def assert_app(app):
assert isinstance(app, App)
assert app.instance == "foo.bar"
assert app.base_url == "https://foo.bar"
assert app.client_id == "cid"
assert app.client_secret == "cs"
monkeypatch.setattr(api, 'create_app', retval(app_data))
monkeypatch.setattr(api, 'get_instance', retval({"title": "foo", "version": "1", "uri": "bezdomni.net"}))
monkeypatch.setattr(config, 'save_app', assert_app)
app = auth.register_app("foo.bar", "https://foo.bar")
assert_app(app)
def test_create_app_from_config(monkeypatch):
"""When there is saved config, it's returned"""
monkeypatch.setattr(config, 'load_app', retval("loaded app"))
monkeypatch.setattr(api, 'get_instance', retval({"title": "foo", "version": "1", "uri": "bezdomni.net"}))
app = auth.create_app_interactive("https://bezdomni.net")
assert app == 'loaded app'
def test_create_app_registered(monkeypatch):
"""When there is no saved config, a new app is registered"""
monkeypatch.setattr(config, 'load_app', retval(None))
monkeypatch.setattr(auth, 'register_app', retval("registered app"))
monkeypatch.setattr(api, 'get_instance', retval({"title": "foo", "version": "1", "uri": "bezdomni.net"}))
app = auth.create_app_interactive("bezdomni.net")
assert app == 'registered app'
def test_create_user(monkeypatch):
app = App(4, 5, 6, 7)
def assert_user(user, activate=True):
assert activate
assert isinstance(user, User)
assert user.instance == app.instance
assert user.username == "foo"
assert user.access_token == "abc"
monkeypatch.setattr(config, 'save_user', assert_user)
monkeypatch.setattr(api, 'verify_credentials', lambda x, y: {"username": "foo"})
user = auth.create_user(app, 'abc')
assert_user(user)
#
# TODO: figure out how to mock input so the rest can be tested
#

Wyświetl plik

@ -122,6 +122,7 @@ def test_timeline(mock_get, monkeypatch, capsys):
'id': '111111111111111111',
'account': {
'display_name': 'Frank Zappa 🎸',
'last_status_at': '2017-04-12T15:53:18.174Z',
'acct': 'fz'
},
'created_at': '2017-04-12T15:53:18.174Z',
@ -164,6 +165,7 @@ def test_timeline_with_re(mock_get, monkeypatch, capsys):
'created_at': '2017-04-12T15:53:18.174Z',
'account': {
'display_name': 'Johnny Cash',
'last_status_at': '2011-04-12',
'acct': 'jc'
},
'content': "<p>The computer can&apos;t tell you the emotional story. It can give you the exact mathematical design, but what's missing is the eyebrows.</p>",
@ -194,101 +196,6 @@ def test_timeline_with_re(mock_get, monkeypatch, capsys):
assert err == ""
@mock.patch('toot.http.get')
def test_thread(mock_get, monkeypatch, capsys):
mock_get.side_effect = [
MockResponse({
'id': '111111111111111111',
'account': {
'display_name': 'Frank Zappa',
'acct': 'fz'
},
'created_at': '2017-04-12T15:53:18.174Z',
'content': "my response in the middle",
'reblog': None,
'in_reply_to_id': '111111111111111110',
'media_attachments': [],
}),
MockResponse({
'ancestors': [{
'id': '111111111111111110',
'account': {
'display_name': 'Frank Zappa',
'acct': 'fz'
},
'created_at': '2017-04-12T15:53:18.174Z',
'content': "original content",
'media_attachments': [],
'reblog': None,
'in_reply_to_id': None}],
'descendants': [{
'id': '111111111111111112',
'account': {
'display_name': 'Frank Zappa',
'acct': 'fz'
},
'created_at': '2017-04-12T15:53:18.174Z',
'content': "response message",
'media_attachments': [],
'reblog': None,
'in_reply_to_id': '111111111111111111'}],
}),
]
console.run_command(app, user, 'thread', ['111111111111111111'])
calls = [
mock.call(app, user, '/api/v1/statuses/111111111111111111'),
mock.call(app, user, '/api/v1/statuses/111111111111111111/context'),
]
mock_get.assert_has_calls(calls, any_order=False)
out, err = capsys.readouterr()
assert not err
# Display order
assert out.index('original content') < out.index('my response in the middle')
assert out.index('my response in the middle') < out.index('response message')
assert "original content" in out
assert "my response in the middle" in out
assert "response message" in out
assert "Frank Zappa" in out
assert "@fz" in out
assert "111111111111111111" in out
assert "In reply to" in out
@mock.patch('toot.http.get')
def test_reblogged_by(mock_get, monkeypatch, capsys):
mock_get.return_value = MockResponse([{
'display_name': 'Terry Bozzio',
'acct': 'bozzio@drummers.social',
}, {
'display_name': 'Dweezil',
'acct': 'dweezil@zappafamily.social',
}])
console.run_command(app, user, 'reblogged_by', ['111111111111111111'])
calls = [
mock.call(app, user, '/api/v1/statuses/111111111111111111/reblogged_by'),
]
mock_get.assert_has_calls(calls, any_order=False)
out, err = capsys.readouterr()
# Display order
expected = "\n".join([
"Terry Bozzio",
" @bozzio@drummers.social",
"Dweezil",
" @dweezil@zappafamily.social",
"",
])
assert out == expected
@mock.patch('toot.http.post')
def test_upload(mock_post, capsys):
mock_post.return_value = MockResponse({
@ -311,136 +218,6 @@ def test_upload(mock_post, capsys):
assert __file__ in out
@mock.patch('toot.http.get')
def test_search(mock_get, capsys):
mock_get.return_value = MockResponse({
'hashtags': [
{
'history': [],
'name': 'foo',
'url': 'https://mastodon.social/tags/foo'
},
{
'history': [],
'name': 'bar',
'url': 'https://mastodon.social/tags/bar'
},
{
'history': [],
'name': 'baz',
'url': 'https://mastodon.social/tags/baz'
},
],
'accounts': [{
'acct': 'thequeen',
'display_name': 'Freddy Mercury'
}, {
'acct': 'thequeen@other.instance',
'display_name': 'Mercury Freddy'
}],
'statuses': [],
})
console.run_command(app, user, 'search', ['freddy'])
mock_get.assert_called_once_with(app, user, '/api/v2/search', {
'q': 'freddy',
'type': None,
'resolve': False,
})
out, err = capsys.readouterr()
out = uncolorize(out)
assert "Hashtags:\n#foo, #bar, #baz" in out
assert "Accounts:" in out
assert "@thequeen Freddy Mercury" in out
assert "@thequeen@other.instance Mercury Freddy" in out
@mock.patch('toot.http.post')
@mock.patch('toot.http.get')
def test_follow(mock_get, mock_post, capsys):
mock_get.return_value = MockResponse({
"accounts": [
{"id": 123, "acct": "blixa@other.acc"},
{"id": 321, "acct": "blixa"},
]
})
mock_post.return_value = MockResponse()
console.run_command(app, user, 'follow', ['blixa'])
mock_get.assert_called_once_with(app, user, '/api/v2/search', {'q': 'blixa', 'type': 'accounts', 'resolve': True})
mock_post.assert_called_once_with(app, user, '/api/v1/accounts/321/follow')
out, err = capsys.readouterr()
assert "You are now following blixa" in out
@mock.patch('toot.http.post')
@mock.patch('toot.http.get')
def test_follow_case_insensitive(mock_get, mock_post, capsys):
mock_get.return_value = MockResponse({
"accounts": [
{"id": 123, "acct": "blixa@other.acc"},
{"id": 321, "acct": "blixa"},
]
})
mock_post.return_value = MockResponse()
console.run_command(app, user, 'follow', ['bLiXa@oThEr.aCc'])
mock_get.assert_called_once_with(app, user, '/api/v2/search', {'q': 'bLiXa@oThEr.aCc', 'type': 'accounts', 'resolve': True})
mock_post.assert_called_once_with(app, user, '/api/v1/accounts/123/follow')
out, err = capsys.readouterr()
assert "You are now following bLiXa@oThEr.aCc" in out
@mock.patch('toot.http.get')
def test_follow_not_found(mock_get, capsys):
mock_get.return_value = MockResponse({"accounts": []})
with pytest.raises(ConsoleError) as ex:
console.run_command(app, user, 'follow', ['blixa'])
mock_get.assert_called_once_with(app, user, '/api/v2/search', {'q': 'blixa', 'type': 'accounts', 'resolve': True})
assert "Account not found" == str(ex.value)
@mock.patch('toot.http.post')
@mock.patch('toot.http.get')
def test_unfollow(mock_get, mock_post, capsys):
mock_get.return_value = MockResponse({
"accounts": [
{'id': 123, 'acct': 'blixa@other.acc'},
{'id': 321, 'acct': 'blixa'},
]
})
mock_post.return_value = MockResponse()
console.run_command(app, user, 'unfollow', ['blixa'])
mock_get.assert_called_once_with(app, user, '/api/v2/search', {'q': 'blixa', 'type': 'accounts', 'resolve': True})
mock_post.assert_called_once_with(app, user, '/api/v1/accounts/321/unfollow')
out, err = capsys.readouterr()
assert "You are no longer following blixa" in out
@mock.patch('toot.http.get')
def test_unfollow_not_found(mock_get, capsys):
mock_get.return_value = MockResponse({"accounts": []})
with pytest.raises(ConsoleError) as ex:
console.run_command(app, user, 'unfollow', ['blixa'])
mock_get.assert_called_once_with(app, user, '/api/v2/search', {'q': 'blixa', 'type': 'accounts', 'resolve': True})
assert "Account not found" == str(ex.value)
@mock.patch('toot.http.get')
def test_whoami(mock_get, capsys):
mock_get.return_value = MockResponse({

Wyświetl plik

@ -6,6 +6,7 @@ from toot.wcstring import wc_wrap, trunc, pad, fit_text
from toot.tui.utils import ImageCache
from PIL import Image
from collections import namedtuple
from toot.utils import urlencode_url
def test_pad():
@ -309,3 +310,8 @@ def test_cache_miss_doesnt_eject():
assert len(cache) == 2
assert "one" in cache.keys()
assert "two" in cache.keys()
def test_urlencode_url():
assert urlencode_url("https://www.example.com") == "https://www.example.com"
assert urlencode_url("https://www.example.com/url%20with%20spaces") == "https://www.example.com/url%20with%20spaces"

Wyświetl plik

@ -0,0 +1,45 @@
from urwid import Divider, Filler, Pile
from toot.tui.richtext import url_to_widget
from urwidgets import Hyperlink, TextEmbed
from toot.tui.richtext.richtext import html_to_widgets
def test_url_to_widget():
url = "http://foo.bar"
embed_widget = url_to_widget(url)
assert isinstance(embed_widget, TextEmbed)
[(filler, length)] = embed_widget.embedded
assert length == len(url)
assert isinstance(filler, Filler)
link_widget = filler.base_widget
assert isinstance(link_widget, Hyperlink)
assert link_widget.attrib == "link"
assert link_widget.text == url
assert link_widget.uri == url
def test_html_to_widgets():
html = """
<p>foo</p>
<p>foo <b>bar</b> <i>baz</i></p>
""".strip()
[foo, divider, bar] = html_to_widgets(html)
assert isinstance(foo, Pile)
assert isinstance(divider, Divider)
assert isinstance(bar, Pile)
[(foo_embed, _)] = foo.contents
assert foo_embed.embedded == []
assert foo_embed.attrib == []
assert foo_embed.text == "foo"
[(bar_embed, _)] = bar.contents
assert bar_embed.embedded == []
assert bar_embed.attrib == [(None, 4), ("b", 3), (None, 1), ("i", 3)]
assert bar_embed.text == "foo bar baz"

Wyświetl plik

@ -4,7 +4,7 @@ import sys
from os.path import join, expanduser
from collections import namedtuple
__version__ = '0.37.0'
__version__ = '0.39.0'
App = namedtuple('App', ['instance', 'base_url', 'client_id', 'client_secret'])
User = namedtuple('User', ['instance', 'username', 'access_token'])

Wyświetl plik

@ -1,8 +1,9 @@
import mimetypes
from os import path
import re
import uuid
from os import path
from requests import Response
from typing import BinaryIO, List, Optional
from urllib.parse import urlparse, urlencode, quote
@ -30,21 +31,21 @@ def find_account(app, user, account_name):
normalized_name = username
response = search(app, user, account_name, type="accounts", resolve=True)
for account in response["accounts"]:
for account in response.json()["accounts"]:
if account["acct"].lower() == normalized_name:
return account
raise ConsoleError("Account not found")
def _account_action(app, user, account, action):
def _account_action(app, user, account, action) -> Response:
url = f"/api/v1/accounts/{account}/{action}"
return http.post(app, user, url).json()
return http.post(app, user, url)
def _status_action(app, user, status_id, action, data=None):
def _status_action(app, user, status_id, action, data=None) -> Response:
url = f"/api/v1/statuses/{status_id}/{action}"
return http.post(app, user, url, data=data).json()
return http.post(app, user, url, data=data)
def _tag_action(app, user, tag_name, action):
@ -200,7 +201,7 @@ def post_status(
poll_expires_in=None,
poll_multiple=None,
poll_hide_totals=None,
):
) -> Response:
"""
Publish a new status.
https://docs.joinmastodon.org/methods/statuses/#create
@ -232,7 +233,7 @@ def post_status(
"hide_totals": poll_hide_totals,
}
return http.post(app, user, '/api/v1/statuses', json=data, headers=headers).json()
return http.post(app, user, '/api/v1/statuses', json=data, headers=headers)
def fetch_status(app, user, id):
@ -240,7 +241,7 @@ def fetch_status(app, user, id):
Fetch a single status
https://docs.joinmastodon.org/methods/statuses/#get
"""
return http.get(app, user, f"/api/v1/statuses/{id}").json()
return http.get(app, user, f"/api/v1/statuses/{id}")
def scheduled_statuses(app, user):
@ -295,14 +296,14 @@ def translate(app, user, status_id):
return _status_action(app, user, status_id, 'translate')
def context(app, user, status_id):
def context(app, user, status_id) -> Response:
url = f"/api/v1/statuses/{status_id}/context"
return http.get(app, user, url).json()
return http.get(app, user, url)
def reblogged_by(app, user, status_id):
def reblogged_by(app, user, status_id) -> Response:
url = f"/api/v1/statuses/{status_id}/reblogged_by"
return http.get(app, user, url).json()
return http.get(app, user, url)
def _get_next_path(headers):
@ -451,11 +452,13 @@ def search(app, user, query, resolve=False, type=None):
Perform a search.
https://docs.joinmastodon.org/methods/search/#v2
"""
return http.get(app, user, "/api/v2/search", {
params = drop_empty_values({
"q": query,
"resolve": resolve,
"resolve": str_bool(resolve),
"type": type
}).json()
})
return http.get(app, user, "/api/v2/search", params)
def follow(app, user, account):
@ -521,6 +524,10 @@ def unmute(app, user, account):
return _account_action(app, user, account, 'unmute')
def muted(app, user):
return _get_response_list(app, user, "/api/v1/mutes")
def block(app, user, account):
return _account_action(app, user, account, 'block')
@ -529,13 +536,12 @@ def unblock(app, user, account):
return _account_action(app, user, account, 'unblock')
def verify_credentials(app, user):
return http.get(app, user, '/api/v1/accounts/verify_credentials').json()
def blocked(app, user):
return _get_response_list(app, user, "/api/v1/blocks")
def single_status(app, user, status_id):
url = f"/api/v1/statuses/{status_id}"
return http.get(app, user, url).json()
def verify_credentials(app, user) -> Response:
return http.get(app, user, '/api/v1/accounts/verify_credentials')
def get_notifications(app, user, exclude_types=[], limit=20):
@ -547,9 +553,9 @@ def clear_notifications(app, user):
http.post(app, user, '/api/v1/notifications/clear')
def get_instance(base_url):
def get_instance(base_url: str) -> Response:
url = f"{base_url}/api/v1/instance"
return http.anon_get(url).json()
return http.anon_get(url)
def get_lists(app, user):

Wyświetl plik

@ -40,7 +40,7 @@ def create_app_interactive(base_url):
def get_instance_domain(base_url):
print_out("Looking up instance info...")
instance = api.get_instance(base_url)
instance = api.get_instance(base_url).json()
print_out(
f"Found instance <blue>{instance['title']}</blue> "
@ -66,7 +66,7 @@ def get_instance_domain(base_url):
def create_user(app, access_token):
# Username is not yet known at this point, so fetch it from Mastodon
user = User(app.instance, None, access_token)
creds = api.verify_credentials(app, user)
creds = api.verify_credentials(app, user).json()
user = User(app.instance, creds['username'], access_token)
config.save_user(user, activate=True)

Wyświetl plik

@ -1,16 +1,18 @@
from itertools import chain
import json
import sys
import platform
from datetime import datetime, timedelta, timezone
from time import sleep, time
from toot import api, config, __version__
from toot.auth import login_interactive, login_browser_interactive, create_app_interactive
from toot.entities import Instance, Notification, Status, from_dict
from toot.entities import Account, Instance, Notification, Status, from_dict
from toot.exceptions import ApiError, ConsoleError
from toot.output import (print_lists, print_out, print_instance, print_account, print_acct_list,
print_search_results, print_status, print_timeline, print_notifications, print_tag_list,
print_list_accounts, print_user_list)
print_search_results, print_status, print_table, print_timeline, print_notifications,
print_tag_list, print_list_accounts, print_user_list)
from toot.utils import args_get_instance, delete_tmp_status_file, editor_input, multiline_input, EOF_KEY
from toot.utils.datetime import parse_datetime
@ -69,25 +71,25 @@ def timeline(app, user, args, generator=None):
def status(app, user, args):
status = api.single_status(app, user, args.status_id)
status = from_dict(Status, status)
print_status(status)
response = api.fetch_status(app, user, args.status_id)
if args.json:
print(response.text)
else:
status = from_dict(Status, response.json())
print_status(status)
def thread(app, user, args):
toot = api.single_status(app, user, args.status_id)
context = api.context(app, user, args.status_id)
thread = []
for item in context['ancestors']:
thread.append(item)
context_response = api.context(app, user, args.status_id)
thread.append(toot)
if args.json:
print(context_response.text)
else:
toot = api.fetch_status(app, user, args.status_id).json()
context = context_response.json()
for item in context['descendants']:
thread.append(item)
statuses = [from_dict(Status, s) for s in thread]
print_timeline(statuses)
statuses = chain(context["ancestors"], [toot], context["descendants"])
print_timeline(from_dict(Status, s) for s in statuses)
def post(app, user, args):
@ -120,12 +122,16 @@ def post(app, user, args):
poll_hide_totals=args.poll_hide_totals,
)
if "scheduled_at" in response:
scheduled_at = parse_datetime(response["scheduled_at"])
scheduled_at = datetime.strftime(scheduled_at, "%Y-%m-%d %H:%M:%S%z")
print_out(f"Toot scheduled for: <green>{scheduled_at}</green>")
if args.json:
print(response.text)
else:
print_out(f"Toot posted: <green>{response['url']}")
status = response.json()
if "scheduled_at" in status:
scheduled_at = parse_datetime(status["scheduled_at"])
scheduled_at = datetime.strftime(scheduled_at, "%Y-%m-%d %H:%M:%S%z")
print_out(f"Toot scheduled for: <green>{scheduled_at}</green>")
else:
print_out(f"Toot posted: <green>{status['url']}")
delete_tmp_status_file()
@ -207,48 +213,75 @@ def _wait_until_processed(app, user, media, start_time, timeout):
def delete(app, user, args):
api.delete_status(app, user, args.status_id)
print_out("<green>✓ Status deleted</green>")
response = api.delete_status(app, user, args.status_id)
if args.json:
print(response.text)
else:
print_out("<green>✓ Status deleted</green>")
def favourite(app, user, args):
api.favourite(app, user, args.status_id)
print_out("<green>✓ Status favourited</green>")
response = api.favourite(app, user, args.status_id)
if args.json:
print(response.text)
else:
print_out("<green>✓ Status favourited</green>")
def unfavourite(app, user, args):
api.unfavourite(app, user, args.status_id)
print_out("<green>✓ Status unfavourited</green>")
response = api.unfavourite(app, user, args.status_id)
if args.json:
print(response.text)
else:
print_out("<green>✓ Status unfavourited</green>")
def reblog(app, user, args):
api.reblog(app, user, args.status_id, visibility=args.visibility)
print_out("<green>✓ Status reblogged</green>")
response = api.reblog(app, user, args.status_id, visibility=args.visibility)
if args.json:
print(response.text)
else:
print_out("<green>✓ Status reblogged</green>")
def unreblog(app, user, args):
api.unreblog(app, user, args.status_id)
print_out("<green>✓ Status unreblogged</green>")
response = api.unreblog(app, user, args.status_id)
if args.json:
print(response.text)
else:
print_out("<green>✓ Status unreblogged</green>")
def pin(app, user, args):
api.pin(app, user, args.status_id)
print_out("<green>✓ Status pinned</green>")
response = api.pin(app, user, args.status_id)
if args.json:
print(response.text)
else:
print_out("<green>✓ Status pinned</green>")
def unpin(app, user, args):
api.unpin(app, user, args.status_id)
print_out("<green>✓ Status unpinned</green>")
response = api.unpin(app, user, args.status_id)
if args.json:
print(response.text)
else:
print_out("<green>✓ Status unpinned</green>")
def bookmark(app, user, args):
api.bookmark(app, user, args.status_id)
print_out("<green>✓ Status bookmarked</green>")
response = api.bookmark(app, user, args.status_id)
if args.json:
print(response.text)
else:
print_out("<green>✓ Status bookmarked</green>")
def unbookmark(app, user, args):
api.unbookmark(app, user, args.status_id)
print_out("<green>✓ Status unbookmarked</green>")
response = api.unbookmark(app, user, args.status_id)
if args.json:
print(response.text)
else:
print_out("<green>✓ Status unbookmarked</green>")
def bookmarks(app, user, args):
@ -256,8 +289,14 @@ def bookmarks(app, user, args):
def reblogged_by(app, user, args):
for account in api.reblogged_by(app, user, args.status_id):
print_out("{}\n @{}".format(account['display_name'], account['acct']))
response = api.reblogged_by(app, user, args.status_id)
if args.json:
print(response.text)
else:
headers = ["Account", "Display name"]
rows = [[a["acct"], a["display_name"]] for a in response.json()]
print_table(headers, rows)
def auth(app, user, args):
@ -301,7 +340,7 @@ def update_account(app, user, args):
if all(option is None for option in options):
raise ConsoleError("Please specify at least one option to update the account")
api.update_account(
response = api.update_account(
app,
user,
avatar=args.avatar,
@ -316,7 +355,10 @@ def update_account(app, user, args):
sensitive=args.sensitive,
)
print_out("<green>✓ Account updated</green>")
if args.json:
print(response.text)
else:
print_out("<green>✓ Account updated</green>")
def login_cli(app, user, args):
@ -367,7 +409,10 @@ def upload(app, user, args):
def search(app, user, args):
response = api.search(app, user, args.query, args.resolve)
print_search_results(response)
if args.json:
print(response.text)
else:
print_search_results(response.json())
def _do_upload(app, user, file, description, thumbnail):
@ -377,26 +422,40 @@ def _do_upload(app, user, file, description, thumbnail):
def follow(app, user, args):
account = api.find_account(app, user, args.account)
api.follow(app, user, account['id'])
print_out("<green>✓ You are now following {}</green>".format(args.account))
response = api.follow(app, user, account["id"])
if args.json:
print(response.text)
else:
print_out(f"<green>✓ You are now following {args.account}</green>")
def unfollow(app, user, args):
account = api.find_account(app, user, args.account)
api.unfollow(app, user, account['id'])
print_out("<green>✓ You are no longer following {}</green>".format(args.account))
response = api.unfollow(app, user, account["id"])
if args.json:
print(response.text)
else:
print_out(f"<green>✓ You are no longer following {args.account}</green>")
def following(app, user, args):
account = api.find_account(app, user, args.account)
response = api.following(app, user, account['id'])
print_acct_list(response)
account = args.account or user.username
account = api.find_account(app, user, account)
accounts = api.following(app, user, account["id"])
if args.json:
print(json.dumps(accounts))
else:
print_acct_list(accounts)
def followers(app, user, args):
account = api.find_account(app, user, args.account)
response = api.followers(app, user, account['id'])
print_acct_list(response)
account = args.account or user.username
account = api.find_account(app, user, account)
accounts = api.followers(app, user, account["id"])
if args.json:
print(json.dumps(accounts))
else:
print_acct_list(accounts)
def tags_follow(app, user, args):
@ -483,36 +542,81 @@ def _get_list_id(app, user, args):
def mute(app, user, args):
account = api.find_account(app, user, args.account)
api.mute(app, user, account['id'])
print_out("<green>✓ You have muted {}</green>".format(args.account))
response = api.mute(app, user, account['id'])
if args.json:
print(response.text)
else:
print_out("<green>✓ You have muted {}</green>".format(args.account))
def unmute(app, user, args):
account = api.find_account(app, user, args.account)
api.unmute(app, user, account['id'])
print_out("<green>✓ {} is no longer muted</green>".format(args.account))
response = api.unmute(app, user, account['id'])
if args.json:
print(response.text)
else:
print_out("<green>✓ {} is no longer muted</green>".format(args.account))
def muted(app, user, args):
response = api.muted(app, user)
if args.json:
print(json.dumps(response))
else:
if len(response) > 0:
print("Muted accounts:")
print_acct_list(response)
else:
print("No accounts muted")
def block(app, user, args):
account = api.find_account(app, user, args.account)
api.block(app, user, account['id'])
print_out("<green>✓ You are now blocking {}</green>".format(args.account))
response = api.block(app, user, account['id'])
if args.json:
print(response.text)
else:
print_out("<green>✓ You are now blocking {}</green>".format(args.account))
def unblock(app, user, args):
account = api.find_account(app, user, args.account)
api.unblock(app, user, account['id'])
print_out("<green>✓ {} is no longer blocked</green>".format(args.account))
response = api.unblock(app, user, account['id'])
if args.json:
print(response.text)
else:
print_out("<green>✓ {} is no longer blocked</green>".format(args.account))
def blocked(app, user, args):
response = api.blocked(app, user)
if args.json:
print(json.dumps(response))
else:
if len(response) > 0:
print("Blocked accounts:")
print_acct_list(response)
else:
print("No accounts blocked")
def whoami(app, user, args):
account = api.verify_credentials(app, user)
print_account(account)
response = api.verify_credentials(app, user)
if args.json:
print(response.text)
else:
account = from_dict(Account, response.json())
print_account(account)
def whois(app, user, args):
account = api.find_account(app, user, args.account)
print_account(account)
# Here it's not possible to avoid parsing json since it's needed to find the account.
if args.json:
print(json.dumps(account))
else:
account = from_dict(Account, account)
print_account(account)
def instance(app, user, args):
@ -523,15 +627,19 @@ def instance(app, user, args):
raise ConsoleError("Please specify an instance.")
try:
instance = api.get_instance(base_url)
instance = from_dict(Instance, instance)
print_instance(instance)
response = api.get_instance(base_url)
except ApiError:
raise ConsoleError(
f"Instance not found at {base_url}.\n"
"The given domain probably does not host a Mastodon instance."
)
if args.json:
print(response.text)
else:
instance = from_dict(Instance, response.json())
print_instance(instance)
def notifications(app, user, args):
if args.clear:

Wyświetl plik

@ -204,6 +204,7 @@ common_auth_args = [
account_arg = (["account"], {
"help": "account name, e.g. 'Gargron@mastodon.social'",
})
optional_account_arg = (["account"], {
"nargs": "?",
"help": "account name, e.g. 'Gargron@mastodon.social'",
@ -245,6 +246,12 @@ tag_arg = (["tag_name"], {
"help": "tag name, e.g. Caturday, or \"#Caturday\"",
})
json_arg = (["--json"], {
"action": "store_true",
"default": False,
"help": "print json instead of plaintext",
})
# Arguments for selecting a timeline (see `toot.commands.get_timeline_generator`)
common_timeline_args = [
(["-p", "--public"], {
@ -374,8 +381,9 @@ AUTH_COMMANDS = [
}),
(["--language"], {
"type": language,
"help": "Default language to use for authored statuses (ISO 6391)."
"help": "Default language to use for authored statuses (ISO 639-1)."
}),
json_arg,
],
require_auth=True,
),
@ -405,7 +413,7 @@ READ_COMMANDS = [
Command(
name="whoami",
description="Display logged in user details",
arguments=[],
arguments=[json_arg],
require_auth=True,
),
Command(
@ -415,6 +423,7 @@ READ_COMMANDS = [
(["account"], {
"help": "account name or numeric ID"
}),
json_arg,
],
require_auth=True,
),
@ -449,6 +458,7 @@ READ_COMMANDS = [
"nargs": "?",
}),
scheme_arg,
json_arg,
],
require_auth=False,
),
@ -464,6 +474,7 @@ READ_COMMANDS = [
"default": False,
"help": "Resolve non-local accounts",
}),
json_arg,
],
require_auth=True,
),
@ -474,6 +485,7 @@ READ_COMMANDS = [
(["status_id"], {
"help": "Show thread for toot.",
}),
json_arg,
],
require_auth=True,
),
@ -484,6 +496,7 @@ READ_COMMANDS = [
(["status_id"], {
"help": "ID of the status to show.",
}),
json_arg,
],
require_auth=True,
),
@ -544,7 +557,7 @@ POST_COMMANDS = [
}),
(["-l", "--language"], {
"type": language,
"help": "ISO 639-2 language code of the toot, to skip automatic detection",
"help": "ISO 639-1 language code of the toot, to skip automatic detection",
}),
(["-e", "--editor"], {
"type": editor,
@ -589,6 +602,7 @@ POST_COMMANDS = [
"default": False,
"help": "Hide vote counts until the poll ends. Defaults to false."
}),
json_arg,
],
require_auth=True,
),
@ -613,61 +627,61 @@ STATUS_COMMANDS = [
Command(
name="delete",
description="Delete a status",
arguments=[status_id_arg],
arguments=[status_id_arg, json_arg],
require_auth=True,
),
Command(
name="favourite",
description="Favourite a status",
arguments=[status_id_arg],
arguments=[status_id_arg, json_arg],
require_auth=True,
),
Command(
name="unfavourite",
description="Unfavourite a status",
arguments=[status_id_arg],
arguments=[status_id_arg, json_arg],
require_auth=True,
),
Command(
name="reblog",
description="Reblog a status",
arguments=[status_id_arg, visibility_arg],
arguments=[status_id_arg, visibility_arg, json_arg],
require_auth=True,
),
Command(
name="unreblog",
description="Unreblog a status",
arguments=[status_id_arg],
arguments=[status_id_arg, json_arg],
require_auth=True,
),
Command(
name="reblogged_by",
description="Show accounts that reblogged the status",
arguments=[status_id_arg],
arguments=[status_id_arg, json_arg],
require_auth=False,
),
Command(
name="pin",
description="Pin a status",
arguments=[status_id_arg],
arguments=[status_id_arg, json_arg],
require_auth=True,
),
Command(
name="unpin",
description="Unpin a status",
arguments=[status_id_arg],
arguments=[status_id_arg, json_arg],
require_auth=True,
),
Command(
name="bookmark",
description="Bookmark a status",
arguments=[status_id_arg],
arguments=[status_id_arg, json_arg],
require_auth=True,
),
Command(
name="unbookmark",
description="Unbookmark a status",
arguments=[status_id_arg],
arguments=[status_id_arg, json_arg],
require_auth=True,
),
]
@ -676,65 +690,63 @@ ACCOUNTS_COMMANDS = [
Command(
name="follow",
description="Follow an account",
arguments=[
account_arg,
],
arguments=[account_arg, json_arg],
require_auth=True,
),
Command(
name="unfollow",
description="Unfollow an account",
arguments=[
account_arg,
],
arguments=[account_arg, json_arg],
require_auth=True,
),
Command(
name="following",
description="List accounts followed by the given account",
arguments=[
account_arg,
],
description="List accounts followed by the given account, " +
"or your account if no account given",
arguments=[optional_account_arg, json_arg],
require_auth=True,
),
Command(
name="followers",
description="List accounts following the given account",
arguments=[
account_arg,
],
description="List accounts following the given account, " +
"or your account if no account given",
arguments=[optional_account_arg, json_arg],
require_auth=True,
),
Command(
name="mute",
description="Mute an account",
arguments=[
account_arg,
],
arguments=[account_arg, json_arg],
require_auth=True,
),
Command(
name="unmute",
description="Unmute an account",
arguments=[
account_arg,
],
arguments=[account_arg, json_arg],
require_auth=True,
),
Command(
name="muted",
description="List muted accounts",
arguments=[json_arg],
require_auth=True,
),
Command(
name="block",
description="Block an account",
arguments=[
account_arg,
],
arguments=[account_arg, json_arg],
require_auth=True,
),
Command(
name="unblock",
description="Unblock an account",
arguments=[
account_arg,
],
arguments=[account_arg, json_arg],
require_auth=True,
),
Command(
name="blocked",
description="List blocked accounts",
arguments=[json_arg],
require_auth=True,
),
]

Wyświetl plik

@ -1,16 +1,24 @@
"""
Dataclasses which represent entities returned by the Mastodon API.
Data classes my have an optional static method named `__toot_prepare__` which is
used when constructing the data class using `from_dict`. The method will be
called with the dict and may modify it and return a modified dict. This is used
to implement any pre-processing which may be required, e.g. to support
different versions of the Mastodon API.
"""
import dataclasses
from dataclasses import dataclass, is_dataclass
from datetime import date, datetime
from typing import Dict, List, Optional, Type, TypeVar, Union
from functools import lru_cache
from typing import Any, Dict, List, Optional, Tuple, Type, TypeVar, Union
from typing import get_type_hints
from toot.typing_compat import get_args, get_origin
from toot.utils import get_text
from toot.utils.datetime import parse_datetime
@dataclass
@ -65,6 +73,17 @@ class Account:
statuses_count: int
followers_count: int
following_count: int
source: Optional[dict]
@staticmethod
def __toot_prepare__(obj: Dict) -> Dict:
# Pleroma has not yet converted last_status_at from datetime to date
# so trim it here so it doesn't break when converting to date.
# See: https://git.pleroma.social/pleroma/pleroma/-/issues/1470
last_status_at = obj.get("last_status_at")
if last_status_at:
obj.update(last_status_at=obj["last_status_at"][:10])
return obj
@property
def note_plaintext(self) -> str:
@ -246,6 +265,17 @@ class Status:
def original(self) -> "Status":
return self.reblog or self
@staticmethod
def __toot_prepare__(obj: Dict) -> Dict:
# Pleroma has a bug where created_at is set to an empty string.
# To avoid marking created_at as optional, which would require work
# because we count on it always existing, set it to current datetime.
# Possible underlying issue:
# https://git.pleroma.social/pleroma/pleroma/-/issues/2851
if not obj["created_at"]:
obj["created_at"] = datetime.now().astimezone().isoformat()
return obj
@dataclass
class Report:
@ -356,23 +386,81 @@ class Instance:
rules: List[Rule]
@dataclass
class Relationship:
"""
Represents the relationship between accounts, such as following / blocking /
muting / etc.
https://docs.joinmastodon.org/entities/Relationship/
"""
id: str
following: bool
showing_reblogs: bool
notifying: bool
languages: List[str]
followed_by: bool
blocking: bool
blocked_by: bool
muting: bool
muting_notifications: bool
requested: bool
domain_blocking: bool
endorsed: bool
note: str
# Generic data class instance
T = TypeVar("T")
class ConversionError(Exception):
"""Raised when conversion fails from JSON value to data class field."""
def __init__(
self,
data_class: Type,
field_name: str,
field_type: Type,
field_value: Optional[str]
):
super().__init__(
f"Failed converting field `{data_class.__name__}.{field_name}` "
+ f"of type `{field_type.__name__}` from value {field_value!r}"
)
def from_dict(cls: Type[T], data: Dict) -> T:
"""Convert a nested dict into an instance of `cls`."""
# Apply __toot_prepare__ if it exists
prepare = getattr(cls, '__toot_prepare__', None)
if prepare:
data = prepare(data)
def _fields():
hints = get_type_hints(cls)
for field in dataclasses.fields(cls):
field_type = _prune_optional(hints[field.name])
default_value = _get_default_value(field)
value = data.get(field.name, default_value)
yield field.name, _convert(field_type, value)
for name, type, default in get_fields(cls):
value = data.get(name, default)
converted = _convert_with_error_handling(cls, name, type, value)
yield name, converted
return cls(**dict(_fields()))
@lru_cache(maxsize=100)
def get_fields(cls: Type) -> List[Tuple[str, Type, Any]]:
hints = get_type_hints(cls)
return [
(
field.name,
_prune_optional(hints[field.name]),
_get_default_value(field)
)
for field in dataclasses.fields(cls)
]
def from_dict_list(cls: Type[T], data: List[Dict]) -> List[T]:
return [from_dict(cls, x) for x in data]
def _get_default_value(field):
if field.default is not dataclasses.MISSING:
return field.default
@ -383,6 +471,20 @@ def _get_default_value(field):
return None
def _convert_with_error_handling(
data_class: Type,
field_name: str,
field_type: Type,
field_value: Optional[str]
):
try:
return _convert(field_type, field_value)
except ConversionError:
raise
except Exception:
raise ConversionError(data_class, field_name, field_type, field_value)
def _convert(field_type, value):
if value is None:
return None
@ -391,7 +493,7 @@ def _convert(field_type, value):
return value
if field_type == datetime:
return datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%f%z")
return parse_datetime(value)
if field_type == date:
return date.fromisoformat(value)
@ -406,7 +508,7 @@ def _convert(field_type, value):
raise ValueError(f"Not implemented for type '{field_type}'")
def _prune_optional(field_type):
def _prune_optional(field_type: Type) -> Type:
"""For `Optional[<type>]` returns the encapsulated `<type>`."""
if get_origin(field_type) == Union:
args = get_args(field_type)

Wyświetl plik

@ -3,7 +3,7 @@ from requests.exceptions import RequestException
from toot import __version__
from toot.exceptions import NotFoundError, ApiError
from toot.logging import log_request, log_response
from toot.logging import log_request, log_request_exception, log_response
def send_request(request, allow_redirects=True):
@ -19,6 +19,7 @@ def send_request(request, allow_redirects=True):
settings = session.merge_environment_settings(prepared.url, {}, None, None, None)
response = session.send(prepared, allow_redirects=allow_redirects, **settings)
except RequestException as ex:
log_request_exception(request, ex)
raise ApiError(f"Request failed: {str(ex)}")
log_response(response)

Wyświetl plik

@ -2,7 +2,7 @@ import json
import sys
from logging import getLogger
from requests import Request, Response
from requests import Request, RequestException, Response
from urllib.parse import urlencode
logger = getLogger("toot")
@ -56,6 +56,10 @@ def log_response(response: Response):
logger.debug(f" <-- {content}")
def log_request_exception(request: Request, ex: RequestException):
logger.debug(f" <-- {request.method} {_url(request)} Exception: {ex}")
def _url(request):
url = request.url
if request.params:

Wyświetl plik

@ -5,10 +5,10 @@ import textwrap
from functools import lru_cache
from toot import settings
from toot.entities import Instance, Notification, Poll, Status
from toot.utils import get_text, parse_html
from toot.utils import get_text, html_to_paragraphs
from toot.entities import Account, Instance, Notification, Poll, Status
from toot.wcstring import wc_wrap
from typing import List
from typing import Iterable, List
from wcwidth import wcswidth
@ -170,31 +170,33 @@ def print_instance(instance: Instance):
print_out(f"Contact: {contact.display_name} @{contact.acct}")
def print_account(account):
print_out(f"<green>@{account['acct']}</green> {account['display_name']}")
def print_account(account: Account):
print_out(f"<green>@{account.acct}</green> {account.display_name}")
if account["note"]:
if account.note:
print_out("")
print_html(account["note"])
print_html(account.note)
since = account.created_at.strftime('%Y-%m-%d')
print_out("")
print_out(f"ID: <green>{account['id']}</green>")
print_out(f"Since: <green>{account['created_at'][:10]}</green>")
print_out(f"ID: <green>{account.id}</green>")
print_out(f"Since: <green>{since}</green>")
print_out("")
print_out(f"Followers: <yellow>{account['followers_count']}</yellow>")
print_out(f"Following: <yellow>{account['following_count']}</yellow>")
print_out(f"Statuses: <yellow>{account['statuses_count']}</yellow>")
print_out(f"Followers: <yellow>{account.followers_count}</yellow>")
print_out(f"Following: <yellow>{account.following_count}</yellow>")
print_out(f"Statuses: <yellow>{account.statuses_count}</yellow>")
if account["fields"]:
for field in account["fields"]:
name = field["name"].title()
if account.fields:
for field in account.fields:
name = field.name.title()
print_out(f'\n<yellow>{name}</yellow>:')
print_html(field["value"])
if field["verified_at"]:
print_html(field.value)
if field.verified_at:
print_out("<green>✓ Verified</green>")
print_out("")
print_out(account["url"])
print_out(account.url)
HASHTAG_PATTERN = re.compile(r'(?<!\w)(#\w+)\b')
@ -321,7 +323,7 @@ def print_status(status: Status, width: int = 80):
def print_html(text, width=80):
first = True
for paragraph in parse_html(text):
for paragraph in html_to_paragraphs(text):
if not first:
print_out("")
for line in paragraph:
@ -356,7 +358,7 @@ def print_poll(poll: Poll):
print_out(poll_footer)
def print_timeline(items: List[Status], width=100):
def print_timeline(items: Iterable[Status], width=100):
print_out("" * width)
for item in items:
print_status(item, width)

Wyświetl plik

@ -1,16 +1,17 @@
import logging
import subprocess
import urwid
import requests
import warnings
from concurrent.futures import ThreadPoolExecutor
from toot import api, config, __version__
from toot import api, config, __version__, settings
from toot.console import get_default_visibility
from toot.exceptions import ApiError
from .compose import StatusComposer
from .constants import PALETTE, MONO_PALETTE
from .constants import PALETTE
from .entities import Status
from .images import TuiScreen
from .overlays import ExceptionStackTrace, GotoMenu, Help, StatusSource, StatusLinks, StatusZoom
@ -18,7 +19,6 @@ from .overlays import StatusDeleteConfirmation, Account
from .poll import Poll
from .timeline import Timeline
from .utils import get_max_toot_chars, parse_content_links, show_media, copy_to_clipboard, ImageCache
from PIL import Image
@ -84,19 +84,20 @@ class TUI(urwid.Frame):
loop: urwid.MainLoop
screen: urwid.BaseScreen
@classmethod
def create(cls, app, user, args):
@staticmethod
def create(app, user, args):
"""Factory method, sets up TUI and an event loop."""
screen = TuiScreen()
tui = cls(app, user, screen, args)
screen = TUI.create_screen(args)
tui = TUI(app, user, screen, args)
if args.no_color:
screen.set_terminal_properties(1)
screen.reset_default_terminal_palette()
palette = PALETTE.copy()
overrides = settings.get_setting("tui.palette", dict, {})
for name, styles in overrides.items():
palette.append(tuple([name] + styles))
loop = urwid.MainLoop(
tui,
palette=MONO_PALETTE if args.no_color else PALETTE,
palette=palette,
event_loop=urwid.AsyncioEventLoop(),
unhandled_input=tui.unhandled_input,
screen=screen,
@ -105,6 +106,18 @@ class TUI(urwid.Frame):
return tui
@staticmethod
def create_screen(args):
screen = urwid.raw_display.Screen()
# Determine how many colors to use
default_colors = 1 if args.no_color else 16
colors = settings.get_setting("tui.colors", int, default_colors)
logger.debug(f"Setting colors to {colors}")
screen.set_terminal_properties(colors)
return screen
def __init__(self, app, user, screen, args):
self.app = app
self.user = user
@ -130,6 +143,8 @@ class TUI(urwid.Frame):
self.exception = None
self.can_translate = False
self.account = None
self.followed_accounts = []
self.media_viewer = settings.get_setting("tui.media_viewer", str)
if self.args.cache_size:
self.cache_max = 1024 * 1024 * self.args.cache_size
@ -140,10 +155,9 @@ class TUI(urwid.Frame):
def run(self):
self.loop.set_alarm_in(0, lambda *args: self.async_load_instance())
self.loop.set_alarm_in(0, lambda *args: self.async_load_followed_accounts())
self.loop.set_alarm_in(0, lambda *args: self.async_load_followed_tags())
self.loop.set_alarm_in(0, lambda *args: self.async_load_timeline(
is_initial=True, timeline_name="home"))
self.loop.set_alarm_in(0, lambda *args: self.async_load_followed_accounts())
self.loop.run()
self.executor.shutdown(wait=False)
@ -247,7 +261,7 @@ class TUI(urwid.Frame):
# This is pretty fast, so it's probably ok to block while context is
# loaded, can be made async later if needed
context = api.context(self.app, self.user, status.original.id)
context = api.context(self.app, self.user, status.original.id).json()
ancestors = [self.make_status(s) for s in context["ancestors"]]
descendants = [self.make_status(s) for s in context["descendants"]]
statuses = ancestors + [status] + descendants
@ -302,7 +316,7 @@ class TUI(urwid.Frame):
See: https://github.com/mastodon/mastodon/issues/19328
"""
def _load_instance():
return api.get_instance(self.app.base_url)
return api.get_instance(self.app.base_url).json()
def _done(instance):
self.max_toot_chars = get_max_toot_chars(instance, DEFAULT_MAX_TOOT_CHARS)
@ -337,22 +351,6 @@ class TUI(urwid.Frame):
self.run_in_thread(_load_accounts, done_callback=_done_accounts)
def async_load_followed_tags(self):
def _load_tag_list():
try:
return api.followed_tags(self.app, self.user)
except ApiError:
# not supported by all Mastodon servers so fail silently if necessary
return []
def _done_tag_list(tags):
if len(tags) > 0:
self.followed_tags = [t["name"] for t in tags]
else:
self.followed_tags = []
self.run_in_thread(_load_tag_list, done_callback=_done_tag_list)
def refresh_footer(self, timeline):
"""Show status details in footer."""
status, index, count = timeline.get_focused_status_with_counts()
@ -512,8 +510,13 @@ class TUI(urwid.Frame):
def show_media(self, status):
urls = [m["url"] for m in status.original.data["media_attachments"]]
if urls:
show_media(urls)
if not urls:
return
if self.media_viewer:
subprocess.run([self.media_viewer] + urls)
else:
self.footer.set_error_message("Media viewer not configured")
def show_context_menu(self, status):
# TODO: show context menu
@ -536,10 +539,15 @@ class TUI(urwid.Frame):
))
def post_status(self, content, warning, visibility, in_reply_to_id):
data = api.post_status(self.app, self.user, content,
data = api.post_status(
self.app,
self.user,
content,
spoiler_text=warning,
visibility=visibility,
in_reply_to_id=in_reply_to_id)
in_reply_to_id=in_reply_to_id
).json()
status = self.make_status(data)
# TODO: fetch new items from the timeline?

Wyświetl plik

@ -1,8 +1,19 @@
# name, fg, bg, mono, fg_h, bg_h
# Color definitions are tuples of:
# - name
# - foreground (normal mode)
# - background (normal mode)
# - foreground (monochrome mode)
# - foreground (high color mode)
# - background (high color mode)
#
# See:
# http://urwid.org/tutorial/index.html#display-attributes
# http://urwid.org/manual/displayattributes.html#using-display-attributes
PALETTE = [
# Components
('button', 'white', 'black'),
('button_focused', 'light gray', 'dark magenta'),
('button_focused', 'light gray', 'dark magenta', 'bold,underline'),
('card_author', 'yellow', ''),
('card_title', 'dark green', ''),
('columns_divider', 'white', 'dark blue'),
@ -14,7 +25,7 @@ PALETTE = [
('footer_status', 'white', 'dark blue'),
('footer_status_bold', 'white, bold', 'dark blue'),
('header', 'white', 'dark blue'),
('header_bold', 'white,bold', 'dark blue'),
('header_bold', 'white,bold', 'dark blue', 'bold'),
('intro_bigtext', 'yellow', ''),
('intro_smalltext', 'light blue', ''),
('poll_bar', 'white', 'dark blue'),
@ -22,16 +33,17 @@ PALETTE = [
('status_detail_bookmarked', 'light red', ''),
('status_detail_timestamp', 'light blue', ''),
('status_list_account', 'dark green', ''),
('status_list_selected', 'white,bold', 'dark green'),
('status_list_selected', 'white,bold', 'dark green', 'bold,underline'),
('status_list_timestamp', 'light blue', ''),
# Functional
('hashtag', 'light cyan,bold', ''),
('hashtag_followed', 'yellow,bold', ''),
('link', ',italics', ''),
('link_focused', ',italics', 'dark magenta'),
('account', 'dark green', ''),
('hashtag', 'light cyan,bold', '', 'bold'),
('hashtag_followed', 'yellow,bold', '', 'bold'),
('link', ',italics', '', ',italics'),
('link_focused', ',italics', 'dark magenta', "underline,italics"),
('shortcut', 'light blue', ''),
('shortcut_highlight', 'white,bold', ''),
('shortcut_highlight', 'white,bold', '', 'bold'),
('warning', 'light red', ''),
# Visiblity
@ -45,55 +57,29 @@ PALETTE = [
('dim', 'dark gray', ''),
('highlight', 'yellow', ''),
('success', 'dark green', ''),
]
MONO_PALETTE = [
# Components
('button', 'white', 'black'),
('button_focused', 'black', 'white'),
('card_author', 'white', ''),
('card_title', 'white, bold', ''),
('columns_divider', 'white', 'black'),
('content_warning', 'white', 'black'),
('editbox', 'white', 'black'),
('editbox_focused', 'black', 'white'),
('footer_message', 'white', 'black'),
('footer_message_error', 'white,bold', 'black'),
('footer_status', 'black', 'white'),
('footer_status_bold', 'black,bold', 'white'),
('header', 'black', 'white'),
('header_bold', 'black,bold', 'white'),
('intro_bigtext', 'white', 'black'),
('intro_smalltext', 'white', 'black'),
('poll_bar', 'black', 'white'),
('status_detail_account', 'white', ''),
('status_detail_bookmarked', 'white', ''),
('status_detail_timestamp', 'white', ''),
('status_list_account', 'white', ''),
('status_list_selected', 'white,bold', ''),
('status_list_timestamp', 'white', ''),
('warning', 'white,bold', 'black'),
# HTML tag styling
('a', ',italics', '', 'italics'),
# em tag is mapped to i
('i', ',italics', '', 'italics'),
# strong tag is mapped to b
('b', ',bold', '', 'bold'),
# special case for bold + italic nested tags
('bi', ',bold,italics', '', ',bold,italics'),
('u', ',underline', '', ',underline'),
('del', ',strikethrough', '', ',strikethrough'),
('code', 'light gray, standout', '', ',standout'),
('pre', 'light gray, standout', '', ',standout'),
('blockquote', 'light gray', '', ''),
('h1', ',bold', '', ',bold'),
('h2', ',bold', '', ',bold'),
('h3', ',bold', '', ',bold'),
('h4', ',bold', '', ',bold'),
('h5', ',bold', '', ',bold'),
('h6', ',bold', '', ',bold'),
('class_mention_hashtag', 'light cyan', '', ''),
('class_hashtag', 'light cyan', '', ''),
# Functional
('account', 'dark green', ''),
('hashtag_followed', 'white,bold', ''),
('hashtag', 'white,bold', ''),
('link', ',italics', ''),
('link_focused', ',bold,italics', ''),
('shortcut', 'white', ''),
('shortcut_highlight', 'white,bold', ''),
# Visiblity
('visibility_public', 'white', ''),
('visibility_unlisted', 'white', ''),
('visibility_private', 'white', ''),
('visibility_direct', 'white', ''),
# Styles
('bold', ',bold', ''),
('dim', 'light gray', ''),
('highlight', ',bold', ''),
('success', '', ''),
]
VISIBILITY_OPTIONS = [

Wyświetl plik

@ -5,9 +5,12 @@ import urwid
import webbrowser
from toot import __version__
from toot import api
from toot.utils import format_content
from .utils import highlight_hashtags, highlight_keys, add_corners
from .widgets import Button, EditBox, SelectableText, EmojiText
from toot.tui.utils import highlight_hashtags, highlight_keys, add_corners
from toot.tui.widgets import Button, EditBox, SelectableText, EmojiText
from toot.tui.richtext import html_to_widgets
from toot import api
from PIL import Image
from term_image.image import AutoImage
@ -323,9 +326,14 @@ class Account(urwid.ListBox):
if account["note"]:
yield urwid.Divider()
for line in format_content(account["note"]):
yield urwid.Text(highlight_hashtags(line, followed_tags=set()))
yield urwid.Divider()
widgetlist = html_to_widgets(account["note"])
for line in widgetlist:
yield (line)
yield urwid.Divider()
yield urwid.Text(["ID: ", ("highlight", f"{account['id']}")])
yield urwid.Text(["Since: ", ("highlight", f"{account['created_at'][:10]}")])
yield urwid.Divider()
if account["bot"]:
yield urwid.Text([("highlight", "Bot \N{robot face}")])
@ -352,8 +360,11 @@ class Account(urwid.ListBox):
name = field["name"].title()
yield urwid.Divider()
yield urwid.Text([("bold", f"{name.rstrip(':')}"), ":"])
for line in format_content(field["value"]):
yield urwid.Text(highlight_hashtags(line, followed_tags=set()))
widgetlist = html_to_widgets(field["value"])
for line in widgetlist:
yield (line)
if field["verified_at"]:
yield urwid.Text(("success", "✓ Verified"))
@ -365,17 +376,17 @@ def take_action(button: Button, self: Account):
action = button.get_label()
if action == "Confirm Follow":
self.relationship = api.follow(self.app, self.user, self.account["id"])
self.relationship = api.follow(self.app, self.user, self.account["id"]).json()
elif action == "Confirm Unfollow":
self.relationship = api.unfollow(self.app, self.user, self.account["id"])
self.relationship = api.unfollow(self.app, self.user, self.account["id"]).json()
elif action == "Confirm Mute":
self.relationship = api.mute(self.app, self.user, self.account["id"])
self.relationship = api.mute(self.app, self.user, self.account["id"]).json()
elif action == "Confirm Unmute":
self.relationship = api.unmute(self.app, self.user, self.account["id"])
self.relationship = api.unmute(self.app, self.user, self.account["id"]).json()
elif action == "Confirm Block":
self.relationship = api.block(self.app, self.user, self.account["id"])
self.relationship = api.block(self.app, self.user, self.account["id"]).json()
elif action == "Confirm Unblock":
self.relationship = api.unblock(self.app, self.user, self.account["id"])
self.relationship = api.unblock(self.app, self.user, self.account["id"]).json()
self.last_action = None
self.setup_listbox()

Wyświetl plik

@ -2,11 +2,9 @@ import urwid
from toot import api
from toot.exceptions import ApiError
from toot.utils import format_content
from toot.utils.datetime import parse_datetime
from .utils import highlight_hashtags
from .widgets import Button, CheckBox, RadioButton
from .richtext import html_to_widgets
class Poll(urwid.ListBox):
@ -87,8 +85,11 @@ class Poll(urwid.ListBox):
def generate_contents(self, status):
yield urwid.Divider()
for line in format_content(status.data["content"]):
yield urwid.Text(highlight_hashtags(line, set()))
widgetlist = html_to_widgets(status.data["content"])
for line in widgetlist:
yield (line)
yield urwid.Divider()
yield self.build_linebox(self.generate_poll_detail())

Wyświetl plik

@ -0,0 +1,18 @@
import urwid
from toot.tui.utils import highlight_hashtags
from toot.utils import format_content
from typing import List
try:
from .richtext import html_to_widgets, url_to_widget
except ImportError:
# Fallback if urwidgets are not available
def html_to_widgets(html: str) -> List[urwid.Widget]:
return [
urwid.Text(highlight_hashtags(line))
for line in format_content(html)
]
def url_to_widget(url: str):
return urwid.Text(("link", url))

Wyświetl plik

@ -0,0 +1,452 @@
import re
import urwid
import unicodedata
from bs4.element import NavigableString, Tag
from toot.tui.constants import PALETTE
from toot.utils import parse_html, urlencode_url
from typing import List, Tuple
from urwid.util import decompose_tagmarkup
from urwidgets import Hyperlink, TextEmbed
STYLE_NAMES = [p[0] for p in PALETTE]
# NOTE: update this list if Mastodon starts supporting more block tags
BLOCK_TAGS = ["p", "pre", "li", "blockquote", "h1", "h2", "h3", "h4", "h5", "h6"]
def html_to_widgets(html, recovery_attempt=False) -> List[urwid.Widget]:
"""Convert html to urwid widgets"""
widgets: List[urwid.Widget] = []
html = unicodedata.normalize("NFKC", html)
soup = parse_html(html)
first_tag = True
for e in soup.body or soup:
if isinstance(e, NavigableString):
if first_tag and not recovery_attempt:
# if our first "tag" is a navigable string
# the HTML is out of spec, doesn't start with a tag,
# we see this in content from Pixelfed servers.
# attempt a fix by wrapping the HTML with <p></p>
return html_to_widgets(f"<p>{html}</p>", recovery_attempt=True)
else:
continue
else:
name = e.name
# if our HTML starts with a tag, but not a block tag
# the HTML is out of spec. Attempt a fix by wrapping the
# HTML with <p></p>
if (first_tag and not recovery_attempt and name not in BLOCK_TAGS):
return html_to_widgets(f"<p>{html}</p>", recovery_attempt=True)
markup = render(name, e)
first_tag = False
if not isinstance(markup, urwid.Widget):
# plaintext, so create a padded text widget
txt = text_to_widget("", markup)
markup = urwid.Padding(
txt,
align="left",
width=("relative", 100),
min_width=None,
)
widgets.append(markup)
# separate top level widgets with a blank line
widgets.append(urwid.Divider(" "))
return widgets[:-1] # but suppress the last blank line
def url_to_widget(url: str):
widget = len(url), urwid.Filler(Hyperlink(url, "link", url))
return TextEmbed(widget)
def inline_tag_to_text(tag) -> Tuple:
"""Convert html tag to plain text with tag as attributes recursively"""
markups = process_inline_tag_children(tag)
if not markups:
return (tag.name, "")
return (tag.name, markups)
def process_inline_tag_children(tag) -> List:
"""Recursively retrieve all children
and convert to a list of markup text"""
markups = []
for child in tag.children:
if isinstance(child, Tag):
markup = render(child.name, child)
markups.append(markup)
else:
markups.append(child)
return markups
URL_PATTERN = re.compile(r"(^.+)\x03(.+$)")
def text_to_widget(attr, markup) -> urwid.Widget:
markup_list = []
for run in markup:
if isinstance(run, tuple):
txt, attr_list = decompose_tagmarkup(run)
# find anchor titles with an ETX separator followed by href
match = URL_PATTERN.match(txt)
if match:
label, url = match.groups()
anchor_attr = get_best_anchor_attr(attr_list)
markup_list.append((
len(label),
urwid.Filler(Hyperlink(url, anchor_attr, label)),
))
else:
markup_list.append(run)
else:
markup_list.append(run)
return TextEmbed(markup_list)
def process_block_tag_children(tag) -> List[urwid.Widget]:
"""Recursively retrieve all children
and convert to a list of widgets
any inline tags containing text will be
converted to Text widgets"""
pre_widget_markups = []
post_widget_markups = []
child_widgets = []
found_nested_widget = False
for child in tag.children:
if isinstance(child, Tag):
# child is a nested tag; process using custom method
# or default to inline_tag_to_text
result = render(child.name, child)
if isinstance(result, urwid.Widget):
found_nested_widget = True
child_widgets.append(result)
else:
if not found_nested_widget:
pre_widget_markups.append(result)
else:
post_widget_markups.append(result)
else:
# child is text; append to the appropriate markup list
if not found_nested_widget:
pre_widget_markups.append(child)
else:
post_widget_markups.append(child)
widget_list = []
if len(pre_widget_markups):
widget_list.append(text_to_widget(tag.name, pre_widget_markups))
if len(child_widgets):
widget_list += child_widgets
if len(post_widget_markups):
widget_list.append(text_to_widget(tag.name, post_widget_markups))
return widget_list
def get_urwid_attr_name(tag) -> str:
"""Get the class name and translate to a
name suitable for use as an urwid
text attribute name"""
if "class" in tag.attrs:
clss = tag.attrs["class"]
if len(clss) > 0:
style_name = "class_" + "_".join(clss)
# return the class name, only if we
# find it as a defined palette name
if style_name in STYLE_NAMES:
return style_name
# fallback to returning the tag name
return tag.name
def basic_block_tag_handler(tag) -> urwid.Widget:
"""default for block tags that need no special treatment"""
return urwid.Pile(process_block_tag_children(tag))
def get_best_anchor_attr(attrib_list) -> str:
if not attrib_list:
return ""
flat_al = list(flatten(attrib_list))
for a in flat_al[0]:
# ref: https://docs.joinmastodon.org/spec/activitypub/
# these are the class names (translated to attrib names)
# that we can support for display
try:
if a[0] in ["class_hashtag", "class_mention_hashtag", "class_mention"]:
return a[0]
except KeyError:
continue
return "a"
def render(attr: str, content: str):
if attr in ["a"]:
return render_anchor(content)
if attr in ["blockquote"]:
return render_blockquote(content)
if attr in ["br"]:
return render_br(content)
if attr in ["em"]:
return render_em(content)
if attr in ["ol"]:
return render_ol(content)
if attr in ["pre"]:
return render_pre(content)
if attr in ["span"]:
return render_span(content)
if attr in ["b", "strong"]:
return render_strong(content)
if attr in ["ul"]:
return render_ul(content)
# Glitch-soc and Pleroma allow <H1>...<H6> in content
# Mastodon (PR #23913) does not; header tags are converted to <P><STRONG></STRONG></P>
if attr in ["p", "div", "li", "h1", "h2", "h3", "h4", "h5", "h6"]:
return basic_block_tag_handler(content)
# Fall back to inline_tag_to_text handler
return inline_tag_to_text(content)
def render_anchor(tag) -> Tuple:
"""anchor tag handler"""
markups = process_inline_tag_children(tag)
if not markups:
return (tag.name, "")
href = tag.attrs["href"]
title, attrib_list = decompose_tagmarkup(markups)
if not attrib_list:
attrib_list = [tag]
if href:
# urlencode the path and query portions of the URL
href = urlencode_url(href)
# use ASCII ETX (end of record) as a
# delimiter between the title and the HREF
title += f"\x03{href}"
attr = get_best_anchor_attr(attrib_list)
if attr == "a":
# didn't find an attribute to use
# in the child markup, so let's
# try the anchor tag's own attributes
attr = get_urwid_attr_name(tag)
# hashtag anchors have a class of "mention hashtag"
# or "hashtag"
# we'll return style "class_mention_hashtag"
# or "class_hashtag"
# in that case; see corresponding palette entry
# in constants.py controlling hashtag highlighting
return (attr, title)
def render_blockquote(tag) -> urwid.Widget:
widget_list = process_block_tag_children(tag)
blockquote_widget = urwid.LineBox(
urwid.Padding(
urwid.Pile(widget_list),
align="left",
width=("relative", 100),
min_width=None,
left=1,
right=1,
),
tlcorner="",
tline="",
lline="",
trcorner="",
blcorner="",
rline="",
bline="",
brcorner="",
)
return urwid.Pile([urwid.AttrMap(blockquote_widget, "blockquote")])
def render_br(tag) -> Tuple:
return ("br", "\n")
def render_em(tag) -> Tuple:
# to simplify the number of palette entries
# translate EM to I (italic)
markups = process_inline_tag_children(tag)
if not markups:
return ("i", "")
# special case processing for bold and italic
for parent in tag.parents:
if parent.name == "b" or parent.name == "strong":
return ("bi", markups)
return ("i", markups)
def render_ol(tag) -> urwid.Widget:
"""ordered list tag handler"""
widgets = []
list_item_num = 1
increment = -1 if tag.has_attr("reversed") else 1
# get ol start= attribute if present
if tag.has_attr("start") and len(tag.attrs["start"]) > 0:
try:
list_item_num = int(tag.attrs["start"])
except ValueError:
pass
for li in tag.find_all("li", recursive=False):
markup = render("li", li)
# li value= attribute will change the item number
# it also overrides any ol start= attribute
if li.has_attr("value") and len(li.attrs["value"]) > 0:
try:
list_item_num = int(li.attrs["value"])
except ValueError:
pass
if not isinstance(markup, urwid.Widget):
txt = text_to_widget("li", [str(list_item_num), ". ", markup])
# 1. foo, 2. bar, etc.
widgets.append(txt)
else:
txt = text_to_widget("li", [str(list_item_num), ". "])
columns = urwid.Columns(
[txt, ("weight", 9999, markup)], dividechars=1, min_width=3
)
widgets.append(columns)
list_item_num += increment
return urwid.Pile(widgets)
def render_pre(tag) -> urwid.Widget:
# <PRE> tag spec says that text should not wrap,
# but horizontal screen space is at a premium
# and we have no horizontal scroll bar, so allow
# wrapping.
widget_list = [urwid.Divider(" ")]
widget_list += process_block_tag_children(tag)
pre_widget = urwid.Padding(
urwid.Pile(widget_list),
align="left",
width=("relative", 100),
min_width=None,
left=1,
right=1,
)
return urwid.Pile([urwid.AttrMap(pre_widget, "pre")])
def render_span(tag) -> Tuple:
markups = process_inline_tag_children(tag)
if not markups:
return (tag.name, "")
# span inherits its parent's class definition
# unless it has a specific class definition
# of its own
if "class" in tag.attrs:
# uncomment the following code to hide all HTML marked
# invisible (generally, the http:// prefix of URLs)
# could be a user preference, it's only advisable if
# the terminal supports OCS 8 hyperlinks (and that's not
# automatically detectable)
# if "invisible" in tag.attrs["class"]:
# return (tag.name, "")
style_name = get_urwid_attr_name(tag)
if style_name != "span":
# unique class name matches an entry in our palette
return (style_name, markups)
if tag.parent:
return (get_urwid_attr_name(tag.parent), markups)
else:
# fallback
return ("span", markups)
def render_strong(tag) -> Tuple:
# to simplify the number of palette entries
# translate STRONG to B (bold)
markups = process_inline_tag_children(tag)
if not markups:
return ("b", "")
# special case processing for bold and italic
for parent in tag.parents:
if parent.name == "i" or parent.name == "em":
return ("bi", markups)
return ("b", markups)
def render_ul(tag) -> urwid.Widget:
"""unordered list tag handler"""
widgets = []
for li in tag.find_all("li", recursive=False):
markup = render("li", li)
if not isinstance(markup, urwid.Widget):
txt = text_to_widget("li", ["\N{bullet} ", markup])
# * foo, * bar, etc.
widgets.append(txt)
else:
txt = text_to_widget("li", ["\N{bullet} "])
columns = urwid.Columns(
[txt, ("weight", 9999, markup)], dividechars=1, min_width=3
)
widgets.append(columns)
return urwid.Pile(widgets)
def flatten(data):
if isinstance(data, tuple):
for x in data:
yield from flatten(x)
else:
yield data

Wyświetl plik

@ -7,16 +7,17 @@ import webbrowser
from typing import List, Optional
from toot.tui import app
from toot.tui.utils import can_render_pixels, add_corners
from toot.utils import format_content
from toot.tui.utils import can_render_pixels, add_corners
from toot.tui.richtext import html_to_widgets, url_to_widget
from toot.utils.datetime import parse_datetime, time_ago
from toot.utils.language import language_name
from .entities import Status
from .scroll import Scrollable, ScrollBar
from .utils import highlight_hashtags, highlight_keys
from .widgets import SelectableText, SelectableColumns, EmojiText
from toot.entities import Status
from toot.tui.scroll import Scrollable, ScrollBar
from toot.tui.utils import highlight_keys
from toot.tui.widgets import SelectableText, SelectableColumns, EmojiText
from term_image.image import AutoImage
from term_image.widget import UrwidImage
@ -88,7 +89,7 @@ class Timeline(urwid.Columns):
return urwid.ListBox(walker)
def build_list_item(self, status):
item = StatusListItem(status)
item = StatusListItem(status, self.tui.args.relative_datetimes)
urwid.connect_signal(item, "click", lambda *args:
self.tui.show_context_menu(status))
return urwid.AttrMap(item, None, focus_map={
@ -104,6 +105,7 @@ class Timeline(urwid.Columns):
return None
poll = status.original.data.get("poll")
show_media = status.original.data["media_attachments"] and self.tui.media_viewer
options = [
"[A]ccount" if not status.is_mine else "",
@ -114,6 +116,8 @@ class Timeline(urwid.Columns):
"[V]iew",
"[T]hread" if not self.is_thread else "",
"L[i]nks",
"[M]edia" if show_media else "",
self.tui.media_viewer,
"[R]eply",
"[P]oll" if poll and not poll["expired"] else "",
"So[u]rce",
@ -347,6 +351,7 @@ class StatusDetails(urwid.Pile):
self.timeline = timeline
if self.status:
self.status.placeholders = []
self.followed_accounts = timeline.tui.followed_accounts
reblogged_by = status.author if status and status.reblog else None
widget_list = list(self.content_generator(status.original, reblogged_by)
@ -442,6 +447,11 @@ class StatusDetails(urwid.Pile):
yield self.author_header(reblogged_by)
if status.author.display_name:
yield ("pack", urwid.Text(("bold", status.author.display_name)))
account_color = "highlight" if status.author.account in self.followed_accounts else "account"
yield ("pack", urwid.Text((account_color, status.author.account)))
yield ("pack", urwid.Divider())
if status.data["spoiler_text"]:
@ -453,8 +463,10 @@ class StatusDetails(urwid.Pile):
yield ("pack", urwid.Text(("content_warning", "Marked as sensitive. Press S to view.")))
else:
content = status.original.translation if status.original.show_translation else status.data["content"]
for line in format_content(content):
yield ("pack", urwid.Text(highlight_hashtags(line, self.timeline.tui.followed_tags)))
widgetlist = html_to_widgets(content)
for line in widgetlist:
yield (line)
media = status.data["media_attachments"]
if media:
@ -481,7 +493,7 @@ class StatusDetails(urwid.Pile):
aspect = None
yield self.image_widget(m["preview_url"], aspect=aspect)
yield urwid.Divider()
yield ("pack", urwid.Text(("link", m["url"])))
yield ("pack", url_to_widget(m["url"]))
poll = status.original.data.get("poll")
if poll:
@ -541,8 +553,7 @@ class StatusDetails(urwid.Pile):
if card["description"]:
yield urwid.Text(card["description"].strip())
yield urwid.Text("")
yield urwid.Text(("link", card["url"]))
yield url_to_widget(card["url"])
if card["image"]:
if card["image"].lower().endswith(('.jpg', '.jpeg', '.png', '.gif', '.svg', '.webp')):
@ -579,14 +590,14 @@ class StatusDetails(urwid.Pile):
class StatusListItem(SelectableColumns):
def __init__(self, status):
def __init__(self, status, relative_datetimes):
edited_at = status.data.get("edited_at")
# TODO: hacky implementation to avoid creating conflicts for existing
# pull reuqests, refactor when merged.
created_at = (
time_ago(status.created_at).ljust(3, " ")
if "--relative-datetimes" in sys.argv
if relative_datetimes
else status.created_at.strftime("%Y-%m-%d %H:%M")
)

Wyświetl plik

@ -40,48 +40,19 @@ def highlight_keys(text, high_attr, low_attr=""):
return list(_gen())
def highlight_hashtags(line, followed_tags, attr="hashtag", followed_attr="hashtag_followed"):
def highlight_hashtags(line):
hline = []
for p in re.split(HASHTAG_PATTERN, line):
if p.startswith("#"):
if p[1:].lower() in (t.lower() for t in followed_tags):
hline.append((followed_attr, p))
else:
hline.append((attr, p))
hline.append(("hashtag", p))
else:
hline.append(p)
return hline
def show_media(paths):
"""
Attempt to open an image viewer to show given media files.
FIXME: This is not very thought out, but works for me.
Once settings are implemented, add an option for the user to configure their
prefered media viewer.
"""
viewer = None
potential_viewers = [
"feh",
"eog",
"display"
]
for v in potential_viewers:
viewer = shutil.which(v)
if viewer:
break
if not viewer:
raise Exception("Cannot find an image viewer")
subprocess.run([viewer] + paths)
class LinkParser(HTMLParser):
def reset(self):
super().reset()
self.links = []

Wyświetl plik

@ -10,6 +10,7 @@ from bs4 import BeautifulSoup
from typing import Dict
from toot.exceptions import ConsoleError
from urllib.parse import urlparse, urlencode, quote, unquote
def str_bool(b):
@ -22,20 +23,22 @@ def str_bool_nullable(b):
return None if b is None else str_bool(b)
def get_text(html):
"""Converts html to text, strips all tags."""
def parse_html(html: str) -> BeautifulSoup:
# Ignore warnings made by BeautifulSoup, if passed something that looks like
# a file (e.g. a dot which matches current dict), it will warn that the file
# should be opened instead of passing a filename.
with warnings.catch_warnings():
warnings.simplefilter("ignore")
text = BeautifulSoup(html.replace('&apos;', "'"), "html.parser").get_text()
return unicodedata.normalize('NFKC', text)
return BeautifulSoup(html.replace("&apos;", "'"), "html.parser")
def parse_html(html):
def get_text(html):
"""Converts html to text, strips all tags."""
text = parse_html(html).get_text()
return unicodedata.normalize("NFKC", text)
def html_to_paragraphs(html):
"""Attempt to convert html to plain text while keeping line breaks.
Returns a list of paragraphs, each being a list of lines.
"""
@ -54,7 +57,7 @@ def format_content(content):
Returns a generator yielding lines of content.
"""
paragraphs = parse_html(content)
paragraphs = html_to_paragraphs(content)
first = True
@ -186,3 +189,14 @@ def _warn_scheme_deprecated():
"instead write:",
" toot instance http://unsafehost.com\n"
]))
def urlencode_url(url):
parsed_url = urlparse(url)
# unencode before encoding, to prevent double-urlencoding
encoded_path = quote(unquote(parsed_url.path), safe="-._~()'!*:@,;+&=/")
encoded_query = urlencode({k: quote(unquote(v), safe="-._~()'!*:@,;?/") for k, v in parsed_url.params})
encoded_url = parsed_url._replace(path=encoded_path, params=encoded_query).geturl()
return encoded_url

Wyświetl plik

@ -3,11 +3,12 @@ Utilities for dealing with string containing wide characters.
"""
import re
from typing import Generator, List
from wcwidth import wcwidth, wcswidth
def _wc_hard_wrap(line, length):
def _wc_hard_wrap(line: str, length: int) -> Generator[str, None, None]:
"""
Wrap text to length characters, breaking when target length is reached,
taking into account character width.
@ -20,7 +21,7 @@ def _wc_hard_wrap(line, length):
char_len = wcwidth(char)
if chars_len + char_len > length:
yield "".join(chars)
chars = []
chars: List[str] = []
chars_len = 0
chars.append(char)
@ -30,7 +31,7 @@ def _wc_hard_wrap(line, length):
yield "".join(chars)
def wc_wrap(text, length):
def wc_wrap(text: str, length: int) -> Generator[str, None, None]:
"""
Wrap text to given length, breaking on whitespace and taking into account
character width.
@ -38,7 +39,7 @@ def wc_wrap(text, length):
Meant for use on a single line or paragraph. Will destroy spacing between
words and paragraphs and any indentation.
"""
line_words = []
line_words: List[str] = []
line_len = 0
words = re.split(r"\s+", text.strip())
@ -66,7 +67,7 @@ def wc_wrap(text, length):
yield from _wc_hard_wrap(line, length)
def trunc(text, length):
def trunc(text: str, length: int) -> str:
"""
Truncates text to given length, taking into account wide characters.
@ -98,7 +99,7 @@ def trunc(text, length):
return text[:-n].strip() + ''
def pad(text, length):
def pad(text: str, length: int) -> str:
"""Pads text to given length, taking into account wide characters."""
text_length = wcswidth(text)
@ -108,7 +109,7 @@ def pad(text, length):
return text
def fit_text(text, length):
def fit_text(text: str, length: int) -> str:
"""Makes text fit the given length by padding or truncating it."""
text_length = wcswidth(text)