diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py index ee58258c..be040b45 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py @@ -26,7 +26,13 @@ from .crawler import ( merge_function_call_crawl_jobs, moonworm_crawler_update_job_as_pickedup, ) -from .db import add_events_to_session, add_function_calls_to_session, commit_session +from .db import ( + add_events_to_session, + add_function_calls_to_session, + commit_session, + write_to_db, + delete_unverified_duplicates, +) from .event_crawler import _crawl_events from .function_call_crawler import _crawl_functions from ..settings import ( @@ -95,7 +101,7 @@ def continuous_crawler( function_call_crawl_jobs: List[FunctionCallCrawlJob], start_block: int, max_blocks_batch: int = 100, - min_blocks_batch: int = 40, + min_blocks_batch: int = 5, confirmations: int = 60, min_sleep_time: float = 0.1, heartbeat_interval: float = 60, @@ -156,13 +162,13 @@ def continuous_crawler( logger.info(f"Starting continuous event crawler start_block={start_block}") logger.info("Sending initial heartbeat") - heartbeat( - crawler_type=crawler_type, - blockchain_type=blockchain_type, - crawler_status=heartbeat_template, - ) + # heartbeat( + # crawler_type=crawler_type, + # blockchain_type=blockchain_type, + # crawler_status=heartbeat_template, + # ) last_heartbeat_time = datetime.utcnow() - blocks_cache: Dict[int, int] = {} + blocks_cache: Dict[int, Optional[int]] = {} current_sleep_time = min_sleep_time failed_count = 0 try: @@ -171,7 +177,7 @@ def continuous_crawler( time.sleep(current_sleep_time) end_block = min( - web3.eth.blockNumber - confirmations, + web3.eth.blockNumber - min_blocks_batch, start_block + max_blocks_batch, ) @@ -192,7 +198,7 @@ def continuous_crawler( from_block=start_block, to_block=end_block, blocks_cache=blocks_cache, - db_block_query_batch=min_blocks_batch * 3, + db_block_query_batch=max_blocks_batch * 3, ) logger.info( f"Crawled {len(all_events)} events from {start_block} to {end_block}." 
@@ -220,6 +226,18 @@ def continuous_crawler( current_time = datetime.utcnow() + write_to_db( + web3=web3, db_session=db_session, blockchain_type=blockchain_type + ) + + delete_unverified_duplicates( + db_session=db_session, blockchain_type=blockchain_type + ) + + commit_session(db_session) + + ### fetch confirmed transactions and events + if current_time - jobs_refetchet_time > timedelta( seconds=new_jobs_refetch_interval ): @@ -243,8 +261,6 @@ def continuous_crawler( if current_time - last_heartbeat_time > timedelta( seconds=heartbeat_interval ): - # Commiting to db - commit_session(db_session) # Update heartbeat heartbeat_template["last_block"] = end_block heartbeat_template["current_time"] = _date_to_str(current_time) @@ -260,11 +276,11 @@ def continuous_crawler( heartbeat_template[ "function_call metrics" ] = ethereum_state_provider.metrics - heartbeat( - crawler_type=crawler_type, - blockchain_type=blockchain_type, - crawler_status=heartbeat_template, - ) + # heartbeat( + # crawler_type=crawler_type, + # blockchain_type=blockchain_type, + # crawler_status=heartbeat_template, + # ) logger.info("Sending heartbeat.", heartbeat_template) last_heartbeat_time = datetime.utcnow() @@ -304,13 +320,13 @@ def continuous_crawler( heartbeat_template[ "die_reason" ] = f"{e.__class__.__name__}: {e}\n error_summary: {error_summary}\n error_traceback: {error_traceback}" - heartbeat_template["last_block"] = end_block - heartbeat( - crawler_type=crawler_type, - blockchain_type=blockchain_type, - crawler_status=heartbeat_template, - is_dead=True, - ) + heartbeat_template["last_block"] = end_block # type: ignore + # heartbeat( + # crawler_type=crawler_type, + # blockchain_type=blockchain_type, + # crawler_status=heartbeat_template, + # is_dead=True, + # ) logger.exception(e) raise e diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py index 3edacc93..50ba1b76 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py @@ -124,7 +124,7 @@ def _retry_connect_web3( logger.info(f"Retrying in {sleep_time} seconds") time.sleep(sleep_time) raise Exception( - f"Failed to connect to {blockchain_type} blockchain after {retry_count} retries: {error}" + f"Failed to connect to {blockchain_type} blockchain after {retry_count} retries: {error}" # type: ignore ) @@ -226,6 +226,7 @@ def find_all_deployed_blocks( """ all_deployed_blocks = {} + logger.info(f"Finding deployment blocks for {len(addresses_set)} addresses") for address in addresses_set: try: code = web3.eth.getCode(address) @@ -237,6 +238,7 @@ def find_all_deployed_blocks( ) if block is not None: all_deployed_blocks[address] = block + logger.info(f"Found deployment block for {address}: {block}") if block is None: logger.error(f"Failed to get deployment block for {address}") except Exception as e: @@ -259,7 +261,8 @@ def make_event_crawl_jobs(entries: List[BugoutSearchResult]) -> List[EventCrawlJ crawl_job_by_selector: Dict[str, EventCrawlJob] = {} for entry in entries: - abi_selector = _get_tag(entry, "abi_selector") + abi_selector = _get_tag(entry, "abi_method_hash") + # abi_selector = _get_tag(entry, "abi_selector") contract_address = Web3().toChecksumAddress(_get_tag(entry, "address")) entry_id = UUID(entry.entry_url.split("/")[-1]) # crying emoji @@ -300,8 +303,8 @@ def make_function_call_crawl_jobs( entry_id = UUID(entry.entry_url.split("/")[-1]) # crying emoji contract_address = 
Web3().toChecksumAddress(_get_tag(entry, "address"))
         abi = json.loads(cast(str, entry.content))
-        # method_signature = encode_function_signature(abi)
-        method_signature = _get_tag(entry, "abi_selector")
+        method_signature = encode_function_signature(abi)
+        # method_signature = _get_tag(entry, "abi_selector")
 
         if method_signature is None:
             raise ValueError(f"{abi} is not a function ABI")
diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py
index 2fa8d5ac..20e94027 100644
--- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py
+++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py
@@ -2,17 +2,23 @@ import logging
 import json
 from typing import Dict, List, Optional
 
-from moonstreamdb.blockchain import AvailableBlockchainType, get_label_model
+from moonstreamdb.blockchain import (
+    AvailableBlockchainType,
+    get_label_model,
+    get_block_model,
+)
 from moonstreamdb.models import Base
 from moonworm.crawler.function_call_crawler import ContractFunctionCall  # type: ignore
 
 # from sqlalchemy.dialects.postgresql.dml import insert
-from sqlalchemy.orm import Session
-from sqlalchemy import insert
+from sqlalchemy.orm import Session, aliased
+from sqlalchemy import insert, text, and_, exists, or_, func, update
 
 from ..settings import CRAWLER_LABEL
 from .event_crawler import Event
 
+from web3 import Web3
+
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
 
@@ -20,7 +26,7 @@ logger = logging.getLogger(__name__)
 
 def _event_to_label(
     blockchain_type: AvailableBlockchainType, event: Event, label_name=CRAWLER_LABEL
-) -> Base:
+) -> Base:  # type: ignore
     """
     Creates a label model.
     """
@@ -51,7 +57,7 @@ def _function_call_to_label(
     blockchain_type: AvailableBlockchainType,
     function_call: ContractFunctionCall,
     label_name=CRAWLER_LABEL,
-) -> Base:
+) -> Base:  # type: ignore
    """
     Creates a label model.
     """
@@ -211,3 +217,205 @@ def add_function_calls_to_session(
 
     insert_statement = insert(label_model).values(function_calls_insert)
     db_session.execute(insert_statement)
+
+
+def write_to_db(
+    web3: Web3,
+    blockchain_type: AvailableBlockchainType,
+    db_session: Session,
+    label_name=CRAWLER_LABEL,
+    confirmations: int = 100,
+):
+    """
+    Promote all unverified labels to label_name.
+
+    A label counts as confirmed once its block number is at or below the latest
+    block stored in the block model, or at least `confirmations` blocks behind
+    the current chain head. Labels are deduplicated with
+    DISTINCT ON (transaction_hash, log_index) for events and
+    DISTINCT ON (transaction_hash) for function calls.
+    """
+
+    label_model = get_label_model(blockchain_type)
+
+    block_model = get_block_model(blockchain_type)
+
+    label_name_unverified = f"{label_name}-unverified"
+
+    label_model_alias = aliased(label_model, name="polygon_labels2")
+
+    # update all labels to label_name
+
+    blockchain_block_number = web3.eth.block_number
+
+    latest_block = (
+        db_session.query(block_model.block_number)
+        .order_by(block_model.block_number.desc())
+        .limit(1)
+    )
+
+    latest_block_cte = latest_block.cte("latest_block")
+
+    events = (
+        db_session.query(
+            label_model.id, label_model.transaction_hash, label_model.log_index
+        )
+        .distinct(label_model.transaction_hash, label_model.log_index)
+        .filter(label_model.label == label_name_unverified)
+        .filter(label_model.log_index != None)
+        .filter(
+            or_(
+                label_model.block_number <= latest_block_cte.c.block_number,
+                label_model.block_number <= blockchain_block_number - confirmations,
+            )
+        )
+        .filter(
+            ~db_session.query(label_model_alias)
+            .filter(
+                and_(
+                    label_model_alias.label == label_name,
+                    label_model_alias.transaction_hash == label_model.transaction_hash,
+                    label_model_alias.log_index == label_model.log_index,
+                )
+            )
+            .exists()
+        )
+        .order_by(
+            label_model.transaction_hash,
+            label_model.log_index,
+            label_model.block_number.desc(),
+        )
+    )
+
+    events_cte = events.cte("events")
+
+    function_calls = (
+        db_session.query(
+            label_model.id, label_model.transaction_hash, label_model.log_index
+        )
+        .distinct(label_model.transaction_hash)
+        .filter(label_model.label == label_name_unverified)
+        .filter(label_model.log_index == None)
+        .filter(
+            or_(
+                label_model.block_number <= latest_block_cte.c.block_number,
+                label_model.block_number <= blockchain_block_number - confirmations,
+            )
+        )
+        .filter(
+            ~db_session.query(label_model_alias)
+            .filter(
+                and_(
+                    label_model_alias.label == label_name,
+                    label_model_alias.transaction_hash == label_model.transaction_hash,
+                    label_model_alias.log_index == None,
+                )
+            )
+            .exists()
+        )
+        .order_by(label_model.transaction_hash, label_model.block_number.desc())
+    )
+
+    function_calls_cte = function_calls.cte("function_calls")
+
+    union_all_subquery = (
+        db_session.query(events_cte)
+        .union_all(db_session.query(function_calls_cte))
+        .subquery()
+    )
+
+    logger.info("Updating labels")
+    # Update query
+    updated_labels = (
+        db_session.query(label_model)
+        .filter(label_model.id == union_all_subquery.c.events_id)
+        .update(
+            {"label": label_name}, synchronize_session=False
+        )  # returns number of rows updated by query
+    )
+
+    logger.info(
+        f"Latest block number in database: {latest_block.one_or_none()}, on chain: {blockchain_block_number}, confirmations: {confirmations}"
+    )
+
+    logger.info(f"Updated {updated_labels} labels")
+
+
+def delete_unverified_duplicates(
+    db_session: Session,
+    blockchain_type: AvailableBlockchainType,
+    label_name=CRAWLER_LABEL,
+):
+    """
+    Delete all unverified labels that already have a verified counterpart.
+ """ + + label_model = get_label_model(blockchain_type) + + label_name_unverified = f"{label_name}-unverified" + + label_model_alias = aliased(label_model, name="polygon_labels2") + + duplicated_events = ( + db_session.query( + label_model.id, label_model.transaction_hash, label_model.log_index + ) + .distinct(label_model.transaction_hash, label_model.log_index) + .filter(label_model.label == label_name_unverified) + .filter(label_model.log_index != None) + .filter( + db_session.query(label_model_alias) + .filter( + and_( + label_model_alias.label == label_name, + label_model_alias.transaction_hash == label_model.transaction_hash, + label_model_alias.log_index == label_model.log_index, + ) + ) + .exists() + ) + .order_by( + label_model.transaction_hash, + label_model.log_index, + label_model.block_number.desc(), + ) + ) + + events_cte = duplicated_events.cte("events") + + duplicated_function_calls = ( + db_session.query( + label_model.id, label_model.transaction_hash, label_model.log_index + ) + .distinct(label_model.transaction_hash) + .filter(label_model.label == label_name_unverified) + .filter(label_model.log_index == None) + .filter( + db_session.query(label_model_alias) + .filter( + and_( + label_model_alias.label == label_name, + label_model_alias.transaction_hash == label_model.transaction_hash, + label_model_alias.log_index == None, + ) + ) + .exists() + ) + .order_by(label_model.transaction_hash, label_model.block_number.desc()) + ) + + function_calls_cte = duplicated_function_calls.cte("function_calls") + + union_all_subquery = ( + db_session.query(events_cte) + .union_all(db_session.query(function_calls_cte)) + .subquery() + ) + + logger.info("Deleting duplicates labels") + + # Delete query + + deleted_labels = ( + db_session.query(label_model) + .filter(label_model.id == union_all_subquery.c.events_id) + .delete(synchronize_session=False) # returns number of rows updated by query + ) + + logger.info(f"Deleted duplicates {deleted_labels} labels") diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/event_crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/event_crawler.py index 775dbf08..13d2df80 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/event_crawler.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/event_crawler.py @@ -1,14 +1,23 @@ import logging from dataclasses import dataclass from typing import Any, Dict, List, Optional, Tuple +import time -from moonstreamdb.blockchain import AvailableBlockchainType, get_block_model +from moonstreamdb.blockchain import ( + AvailableBlockchainType, + get_block_model, + get_label_model, +) from moonworm.crawler.log_scanner import _fetch_events_chunk, _crawl_events as moonworm_autoscale_crawl_events # type: ignore from sqlalchemy.orm.session import Session from sqlalchemy.sql.expression import and_, func +from sqlalchemy import text from web3 import Web3 +from ..settings import CRAWLER_LABEL from .crawler import EventCrawlJob +from .db import get_block_model, get_label_model + logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -43,10 +52,11 @@ def get_block_timestamp( web3: Web3, blockchain_type: AvailableBlockchainType, block_number: int, - blocks_cache: Dict[int, int], + blocks_cache: Dict[int, Optional[int]], from_block: int, to_block: int, max_blocks_batch: int = 30, + label_name: str = CRAWLER_LABEL, ) -> int: """ Get the timestamp of a block. 
@@ -64,55 +74,104 @@ def get_block_timestamp(
     """
     assert max_blocks_batch > 0
 
-    if block_number in blocks_cache:
-        return blocks_cache[block_number]
+    if block_number in blocks_cache and blocks_cache[block_number] is not None:
+        return blocks_cache[block_number]  # type: ignore
 
     block_model = get_block_model(blockchain_type)
+    label_model = get_label_model(blockchain_type)
 
     # from_block and to_block can be in reverse order
     if from_block > to_block:
         from_block, to_block = to_block, from_block
 
-    from_block_filter = from_block - max_blocks_batch - 1
-    to_block_filter = to_block + max_blocks_batch + 1
-
-    blocks = (
-        db_session.query(
-            func.json_object_agg(block_model.block_number, block_model.timestamp)
-        )
-        .filter(
-            and_(
-                block_model.block_number >= from_block_filter,
-                block_model.block_number <= to_block_filter,
-            )
-        )
-        .scalar()
-    )
-
-    ### transform all keys to int to avoid casting after
-
-    if blocks is not None:
-        blocks = {
-            int(block_number): timestamp for block_number, timestamp in blocks.items()
-        }
+    if block_number not in blocks_cache:
+        from_block_filter = from_block - max_blocks_batch - 1
+        to_block_filter = to_block + max_blocks_batch + 1
+
+        blocks_range = db_session.query(
+            func.generate_series(from_block_filter, to_block_filter).label(
+                "block_number"
+            )
+        ).cte("blocks_range")
+
+        blocks_table_cache = (
+            db_session.query(block_model.block_number, block_model.timestamp)
+            .filter(
+                and_(
+                    block_model.block_number >= from_block_filter,
+                    block_model.block_number <= to_block_filter,
+                )
+            )
+            .cte("blocks_table_cache")
+        )
+
+        label_table_cache = (
+            db_session.query(label_model.block_number, label_model.block_timestamp)
+            .filter(
+                and_(
+                    label_model.block_number >= from_block_filter,
+                    label_model.block_number <= to_block_filter,
+                    label_model.label == label_name,
+                )
+            )
+            .distinct(label_model.block_number)
+            .cte("label_table_cache")
+        )
+
+        full_blocks_cache = (
+            db_session.query(
+                blocks_range.c.block_number,
+                func.coalesce(
+                    blocks_table_cache.c.timestamp, label_table_cache.c.block_timestamp
+                ).label("block_timestamp"),
+            )
+            .outerjoin(
+                blocks_table_cache,
+                blocks_range.c.block_number == blocks_table_cache.c.block_number,
+            )
+            .outerjoin(
+                label_table_cache,
+                blocks_range.c.block_number == label_table_cache.c.block_number,
+            )
+            .cte("blocks_cache")
+        )
+
+        blocks = db_session.query(
+            func.json_object_agg(
+                full_blocks_cache.c.block_number, full_blocks_cache.c.block_timestamp
+            )
+        ).one()[0]
+
+        ### transform all keys to int to avoid casting after
+
+        if blocks is not None:
+            blocks = {
+                int(block_number): timestamp
+                for block_number, timestamp in blocks.items()
+            }
+
+            blocks_cache.update(blocks)
 
     target_block_timestamp: Optional[int] = None
 
-    if blocks:
-        target_block_timestamp = blocks.get(str(block_number))
-
-    if target_block_timestamp is None:
+    if blocks_cache.get(block_number) is None:
         target_block_timestamp = _get_block_timestamp_from_web3(
             web3, block_number
         )  # can be improved by using batch call
         blocks_cache[block_number] = target_block_timestamp
+    else:
+        target_block_timestamp = blocks_cache[block_number]
 
-    if len(blocks_cache) + len(blocks) > (max_blocks_batch * 3 + 2):
+    if len(blocks_cache) > (max_blocks_batch * 3 + 2):
         # clear cache lower than from_block
-        blocks_cache = blocks
+        # (trim in place so the caller's dict is actually pruned)
+        for cached_block in list(blocks_cache.keys()):
+            if cached_block < from_block:
+                del blocks_cache[cached_block]
 
-    return target_block_timestamp
+    return target_block_timestamp  # type: ignore
 
 
 def _crawl_events(
@@ -122,7 +180,7 @@ def _crawl_events(
     jobs: List[EventCrawlJob],
     from_block: int,
     to_block: int,
-    blocks_cache: Dict[int, int] = {},
+    blocks_cache: Dict[int, Optional[int]] = {},
     db_block_query_batch=10,
 ) -> List[Event]:
     all_events = []
@@ -144,9 +202,9 @@ def _crawl_events(
                 blockchain_type,
                 raw_event["blockNumber"],
                 blocks_cache,
-                db_block_query_batch,
                 from_block,
                 to_block,
+                db_block_query_batch,
             )
             event = Event(
                 event_name=raw_event["event"],
@@ -169,7 +227,7 @@ def _autoscale_crawl_events(
     jobs: List[EventCrawlJob],
     from_block: int,
     to_block: int,
-    blocks_cache: Dict[int, int] = {},
+    blocks_cache: Dict[int, Optional[int]] = {},
     batch_size: int = 1000,
     db_block_query_batch=10,
 ) -> Tuple[List[Event], int]:
@@ -186,6 +244,7 @@ def _autoscale_crawl_events(
             batch_size,
             job.contracts[0],
         )
+
         for raw_event in raw_events:
             raw_event["blockTimestamp"] = get_block_timestamp(
                 db_session,
diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py
index 24d7c33f..78dd1878 100644
--- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py
+++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py
@@ -18,7 +18,13 @@ from .crawler import (
     _retry_connect_web3,
     update_entries_status_and_progress,
 )
-from .db import add_events_to_session, add_function_calls_to_session, commit_session
+from .db import (
+    add_events_to_session,
+    add_function_calls_to_session,
+    commit_session,
+    write_to_db,
+    delete_unverified_duplicates,
+)
 from .event_crawler import _crawl_events, _autoscale_crawl_events
 from .function_call_crawler import _crawl_functions
 
@@ -76,7 +82,7 @@ def historical_crawler(
 
     logger.info(f"Starting historical event crawler start_block={start_block}")
 
-    blocks_cache: Dict[int, int] = {}
+    blocks_cache: Dict[int, Optional[int]] = {}
     failed_count = 0
 
     original_start_block = start_block
@@ -163,6 +169,16 @@ def historical_crawler(
                     progess_map=progess_map,
                 )
 
+            write_to_db(
+                web3,
+                blockchain_type,
+                db_session,
+            )
+
+            delete_unverified_duplicates(
+                db_session=db_session, blockchain_type=blockchain_type
+            )
+
             # Commiting to db
             commit_session(db_session)
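
Note: below is a minimal usage sketch of the confirmation flow these patches add to db.py, mirroring the call sequence in continuous_crawler and historical_crawler. It assumes labels are first inserted with the f"{CRAWLER_LABEL}-unverified" suffix (that write path is not part of this diff), and the chain choice, session, and web3 setup are hypothetical.

# Hypothetical driver; only the three db helpers come from this diff.
from moonstreamdb.blockchain import AvailableBlockchainType
from sqlalchemy.orm import Session
from web3 import Web3

from mooncrawl.moonworm_crawler.db import (
    commit_session,
    delete_unverified_duplicates,
    write_to_db,
)


def confirm_crawled_labels(web3: Web3, db_session: Session) -> None:
    blockchain_type = AvailableBlockchainType.POLYGON  # assumed chain

    # Promote unverified labels once their block is confirmed: at or below
    # the newest row in the blocks table, or at least `confirmations`
    # blocks behind the chain head. The NOT EXISTS filter skips rows that
    # already have a verified twin.
    write_to_db(
        web3=web3,
        blockchain_type=blockchain_type,
        db_session=db_session,
        confirmations=100,
    )

    # Unverified rows that now shadow a verified (transaction_hash,
    # log_index) pair are redundant and get deleted.
    delete_unverified_duplicates(
        db_session=db_session, blockchain_type=blockchain_type
    )

    # Both helpers only stage UPDATE/DELETE statements on the session.
    commit_session(db_session)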
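
For reference, the lookup order the reworked get_block_timestamp implements — in-memory cache, then the blocks table, then block_timestamp from already-stored labels, then a web3 call — condensed into a sketch. The two callables are hypothetical stand-ins for the generate_series/COALESCE CTE query and _get_block_timestamp_from_web3; this is a restatement of the control flow, not the production code.

from typing import Callable, Dict, Optional


def resolve_block_timestamp(
    block_number: int,
    blocks_cache: Dict[int, Optional[int]],
    fetch_range_timestamps: Callable[[], Dict[int, Optional[int]]],
    fetch_web3_timestamp: Callable[[int], int],
) -> int:
    # 1. Cache hit with a real value: done.
    cached = blocks_cache.get(block_number)
    if cached is not None:
        return cached

    # 2. Unknown block: one DB round trip fills the cache for the whole
    #    crawl range; blocks missing from both tables come back as None.
    if block_number not in blocks_cache:
        blocks_cache.update(fetch_range_timestamps())

    # 3. Still unresolved: fall back to the node and memoize the answer.
    if blocks_cache.get(block_number) is None:
        blocks_cache[block_number] = fetch_web3_timestamp(block_number)

    return blocks_cache[block_number]  # type: ignore

Caching None for blocks the database does not know avoids re-running the range query for every event that falls in a gap; only the per-block web3 fallback remains on the hot path.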