From 3d7f7cfce37353790d1bf27702bc64a7d9a9254c Mon Sep 17 00:00:00 2001 From: Ivan Habunek Date: Thu, 30 Mar 2023 10:56:03 +0200 Subject: [PATCH] Break up integration tests --- tests/integration/__init__.py | 0 tests/integration/conftest.py | 122 +++++ tests/integration/test_accounts.py | 19 + tests/integration/test_auth.py | 125 +++++ tests/integration/test_post.py | 288 +++++++++++ tests/integration/test_read.py | 83 ++++ tests/integration/test_status.py | 89 ++++ tests/test_integration.py | 758 ----------------------------- 8 files changed, 726 insertions(+), 758 deletions(-) create mode 100644 tests/integration/__init__.py create mode 100644 tests/integration/conftest.py create mode 100644 tests/integration/test_accounts.py create mode 100644 tests/integration/test_auth.py create mode 100644 tests/integration/test_post.py create mode 100644 tests/integration/test_read.py create mode 100644 tests/integration/test_status.py delete mode 100644 tests/test_integration.py diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py new file mode 100644 index 0000000..b4aaa1e --- /dev/null +++ b/tests/integration/conftest.py @@ -0,0 +1,122 @@ +""" +This module contains integration tests meant to run against a test Mastodon instance. + +You can set up a test instance locally by following this guide: +https://docs.joinmastodon.org/dev/setup/ + +To enable integration tests, export the following environment variables to match +your test server and database: + +``` +export TOOT_TEST_HOSTNAME="localhost:3000" +export TOOT_TEST_DATABASE_DSN="dbname=mastodon_development" +``` +""" + +import re +import os +import psycopg2 +import pytest +import uuid + +from pathlib import Path +from toot import api, App, User +from toot.console import run_command + +# Host name of a test instance to run integration tests against +# DO NOT USE PUBLIC INSTANCES!!! +BASE_URL = os.getenv("TOOT_TEST_BASE_URL") + +# Mastodon database name, used to confirm user registration without having to click the link +DATABASE_DSN = os.getenv("TOOT_TEST_DATABASE_DSN") + +# Toot logo used for testing image upload +TRUMPET = str(Path(__file__).parent.parent.parent / "trumpet.png") + +ASSETS_DIR = str(Path(__file__).parent.parent / "assets") + + +if not BASE_URL or not DATABASE_DSN: + pytest.skip("Skipping integration tests", allow_module_level=True) + +# ------------------------------------------------------------------------------ +# Fixtures +# ------------------------------------------------------------------------------ + + +def create_app(): + instance = api.get_instance(BASE_URL) + response = api.create_app(BASE_URL) + return App(instance["uri"], BASE_URL, response["client_id"], response["client_secret"]) + + +def register_account(app: App): + username = str(uuid.uuid4())[-10:] + email = f"{username}@example.com" + + response = api.register_account(app, username, email, "password", "en") + confirm_user(email) + return User(app.instance, username, response["access_token"]) + + +def confirm_user(email): + conn = psycopg2.connect(DATABASE_DSN) + cursor = conn.cursor() + cursor.execute("UPDATE users SET confirmed_at = now() WHERE email = %s;", (email,)) + conn.commit() + + +@pytest.fixture(scope="session") +def app(): + return create_app() + + +@pytest.fixture(scope="session") +def user(app): + return register_account(app) + + +@pytest.fixture(scope="session") +def friend(app): + return register_account(app) + + +@pytest.fixture +def run(app, user, capsys): + def _run(command, *params, as_user=None): + run_command(app, as_user or user, command, params or []) + out, err = capsys.readouterr() + assert err == "" + return strip_ansi(out) + return _run + + +@pytest.fixture +def run_anon(capsys): + def _run(command, *params): + run_command(None, None, command, params or []) + out, err = capsys.readouterr() + assert err == "" + return strip_ansi(out) + return _run + + +# ------------------------------------------------------------------------------ +# Utils +# ------------------------------------------------------------------------------ + +strip_ansi_pattern = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])") + + +def strip_ansi(string): + return strip_ansi_pattern.sub("", string).strip() + + +def posted_status_id(out): + pattern = re.compile(r"Toot posted: http://([^/]+)/([^/]+)/(.+)") + match = re.search(pattern, out) + assert match + + _, _, status_id = match.groups() + + return status_id diff --git a/tests/integration/test_accounts.py b/tests/integration/test_accounts.py new file mode 100644 index 0000000..bf01f1b --- /dev/null +++ b/tests/integration/test_accounts.py @@ -0,0 +1,19 @@ + + +def test_whoami(user, run): + out = run("whoami") + # TODO: test other fields once updating account is supported + assert f"@{user.username}" in out + + +def test_whois(app, friend, run): + variants = [ + friend.username, + f"@{friend.username}", + f"{friend.username}@{app.instance}", + f"@{friend.username}@{app.instance}", + ] + + for username in variants: + out = run("whois", username) + assert f"@{friend.username}" in out diff --git a/tests/integration/test_auth.py b/tests/integration/test_auth.py new file mode 100644 index 0000000..afe5c39 --- /dev/null +++ b/tests/integration/test_auth.py @@ -0,0 +1,125 @@ +import pytest + +from tests.integration.conftest import TRUMPET +from toot import api +from toot.exceptions import ConsoleError +from toot.utils import get_text + + +def test_update_account_no_options(run): + with pytest.raises(ConsoleError) as exc: + run("update_account") + assert str(exc.value) == "Please specify at least one option to update the account" + + +def test_update_account_display_name(run, app, user): + out = run("update_account", "--display-name", "elwood") + assert out == "✓ Account updated" + + account = api.verify_credentials(app, user) + assert account["display_name"] == "elwood" + + +def test_update_account_note(run, app, user): + note = ("It's 106 miles to Chicago, we got a full tank of gas, half a pack " + "of cigarettes, it's dark... and we're wearing sunglasses.") + + out = run("update_account", "--note", note) + assert out == "✓ Account updated" + + account = api.verify_credentials(app, user) + assert get_text(account["note"]) == note + + +def test_update_account_language(run, app, user): + out = run("update_account", "--language", "hr") + assert out == "✓ Account updated" + + account = api.verify_credentials(app, user) + assert account["source"]["language"] == "hr" + + +def test_update_account_privacy(run, app, user): + out = run("update_account", "--privacy", "private") + assert out == "✓ Account updated" + + account = api.verify_credentials(app, user) + assert account["source"]["privacy"] == "private" + + +def test_update_account_avatar(run, app, user): + account = api.verify_credentials(app, user) + old_value = account["avatar"] + + out = run("update_account", "--avatar", TRUMPET) + assert out == "✓ Account updated" + + account = api.verify_credentials(app, user) + assert account["avatar"] != old_value + + +def test_update_account_header(run, app, user): + account = api.verify_credentials(app, user) + old_value = account["header"] + + out = run("update_account", "--header", TRUMPET) + assert out == "✓ Account updated" + + account = api.verify_credentials(app, user) + assert account["header"] != old_value + + +def test_update_account_locked(run, app, user): + out = run("update_account", "--locked") + assert out == "✓ Account updated" + + account = api.verify_credentials(app, user) + assert account["locked"] is True + + out = run("update_account", "--no-locked") + assert out == "✓ Account updated" + + account = api.verify_credentials(app, user) + assert account["locked"] is False + + +def test_update_account_bot(run, app, user): + out = run("update_account", "--bot") + assert out == "✓ Account updated" + + account = api.verify_credentials(app, user) + assert account["bot"] is True + + out = run("update_account", "--no-bot") + assert out == "✓ Account updated" + + account = api.verify_credentials(app, user) + assert account["bot"] is False + + +def test_update_account_discoverable(run, app, user): + out = run("update_account", "--discoverable") + assert out == "✓ Account updated" + + account = api.verify_credentials(app, user) + assert account["discoverable"] is True + + out = run("update_account", "--no-discoverable") + assert out == "✓ Account updated" + + account = api.verify_credentials(app, user) + assert account["discoverable"] is False + + +def test_update_account_sensitive(run, app, user): + out = run("update_account", "--sensitive") + assert out == "✓ Account updated" + + account = api.verify_credentials(app, user) + assert account["source"]["sensitive"] is True + + out = run("update_account", "--no-sensitive") + assert out == "✓ Account updated" + + account = api.verify_credentials(app, user) + assert account["source"]["sensitive"] is False diff --git a/tests/integration/test_post.py b/tests/integration/test_post.py new file mode 100644 index 0000000..7ad7eb9 --- /dev/null +++ b/tests/integration/test_post.py @@ -0,0 +1,288 @@ +import re +import uuid + +from datetime import datetime, timedelta, timezone +from os import path +from tests.integration.conftest import ASSETS_DIR, posted_status_id +from toot import CLIENT_NAME, CLIENT_WEBSITE, api +from toot.utils import get_text +from unittest import mock + + +def test_post(app, user, run): + text = "i wish i was a #lumberjack" + out = run("post", text) + status_id = posted_status_id(out) + + status = api.fetch_status(app, user, status_id) + assert text == get_text(status["content"]) + assert status["visibility"] == "public" + assert status["sensitive"] is False + assert status["spoiler_text"] == "" + assert status["poll"] is None + + # Pleroma doesn't return the application + if status["application"]: + assert status["application"]["name"] == CLIENT_NAME + assert status["application"]["website"] == CLIENT_WEBSITE + + +def test_post_visibility(app, user, run): + for visibility in ["public", "unlisted", "private", "direct"]: + out = run("post", "foo", "--visibility", visibility) + status_id = posted_status_id(out) + status = api.fetch_status(app, user, status_id) + assert status["visibility"] == visibility + + +def test_post_scheduled_at(app, user, run): + text = str(uuid.uuid4()) + scheduled_at = datetime.now(timezone.utc).replace(microsecond=0) + timedelta(minutes=10) + + out = run("post", text, "--scheduled-at", scheduled_at.isoformat()) + assert "Toot scheduled for" in out + + statuses = api.scheduled_statuses(app, user) + [status] = [s for s in statuses if s["params"]["text"] == text] + assert datetime.strptime(status["scheduled_at"], "%Y-%m-%dT%H:%M:%S.%f%z") == scheduled_at + + +def test_post_scheduled_in(app, user, run): + text = str(uuid.uuid4()) + + variants = [ + ("1 day", timedelta(days=1)), + ("1 day 6 hours", timedelta(days=1, hours=6)), + ("1 day 6 hours 13 minutes", timedelta(days=1, hours=6, minutes=13)), + ("1 day 6 hours 13 minutes 51 second", timedelta(days=1, hours=6, minutes=13, seconds=51)), + ("2d", timedelta(days=2)), + ("2d6h", timedelta(days=2, hours=6)), + ("2d6h13m", timedelta(days=2, hours=6, minutes=13)), + ("2d6h13m51s", timedelta(days=2, hours=6, minutes=13, seconds=51)), + ] + + datetimes = [] + for scheduled_in, delta in variants: + out = run("post", text, "--scheduled-in", scheduled_in) + dttm = datetime.utcnow() + delta + assert out.startswith(f"Toot scheduled for: {str(dttm)[:16]}") + datetimes.append(dttm) + + scheduled = api.scheduled_statuses(app, user) + scheduled = [s for s in scheduled if s["params"]["text"] == text] + scheduled = sorted(scheduled, key=lambda s: s["scheduled_at"]) + assert len(scheduled) == 8 + + for expected, status in zip(datetimes, scheduled): + actual = datetime.strptime(status["scheduled_at"], "%Y-%m-%dT%H:%M:%S.%fZ") + delta = expected - actual + assert delta.total_seconds() < 5 + + +def test_post_poll(app, user, run): + text = str(uuid.uuid4()) + + out = run( + "post", text, + "--poll-option", "foo", + "--poll-option", "bar", + "--poll-option", "baz", + "--poll-option", "qux", + ) + + status_id = posted_status_id(out) + + status = api.fetch_status(app, user, status_id) + assert status["poll"]["expired"] is False + assert status["poll"]["multiple"] is False + assert status["poll"]["options"] == [ + {"title": "foo", "votes_count": 0}, + {"title": "bar", "votes_count": 0}, + {"title": "baz", "votes_count": 0}, + {"title": "qux", "votes_count": 0} + ] + + # Test expires_at is 24h by default + actual = datetime.strptime(status["poll"]["expires_at"], "%Y-%m-%dT%H:%M:%S.%f%z") + expected = datetime.now(timezone.utc) + timedelta(days=1) + delta = actual - expected + assert delta.total_seconds() < 5 + + +def test_post_poll_multiple(app, user, run): + text = str(uuid.uuid4()) + + out = run( + "post", text, + "--poll-option", "foo", + "--poll-option", "bar", + "--poll-multiple" + ) + + status_id = posted_status_id(out) + + status = api.fetch_status(app, user, status_id) + assert status["poll"]["multiple"] is True + + +def test_post_poll_expires_in(app, user, run): + text = str(uuid.uuid4()) + + out = run( + "post", text, + "--poll-option", "foo", + "--poll-option", "bar", + "--poll-expires-in", "8h", + ) + + status_id = posted_status_id(out) + + status = api.fetch_status(app, user, status_id) + actual = datetime.strptime(status["poll"]["expires_at"], "%Y-%m-%dT%H:%M:%S.%f%z") + expected = datetime.now(timezone.utc) + timedelta(hours=8) + delta = actual - expected + assert delta.total_seconds() < 5 + + +def test_post_poll_hide_totals(app, user, run): + text = str(uuid.uuid4()) + + out = run( + "post", text, + "--poll-option", "foo", + "--poll-option", "bar", + "--poll-hide-totals" + ) + + status_id = posted_status_id(out) + + status = api.fetch_status(app, user, status_id) + + # votes_count is None when totals are hidden + assert status["poll"]["options"] == [ + {"title": "foo", "votes_count": None}, + {"title": "bar", "votes_count": None}, + ] + + +def test_post_language(app, user, run): + out = run("post", "test", "--language", "hr") + status_id = posted_status_id(out) + status = api.fetch_status(app, user, status_id) + assert status["language"] == "hr" + + out = run("post", "test", "--language", "zh") + status_id = posted_status_id(out) + status = api.fetch_status(app, user, status_id) + assert status["language"] == "zh" + + +def test_media_thumbnail(app, user, run): + video_path = path.join(ASSETS_DIR, "small.webm") + thumbnail_path = path.join(ASSETS_DIR, "test1.png") + + out = run( + "post", + "--media", video_path, + "--thumbnail", thumbnail_path, + "--description", "foo", + "some text" + ) + + status_id = posted_status_id(out) + status = api.fetch_status(app, user, status_id) + [media] = status["media_attachments"] + + assert media["description"] == "foo" + assert media["type"] == "video" + assert media["url"].endswith(".mp4") + assert media["preview_url"].endswith(".png") + + # Video properties + assert int(media["meta"]["original"]["duration"]) == 5 + assert media["meta"]["original"]["height"] == 320 + assert media["meta"]["original"]["width"] == 560 + + # Thumbnail properties + assert media["meta"]["small"]["height"] == 50 + assert media["meta"]["small"]["width"] == 50 + + +def test_media_attachments(app, user, run): + path1 = path.join(ASSETS_DIR, "test1.png") + path2 = path.join(ASSETS_DIR, "test2.png") + path3 = path.join(ASSETS_DIR, "test3.png") + path4 = path.join(ASSETS_DIR, "test4.png") + + out = run( + "post", + "--media", path1, + "--media", path2, + "--media", path3, + "--media", path4, + "--description", "Test 1", + "--description", "Test 2", + "--description", "Test 3", + "--description", "Test 4", + "some text" + ) + + status_id = posted_status_id(out) + status = api.fetch_status(app, user, status_id) + + [a1, a2, a3, a4] = status["media_attachments"] + + # Pleroma doesn't send metadata + if "meta" in a1: + assert a1["meta"]["original"]["size"] == "50x50" + assert a2["meta"]["original"]["size"] == "50x60" + assert a3["meta"]["original"]["size"] == "50x70" + assert a4["meta"]["original"]["size"] == "50x80" + + assert a1["description"] == "Test 1" + assert a2["description"] == "Test 2" + assert a3["description"] == "Test 3" + assert a4["description"] == "Test 4" + + +@mock.patch("toot.utils.multiline_input") +@mock.patch("sys.stdin.read") +def test_media_attachment_without_text(mock_read, mock_ml, app, user, run): + # No status from stdin or readline + mock_read.return_value = "" + mock_ml.return_value = "" + + media_path = path.join(ASSETS_DIR, "test1.png") + + out = run("post", "--media", media_path) + status_id = posted_status_id(out) + + status = api.fetch_status(app, user, status_id) + assert status["content"] == "" + + [attachment] = status["media_attachments"] + assert not attachment["description"] + + # Pleroma doesn't send metadata + if "meta" in attachment: + assert attachment["meta"]["original"]["size"] == "50x50" + + +def test_reply_thread(app, user, friend, run): + status = api.post_status(app, friend, "This is the status") + + out = run("post", "--reply-to", status["id"], "This is the reply") + status_id = posted_status_id(out) + reply = api.fetch_status(app, user, status_id) + + assert reply["in_reply_to_id"] == status["id"] + + out = run("thread", status["id"]) + [s1, s2] = [s.strip() for s in re.split(r"─+", out) if s.strip()] + + assert "This is the status" in s1 + assert "This is the reply" in s2 + assert friend.username in s1 + assert user.username in s2 + assert status["id"] in s1 + assert reply["id"] in s2 diff --git a/tests/integration/test_read.py b/tests/integration/test_read.py new file mode 100644 index 0000000..13f05d1 --- /dev/null +++ b/tests/integration/test_read.py @@ -0,0 +1,83 @@ +import pytest + +from tests.integration.conftest import BASE_URL +from toot import api +from toot.exceptions import ConsoleError + + +def test_instance(app, run): + out = run("instance", "--disable-https") + assert "Mastodon" in out + assert app.instance in out + assert "running Mastodon" in out + + +def test_instance_anon(app, run_anon): + out = run_anon("instance", BASE_URL) + assert "Mastodon" in out + assert app.instance in out + assert "running Mastodon" in out + + # Need to specify the instance name when running anon + with pytest.raises(ConsoleError) as exc: + run_anon("instance") + assert str(exc.value) == "Please specify an instance." + + +def test_whoami(user, run): + out = run("whoami") + # TODO: test other fields once updating account is supported + assert f"@{user.username}" in out + + +def test_whois(app, friend, run): + variants = [ + friend.username, + f"@{friend.username}", + f"{friend.username}@{app.instance}", + f"@{friend.username}@{app.instance}", + ] + + for username in variants: + out = run("whois", username) + assert f"@{friend.username}" in out + + +def test_search_account(friend, run): + out = run("search", friend.username) + assert out == f"Accounts:\n* @{friend.username}" + + +def test_search_hashtag(app, user, run): + api.post_status(app, user, "#hashtag_x") + api.post_status(app, user, "#hashtag_y") + api.post_status(app, user, "#hashtag_z") + + out = run("search", "#hashtag") + assert out == "Hashtags:\n#hashtag_x, #hashtag_y, #hashtag_z" + + +def test_tags(run): + out = run("tags_followed") + assert out == "You're not following any hashtags." + + out = run("tags_follow", "foo") + assert out == "✓ You are now following #foo" + + out = run("tags_followed") + assert out == f"* #foo\t{BASE_URL}/tags/foo" + + out = run("tags_follow", "bar") + assert out == "✓ You are now following #bar" + + out = run("tags_followed") + assert out == "\n".join([ + f"* #bar\t{BASE_URL}/tags/bar", + f"* #foo\t{BASE_URL}/tags/foo", + ]) + + out = run("tags_unfollow", "foo") + assert out == "✓ You are no longer following #foo" + + out = run("tags_followed") + assert out == f"* #bar\t{BASE_URL}/tags/bar" diff --git a/tests/integration/test_status.py b/tests/integration/test_status.py new file mode 100644 index 0000000..8274174 --- /dev/null +++ b/tests/integration/test_status.py @@ -0,0 +1,89 @@ +import time +import pytest + +from toot import api +from toot.exceptions import NotFoundError + + +def test_delete_status(app, user, run): + status = api.post_status(app, user, "foo") + + out = run("delete", status["id"]) + assert out == "✓ Status deleted" + + with pytest.raises(NotFoundError): + api.fetch_status(app, user, status["id"]) + + +def test_favourite(app, user, run): + status = api.post_status(app, user, "foo") + assert not status["favourited"] + + out = run("favourite", status["id"]) + assert out == "✓ Status favourited" + + status = api.fetch_status(app, user, status["id"]) + assert status["favourited"] + + out = run("unfavourite", status["id"]) + assert out == "✓ Status unfavourited" + + # A short delay is required before the server returns new data + time.sleep(0.1) + + status = api.fetch_status(app, user, status["id"]) + assert not status["favourited"] + + +def test_reblog(app, user, run): + status = api.post_status(app, user, "foo") + assert not status["reblogged"] + + out = run("reblog", status["id"]) + assert out == "✓ Status reblogged" + + status = api.fetch_status(app, user, status["id"]) + assert status["reblogged"] + + out = run("reblogged_by", status["id"]) + assert out == f"@{user.username}" + + out = run("unreblog", status["id"]) + assert out == "✓ Status unreblogged" + + status = api.fetch_status(app, user, status["id"]) + assert not status["reblogged"] + + +def test_pin(app, user, run): + status = api.post_status(app, user, "foo") + assert not status["pinned"] + + out = run("pin", status["id"]) + assert out == "✓ Status pinned" + + status = api.fetch_status(app, user, status["id"]) + assert status["pinned"] + + out = run("unpin", status["id"]) + assert out == "✓ Status unpinned" + + status = api.fetch_status(app, user, status["id"]) + assert not status["pinned"] + + +def test_bookmark(app, user, run): + status = api.post_status(app, user, "foo") + assert not status["bookmarked"] + + out = run("bookmark", status["id"]) + assert out == "✓ Status bookmarked" + + status = api.fetch_status(app, user, status["id"]) + assert status["bookmarked"] + + out = run("unbookmark", status["id"]) + assert out == "✓ Status unbookmarked" + + status = api.fetch_status(app, user, status["id"]) + assert not status["bookmarked"] diff --git a/tests/test_integration.py b/tests/test_integration.py deleted file mode 100644 index aa7b89a..0000000 --- a/tests/test_integration.py +++ /dev/null @@ -1,758 +0,0 @@ -""" -This module contains integration tests meant to run against a test Mastodon instance. - -You can set up a test instance locally by following this guide: -https://docs.joinmastodon.org/dev/setup/ - -To enable integration tests, export the following environment variables to match -your test server and database: - -``` -export TOOT_TEST_HOSTNAME="localhost:3000" -export TOOT_TEST_DATABASE_DSN="dbname=mastodon_development" -``` -""" - -import os -import psycopg2 -import pytest -import re -import time -import uuid - -from datetime import datetime, timedelta, timezone -from os import path -from toot import CLIENT_NAME, CLIENT_WEBSITE, api, App, User -from toot.console import run_command -from toot.exceptions import ConsoleError, NotFoundError -from toot.utils import get_text -from unittest import mock - -# Host name of a test instance to run integration tests against -# DO NOT USE PUBLIC INSTANCES!!! -BASE_URL = os.getenv("TOOT_TEST_BASE_URL") - -# Mastodon database name, used to confirm user registration without having to click the link -DATABASE_DSN = os.getenv("TOOT_TEST_DATABASE_DSN") - -# Toot logo used for testing image upload -TRUMPET = path.join(path.dirname(path.dirname(path.realpath(__file__))), "trumpet.png") - - -if not BASE_URL or not DATABASE_DSN: - pytest.skip("Skipping integration tests", allow_module_level=True) - -# ------------------------------------------------------------------------------ -# Fixtures -# ------------------------------------------------------------------------------ - - -def create_app(): - instance = api.get_instance(BASE_URL) - response = api.create_app(BASE_URL) - return App(instance["uri"], BASE_URL, response["client_id"], response["client_secret"]) - - -def register_account(app: App): - username = str(uuid.uuid4())[-10:] - email = f"{username}@example.com" - - response = api.register_account(app, username, email, "password", "en") - confirm_user(email) - return User(app.instance, username, response["access_token"]) - - -def confirm_user(email): - conn = psycopg2.connect(DATABASE_DSN) - cursor = conn.cursor() - cursor.execute("UPDATE users SET confirmed_at = now() WHERE email = %s;", (email,)) - conn.commit() - - -@pytest.fixture(scope="session") -def app(): - return create_app() - - -@pytest.fixture(scope="session") -def user(app): - return register_account(app) - - -@pytest.fixture(scope="session") -def friend(app): - return register_account(app) - - -@pytest.fixture -def run(app, user, capsys): - def _run(command, *params, as_user=None): - run_command(app, as_user or user, command, params or []) - out, err = capsys.readouterr() - assert err == "" - return strip_ansi(out) - return _run - - -@pytest.fixture -def run_anon(capsys): - def _run(command, *params): - run_command(None, None, command, params or []) - out, err = capsys.readouterr() - assert err == "" - return strip_ansi(out) - return _run - -# ------------------------------------------------------------------------------ -# Tests -# ------------------------------------------------------------------------------ - - -def test_instance(app, run): - out = run("instance", "--disable-https") - assert "Mastodon" in out - assert app.instance in out - assert "running Mastodon" in out - - -def test_instance_anon(app, run_anon): - out = run_anon("instance", BASE_URL) - assert "Mastodon" in out - assert app.instance in out - assert "running Mastodon" in out - - # Need to specify the instance name when running anon - with pytest.raises(ConsoleError) as exc: - run_anon("instance") - assert str(exc.value) == "Please specify an instance." - - -def test_post(app, user, run): - text = "i wish i was a #lumberjack" - out = run("post", text) - status_id = _posted_status_id(out) - - status = api.fetch_status(app, user, status_id) - assert text == get_text(status["content"]) - assert status["visibility"] == "public" - assert status["sensitive"] is False - assert status["spoiler_text"] == "" - assert status["poll"] is None - - # Pleroma doesn't return the application - if status["application"]: - assert status["application"]["name"] == CLIENT_NAME - assert status["application"]["website"] == CLIENT_WEBSITE - - -def test_post_visibility(app, user, run): - for visibility in ["public", "unlisted", "private", "direct"]: - out = run("post", "foo", "--visibility", visibility) - status_id = _posted_status_id(out) - status = api.fetch_status(app, user, status_id) - assert status["visibility"] == visibility - - -def test_post_scheduled_at(app, user, run): - text = str(uuid.uuid4()) - scheduled_at = datetime.now(timezone.utc).replace(microsecond=0) + timedelta(minutes=10) - - out = run("post", text, "--scheduled-at", scheduled_at.isoformat()) - assert "Toot scheduled for" in out - - statuses = api.scheduled_statuses(app, user) - [status] = [s for s in statuses if s["params"]["text"] == text] - assert datetime.strptime(status["scheduled_at"], "%Y-%m-%dT%H:%M:%S.%f%z") == scheduled_at - - -def test_post_scheduled_in(app, user, run): - text = str(uuid.uuid4()) - - variants = [ - ("1 day", timedelta(days=1)), - ("1 day 6 hours", timedelta(days=1, hours=6)), - ("1 day 6 hours 13 minutes", timedelta(days=1, hours=6, minutes=13)), - ("1 day 6 hours 13 minutes 51 second", timedelta(days=1, hours=6, minutes=13, seconds=51)), - ("2d", timedelta(days=2)), - ("2d6h", timedelta(days=2, hours=6)), - ("2d6h13m", timedelta(days=2, hours=6, minutes=13)), - ("2d6h13m51s", timedelta(days=2, hours=6, minutes=13, seconds=51)), - ] - - datetimes = [] - for scheduled_in, delta in variants: - out = run("post", text, "--scheduled-in", scheduled_in) - dttm = datetime.utcnow() + delta - assert out.startswith(f"Toot scheduled for: {str(dttm)[:16]}") - datetimes.append(dttm) - - scheduled = api.scheduled_statuses(app, user) - scheduled = [s for s in scheduled if s["params"]["text"] == text] - scheduled = sorted(scheduled, key=lambda s: s["scheduled_at"]) - assert len(scheduled) == 8 - - for expected, status in zip(datetimes, scheduled): - actual = datetime.strptime(status["scheduled_at"], "%Y-%m-%dT%H:%M:%S.%fZ") - delta = expected - actual - assert delta.total_seconds() < 5 - - -def test_post_poll(app, user, run): - text = str(uuid.uuid4()) - - out = run( - "post", text, - "--poll-option", "foo", - "--poll-option", "bar", - "--poll-option", "baz", - "--poll-option", "qux", - ) - - status_id = _posted_status_id(out) - - status = api.fetch_status(app, user, status_id) - assert status["poll"]["expired"] is False - assert status["poll"]["multiple"] is False - assert status["poll"]["options"] == [ - {"title": "foo", "votes_count": 0}, - {"title": "bar", "votes_count": 0}, - {"title": "baz", "votes_count": 0}, - {"title": "qux", "votes_count": 0} - ] - - # Test expires_at is 24h by default - actual = datetime.strptime(status["poll"]["expires_at"], "%Y-%m-%dT%H:%M:%S.%f%z") - expected = datetime.now(timezone.utc) + timedelta(days=1) - delta = actual - expected - assert delta.total_seconds() < 5 - - -def test_post_poll_multiple(app, user, run): - text = str(uuid.uuid4()) - - out = run( - "post", text, - "--poll-option", "foo", - "--poll-option", "bar", - "--poll-multiple" - ) - - status_id = _posted_status_id(out) - - status = api.fetch_status(app, user, status_id) - assert status["poll"]["multiple"] is True - - -def test_post_poll_expires_in(app, user, run): - text = str(uuid.uuid4()) - - out = run( - "post", text, - "--poll-option", "foo", - "--poll-option", "bar", - "--poll-expires-in", "8h", - ) - - status_id = _posted_status_id(out) - - status = api.fetch_status(app, user, status_id) - actual = datetime.strptime(status["poll"]["expires_at"], "%Y-%m-%dT%H:%M:%S.%f%z") - expected = datetime.now(timezone.utc) + timedelta(hours=8) - delta = actual - expected - assert delta.total_seconds() < 5 - - -def test_post_poll_hide_totals(app, user, run): - text = str(uuid.uuid4()) - - out = run( - "post", text, - "--poll-option", "foo", - "--poll-option", "bar", - "--poll-hide-totals" - ) - - status_id = _posted_status_id(out) - - status = api.fetch_status(app, user, status_id) - - # votes_count is None when totals are hidden - assert status["poll"]["options"] == [ - {"title": "foo", "votes_count": None}, - {"title": "bar", "votes_count": None}, - ] - - -def test_post_language(app, user, run): - out = run("post", "test", "--language", "hr") - status_id = _posted_status_id(out) - status = api.fetch_status(app, user, status_id) - assert status["language"] == "hr" - - out = run("post", "test", "--language", "zh") - status_id = _posted_status_id(out) - status = api.fetch_status(app, user, status_id) - assert status["language"] == "zh" - - -def test_media_thumbnail(app, user, run): - assets_dir = path.realpath(path.join(path.dirname(__file__), "assets")) - - video_path = path.join(assets_dir, "small.webm") - thumbnail_path = path.join(assets_dir, "test1.png") - - out = run( - "post", - "--media", video_path, - "--thumbnail", thumbnail_path, - "--description", "foo", - "some text" - ) - - status_id = _posted_status_id(out) - status = api.fetch_status(app, user, status_id) - [media] = status["media_attachments"] - - assert media["description"] == "foo" - assert media["type"] == "video" - assert media["url"].endswith(".mp4") - assert media["preview_url"].endswith(".png") - - # Video properties - assert int(media["meta"]["original"]["duration"]) == 5 - assert media["meta"]["original"]["height"] == 320 - assert media["meta"]["original"]["width"] == 560 - - # Thumbnail properties - assert media["meta"]["small"]["height"] == 50 - assert media["meta"]["small"]["width"] == 50 - - -def test_media_attachments(app, user, run): - assets_dir = path.realpath(path.join(path.dirname(__file__), "assets")) - - path1 = path.join(assets_dir, "test1.png") - path2 = path.join(assets_dir, "test2.png") - path3 = path.join(assets_dir, "test3.png") - path4 = path.join(assets_dir, "test4.png") - - out = run( - "post", - "--media", path1, - "--media", path2, - "--media", path3, - "--media", path4, - "--description", "Test 1", - "--description", "Test 2", - "--description", "Test 3", - "--description", "Test 4", - "some text" - ) - - status_id = _posted_status_id(out) - status = api.fetch_status(app, user, status_id) - - [a1, a2, a3, a4] = status["media_attachments"] - - # Pleroma doesn't send metadata - if "meta" in a1: - assert a1["meta"]["original"]["size"] == "50x50" - assert a2["meta"]["original"]["size"] == "50x60" - assert a3["meta"]["original"]["size"] == "50x70" - assert a4["meta"]["original"]["size"] == "50x80" - - assert a1["description"] == "Test 1" - assert a2["description"] == "Test 2" - assert a3["description"] == "Test 3" - assert a4["description"] == "Test 4" - - -@mock.patch("toot.utils.multiline_input") -@mock.patch("sys.stdin.read") -def test_media_attachment_without_text(mock_read, mock_ml, app, user, run): - # No status from stdin or readline - mock_read.return_value = "" - mock_ml.return_value = "" - - assets_dir = path.realpath(path.join(path.dirname(__file__), "assets")) - media_path = path.join(assets_dir, "test1.png") - - out = run("post", "--media", media_path) - status_id = _posted_status_id(out) - - status = api.fetch_status(app, user, status_id) - assert status["content"] == "" - - [attachment] = status["media_attachments"] - assert not attachment["description"] - - # Pleroma doesn't send metadata - if "meta" in attachment: - assert attachment["meta"]["original"]["size"] == "50x50" - - -def test_delete_status(app, user, run): - status = api.post_status(app, user, "foo") - - out = run("delete", status["id"]) - assert out == "✓ Status deleted" - - with pytest.raises(NotFoundError): - api.fetch_status(app, user, status["id"]) - - -def test_reply_thread(app, user, friend, run): - status = api.post_status(app, friend, "This is the status") - - out = run("post", "--reply-to", status["id"], "This is the reply") - status_id = _posted_status_id(out) - reply = api.fetch_status(app, user, status_id) - - assert reply["in_reply_to_id"] == status["id"] - - out = run("thread", status["id"]) - [s1, s2] = [s.strip() for s in re.split(r"─+", out) if s.strip()] - - assert "This is the status" in s1 - assert "This is the reply" in s2 - assert friend.username in s1 - assert user.username in s2 - assert status["id"] in s1 - assert reply["id"] in s2 - - -def test_favourite(app, user, run): - status = api.post_status(app, user, "foo") - assert not status["favourited"] - - out = run("favourite", status["id"]) - assert out == "✓ Status favourited" - - status = api.fetch_status(app, user, status["id"]) - assert status["favourited"] - - out = run("unfavourite", status["id"]) - assert out == "✓ Status unfavourited" - - # A short delay is required before the server returns new data - time.sleep(0.1) - - status = api.fetch_status(app, user, status["id"]) - assert not status["favourited"] - - -def test_reblog(app, user, run): - status = api.post_status(app, user, "foo") - assert not status["reblogged"] - - out = run("reblog", status["id"]) - assert out == "✓ Status reblogged" - - status = api.fetch_status(app, user, status["id"]) - assert status["reblogged"] - - out = run("reblogged_by", status["id"]) - assert out == f"@{user.username}" - - out = run("unreblog", status["id"]) - assert out == "✓ Status unreblogged" - - status = api.fetch_status(app, user, status["id"]) - assert not status["reblogged"] - - -def test_pin(app, user, run): - status = api.post_status(app, user, "foo") - assert not status["pinned"] - - out = run("pin", status["id"]) - assert out == "✓ Status pinned" - - status = api.fetch_status(app, user, status["id"]) - assert status["pinned"] - - out = run("unpin", status["id"]) - assert out == "✓ Status unpinned" - - status = api.fetch_status(app, user, status["id"]) - assert not status["pinned"] - - -def test_bookmark(app, user, run): - status = api.post_status(app, user, "foo") - assert not status["bookmarked"] - - out = run("bookmark", status["id"]) - assert out == "✓ Status bookmarked" - - status = api.fetch_status(app, user, status["id"]) - assert status["bookmarked"] - - out = run("unbookmark", status["id"]) - assert out == "✓ Status unbookmarked" - - status = api.fetch_status(app, user, status["id"]) - assert not status["bookmarked"] - - -def test_whoami(user, run): - out = run("whoami") - # TODO: test other fields once updating account is supported - assert f"@{user.username}" in out - - -def test_whois(app, friend, run): - variants = [ - friend.username, - f"@{friend.username}", - f"{friend.username}@{app.instance}", - f"@{friend.username}@{app.instance}", - ] - - for username in variants: - out = run("whois", username) - assert f"@{friend.username}" in out - - -def test_search_account(friend, run): - out = run("search", friend.username) - assert out == f"Accounts:\n* @{friend.username}" - - -def test_search_hashtag(app, user, run): - api.post_status(app, user, "#hashtag_x") - api.post_status(app, user, "#hashtag_y") - api.post_status(app, user, "#hashtag_z") - - out = run("search", "#hashtag") - assert out == "Hashtags:\n#hashtag_x, #hashtag_y, #hashtag_z" - - -def test_follow(friend, run): - out = run("follow", friend.username) - assert out == f"✓ You are now following {friend.username}" - - out = run("unfollow", friend.username) - assert out == f"✓ You are no longer following {friend.username}" - - -def test_follow_case_insensitive(friend, run): - username = friend.username.upper() - - out = run("follow", username) - assert out == f"✓ You are now following {username}" - - out = run("unfollow", username) - assert out == f"✓ You are no longer following {username}" - - -# TODO: improve testing stderr, catching exceptions is not optimal -def test_follow_not_found(run): - with pytest.raises(ConsoleError) as ex_info: - run("follow", "banana") - assert str(ex_info.value) == "Account not found" - - -def test_mute(app, user, friend, run): - out = run("mute", friend.username) - assert out == f"✓ You have muted {friend.username}" - - [muted_account] = api.get_muted_accounts(app, user) - assert muted_account["acct"] == friend.username - - out = run("unmute", friend.username) - assert out == f"✓ {friend.username} is no longer muted" - - assert api.get_muted_accounts(app, user) == [] - - -def test_block(app, user, friend, run): - out = run("block", friend.username) - assert out == f"✓ You are now blocking {friend.username}" - - [blockd_account] = api.get_blocked_accounts(app, user) - assert blockd_account["acct"] == friend.username - - out = run("unblock", friend.username) - assert out == f"✓ {friend.username} is no longer blocked" - - assert api.get_blocked_accounts(app, user) == [] - - -def test_following_followers(user, friend, run): - out = run("following", user.username) - assert out == "" - - run("follow", friend.username) - - out = run("following", user.username) - assert out == f"* @{friend.username}" - - out = run("followers", friend.username) - assert out == f"* @{user.username}" - - -def test_tags(run): - out = run("tags_followed") - assert out == "You're not following any hashtags." - - out = run("tags_follow", "foo") - assert out == "✓ You are now following #foo" - - out = run("tags_followed") - assert out == f"* #foo\t{BASE_URL}/tags/foo" - - out = run("tags_follow", "bar") - assert out == "✓ You are now following #bar" - - out = run("tags_followed") - assert out == "\n".join([ - f"* #bar\t{BASE_URL}/tags/bar", - f"* #foo\t{BASE_URL}/tags/foo", - ]) - - out = run("tags_unfollow", "foo") - assert out == "✓ You are no longer following #foo" - - out = run("tags_followed") - assert out == f"* #bar\t{BASE_URL}/tags/bar" - - -def test_update_account_no_options(run): - with pytest.raises(ConsoleError) as exc: - run("update_account") - assert str(exc.value) == "Please specify at least one option to update the account" - - -def test_update_account_display_name(run, app, user): - out = run("update_account", "--display-name", "elwood") - assert out == "✓ Account updated" - - account = api.verify_credentials(app, user) - assert account["display_name"] == "elwood" - - -def test_update_account_note(run, app, user): - note = ("It's 106 miles to Chicago, we got a full tank of gas, half a pack " - "of cigarettes, it's dark... and we're wearing sunglasses.") - - out = run("update_account", "--note", note) - assert out == "✓ Account updated" - - account = api.verify_credentials(app, user) - assert get_text(account["note"]) == note - - -def test_update_account_language(run, app, user): - out = run("update_account", "--language", "hr") - assert out == "✓ Account updated" - - account = api.verify_credentials(app, user) - assert account["source"]["language"] == "hr" - - -def test_update_account_privacy(run, app, user): - out = run("update_account", "--privacy", "private") - assert out == "✓ Account updated" - - account = api.verify_credentials(app, user) - assert account["source"]["privacy"] == "private" - - -def test_update_account_avatar(run, app, user): - account = api.verify_credentials(app, user) - old_value = account["avatar"] - - out = run("update_account", "--avatar", TRUMPET) - assert out == "✓ Account updated" - - account = api.verify_credentials(app, user) - assert account["avatar"] != old_value - - -def test_update_account_header(run, app, user): - account = api.verify_credentials(app, user) - old_value = account["header"] - - out = run("update_account", "--header", TRUMPET) - assert out == "✓ Account updated" - - account = api.verify_credentials(app, user) - assert account["header"] != old_value - - -def test_update_account_locked(run, app, user): - out = run("update_account", "--locked") - assert out == "✓ Account updated" - - account = api.verify_credentials(app, user) - assert account["locked"] is True - - out = run("update_account", "--no-locked") - assert out == "✓ Account updated" - - account = api.verify_credentials(app, user) - assert account["locked"] is False - - -def test_update_account_bot(run, app, user): - out = run("update_account", "--bot") - assert out == "✓ Account updated" - - account = api.verify_credentials(app, user) - assert account["bot"] is True - - out = run("update_account", "--no-bot") - assert out == "✓ Account updated" - - account = api.verify_credentials(app, user) - assert account["bot"] is False - - -def test_update_account_discoverable(run, app, user): - out = run("update_account", "--discoverable") - assert out == "✓ Account updated" - - account = api.verify_credentials(app, user) - assert account["discoverable"] is True - - out = run("update_account", "--no-discoverable") - assert out == "✓ Account updated" - - account = api.verify_credentials(app, user) - assert account["discoverable"] is False - - -def test_update_account_sensitive(run, app, user): - out = run("update_account", "--sensitive") - assert out == "✓ Account updated" - - account = api.verify_credentials(app, user) - assert account["source"]["sensitive"] is True - - out = run("update_account", "--no-sensitive") - assert out == "✓ Account updated" - - account = api.verify_credentials(app, user) - assert account["source"]["sensitive"] is False - - -# ------------------------------------------------------------------------------ -# Utils -# ------------------------------------------------------------------------------ - -strip_ansi_pattern = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])") - - -def strip_ansi(string): - return strip_ansi_pattern.sub("", string).strip() - - -def _posted_status_id(out): - pattern = re.compile(r"Toot posted: http://([^/]+)/([^/]+)/(.+)") - match = re.search(pattern, out) - assert match - - _, _, status_id = match.groups() - - return status_id