Add db execution flow.

add-db-execution
Andrey 2024-04-26 06:05:23 +03:00
rodzic 2caaa82953
commit 16d071d1d9
3 zmienionych plików z 156 dodań i 29 usunięć

Wyświetl plik

@ -16,7 +16,21 @@ from ..settings import (
MOONSTREAM_LEADERBOARD_GENERATOR_PUSH_TIMEOUT_SECONDS,
)
from ..settings import bugout_client as bc
from .utils import get_results_for_moonstream_query, leaderboard_push_batch
from ..db import (
create_moonstream_engine,
MOONSTREAM_DB_URI_READ_ONLY,
MOONSTREAM_POOL_SIZE,
sessionmaker,
contextmanager,
Session,
Generator,
)
from .utils import (
get_results_for_moonstream_query,
leaderboard_push_batch,
get_query_by_name,
)
from sqlalchemy import text
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
@ -26,6 +40,15 @@ green_c = "\033[92m"
end_c = "\033[0m"
def to_json_types(value):
if isinstance(value, (str, int, tuple, dict, list)):
return value
elif isinstance(value, set):
return list(value)
else:
return str(value)
def handle_leaderboards(args: argparse.Namespace) -> None:
"""
Run the leaderboard generator.
@ -63,6 +86,24 @@ def handle_leaderboards(args: argparse.Namespace) -> None:
logger.info(f"Found {len(leaderboards_results)} leaderboards")
if args.execute_over_db:
RO_custom_engine = create_moonstream_engine(
url=MOONSTREAM_DB_URI_READ_ONLY,
pool_size=MOONSTREAM_POOL_SIZE,
statement_timeout=420 * 1000, # 7 minutes
)
RO_SessionLocal = sessionmaker(bind=RO_custom_engine)
def yield_db_read_only_session() -> Generator[Session, None, None]:
session = RO_SessionLocal()
try:
yield session
finally:
session.close()
yield_db_read_only_session_ctx = contextmanager(yield_db_read_only_session)
for leaderboard in leaderboards_results:
logger.info(
f"Processing leaderboard: {leaderboard.title} with id: {[tag for tag in leaderboard.tags if tag.startswith('leaderboard_id')]}"
@ -79,8 +120,6 @@ def handle_leaderboards(args: argparse.Namespace) -> None:
)
continue
### get results from query API
leaderboard_id = leaderboard_data["leaderboard_id"]
query_name = leaderboard_data["query_name"]
@ -90,29 +129,70 @@ def handle_leaderboards(args: argparse.Namespace) -> None:
else:
params = leaderboard_data["params"]
blockchain = leaderboard_data.get("blockchain", None)
if args.execute_over_db is False:
### execute query
try:
query_results = get_results_for_moonstream_query(
args.query_api_access_token,
query_name,
params,
blockchain,
MOONSTREAM_API_URL,
args.max_retries,
args.interval,
args.query_api_retries,
)
except Exception as e:
logger.error(f"Could not get results for query {query_name}: error: {e}")
continue
### get results from query API
### push results to leaderboard API
blockchain = leaderboard_data.get("blockchain", None)
if query_results is None:
logger.error(f"Could not get results for query {query_name} in time")
continue
### execute query
try:
query_results = get_results_for_moonstream_query(
args.query_api_access_token,
query_name,
params,
blockchain,
MOONSTREAM_API_URL,
args.max_retries,
args.interval,
args.query_api_retries,
)
except Exception as e:
logger.error(
f"Could not get results for query {query_name}: error: {e}"
)
continue
### push results to leaderboard API
if query_results is None:
logger.error(f"Could not get results for query {query_name} in time")
continue
records = query_results["data"]
else:
logger.info(f"Executing query {query_name} over database")
### get query
try:
query = get_query_by_name(
args.query_api_access_token, query_name, MOONSTREAM_API_URL
)
except Exception as e:
logger.error(f"Could not get queries from Moonstream API: {e}")
continue
### query content
print(query)
if query["query"] is None:
logger.error(f"Query {query_name} has no content")
continue
with yield_db_read_only_session_ctx() as db_session:
try:
results = db_session.execute(text(query["query"]), params)
except Exception as e:
logger.error(f"Could not execute query {query_name}: {e}")
continue
records = [
{key: to_json_types(value) for key, value in row._asdict().items()}
for row in db_session.execute(text(query["query"]), params).all()
]
leaderboard_push_api_url = f"{MOONSTREAM_ENGINE_URL}/leaderboard/{leaderboard_id}/scores?normalize_addresses={leaderboard_data['normalize_addresses']}&overwrite=true"
@ -121,14 +201,14 @@ def handle_leaderboards(args: argparse.Namespace) -> None:
"Content-Type": "application/json",
}
if len(query_results["data"]) > leaderboard_push_batch_size:
if len(records) > leaderboard_push_batch_size:
logger.info(
f"Pushing {len(query_results['data'])} scores to leaderboard {leaderboard_id} in batches of {leaderboard_push_batch_size}"
f"Pushing {len(records)} scores to leaderboard {leaderboard_id} in batches of {leaderboard_push_batch_size}"
)
leaderboard_push_batch(
leaderboard_id,
leaderboard_data,
query_results["data"],
records,
leaderboard_api_headers,
leaderboard_push_batch_size,
timeout=leaderboard_push_timeout_seconds,
@ -138,7 +218,7 @@ def handle_leaderboards(args: argparse.Namespace) -> None:
try:
leaderboard_api_response = requests.put(
leaderboard_push_api_url,
json=query_results["data"],
json=records,
headers=leaderboard_api_headers,
timeout=leaderboard_push_timeout_seconds,
)
@ -156,7 +236,7 @@ def handle_leaderboards(args: argparse.Namespace) -> None:
)
leaderboard_api_response = requests.get(
leaderboard_api_info_url, headers=leaderboard_api_headers, timeout=10
leaderboard_api_info_url, headers=leaderboard_api_headers, timeout=30
)
try:
@ -225,6 +305,11 @@ def main():
required=True,
help="Moonstream Access Token to use for Moonstream Query API requests",
)
leaderboard_generator_parser.add_argument(
"--execute-over-db",
action="store_true",
help="Execute query over database instead of Moonstream Query API",
)
leaderboard_generator_parser.add_argument(
"--leaderboard-push-batch-size",
type=int,

Wyświetl plik

@ -106,6 +106,48 @@ def get_results_for_moonstream_query(
return result
def list_queries(
moonstream_access_token: str,
api_url: str = MOONSTREAM_API_URL,
) -> List[Dict[str, Any]]:
"""
Return a list of queries available in account.
"""
api_url = api_url.rstrip("/")
request_url = f"{api_url}/queries/list"
headers = {
"Authorization": f"Bearer {moonstream_access_token}",
"Content-Type": "application/json",
}
response = requests.get(request_url, headers=headers, timeout=10)
response.raise_for_status()
return response.json()
def get_query_by_name(
moonstream_access_token: str,
query_name: str,
api_url: str = MOONSTREAM_API_URL,
) -> Dict[str, Any]:
"""
Return a query by name.
"""
api_url = api_url.rstrip("/")
request_url = f"{api_url}/queries/{query_name}/query"
headers = {
"Authorization": f"Bearer {moonstream_access_token}",
"Content-Type": "application/json",
}
response = requests.get(request_url, headers=headers, timeout=10)
response.raise_for_status()
return response.json()
def get_data_from_url(url):
response = requests.get(url)
if response.status_code == 200:

Wyświetl plik

@ -383,5 +383,5 @@ if MOONSTREAM_LEADERBOARD_GENERATOR_JOURNAL_ID == "":
)
MOONSTREAM_LEADERBOARD_GENERATOR_BATCH_SIZE = 12000
MOONSTREAM_LEADERBOARD_GENERATOR_BATCH_SIZE = 10000
MOONSTREAM_LEADERBOARD_GENERATOR_PUSH_TIMEOUT_SECONDS = 60