From 2691d62b79d8df4aae7f74e2651e206d59d6d84e Mon Sep 17 00:00:00 2001
From: Yhtyyar Sahatov
Date: Mon, 4 Apr 2022 17:37:21 +0300
Subject: [PATCH] Added checkpointing

---
NOTE(review): cli.py calls get_last_saved_block() without a blockchain_type,
so the checkpoint is the highest block number across every row of the
transactions table. TODO: pass the blockchain being materialized from
handle_materialize if a datastore can ever hold more than one chain.

 datasets/nfts/nfts/cli.py         |  9 ++++++++-
 datasets/nfts/nfts/datastore.py   | 30 ++++++++++++++++++++++++++++++
 datasets/nfts/nfts/materialize.py |  4 +++-
 3 files changed, 41 insertions(+), 2 deletions(-)

diff --git a/datasets/nfts/nfts/cli.py b/datasets/nfts/nfts/cli.py
index 1730949a..6bd62d77 100644
--- a/datasets/nfts/nfts/cli.py
+++ b/datasets/nfts/nfts/cli.py
@@ -11,7 +11,7 @@ from moonstreamdb.db import yield_db_session_ctx
 from moonstreamdb.models import EthereumLabel, PolygonLabel
 
 from .data import BlockBounds
-from .datastore import setup_database
+from .datastore import setup_database, get_last_saved_block
 from .derive import (
     current_owners,
     current_market_values,
@@ -74,6 +74,13 @@ def handle_materialize(args: argparse.Namespace) -> None:
     with yield_db_session_ctx() as db_session, contextlib.closing(
         sqlite3.connect(args.datastore)
     ) as moonstream_datastore:
+        last_saved_block = get_last_saved_block(moonstream_datastore)
+        if last_saved_block >= bounds.starting_block:
+            logger.info(
+                f"Skipping blocks {bounds.starting_block}-{last_saved_block}, starting from {last_saved_block + 1}"
+            )
+            bounds.starting_block = last_saved_block + 1
+
         crawl_erc721_labels(
             db_session,
             moonstream_datastore,
diff --git a/datasets/nfts/nfts/datastore.py b/datasets/nfts/nfts/datastore.py
index 156a8b43..59415a06 100644
--- a/datasets/nfts/nfts/datastore.py
+++ b/datasets/nfts/nfts/datastore.py
@@ -380,6 +380,36 @@ def insert_events(
     conn.commit()
 
 
+def get_last_saved_block(
+    conn: sqlite3.Connection, blockchain_type: "str | None" = None
+) -> int:
+    """
+    Returns the highest block number stored in the transactions table, or 0 if
+    no matching transaction has been saved yet.
+
+    If blockchain_type is given, only rows for that blockchain are considered.
+    If it is omitted, the maximum is taken over every row, which is only a
+    meaningful checkpoint when the datastore holds a single blockchain.
+    """
+    # The annotation is quoted so that it needs no typing import and stays
+    # valid on Python versions older than 3.10.
+    if blockchain_type is None:
+        query = "SELECT MAX(block_number) FROM transactions"
+        params = ()
+    else:
+        # Bind the value instead of formatting it into the SQL text.
+        query = "SELECT MAX(block_number) FROM transactions WHERE blockchain_type = ?"
+        params = (blockchain_type,)
+
+    cur = conn.cursor()
+    cur.execute(query, params)
+    # MAX() always yields exactly one row; read it once, because a second
+    # fetchone() on the same cursor returns None.
+    (last_block,) = cur.fetchone()
+
+    return 0 if last_block is None else last_block
+
+
 def setup_database(conn: sqlite3.Connection) -> None:
     """
     Sets up the schema of the Moonstream NFTs dataset in the given SQLite database.
diff --git a/datasets/nfts/nfts/materialize.py b/datasets/nfts/nfts/materialize.py
index 29e1e59e..0095b109 100644
--- a/datasets/nfts/nfts/materialize.py
+++ b/datasets/nfts/nfts/materialize.py
@@ -207,6 +207,8 @@ def crawl_erc721_labels(
                 )
             )
 
+            logger.info(f"Found {labels.count()} labels")
+
             transactions = []
             events = []
             for label in labels:
@@ -217,6 +219,6 @@ def crawl_erc721_labels(
                     events.append(parse_event(label))
             insert_transactions(conn, transactions)
             insert_events(conn, events)
-            logger.info(f"Found {len(events)} events and {len(transactions)} transactions")
+            logger.info(f"Saved {len(events)} events and {len(transactions)} transactions")
             pbar.update(batch_end - current_block + 1)
             current_block = batch_end + 1