Merge branch 'main' into historical-crawl-fixes

pull/883/head
Andrey 2023-08-03 02:30:43 +03:00
commit c4b18145b0
22 zmienionych plików z 476 dodań i 471 usunięć

Wyświetl plik

@ -82,7 +82,7 @@ def get_entity_subscription_collection_id(
resource_type: str,
token: Union[uuid.UUID, str],
user_id: uuid.UUID,
) -> Optional[str]:
) -> str:
"""
Get collection_id from brood resources. If collection not exist and create_if_not_exist is True
"""

Wyświetl plik

@ -9,41 +9,39 @@ from typing import Any, Dict, List
from uuid import UUID
import boto3 # type: ignore
from bugout.data import BugoutResource
from entity.data import EntityResponse # type: ignore
from bugout.data import BugoutJournalEntity, BugoutResource
from fastapi import BackgroundTasks, FastAPI
from fastapi.middleware.cors import CORSMiddleware
from moonstreamdb.blockchain import (
AvailableBlockchainType,
get_label_model,
get_block_model,
get_label_model,
get_transaction_model,
)
from sqlalchemy import text
from .actions import (
generate_s3_access_links,
query_parameter_hash,
get_entity_subscription_collection_id,
EntityCollectionNotFoundException,
)
from . import data
from .actions import (
EntityCollectionNotFoundException,
generate_s3_access_links,
get_entity_subscription_collection_id,
query_parameter_hash,
)
from .middleware import MoonstreamHTTPException
from .settings import (
BUGOUT_RESOURCE_TYPE_SUBSCRIPTION,
BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
BUGOUT_RESOURCE_TYPE_SUBSCRIPTION,
DOCS_TARGET_PATH,
LINKS_EXPIRATION_TIME,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
MOONSTREAM_S3_QUERIES_BUCKET,
MOONSTREAM_S3_QUERIES_BUCKET_PREFIX,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX,
NB_CONTROLLER_ACCESS_ID,
ORIGINS,
LINKS_EXPIRATION_TIME,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET,
)
from .settings import bugout_client as bc, entity_client as ec
from .settings import bugout_client as bc
from .stats_worker import dashboard, queries
from .version import MOONCRAWL_VERSION
@ -115,12 +113,11 @@ async def status_handler(
)
try:
collection_id = get_entity_subscription_collection_id(
journal_id = get_entity_subscription_collection_id(
resource_type=BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
user_id=UUID(stats_update.user_id),
)
except EntityCollectionNotFoundException as e:
raise MoonstreamHTTPException(
status_code=404,
@ -136,20 +133,19 @@ async def status_handler(
s3_client = boto3.client("s3")
subscription_by_id: Dict[str, EntityResponse] = {}
subscription_by_id: Dict[str, BugoutJournalEntity] = {}
for dashboard_subscription_filters in dashboard_resource.resource_data[
"subscription_settings"
]:
# get subscription by id
subscription: EntityResponse = ec.get_entity(
subscription: BugoutJournalEntity = bc.get_entity(
token=stats_update.token,
collection_id=collection_id,
journal_id=journal_id,
entity_id=dashboard_subscription_filters["subscription_id"],
)
subscription_by_id[str(subscription.entity_id)] = subscription
subscription_by_id[str(subscription.id)] = subscription
try:
background_tasks.add_task(
@ -182,7 +178,7 @@ async def status_handler(
subscriprions_type = reqired_field["subscription_type_id"]
for timescale in stats_update.timescales:
presigned_urls_response[subscription_entity.entity_id] = {}
presigned_urls_response[subscription_entity.id] = {}
try:
result_key = f"{MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX}/{dashboard.blockchain_by_subscription_id[subscriprions_type]}/contracts_data/{subscription_entity.address}/{stats_update.dashboard_id}/v1/{timescale}.json"
@ -201,7 +197,7 @@ async def status_handler(
HttpMethod="GET",
)
presigned_urls_response[subscription_entity.entity_id][timescale] = {
presigned_urls_response[subscription_entity.id][timescale] = {
"url": stats_presigned_url,
"headers": {
"If-Modified-Since": (

Wyświetl plik

@ -2,11 +2,11 @@ import argparse
import json
import logging
import os
from typing import Any, Dict
from typing import cast, List
import uuid
import requests # type: ignore
from bugout.data import BugoutSearchResult
from .utils import get_results_for_moonstream_query
from ..settings import (
@ -49,17 +49,18 @@ def handle_leaderboards(args: argparse.Namespace) -> None:
limit=100,
timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
)
leaderboards_results = cast(List[BugoutSearchResult], leaderboards.results)
except Exception as e:
logger.error(f"Could not get leaderboards from journal: {e}")
return
if len(leaderboards.results) == 0:
if len(leaderboards_results) == 0:
logger.error("No leaderboard found")
return
logger.info(f"Found {len(leaderboards.results)} leaderboards")
logger.info(f"Found {len(leaderboards_results)} leaderboards")
for leaderboard in leaderboards.results:
for leaderboard in leaderboards_results:
logger.info(
f"Processing leaderboard: {leaderboard.title} with id: {[tag for tag in leaderboard.tags if tag.startswith('leaderboard_id')]}"
)

Wyświetl plik

@ -195,7 +195,7 @@ def get_crawl_job_entries(
query += f" created_at:>={created_at_filter}"
current_offset = 0
entries = []
entries: List[BugoutSearchResult] = []
while True:
search_result = bugout_client.search(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
@ -205,10 +205,11 @@ def get_crawl_job_entries(
limit=limit,
timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
)
entries.extend(search_result.results)
search_results = cast(List[BugoutSearchResult], search_result.results)
entries.extend(search_results)
# if len(entries) >= search_result.total_results:
if len(search_result.results) == 0:
if len(search_results) == 0:
break
current_offset += limit
return entries
@ -402,8 +403,9 @@ def _get_heartbeat_entry_id(
limit=1,
timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
)
if entries.results:
return entries.results[0].entry_url.split("/")[-1]
search_results = cast(List[BugoutSearchResult], entries.results)
if search_results:
return search_results[0].entry_url.split("/")[-1]
else:
logger.info(f"No {crawler_type} heartbeat entry found, creating one")
entry = bugout_client.create_entry(

Wyświetl plik

@ -3,10 +3,8 @@ from typing import Dict, Optional
from uuid import UUID
from bugout.app import Bugout
from entity.client import Entity # type: ignore
from moonstreamdb.blockchain import AvailableBlockchainType
# Bugout
BUGOUT_BROOD_URL = os.environ.get("BUGOUT_BROOD_URL", "https://auth.bugout.dev")
BUGOUT_SPIRE_URL = os.environ.get("BUGOUT_SPIRE_URL", "https://spire.bugout.dev")
@ -14,15 +12,6 @@ BUGOUT_SPIRE_URL = os.environ.get("BUGOUT_SPIRE_URL", "https://spire.bugout.dev"
bugout_client = Bugout(brood_api_url=BUGOUT_BROOD_URL, spire_api_url=BUGOUT_SPIRE_URL)
# Entity
MOONSTREAM_ENTITY_URL = os.environ.get("MOONSTREAM_ENTITY_URL", "")
if MOONSTREAM_ENTITY_URL == "":
raise ValueError("MOONSTREAM_ENTITY_URL environment variable must be set")
entity_client = Entity(MOONSTREAM_ENTITY_URL)
MOONSTREAM_API_URL = os.environ.get("MOONSTREAM_API_URL", "https://api.moonstream.to")
MOONSTREAM_ENGINE_URL = os.environ.get(
"MOONSTREAM_ENGINE_URL", "https://engineapi.moonstream.to"

Wyświetl plik

@ -6,21 +6,25 @@ import hashlib
import json
import logging
import time
import traceback
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Union
from typing import Any, Callable, cast, Dict, List, Optional, Union
from uuid import UUID
import boto3 # type: ignore
from bugout.data import BugoutResource, BugoutResources
from entity.data import EntityResponse, EntityCollectionResponse # type: ignore
from bugout.data import (
BugoutJournalEntity,
BugoutResource,
BugoutResources,
BugoutSearchResultAsEntity,
)
from moonstreamdb.blockchain import (
AvailableBlockchainType,
get_label_model,
get_transaction_model,
)
from sqlalchemy import and_, cast, distinct, extract, func, text
from sqlalchemy import and_, distinct, extract, func, text
from sqlalchemy import cast as sqlalchemy_cast
from sqlalchemy.orm import Session
from sqlalchemy.sql.operators import in_op
from web3 import Web3
@ -29,14 +33,14 @@ from ..blockchain import connect
from ..db import yield_db_read_only_session_ctx
from ..reporter import reporter
from ..settings import (
BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
CRAWLER_LABEL,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX,
NB_CONTROLLER_ACCESS_ID,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET,
BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
)
from ..settings import bugout_client as bc, entity_client as ec
from ..settings import bugout_client as bc
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@ -157,11 +161,13 @@ def generate_data(
.filter(in_op(label_model.label_data["name"].astext, functions))
.filter(
label_model.block_timestamp
>= cast(extract("epoch", start), label_model.block_timestamp.type)
>= sqlalchemy_cast(
extract("epoch", start), label_model.block_timestamp.type
)
)
.filter(
label_model.block_timestamp
< cast(
< sqlalchemy_cast(
extract("epoch", (start + timescales_delta[timescale]["timedelta"])),
label_model.block_timestamp.type,
)
@ -652,25 +658,26 @@ def stats_generate_handler(args: argparse.Namespace):
address_dashboard_id_subscription_id_tree: Dict[str, Any] = {}
for user_id, collection_id in user_collection_by_id.items():
for user_id, journal_id in user_collection_by_id.items():
# request all subscriptions for user
user_subscriptions: EntityCollectionResponse = ec.search_entities(
user_subscriptions = bc.search(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
collection_id=collection_id,
required_field=[
"subscription_type_id:{}".format(
subscription_id_by_blockchain[args.blockchain]
)
],
journal_id=journal_id,
query=f"tag:subscription_type_id:{subscription_id_by_blockchain[args.blockchain]}",
representation="entity",
)
user_subscriptions_results = cast(
List[BugoutSearchResultAsEntity], user_subscriptions.results
)
logger.info(
f"Amount of user subscriptions: {len(user_subscriptions.entities)}"
f"Amount of user subscriptions: {len(user_subscriptions_results)}"
)
for subscription in user_subscriptions.entities:
subscription_id = str(subscription.entity_id)
for subscription in user_subscriptions_results:
entity_url_list = subscription.entity_url.split("/")
subscription_id = entity_url_list[len(entity_url_list) - 1]
if subscription_id not in dashboards_by_subscription:
logger.info(
@ -1014,7 +1021,7 @@ def stats_generate_handler(args: argparse.Namespace):
def stats_generate_api_task(
timescales: List[str],
dashboard: BugoutResource,
subscription_by_id: Dict[str, EntityResponse],
subscription_by_id: Dict[str, BugoutJournalEntity],
access_id: Optional[UUID] = None,
):
"""

Wyświetl plik

@ -4,9 +4,6 @@ export BUGOUT_SPIRE_URL="https://spire.bugout.dev"
export HUMBUG_REPORTER_CRAWLERS_TOKEN="<Bugout_Humbug_token_for_crash_reports>"
# Entity environment variables
export MOONSTREAM_ENTITY_URL="https://api.moonstream.to/entity"
# Engine environment variables
export MOONSTREAM_ENGINE_URL="https://engineapi.moonstream.to"

Wyświetl plik

@ -34,12 +34,11 @@ setup(
zip_safe=False,
install_requires=[
"boto3",
"bugout>=0.2.8",
"bugout>=0.2.12",
"chardet",
"fastapi",
"moonstreamdb>=0.3.4",
"moonstream>=0.1.1",
"moonstream-entity>=0.0.5",
"moonworm[moonstream]>=0.6.2",
"humbug",
"pydantic==1.9.2",

Wyświetl plik

@ -1,28 +1,27 @@
from collections import OrderedDict
import hashlib
import json
from itertools import chain
import logging
from typing import List, Optional, Dict, Any, Union
from enum import Enum
import uuid
from collections import OrderedDict
from enum import Enum
from itertools import chain
from typing import Any, Dict, List, Optional, Union
import boto3 # type: ignore
from bugout.data import (
BugoutSearchResults,
BugoutSearchResult,
BugoutJournal,
BugoutJournals,
BugoutResource,
BugoutResources,
BugoutSearchResult,
BugoutSearchResults,
)
from bugout.journal import SearchOrder
from bugout.exceptions import BugoutResponseException
from entity.data import EntityCollectionsResponse, EntityCollectionResponse # type: ignore
from entity.exceptions import EntityUnexpectedResponse # type: ignore
from bugout.journal import SearchOrder
from ens.utils import is_valid_ens_name # type: ignore
from eth_utils.address import is_address # type: ignore
from moonstreamdb.models import EthereumLabel
from moonstreamdb.blockchain import AvailableBlockchainType
from moonstreamdb.models import EthereumLabel
from slugify import slugify # type: ignore
from sqlalchemy import text
from sqlalchemy.orm import Session
@ -32,24 +31,20 @@ from web3._utils.validation import validate_abi
from . import data
from .middleware import MoonstreamHTTPException
from .reporter import reporter
from .selectors_storage import selectors
from .settings import (
BUGOUT_REQUEST_TIMEOUT_SECONDS,
ETHERSCAN_SMARTCONTRACTS_BUCKET,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
MOONSTREAM_APPLICATION_ID,
MOONSTREAM_DATA_JOURNAL_ID,
MOONSTREAM_MOONWORM_TASKS_JOURNAL,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX,
MOONSTREAM_MOONWORM_TASKS_JOURNAL,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
support_interfaces,
supportsInterface_abi,
multicall_contracts,
)
from .settings import bugout_client as bc, entity_client as ec
from .web3_provider import multicall, FunctionSignature, connect
from .selectors_storage import selectors
from .settings import bugout_client as bc
from .settings import multicall_contracts, support_interfaces, supportsInterface_abi
from .web3_provider import FunctionSignature, connect, multicall
logger = logging.getLogger(__name__)
@ -88,9 +83,9 @@ class ResourceQueryFetchException(Exception):
"""
class EntityCollectionNotFoundException(Exception):
class EntityJournalNotFoundException(Exception):
"""
Raised when entity collection is not found
Raised when journal (collection prev.) with entities not found.
"""
@ -482,7 +477,7 @@ def get_all_entries_from_search(
results: List[BugoutSearchResult] = []
existing_metods = bc.search(
existing_methods = bc.search(
token=token,
journal_id=journal_id,
query=search_query,
@ -491,11 +486,11 @@ def get_all_entries_from_search(
limit=limit,
offset=offset,
)
results.extend(existing_metods.results)
results.extend(existing_methods.results) # type: ignore
if len(results) != existing_metods.total_results:
for offset in range(limit, existing_metods.total_results, limit):
existing_metods = bc.search(
if len(results) != existing_methods.total_results:
for offset in range(limit, existing_methods.total_results, limit):
existing_methods = bc.search(
token=token,
journal_id=journal_id,
query=search_query,
@ -504,7 +499,7 @@ def get_all_entries_from_search(
limit=limit,
offset=offset,
)
results.extend(existing_metods.results)
results.extend(existing_methods.results) # type: ignore
return results
@ -641,14 +636,14 @@ def get_query_by_name(query_name: str, token: uuid.UUID) -> str:
return query_id
def get_entity_subscription_collection_id(
def get_entity_subscription_journal_id(
resource_type: str,
token: Union[uuid.UUID, str],
user_id: uuid.UUID,
create_if_not_exist: bool = False,
) -> Optional[str]:
) -> str:
"""
Get collection_id from brood resources. If collection not exist and create_if_not_exist is True
Get collection_id (journal_id) from brood resources. If journal not exist and create_if_not_exist is True
"""
params = {
@ -668,52 +663,49 @@ def get_entity_subscription_collection_id(
if len(resources.resources) == 0:
if not create_if_not_exist:
raise EntityCollectionNotFoundException(
"Subscription collection not found."
)
collection_id = generate_collection_for_user(resource_type, token, user_id)
raise EntityJournalNotFoundException("Subscription journal not found.")
journal_id = generate_journal_for_user(resource_type, token, user_id)
return collection_id
return journal_id
else:
resource = resources.resources[0]
return resource.resource_data["collection_id"]
def generate_collection_for_user(
def generate_journal_for_user(
resource_type: str,
token: Union[uuid.UUID, str],
user_id: uuid.UUID,
) -> str:
try:
# try get collection
# Try get journal
collections: EntityCollectionsResponse = ec.list_collections(token=token)
journals: BugoutJournals = bc.list_journals(token=token)
available_collections: Dict[str, str] = {
collection.name: collection.collection_id
for collection in collections.collections
available_journals: Dict[str, str] = {
journal.name: str(journal.id) for journal in journals.journals
}
subscription_collection_name = f"subscriptions_{user_id}"
subscription_journal_name = f"subscriptions_{user_id}"
if subscription_collection_name not in available_collections:
collection: EntityCollectionResponse = ec.add_collection(
token=token, name=subscription_collection_name
if subscription_journal_name not in available_journals:
journal: BugoutJournal = bc.create_journal(
token=token, name=subscription_journal_name
)
collection_id = collection.collection_id
journal_id = str(journal.id)
else:
collection_id = available_collections[subscription_collection_name]
except EntityUnexpectedResponse as e:
logger.error(f"Error create collection, error: {str(e)}")
journal_id = available_journals[subscription_journal_name]
except Exception as e:
logger.error(f"Error create journal, error: {str(e)}")
raise MoonstreamHTTPException(
status_code=500, detail="Can't create collection for subscriptions"
status_code=500, detail="Can't create journal for subscriptions"
)
resource_data = {
"type": resource_type,
"user_id": str(user_id),
"collection_id": str(collection_id),
"collection_id": journal_id,
}
try:
@ -727,14 +719,14 @@ def generate_collection_for_user(
except Exception as e:
logger.error(f"Error creating subscription resource: {str(e)}")
logger.error(
f"Required create resource data: {resource_data}, and grand access to journal: {collection_id}, for user: {user_id}"
f"Required create resource data: {resource_data}, and grand access to journal: {journal_id}, for user: {user_id}"
)
raise MoonstreamHTTPException(status_code=500, internal_error=e)
try:
bc.update_journal_scopes(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=collection_id,
journal_id=journal_id,
holder_type="user",
holder_id=user_id,
permission_list=[
@ -746,16 +738,16 @@ def generate_collection_for_user(
],
)
logger.info(
f"Grand access to journal: {collection_id}, for user: {user_id} successfully"
f"Grand access to journal: {journal_id}, for user: {user_id} successfully"
)
except Exception as e:
logger.error(f"Error updating journal scopes: {str(e)}")
logger.error(
f"Required grand access to journal: {collection_id}, for user: {user_id}"
f"Required grand access to journal: {journal_id}, for user: {user_id}"
)
raise MoonstreamHTTPException(status_code=500, internal_error=e)
return collection_id
return journal_id
def generate_s3_access_links(
@ -840,14 +832,14 @@ def get_list_of_support_interfaces(
list_of_interfaces.sort()
for interaface in list_of_interfaces:
for interface in list_of_interfaces:
calls.append(
(
contract.address,
FunctionSignature(
contract.get_function_by_name("supportsInterface")
)
.encode_data([bytes.fromhex(interaface)])
.encode_data([bytes.fromhex(interface)])
.hex(),
)
)

Wyświetl plik

@ -2,65 +2,65 @@
Generate entity subscriptions from existing brood resources subscriptions
"""
import hashlib
import logging
import json
import logging
import os
import traceback
from typing import List, Optional, Dict, Any, Union, Tuple
import uuid
import time
from typing import Any, Dict, List, Optional, Tuple, Union
import boto3 # type: ignore
from bugout.data import BugoutResources, BugoutResource
from bugout.exceptions import BugoutResponseException
from entity.exceptions import EntityUnexpectedResponse # type: ignore
from entity.data import EntityCollectionResponse, EntityResponse # type: ignore
from bugout.data import (
BugoutJournal,
BugoutJournalEntity,
BugoutResource,
BugoutResources,
)
from bugout.exceptions import BugoutResponseException, BugoutUnexpectedResponse
from ...settings import (
BUGOUT_REQUEST_TIMEOUT_SECONDS,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
BUGOUT_RESOURCE_TYPE_SUBSCRIPTION,
BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
MOONSTREAM_APPLICATION_ID,
BUGOUT_RESOURCE_TYPE_DASHBOARD,
BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
BUGOUT_RESOURCE_TYPE_SUBSCRIPTION,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
MOONSTREAM_APPLICATION_ID,
)
from ...settings import bugout_client as bc, entity_client as ec
from ...settings import bugout_client as bc
from ..subscription_types import CANONICAL_SUBSCRIPTION_TYPES
logger = logging.getLogger(__name__)
### create collection for user
### Create journal for user
def create_collection_for_user(user_id: uuid.UUID) -> str:
def create_journal_for_user(user_id: uuid.UUID) -> str:
"""
Create collection for user if not exist
Create journal (collection) for user if not exist
"""
try:
# try get collection
collection: EntityCollectionResponse = ec.add_collection(
# Try to get journal
journal: BugoutJournal = bc.create_journal(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN, name=f"subscriptions_{user_id}"
)
collection_id = collection.collection_id
except EntityUnexpectedResponse as e:
logger.error(f"Error create collection, error: {str(e)}")
return str(collection_id)
journal_id = journal.id
except BugoutUnexpectedResponse as e:
logger.error(f"Error create journal, error: {str(e)}")
return str(journal_id)
def add_entity_subscription(
user_id: uuid.UUID,
subscription_type_id: str,
collection_id: str,
journal_id: str,
address: str,
color: str,
label: str,
content: Dict[str, Any],
) -> EntityResponse:
) -> BugoutJournalEntity:
"""
Add subscription to collection
Add subscription to journal (collection).
"""
if subscription_type_id not in CANONICAL_SUBSCRIPTION_TYPES:
@ -68,17 +68,18 @@ def add_entity_subscription(
f"Unknown subscription type ID: {subscription_type_id}. "
f"Known subscription type IDs: {CANONICAL_SUBSCRIPTION_TYPES.keys()}"
)
elif CANONICAL_SUBSCRIPTION_TYPES[subscription_type_id].blockchain is None:
blockchain = CANONICAL_SUBSCRIPTION_TYPES[subscription_type_id].blockchain
if blockchain is None:
raise ValueError(
f"Subscription type ID {subscription_type_id} is not a blockchain subscription type."
)
entity = ec.add_entity(
entity = bc.create_entity(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
collection_id=collection_id,
journal_id=journal_id,
address=address,
blockchain=CANONICAL_SUBSCRIPTION_TYPES[subscription_type_id].blockchain,
name=label,
blockchain=blockchain,
title=label,
required_fields=[
{"type": "subscription"},
{"subscription_type_id": f"{subscription_type_id}"},
@ -105,34 +106,19 @@ def get_abi_from_s3(s3_path: str, bucket: str):
logger.error(f"Error get ABI from S3: {str(e)}")
def revoke_collection_permissions_from_user(
user_id: uuid.UUID, collection_id: str, permissions: List[str]
):
"""
Remove all permissions from user
"""
bc.delete_journal_scopes(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=collection_id,
holder_type="user",
holder_id=user_id,
permission_list=permissions,
)
def find_user_collection(
def find_user_journal(
user_id: uuid.UUID,
create_if_not_exists: bool = False,
) -> Tuple[Optional[str], Optional[str]]:
"""
Find user collection in Brood resources
Can create new collection if not exists and create_if_not_exists = True
Find user journal (collection) in Brood resources
Can create new journal (collection) if not exists and create_if_not_exists = True
"""
params = {
"type": BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
"user_id": str(user_id),
}
logger.info(f"Looking for collection for user {user_id}")
logger.info(f"Looking for journal (collection) for user {user_id}")
try:
user_entity_resources: BugoutResources = bc.list_resources(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN, params=params
@ -147,18 +133,16 @@ def find_user_collection(
)
if len(user_entity_resources.resources) > 0:
collection_id = user_entity_resources.resources[0].resource_data[
"collection_id"
]
journal_id = user_entity_resources.resources[0].resource_data["collection_id"]
logger.info(
f"Collection found for user {user_id}. collection_id: {collection_id}"
f"Journal (collection) found for user {user_id}. journal_id: {journal_id}"
)
return collection_id, str(user_entity_resources.resources[0].id)
return journal_id, str(user_entity_resources.resources[0].id)
elif create_if_not_exists:
# Create collection new collection for user
logger.info(f"Creating new collection")
collection = create_collection_for_user(user_id)
return collection, None
# Create new journal for user
logger.info(f"Creating new journal (collection)")
journal_id = create_journal_for_user(user_id)
return journal_id, None
return None, None
@ -223,33 +207,35 @@ def generate_entity_subscriptions_from_brood_resources() -> None:
logger.info(f"parsed users: {len(users_subscriptions)}")
### Create collections and add subscriptions
### Create journals (collections) and add subscriptions
try:
for user_id, subscriptions in users_subscriptions.items():
user_id = str(user_id)
collection_id = None
journal_id = None
resource_id_of_user_collection = None
### Collection can already exist in stages.json
### Journal can already exist in stages.json
if "collection_id" in stages[user_id]:
collection_id = stages[user_id]["collection_id"]
journal_id = stages[user_id]["collection_id"]
if "subscription_resource_id" in stages[user_id]:
resource_id_of_user_collection = stages[user_id][
"subscription_resource_id"
]
else:
### look for collection in brood resources
collection_id, resource_id_of_user_collection = find_user_collection(
journal_id, resource_id_of_user_collection = find_user_journal(
user_id, create_if_not_exists=True
)
if collection_id is None:
logger.info(f"Collection not found or create for user {user_id}")
if journal_id is None:
logger.info(
f"Journal (collection) not found or create for user {user_id}"
)
continue
stages[user_id]["collection_id"] = collection_id
stages[user_id]["collection_id"] = journal_id
# Create user subscription collection resource
@ -262,7 +248,7 @@ def generate_entity_subscriptions_from_brood_resources() -> None:
resource_data = {
"type": BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
"user_id": str(user_id),
"collection_id": str(collection_id),
"collection_id": str(journal_id),
"version": "1.0.0",
}
@ -318,11 +304,11 @@ def generate_entity_subscriptions_from_brood_resources() -> None:
# Add subscription to collection
logger.info(f"Add subscription to collection: {collection_id}")
logger.info(f"Add subscription to journal (collection): {journal_id}")
entity = add_entity_subscription(
user_id=user_id,
collection_id=collection_id,
journal_id=journal_id,
subscription_type_id=subscription_type_id,
address=address,
color=color,
@ -331,7 +317,7 @@ def generate_entity_subscriptions_from_brood_resources() -> None:
)
stages[user_id]["processed_subscriptions"][
str(subscription["subscription_id"])
] = {"entity_id": str(entity.entity_id), "dashboard_ids": []}
] = {"entity_id": str(entity.id), "dashboard_ids": []}
# Add permissions to user
@ -342,7 +328,7 @@ def generate_entity_subscriptions_from_brood_resources() -> None:
try:
bc.update_journal_scopes(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=collection_id,
journal_id=journal_id,
holder_type="user",
holder_id=user_id,
permission_list=[
@ -361,12 +347,12 @@ def generate_entity_subscriptions_from_brood_resources() -> None:
continue
else:
logger.warn(
f"User {user_id} == {admin_user_id} permissions not changed. Unexpected behaivior!"
f"User {user_id} == {admin_user_id} permissions not changed. Unexpected behavior!"
)
except Exception as e:
traceback.print_exc()
logger.error(f"Failed to proccess user subscriptions: {str(e)}")
logger.error(f"Failed to process user subscriptions: {str(e)}")
finally:
try:
with open("stages.json", "w") as f:
@ -561,18 +547,18 @@ def delete_generated_entity_subscriptions_from_brood_resources():
logger.info(f"parsed users: {len(users_subscriptions)}")
### Create collections and add subscriptions
### Create journals and add subscriptions
try:
for user_id, _ in users_subscriptions.items():
user_id = str(user_id)
collection_id = None
journal_id = None
resource_id_of_user_collection = None
### Collection can already exist in stages.json
if "collection_id" in stages[user_id]:
collection_id = stages[user_id]["collection_id"]
journal_id = stages[user_id]["collection_id"]
if "subscription_resource_id" in stages[user_id]:
resource_id_of_user_collection = stages[user_id][
@ -581,35 +567,37 @@ def delete_generated_entity_subscriptions_from_brood_resources():
else:
### look for collection in brood resources
collection_id, resource_id_of_user_collection = find_user_collection(
journal_id, resource_id_of_user_collection = find_user_journal(
user_id, create_if_not_exists=False
)
if collection_id is None:
if journal_id is None:
logger.info(f"Collection not found or create for user {user_id}")
continue
### Delete collection
try:
ec.delete_collection(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN, collection_id=collection_id
bc.delete_journal(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN, journal_id=journal_id
)
logger.info(f"Collection deleted {collection_id}")
logger.info(f"Journal (collection) deleted {journal_id}")
except Exception as e:
logger.error(f"Failed to delete collection: {str(e)}")
logger.error(f"Failed to delete journal (collection): {str(e)}")
### Delete collection resource
try:
logger.info(f"Collection resource id {resource_id_of_user_collection}")
logger.info(
f"Journal (collection) resource id {resource_id_of_user_collection}"
)
bc.delete_resource(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
resource_id=resource_id_of_user_collection,
)
logger.info(
f"Collection resource deleted {resource_id_of_user_collection}"
f"Journal (collection) resource deleted {resource_id_of_user_collection}"
)
# clear stages
@ -617,12 +605,14 @@ def delete_generated_entity_subscriptions_from_brood_resources():
stages[user_id] = {}
except Exception as e:
logger.error(f"Failed to delete collection resource: {str(e)}")
logger.error(
f"Failed to delete journal (collection) resource: {str(e)}"
)
continue
except Exception as e:
traceback.print_exc()
logger.error(f"Failed to proccess user subscriptions: {str(e)}")
logger.error(f"Failed to process user subscriptions: {str(e)}")
def restore_dashboard_state():
@ -659,7 +649,7 @@ def restore_dashboard_state():
dashboards_by_user[user_id].append(dashboard)
### Retunr all dashboards to old state
### Return all dashboards to old state
logger.info(f"Amount of users: {len(dashboards_by_user)}")
@ -738,41 +728,41 @@ def fix_duplicates_keys_in_entity_subscription():
timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
)
# get collection ids from that resources
# get journal ids from that resources
collection_id_user_id_mappig = {}
collection_id_user_id_mapping = {}
for subscription in subscriptions.resources:
if "collection_id" in subscription.resource_data:
if (
subscription.resource_data["collection_id"]
not in collection_id_user_id_mappig
not in collection_id_user_id_mapping
):
collection_id_user_id_mappig[
collection_id_user_id_mapping[
subscription.resource_data["collection_id"]
] = subscription.resource_data["user_id"]
else:
raise Exception(
f"Duplicate collection_id {subscription.resource_data['collection_id']} in subscriptions"
)
# go through all collections and fix entities.
# go through all journals and fix entities.
# Will creating one new entity with same data but without "type:subscription" in required_fields
for collection_id, user_id in collection_id_user_id_mappig.items():
# get collection entities
collection_entities = ec.search_entities(
for journal_id, user_id in collection_id_user_id_mapping.items():
# get journal entities
journal_entities = bc.search(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
collection_id=collection_id,
journal_id=journal_id,
required_field=[f"type:subscription"],
limit=1000,
representation="entity",
)
logger.info(
f"Amount of entities in user: {user_id} collection {collection_id}: {len(collection_entities.entities)}"
f"Amount of entities in user: {user_id} journal (collection) {journal_id}: {len(journal_entities.entities)}"
)
for entity in collection_entities.entities:
for entity in journal_entities.entities:
# get entity data
if entity.secondary_fields is None:
@ -785,12 +775,7 @@ def fix_duplicates_keys_in_entity_subscription():
secondary_fields = secondary_fields["secondary_fields"]
# get entity id
entity_id = entity.entity_id
# get entity type
entity_type = None
# extract required fields
@ -811,45 +796,45 @@ def fix_duplicates_keys_in_entity_subscription():
new_required_fields.append(
{"type": "copy_of_malformed_entity_20230213"}
)
new_required_fields.append({"entity_id": str(entity_id)})
new_required_fields.append({"entity_id": str(entity.id)})
new_entity = ec.add_entity(
new_entity = bc.create_entity(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
collection_id=collection_id,
journal_id=journal_id,
blockchain=entity.blockchain,
address=entity.address,
name=entity.name,
title=entity.title,
required_fields=new_required_fields,
secondary_fields=entity.secondary_fields,
)
logger.info(
f"Entity {new_entity.entity_id} created successfully for collection {collection_id}"
f"Entity {new_entity.id} created successfully for journal (collection) {journal_id}"
)
except Exception as e:
logger.error(
f"Failed to create entity {entity_id} for collection {collection_id}: {str(e)}, user_id: {user_id}"
f"Failed to create entity {entity.id} for journal (collection) {journal_id}: {str(e)}, user_id: {user_id}"
)
continue
# Update old entity without secondary_fields duplicate
try:
ec.update_entity(
bc.update_entity(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
collection_id=collection_id,
entity_id=entity_id,
journal_id=journal_id,
entity_id=entity.id,
blockchain=entity.blockchain,
address=entity.address,
name=entity.name,
title=entity.title,
required_fields=entity.required_fields,
secondary_fields=secondary_fields,
)
logger.info(
f"Entity {entity_id} updated successfully for collection {collection_id}"
f"Entity {entity.id} updated successfully for journal (collection) {journal_id}"
)
except Exception as e:
logger.error(
f"Failed to update entity {entity_id} for collection {collection_id}: {str(e)}, user_id: {user_id}"
f"Failed to update entity {entity.id} for journal (collection) {journal_id}: {str(e)}, user_id: {user_id}"
)

Wyświetl plik

@ -1,25 +1,23 @@
import argparse
from collections import Counter
import json
import logging
import textwrap
from typing import Any, Dict
from bugout.data import BugoutResources
from bugout.exceptions import BugoutResponseException
from moonstream.client import Moonstream # type: ignore
import logging
from typing import Dict, Any
import textwrap
from sqlalchemy import text
from ..actions import get_all_entries_from_search, name_normalization
from ..data import BUGOUT_RESOURCE_QUERY_RESOLVER
from ..settings import (
BUGOUT_REQUEST_TIMEOUT_SECONDS,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
MOONSTREAM_QUERIES_JOURNAL_ID,
MOONSTREAM_QUERY_TEMPLATE_CONTEXT_TYPE,
)
from ..settings import bugout_client as bc, MOONSTREAM_QUERY_TEMPLATE_CONTEXT_TYPE
from ..actions import get_all_entries_from_search, name_normalization
from ..settings import bugout_client as bc
logger = logging.getLogger(__name__)

Wyświetl plik

@ -1,10 +1,10 @@
"""
Pydantic schemas for the Moonstream HTTP API
"""
from datetime import datetime
import json
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional, Union, Literal
from typing import Any, Dict, List, Literal, Optional, Union
from uuid import UUID
from xmlrpc.client import Boolean
@ -12,7 +12,6 @@ from fastapi import Form
from pydantic import BaseModel, Field, validator
from sqlalchemy import false
USER_ONBOARDING_STATE = "onboarding_state"
BUGOUT_RESOURCE_QUERY_RESOLVER = "query_name_resolver"
@ -58,13 +57,6 @@ class SubscriptionResourceData(BaseModel):
updated_at: Optional[datetime]
class CreateSubscriptionRequest(BaseModel):
address: str
color: str
label: str
subscription_type_id: str
class PingResponse(BaseModel):
"""
Schema for ping response
@ -244,7 +236,7 @@ class OnboardingState(BaseModel):
steps: Dict[str, int]
class SubdcriptionsAbiResponse(BaseModel):
class SubscriptionsAbiResponse(BaseModel):
abi: str

Wyświetl plik

@ -4,10 +4,10 @@ Event providers powered by Bugout journals.
import json
import logging
from datetime import datetime
from typing import Dict, List, Optional, Tuple
from typing import Dict, List, Optional, Tuple, Union
from bugout.app import Bugout
from bugout.data import BugoutResource, BugoutSearchResult
from bugout.data import BugoutResource, BugoutSearchResult, BugoutSearchResultAsEntity
from bugout.journal import SearchOrder
from dateutil.parser import isoparse
from dateutil.tz import UTC
@ -155,7 +155,7 @@ class BugoutEventProvider:
timeout=self.timeout,
order=SearchOrder.DESCENDING,
)
events.extend([self.entry_event(entry) for entry in search_results.results])
events.extend([self.entry_event(entry) for entry in search_results.results]) # type: ignore
offset = search_results.next_offset
return stream_boundary, events
@ -192,7 +192,7 @@ class BugoutEventProvider:
timeout=self.timeout,
order=SearchOrder.DESCENDING,
)
return [self.entry_event(entry) for entry in search_results.results]
return [self.entry_event(entry) for entry in search_results.results] # type: ignore
def next_event(
self,
@ -233,7 +233,7 @@ class BugoutEventProvider:
)
if not search_results.results:
return None
return self.entry_event(search_results.results[0])
return self.entry_event(search_results.results[0]) # type: ignore
def previous_event(
self,
@ -274,7 +274,7 @@ class BugoutEventProvider:
)
if not search_results.results:
return None
return self.entry_event(search_results.results[0])
return self.entry_event(search_results.results[0]) # type: ignore
class EthereumTXPoolProvider(BugoutEventProvider):

Wyświetl plik

@ -1,14 +1,12 @@
import json
import logging
from os import read
from typing import Any, Dict, List, Optional, Union
from typing import Any, Dict, List, Optional, Union, cast
from uuid import UUID
import boto3 # type: ignore
import requests # type: ignore
from bugout.data import BugoutResource, BugoutResources
from bugout.data import BugoutResource, BugoutResources, BugoutSearchResultAsEntity
from bugout.exceptions import BugoutResponseException
from entity.data import EntitiesResponse, EntityResponse # type: ignore
from fastapi import APIRouter, Body, Path, Query, Request
from .. import actions, data
@ -16,15 +14,15 @@ from ..middleware import MoonstreamHTTPException
from ..reporter import reporter
from ..settings import (
BUGOUT_REQUEST_TIMEOUT_SECONDS,
BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
MOONSTREAM_APPLICATION_ID,
MOONSTREAM_CRAWLERS_SERVER_URL,
MOONSTREAM_CRAWLERS_SERVER_PORT,
MOONSTREAM_CRAWLERS_SERVER_URL,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX,
BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
)
from ..settings import bugout_client as bc, entity_client as ec
from ..settings import bugout_client as bc
logger = logging.getLogger(__name__)
@ -52,27 +50,29 @@ async def add_dashboard_handler(
subscription_settings = dashboard.subscription_settings
# Get user collection id
# Get user journal (collection) id
collection_id = actions.get_entity_subscription_collection_id(
journal_id = actions.get_entity_subscription_journal_id(
resource_type=BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
user_id=user.id,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
)
subscriprions_list = ec.search_entities(
subscriptions_list = bc.search(
token=token,
collection_id=collection_id,
required_field=[f"type:subscription"],
journal_id=journal_id,
query="tag:type:subscription",
limit=1000,
representation="entity",
)
# process existing subscriptions with supplied ids
available_subscriptions_ids: Dict[Union[UUID, str], EntityResponse] = {
subscription.entity_id: subscription
for subscription in subscriprions_list.entities
}
available_subscriptions_ids: Dict[Union[UUID, str], BugoutSearchResultAsEntity]
for result in subscriptions_list.results:
entity = cast(BugoutSearchResultAsEntity, result)
entity_url_list = entity.entity_url.split("/")
subscription_id = entity_url_list[len(entity_url_list) - 1]
available_subscriptions_ids[subscription_id] = entity
for dashboard_subscription in subscription_settings:
if dashboard_subscription.subscription_id in available_subscriptions_ids.keys():
@ -137,7 +137,7 @@ async def add_dashboard_handler(
tags=["subscriptions"],
response_model=BugoutResource,
)
async def delete_subscription_handler(request: Request, dashboard_id: str):
async def delete_subscription_handler(request: Request, dashboard_id: str = Path(...)):
"""
Delete subscriptions.
"""
@ -181,9 +181,9 @@ async def get_dashboards_handler(
return resources
@router.get("/{dashboarsd_id}", tags=["dashboards"], response_model=BugoutResource)
@router.get("/{dashboard_id}", tags=["dashboards"], response_model=BugoutResource)
async def get_dashboard_handler(
request: Request, dashboarsd_id: UUID
request: Request, dashboard_id: UUID = Path(...)
) -> BugoutResource:
"""
Get user's subscriptions.
@ -193,7 +193,7 @@ async def get_dashboard_handler(
try:
resource: BugoutResource = bc.get_resource(
token=token,
resource_id=dashboarsd_id,
resource_id=dashboard_id,
timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
)
except BugoutResponseException as e:
@ -211,7 +211,7 @@ async def get_dashboard_handler(
@router.put("/{dashboard_id}", tags=["dashboards"], response_model=BugoutResource)
async def update_dashboard_handler(
request: Request,
dashboard_id: str,
dashboard_id: str = Path(...),
dashboard: data.DashboardUpdate = Body(...),
) -> BugoutResource:
"""
@ -224,25 +224,28 @@ async def update_dashboard_handler(
subscription_settings = dashboard.subscription_settings
# Get user collection id
# Get user journal (collection) id
collection_id = actions.get_entity_subscription_collection_id(
journal_id = actions.get_entity_subscription_journal_id(
resource_type=BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
user_id=user.id,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
)
subscriprions_list = ec.search_entities(
subscriptions_list = bc.search(
token=token,
collection_id=collection_id,
required_field=[f"type:subscription"],
journal_id=journal_id,
query="tag:type:subscription",
limit=1000,
representation="entity",
)
available_subscriptions_ids: Dict[Union[UUID, str], EntityResponse] = {
subscription.entity_id: subscription
for subscription in subscriprions_list.entities
}
available_subscriptions_ids: Dict[Union[UUID, str], BugoutSearchResultAsEntity]
for result in subscriptions_list.results:
entity = cast(BugoutSearchResultAsEntity, result)
entity_url_list = entity.entity_url.split("/")
subscription_id = entity_url_list[len(entity_url_list) - 1]
available_subscriptions_ids[subscription_id] = entity
for dashboard_subscription in subscription_settings:
if dashboard_subscription.subscription_id in available_subscriptions_ids:
@ -259,12 +262,10 @@ async def update_dashboard_handler(
status_code=404,
detail=f"Error on dashboard resource {dashboard_subscription.subscription_id} does not have an abi",
)
abi = json.loads(
available_subscriptions_ids[
dashboard_subscription.subscription_id
].secondary_fields.get("abi")
)
abi_raw = available_subscriptions_ids[
dashboard_subscription.subscription_id
].secondary_fields.get("abi")
abi = json.loads(abi_raw if abi_raw is not None else "")
actions.dashboards_abi_validation(dashboard_subscription, abi)
@ -301,7 +302,7 @@ async def update_dashboard_handler(
@router.get("/{dashboard_id}/stats", tags=["dashboards"])
async def get_dashboard_data_links_handler(
request: Request, dashboard_id: str
request: Request, dashboard_id: str = Path(...)
) -> Dict[Union[UUID, str], Any]:
"""
Get s3 presign urls for dashboard grafics
@ -328,20 +329,21 @@ async def get_dashboard_data_links_handler(
# get subscriptions
collection_id = actions.get_entity_subscription_collection_id(
journal_id = actions.get_entity_subscription_journal_id(
resource_type=BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
user_id=user.id,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
)
subscriprions_list = ec.search_entities(
subscriptions_list = bc.search(
token=token,
collection_id=collection_id,
required_field=[f"type:subscription"],
journal_id=journal_id,
query="tag:type:subscription",
limit=1000,
representation="entity",
)
# filter out dasboards
# filter out dashboards
subscriptions_ids = [
subscription_meta["subscription_id"]
@ -350,11 +352,13 @@ async def get_dashboard_data_links_handler(
]
]
dashboard_subscriptions: Dict[Union[UUID, str], EntitiesResponse] = {
subscription.entity_id: subscription
for subscription in subscriprions_list.entities
if str(subscription.entity_id) in subscriptions_ids
}
dashboard_subscriptions: Dict[Union[UUID, str], BugoutSearchResultAsEntity]
for result in subscriptions_list.results:
entity = cast(BugoutSearchResultAsEntity, result)
entity_url_list = entity.entity_url.split("/")
subscription_id = entity_url_list[len(entity_url_list) - 1]
if str(subscription_id) in subscriptions_ids:
dashboard_subscriptions[subscription_id] = entity
# generate s3 links

Wyświetl plik

@ -1,40 +1,43 @@
"""
The Moonstream queries HTTP API
"""
from datetime import datetime
import logging
from typing import Any, Dict, List, Optional, Tuple, Union
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple, Union, cast
from uuid import UUID
from bugout.data import BugoutResources, BugoutJournalEntryContent, BugoutJournalEntry
from bugout.exceptions import BugoutResponseException
from fastapi import APIRouter, Body, Request
import requests # type: ignore
from bugout.data import (
BugoutJournalEntry,
BugoutJournalEntryContent,
BugoutResources,
BugoutSearchResult,
)
from bugout.exceptions import BugoutResponseException
from fastapi import APIRouter, Body, Path, Request
from moonstreamdb.blockchain import AvailableBlockchainType
from sqlalchemy import text
from .. import data
from ..actions import (
NameNormalizationException,
generate_s3_access_links,
get_query_by_name,
name_normalization,
NameNormalizationException,
query_parameter_hash,
generate_s3_access_links,
)
from ..middleware import MoonstreamHTTPException
from ..settings import (
MOONSTREAM_ADMIN_ACCESS_TOKEN,
MOONSTREAM_APPLICATION_ID,
MOONSTREAM_CRAWLERS_SERVER_URL,
MOONSTREAM_CRAWLERS_SERVER_PORT,
MOONSTREAM_CRAWLERS_SERVER_URL,
MOONSTREAM_QUERIES_JOURNAL_ID,
MOONSTREAM_QUERY_TEMPLATE_CONTEXT_TYPE,
MOONSTREAM_S3_QUERIES_BUCKET,
MOONSTREAM_S3_QUERIES_BUCKET_PREFIX,
MOONSTREAM_QUERIES_JOURNAL_ID,
)
from ..settings import bugout_client as bc, MOONSTREAM_QUERY_TEMPLATE_CONTEXT_TYPE
from ..settings import bugout_client as bc
logger = logging.getLogger(__name__)
@ -199,7 +202,9 @@ def get_suggested_queries(
interfaces: Dict[str, Any] = {}
for entry in queries.results:
queries_results = cast(List[BugoutSearchResult], queries.results)
for entry in queries_results:
for tag in entry.tags:
if tag.startswith("interface:"):
interface = tag.split(":")[1]
@ -210,7 +215,7 @@ def get_suggested_queries(
interfaces[interface].append(entry)
return data.SuggestedQueriesResponse(
queries=queries.results,
queries=queries_results,
interfaces=interfaces,
)
@ -232,7 +237,6 @@ async def get_query_handler(
)
# check in templates
try:
entries = bc.search(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
@ -277,9 +281,11 @@ async def get_query_handler(
status_code=403, detail="Query not approved yet."
)
else:
query_id = entries.results[0].entry_url.split("/")[-1]
entries_results = cast(List[BugoutSearchResult], entries.results)
query_id = entries_results[0].entry_url.split("/")[-1]
entry = entries.results[0]
entries_results = cast(List[BugoutSearchResult], entries.results)
entry = entries_results[0]
try:
if entry.content is None:
@ -390,7 +396,6 @@ async def update_query_data_handler(
)
# check in templates
try:
entries = bc.search(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
@ -435,14 +440,16 @@ async def update_query_data_handler(
status_code=403, detail="Query not approved yet."
)
else:
query_id = entries.results[0].entry_url.split("/")[-1]
entries_results = cast(List[BugoutSearchResult], entries.results)
query_id = entries_results[0].entry_url.split("/")[-1]
s3_response = None
if entries.results[0].content:
content = entries.results[0].content
entries_results = cast(List[BugoutSearchResult], entries.results)
if entries_results[0].content:
content = entries_results[0].content
tags = entries.results[0].tags
tags = entries_results[0].tags
file_type = "json"
@ -497,7 +504,6 @@ async def get_access_link_handler(
)
# check in templattes
try:
entries = bc.search(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
@ -540,13 +546,14 @@ async def get_access_link_handler(
status_code=403, detail="Query not approved yet."
)
entries_results = cast(List[BugoutSearchResult], entries.results)
try:
s3_response = None
if entries.results[0].content:
if entries_results[0].content:
passed_params = dict(request_update.params)
tags = entries.results[0].tags
tags = entries_results[0].tags
file_type = "json"

Wyświetl plik

@ -1,42 +1,38 @@
"""
The Moonstream subscriptions HTTP API
"""
from concurrent.futures import as_completed, ProcessPoolExecutor, ThreadPoolExecutor
import hashlib
import json
import logging
from typing import Any, Dict, List, Optional
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
from typing import Any, Dict, List, Optional, Union, cast
from bugout.data import BugoutSearchResult, BugoutSearchResultAsEntity
from bugout.exceptions import BugoutResponseException
from bugout.data import BugoutSearchResult
from fastapi import APIRouter, Depends, Request, Form, BackgroundTasks
from fastapi import APIRouter, BackgroundTasks, Depends, Form, Path, Query, Request
from moonstreamdb.blockchain import AvailableBlockchainType
from web3 import Web3
from .. import data
from ..actions import (
AddressNotSmartContractException,
validate_abi_json,
EntityJournalNotFoundException,
apply_moonworm_tasks,
get_entity_subscription_collection_id,
EntityCollectionNotFoundException,
check_if_smart_contract,
get_entity_subscription_journal_id,
get_list_of_support_interfaces,
validate_abi_json,
)
from ..admin import subscription_types
from .. import data
from ..admin import subscription_types
from ..middleware import MoonstreamHTTPException
from ..reporter import reporter
from ..settings import bugout_client as bc, entity_client as ec
from ..settings import (
MOONSTREAM_ADMIN_ACCESS_TOKEN,
MOONSTREAM_ENTITIES_RESERVED_TAGS,
THREAD_TIMEOUT_SECONDS,
)
from ..web3_provider import (
yield_web3_provider,
)
from ..settings import bugout_client as bc
from ..web3_provider import yield_web3_provider
logger = logging.getLogger(__name__)
@ -140,7 +136,7 @@ async def add_subscription_handler(
if description:
content["description"] = description
allowed_required_fields = []
allowed_required_fields: List[Any] = []
if tags:
allowed_required_fields = [
item
@ -148,7 +144,7 @@ async def add_subscription_handler(
if not any(key in item for key in MOONSTREAM_ENTITIES_RESERVED_TAGS)
]
required_fields = [
required_fields: List[Dict[str, Union[str, bool, int, List[Any]]]] = [
{"type": "subscription"},
{"subscription_type_id": f"{subscription_type_id}"},
{"color": f"{color}"},
@ -160,53 +156,58 @@ async def add_subscription_handler(
required_fields.extend(allowed_required_fields)
try:
collection_id = get_entity_subscription_collection_id(
journal_id = get_entity_subscription_journal_id(
resource_type=BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
user_id=user.id,
create_if_not_exist=True,
)
entity = ec.add_entity(
blockchain = subscription_types.CANONICAL_SUBSCRIPTION_TYPES[
subscription_type_id
].blockchain
entity = bc.create_entity(
token=token,
collection_id=collection_id,
journal_id=journal_id,
address=address,
blockchain=subscription_types.CANONICAL_SUBSCRIPTION_TYPES[
subscription_type_id
].blockchain,
name=label,
blockchain=blockchain if blockchain is not None else "",
title=label,
required_fields=required_fields,
secondary_fields=content,
)
except EntityCollectionNotFoundException as e:
except EntityJournalNotFoundException as e:
raise MoonstreamHTTPException(
status_code=404,
detail="User subscriptions collection not found",
detail="User subscriptions journal not found",
internal_error=e,
)
except Exception as e:
logger.error(f"Failed to get collection id")
logger.error(f"Failed to get journal id")
raise MoonstreamHTTPException(
status_code=500,
internal_error=e,
detail="Currently unable to get collection id",
detail="Currently unable to get journal id",
)
entity_required_fields = (
entity.required_fields if entity.required_fields is not None else []
)
entity_secondary_fields = (
entity.secondary_fields if entity.secondary_fields is not None else {}
)
normalized_entity_tags = [
f"{key}:{value}"
for tag in entity.required_fields
for tag in entity_required_fields
for key, value in tag.items()
if key not in MOONSTREAM_ENTITIES_RESERVED_TAGS
]
return data.SubscriptionResourceData(
id=str(entity.entity_id),
id=str(entity.id),
user_id=str(user.id),
address=address,
color=color,
label=label,
abi=entity.secondary_fields.get("abi"),
description=entity.secondary_fields.get("description"),
abi=entity_secondary_fields.get("abi"),
description=entity_secondary_fields.get("description"),
tags=normalized_entity_tags,
subscription_type_id=subscription_type_id,
updated_at=entity.updated_at,
@ -219,28 +220,29 @@ async def add_subscription_handler(
tags=["subscriptions"],
response_model=data.SubscriptionResourceData,
)
async def delete_subscription_handler(request: Request, subscription_id: str):
async def delete_subscription_handler(
request: Request, subscription_id: str = Path(...)
):
"""
Delete subscriptions.
"""
token = request.state.token
user = request.state.user
try:
collection_id = get_entity_subscription_collection_id(
journal_id = get_entity_subscription_journal_id(
resource_type=BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
user_id=user.id,
)
deleted_entity = ec.delete_entity(
deleted_entity = bc.delete_entity(
token=token,
collection_id=collection_id,
journal_id=journal_id,
entity_id=subscription_id,
)
except EntityCollectionNotFoundException as e:
except EntityJournalNotFoundException as e:
raise MoonstreamHTTPException(
status_code=404,
detail="User subscriptions collection not found",
detail="User subscriptions journal not found",
internal_error=e,
)
except Exception as e:
@ -250,36 +252,46 @@ async def delete_subscription_handler(request: Request, subscription_id: str):
detail="Internal error",
)
tags = deleted_entity.required_fields
tags_raw = (
deleted_entity.required_fields
if deleted_entity.required_fields is not None
else {}
)
subscription_type_id = None
color = None
label = None
abi = None
description = None
if tags is not None:
for tag in tags:
if "subscription_type_id" in tag:
subscription_type_id = tag["subscription_type_id"]
for tag in tags_raw:
if "subscription_type_id" in tag:
subscription_type_id = tag["subscription_type_id"]
if "color" in tag:
color = tag["color"]
if "label" in tag:
label = tag["label"]
if "color" in tag:
color = tag["color"]
if "label" in tag:
label = tag["label"]
normalized_entity_tags = [
f"{key}:{value}"
for tag in tags_raw
for key, value in tag.items()
if key not in MOONSTREAM_ENTITIES_RESERVED_TAGS
]
if deleted_entity.secondary_fields is not None:
abi = deleted_entity.secondary_fields.get("abi")
description = deleted_entity.secondary_fields.get("description")
return data.SubscriptionResourceData(
id=str(deleted_entity.entity_id),
id=str(deleted_entity.id),
user_id=str(user.id),
address=deleted_entity.address,
color=color,
label=label,
abi=abi,
description=deleted_entity.secondary_fields.get("description"),
tags=deleted_entity.required_fields,
description=description,
tags=normalized_entity_tags,
subscription_type_id=subscription_type_id,
updated_at=deleted_entity.updated_at,
created_at=deleted_entity.created_at,
@ -289,8 +301,8 @@ async def delete_subscription_handler(request: Request, subscription_id: str):
@router.get("/", tags=["subscriptions"], response_model=data.SubscriptionsListResponse)
async def get_subscriptions_handler(
request: Request,
limit: Optional[int] = 10,
offset: Optional[int] = 0,
limit: int = Query(10),
offset: int = Query(0),
) -> data.SubscriptionsListResponse:
"""
Get user's subscriptions.
@ -298,25 +310,25 @@ async def get_subscriptions_handler(
token = request.state.token
user = request.state.user
try:
collection_id = get_entity_subscription_collection_id(
journal_id = get_entity_subscription_journal_id(
resource_type=BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
user_id=user.id,
create_if_not_exist=True,
)
subscriprions_list = ec.search_entities(
subscriptions_list: Any = bc.search(
token=token,
collection_id=collection_id,
required_field=[f"type:subscription"],
journal_id=journal_id,
query="tag:type:subscription",
limit=limit,
offset=offset,
representation="entity",
)
except EntityCollectionNotFoundException as e:
except EntityJournalNotFoundException as e:
raise MoonstreamHTTPException(
status_code=404,
detail="User subscriptions collection not found",
detail="User subscriptions journal not found",
internal_error=e,
)
except Exception as e:
@ -328,7 +340,11 @@ async def get_subscriptions_handler(
subscriptions = []
for subscription in subscriprions_list.entities:
user_subscriptions_results = cast(
List[BugoutSearchResultAsEntity], subscriptions_list.results
)
for subscription in user_subscriptions_results:
tags = subscription.required_fields
label, color, subscription_type_id = None, None, None
@ -352,7 +368,7 @@ async def get_subscriptions_handler(
subscriptions.append(
data.SubscriptionResourceData(
id=str(subscription.entity_id),
id=str(subscription.entity_url.split("/")[-1]),
user_id=str(user.id),
address=subscription.address,
color=color,
@ -378,8 +394,8 @@ async def get_subscriptions_handler(
)
async def update_subscriptions_handler(
request: Request,
subscription_id: str,
background_tasks: BackgroundTasks,
subscription_id: str = Path(...),
) -> data.SubscriptionResourceData:
"""
Get user's subscriptions.
@ -401,46 +417,51 @@ async def update_subscriptions_handler(
tags = form_data.tags
try:
collection_id = get_entity_subscription_collection_id(
journal_id = get_entity_subscription_journal_id(
resource_type=BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
user_id=user.id,
)
# get subscription entity
subscription_entity = ec.get_entity(
subscription_entity = bc.get_entity(
token=token,
collection_id=collection_id,
journal_id=journal_id,
entity_id=subscription_id,
)
update_required_fields = []
if subscription_entity.required_fields is not None:
update_required_fields = [
field
for field in subscription_entity.required_fields
if any(key in field for key in MOONSTREAM_ENTITIES_RESERVED_TAGS)
]
update_secondary_fields = (
subscription_entity.secondary_fields
if subscription_entity.secondary_fields is not None
else {}
)
subscription_type_id = None
update_required_fields = [
field
for field in subscription_entity.required_fields
if any(key in field for key in MOONSTREAM_ENTITIES_RESERVED_TAGS)
]
update_secondary_fields = subscription_entity.secondary_fields
for field in update_required_fields:
if "subscription_type_id" in field:
subscription_type_id = field["subscription_type_id"]
if not subscription_type_id:
logger.error(
f"Subscription entity {subscription_id} in collection {collection_id} has no subscription_type_id malformed subscription entity"
f"Subscription entity {subscription_id} in journal (collection) {journal_id} has no subscription_type_id malformed subscription entity"
)
raise MoonstreamHTTPException(
status_code=409,
detail="Not valid subscription entity",
)
except EntityCollectionNotFoundException as e:
except EntityJournalNotFoundException as e:
raise MoonstreamHTTPException(
status_code=404,
detail="User subscriptions collection not found",
detail="User subscriptions journal not found",
internal_error=e,
)
except Exception as e:
@ -487,18 +508,27 @@ async def update_subscriptions_handler(
if allowed_required_fields:
update_required_fields.extend(allowed_required_fields)
address = subscription_entity.address
if address is None:
logger.error(f"Lost address at entity {subscription_id} for subscription")
raise MoonstreamHTTPException(status_code=500)
try:
subscription = ec.update_entity(
subscription = bc.update_entity(
token=token,
collection_id=collection_id,
journal_id=journal_id,
entity_id=subscription_id,
address=subscription_entity.address,
blockchain=subscription_entity.blockchain,
name=subscription_entity.name,
title=subscription_entity.title
if subscription_entity.title is not None
else "",
address=address,
blockchain=subscription_entity.blockchain
if subscription_entity.blockchain is not None
else "",
required_fields=update_required_fields,
secondary_fields=update_secondary_fields,
)
except Exception as e:
logger.error(f"Error update user subscriptions: {str(e)}")
raise MoonstreamHTTPException(status_code=500, internal_error=e)
@ -508,24 +538,32 @@ async def update_subscriptions_handler(
apply_moonworm_tasks,
subscription_type_id,
json_abi,
subscription.address,
address,
)
subscription_required_fields = (
subscription.required_fields if subscription.required_fields is not None else {}
)
subscription_secondary_fields = (
subscription.secondary_fields
if subscription.secondary_fields is not None
else {}
)
normalized_entity_tags = [
f"{key}:{value}"
for tag in subscription.required_fields
for tag in subscription_required_fields
for key, value in tag.items()
if key not in MOONSTREAM_ENTITIES_RESERVED_TAGS
]
return data.SubscriptionResourceData(
id=str(subscription.entity_id),
id=str(subscription.id),
user_id=str(user.id),
address=subscription.address,
color=color,
label=label,
abi=subscription.secondary_fields.get("abi"),
description=subscription.secondary_fields.get("description"),
abi=subscription_secondary_fields.get("abi"),
description=subscription_secondary_fields.get("description"),
tags=normalized_entity_tags,
subscription_type_id=subscription_type_id,
updated_at=subscription_entity.updated_at,
@ -536,43 +574,47 @@ async def update_subscriptions_handler(
@router.get(
"/{subscription_id}/abi",
tags=["subscriptions"],
response_model=data.SubdcriptionsAbiResponse,
response_model=data.SubscriptionsAbiResponse,
)
async def get_subscription_abi_handler(
request: Request,
subscription_id: str,
) -> data.SubdcriptionsAbiResponse:
subscription_id: str = Path(...),
) -> data.SubscriptionsAbiResponse:
token = request.state.token
user = request.state.user
try:
collection_id = get_entity_subscription_collection_id(
journal_id = get_entity_subscription_journal_id(
resource_type=BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
user_id=user.id,
)
# get subscription entity
subscription_resource = ec.get_entity(
subscription_resource = bc.get_entity(
token=token,
collection_id=collection_id,
journal_id=journal_id,
entity_id=subscription_id,
)
except EntityCollectionNotFoundException as e:
except EntityJournalNotFoundException as e:
raise MoonstreamHTTPException(
status_code=404,
detail="User subscriptions collection not found",
detail="User subscriptions journal not found",
internal_error=e,
)
except Exception as e:
logger.error(f"Error get subscriptions for user ({user}), error: {str(e)}")
raise MoonstreamHTTPException(status_code=500, internal_error=e)
if subscription_resource.secondary_fields is None:
raise MoonstreamHTTPException(
status_code=500, detail=f"Malformed subscription entity {subscription_id}"
)
if "abi" not in subscription_resource.secondary_fields.keys():
raise MoonstreamHTTPException(status_code=404, detail="Abi not found")
return data.SubdcriptionsAbiResponse(
return data.SubscriptionsAbiResponse(
abi=subscription_resource.secondary_fields["abi"]
)
@ -605,7 +647,7 @@ async def list_subscription_types() -> data.SubscriptionTypesListResponse:
tags=["subscriptions"],
response_model=data.ContractInfoResponse,
)
async def address_info(request: Request, address: str):
async def address_info(request: Request, address: str = Query(...)):
"""
Looking if address is contract
"""
@ -668,8 +710,8 @@ async def address_info(request: Request, address: str):
)
def get_contract_interfaces(
request: Request,
address: str,
blockchain: str,
address: str = Query(...),
blockchain: str = Query(...),
):
"""
Request contract interfaces from web3

Wyświetl plik

@ -10,7 +10,7 @@ from bugout.exceptions import BugoutResponseException
from fastapi import APIRouter, Body, Form, Request
from .. import data
from ..actions import create_onboarding_resource, generate_collection_for_user
from ..actions import create_onboarding_resource
from ..middleware import MoonstreamHTTPException
from ..settings import BUGOUT_REQUEST_TIMEOUT_SECONDS, MOONSTREAM_APPLICATION_ID
from ..settings import bugout_client as bc

Wyświetl plik

@ -1,4 +1,6 @@
selectors = {
from typing import Any, Dict
selectors: Dict[str, Any] = {
"274c7b3c": {
"name": "ERC20PresetMinterPauser",
"selector": "274c7b3c",

Wyświetl plik

@ -1,12 +1,9 @@
import os
from typing import Optional, Dict
from uuid import UUID
from typing import Dict, Optional
from bugout.app import Bugout
from entity.client import Entity # type: ignore
from moonstreamdb.blockchain import AvailableBlockchainType
# Bugout
BUGOUT_BROOD_URL = os.environ.get("BUGOUT_BROOD_URL", "https://auth.bugout.dev")
BUGOUT_SPIRE_URL = os.environ.get("BUGOUT_SPIRE_URL", "https://spire.bugout.dev")
@ -14,15 +11,6 @@ BUGOUT_SPIRE_URL = os.environ.get("BUGOUT_SPIRE_URL", "https://spire.bugout.dev"
bugout_client = Bugout(brood_api_url=BUGOUT_BROOD_URL, spire_api_url=BUGOUT_SPIRE_URL)
# Entity
MOONSTREAM_ENTITY_URL = os.environ.get("MOONSTREAM_ENTITY_URL", "")
if MOONSTREAM_ENTITY_URL == "":
raise ValueError("MOONSTREAM_ENTITY_URL environment variable must be set")
entity_client = Entity(MOONSTREAM_ENTITY_URL)
BUGOUT_REQUEST_TIMEOUT_SECONDS = 5
HUMBUG_REPORTER_BACKEND_TOKEN = os.environ.get("HUMBUG_REPORTER_BACKEND_TOKEN")

Wyświetl plik

@ -8,3 +8,9 @@ ignore_missing_imports = True
[mypy-pyevmasm.*]
ignore_missing_imports = True
[mypy-requests.*]
ignore_missing_imports = True
[mypy-dateutil.*]
ignore_missing_imports = True

Wyświetl plik

@ -9,7 +9,7 @@ base58==2.1.1
bitarray==2.6.0
boto3==1.26.5
botocore==1.29.5
bugout>=0.2.10
bugout>=0.2.12
certifi==2022.9.24
charset-normalizer==2.1.1
click==8.1.3
@ -36,7 +36,6 @@ jsonschema==4.17.0
lru-dict==1.1.8
Mako==1.2.3
MarkupSafe==2.1.1
moonstream-entity==0.0.5
moonstreamdb==0.3.4
multiaddr==0.0.9
multidict==6.0.2

Wyświetl plik

@ -13,12 +13,11 @@ setup(
install_requires=[
"appdirs",
"boto3",
"bugout>=0.2.10",
"moonstream-entity>=0.0.5",
"bugout>=0.2.12",
"fastapi",
"moonstreamdb>=0.3.4",
"humbug",
"pydantic",
"pydantic==1.10.2",
"pyevmasm",
"python-dateutil",
"python-multipart",