kopia lustrzana https://github.com/bugout-dev/moonstream
Added checkpointing
rodzic
5c8d003079
commit
2691d62b79
|
@ -11,7 +11,7 @@ from moonstreamdb.db import yield_db_session_ctx
|
|||
from moonstreamdb.models import EthereumLabel, PolygonLabel
|
||||
|
||||
from .data import BlockBounds
|
||||
from .datastore import setup_database
|
||||
from .datastore import setup_database, get_last_saved_block
|
||||
from .derive import (
|
||||
current_owners,
|
||||
current_market_values,
|
||||
|
@ -74,6 +74,13 @@ def handle_materialize(args: argparse.Namespace) -> None:
|
|||
with yield_db_session_ctx() as db_session, contextlib.closing(
|
||||
sqlite3.connect(args.datastore)
|
||||
) as moonstream_datastore:
|
||||
last_saved_block = get_last_saved_block(moonstream_datastore)
|
||||
if last_saved_block >= bounds.starting_block:
|
||||
logger.info(
|
||||
f"Skipping blocks {bounds.starting_block}-{last_saved_block}, starting from {last_saved_block + 1}"
|
||||
)
|
||||
bounds.starting_block = last_saved_block + 1
|
||||
|
||||
crawl_erc721_labels(
|
||||
db_session,
|
||||
moonstream_datastore,
|
||||
|
|
|
@ -380,6 +380,21 @@ def insert_events(
|
|||
conn.commit()
|
||||
|
||||
|
||||
def get_last_saved_block(conn: sqlite3.Connection, blockchain_type: str) -> int:
|
||||
"""
|
||||
Returns the last block number that was saved to the database.
|
||||
"""
|
||||
cur = conn.cursor()
|
||||
|
||||
query = f"SELECT MAX(block_number) FROM transactions WHERE blockchain_type = '{blockchain_type}'"
|
||||
|
||||
cur.execute(query)
|
||||
if cur.fetchone()[0] is None:
|
||||
return 0
|
||||
|
||||
return cur.fetchone()[0]
|
||||
|
||||
|
||||
def setup_database(conn: sqlite3.Connection) -> None:
|
||||
"""
|
||||
Sets up the schema of the Moonstream NFTs dataset in the given SQLite database.
|
||||
|
|
|
@ -207,6 +207,8 @@ def crawl_erc721_labels(
|
|||
)
|
||||
)
|
||||
|
||||
logger.info(f"Found {labels.count()} labels")
|
||||
|
||||
transactions = []
|
||||
events = []
|
||||
for label in labels:
|
||||
|
@ -217,6 +219,6 @@ def crawl_erc721_labels(
|
|||
events.append(parse_event(label))
|
||||
insert_transactions(conn, transactions)
|
||||
insert_events(conn, events)
|
||||
logger.info(f"Found {len(events)} events and {len(transactions)} transactions")
|
||||
logger.info(f"Saved {len(events)} events and {len(transactions)} transactions")
|
||||
pbar.update(batch_end - current_block + 1)
|
||||
current_block = batch_end + 1
|
||||
|
|
Ładowanie…
Reference in New Issue