Update continuous crawler to add labels to the session and commit them in batches

pull/510/head
yhtiyar 2021-12-21 18:33:31 +03:00
rodzic 3228e5e740
commit 6be687b1ed
4 zmienionych plików z 22 dodań i 15 usunięć

Wyświetl plik

@ -167,7 +167,7 @@ def main() -> None:
"--confirmations",
"-c",
type=int,
default=100,
default=175,
help="Number of confirmations to wait for",
)

Wyświetl plik

@ -24,7 +24,7 @@ from .crawler import (
merge_event_crawl_jobs,
merge_function_call_crawl_jobs,
)
from .db import save_events, save_function_calls
from .db import add_events_to_session, add_function_calls_to_session, commit_session
from .event_crawler import _crawl_events
from .function_call_crawler import _crawl_functions
@ -122,7 +122,7 @@ def continuous_crawler(
heartbeat_interval: float = 60,
new_jobs_refetch_interval: float = 120,
):
crawler_type = "continuous"
assert (
min_blocks_batch < max_blocks_batch
), "min_blocks_batch must be less than max_blocks_batch"
@ -166,7 +166,7 @@ def continuous_crawler(
logger.info(f"Starting continuous event crawler start_block={start_block}")
logger.info("Sending initial heartbeat")
heartbeat(
crawler_type="event",
crawler_type=crawler_type,
blockchain_type=blockchain_type,
crawler_status=heartbeat_template,
)
@ -209,7 +209,7 @@ def continuous_crawler(
f"Crawled {len(all_events)} events from {start_block} to {end_block}."
)
save_events(db_session, all_events, blockchain_type)
add_events_to_session(db_session, all_events, blockchain_type)
logger.info(
f"Crawling function calls from {start_block} to {end_block}"
@ -225,7 +225,9 @@ def continuous_crawler(
f"Crawled {len(all_function_calls)} function calls from {start_block} to {end_block}."
)
save_function_calls(db_session, all_function_calls, blockchain_type)
add_function_calls_to_session(
db_session, all_function_calls, blockchain_type
)
current_time = datetime.utcnow()
@ -243,6 +245,9 @@ def continuous_crawler(
if current_time - last_heartbeat_time > timedelta(
seconds=heartbeat_interval
):
# Committing to db
commit_session(db_session)
# Update heartbeat
heartbeat_template["last_block"] = end_block
heartbeat_template["current_time"] = _date_to_str(current_time)
@ -259,7 +264,7 @@ def continuous_crawler(
"function_call metrics"
] = ethereum_state_provider.metrics
heartbeat(
crawler_type="event",
crawler_type=crawler_type,
blockchain_type=blockchain_type,
crawler_status=heartbeat_template,
)
@ -303,7 +308,7 @@ def continuous_crawler(
] = f"{e.__class__.__name__}: {e}\n error_summary: {error_summary}\n error_traceback: {error_traceback}"
heartbeat_template["last_block"] = end_block
heartbeat(
crawler_type="event",
crawler_type=crawler_type,
blockchain_type=blockchain_type,
crawler_status=heartbeat_template,
is_dead=True,

Wyświetl plik

@ -332,7 +332,7 @@ def heartbeat(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
entry_id=heartbeat_entry_id,
title=f"{crawler_type} Heartbeat - {blockchain_type.value}",
title=f"{crawler_type} Heartbeat - {blockchain_type.value}. Status: {crawler_status['status']} - {crawler_status['current_time']}",
content=f"{json.dumps(crawler_status, indent=2)}",
)
if is_dead:

Wyświetl plik

@ -87,12 +87,12 @@ def get_last_labeled_block_number(
return block_number[0] if block_number else None
def save_labels(db_session: Session, labels: List[Base]) -> None:
def commit_session(db_session: Session) -> None:
"""
Save labels in the database.
"""
try:
db_session.add_all(labels)
logger.info("Committing session to database")
db_session.commit()
except Exception as e:
logger.error(f"Failed to save labels: {e}")
@ -100,7 +100,7 @@ def save_labels(db_session: Session, labels: List[Base]) -> None:
raise e
def save_events(
def add_events_to_session(
db_session: Session, events: List[Event], blockchain_type: AvailableBlockchainType
) -> None:
label_model = get_label_model(blockchain_type)
@ -134,10 +134,11 @@ def save_events(
):
labels_to_save.append(_event_to_label(blockchain_type, event))
save_labels(db_session, labels_to_save)
logger.info(f"Saving {len(labels_to_save)} labels to session")
db_session.add_all(labels_to_save)
def save_function_calls(
def add_function_calls_to_session(
db_session: Session,
function_calls: List[ContractFunctionCall],
blockchain_type: AvailableBlockchainType,
@ -166,4 +167,5 @@ def save_function_calls(
if function_call.transaction_hash not in existing_labels_transactions
]
save_labels(db_session, labels_to_save)
logger.info(f"Saving {len(labels_to_save)} labels to session")
db_session.add_all(labels_to_save)