# fediverse.space/backend/lib/backend/scheduler.ex
defmodule Backend.Scheduler do
@moduledoc """
This module runs recurring tasks.
"""
use Quantum, otp_app: :backend
alias Backend.{Crawl, CrawlInteraction, Edge, FederationRestriction, Instance, Repo}
alias Backend.Mailer.AdminEmail
import Backend.Util
import Ecto.Query
require Logger
@doc """
Prunes all crawls that are more than `amount` `unit`s old.

For example, to delete crawls older than one month, call `prune_crawls(1, "month")`.

`unit` must be singular, e.g. "second", "minute", "hour", "month", "year", etc.
"""
@spec prune_crawls(integer, String.t()) :: any
def prune_crawls(amount, unit) do
  {deleted_num, _} =
    Crawl
    |> where(
      [c],
      c.inserted_at <
        datetime_add(^NaiveDateTime.utc_now(), -1 * ^amount, ^unit)
    )
    |> Repo.delete_all(timeout: :infinity)

  Logger.info("Pruned #{deleted_num} old crawls.")
end
@doc """
Calculates every instance's "insularity score" -- that is, the percentage of mentions that are among users on the
instance, rather than at other instances.
"""
def generate_insularity_scores do
  now = get_now()

  # Aggregate totals of statuses and interactions per instance, across all of its crawls.
  crawl_totals =
    Crawl
    |> select([c], %{
      instance_domain: c.instance_domain,
      statuses_seen: sum(c.statuses_seen),
      interactions_seen: sum(c.interactions_seen)
    })
    |> group_by([c], c.instance_domain)

  # Mentions where the source and target are the same instance.
  self_mention_totals =
    CrawlInteraction
    |> where([ci], ci.source_domain == ci.target_domain)
    |> select([ci], %{
      domain: ci.source_domain,
      self_mentions: sum(ci.mentions)
    })
    |> group_by([ci], ci.source_domain)

  scores =
    Instance
    |> join(:inner, [i], c in subquery(crawl_totals), on: i.domain == c.instance_domain)
    |> join(:left, [i, c], ci in subquery(self_mention_totals), on: i.domain == ci.domain)
    # don't generate insularity scores for instances where we haven't seen any activity
    # (e.g. server types where the timeline isn't crawled)
    |> where([i, c, ci], c.statuses_seen > 0)
    |> select([i, c, ci], %{
      domain: i.domain,
      mentions: ci.self_mentions,
      interactions: c.interactions_seen
    })
    |> Repo.all(timeout: :infinity)
    |> Enum.map(fn %{domain: domain, mentions: mentions, interactions: interactions} ->
      insularity =
        cond do
          # if we haven't seen any self mentions, but there are interactions, it means that users on the instance
          # only mentions others, i.e. insularity is 0
          mentions == nil and interactions != 0 ->
            0.0

          interactions > 0 ->
            mentions / interactions

          # no interactions seen at all -- insularity is undefined
          true ->
            nil
        end

      %{
        domain: domain,
        insularity: insularity,
        inserted_at: now,
        updated_at: now
      }
    end)

  # Upsert: existing instances only have their insularity/updated_at replaced.
  Instance
  |> Repo.insert_all(scores,
    on_conflict: {:replace, [:insularity, :updated_at]},
    conflict_target: :domain,
    timeout: :infinity
  )
