diff --git a/.gitignore b/.gitignore
index e31ad70..4dd3ef5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -27,4 +27,5 @@ instaloader.session
 orchestration.yaml
 auto_archiver.egg-info*
 logs*
-*.csv
\ No newline at end of file
+*.csv
+archived/
diff --git a/README.md b/README.md
index 62baa31..354d451 100644
--- a/README.md
+++ b/README.md
@@ -177,6 +177,38 @@ To use Google Drive storage you need the id of the shared folder in the `config
 #### Telethon + Instagram with telegram bot
 The first time you run, you will be prompted to do a authentication with the phone number associated, alternatively you can put your `anon.session` in the root.
 
+#### Atlos
+When integrating with [Atlos](https://atlos.org), you will need to provide an API token in your configuration. You can learn more about Atlos and how to get an API token [here](https://docs.atlos.org/technical/api). You will have to provide this token to the `atlos_feeder`, `atlos_storage`, and `atlos_db` steps in your orchestration file. If you use a custom or self-hosted Atlos instance, you can also specify the `atlos_url` option to point to your custom instance's URL. For example:
+
+```yaml
+# orchestration.yaml content
+steps:
+  feeder: atlos_feeder
+  archivers: # order matters
+    - youtubedl_archiver
+  enrichers:
+    - thumbnail_enricher
+    - hash_enricher
+  formatter: html_formatter
+  storages:
+    - atlos_storage
+  databases:
+    - console_db
+    - atlos_db
+
+configurations:
+  atlos_feeder:
+    atlos_url: "https://platform.atlos.org" # optional
+    api_token: "...your API token..."
+  atlos_db:
+    atlos_url: "https://platform.atlos.org" # optional
+    api_token: "...your API token..."
+  atlos_storage:
+    atlos_url: "https://platform.atlos.org" # optional
+    api_token: "...your API token..."
+  hash_enricher:
+    algorithm: "SHA-256"
+```
 
 ## Running on Google Sheets Feeder (gsheet_feeder)
 The `--gsheet_feeder.sheet` property is the name of the Google Sheet to check for URLs.
diff --git a/src/auto_archiver/core/media.py b/src/auto_archiver/core/media.py
index 7b18b27..bf9580e 100644
--- a/src/auto_archiver/core/media.py
+++ b/src/auto_archiver/core/media.py
@@ -25,10 +25,11 @@ class Media:
     _mimetype: str = None  # eg: image/jpeg
     _stored: bool = field(default=False, repr=False, metadata=config(exclude=lambda _: True))  # always exclude
 
-    def store(self: Media, override_storages: List = None, url: str = "url-not-available"):
-        # stores the media into the provided/available storages [Storage]
-        # repeats the process for its properties, in case they have inner media themselves
-        # for now it only goes down 1 level but it's easy to make it recursive if needed
+    def store(self: Media, override_storages: List = None, url: str = "url-not-available", metadata: Any = None):
+        # 'Any' typing for metadata to avoid circular imports. Stores the media
+        # into the provided/available storages [Storage]; repeats the process for
+        # its properties, in case they have inner media themselves. For now it
+        # only goes down 1 level, but it's easy to make it recursive if needed.
         storages = override_storages or ArchivingContext.get("storages")
         if not len(storages):
             logger.warning(f"No storages found in local context or provided directly for {self.filename}.")
@@ -36,7 +37,7 @@ class Media:
 
         for s in storages:
             for any_media in self.all_inner_media(include_self=True):
-                s.store(any_media, url)
+                s.store(any_media, url, metadata=metadata)
 
     def all_inner_media(self, include_self=False):
         """ Media can be inside media properties, examples include transformations on original media.
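The `media.py` change above is the heart of this PR's plumbing: an optional `metadata` argument now rides along from `Media.store` into each storage. A minimal sketch of the pattern with hypothetical stand-in classes (not the real auto-archiver types), showing why existing callers are unaffected:

```python
# Sketch of the metadata threading introduced above, using hypothetical
# stand-ins (FakeStorage/FakeMedia are illustrative, not the real classes).
from typing import Any, List, Optional


class FakeStorage:
    def store(self, media: "FakeMedia", url: str, metadata: Optional[Any] = None) -> None:
        # A metadata-aware storage (like atlos_storage) reads context from
        # `metadata`; storages that don't need it simply ignore the kwarg.
        atlos_id = metadata.get("atlos_id") if metadata else None
        print(f"storing {media.filename} for {url} (atlos_id={atlos_id})")


class FakeMedia:
    def __init__(self, filename: str) -> None:
        self.filename = filename

    def store(self, storages: List[FakeStorage], url: str, metadata: Any = None) -> None:
        # Mirrors Media.store: hand the archive-level metadata to every storage.
        for s in storages:
            s.store(self, url, metadata=metadata)


# Defaulting metadata to None keeps pre-existing call sites working unchanged.
FakeMedia("out.mp4").store([FakeStorage()], "https://example.com", metadata={"atlos_id": 123})
```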
diff --git a/src/auto_archiver/core/metadata.py b/src/auto_archiver/core/metadata.py
index 5e2182a..693b77d 100644
--- a/src/auto_archiver/core/metadata.py
+++ b/src/auto_archiver/core/metadata.py
@@ -48,7 +48,7 @@ class Metadata:
         self.remove_duplicate_media_by_hash()
         storages = override_storages or ArchivingContext.get("storages")
         for media in self.media:
-            media.store(override_storages=storages, url=self.get_url())
+            media.store(override_storages=storages, url=self.get_url(), metadata=self)
 
     def set(self, key: str, val: Any) -> Metadata:
         self.metadata[key] = val
diff --git a/src/auto_archiver/core/orchestrator.py b/src/auto_archiver/core/orchestrator.py
index 94268f5..0594dde 100644
--- a/src/auto_archiver/core/orchestrator.py
+++ b/src/auto_archiver/core/orchestrator.py
@@ -120,7 +120,7 @@
 
         # 6 - format and store formatted if needed
         if (final_media := self.formatter.format(result)):
-            final_media.store(url=url)
+            final_media.store(url=url, metadata=result)
             result.set_final_media(final_media)
 
         if result.is_empty():
diff --git a/src/auto_archiver/databases/__init__.py b/src/auto_archiver/databases/__init__.py
index 538cd72..df48f39 100644
--- a/src/auto_archiver/databases/__init__.py
+++ b/src/auto_archiver/databases/__init__.py
@@ -2,4 +2,5 @@ from .database import Database
 from .gsheet_db import GsheetsDb
 from .console_db import ConsoleDb
 from .csv_db import CSVDb
-from .api_db import AAApiDb
\ No newline at end of file
+from .api_db import AAApiDb
+from .atlos_db import AtlosDb
\ No newline at end of file
diff --git a/src/auto_archiver/databases/atlos_db.py b/src/auto_archiver/databases/atlos_db.py
new file mode 100644
index 0000000..16c4910
--- /dev/null
+++ b/src/auto_archiver/databases/atlos_db.py
@@ -0,0 +1,79 @@
+import os
+from typing import Union
+from loguru import logger
+from csv import DictWriter
+from dataclasses import asdict
+import requests
+
+from . import Database
+from ..core import Metadata
+from ..utils import get_atlos_config_options
+
+
+class AtlosDb(Database):
+    """
+    Outputs results to Atlos
+    """
+
+    name = "atlos_db"
+
+    def __init__(self, config: dict) -> None:
+        # without this STEP.__init__ is not called
+        super().__init__(config)
+
+    @staticmethod
+    def configs() -> dict:
+        return get_atlos_config_options()
+
+    def failed(self, item: Metadata, reason: str) -> None:
+        """Update DB accordingly for failure"""
+        # If the item has no Atlos ID, there's nothing for us to do
+        if not item.metadata.get("atlos_id"):
+            logger.info(f"Item {item.get_url()} has no Atlos ID, skipping")
+            return
+
+        requests.post(
+            f"{self.atlos_url}/api/v2/source_material/metadata/{item.metadata['atlos_id']}/auto_archiver",
+            headers={"Authorization": f"Bearer {self.api_token}"},
+            json={"metadata": {"processed": True, "status": "error", "error": reason}},
+        ).raise_for_status()
+        logger.info(
+            f"Stored failure for {item.get_url()} (ID {item.metadata['atlos_id']}) on Atlos: {reason}"
+        )
+
+    def fetch(self, item: Metadata) -> Union[Metadata, bool]:
+        """check and fetch if the given item has been archived already, each
+        database should handle its own caching, and configuration mechanisms"""
+        return False
+
+    def _process_metadata(self, item: Metadata) -> dict:
+        """Process metadata for storage on Atlos. Will convert any datetime
+        objects to ISO format."""
+
+        return {
+            k: v.isoformat() if hasattr(v, "isoformat") else v
+            for k, v in item.metadata.items()
+        }
+
+    def done(self, item: Metadata, cached: bool = False) -> None:
+        """archival result ready - should be saved to DB"""
+
+        if not item.metadata.get("atlos_id"):
+            logger.info(f"Item {item.get_url()} has no Atlos ID, skipping")
+            return
+
+        requests.post(
+            f"{self.atlos_url}/api/v2/source_material/metadata/{item.metadata['atlos_id']}/auto_archiver",
+            headers={"Authorization": f"Bearer {self.api_token}"},
+            json={
+                "metadata": dict(
+                    processed=True,
+                    status="success",
+                    results=self._process_metadata(item),
+                )
+            },
+        ).raise_for_status()
+
+        logger.info(
+            f"Stored success for {item.get_url()} (ID {item.metadata['atlos_id']}) on Atlos"
+        )
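`AtlosDb._process_metadata` exists because the result payload is sent as JSON, and Python's `json` module cannot serialize `datetime` values. A standalone illustration of the same conversion (with made-up metadata keys, not the module above):

```python
# Why _process_metadata converts datetimes before the POST: json.dumps
# raises TypeError on datetime objects, so they are rendered as ISO-8601
# strings first.
import json
from datetime import datetime, timezone

metadata = {"url": "https://example.com", "archived_at": datetime.now(timezone.utc)}

processed = {k: v.isoformat() if hasattr(v, "isoformat") else v for k, v in metadata.items()}

print(json.dumps(processed))  # fine; json.dumps(metadata) would raise TypeError
```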
diff --git a/src/auto_archiver/enrichers/whisper_enricher.py b/src/auto_archiver/enrichers/whisper_enricher.py
index 7ceab54..c0089a4 100644
--- a/src/auto_archiver/enrichers/whisper_enricher.py
+++ b/src/auto_archiver/enrichers/whisper_enricher.py
@@ -44,7 +44,7 @@ class WhisperEnricher(Enricher):
         job_results = {}
         for i, m in enumerate(to_enrich.media):
             if m.is_video() or m.is_audio():
-                m.store(url=url)
+                m.store(url=url, metadata=to_enrich)
                 try:
                     job_id = self.submit_job(m)
                     job_results[job_id] = False
diff --git a/src/auto_archiver/feeders/__init__.py b/src/auto_archiver/feeders/__init__.py
index f42f98f..84a8495 100644
--- a/src/auto_archiver/feeders/__init__.py
+++ b/src/auto_archiver/feeders/__init__.py
@@ -1,3 +1,4 @@
 from .feeder import Feeder
 from .gsheet_feeder import GsheetsFeeder
-from .cli_feeder import CLIFeeder
\ No newline at end of file
+from .cli_feeder import CLIFeeder
+from .atlos_feeder import AtlosFeeder
\ No newline at end of file
diff --git a/src/auto_archiver/feeders/atlos_feeder.py b/src/auto_archiver/feeders/atlos_feeder.py
new file mode 100644
index 0000000..d3acc00
--- /dev/null
+++ b/src/auto_archiver/feeders/atlos_feeder.py
@@ -0,0 +1,56 @@
+from loguru import logger
+import requests
+
+from . import Feeder
+from ..core import Metadata, ArchivingContext
+from ..utils import get_atlos_config_options
+
+
+class AtlosFeeder(Feeder):
+    name = "atlos_feeder"
+
+    def __init__(self, config: dict) -> None:
+        # without this STEP.__init__ is not called
+        super().__init__(config)
+        if type(self.api_token) != str:
+            raise Exception("Atlos Feeder did not receive an Atlos API token")
+
+    @staticmethod
+    def configs() -> dict:
+        return get_atlos_config_options()
+
+    def __iter__(self) -> Metadata:
+        # Get all the URLs from the Atlos API
+        count = 0
+        cursor = None
+        while True:
+            response = requests.get(
+                f"{self.atlos_url}/api/v2/source_material",
+                headers={"Authorization": f"Bearer {self.api_token}"},
+                params={"cursor": cursor},
+            )
+            response.raise_for_status()
+            data = response.json()
+            cursor = data["next"]
+
+            for item in data["results"]:
+                if (
+                    item["source_url"] not in [None, ""]
+                    and (
+                        item["metadata"]
+                        .get("auto_archiver", {})
+                        .get("processed", False)
+                        != True
+                    )
+                    and item["visibility"] == "visible"
+                    and item["status"] not in ["processing", "pending"]
+                ):
+                    yield Metadata().set_url(item["source_url"]).set(
+                        "atlos_id", item["id"]
+                    )
+                    count += 1
+
+            if len(data["results"]) == 0 or cursor is None:
+                break
+
+        logger.success(f"Processed {count} URL(s)")
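`AtlosFeeder.__iter__` walks Atlos's cursor-paginated `source_material` endpoint: request a page, yield the matching items, then follow `next` until the cursor runs out. The same pattern in isolation, with a hypothetical `fetch_page` standing in for the `requests.get` call:

```python
# Generic cursor pagination as used by the feeder above; fetch_page is a
# hypothetical stand-in returning {"next": <cursor or None>, "results": [...]}.
from typing import Callable, Iterator, Optional


def paginate(fetch_page: Callable[[Optional[str]], dict]) -> Iterator[dict]:
    """Yield every item from a cursor-paginated API, page by page."""
    cursor: Optional[str] = None
    while True:
        data = fetch_page(cursor)
        yield from data["results"]
        cursor = data["next"]
        # Stop on an empty page or when the API signals nothing comes after it.
        if not data["results"] or cursor is None:
            break


pages = iter([
    {"next": "abc", "results": [{"id": 1}]},
    {"next": None, "results": [{"id": 2}]},
])
print(list(paginate(lambda cursor: next(pages))))  # [{'id': 1}, {'id': 2}]
```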
diff --git a/src/auto_archiver/storages/__init__.py b/src/auto_archiver/storages/__init__.py
index c375f8e..5f768a6 100644
--- a/src/auto_archiver/storages/__init__.py
+++ b/src/auto_archiver/storages/__init__.py
@@ -1,4 +1,5 @@
 from .storage import Storage
 from .s3 import S3Storage
 from .local import LocalStorage
-from .gd import GDriveStorage
\ No newline at end of file
+from .gd import GDriveStorage
+from .atlos import AtlosStorage
\ No newline at end of file
diff --git a/src/auto_archiver/storages/atlos.py b/src/auto_archiver/storages/atlos.py
new file mode 100644
index 0000000..3b13aa0
--- /dev/null
+++ b/src/auto_archiver/storages/atlos.py
@@ -0,0 +1,74 @@
+import os
+from typing import IO, List, Optional
+from loguru import logger
+import requests
+import hashlib
+
+from ..core import Media, Metadata
+from ..storages import Storage
+from ..utils import get_atlos_config_options
+
+
+class AtlosStorage(Storage):
+    name = "atlos_storage"
+
+    def __init__(self, config: dict) -> None:
+        super().__init__(config)
+
+    @staticmethod
+    def configs() -> dict:
+        return dict(Storage.configs(), **get_atlos_config_options())
+
+    def get_cdn_url(self, _media: Media) -> str:
+        # It's not always possible to provide an exact URL, because it's
+        # possible that the media once uploaded could have been copied to
+        # another project.
+        return self.atlos_url
+
+    def _hash(self, media: Media) -> str:
+        # Hash the media file using SHA-256. We don't use the existing auto archiver
+        # hash because there's no guarantee that the configurer is using SHA-256,
+        # which is how Atlos hashes files.
+
+        sha256 = hashlib.sha256()
+        with open(media.filename, "rb") as f:
+            while True:
+                buf = f.read(4096)
+                if not buf: break
+                sha256.update(buf)
+        return sha256.hexdigest()
+
+    def upload(self, media: Media, metadata: Optional[Metadata] = None, **_kwargs) -> bool:
+        atlos_id = metadata.get("atlos_id")
+        if atlos_id is None:
+            logger.error(f"No Atlos ID found in metadata; can't store {media.filename} on Atlos")
+            return False
+
+        media_hash = self._hash(media)
+
+        # Check whether the media has already been uploaded
+        source_material = requests.get(
+            f"{self.atlos_url}/api/v2/source_material/{atlos_id}",
+            headers={"Authorization": f"Bearer {self.api_token}"},
+        ).json()["result"]
+        existing_media = [x["file_hash_sha256"] for x in source_material.get("artifacts", [])]
+        if media_hash in existing_media:
+            logger.info(f"{media.filename} with SHA256 {media_hash} already uploaded to Atlos")
+            return True
+
+        # Upload the media to the Atlos API
+        requests.post(
+            f"{self.atlos_url}/api/v2/source_material/upload/{atlos_id}",
+            headers={"Authorization": f"Bearer {self.api_token}"},
+            params={
+                "title": media.properties
+            },
+            files={"file": (os.path.basename(media.filename), open(media.filename, "rb"))},
+        ).raise_for_status()
+
+        logger.info(f"Uploaded {media.filename} to Atlos with ID {atlos_id} and title {media.key}")
+
+        return True
+
+    # must be implemented even if unused
+    def uploadf(self, file: IO[bytes], key: str, **kwargs: dict) -> bool: pass
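`AtlosStorage._hash` reads the file in 4 KiB chunks so that hashing a multi-gigabyte video never loads it fully into memory, and the incremental digest equals the whole-file digest. A self-contained check of that equivalence (the temporary file is purely illustrative):

```python
# Chunked SHA-256 (as in _hash above) produces the same digest as hashing
# the whole file at once, while keeping memory usage flat for large media.
import hashlib
import os
import tempfile

with tempfile.NamedTemporaryFile(delete=False) as f:
    f.write(b"example media bytes" * 100_000)
    path = f.name

chunked = hashlib.sha256()
with open(path, "rb") as f:
    while buf := f.read(4096):
        chunked.update(buf)

with open(path, "rb") as f:
    whole = hashlib.sha256(f.read())

assert chunked.hexdigest() == whole.hexdigest()
os.remove(path)
```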
diff --git a/src/auto_archiver/storages/storage.py b/src/auto_archiver/storages/storage.py
index 1ee3942..c9b55e0 100644
--- a/src/auto_archiver/storages/storage.py
+++ b/src/auto_archiver/storages/storage.py
@@ -1,12 +1,12 @@
 from __future__ import annotations
 from abc import abstractmethod
 from dataclasses import dataclass
-from typing import IO
+from typing import IO, Optional
 import os
 
 from ..utils.misc import random_str
-from ..core import Media, Step, ArchivingContext
+from ..core import Media, Step, ArchivingContext, Metadata
 from ..enrichers import HashEnricher
 from loguru import logger
 from slugify import slugify
 
@@ -43,12 +43,12 @@ class Storage(Step):
         # only for typing...
         return Step.init(name, config, Storage)
 
-    def store(self, media: Media, url: str) -> None:
+    def store(self, media: Media, url: str, metadata: Optional[Metadata] = None) -> None:
         if media.is_stored():
             logger.debug(f"{media.key} already stored, skipping")
             return
         self.set_key(media, url)
-        self.upload(media)
+        self.upload(media, metadata=metadata)
         media.add_url(self.get_cdn_url(media))
 
     @abstractmethod
diff --git a/src/auto_archiver/utils/__init__.py b/src/auto_archiver/utils/__init__.py
index 42ea0f5..fe5cb58 100644
--- a/src/auto_archiver/utils/__init__.py
+++ b/src/auto_archiver/utils/__init__.py
@@ -3,4 +3,5 @@ from .gworksheet import GWorksheet
 from .misc import *
 from .webdriver import Webdriver
 from .gsheet import Gsheets
-from .url import UrlUtil
\ No newline at end of file
+from .url import UrlUtil
+from .atlos import get_atlos_config_options
\ No newline at end of file
diff --git a/src/auto_archiver/utils/atlos.py b/src/auto_archiver/utils/atlos.py
new file mode 100644
index 0000000..c47c711
--- /dev/null
+++ b/src/auto_archiver/utils/atlos.py
@@ -0,0 +1,13 @@
+def get_atlos_config_options():
+    return {
+        "api_token": {
+            "default": None,
+            "help": "An Atlos API token. For more information, see https://docs.atlos.org/technical/api/",
+            "cli_set": lambda cli_val, _: cli_val
+        },
+        "atlos_url": {
+            "default": "https://platform.atlos.org",
+            "help": "The URL of your Atlos instance (e.g., https://platform.atlos.org), without a trailing slash.",
+            "cli_set": lambda cli_val, _: cli_val
+        },
+    }
\ No newline at end of file
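`get_atlos_config_options` centralizes the two options every Atlos step needs, and each step folds them into its own `configs()` (as `AtlosStorage` does with `dict(Storage.configs(), **get_atlos_config_options())`). A sketch of that merge with a simplified, hypothetical base class:

```python
# How the shared Atlos options merge into a step's configs(); FakeBase and
# its path_generator option are illustrative stand-ins, not real code.
def get_atlos_config_options() -> dict:
    return {
        "api_token": {"default": None, "help": "An Atlos API token."},
        "atlos_url": {"default": "https://platform.atlos.org", "help": "Atlos instance URL."},
    }


class FakeBase:
    @staticmethod
    def configs() -> dict:
        return {"path_generator": {"default": "url", "help": "how to generate folder names"}}


# dict(a, **b): later keys win on collision, so the shared Atlos options
# would override any same-named base option; here the key sets are disjoint.
merged = dict(FakeBase.configs(), **get_atlos_config_options())
print(sorted(merged))  # ['api_token', 'atlos_url', 'path_generator']
```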
diff --git a/src/auto_archiver/version.py b/src/auto_archiver/version.py
index 1873d98..3bf42a3 100644
--- a/src/auto_archiver/version.py
+++ b/src/auto_archiver/version.py
@@ -3,7 +3,7 @@ _MAJOR = "0"
 _MINOR = "11"
 # On main and in a nightly release the patch should be one ahead of the last
 # released build.
-_PATCH = "0"
+_PATCH = "1"
 # This is mainly for nightly builds which have the suffix ".dev$DATE". See
 # https://semver.org/#is-v123-a-semantic-version for the semantics.
 _SUFFIX = ""