kopia lustrzana https://github.com/bellingcat/auto-archiver
118 wiersze
5.4 KiB
Python
118 wiersze
5.4 KiB
Python
|
|
import json, mimetypes
|
|
from datetime import datetime
|
|
from loguru import logger
|
|
from pytwitter import Api
|
|
from slugify import slugify
|
|
|
|
from . import Archiver
|
|
from .twitter_archiver import TwitterArchiver
|
|
from ..core import Metadata,Media
|
|
|
|
|
|
class TwitterApiArchiver(TwitterArchiver, Archiver):
|
|
name = "twitter_api_archiver"
|
|
|
|
def __init__(self, config: dict) -> None:
|
|
super().__init__(config)
|
|
|
|
self.api_index = 0
|
|
self.apis = []
|
|
if len(self.bearer_tokens):
|
|
self.apis.extend([Api(bearer_token=bearer_token) for bearer_token in self.bearer_tokens])
|
|
if self.bearer_token:
|
|
self.assert_valid_string("bearer_token")
|
|
self.apis.append(Api(bearer_token=self.bearer_token))
|
|
if self.consumer_key and self.consumer_secret and self.access_token and self.access_secret:
|
|
self.assert_valid_string("consumer_key")
|
|
self.assert_valid_string("consumer_secret")
|
|
self.assert_valid_string("access_token")
|
|
self.assert_valid_string("access_secret")
|
|
self.apis.append(Api(consumer_key=self.consumer_key, consumer_secret=self.consumer_secret,
|
|
access_token=self.access_token, access_secret=self.access_secret))
|
|
assert self.api_client is not None, "Missing Twitter API configurations, please provide either AND/OR (consumer_key, consumer_secret, access_token, access_secret) to use this archiver, you can provide both for better rate-limit results."
|
|
|
|
@staticmethod
|
|
def configs() -> dict:
|
|
return {
|
|
"bearer_token": {"default": None, "help": "[deprecated: see bearer_tokens] twitter API bearer_token which is enough for archiving, if not provided you will need consumer_key, consumer_secret, access_token, access_secret"},
|
|
"bearer_tokens": {"default": [], "help": " a list of twitter API bearer_token which is enough for archiving, if not provided you will need consumer_key, consumer_secret, access_token, access_secret, if provided you can still add those for better rate limits. CSV of bearer tokens if provided via the command line", "cli_set": lambda cli_val, cur_val: list(set(cli_val.split(",")))},
|
|
"consumer_key": {"default": None, "help": "twitter API consumer_key"},
|
|
"consumer_secret": {"default": None, "help": "twitter API consumer_secret"},
|
|
"access_token": {"default": None, "help": "twitter API access_token"},
|
|
"access_secret": {"default": None, "help": "twitter API access_secret"},
|
|
}
|
|
|
|
@property # getter .mimetype
|
|
def api_client(self) -> str:
|
|
return self.apis[self.api_index]
|
|
|
|
|
|
def download(self, item: Metadata) -> Metadata:
|
|
# call download retry until success or no more apis
|
|
while self.api_index < len(self.apis):
|
|
if res := self.download_retry(item): return res
|
|
self.api_index += 1
|
|
self.api_index = 0
|
|
return False
|
|
|
|
def download_retry(self, item: Metadata) -> Metadata:
|
|
url = item.get_url()
|
|
# detect URLs that we definitely cannot handle
|
|
username, tweet_id = self.get_username_tweet_id(url)
|
|
if not username: return False
|
|
|
|
try:
|
|
tweet = self.api_client.get_tweet(tweet_id, expansions=["attachments.media_keys"], media_fields=["type", "duration_ms", "url", "variants"], tweet_fields=["attachments", "author_id", "created_at", "entities", "id", "text", "possibly_sensitive"])
|
|
logger.debug(tweet)
|
|
except Exception as e:
|
|
logger.error(f"Could not get tweet: {e}")
|
|
return False
|
|
|
|
result = Metadata()
|
|
result.set_title(tweet.data.text)
|
|
result.set_timestamp(datetime.strptime(tweet.data.created_at, "%Y-%m-%dT%H:%M:%S.%fZ"))
|
|
|
|
urls = []
|
|
if tweet.includes:
|
|
for i, m in enumerate(tweet.includes.media):
|
|
media = Media(filename="")
|
|
if m.url and len(m.url):
|
|
media.set("src", m.url)
|
|
media.set("duration", (m.duration_ms or 1) // 1000)
|
|
mimetype = "image/jpeg"
|
|
elif hasattr(m, "variants"):
|
|
variant = self.choose_variant(m.variants)
|
|
if not variant: continue
|
|
media.set("src", variant.url)
|
|
mimetype = variant.content_type
|
|
else:
|
|
continue
|
|
logger.info(f"Found media {media}")
|
|
ext = mimetypes.guess_extension(mimetype)
|
|
media.filename = self.download_from_url(media.get("src"), f'{slugify(url)}_{i}{ext}', item)
|
|
result.add_media(media)
|
|
|
|
result.set_content(json.dumps({
|
|
"id": tweet.data.id,
|
|
"text": tweet.data.text,
|
|
"created_at": tweet.data.created_at,
|
|
"author_id": tweet.data.author_id,
|
|
"geo": tweet.data.geo,
|
|
"lang": tweet.data.lang,
|
|
"media": urls
|
|
}, ensure_ascii=False, indent=4))
|
|
return result.success("twitter")
|
|
|
|
def choose_variant(self, variants):
|
|
# choosing the highest quality possible
|
|
variant, bit_rate = None, -1
|
|
for var in variants:
|
|
if var.content_type == "video/mp4":
|
|
if var.bit_rate > bit_rate:
|
|
bit_rate = var.bit_rate
|
|
variant = var
|
|
else:
|
|
variant = var if not variant else variant
|
|
return variant
|