end
@doc """
This function calculates the average number of statuses per day over the last month
and stores it in each instance's `statuses_per_day` field.
"""
def generate_status_rate do
  now = get_now()

  # We want the earliest successful crawl so that we can exclude it from the statistics.
  # This is because the first crawl goes up to one month into the past -- this would mess up the counts!
  # The statistics from here assume that all statuses were written at exactly the crawl's inserted_at timestamp.
  earliest_crawl_subquery =
    Crawl
    |> group_by([c], c.instance_domain)
    |> select([c], %{
      instance_domain: c.instance_domain,
      earliest_crawl: min(c.inserted_at)
    })

  instances =
    Instance
    |> join(:inner, [i], c in Crawl, on: i.domain == c.instance_domain)
    |> join(:inner, [i], c2 in subquery(earliest_crawl_subquery),
      on: i.domain == c2.instance_domain
    )
    |> where(
      [i, c, c2],
      c.inserted_at > c2.earliest_crawl and c.statuses_seen > 0
    )
    |> select([i, c], %{
      domain: i.domain,
      status_count: sum(c.statuses_seen),
      second_earliest_crawl: min(c.inserted_at)
    })
    |> group_by([i], i.domain)
    |> Repo.all(timeout: :infinity)
    |> Enum.flat_map(fn %{
                          domain: domain,
                          status_count: status_count,
                          second_earliest_crawl: oldest_timestamp
                        } ->
      time_diff_days = NaiveDateTime.diff(now, oldest_timestamp, :second) / (3600 * 24)

      # Guard against division by zero: the second-earliest crawl may fall within the
      # same second as `now`, in which case we skip the instance for this run.
      if time_diff_days > 0 do
        # (we're actually only ever updating, not inserting, so inserted_at will always be ignored... but ecto
        # requires it)
        [
          %{
            domain: domain,
            statuses_per_day: status_count / time_diff_days,
            updated_at: now,
            inserted_at: now
          }
        ]
      else
        []
      end
    end)

  Instance
  |> Repo.insert_all(instances,
    on_conflict: {:replace, [:statuses_per_day, :updated_at]},
    conflict_target: :domain,
    timeout: :infinity
  )
end
@doc """
This function aggregates statistics from the interactions in the database.

It calculates the strength of edges between nodes. Self-edges are not generated.

Edges are only generated if
* both instances have been successfully crawled
* neither of the instances have blocked each other
* there are interactions in each direction (if :require_bidirectional_edges is true in config)
"""
def generate_edges do
  now = get_now()

  # Total statuses seen per instance, used later to weight each edge.
  statuses_per_instance =
    Crawl
    |> select([c], %{
      instance_domain: c.instance_domain,
      statuses_seen: sum(c.statuses_seen)
    })
    |> group_by([c], c.instance_domain)

  interactions =
    CrawlInteraction
    |> join(:inner, [ci], c_source in subquery(statuses_per_instance),
      on: ci.source_domain == c_source.instance_domain
    )
    |> join(:inner, [ci], c_target in subquery(statuses_per_instance),
      on: ci.target_domain == c_target.instance_domain
    )
    |> join(:inner, [ci], i_source in Instance, on: ci.source_domain == i_source.domain)
    |> join(:inner, [ci], i_target in Instance, on: ci.target_domain == i_target.domain)
    |> select([ci, c_source, c_target, i_source, i_target], %{
      source_domain: ci.source_domain,
      target_domain: ci.target_domain,
      mentions: sum(ci.mentions),
      # we can take min() because every row is the same
      source_type: min(i_source.type),
      target_type: min(i_target.type),
      source_statuses_seen: min(c_source.statuses_seen),
      target_statuses_seen: min(c_target.statuses_seen)
    })
    |> where([ci], ci.source_domain != ci.target_domain)
    |> group_by([ci], [ci.source_domain, ci.target_domain])
    |> Repo.all(timeout: :infinity)

  # Domain pairs with a "reject" federation restriction (checked in both directions later).
  federation_blocks =
    FederationRestriction
    |> select([fr], {fr.source_domain, fr.target_domain})
    |> where([fr], fr.type == "reject")
    |> Repo.all()
    |> MapSet.new()

  # Get edges and their weights
  new_edges =
    interactions
    |> filter_to_eligible_interactions(federation_blocks)
    |> combine_mention_directions()
    |> Enum.map(fn {{source_domain, target_domain}, {mention_count, statuses_seen}} ->
      %{
        source_domain: source_domain,
        target_domain: target_domain,
        weight: mention_count / statuses_seen,
        inserted_at: now,
        updated_at: now
      }
    end)

  # Atomically swap the old edge set for the freshly computed one.
  Repo.transaction(
    fn ->
      Repo.delete_all(Edge, timeout: :infinity)
      Repo.insert_all(Edge, new_edges, timeout: :infinity)
    end,
    timeout: :infinity
  )
