Setup click, migrate read commands

pull/444/head
Ivan Habunek 2023-11-26 18:00:57 +01:00
rodzic 1c5abb8419
commit 9ecfa79db8
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: F5F0623FF5EBCB3D
9 zmienionych plików z 376 dodań i 82 usunięć

Wyświetl plik

@ -34,6 +34,7 @@ setup(
packages=['toot', 'toot.tui', 'toot.tui.richtext', 'toot.utils'],
python_requires=">=3.7",
install_requires=[
"click~=8.1",
"requests>=2.13,<3.0",
"beautifulsoup4>=4.5.0,<5.0",
"wcwidth>=0.1.7",
@ -62,7 +63,7 @@ setup(
},
entry_points={
'console_scripts': [
'toot=toot.console:main',
'toot=toot.cli:cli',
],
}
)

Wyświetl plik

@ -20,8 +20,10 @@ import psycopg2
import pytest
import uuid
from click.testing import CliRunner, Result
from pathlib import Path
from toot import api, App, User
from toot.cli import Context
from toot.console import run_command
from toot.exceptions import ApiError, ConsoleError
from toot.output import print_out
@ -105,19 +107,21 @@ def friend_id(app, user, friend):
return api.find_account(app, user, friend.username)["id"]
@pytest.fixture
def run(app, user, capsys):
def _run(command, *params, as_user=None):
# The try/catch duplicates logic from console.main to convert exceptions
# to printed error messages. TODO: could be deduped
try:
run_command(app, as_user or user, command, params or [])
except (ConsoleError, ApiError) as e:
print_out(str(e))
@pytest.fixture(scope="session", autouse=True)
def testing_env():
os.environ["TOOT_TESTING"] = "true"
out, err = capsys.readouterr()
assert err == ""
return strip_ansi(out)
@pytest.fixture(scope="session")
def runner():
return CliRunner(mix_stderr=False)
@pytest.fixture
def run(app, user, runner):
def _run(command, *params, as_user=None) -> Result:
ctx = Context(app, as_user or user)
return runner.invoke(command, params, obj=ctx)
return _run
@ -130,12 +134,10 @@ def run_json(run):
@pytest.fixture
def run_anon(capsys):
def _run(command, *params):
run_command(None, None, command, params or [])
out, err = capsys.readouterr()
assert err == ""
return strip_ansi(out)
def run_anon(runner):
def _run(command, *params) -> Result:
ctx = Context(None, None)
return runner.invoke(command, params, obj=ctx)
return _run

Wyświetl plik

@ -1,45 +1,58 @@
import json
from pprint import pprint
import pytest
import re
from toot import api
from toot.entities import Account, from_dict_list
from toot.exceptions import ConsoleError
from toot import api, cli
from toot.entities import Account, Status, from_dict, from_dict_list
from uuid import uuid4
def test_instance(app, run):
out = run("instance", "--disable-https")
assert "Mastodon" in out
assert app.instance in out
assert "running Mastodon" in out
result = run(cli.instance)
assert result.exit_code == 0
assert "Mastodon" in result.stdout
assert app.instance in result.stdout
assert "running Mastodon" in result.stdout
def test_instance_json(app, run):
out = run("instance", "--json")
data = json.loads(out)
result = run(cli.instance, "--json")
assert result.exit_code == 0
data = json.loads(result.stdout)
assert data["title"] is not None
assert data["description"] is not None
assert data["version"] is not None
def test_instance_anon(app, run_anon, base_url):
out = run_anon("instance", base_url)
assert "Mastodon" in out
assert app.instance in out
assert "running Mastodon" in out
result = run_anon(cli.instance, base_url)
assert result.exit_code == 0
assert "Mastodon" in result.stdout
assert app.instance in result.stdout
assert "running Mastodon" in result.stdout
# Need to specify the instance name when running anon
with pytest.raises(ConsoleError) as exc:
run_anon("instance")
assert str(exc.value) == "Please specify an instance."
result = run_anon(cli.instance)
assert result.exit_code == 1
assert result.stderr == "Error: Please specify an instance.\n"
def test_whoami(user, run):
out = run("whoami")
# TODO: test other fields once updating account is supported
assert f"@{user.username}" in out
result = run(cli.whoami)
assert result.exit_code == 0
assert f"@{user.username}" in result.stdout
def test_whoami_json(user, run):
result = run(cli.whoami, "--json")
assert result.exit_code == 0
data = json.loads(result.stdout)
account = from_dict(Account, data)
assert account.username == user.username
assert account.acct == user.username
def test_whois(app, friend, run):
@ -51,18 +64,33 @@ def test_whois(app, friend, run):
]
for username in variants:
out = run("whois", username)
assert f"@{friend.username}" in out
result = run(cli.whois, username)
assert result.exit_code == 0
assert f"@{friend.username}" in result.stdout
def test_whois_json(app, friend, run):
result = run(cli.whois, friend.username, "--json")
assert result.exit_code == 0
data = json.loads(result.stdout)
account = from_dict(Account, data)
assert account.username == friend.username
assert account.acct == friend.username
def test_search_account(friend, run):
out = run("search", friend.username)
assert out == f"Accounts:\n* @{friend.username}"
result = run(cli.search, friend.username)
assert result.exit_code == 0
assert result.stdout.strip() == f"Accounts:\n* @{friend.username}"
def test_search_account_json(friend, run_json):
out = run_json("search", friend.username, "--json")
[account] = from_dict_list(Account, out["accounts"])
def test_search_account_json(friend, run):
result = run(cli.search, friend.username, "--json")
assert result.exit_code == 0
data = json.loads(result.stdout)
[account] = from_dict_list(Account, data["accounts"])
assert account.acct == friend.username
@ -71,17 +99,21 @@ def test_search_hashtag(app, user, run):
api.post_status(app, user, "#hashtag_y")
api.post_status(app, user, "#hashtag_z")
out = run("search", "#hashtag")
assert out == "Hashtags:\n#hashtag_x, #hashtag_y, #hashtag_z"
result = run(cli.search, "#hashtag")
assert result.exit_code == 0
assert result.stdout.strip() == "Hashtags:\n#hashtag_x, #hashtag_y, #hashtag_z"
def test_search_hashtag_json(app, user, run_json):
def test_search_hashtag_json(app, user, run):
api.post_status(app, user, "#hashtag_x")
api.post_status(app, user, "#hashtag_y")
api.post_status(app, user, "#hashtag_z")
out = run_json("search", "#hashtag", "--json")
[h1, h2, h3] = sorted(out["hashtags"], key=lambda h: h["name"])
result = run(cli.search, "#hashtag", "--json")
assert result.exit_code == 0
data = json.loads(result.stdout)
[h1, h2, h3] = sorted(data["hashtags"], key=lambda h: h["name"])
assert h1["name"] == "hashtag_x"
assert h2["name"] == "hashtag_y"
@ -89,50 +121,78 @@ def test_search_hashtag_json(app, user, run_json):
def test_tags(run, base_url):
out = run("tags_followed")
assert out == "You're not following any hashtags."
result = run(cli.tags_followed)
assert result.exit_code == 0
assert result.stdout.strip() == "You're not following any hashtags."
out = run("tags_follow", "foo")
assert out == "✓ You are now following #foo"
result = run(cli.tags_follow, "foo")
assert result.exit_code == 0
assert result.stdout.strip() == "✓ You are now following #foo"
out = run("tags_followed")
assert out == f"* #foo\t{base_url}/tags/foo"
result = run(cli.tags_followed)
assert result.exit_code == 0
assert result.stdout.strip() == f"* #foo\t{base_url}/tags/foo"
out = run("tags_follow", "bar")
assert out == "✓ You are now following #bar"
result = run(cli.tags_follow, "bar")
assert result.exit_code == 0
assert result.stdout.strip() == "✓ You are now following #bar"
out = run("tags_followed")
assert out == "\n".join([
result = run(cli.tags_followed)
assert result.exit_code == 0
assert result.stdout.strip() == "\n".join([
f"* #bar\t{base_url}/tags/bar",
f"* #foo\t{base_url}/tags/foo",
])
out = run("tags_unfollow", "foo")
assert out == "✓ You are no longer following #foo"
result = run(cli.tags_unfollow, "foo")
assert result.exit_code == 0
assert result.stdout.strip() == "✓ You are no longer following #foo"
out = run("tags_followed")
assert out == f"* #bar\t{base_url}/tags/bar"
result = run(cli.tags_followed)
assert result.exit_code == 0
assert result.stdout.strip() == f"* #bar\t{base_url}/tags/bar"
def test_status(app, user, run):
uuid = str(uuid4())
response = api.post_status(app, user, uuid).json()
status_id = api.post_status(app, user, uuid).json()["id"]
out = run("status", response["id"])
result = run(cli.status, status_id)
assert result.exit_code == 0
out = result.stdout.strip()
assert uuid in out
assert user.username in out
assert response["id"] in out
assert status_id in out
def test_status_json(app, user, run):
uuid = str(uuid4())
status_id = api.post_status(app, user, uuid).json()["id"]
result = run(cli.status, status_id, "--json")
assert result.exit_code == 0
status = from_dict(Status, json.loads(result.stdout))
assert status.id == status_id
assert status.account.acct == user.username
assert uuid in status.content
def test_thread(app, user, run):
uuid = str(uuid4())
s1 = api.post_status(app, user, uuid + "1").json()
s2 = api.post_status(app, user, uuid + "2", in_reply_to_id=s1["id"]).json()
s3 = api.post_status(app, user, uuid + "3", in_reply_to_id=s2["id"]).json()
uuid1 = str(uuid4())
uuid2 = str(uuid4())
uuid3 = str(uuid4())
s1 = api.post_status(app, user, uuid1).json()
s2 = api.post_status(app, user, uuid2, in_reply_to_id=s1["id"]).json()
s3 = api.post_status(app, user, uuid3, in_reply_to_id=s2["id"]).json()
for status in [s1, s2, s3]:
out = run("thread", status["id"])
bits = re.split(r"─+", out)
result = run(cli.thread, status["id"])
assert result.exit_code == 0
bits = re.split(r"─+", result.stdout.strip())
bits = [b for b in bits if b]
assert len(bits) == 3
@ -141,6 +201,6 @@ def test_thread(app, user, run):
assert s2["id"] in bits[1]
assert s3["id"] in bits[2]
assert f"{uuid}1" in bits[0]
assert f"{uuid}2" in bits[1]
assert f"{uuid}3" in bits[2]
assert uuid1 in bits[0]
assert uuid2 in bits[1]
assert uuid3 in bits[2]

Wyświetl plik

@ -1,3 +1,15 @@
from .console import main
import sys
from toot.cli import cli
from toot.exceptions import ConsoleError
from toot.output import print_err
from toot.settings import load_settings
main()
try:
defaults = load_settings().get("commands", {})
cli(default_map=defaults)
except ConsoleError as ex:
print_err(str(ex))
sys.exit(1)
except KeyboardInterrupt:
print_err("Aborted")
sys.exit(1)

Wyświetl plik

@ -0,0 +1,4 @@
from toot.cli.base import cli, Context # noqa
from toot.cli.read import *
from toot.cli.tags import *

67
toot/cli/base.py 100644
Wyświetl plik

@ -0,0 +1,67 @@
import logging
import sys
import click
from functools import wraps
from toot import App, User, config
from typing import Callable, Concatenate, NamedTuple, Optional, ParamSpec, TypeVar
# Tweak the Click context
# https://click.palletsprojects.com/en/8.1.x/api/#context
CONTEXT = dict(
# Enable using environment variables to set options
auto_envvar_prefix="TOOT",
# Add shorthand -h for invoking help
help_option_names=["-h", "--help"],
# Give help some more room (default is 80)
max_content_width=100,
# Always show default values for options
show_default=True,
)
# Data object to add to Click context
class Context(NamedTuple):
app: Optional[App] = None
user: Optional[User] = None
color: bool = False
debug: bool = False
quiet: bool = False
P = ParamSpec("P")
R = TypeVar("R")
T = TypeVar("T")
def pass_context(f: Callable[Concatenate[Context, P], R]) -> Callable[P, R]:
"""Pass `obj` from click context as first argument."""
@wraps(f)
def wrapped(*args: P.args, **kwargs: P.kwargs) -> R:
ctx = click.get_current_context()
return f(ctx.obj, *args, **kwargs)
return wrapped
json_option = click.option(
"--json",
is_flag=True,
default=False,
help="Print data as JSON rather than human readable text"
)
@click.group(context_settings=CONTEXT)
@click.option("--debug/--no-debug", default=False, help="Log debug info to stderr")
@click.option("--color/--no-color", default=sys.stdout.isatty(), help="Use ANSI color in output")
@click.option("--quiet/--no-quiet", default=False, help="Don't print anything to stdout")
@click.pass_context
def cli(ctx, color, debug, quiet, app=None, user=None):
"""Toot is a Mastodon CLI"""
user, app = config.get_active_user_app()
ctx.obj = Context(app, user, color, debug, quiet)
if debug:
logging.basicConfig(level=logging.DEBUG)

112
toot/cli/read.py 100644
Wyświetl plik

@ -0,0 +1,112 @@
import click
import json as pyjson
from itertools import chain
from typing import Optional
from toot import api
from toot.entities import Instance, Status, from_dict, Account
from toot.exceptions import ApiError, ConsoleError
from toot.output import print_account, print_instance, print_search_results, print_status, print_tag_list, print_timeline
from toot.cli.base import cli, json_option, pass_context, Context
@cli.command()
@json_option
@pass_context
def whoami(ctx: Context, json: bool):
"""Display logged in user details"""
response = api.verify_credentials(ctx.app, ctx.user)
if json:
click.echo(response.text)
else:
account = from_dict(Account, response.json())
print_account(account)
@cli.command()
@click.argument("account")
@json_option
@pass_context
def whois(ctx: Context, account: str, json: bool):
"""Display account details"""
account_dict = api.find_account(ctx.app, ctx.user, account)
# Here it's not possible to avoid parsing json since it's needed to find the account.
if json:
click.echo(pyjson.dumps(account_dict))
else:
account_obj = from_dict(Account, account_dict)
print_account(account_obj)
@cli.command()
@click.argument("instance_url", required=False)
@json_option
@pass_context
def instance(ctx: Context, instance_url: Optional[str], json: bool):
"""Display instance details"""
default_url = ctx.app.base_url if ctx.app else None
base_url = instance_url or default_url
if not base_url:
raise ConsoleError("Please specify an instance.")
try:
response = api.get_instance(base_url)
except ApiError:
raise ConsoleError(
f"Instance not found at {base_url}.\n" +
"The given domain probably does not host a Mastodon instance."
)
if json:
print(response.text)
else:
instance = from_dict(Instance, response.json())
print_instance(instance)
@cli.command()
@click.argument("query")
@click.option("-r", "--resolve", is_flag=True, help="Resolve non-local accounts")
@json_option
@pass_context
def search(ctx: Context, query: str, resolve: bool, json: bool):
response = api.search(ctx.app, ctx.user, query, resolve)
if json:
print(response.text)
else:
print_search_results(response.json())
@cli.command()
@click.argument("status_id")
@json_option
@pass_context
def status(ctx: Context, status_id: str, json: bool):
"""Show a single status"""
response = api.fetch_status(ctx.app, ctx.user, status_id)
if json:
print(response.text)
else:
status = from_dict(Status, response.json())
print_status(status)
@cli.command()
@click.argument("status_id")
@json_option
@pass_context
def thread(ctx: Context, status_id: str, json: bool):
"""Show thread for a toot."""
context_response = api.context(ctx.app, ctx.user, status_id)
if json:
print(context_response.text)
else:
toot = api.fetch_status(ctx.app, ctx.user, status_id).json()
context = context_response.json()
statuses = chain(context["ancestors"], [toot], context["descendants"])
print_timeline(from_dict(Status, s) for s in statuses)

33
toot/cli/tags.py 100644
Wyświetl plik

@ -0,0 +1,33 @@
import click
from toot import api
from toot.cli.base import cli, pass_context, Context
from toot.output import print_tag_list
@cli.command(name="tags_followed")
@pass_context
def tags_followed(ctx: Context):
"""List hashtags you follow"""
response = api.followed_tags(ctx.app, ctx.user)
print_tag_list(response)
@cli.command(name="tags_follow")
@click.argument("tag")
@pass_context
def tags_follow(ctx: Context, tag: str):
"""Follow a hashtag"""
tag = tag.lstrip("#")
api.follow_tag(ctx.app, ctx.user, tag)
click.secho(f"✓ You are now following #{tag}", fg="green")
@cli.command(name="tags_unfollow")
@click.argument("tag")
@pass_context
def tags_unfollow(ctx: Context, tag: str):
"""Unfollow a hashtag"""
tag = tag.lstrip("#")
api.unfollow_tag(ctx.app, ctx.user, tag)
click.secho(f"✓ You are no longer following #{tag}", fg="green")

Wyświetl plik

@ -1,4 +1,7 @@
class ApiError(Exception):
from click import ClickException
class ApiError(ClickException):
"""Raised when an API request fails for whatever reason."""
@ -10,5 +13,5 @@ class AuthenticationError(ApiError):
"""Raised when login fails."""
class ConsoleError(Exception):
class ConsoleError(ClickException):
"""Raised when an error occurs which needs to be show to the user."""