historical-crawl-filters
Andrey 2023-02-06 17:21:32 +02:00
parent 9cd4dedabe
commit 8cb008f618
5 changed files with 109 additions and 35 deletions

View file

@@ -4,7 +4,7 @@ from typing import Optional
from uuid import UUID
from moonstreamdb.blockchain import AvailableBlockchainType
from moonstreamdb.db import yield_db_session_ctx
from moonstreamdb.db import yield_db_session_ctx, yield_db_read_only_session_ctx
from web3 import Web3
from web3.middleware import geth_poa_middleware
@@ -169,7 +169,7 @@ def handle_historical_crawl(args: argparse.Namespace) -> None:
)
logger.info(f"Blockchain type: {blockchain_type.value}")
with yield_db_session_ctx() as db_session:
with yield_db_session_ctx() as db_session, yield_db_read_only_session_ctx() as db_read_only_session:
web3: Optional[Web3] = None
if args.web3 is None:
logger.info(
@@ -225,8 +225,14 @@ def handle_historical_crawl(args: argparse.Namespace) -> None:
f"Start block {start_block} is less than end block {args.end}. This crawler crawls in the reverse direction."
)
list_of_addresses = []
for address in args.project_addresses or []:
list_of_addresses.append(Web3.toChecksumAddress(address))
historical_crawler(
db_session,
db_read_only_session,
blockchain_type,
web3,
filtered_event_jobs,
@@ -236,6 +242,7 @@ def handle_historical_crawl(args: argparse.Namespace) -> None:
args.max_blocks_batch,
args.min_sleep_time,
access_id=args.access_id,
list_of_addresses=list_of_addresses,
)
@@ -407,6 +414,13 @@ def main() -> None:
default=False,
help="Only crawl events",
)
historical_crawl_parser.add_argument(
"--project-addresses",
nargs="*",
type=str,
default=None,
help="Comma separated list of project addresses",
)
historical_crawl_parser.set_defaults(func=handle_historical_crawl)
args = parser.parse_args()
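For reference, here is a minimal standalone sketch of what the new flag amounts to once parsed: argparse collects the space-separated values (nargs="*") and each address is normalized with Web3.toChecksumAddress before being handed to the crawler. The throwaway parser and the example address below are illustrative only, not part of this commit.

import argparse

from web3 import Web3

parser = argparse.ArgumentParser()
parser.add_argument("--project-addresses", nargs="*", type=str, default=None)
args = parser.parse_args(
    ["--project-addresses", "0x0000000000000000000000000000000000000001"]
)

# Normalize to EIP-55 checksum form, tolerating the default of None.
list_of_addresses = [
    Web3.toChecksumAddress(address) for address in (args.project_addresses or [])
]
print(list_of_addresses)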

View file

@@ -1,11 +1,14 @@
import logging
import json
import time
from typing import Dict, List, Optional
from moonstreamdb.blockchain import AvailableBlockchainType, get_label_model
from moonstreamdb.models import Base
from moonworm.crawler.function_call_crawler import ContractFunctionCall # type: ignore
from sqlalchemy.orm import Session
from sqlalchemy.dialects.postgresql import insert
from ..settings import CRAWLER_LABEL
from .event_crawler import Event
@@ -154,37 +157,62 @@ def add_events_to_session(
) -> None:
label_model = get_label_model(blockchain_type)
events_hashes_to_save = [event.transaction_hash for event in events]
# events_hashes_to_save = [event.transaction_hash for event in events]
existing_labels = (
db_session.query(label_model.transaction_hash, label_model.log_index)
.filter(
label_model.label == label_name,
label_model.log_index != None,
label_model.transaction_hash.in_(events_hashes_to_save),
)
.all()
# existing_labels = (
# db_session.query(label_model.transaction_hash, label_model.log_index)
# .filter(
# label_model.label == label_name,
# label_model.log_index != None,
# label_model.transaction_hash.in_(events_hashes_to_save),
# )
# .all()
# )
# existing_labels_transactions = []
# existing_log_index_by_tx_hash: Dict[str, List[int]] = {}
# for label in existing_labels:
# if label[0] not in existing_labels_transactions:
# existing_labels_transactions.append(label[0])
# existing_log_index_by_tx_hash[label[0]] = []
# existing_log_index_by_tx_hash[label[0]].append(label[1])
# labels_to_save = []
# for event in events:
# if event.transaction_hash not in existing_labels_transactions:
# labels_to_save.append(_event_to_label(blockchain_type, event, label_name))
# elif (
# event.log_index not in existing_log_index_by_tx_hash[event.transaction_hash]
# ):
# labels_to_save.append(_event_to_label(blockchain_type, event, label_name))
logger.info(f"Saving {len(events)} event labels to session")
logger.info(f"Test")
# db_session.add_all(labels_to_save)
insert_statement = insert(label_model).values(events)
result_stmt = insert_statement.on_conflict_do_update(
index_elements=[
label_model.address,
label_model.transaction_hash,
label_model.log_index,
],
set_=dict(
label=insert_statement.excluded.label,
label_data=insert_statement.excluded.label_data,
block_number=insert_statement.excluded.block_number,
),
)
existing_labels_transactions = []
existing_log_index_by_tx_hash: Dict[str, List[int]] = {}
for label in existing_labels:
if label[0] not in existing_labels_transactions:
existing_labels_transactions.append(label[0])
existing_log_index_by_tx_hash[label[0]] = []
existing_log_index_by_tx_hash[label[0]].append(label[1])
labels_to_save = []
for event in events:
if event.transaction_hash not in existing_labels_transactions:
labels_to_save.append(_event_to_label(blockchain_type, event, label_name))
elif (
event.log_index not in existing_log_index_by_tx_hash[event.transaction_hash]
):
labels_to_save.append(_event_to_label(blockchain_type, event, label_name))
logger.info(f"Saving {len(labels_to_save)} event labels to session")
db_session.add_all(labels_to_save)
try:
start = time.time()
logger.info("Adding events to session")
db_session.execute(result_stmt)
db_session.commit()
logger.info(f"Adding events to session took {time.time() - start} seconds")
time.sleep(0.1)
except Exception as err:
logger.error(f"Failed to add event labels to session: {err}")
db_session.rollback()
def add_function_calls_to_session(
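The duplicate-filtering query that used to guard db_session.add_all is replaced above by a single PostgreSQL upsert. The isolated sketch below shows the same insert(...).on_conflict_do_update(...) pattern against a toy table; note that values() expects mapping-like rows keyed by column name, so Event objects have to be converted to such rows before being passed in. The table, columns, and sample row are illustrative stand-ins for the real moonstreamdb label model.

from sqlalchemy import Column, Integer, MetaData, String, Table
from sqlalchemy.dialects.postgresql import insert

metadata = MetaData()
# Toy stand-in for the label model; the composite primary key doubles as the
# conflict target used by ON CONFLICT below.
labels = Table(
    "labels",
    metadata,
    Column("address", String, primary_key=True),
    Column("transaction_hash", String, primary_key=True),
    Column("log_index", Integer, primary_key=True),
    Column("label", String),
    Column("label_data", String),
    Column("block_number", Integer),
)

# Rows must be mappings keyed by column name.
rows = [
    {
        "address": "0x0000000000000000000000000000000000000001",
        "transaction_hash": "0x" + "00" * 32,
        "log_index": 0,
        "label": "example-label",
        "label_data": "{}",
        "block_number": 123,
    }
]

insert_statement = insert(labels).values(rows)
upsert_statement = insert_statement.on_conflict_do_update(
    index_elements=[labels.c.address, labels.c.transaction_hash, labels.c.log_index],
    set_=dict(
        label=insert_statement.excluded.label,
        label_data=insert_statement.excluded.label_data,
        block_number=insert_statement.excluded.block_number,
    ),
)
# db_session.execute(upsert_statement)
# db_session.commit()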

View file

@@ -1,5 +1,6 @@
import logging
from dataclasses import dataclass
import time
from typing import Any, Dict, List, Optional, Tuple
from moonstreamdb.blockchain import AvailableBlockchainType, get_block_model
@@ -98,6 +99,7 @@ def get_block_timestamp(
def _crawl_events(
db_session: Session,
db_read_only_session: Session,
blockchain_type: AvailableBlockchainType,
web3: Web3,
jobs: List[EventCrawlJob],
@@ -105,6 +107,7 @@ def _crawl_events(
to_block: int,
blocks_cache: Dict[int, int] = {},
db_block_query_batch=10,
list_of_addresses: List[str] = [],
) -> List[Event]:
all_events = []
for job in jobs:
@@ -117,10 +120,11 @@ def _crawl_events(
on_decode_error=lambda e: print(
f"Error decoding event: {e}"
), # TODO report via humbug
list_of_addresses=list_of_addresses,
)
for raw_event in raw_events:
raw_event["blockTimestamp"] = get_block_timestamp(
db_session,
db_read_only_session,
web3,
blockchain_type,
raw_event["blockNumber"],
@@ -143,6 +147,7 @@ def _crawl_events(
def _autoscale_crawl_events(
db_session: Session,
db_read_only_session: Session,
blockchain_type: AvailableBlockchainType,
web3: Web3,
jobs: List[EventCrawlJob],
@@ -151,6 +156,7 @@ def _autoscale_crawl_events(
blocks_cache: Dict[int, int] = {},
batch_size: int = 1000,
db_block_query_batch=10,
list_of_addresses: List[str] = [],
) -> Tuple[List[Event], int]:
"""
@@ -159,6 +165,11 @@ def _autoscale_crawl_events(
all_events = []
for job in jobs:
start_time = time.time()
logger.info(
f"Start crawling events for {job.event_abi['name']} from {from_block} to {to_block} with batch_size {batch_size}"
)
raw_events, batch_size = moonworm_autoscale_crawl_events(
web3,
job.event_abi,
@@ -167,9 +178,12 @@ def _autoscale_crawl_events(
batch_size,
job.contracts[0],
)
logger.info(
f"Finished crawling events for {job.event_abi['name']} from {from_block} to {to_block} with batch_size {batch_size} in {time.time() - start_time} seconds"
)
for raw_event in raw_events:
raw_event["blockTimestamp"] = get_block_timestamp(
db_session,
db_read_only_session,
web3,
blockchain_type,
raw_event["blockNumber"],
@@ -186,5 +200,8 @@ def _autoscale_crawl_events(
log_index=raw_event["logIndex"],
)
all_events.append(event)
logger.info(
f"Finished processing events for {job.event_abi['name']} from {from_block} to {to_block} with batch_size {batch_size} in {time.time() - start_time} seconds"
)
return all_events, batch_size
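get_block_timestamp itself is not part of this diff, but since both crawlers now hand it the read-only session, here is a rough sketch of the lookup order it is presumed to follow (in-memory cache, then database, then the node). The function name, signature, and ordering below are assumptions, not the real implementation.

from typing import Dict

from web3 import Web3

def get_block_timestamp_sketch(
    web3: Web3,
    block_number: int,
    blocks_cache: Dict[int, int],
) -> int:
    # 1) in-memory cache shared across the crawl batch
    if block_number in blocks_cache:
        return blocks_cache[block_number]
    # 2) the read-only DB session would be consulted here (omitted in this sketch)
    # 3) last resort: ask the node
    timestamp = web3.eth.get_block(block_number)["timestamp"]
    blocks_cache[block_number] = timestamp
    return timestamp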

View file

@@ -1,6 +1,6 @@
import logging
import time
from typing import Dict, List, Optional, Tuple
from typing import Dict, List, Optional, Tuple, Any
from uuid import UUID
from moonstreamdb.blockchain import AvailableBlockchainType
@@ -22,6 +22,7 @@ logger = logging.getLogger(__name__)
def historical_crawler(
db_session: Session,
db_read_only_session: Session,
blockchain_type: AvailableBlockchainType,
web3: Optional[Web3],
event_crawl_jobs: List[EventCrawlJob],
@@ -31,6 +32,7 @@ def historical_crawler(
max_blocks_batch: int = 100,
min_sleep_time: float = 0.1,
access_id: Optional[UUID] = None,
list_of_addresses: Optional[List[str]] = None,
):
assert max_blocks_batch > 0, "max_blocks_batch must be greater than 0"
assert min_sleep_time > 0, "min_sleep_time must be greater than 0"
@@ -60,6 +62,9 @@ def historical_crawler(
blocks_cache: Dict[int, int] = {}
failed_count = 0
project_addresses_holders: List[str] = []
if list_of_addresses is not None:
project_addresses_holders = list_of_addresses
while start_block >= end_block:
try:
@@ -75,6 +80,7 @@ def historical_crawler(
if function_call_crawl_jobs:
all_events = _crawl_events(
db_session=db_session,
db_read_only_session=db_read_only_session,
blockchain_type=blockchain_type,
web3=web3,
jobs=event_crawl_jobs,
@@ -82,12 +88,14 @@ def historical_crawler(
to_block=start_block,
blocks_cache=blocks_cache,
db_block_query_batch=max_blocks_batch,
list_of_addresses=project_addresses_holders,
)
else:
all_events, max_blocks_batch = _autoscale_crawl_events(
db_session=db_session,
db_read_only_session=db_read_only_session,
blockchain_type=blockchain_type,
web3=web3,
jobs=event_crawl_jobs,
@@ -95,13 +103,20 @@ def historical_crawler(
to_block=start_block,
blocks_cache=blocks_cache,
db_block_query_batch=max_blocks_batch,
list_of_addresses=project_addresses_holders,
)
logger.info(
f"Crawled {len(all_events)} events from {start_block} to {batch_end_block}."
)
# Adding events to db
start_time = time.time()
logger.info(f"Adding events to db.")
add_events_to_session(db_session, all_events, blockchain_type)
logger.info(f"Added events to db in {time.time() - start_time} seconds.")
if function_call_crawl_jobs:
logger.info(
f"Crawling function calls from {start_block} to {batch_end_block}"

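The address filtering itself is delegated to moonworm via list_of_addresses, so it does not appear in this file. As a rough illustration of the intended effect, a post-fetch filter over raw web3-style log dictionaries could look like the sketch below; the helper name and the assumption that each event carries an "address" key are mine, not the branch's.

from typing import Any, Dict, List

def filter_events_by_address(
    raw_events: List[Dict[str, Any]],
    allowed_addresses: List[str],
) -> List[Dict[str, Any]]:
    # Case-insensitive match on the emitting contract address.
    allowed = {address.lower() for address in allowed_addresses}
    return [event for event in raw_events if event["address"].lower() in allowed]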
View file

@@ -2,4 +2,4 @@
Moonstream crawlers version.
"""
MOONCRAWL_VERSION = "0.2.5"
MOONCRAWL_VERSION = "0.2.6"