end
@doc """
This function checks to see if a lot of instances on the same base domain have been created recently. If so,
notifies the server admin via email.
"""
def check_for_spam_instances do
  hour_range = 3

  # Count instances first seen within the last `hour_range` hours, grouped by base domain.
  count_subquery =
    Instance
    |> where(
      [i],
      i.inserted_at > datetime_add(^NaiveDateTime.utc_now(), -1 * ^hour_range, "hour")
    )
    |> group_by(:base_domain)
    |> select([i], %{
      count: count(i.id),
      base_domain: i.base_domain
    })

  potential_spam_instances =
    Instance
    |> join(:inner, [i], c in subquery(count_subquery), on: i.domain == c.base_domain)
    # more than 2 new instances on one base domain is suspicious
    |> where([i, c], c.count > 2)
    |> select([i, c], %{
      base_domain: i.base_domain,
      count: c.count
    })
    |> Repo.all()

  if potential_spam_instances != [] do
    lines =
      Enum.map_join(potential_spam_instances, "\n", fn %{count: count, base_domain: base_domain} ->
        "* #{count} new at #{base_domain}"
      end)

    message =
      "fediverse.space detected the following potential spam domains from the last #{hour_range} hours:\n#{lines}"

    Logger.info(message)
    AdminEmail.send("Potential spam", message)
  else
    Logger.debug("Did not find potential spam instances.")
  end
end
# Takes a list of Interactions
# Returns a map of %{{source, target} => {total_mention_count, total_statuses_seen}}
defp combine_mention_directions(interactions) do
  Enum.reduce(interactions, %{}, fn interaction, acc ->
    %{
      source_domain: source_domain,
      target_domain: target_domain,
      mentions: mentions,
      source_statuses_seen: source_statuses_seen,
      target_statuses_seen: target_statuses_seen
    } = interaction

    # The key is direction-agnostic, so A->B and B->A accumulate into the same entry.
    key = get_interaction_key(source_domain, target_domain)

    # target_statuses_seen might be nil if that instance was never crawled. default to 0.
    statuses_seen = source_statuses_seen + (target_statuses_seen || 0)

    # Mentions are summed across both directions; statuses_seen is taken from the
    # first interaction seen for the pair.
    Map.update(acc, key, {mentions, statuses_seen}, fn {acc_mentions, acc_statuses} ->
      {acc_mentions + mentions, acc_statuses}
    end)
  end)
end
defp filter_to_eligible_interactions(interactions, federation_blocks) do
  # A map of {source_domain, target_domain} => mention_count. Used to find out whether a mention in the reverse
  # direction has been seen.
  mention_directions =
    Map.new(interactions, fn %{source_domain: source, target_domain: target, mentions: mentions} ->
      {{source, target}, mentions}
    end)

  Enum.filter(interactions, &is_eligible_interaction?(&1, mention_directions, federation_blocks))
end
# Returns true if
# * there's no federation block in either direction between the two instances
# * there are mentions in both directions (if enabled in configuration)
defp is_eligible_interaction?(
       %{
         source_domain: source,
         target_domain: target,
         mentions: mention_count,
         source_type: source_type,
         target_type: target_type
       },
       mention_directions,
       federation_blocks
     ) do
  mentions_were_seen = mention_count > 0

  # If :require_bidirectional_edges is set to `true` in the config, then an edge is only created if both instances
  # have mentioned each other. This is only enforced when both instance types have crawlable timelines.
  opposite_mention_exists =
    if get_config(:require_bidirectional_mentions) and is_timeline_crawlable_type?(source_type) and
         is_timeline_crawlable_type?(target_type) do
      Map.get(mention_directions, {target, source}, 0) > 0
    else
      true
    end

  blocked =
    MapSet.member?(federation_blocks, {source, target}) or
      MapSet.member?(federation_blocks, {target, source})

  mentions_were_seen and opposite_mention_exists and not blocked
end
# Whether this server type is one whose timeline the crawler can read statuses from.
defp is_timeline_crawlable_type?(type) do
  type in ["mastodon", "gab", "pleroma", "gnusocial", "misskey"]
end
end