leaderboard-generator-threads
Andrey 2023-08-05 08:38:55 +03:00
rodzic a45ea9e36f
commit 8a308f03a1
2 zmienionych plików z 136 dodań i 99 usunięć

Wyświetl plik

@ -1,9 +1,13 @@
import argparse
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError
import json
import logging
import os
from typing import cast, List
from typing import cast, List, Dict, Any, Optional
import uuid
import traceback
import requests # type: ignore
from bugout.data import BugoutSearchResult
@ -28,6 +32,116 @@ green_c = "\033[92m"
end_c = "\033[0m"
def _get_leaderboard_results(
leaderboard: BugoutSearchResult,
query_api_access_token: str,
max_retries: int = 10,
interval: float = 30.0,
params: Optional[Dict[str, Any]] = None,
) -> None:
logger.info(
f"Processing leaderboard: {leaderboard.title} with id: {[tag for tag in leaderboard.tags if tag.startswith('leaderboard_id')]}"
)
if leaderboard.content is None:
return
try:
leaderboard_data = json.loads(leaderboard.content)
except json.JSONDecodeError:
logger.error(
f"Could not parse leaderboard content: {[tag for tag in leaderboard.tags if tag.startswith('leaderboard_id')]} in entry {leaderboard.entry_url.split('/')[-1]}"
)
return
### get results from query API
leaderboard_id = leaderboard_data["leaderboard_id"]
query_name = leaderboard_data["query_name"]
if not params:
params = leaderboard_data["params"]
blockchain = leaderboard_data.get("blockchain", None)
### execute query
try:
query_results = get_results_for_moonstream_query(
query_api_access_token,
query_name,
params, # type: ignore
blockchain,
MOONSTREAM_API_URL,
max_retries,
interval,
)
except Exception as e:
logger.error(f"Could not get results for query {query_name}: error: {e}")
return
### push results to leaderboard API
if query_results is None:
logger.error(f"Could not get results for query {query_name} in time")
return
leaderboard_push_api_url = f"{MOONSTREAM_ENGINE_URL}/leaderboard/{leaderboard_id}/scores?normalize_addresses={leaderboard_data['normalize_addresses']}&overwrite=true"
leaderboard_api_headers = {
"Authorization": f"Bearer {query_api_access_token}",
"Content-Type": "application/json",
}
try:
leaderboard_api_response = requests.put(
leaderboard_push_api_url,
json=query_results["data"],
headers=leaderboard_api_headers,
timeout=10,
)
except Exception as e:
logger.error(
f"Could not push results to leaderboard API: {e} for leaderboard {leaderboard_id}"
)
return
try:
leaderboard_api_response.raise_for_status()
except requests.exceptions.HTTPError as http_error:
logger.error(
f"Could not push results to leaderboard API: {http_error.response.text} with status code {http_error.response.status_code}"
)
return
### get leaderboard from leaderboard API
leaderboard_api_info_url = (
f"{MOONSTREAM_ENGINE_URL}/leaderboard/info?leaderboard_id={leaderboard_id}"
)
leaderboard_api_response = requests.get(
leaderboard_api_info_url, headers=leaderboard_api_headers, timeout=10
)
try:
leaderboard_api_response.raise_for_status()
except requests.exceptions.HTTPError as http_error:
logger.error(
f"Could not get leaderboard info from leaderboard API: {http_error.response.text} with status code {http_error.response.status_code}"
)
return
info = leaderboard_api_response.json()
logger.info(
f"Successfully pushed results to leaderboard {info['id']}:{blue_c} {info['title']} {end_c}"
)
logger.info(
f"can be check on:{green_c} {MOONSTREAM_ENGINE_URL}/leaderboard/?leaderboard_id={leaderboard_id} {end_c}"
)
def handle_leaderboards(args: argparse.Namespace) -> None:
"""
Run the leaderboard generator.
@ -60,110 +174,33 @@ def handle_leaderboards(args: argparse.Namespace) -> None:
logger.info(f"Found {len(leaderboards_results)} leaderboards")
for leaderboard in leaderboards_results:
logger.info(
f"Processing leaderboard: {leaderboard.title} with id: {[tag for tag in leaderboard.tags if tag.startswith('leaderboard_id')]}"
)
timeout = args.max_retries * args.interval + 10
if leaderboard.content is None:
continue
tasks = []
try:
leaderboard_data = json.loads(leaderboard.content)
except json.JSONDecodeError:
logger.error(
f"Could not parse leaderboard content: {[tag for tag in leaderboard.tags if tag.startswith('leaderboard_id')]} in entry {leaderboard.entry_url.split('/')[-1]}"
)
continue
### get results from query API
leaderboard_id = leaderboard_data["leaderboard_id"]
query_name = leaderboard_data["query_name"]
if args.params:
params = json.loads(args.params)
else:
params = leaderboard_data["params"]
blockchain = leaderboard_data.get("blockchain", None)
### execute query
try:
query_results = get_results_for_moonstream_query(
with ThreadPoolExecutor(max_workers=2) as executor:
for leaderboard in leaderboards_results:
task = executor.submit(
_get_leaderboard_results,
leaderboard,
args.query_api_access_token,
query_name,
params,
blockchain,
MOONSTREAM_API_URL,
args.max_retries,
args.interval,
args.params,
)
except Exception as e:
logger.error(f"Could not get results for query {query_name}: error: {e}")
continue
tasks.append(task)
### push results to leaderboard API
for task in tasks:
try:
task.result(timeout=timeout)
except TimeoutError:
logger.error("Timeout")
task.cancel()
if query_results is None:
logger.error(f"Could not get results for query {query_name} in time")
continue
leaderboard_push_api_url = f"{MOONSTREAM_ENGINE_URL}/leaderboard/{leaderboard_id}/scores?normalize_addresses={leaderboard_data['normalize_addresses']}&overwrite=true"
leaderboard_api_headers = {
"Authorization": f"Bearer {args.query_api_access_token}",
"Content-Type": "application/json",
}
try:
leaderboard_api_response = requests.put(
leaderboard_push_api_url,
json=query_results["data"],
headers=leaderboard_api_headers,
timeout=10,
)
except Exception as e:
logger.error(
f"Could not push results to leaderboard API: {e} for leaderboard {leaderboard_id}"
)
continue
try:
leaderboard_api_response.raise_for_status()
except requests.exceptions.HTTPError as http_error:
logger.error(
f"Could not push results to leaderboard API: {http_error.response.text} with status code {http_error.response.status_code}"
)
continue
### get leaderboard from leaderboard API
leaderboard_api_info_url = (
f"{MOONSTREAM_ENGINE_URL}/leaderboard/info?leaderboard_id={leaderboard_id}"
)
leaderboard_api_response = requests.get(
leaderboard_api_info_url, headers=leaderboard_api_headers, timeout=10
)
try:
leaderboard_api_response.raise_for_status()
except requests.exceptions.HTTPError as http_error:
logger.error(
f"Could not get leaderboard info from leaderboard API: {http_error.response.text} with status code {http_error.response.status_code}"
)
continue
info = leaderboard_api_response.json()
logger.info(
f"Successfully pushed results to leaderboard {info['id']}:{blue_c} {info['title']} {end_c}"
)
logger.info(
f"can be check on:{green_c} {MOONSTREAM_ENGINE_URL}/leaderboard/?leaderboard_id={leaderboard_id} {end_c}"
)
except Exception as e:
traceback.print_exc()
logger.error(f"Could not get results for leaderboard: {e}")
task.cancel()
def main():
@ -187,7 +224,7 @@ def main():
leaderboard_generator_parser.add_argument(
"--max-retries",
type=int,
default=100,
default=10,
help="Number of times to retry requests for Moonstream Query results",
)
leaderboard_generator_parser.add_argument(

Wyświetl plik

@ -21,7 +21,7 @@ def get_results_for_moonstream_query(
params: Dict[str, Any],
blockchain: Optional[str] = None,
api_url: str = MOONSTREAM_API_URL,
max_retries: int = 100,
max_retries: int = 10,
interval: float = 30.0,
) -> Optional[Dict[str, Any]]:
"""