Rewrite insert logic.

1. Add a new label_name_unverified label that is written to the database without waiting for confirmations.
2. Do the insert as a single SQL operation with deduplication (see the sketch after this list).
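A minimal sketch of a single-statement, deduplicating insert, assuming a simplified SQLAlchemy label model and PostgreSQL's ON CONFLICT; the committed code (in the diff below) instead promotes unverified rows with DISTINCT ON and NOT EXISTS queries, so treat this as an illustration of the idea, not the actual implementation.

# Hedged sketch: write a batch of unverified labels in one INSERT, skipping
# rows whose (transaction_hash, log_index) pair is already present.
# The Label model and its unique constraint are illustrative assumptions.
from sqlalchemy import Column, Integer, String, UniqueConstraint
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class Label(Base):
    __tablename__ = "labels"
    id = Column(Integer, primary_key=True)
    label = Column(String, nullable=False)
    transaction_hash = Column(String, nullable=False)
    log_index = Column(Integer)
    __table_args__ = (UniqueConstraint("transaction_hash", "log_index"),)

def insert_unverified(db_session: Session, rows: list) -> None:
    # One statement for the whole batch; ON CONFLICT DO NOTHING deduplicates
    # against labels that already exist in the table.
    statement = pg_insert(Label).values(rows).on_conflict_do_nothing(
        index_elements=["transaction_hash", "log_index"]
    )
    db_session.execute(statement)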

The blocks cache will also include the labels table.
Likewise, the database state will be loaded into the blocks cache once per from/to block range.
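A rough sketch of how priming that cache once per range might look, assuming generic block_model/label_model ORM classes passed in by the caller; the committed version (further down) does this inside get_block_timestamp with a generate_series CTE and COALESCE over the two tables.

# Hedged sketch: fill an in-memory {block_number: timestamp} cache for one
# from/to range, preferring the blocks table and falling back to timestamps
# already stored on labels. Model arguments are assumptions, not real imports.
from typing import Dict, Optional
from sqlalchemy.orm import Session

def prime_blocks_cache(
    db_session: Session,
    block_model,      # ORM class with block_number and timestamp columns
    label_model,      # ORM class with block_number, block_timestamp, label columns
    label_name: str,
    from_block: int,
    to_block: int,
    blocks_cache: Dict[int, Optional[int]],
) -> None:
    # Timestamps from the blocks table win.
    for number, timestamp in db_session.query(
        block_model.block_number, block_model.timestamp
    ).filter(block_model.block_number.between(from_block, to_block)):
        blocks_cache[int(number)] = timestamp
    # Fall back to timestamps recorded on labels already in the database.
    for number, timestamp in (
        db_session.query(label_model.block_number, label_model.block_timestamp)
        .filter(
            label_model.block_number.between(from_block, to_block),
            label_model.label == label_name,
        )
        .distinct(label_model.block_number)
    ):
        blocks_cache.setdefault(int(number), timestamp)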

Add deletion of label_name_unverified labels that already exist in the database as verified labels.
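A minimal sketch of that cleanup under the same assumed model, kept separate from the promotion step; the committed delete_unverified_duplicates (below) handles events and function calls in two passes because function calls carry a NULL log_index.

# Hedged sketch: delete unverified labels that already have a verified twin
# with the same transaction_hash and log_index. label_model is assumed to be
# an ORM class with label, transaction_hash and log_index columns.
from sqlalchemy.orm import Session, aliased

def delete_unverified_twins(db_session: Session, label_model, label_name: str) -> int:
    unverified_name = f"{label_name}-unverified"
    verified = aliased(label_model)
    twin_exists = (
        db_session.query(verified)
        .filter(
            verified.label == label_name,
            verified.transaction_hash == label_model.transaction_hash,
            verified.log_index == label_model.log_index,
        )
        .exists()
    )
    # NULL log_index rows (function calls) are not matched here and would need
    # a second pass comparing on transaction_hash alone.
    return (
        db_session.query(label_model)
        .filter(label_model.label == unverified_name, twin_exists)
        .delete(synchronize_session=False)
    )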
pull/924/head
Andrey 2023-10-11 16:14:47 +03:00
parent c3dd0bac9b
commit ac2a1c11e3
5 changed files with 367 additions and 65 deletions

View file

@@ -26,7 +26,13 @@ from .crawler import (
merge_function_call_crawl_jobs,
moonworm_crawler_update_job_as_pickedup,
)
from .db import add_events_to_session, add_function_calls_to_session, commit_session
from .db import (
add_events_to_session,
add_function_calls_to_session,
commit_session,
write_to_db,
delete_unverified_duplicates,
)
from .event_crawler import _crawl_events
from .function_call_crawler import _crawl_functions
from ..settings import (
@@ -95,7 +101,7 @@ def continuous_crawler(
function_call_crawl_jobs: List[FunctionCallCrawlJob],
start_block: int,
max_blocks_batch: int = 100,
min_blocks_batch: int = 40,
min_blocks_batch: int = 5,
confirmations: int = 60,
min_sleep_time: float = 0.1,
heartbeat_interval: float = 60,
@@ -156,13 +162,13 @@ def continuous_crawler(
logger.info(f"Starting continuous event crawler start_block={start_block}")
logger.info("Sending initial heartbeat")
heartbeat(
crawler_type=crawler_type,
blockchain_type=blockchain_type,
crawler_status=heartbeat_template,
)
# heartbeat(
# crawler_type=crawler_type,
# blockchain_type=blockchain_type,
# crawler_status=heartbeat_template,
# )
last_heartbeat_time = datetime.utcnow()
blocks_cache: Dict[int, int] = {}
blocks_cache: Dict[int, Optional[int]] = {}
current_sleep_time = min_sleep_time
failed_count = 0
try:
@@ -171,7 +177,7 @@ def continuous_crawler(
time.sleep(current_sleep_time)
end_block = min(
web3.eth.blockNumber - confirmations,
web3.eth.blockNumber - min_blocks_batch,
start_block + max_blocks_batch,
)
@@ -192,7 +198,7 @@ def continuous_crawler(
from_block=start_block,
to_block=end_block,
blocks_cache=blocks_cache,
db_block_query_batch=min_blocks_batch * 3,
db_block_query_batch=max_blocks_batch * 3,
)
logger.info(
f"Crawled {len(all_events)} events from {start_block} to {end_block}."
@@ -220,6 +226,18 @@ def continuous_crawler(
current_time = datetime.utcnow()
write_to_db(
web3=web3, db_session=db_session, blockchain_type=blockchain_type
)
delete_unverified_duplicates(
db_session=db_session, blockchain_type=blockchain_type
)
commit_session(db_session)
### fetch confirmed transactions and events
if current_time - jobs_refetchet_time > timedelta(
seconds=new_jobs_refetch_interval
):
@@ -243,8 +261,6 @@ def continuous_crawler(
if current_time - last_heartbeat_time > timedelta(
seconds=heartbeat_interval
):
# Committing to db
commit_session(db_session)
# Update heartbeat
heartbeat_template["last_block"] = end_block
heartbeat_template["current_time"] = _date_to_str(current_time)
@@ -260,11 +276,11 @@ def continuous_crawler(
heartbeat_template[
"function_call metrics"
] = ethereum_state_provider.metrics
heartbeat(
crawler_type=crawler_type,
blockchain_type=blockchain_type,
crawler_status=heartbeat_template,
)
# heartbeat(
# crawler_type=crawler_type,
# blockchain_type=blockchain_type,
# crawler_status=heartbeat_template,
# )
logger.info("Sending heartbeat.", heartbeat_template)
last_heartbeat_time = datetime.utcnow()
@@ -304,13 +320,13 @@ def continuous_crawler(
heartbeat_template[
"die_reason"
] = f"{e.__class__.__name__}: {e}\n error_summary: {error_summary}\n error_traceback: {error_traceback}"
heartbeat_template["last_block"] = end_block
heartbeat(
crawler_type=crawler_type,
blockchain_type=blockchain_type,
crawler_status=heartbeat_template,
is_dead=True,
)
heartbeat_template["last_block"] = end_block # type: ignore
# heartbeat(
# crawler_type=crawler_type,
# blockchain_type=blockchain_type,
# crawler_status=heartbeat_template,
# is_dead=True,
# )
logger.exception(e)
raise e

View file

@@ -124,7 +124,7 @@ def _retry_connect_web3(
logger.info(f"Retrying in {sleep_time} seconds")
time.sleep(sleep_time)
raise Exception(
f"Failed to connect to {blockchain_type} blockchain after {retry_count} retries: {error}"
f"Failed to connect to {blockchain_type} blockchain after {retry_count} retries: {error}" # type: ignore
)
@@ -226,6 +226,7 @@ def find_all_deployed_blocks(
"""
all_deployed_blocks = {}
logger.info(f"Finding deployment blocks for {len(addresses_set)} addresses")
for address in addresses_set:
try:
code = web3.eth.getCode(address)
@@ -237,6 +238,7 @@
)
if block is not None:
all_deployed_blocks[address] = block
logger.info(f"Found deployment block for {address}: {block}")
if block is None:
logger.error(f"Failed to get deployment block for {address}")
except Exception as e:
@@ -259,7 +261,8 @@ def make_event_crawl_jobs(entries: List[BugoutSearchResult]) -> List[EventCrawlJob]:
crawl_job_by_selector: Dict[str, EventCrawlJob] = {}
for entry in entries:
abi_selector = _get_tag(entry, "abi_selector")
abi_selector = _get_tag(entry, "abi_method_hash")
# abi_selector = _get_tag(entry, "abi_selector")
contract_address = Web3().toChecksumAddress(_get_tag(entry, "address"))
entry_id = UUID(entry.entry_url.split("/")[-1]) # crying emoji
@@ -300,8 +303,8 @@ def make_function_call_crawl_jobs(
entry_id = UUID(entry.entry_url.split("/")[-1]) # crying emoji
contract_address = Web3().toChecksumAddress(_get_tag(entry, "address"))
abi = json.loads(cast(str, entry.content))
# method_signature = encode_function_signature(abi)
method_signature = _get_tag(entry, "abi_selector")
method_signature = encode_function_signature(abi)
# method_signature = _get_tag(entry, "abi_selector")
if method_signature is None:
raise ValueError(f"{abi} is not a function ABI")

View file

@@ -2,17 +2,23 @@ import logging
import json
from typing import Dict, List, Optional
from moonstreamdb.blockchain import AvailableBlockchainType, get_label_model
from moonstreamdb.blockchain import (
AvailableBlockchainType,
get_label_model,
get_block_model,
)
from moonstreamdb.models import Base
from moonworm.crawler.function_call_crawler import ContractFunctionCall # type: ignore
# from sqlalchemy.dialects.postgresql.dml import insert
from sqlalchemy.orm import Session
from sqlalchemy import insert
from sqlalchemy.orm import Session, aliased
from sqlalchemy import insert, text, and_, exists, or_, func, update
from ..settings import CRAWLER_LABEL
from .event_crawler import Event
from web3 import Web3
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@@ -20,7 +26,7 @@ logger = logging.getLogger(__name__)
def _event_to_label(
blockchain_type: AvailableBlockchainType, event: Event, label_name=CRAWLER_LABEL
) -> Base:
) -> Base: # type: ignore
"""
Creates a label model.
"""
@@ -51,7 +57,7 @@ def _function_call_to_label(
blockchain_type: AvailableBlockchainType,
function_call: ContractFunctionCall,
label_name=CRAWLER_LABEL,
) -> Base:
) -> Base: # type: ignore
"""
Creates a label model.
"""
@@ -211,3 +217,205 @@ def add_function_calls_to_session(
insert_statement = insert(label_model).values(function_calls_insert)
db_session.execute(insert_statement)
def write_to_db(
web3: Web3,
blockchain_type: AvailableBlockchainType,
db_session: Session,
label_name=CRAWLER_LABEL,
confirmations: int = 100,
):
"""
Take all unvirified labels and update label to label_name.
Get last block number from block model and update label to label_name with deduplication of labels distinct ON (transaction_hash, log_index) for events and distinct ON (transaction_hash) for function calls.
And that
"""
label_model = get_label_model(blockchain_type)
block_model = get_block_model(blockchain_type)
label_name_unverified = f"{label_name}-unverified"
label_model_alias = aliased(label_model, name="polygon_labels2")
# update all labels to label_name
blockchain_block_number = web3.eth.block_number
latest_block = (
db_session.query(block_model.block_number)
.order_by(block_model.block_number.desc())
.limit(1)
)
latest_block_cte = latest_block.cte("latest_block")
events = (
db_session.query(
label_model.id, label_model.transaction_hash, label_model.log_index
)
.distinct(label_model.transaction_hash, label_model.log_index)
.filter(label_model.label == label_name_unverified)
.filter(label_model.log_index != None)
.filter(
or_(
label_model.block_number <= latest_block_cte.c.block_number,
label_model.block_number <= blockchain_block_number - confirmations,
)
)
.filter(
~db_session.query(label_model_alias)
.filter(
and_(
label_model_alias.label == label_name,
label_model_alias.transaction_hash == label_model.transaction_hash,
label_model_alias.log_index == label_model.log_index,
)
)
.exists()
)
.order_by(
label_model.transaction_hash,
label_model.log_index,
label_model.block_number.desc(),
)
)
events_cte = events.cte("events")
function_calls = (
db_session.query(
label_model.id, label_model.transaction_hash, label_model.log_index
)
.distinct(label_model.transaction_hash)
.filter(label_model.label == label_name_unverified)
.filter(label_model.log_index == None)
.filter(
or_(
label_model.block_number <= latest_block_cte.c.block_number,
label_model.block_number <= blockchain_block_number - confirmations,
)
)
.filter(
~db_session.query(label_model_alias)
.filter(
and_(
label_model_alias.label == label_name,
label_model_alias.transaction_hash == label_model.transaction_hash,
label_model_alias.log_index == None,
)
)
.exists()
)
.order_by(label_model.transaction_hash, label_model.block_number.desc())
)
function_calls_cte = function_calls.cte("function_calls")
union_all_subquery = (
db_session.query(events_cte)
.union_all(db_session.query(function_calls_cte))
.subquery()
)
logger.info("Updating labels")
# Update query
updated_labels = (
db_session.query(label_model)
.filter(label_model.id == union_all_subquery.c.events_id)
.update(
{"label": label_name}, synchronize_session=False
) # returns number of rows updated by query
)
logger.info(
f"latest block number database {latest_block.one_or_none()} , blockchain {blockchain_block_number} - conformations {conformations}"
)
logger.info(f"Updated {updated_labels} labels")
def delete_unverified_duplicates(
db_session: Session,
blockchain_type: AvailableBlockchainType,
label_name=CRAWLER_LABEL,
):
"""
Delete all unverified labels which already have verified labels.
"""
label_model = get_label_model(blockchain_type)
label_name_unverified = f"{label_name}-unverified"
label_model_alias = aliased(label_model, name="polygon_labels2")
duplicated_events = (
db_session.query(
label_model.id, label_model.transaction_hash, label_model.log_index
)
.distinct(label_model.transaction_hash, label_model.log_index)
.filter(label_model.label == label_name_unverified)
.filter(label_model.log_index != None)
.filter(
db_session.query(label_model_alias)
.filter(
and_(
label_model_alias.label == label_name,
label_model_alias.transaction_hash == label_model.transaction_hash,
label_model_alias.log_index == label_model.log_index,
)
)
.exists()
)
.order_by(
label_model.transaction_hash,
label_model.log_index,
label_model.block_number.desc(),
)
)
events_cte = duplicated_events.cte("events")
duplicated_function_calls = (
db_session.query(
label_model.id, label_model.transaction_hash, label_model.log_index
)
.distinct(label_model.transaction_hash)
.filter(label_model.label == label_name_unverified)
.filter(label_model.log_index == None)
.filter(
db_session.query(label_model_alias)
.filter(
and_(
label_model_alias.label == label_name,
label_model_alias.transaction_hash == label_model.transaction_hash,
label_model_alias.log_index == None,
)
)
.exists()
)
.order_by(label_model.transaction_hash, label_model.block_number.desc())
)
function_calls_cte = duplicated_function_calls.cte("function_calls")
union_all_subquery = (
db_session.query(events_cte)
.union_all(db_session.query(function_calls_cte))
.subquery()
)
logger.info("Deleting duplicates labels")
# Delete query
deleted_labels = (
db_session.query(label_model)
.filter(label_model.id == union_all_subquery.c.events_id)
.delete(synchronize_session=False) # returns number of rows deleted by query
)
logger.info(f"Deleted duplicates {deleted_labels} labels")

View file

@@ -1,14 +1,23 @@
import logging
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple
import time
from moonstreamdb.blockchain import AvailableBlockchainType, get_block_model
from moonstreamdb.blockchain import (
AvailableBlockchainType,
get_block_model,
get_label_model,
)
from moonworm.crawler.log_scanner import _fetch_events_chunk, _crawl_events as moonworm_autoscale_crawl_events # type: ignore
from sqlalchemy.orm.session import Session
from sqlalchemy.sql.expression import and_, func
from sqlalchemy import text
from web3 import Web3
from ..settings import CRAWLER_LABEL
from .crawler import EventCrawlJob
from .db import get_block_model, get_label_model
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@@ -43,10 +52,11 @@ def get_block_timestamp(
web3: Web3,
blockchain_type: AvailableBlockchainType,
block_number: int,
blocks_cache: Dict[int, int],
blocks_cache: Dict[int, Optional[int]],
from_block: int,
to_block: int,
max_blocks_batch: int = 30,
label_name: str = CRAWLER_LABEL,
) -> int:
"""
Get the timestamp of a block.
@@ -64,55 +74,103 @@ def get_block_timestamp(
"""
assert max_blocks_batch > 0
if block_number in blocks_cache:
return blocks_cache[block_number]
if block_number in blocks_cache and blocks_cache[block_number] is not None:
return blocks_cache[block_number] # type: ignore
block_model = get_block_model(blockchain_type)
label_model = get_label_model(blockchain_type)
# from_block and to_block can be in reverse order
if from_block > to_block:
from_block, to_block = to_block, from_block
from_block_filter = from_block - max_blocks_batch - 1
to_block_filter = to_block + max_blocks_batch + 1
if block_number not in blocks_cache:
from_block_filter = from_block - max_blocks_batch - 1
to_block_filter = to_block + max_blocks_batch + 1
blocks = (
db_session.query(
func.json_object_agg(block_model.block_number, block_model.timestamp)
)
.filter(
and_(
block_model.block_number >= from_block_filter,
block_model.block_number <= to_block_filter,
blocks_range = db_session.query(
func.generate_series(from_block_filter, to_block_filter).label(
"block_number"
)
).cte("blocks_range")
blocks_table_cache = (
db_session.query(block_model.block_number, block_model.timestamp)
.filter(
and_(
block_model.block_number >= from_block_filter,
block_model.block_number <= to_block_filter,
)
)
.cte("blocks_table_cache")
)
.scalar()
)
### transform all keys to int to avoid casting after
label_table_cache = (
db_session.query(label_model.block_number, label_model.block_timestamp)
.filter(
and_(
label_model.block_number >= from_block_filter,
label_model.block_number <= to_block_filter,
label_model.label == label_name,
)
)
.distinct(label_model.block_number)
.cte("label_table_cache")
)
if blocks is not None:
blocks = {
int(block_number): timestamp for block_number, timestamp in blocks.items()
}
full_blocks_cache = (
db_session.query(
blocks_range.c.block_number,
func.coalesce(
blocks_table_cache.c.timestamp, label_table_cache.c.block_timestamp
).label("block_timestamp"),
)
.outerjoin(
blocks_table_cache,
blocks_range.c.block_number == blocks_table_cache.c.block_number,
)
.outerjoin(
label_table_cache,
blocks_range.c.block_number == label_table_cache.c.block_number,
)
.cte("blocks_cache")
)
blocks = db_session.query(
func.json_object_agg(
full_blocks_cache.c.block_number, full_blocks_cache.c.block_timestamp
)
).one()[0]
### transform all keys to int to avoid casting after
if blocks is not None:
blocks = {
int(block_number): timestamp
for block_number, timestamp in blocks.items()
}
blocks_cache.update(blocks)
target_block_timestamp: Optional[int] = None
if blocks:
target_block_timestamp = blocks.get(str(block_number))
if target_block_timestamp is None:
if blocks_cache[block_number] is None:
target_block_timestamp = _get_block_timestamp_from_web3(
web3, block_number
) # can be improved by using batch call
blocks_cache[block_number] = target_block_timestamp
if len(blocks_cache) + len(blocks) > (max_blocks_batch * 3 + 2):
if len(blocks_cache) > (max_blocks_batch * 3 + 2):
# clear cache lower than from_block
blocks_cache = blocks
blocks_cache = {
block_number: timestamp
for block_number, timestamp in blocks_cache.items()
if block_number >= from_block
}
return target_block_timestamp
return target_block_timestamp # type: ignore
def _crawl_events(
@@ -122,7 +180,7 @@ def _crawl_events(
jobs: List[EventCrawlJob],
from_block: int,
to_block: int,
blocks_cache: Dict[int, int] = {},
blocks_cache: Dict[int, Optional[int]] = {},
db_block_query_batch=10,
) -> List[Event]:
all_events = []
@@ -144,9 +202,9 @@ def _crawl_events(
blockchain_type,
raw_event["blockNumber"],
blocks_cache,
db_block_query_batch,
from_block,
to_block,
db_block_query_batch,
)
event = Event(
event_name=raw_event["event"],
@@ -169,7 +227,7 @@ def _autoscale_crawl_events(
jobs: List[EventCrawlJob],
from_block: int,
to_block: int,
blocks_cache: Dict[int, int] = {},
blocks_cache: Dict[int, Optional[int]] = {},
batch_size: int = 1000,
db_block_query_batch=10,
) -> Tuple[List[Event], int]:
@@ -186,6 +244,7 @@ def _autoscale_crawl_events(
batch_size,
job.contracts[0],
)
for raw_event in raw_events:
raw_event["blockTimestamp"] = get_block_timestamp(
db_session,

View file

@@ -18,7 +18,13 @@ from .crawler import (
_retry_connect_web3,
update_entries_status_and_progress,
)
from .db import add_events_to_session, add_function_calls_to_session, commit_session
from .db import (
add_events_to_session,
add_function_calls_to_session,
commit_session,
write_to_db,
delete_unverified_duplicates,
)
from .event_crawler import _crawl_events, _autoscale_crawl_events
from .function_call_crawler import _crawl_functions
@@ -76,7 +82,7 @@ def historical_crawler(
logger.info(f"Starting historical event crawler start_block={start_block}")
blocks_cache: Dict[int, int] = {}
blocks_cache: Dict[int, Optional[int]] = {}
failed_count = 0
original_start_block = start_block
@@ -163,6 +169,16 @@ def historical_crawler(
progess_map=progess_map,
)
write_to_db(
web3,
blockchain_type,
db_session,
)
delete_unverified_duplicates(
db_session=db_session, blockchain_type=blockchain_type
)
# Committing to db
commit_session(db_session)