From dbb0a55e36f955e582d1569eaf39f67a528d32bc Mon Sep 17 00:00:00 2001 From: stockbsd Date: Thu, 24 Oct 2019 11:27:36 +0800 Subject: [PATCH] download twitter media with aiohttp. --- README.md | 42 ++++++ example.config.json | 6 + requirements.txt | 2 + setup.py | 48 ++++++ twitter_dl/__init__.py | 3 + twitter_dl/__main__.py | 104 +++++++++++++ twitter_dl/downloader.py | 250 +++++++++++++++++++++++++++++++ twitter_dl/exceptions.py | 14 ++ twitter_dl/threaded_aio_dlder.py | 100 +++++++++++++ 9 files changed, 569 insertions(+) create mode 100644 README.md create mode 100644 example.config.json create mode 100644 requirements.txt create mode 100644 setup.py create mode 100644 twitter_dl/__init__.py create mode 100644 twitter_dl/__main__.py create mode 100644 twitter_dl/downloader.py create mode 100644 twitter_dl/exceptions.py create mode 100644 twitter_dl/threaded_aio_dlder.py diff --git a/README.md b/README.md new file mode 100644 index 0000000..22edcb8 --- /dev/null +++ b/README.md @@ -0,0 +1,42 @@ +# Download twitter resources + +Download tweet images and videos. Run threads which has a event loop to download resources asynchronously. + +``` +pip3 install twitter-dl +``` + +``` +usage: twitter-dl [-h] [-c CONFIDENTIAL] + [-s {large,medium,small,thumb,orig}] + [--tweet] [--video] [--nophoto] + [-l LIMIT] [--rts] + [--thread-number THREAD_NUMBER] + [--coro-number CORO_NUMBER] + [--since SID] + resource_id dest + +Download all images uploaded by a twitter user you specify + +positional arguments: + resource_id An ID of a twitter user. Also accept tweet url or + tweet id. 
+ dest Specify where to put images + +optional arguments: + -h, --help show this help message and exit + -c CONFIDENTIAL, --confidential CONFIDENTIAL + a json file containing a key and a secret + -s {large,medium,small,thumb,orig}, --size {large,medium,small,thumb,orig} + specify the size of images + --tweet indicate you gived a tweet url or tweet id + --video include video + --nophoto exclude photo + -l LIMIT, --limit LIMIT + the maximum number of tweets to check (most recent + first) + --rts save images contained in retweets + --thread-number THREAD_NUMBER + --coro-number CORO_NUMBER + --since SID +``` diff --git a/example.config.json b/example.config.json new file mode 100644 index 0000000..7e588f1 --- /dev/null +++ b/example.config.json @@ -0,0 +1,6 @@ +{ + "access_token": "", + "access_token_secret": "", + "consumer_key": "", + "consumer_secret": "" +} diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..2053719 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +requests>=2.20.0 +aiohttp>=3.4.4 diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..b3af963 --- /dev/null +++ b/setup.py @@ -0,0 +1,48 @@ +# Always prefer setuptools over distutils +from setuptools import setup, find_packages +import os + +here = os.path.abspath(os.path.dirname(__file__)) + +with open(os.path.join(here,'requirements.txt')) as fh: + requirements = [line.strip() for line in fh.readlines()] + +with open(os.path.join(here,'README.md')) as fh: + Readme = fh.read() + +def get_version(): + version_file = os.path.join(here, "twitter_dl", "__init__.py") + for line in open(version_file): + if line.startswith("version"): + version = line.split("=")[1].strip().replace("'", "").replace('"', '') + return version + raise RuntimeError("Unable to find version string in %s" % version_file) + +name = "twitter-dl" +git_repo = "https://github.com/stockbsd/{}".format(name) + +setup( + name=name, + version=get_version(), + description="Download tweet 
def main():
    """CLI entry point for twitter-dl.

    Parses the command line, loads API credentials from a JSON file, then
    downloads the media of a single tweet (--tweet), a list (--list), or a
    user timeline (default), waiting for the download threads to drain.

    Raises:
        ConfidentialsNotSuppliedError: no credentials path, or the JSON file
            lacks consumer_key/consumer_secret.
    """
    DEBUG = os.getenv("DEBUG")
    logging.basicConfig(level=logging.DEBUG if DEBUG else logging.INFO,
                        format='%(levelname)-7s %(name)11s: %(message)s')

    parser = argparse.ArgumentParser(
        description="Download all images uploaded by a twitter user you specify"
    )
    parser.add_argument(
        "resource_id",
        help="An ID of a twitter user. Also accept tweet url or tweet id.",
    )
    parser.add_argument("dest", help="Specify where to put images")
    parser.add_argument(
        "-c",
        "--confidential",
        help="a json file containing a key and a secret",
        default=os.getenv("TWITTER_AUTH", os.path.expanduser("~/.twitter.json")),
    )
    parser.add_argument(
        "-s",
        "--size",
        help="specify the size of images",
        default="orig",
        choices=["large", "medium", "small", "thumb", "orig"],
    )
    parser.add_argument(
        "--tweet",
        help="indicate you gave a tweet url or tweet id",
        default=False,
        action="store_true",
    )
    parser.add_argument(
        "--list",
        help="indicate you gave a list by user:list",
        default=False,
        action="store_true",
    )
    parser.add_argument(
        "--video", help="include video", default=False, action="store_true"
    )
    parser.add_argument(
        "--nophoto", dest="photo", help="exclude photo", action="store_false"
    )
    parser.add_argument(
        "-l",
        "--limit",
        type=int,
        help="the maximum number of tweets to check (most recent first)",
        default=3200,
    )
    parser.add_argument(
        "--since",
        type=int,
        help="the min id of tweets to check (most recent first)",
        default=0,
    )
    parser.add_argument(
        "--rts", help="save images contained in retweets", action="store_true"
    )
    parser.add_argument("--thread-number", type=int, default=2)
    parser.add_argument("--coro-number", type=int, default=5)
    args = parser.parse_args()

    # Load and validate credentials before spinning up any worker threads.
    if not args.confidential:
        raise ConfidentialsNotSuppliedError()
    with open(args.confidential) as f:
        confidential = json.load(f)  # json.load reads the file object directly
    if "consumer_key" not in confidential or "consumer_secret" not in confidential:
        # Include the offending path so the user knows which file is wrong.
        raise ConfidentialsNotSuppliedError(args.confidential)
    api_key = confidential["consumer_key"]
    api_secret = confidential["consumer_secret"]

    downloader = Downloader(api_key, api_secret, args.thread_number, args.coro_number)

    if args.tweet:
        downloader.download_media_of_tweet(args.resource_id, args.dest, args.size,
                                           args.video, args.photo)
    elif args.list:
        # maxsplit=1 so extra ':' characters do not raise ValueError
        username, listname = args.resource_id.split(':', 1)
        downloader.download_media_of_list(username, listname, args.dest, args.size,
                                          args.limit, args.rts, args.video,
                                          args.photo, args.since)
    else:
        downloader.download_media_of_user(args.resource_id, args.dest, args.size,
                                          args.limit, args.rts, args.video,
                                          args.photo, args.since)
    # A single join covers all three branches (previously duplicated in each).
    downloader.d.join()
def ensure_dir(directory):
    """Create *directory* (and any parents) if needed; return its absolute path."""
    directory = os.path.abspath(directory)
    if not os.path.exists(directory):
        os.makedirs(directory, exist_ok=True)
    return directory


class Downloader:
    """Fetch tweets via the Twitter v1.1 REST API and hand the contained
    photo/video URLs to a threaded asyncio downloader (``AioDownloader``)."""

    def __init__(self, api_key, api_secret, thread_number=2, coro_number=5):
        self.log = logging.getLogger("downloader")
        self.bearer_token = self.bearer(api_key, api_secret)
        # SECURITY: the bearer token is a credential — never log it in full.
        self.log.debug("Bearer token acquired (%s...)", self.bearer_token[:8])
        self.d = AioDownloader()
        self.d.start(thread_number, coro_number)

    def bearer(self, key, secret):
        """Exchange an API key/secret pair for an app-only bearer token.

        Args:
            key: consumer API key.
            secret: consumer API secret.

        Returns:
            The bearer token string.

        Raises:
            BearerTokenNotFetchedError: if Twitter does not answer HTTP 200.
        """
        credential = base64.b64encode(
            bytes("{}:{}".format(key, secret), "utf-8")
        ).decode()
        url = "https://api.twitter.com/oauth2/token"
        headers = {
            "Authorization": "Basic {}".format(credential),
            "Content-Type": "application/x-www-form-urlencoded;charset=UTF-8",
        }
        # Per RFC 6749 §4.4.2 / Twitter app-only auth, grant_type belongs in
        # the POST body (data=), not the query string (params=).
        payload = {"grant_type": "client_credentials"}
        r = requests.post(url, headers=headers, data=payload)

        if r.status_code == 200:
            return r.json()["access_token"]
        raise BearerTokenNotFetchedError()

    def download_media_of_tweet(self, tid, save_dest, size="large", include_video=False,
                                include_photo=True):
        """Download the media of a single tweet identified by *tid* into *save_dest*."""
        save_dest = ensure_dir(save_dest)

        tweet = self.get_tweet(tid)
        self.process_tweet(tweet, save_dest, size, include_video, include_photo)

    def download_media_of_user(self, user, save_dest, size="large", limit=3200, rts=False,
                               include_video=False, include_photo=True, since_id=0):
        """Download and save media that *user* uploaded.

        Args:
            user: screen name of the user.
            save_dest: the directory where media will be saved.
            size: which size of images to download.
            limit: maximum number of tweets to inspect (most recent first).
            rts: whether to include retweets or not.
            include_video / include_photo: media-type filters.
            since_id: only inspect tweets newer than this id (0 = no bound).
        """
        save_dest = ensure_dir(save_dest)

        alltweets = self.get_user_tweets(user, None, limit, rts, since_id)
        for tweet in alltweets:
            # BUG FIX: size was previously dropped here, so -s was ignored
            # for timelines and the default "large" was always used.
            self.process_tweet(tweet, save_dest, size=size,
                               include_video=include_video, include_photo=include_photo)

    def download_media_of_list(self, user, listname, save_dest, size="large", limit=3200,
                               rts=False, include_video=False, include_photo=True, since_id=0):
        """Download and save media posted to a list.

        Args:
            user: list owner screen name.
            listname: list slug.
            save_dest: the directory where media will be saved.
            size: which size of images to download.
            limit: maximum number of tweets to inspect (most recent first).
            rts: whether to include retweets or not.
            include_video / include_photo: media-type filters.
            since_id: only inspect tweets newer than this id (0 = no bound).
        """
        save_dest = ensure_dir(save_dest)

        alltweets = self.get_list_tweets(user, listname, None, limit, rts, since_id)
        for tweet in alltweets:
            # BUG FIX: propagate the requested size (was silently "large").
            self.process_tweet(tweet, save_dest, size=size,
                               include_video=include_video, include_photo=include_photo)

    def api_fetch_tweets(self, url, payload, start, count, rts, since_id):
        """Page backwards through a timeline endpoint, returning up to *count* tweets.

        Args:
            url: the v1.1 timeline endpoint.
            payload: endpoint-specific query params (mutated in place).
            start: tweet id to start below (None = newest).
            count: maximum number of tweets to collect.
            rts: include retweets.
            since_id: exclusive lower bound on tweet ids (0 = none).
        """
        headers = {"Authorization": "Bearer {}".format(self.bearer_token)}

        payload["count"] = count
        # The API expects lowercase boolean strings; requests would otherwise
        # serialize Python booleans as "True"/"False".
        payload["include_rts"] = "true" if rts else "false"
        if start:
            payload["max_id"] = start - 1  # max_id is inclusive
        if since_id:
            payload["since_id"] = since_id  # since_id is exclusive

        alltweets = []
        while True:
            r = requests.get(url, headers=headers, params=payload)
            tweets = []
            if r.status_code == 200:
                tweets = r.json()
            else:
                self.log.error(f"An error occurred with the request, status code was {r.status_code}")

            if not tweets:
                break

            alltweets.extend(tweets)
            # Continue below the oldest tweet seen so far.
            payload["max_id"] = tweets[-1]['id'] - 1
            payload['count'] = count - len(alltweets)

            if len(alltweets) >= count:
                break
            if len(tweets) < 200:  # fewer than the 200-per-page API max => no more tweets
                break

        self.log.info(f"Got {len(alltweets)} tweets")
        return alltweets

    def get_user_tweets(self, user, start=None, count=200, rts=False, since_id=0):
        """Download *user*'s timeline tweets and return them as a list.

        Args:
            user: screen name of the user.
            start: tweet id to start below (None = newest).
            rts: whether to include retweets or not.
        """
        apiurl = "https://api.twitter.com/1.1/statuses/user_timeline.json"
        payload = {"screen_name": user}

        return self.api_fetch_tweets(apiurl, payload, start, count, rts, since_id)

    def get_list_tweets(self, username, listname, start=None, count=200, rts=False, since_id=0):
        """Download a list's tweets and return them as a list.

        Args:
            username: list owner screen name.
            listname: list slug.
            start: tweet id to start below (None = newest).
            rts: whether to include retweets or not.
        """
        apiurl = "https://api.twitter.com/1.1/lists/statuses.json"
        payload = {"owner_screen_name": username, "slug": listname}

        return self.api_fetch_tweets(apiurl, payload, start, count, rts, since_id)

    def get_tweet(self, id):
        """Download a single tweet; return its dict, or None on an API error.

        Args:
            id: tweet ID.
        """
        url = "https://api.twitter.com/1.1/statuses/show.json"
        headers = {"Authorization": f"Bearer {self.bearer_token}"}
        payload = {"id": id, "include_entities": "true"}

        r = requests.get(url, headers=headers, params=payload)

        if r.status_code == 200:
            tweet = r.json()
            self.log.info(f"Got tweet with id {id} of user @{tweet['user']['name']}")
            return tweet
        self.log.error(f"An error occurred , status code was {r.status_code}")
        return None

    def process_tweet(self, tweet, save_dest, size="large", include_video=False, include_photo=True):
        """Queue every requested media item of *tweet*; return how many were found."""
        if tweet is None:
            # get_tweet returns None on API errors; nothing to do.
            return 0
        if 'retweeted_status' in tweet:
            tweet = tweet['retweeted_status']
            self.log.debug('this is a retweet, turn to original tweet')
        id_str = tweet["id_str"]
        images = self.extract_media_list(tweet, include_video, include_photo)
        for i, image in enumerate(images, 1):
            self.save_media(image, save_dest, f"{id_str}-{i}", size)

        return len(images)

    def extract_media_list(self, tweet, include_video, include_photo):
        """Return the media urls embedded in *tweet* (or its quoted tweet).

        Args:
            tweet: a dict object representing a tweet.
            include_video: include video/animated_gif urls (highest bitrate variant).
            include_photo: include photo urls.
        """
        extended = tweet.get("extended_entities")
        if not extended and ("quoted_status" in tweet):
            extended = tweet['quoted_status'].get("extended_entities")
            self.log.debug('Extract media from quoted')

        if not extended:
            return []

        rv = []
        if "media" in extended:
            for x in extended["media"]:
                if x["type"] == "photo" and include_photo:
                    rv.append(x["media_url"])
                elif x["type"] in ["video", "animated_gif"] and include_video:
                    # Pick the highest-bitrate variant and strip the "?tag=" suffix.
                    variants = x["video_info"]["variants"]
                    variants.sort(key=lambda v: v.get("bitrate", 0))
                    rv.append(variants[-1]["url"].rsplit("?tag")[0])
        return rv

    def save_media(self, image, path, name, size="large"):
        """Queue *image* for download into *path*, skipping already-saved files.

        Args:
            image: the url of the media.
            path: the directory where the media will be saved.
            name: base name (without extension) for the saved file.
            size: which size of images to download.
        """
        if image:
            ext = os.path.splitext(image)[1]
            save_file = os.path.join(path, name + ext)
            # Photos support the ":size" variant suffix; videos (.mp4) do not.
            if ext not in [".mp4"]:
                real_url = image + ":" + size
            else:
                real_url = image

            if not os.path.exists(save_file):
                self.d.add_url(real_url, save_file)
            else:
                self.log.info(f"Skipping downloaded {image}")
import asyncio
import threading
import logging
from queue import Queue


def loop_in_thread(async_entry, *args, **kwargs):
    """Thread target: run *async_entry* to completion on a fresh event loop."""
    log = logging.getLogger('LoopThread')
    log.debug('loop begin...')
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(async_entry(*args, **kwargs))
    loop.close()
    log.debug('loop end...')


class AioDownloader():
    """Concurrent URL downloader: N worker threads, each running an event
    loop that keeps at most M aiohttp requests in flight, all fed from a
    single thread-safe queue of (url, dest) pairs."""

    def __init__(self):
        self.q = Queue()        # work items: (url, dest); (None, None) = stop
        self.threads = []
        self.log = logging.getLogger('AioDlder')

    def start(self, num_threads, num_coros):
        """Spawn *num_threads* workers, each allowing *num_coros* coroutines."""
        for _ in range(num_threads):
            t = threading.Thread(target=loop_in_thread,
                                 args=(self.sched_downloaders, num_coros))
            self.threads.append(t)
            t.start()

    def join(self):
        """Signal end-of-work, then block until every worker thread exits."""
        self.add_endsignal()
        for t in self.threads:
            t.join()

    def add_endsignal(self):
        # Sentinel telling one worker to stop (it re-posts it for its peers).
        self.q.put((None, None))

    def add_url(self, url, dest):
        self.q.put((url, dest))

    async def downloader(self, session, url, dest, sem):
        """Fetch *url* and write the body to *dest*; always release *sem*."""
        try:
            async with session.get(url) as resp:
                if resp.status == 200:
                    data = await resp.read()
                    with open(dest, 'wb') as f:
                        f.write(data)
                    self.log.info(f'{resp.url} ==> {dest}')
                else:
                    self.log.warning(f'{resp.url} status = {resp.status}')
        except Exception as e:
            # Best effort: log and keep the worker alive for the next item.
            self.log.warning(f'{url} failed: {e}')
        finally:
            sem.release()

    # async entry point, one instance per worker thread
    async def sched_downloaders(self, num_coros):
        # Imported lazily so the module stays importable without aiohttp
        # installed; only the worker loops actually need it.
        import aiohttp

        loop = asyncio.get_event_loop()  # prefer get_running_loop in py>=3.7
        sem = asyncio.Semaphore(num_coros)
        async with aiohttp.ClientSession() as session:
            tasks = []
            while True:
                await sem.acquire()

                # BUG FIX: a blocking self.q.get() here froze this thread's
                # event loop, stalling every in-flight download scheduled on
                # it until the next queue item arrived. Run the blocking get
                # in the default executor so the loop keeps servicing tasks.
                url, dest = await loop.run_in_executor(None, self.q.get)
                if url is None:
                    self.add_endsignal()  # notify peer threads
                    break
                tasks.append(loop.create_task(self.downloader(session, url, dest, sem)))
            # Wait for the remaining downloader tasks to finish.
            await asyncio.gather(*[t for t in tasks if not t.done()])
        self.log.info('Queue Finished')


if "__main__" == __name__:
    import time, sys

    logging.basicConfig(
        level=logging.INFO,
        format='%(threadName)10s %(name)12s: %(message)s',
        stream=sys.stderr,
    )

    dld = AioDownloader()
    for i in range(30):
        dld.add_url(f"http://httpbin.org/delay/1?a={i}", '/dev/null')
    dld.add_endsignal()

    t0 = time.time()
    log = logging.getLogger('main')
    log.info('start worker threads')

    dld.start(2, 5)
    dld.join()

    log.info('all workers exit')

    print(f'{time.time()-t0} seconds')