diff --git a/crawlers/mooncrawl/mooncrawl/leaderboards_generator/cli.py b/crawlers/mooncrawl/mooncrawl/leaderboards_generator/cli.py index 5c9f9da3..23867a8f 100644 --- a/crawlers/mooncrawl/mooncrawl/leaderboards_generator/cli.py +++ b/crawlers/mooncrawl/mooncrawl/leaderboards_generator/cli.py @@ -1,9 +1,13 @@ import argparse +from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import TimeoutError import json import logging import os -from typing import cast, List +from typing import cast, List, Dict, Any, Optional import uuid +import traceback + import requests # type: ignore from bugout.data import BugoutSearchResult @@ -28,6 +32,116 @@ green_c = "\033[92m" end_c = "\033[0m" +def _get_leaderboard_results( + leaderboard: BugoutSearchResult, + query_api_access_token: str, + max_retries: int = 10, + interval: float = 30.0, + params: Optional[Dict[str, Any]] = None, +) -> None: + logger.info( + f"Processing leaderboard: {leaderboard.title} with id: {[tag for tag in leaderboard.tags if tag.startswith('leaderboard_id')]}" + ) + + if leaderboard.content is None: + return + + try: + leaderboard_data = json.loads(leaderboard.content) + except json.JSONDecodeError: + logger.error( + f"Could not parse leaderboard content: {[tag for tag in leaderboard.tags if tag.startswith('leaderboard_id')]} in entry {leaderboard.entry_url.split('/')[-1]}" + ) + return + + ### get results from query API + + leaderboard_id = leaderboard_data["leaderboard_id"] + + query_name = leaderboard_data["query_name"] + + if not params: + params = leaderboard_data["params"] + + blockchain = leaderboard_data.get("blockchain", None) + + ### execute query + try: + query_results = get_results_for_moonstream_query( + query_api_access_token, + query_name, + params, # type: ignore + blockchain, + MOONSTREAM_API_URL, + max_retries, + interval, + ) + except Exception as e: + logger.error(f"Could not get results for query {query_name}: error: {e}") + return + + ### push results to leaderboard API + + if query_results is None: + logger.error(f"Could not get results for query {query_name} in time") + return + + leaderboard_push_api_url = f"{MOONSTREAM_ENGINE_URL}/leaderboard/{leaderboard_id}/scores?normalize_addresses={leaderboard_data['normalize_addresses']}&overwrite=true" + + leaderboard_api_headers = { + "Authorization": f"Bearer {query_api_access_token}", + "Content-Type": "application/json", + } + + try: + leaderboard_api_response = requests.put( + leaderboard_push_api_url, + json=query_results["data"], + headers=leaderboard_api_headers, + timeout=10, + ) + except Exception as e: + logger.error( + f"Could not push results to leaderboard API: {e} for leaderboard {leaderboard_id}" + ) + return + + try: + leaderboard_api_response.raise_for_status() + except requests.exceptions.HTTPError as http_error: + logger.error( + f"Could not push results to leaderboard API: {http_error.response.text} with status code {http_error.response.status_code}" + ) + return + + ### get leaderboard from leaderboard API + + leaderboard_api_info_url = ( + f"{MOONSTREAM_ENGINE_URL}/leaderboard/info?leaderboard_id={leaderboard_id}" + ) + + leaderboard_api_response = requests.get( + leaderboard_api_info_url, headers=leaderboard_api_headers, timeout=10 + ) + + try: + leaderboard_api_response.raise_for_status() + except requests.exceptions.HTTPError as http_error: + logger.error( + f"Could not get leaderboard info from leaderboard API: {http_error.response.text} with status code {http_error.response.status_code}" + ) + return + + info = leaderboard_api_response.json() + + logger.info( + f"Successfully pushed results to leaderboard {info['id']}:{blue_c} {info['title']} {end_c}" + ) + logger.info( + f"can be check on:{green_c} {MOONSTREAM_ENGINE_URL}/leaderboard/?leaderboard_id={leaderboard_id} {end_c}" + ) + + def handle_leaderboards(args: argparse.Namespace) -> None: """ Run the leaderboard generator. @@ -60,110 +174,33 @@ def handle_leaderboards(args: argparse.Namespace) -> None: logger.info(f"Found {len(leaderboards_results)} leaderboards") - for leaderboard in leaderboards_results: - logger.info( - f"Processing leaderboard: {leaderboard.title} with id: {[tag for tag in leaderboard.tags if tag.startswith('leaderboard_id')]}" - ) + timeout = args.max_retries * args.interval + 10 - if leaderboard.content is None: - continue + tasks = [] - try: - leaderboard_data = json.loads(leaderboard.content) - except json.JSONDecodeError: - logger.error( - f"Could not parse leaderboard content: {[tag for tag in leaderboard.tags if tag.startswith('leaderboard_id')]} in entry {leaderboard.entry_url.split('/')[-1]}" - ) - continue - - ### get results from query API - - leaderboard_id = leaderboard_data["leaderboard_id"] - - query_name = leaderboard_data["query_name"] - - if args.params: - params = json.loads(args.params) - else: - params = leaderboard_data["params"] - - blockchain = leaderboard_data.get("blockchain", None) - - ### execute query - try: - query_results = get_results_for_moonstream_query( + with ThreadPoolExecutor(max_workers=2) as executor: + for leaderboard in leaderboards_results: + task = executor.submit( + _get_leaderboard_results, + leaderboard, args.query_api_access_token, - query_name, - params, - blockchain, - MOONSTREAM_API_URL, args.max_retries, args.interval, + args.params, ) - except Exception as e: - logger.error(f"Could not get results for query {query_name}: error: {e}") - continue + tasks.append(task) - ### push results to leaderboard API + for task in tasks: + try: + task.result(timeout=timeout) + except TimeoutError: + logger.error("Timeout") + task.cancel() - if query_results is None: - logger.error(f"Could not get results for query {query_name} in time") - continue - - leaderboard_push_api_url = f"{MOONSTREAM_ENGINE_URL}/leaderboard/{leaderboard_id}/scores?normalize_addresses={leaderboard_data['normalize_addresses']}&overwrite=true" - - leaderboard_api_headers = { - "Authorization": f"Bearer {args.query_api_access_token}", - "Content-Type": "application/json", - } - - try: - leaderboard_api_response = requests.put( - leaderboard_push_api_url, - json=query_results["data"], - headers=leaderboard_api_headers, - timeout=10, - ) - except Exception as e: - logger.error( - f"Could not push results to leaderboard API: {e} for leaderboard {leaderboard_id}" - ) - continue - - try: - leaderboard_api_response.raise_for_status() - except requests.exceptions.HTTPError as http_error: - logger.error( - f"Could not push results to leaderboard API: {http_error.response.text} with status code {http_error.response.status_code}" - ) - continue - - ### get leaderboard from leaderboard API - - leaderboard_api_info_url = ( - f"{MOONSTREAM_ENGINE_URL}/leaderboard/info?leaderboard_id={leaderboard_id}" - ) - - leaderboard_api_response = requests.get( - leaderboard_api_info_url, headers=leaderboard_api_headers, timeout=10 - ) - - try: - leaderboard_api_response.raise_for_status() - except requests.exceptions.HTTPError as http_error: - logger.error( - f"Could not get leaderboard info from leaderboard API: {http_error.response.text} with status code {http_error.response.status_code}" - ) - continue - - info = leaderboard_api_response.json() - - logger.info( - f"Successfully pushed results to leaderboard {info['id']}:{blue_c} {info['title']} {end_c}" - ) - logger.info( - f"can be check on:{green_c} {MOONSTREAM_ENGINE_URL}/leaderboard/?leaderboard_id={leaderboard_id} {end_c}" - ) + except Exception as e: + traceback.print_exc() + logger.error(f"Could not get results for leaderboard: {e}") + task.cancel() def main(): @@ -187,7 +224,7 @@ def main(): leaderboard_generator_parser.add_argument( "--max-retries", type=int, - default=100, + default=10, help="Number of times to retry requests for Moonstream Query results", ) leaderboard_generator_parser.add_argument( diff --git a/crawlers/mooncrawl/mooncrawl/leaderboards_generator/utils.py b/crawlers/mooncrawl/mooncrawl/leaderboards_generator/utils.py index fa235cca..8ffc38cf 100644 --- a/crawlers/mooncrawl/mooncrawl/leaderboards_generator/utils.py +++ b/crawlers/mooncrawl/mooncrawl/leaderboards_generator/utils.py @@ -21,7 +21,7 @@ def get_results_for_moonstream_query( params: Dict[str, Any], blockchain: Optional[str] = None, api_url: str = MOONSTREAM_API_URL, - max_retries: int = 100, + max_retries: int = 10, interval: float = 30.0, ) -> Optional[Dict[str, Any]]: """