From 42849691ff43ea0f4d9298f2e4287af141099aac Mon Sep 17 00:00:00 2001 From: Andrey Date: Sat, 1 Jul 2023 17:07:12 +0300 Subject: [PATCH] Add subscriptions filtering. --- moonstreamapi/moonstreamapi/actions.py | 83 ++++++++++++++----- moonstreamapi/moonstreamapi/data.py | 5 ++ .../moonstreamapi/routes/subscriptions.py | 11 +-- 3 files changed, 71 insertions(+), 28 deletions(-) diff --git a/moonstreamapi/moonstreamapi/actions.py b/moonstreamapi/moonstreamapi/actions.py index 1dbb0edf..d32679a1 100644 --- a/moonstreamapi/moonstreamapi/actions.py +++ b/moonstreamapi/moonstreamapi/actions.py @@ -473,20 +473,16 @@ def get_all_entries_from_search( results: List[BugoutSearchResult] = [] - try: - existing_metods = bc.search( - token=token, - journal_id=journal_id, - query=search_query, - content=False, - timeout=10.0, - limit=limit, - offset=offset, - ) - results.extend(existing_metods.results) - - except Exception as e: - reporter.error_report(e) + existing_metods = bc.search( + token=token, + journal_id=journal_id, + query=search_query, + content=False, + timeout=10.0, + limit=limit, + offset=offset, + ) + results.extend(existing_metods.results) if len(results) != existing_metods.total_results: for offset in range(limit, existing_metods.total_results, limit): @@ -786,19 +782,60 @@ def query_parameter_hash(params: Dict[str, Any]) -> str: return hash -def get_moonworm_jobs( - address: str, - subscription_type_id: str, - entries_limit: int = 100, -): +def parse_abi_to_name_tags(user_abi: List[Dict[str, Any]]): + return [ + f"abi_name:{method['name']}" + for method in user_abi + if method["type"] in ("event", "function") + ] + + +def filter_tasks(entries, tag_filters): + return [entry for entry in entries if any(tag in tag_filters for tag in entry.tags)] + + +def fetch_and_filter_tasks( + journal_id, address, subscription_type_id, token, user_abi, limit=100 +) -> List[BugoutSearchResult]: + """ + Fetch tasks from journal and filter them by user abi + """ entries = get_all_entries_from_search( - journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL, + journal_id=journal_id, search_query=f"tag:address:{address} tag:subscription_type:{subscription_type_id}", - limit=entries_limit, # load per request - token=MOONSTREAM_ADMIN_ACCESS_TOKEN, + limit=limit, + token=token, ) - return entries + user_loaded_abi_tags = parse_abi_to_name_tags(json.loads(user_abi)) + + moonworm_tasks = filter_tasks(entries, user_loaded_abi_tags) + + return moonworm_tasks + + +def get_moonworm_tasks( + subscription_type_id: str, + address: str, + user_abi: List[Dict[str, Any]], +) -> List[BugoutSearchResult]: + """ + Get moonworm tasks from journal and filter them by user abi + """ + + try: + moonworm_tasks = fetch_and_filter_tasks( + journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL, + address=address, + subscription_type_id=subscription_type_id, + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, + user_abi=user_abi, + ) + except Exception as e: + logger.error(f"Error get moonworm tasks: {str(e)}") + MoonstreamHTTPException(status_code=500, internal_error=e) + + return moonworm_tasks def get_list_of_support_interfaces( diff --git a/moonstreamapi/moonstreamapi/data.py b/moonstreamapi/moonstreamapi/data.py index 55a9d790..e0e12751 100644 --- a/moonstreamapi/moonstreamapi/data.py +++ b/moonstreamapi/moonstreamapi/data.py @@ -299,6 +299,11 @@ class QueryInfoResponse(BaseModel): updated_at: Optional[datetime] +class SuggestedQueriesResponse(BaseModel): + interfaces: Dict[str, Any] = Field(default_factory=dict) + queries: List[Any] = Field(default_factory=list) + + class ContractInfoResponse(BaseModel): contract_info: Dict[str, Any] = Field(default_factory=dict) diff --git a/moonstreamapi/moonstreamapi/routes/subscriptions.py b/moonstreamapi/moonstreamapi/routes/subscriptions.py index c97612d8..9722ca88 100644 --- a/moonstreamapi/moonstreamapi/routes/subscriptions.py +++ b/moonstreamapi/moonstreamapi/routes/subscriptions.py @@ -9,6 +9,7 @@ from typing import Any, Dict, List, Optional import traceback from bugout.exceptions import BugoutResponseException +from bugout.data import BugoutSearchResult from fastapi import APIRouter, Depends, Request, Form, BackgroundTasks from moonstreamdb.blockchain import AvailableBlockchainType from web3 import Web3 @@ -18,7 +19,7 @@ from ..actions import ( apply_moonworm_tasks, get_entity_subscription_collection_id, EntityCollectionNotFoundException, - get_moonworm_jobs, + get_moonworm_tasks, check_if_smartcontract, get_list_of_support_interfaces, ) @@ -495,7 +496,7 @@ async def get_subscription_abi_handler( @router.get( "/{subscription_id}/jobs", tags=["subscriptions"], - response_model=data.SubdcriptionsAbiResponse, + response_model=List[BugoutSearchResult], ) async def get_subscription_jobs_handler( request: Request, @@ -534,12 +535,12 @@ async def get_subscription_jobs_handler( if "subscription_type_id" in field: subscription_type_id = field["subscription_type_id"] - if "address" in field: - subscription_address = field["address"] + subscription_address = subscription_resource.address - get_moonworm_jobs_response = get_moonworm_jobs( + get_moonworm_jobs_response = get_moonworm_tasks( subscription_type_id=subscription_type_id, address=subscription_address, + user_abi=subscription_resource.secondary_fields.get("abi") or [], ) return get_moonworm_jobs_response