diff --git a/datasets/nfts/nfts/cli.py b/datasets/nfts/nfts/cli.py
index d0ea90b4..25e6e6ea 100644
--- a/datasets/nfts/nfts/cli.py
+++ b/datasets/nfts/nfts/cli.py
@@ -1,5 +1,6 @@
 import argparse
 import contextlib
+from enum import Enum
 import logging
 import os
 import sqlite3
@@ -7,10 +8,10 @@ from shutil import copyfile
 from typing import Optional
 
 from moonstreamdb.db import yield_db_session_ctx
+from moonstreamdb.models import EthereumLabel, PolygonLabel
 
-from .enrich import EthereumBatchloader, enrich
-from .data import EventType, event_types, nft_event, BlockBounds
-from .datastore import setup_database, import_data, filter_data
+from .data import BlockBounds
+from .datastore import setup_database
 from .derive import (
     current_owners,
     current_market_values,
@@ -22,7 +23,7 @@ from .derive import (
     transfer_holding_times,
     transfers_mints_connection_table,
 )
-from .materialize import create_dataset
+from .materialize import crawl_erc721_labels
 
 logging.basicConfig(level=logging.INFO)
@@ -42,94 +43,43 @@ derive_functions = {
 }
 
 
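+# NOTE: argparse is configured with type=Blockchain below, so it constructs
+# Blockchain("<value>"); Enum lookup is by value, which means the CLI accepts
+# the lowercase strings "ethereum" and "polygon".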
+class Blockchain(Enum):
+    ETHEREUM = "ethereum"
+    POLYGON = "polygon"
+
+
 def handle_initdb(args: argparse.Namespace) -> None:
     with contextlib.closing(sqlite3.connect(args.datastore)) as conn:
         setup_database(conn)
 
 
-def handle_import_data(args: argparse.Namespace) -> None:
-    event_type = nft_event(args.type)
-    with contextlib.closing(
-        sqlite3.connect(args.target)
-    ) as target_conn, contextlib.closing(sqlite3.connect(args.source)) as source_conn:
-        import_data(target_conn, source_conn, event_type, args.batch_size)
-
-
-def handle_filter_data(args: argparse.Namespace) -> None:
-
-    with contextlib.closing(sqlite3.connect(args.source)) as source_conn:
-
-        if args.target == args.source and args.source is not None:
-            sqlite_path = f"{args.target}.dump"
-        else:
-            sqlite_path = args.target
-
-        print(f"Creating new database:{sqlite_path}")
-
-        copyfile(args.source, sqlite_path)
-
-    # do connection
-    with contextlib.closing(sqlite3.connect(sqlite_path)) as source_conn:
-        print("Start filtering")
-        filter_data(
-            source_conn,
-            start_time=args.start_time,
-            end_time=args.end_time,
-        )
-        print("Filtering end.")
-        for index, function_name in enumerate(derive_functions.keys()):
-            print(
-                f"Derive process {function_name} {index+1}/{len(derive_functions.keys())}"
-            )
-            derive_functions[function_name](source_conn)
-
-    # Apply derive to new data
-
-
 def handle_materialize(args: argparse.Namespace) -> None:
-    event_type = nft_event(args.type)
+    bounds: Optional[BlockBounds] = None
     if args.start is not None:
         bounds = BlockBounds(starting_block=args.start, ending_block=args.end)
     elif args.end is not None:
         raise ValueError("You cannot set --end unless you also set --start")
 
-    batch_loader = EthereumBatchloader(jsonrpc_url=args.jsonrpc)
-
+    if bounds is None:
+        raise ValueError("You must set --start so the crawl has a starting block")
+
     logger.info(f"Materializing NFT events to datastore: {args.datastore}")
     logger.info(f"Block bounds: {bounds}")
 
+    label_model = (
+        EthereumLabel if args.blockchain == Blockchain.ETHEREUM else PolygonLabel
+    )
+
     with yield_db_session_ctx() as db_session, contextlib.closing(
         sqlite3.connect(args.datastore)
     ) as moonstream_datastore:
-        create_dataset(
-            moonstream_datastore,
+        crawl_erc721_labels(
             db_session,
-            event_type,
-            bounds,
-            args.batch_size,
-        )
-
-
-def handle_enrich(args: argparse.Namespace) -> None:
-
-    batch_loader = EthereumBatchloader(jsonrpc_url=args.jsonrpc)
-
-    logger.info(f"Enriching NFT events in datastore: {args.datastore}")
-
-    with contextlib.closing(sqlite3.connect(args.datastore)) as moonstream_datastore:
-        enrich(
-            moonstream_datastore,
-            EventType.TRANSFER,
-            batch_loader,
-            args.batch_size,
-        )
-
-        enrich(
-            moonstream_datastore,
-            EventType.MINT,
-            batch_loader,
-            args.batch_size,
+            moonstream_datastore,
+            label_model,
+            start_block=bounds.starting_block,
+            # keep the original 500,000-block window as the default when --end is omitted
+            end_block=bounds.ending_block
+            if bounds.ending_block is not None
+            else bounds.starting_block + 500000,
+            batch_size=args.batch_size,
         )
@@ -186,18 +136,7 @@ def main() -> None:
         required=True,
         help="Path to SQLite database representing the dataset",
     )
-    parser_materialize.add_argument(
-        "--jsonrpc",
-        default=default_web3_provider,
-        type=str,
-        help=f"Http uri provider to use when collecting data directly from the Ethereum blockchain (default: {default_web3_provider})",
-    )
-    parser_materialize.add_argument(
-        "-t",
-        "--type",
-        choices=event_types,
-        help="Type of event to materialize intermediate data for",
-    )
+
     parser_materialize.add_argument(
         "--start", type=int, default=None, help="Starting block number"
     )
@@ -211,6 +150,14 @@ def main() -> None:
         default=1000,
         help="Number of events to process per batch",
     )
+
+    parser_materialize.add_argument(
+        "--blockchain",
+        type=Blockchain,
+        choices=[Blockchain.ETHEREUM, Blockchain.POLYGON],
+        required=True,
+        help="Blockchain to use",
+    )
+
     parser_materialize.set_defaults(func=handle_materialize)
 
     parser_derive = subcommands.add_parser(
@@ -231,86 +178,6 @@ def main() -> None:
     )
     parser_derive.set_defaults(func=handle_derive)
 
-    parser_import_data = subcommands.add_parser(
-        "import-data",
-        description="Import data from another source NFTs dataset datastore. This operation is performed per table, and replaces the existing table in the target datastore.",
-    )
-    parser_import_data.add_argument(
-        "--target",
-        required=True,
-        help="Datastore into which you want to import data",
-    )
-    parser_import_data.add_argument(
-        "--source", required=True, help="Datastore from which you want to import data"
-    )
-    parser_import_data.add_argument(
-        "--type",
-        required=True,
-        choices=event_types,
-        help="Type of data you would like to import from source to target",
-    )
-    parser_import_data.add_argument(
-        "-N",
-        "--batch-size",
-        type=int,
-        default=10000,
-        help="Batch size for database commits into target datastore.",
-    )
-    parser_import_data.set_defaults(func=handle_import_data)
-
-    # Create dump of filtered data
-
-    parser_filtered_copy = subcommands.add_parser(
-        "filter-data",
-        description="Create copy of database with applied filters.",
-    )
-    parser_filtered_copy.add_argument(
-        "--target",
-        required=True,
-        help="Datastore into which you want to import data",
-    )
-    parser_filtered_copy.add_argument(
-        "--source", required=True, help="Datastore from which you want to import data"
-    )
-    parser_filtered_copy.add_argument(
-        "--start-time",
-        required=False,
-        type=int,
-        help="Start timestamp.",
-    )
-    parser_filtered_copy.add_argument(
-        "--end-time",
-        required=False,
-        type=int,
-        help="End timestamp.",
-    )
-
-    parser_filtered_copy.set_defaults(func=handle_filter_data)
-
-    parser_enrich = subcommands.add_parser(
-        "enrich", description="enrich dataset from geth node"
-    )
-    parser_enrich.add_argument(
-        "-d",
-        "--datastore",
-        required=True,
-        help="Path to SQLite database representing the dataset",
-    )
-    parser_enrich.add_argument(
-        "--jsonrpc",
-        default=default_web3_provider,
-        type=str,
-        help=f"Http uri provider to use when collecting data directly from the Ethereum blockchain (default: {default_web3_provider})",
-    )
-    parser_enrich.add_argument(
-        "-n",
-        "--batch-size",
-        type=int,
-        default=1000,
-        help="Number of events to process per batch",
-    )
-
-    parser_enrich.set_defaults(func=handle_enrich)
-
     args = parser.parse_args()
     args.func(args)
diff --git a/datasets/nfts/nfts/data.py b/datasets/nfts/nfts/data.py
index 7358dcd4..5190e62f 100644
--- a/datasets/nfts/nfts/data.py
+++ b/datasets/nfts/nfts/data.py
@@ -4,7 +4,7 @@ Data structures used in (and as part of the maintenance of) the Moonstream NFTs
 from dataclasses import dataclass
 from enum import Enum
 from os import name
-from typing import Optional
+from typing import Any, Dict, Optional, Union
 
 
 @dataclass
@@ -13,38 +13,59 @@ class BlockBounds:
     ending_block: Optional[int] = None
 
 
-class EventType(Enum):
-    TRANSFER = "nft_transfer"
-    MINT = "nft_mint"
-    ERC721 = "erc721"
-
-
-event_types = {event_type.value: event_type for event_type in EventType}
-
-
-def nft_event(raw_event: str) -> EventType:
-    try:
-        return event_types[raw_event]
-    except KeyError:
-        raise ValueError(f"Unknown nft event type: {raw_event}")
+@dataclass
+class NftTransaction:
+    blockchain_type: str
+    block_number: int
+    block_timestamp: int
+    transaction_hash: str
+    contract_address: str
+    caller_address: str
+    function_name: str
+    function_args: Union[Dict[str, Any], str]
+    gas_used: int
+    gas_price: int
+    value: int
+    status: int
+    max_fee_per_gas: Optional[int] = None
+    max_priority_fee_per_gas: Optional[int] = None
 
 
 @dataclass
-class NFTEvent:
-    event_id: str
-    event_type: EventType
-    nft_address: str
+class NftApprovalEvent:
+    blockchain_type: str
+    owner: str
+    approved: str
     token_id: str
+    transaction_hash: str
+    log_index: int
+
+
+@dataclass
+class NftApprovalForAllEvent:
+    blockchain_type: str
+    owner: str
+    approved: bool
+    operator: str
+    transaction_hash: str
+    log_index: int
+
+
+@dataclass
+class NftTransferEvent:
+    blockchain_type: str
     from_address: str
     to_address: str
+    token_id: str
     transaction_hash: str
-    value: Optional[int] = None
-    block_number: Optional[int] = None
-    timestamp: Optional[int] = None
+    log_index: int
 
 
 @dataclass
-class NFTMetadata:
-    address: str
-    name: str
-    symbol: str
+class Erc20TransferEvent:
+    blockchain_type: str
+    from_address: str
+    to_address: str
+    value: int
+    transaction_hash: str
+    log_index: int
diff --git a/datasets/nfts/nfts/datastore.py b/datasets/nfts/nfts/datastore.py
index 0f3f361e..06872af7 100644
--- a/datasets/nfts/nfts/datastore.py
+++ b/datasets/nfts/nfts/datastore.py
@@ -2,195 +2,370 @@ This module provides tools to interact with and maintain a SQLite database which
 acts/should act as a datastore for a Moonstream NFTs dataset.
""" +from ctypes import Union +import json import logging import sqlite3 from typing import Any, cast, List, Tuple, Optional from tqdm import tqdm - -from .data import EventType, NFTEvent, NFTMetadata +from .data import ( + NftTransaction, + NftApprovalEvent, + NftTransferEvent, + NftApprovalForAllEvent, + Erc20TransferEvent, +) logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) -event_tables = {EventType.TRANSFER: "transfers", EventType.MINT: "mints"} - -CREATE_NFTS_TABLE_QUERY = """CREATE TABLE IF NOT EXISTS nfts - ( - address TEXT NOT NULL UNIQUE ON CONFLICT FAIL, - name TEXT, - symbol TEXT, - UNIQUE(address, name, symbol) - ); -""" - -BACKUP_NFTS_TABLE_QUERY = "ALTER TABLE nfts RENAME TO nfts_backup;" -DROP_BACKUP_NFTS_TABLE_QUERY = "DROP TABLE IF EXISTS nfts_backup;" -SELECT_NFTS_QUERY = "SELECT address, name, symbol FROM nfts;" - -CREATE_CHECKPOINT_TABLE_QUERY = """CREATE TABLE IF NOT EXISTS checkpoint - ( - event_type STRING, - offset INTEGER - ); -""" - - -def create_events_table_query(event_type: EventType) -> str: +def create_transactions_table_query(tabel_name) -> str: creation_query = f""" -CREATE TABLE IF NOT EXISTS {event_tables[event_type]} - ( - event_id TEXT NOT NULL UNIQUE ON CONFLICT FAIL, - transaction_hash TEXT, - block_number INTEGER, - nft_address TEXT REFERENCES nfts(address), - token_id TEXT, - from_address TEXT, - to_address TEXT, - transaction_value INTEGER, - timestamp INTEGER +CREATE TABLE IF NOT EXISTS {tabel_name} + ( + blockchainType TEXT NOT NULL, + transactionHash TEXT NOT NULL, + blockNumber INTEGER NOT NULL, + blockTimestamp INTEGER NOT NULL, + contractAddress TEXT, + from_address TEXT NOT NULL, + functionName TEXT NOT NULL, + functionArgs JSON NOT NULL, + value INTEGER NOT NULL, + gasUsed INTEGER NOT NULL, + gasPrice INTEGER NOT NULL, + maxFeePerGas INTEGER, + maxPriorityFeePerGas INTEGER, + UNIQUE(blockchainType, transactionHash) ); """ return creation_query -def backup_events_table_query(event_type: EventType) -> str: - backup_query = f"ALTER TABLE {event_tables[event_type]} RENAME TO {event_tables[event_type]}_backup;" - return backup_query - - -def drop_backup_events_table_query(event_type: EventType) -> str: - drop_query = f"DROP TABLE IF EXISTS {event_tables[event_type]}_backup;" - return drop_query - - -def select_events_table_query(event_type: EventType) -> str: - selection_query = f""" -SELECT - event_id, - transaction_hash, - nft_address, - token_id, - from_address, - to_address, - transaction_value, - block_number, - timestamp -FROM {event_tables[event_type]}; +def create_approvals_table_query(tabel_name) -> str: + creation_query = f""" +CREATE TABLE IF NOT EXISTS {tabel_name} + ( + blockchainType TEXT NOT NULL, + owner TEXT NOT NULL, + approved TEXT NOT NULL, + tokenId TEXT NOT NULL, + transactionHash TEXT NOT NULL, + logIndex INTEGER NOT NULL, + UNIQUE(blockchainType, transactionHash, logIndex) + ); """ - - return selection_query + return creation_query -def get_events_for_enrich( - conn: sqlite3.Connection, event_type: EventType -) -> List[NFTEvent]: - def select_query(event_type: EventType) -> str: - selection_query = f""" - SELECT - event_id, - transaction_hash, - block_number, - nft_address, - token_id, +def create_approval_for_all_table_query(tabel_name) -> str: + creation_query = f""" +CREATE TABLE IF NOT EXISTS {tabel_name} + ( + blockchainType TEXT NOT NULL, + owner TEXT NOT NULL, + approved BOOL NOT NULL, + operator TEXT NOT NULL, + transactionHash TEXT NOT NULL, + logIndex INTEGER NOT NULL, + 
-def backup_events_table_query(event_type: EventType) -> str:
-    backup_query = f"ALTER TABLE {event_tables[event_type]} RENAME TO {event_tables[event_type]}_backup;"
-    return backup_query
-
-
-def drop_backup_events_table_query(event_type: EventType) -> str:
-    drop_query = f"DROP TABLE IF EXISTS {event_tables[event_type]}_backup;"
-    return drop_query
-
-
-def select_events_table_query(event_type: EventType) -> str:
-    selection_query = f"""
-SELECT
-    event_id,
-    transaction_hash,
-    nft_address,
-    token_id,
-    from_address,
-    to_address,
-    transaction_value,
-    block_number,
-    timestamp
-FROM {event_tables[event_type]};
+def create_approvals_table_query(table_name: str) -> str:
+    creation_query = f"""
+CREATE TABLE IF NOT EXISTS {table_name}
+    (
+        blockchainType TEXT NOT NULL,
+        owner TEXT NOT NULL,
+        approved TEXT NOT NULL,
+        tokenId TEXT NOT NULL,
+        transactionHash TEXT NOT NULL,
+        logIndex INTEGER NOT NULL,
+        UNIQUE(blockchainType, transactionHash, logIndex)
+    );
 """
-
-    return selection_query
+    return creation_query
 
 
-def get_events_for_enrich(
-    conn: sqlite3.Connection, event_type: EventType
-) -> List[NFTEvent]:
-    def select_query(event_type: EventType) -> str:
-        selection_query = f"""
-        SELECT
-            event_id,
-            transaction_hash,
-            block_number,
-            nft_address,
-            token_id,
+def create_approval_for_all_table_query(table_name: str) -> str:
+    creation_query = f"""
+CREATE TABLE IF NOT EXISTS {table_name}
+    (
+        blockchainType TEXT NOT NULL,
+        owner TEXT NOT NULL,
+        approved BOOL NOT NULL,
+        operator TEXT NOT NULL,
+        transactionHash TEXT NOT NULL,
+        logIndex INTEGER NOT NULL,
+        UNIQUE(blockchainType, transactionHash, logIndex)
+    );
+    """
+    return creation_query
+
+
+def create_transfers_table_query(table_name: str) -> str:
+    creation_query = f"""
+CREATE TABLE IF NOT EXISTS {table_name}
+    (
+        blockchainType TEXT NOT NULL,
+        from_address TEXT NOT NULL,
+        to_address TEXT NOT NULL,
+        tokenId TEXT NOT NULL,
+        transactionHash TEXT NOT NULL,
+        logIndex INTEGER NOT NULL,
+        UNIQUE(blockchainType, transactionHash, logIndex)
+    );
+    """
+    return creation_query
+
+
+def create_erc20_transfers_table_query(table_name: str) -> str:
+    creation_query = f"""
+CREATE TABLE IF NOT EXISTS {table_name}
+    (
+        blockchainType TEXT NOT NULL,
+        from_address TEXT NOT NULL,
+        to_address TEXT NOT NULL,
+        value INTEGER NOT NULL,
+        transactionHash TEXT NOT NULL,
+        logIndex INTEGER NOT NULL,
+        UNIQUE(blockchainType, transactionHash, logIndex)
+    );
+    """
+    return creation_query
+
+
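+# NOTE: the INSERT helpers below use plain INSERT (not INSERT OR IGNORE), so
+# the UNIQUE constraints above make re-ingesting an already-crawled range fail
+# with sqlite3.IntegrityError rather than silently duplicating rows.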
+def insert_transaction_query(table_name: str) -> str:
+    query = f"""
+INSERT INTO {table_name}
+    (
+        blockchainType,
+        transactionHash,
+        blockNumber,
+        blockTimestamp,
+        contractAddress,
+        from_address,
+        functionName,
+        functionArgs,
+        value,
+        gasUsed,
+        gasPrice,
+        status,
+        maxFeePerGas,
+        maxPriorityFeePerGas
+    )
+VALUES
+    (
+        ?,?,?,?,?,?,?,?,?,?,?,?,?,?
+    );
+    """
+    return query
+
+
+def insert_nft_approval_query(table_name: str) -> str:
+    query = f"""
+INSERT INTO {table_name}
+    (
+        blockchainType,
+        owner,
+        approved,
+        tokenId,
+        transactionHash,
+        logIndex
+    )
+VALUES
+    (
+        ?,?,?,?,?,?
+    );
+    """
+    return query
+
+
+def insert_nft_approval_for_all_query(table_name: str) -> str:
+    query = f"""
+INSERT INTO {table_name}
+    (
+        blockchainType,
+        owner,
+        approved,
+        operator,
+        transactionHash,
+        logIndex
+    )
+VALUES
+    (
+        ?,?,?,?,?,?
+    );
+    """
+    return query
+
+
+def insert_nft_transfers_query(table_name: str) -> str:
+    query = f"""
+INSERT INTO {table_name}
+    (
+        blockchainType,
         from_address,
         to_address,
-        transaction_value,
-        timestamp
-    FROM {event_tables[event_type]} WHERE block_number = 'None';
-    """
+        tokenId,
+        transactionHash,
+        logIndex
+    )
+VALUES
-
-        return selection_query
-
-    logger.info(f"Loading {event_tables[event_type]} table events for enrich")
-    cur = conn.cursor()
-    cur.execute(select_query(event_type))
-
-    events: List[NFTEvent] = []
-
-    for row in cur:
-        (
-            event_id,
-            transaction_hash,
-            block_number,
-            nft_address,
-            token_id,
-            from_address,
-            to_address,
-            value,
-            timestamp,
-        ) = cast(
-            Tuple[
-                str,
-                str,
-                Optional[int],
-                str,
-                str,
-                str,
-                str,
-                Optional[int],
-                Optional[int],
-            ],
-            row,
-        )
-        event = NFTEvent(
-            event_id=event_id,
-            event_type=event_type,  # Original argument to this function
-            nft_address=nft_address,
-            token_id=token_id,
-            from_address=from_address,
-            to_address=to_address,
-            transaction_hash=transaction_hash,
-            value=value,
-            block_number=block_number,
-            timestamp=timestamp,
-        )
-        events.append(event)
-    logger.info(f"Found {len(events)} events to enrich")
-    return events
+    (
+        ?,?,?,?,?,?
+    );
+    """
+    return query
 
 
-def update_events_batch(conn: sqlite3.Connection, events: List[NFTEvent]) -> None:
-    def replace_query(event_type: EventType) -> str:
-        query = f"""
-        REPLACE INTO {event_tables[event_type]}(
-            event_id,
-            transaction_hash,
-            block_number,
-            nft_address,
-            token_id,
+def insert_erc20_transfer_query(table_name: str) -> str:
+    query = f"""
+INSERT INTO {table_name}
+    (
+        blockchainType,
         from_address,
         to_address,
-            transaction_value,
-            timestamp
-        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
-        """
-        return query
+        value,
+        transactionHash,
+        logIndex
+    )
+VALUES
+    (
+        ?,?,?,?,?,?
+    );
+    """
+    return query
-    logger.info("Updating events in sqlite")
+
+
+def create_blockchain_type_index_query(table_name: str) -> str:
+    creation_query = f"""
+CREATE INDEX IF NOT EXISTS {table_name}_blockchainType ON {table_name} (blockchainType);
+    """
+    return creation_query
+
+
+def nft_transaction_to_tuple(nft_transaction: NftTransaction) -> Tuple[Any, ...]:
+    """
+    Converts an NftTransaction object to a tuple which can be inserted into the database.
+    """
+    return (
+        nft_transaction.blockchain_type,
+        nft_transaction.transaction_hash,
+        nft_transaction.block_number,
+        nft_transaction.block_timestamp,
+        nft_transaction.contract_address,
+        nft_transaction.caller_address,
+        nft_transaction.function_name,
+        json.dumps(nft_transaction.function_args),
+        str(nft_transaction.value),
+        str(nft_transaction.gas_used),
+        str(nft_transaction.gas_price),
+        nft_transaction.status,
+        str(nft_transaction.max_fee_per_gas)
+        if nft_transaction.max_fee_per_gas is not None
+        else None,
+        str(nft_transaction.max_priority_fee_per_gas)
+        if nft_transaction.max_priority_fee_per_gas is not None
+        else None,
+    )
+
+
+def nft_approval_to_tuple(nft_approval: NftApprovalEvent) -> Tuple[Any, ...]:
+    """
+    Converts an NftApprovalEvent object to a tuple which can be inserted into the database.
+    """
+    return (
+        nft_approval.blockchain_type,
+        nft_approval.owner,
+        nft_approval.approved,
+        str(nft_approval.token_id),
+        nft_approval.transaction_hash,
+        nft_approval.log_index,
+    )
+
+
+def nft_approval_for_all_to_tuple(
+    nft_approval_for_all: NftApprovalForAllEvent,
+) -> Tuple[Any, ...]:
+    """
+    Converts an NftApprovalForAllEvent object to a tuple which can be inserted into the database.
+    """
+    return (
+        nft_approval_for_all.blockchain_type,
+        nft_approval_for_all.owner,
+        nft_approval_for_all.approved,
+        nft_approval_for_all.operator,
+        nft_approval_for_all.transaction_hash,
+        nft_approval_for_all.log_index,
+    )
+
+
+def nft_transfer_to_tuple(nft_transfer: NftTransferEvent) -> Tuple[Any, ...]:
+    """
+    Converts an NftTransferEvent object to a tuple which can be inserted into the database.
+    """
+    return (
+        nft_transfer.blockchain_type,
+        nft_transfer.from_address,
+        nft_transfer.to_address,
+        str(nft_transfer.token_id),
+        nft_transfer.transaction_hash,
+        nft_transfer.log_index,
+    )
+
+
+def erc20_nft_transfer_to_tuple(
+    erc20_nft_transfer: Erc20TransferEvent,
+) -> Tuple[Any, ...]:
+    """
+    Converts an Erc20TransferEvent object to a tuple which can be inserted into the database.
+    """
+    return (
+        erc20_nft_transfer.blockchain_type,
+        erc20_nft_transfer.from_address,
+        erc20_nft_transfer.to_address,
+        str(erc20_nft_transfer.value),
+        erc20_nft_transfer.transaction_hash,
+        erc20_nft_transfer.log_index,
+    )
+
+
+def insert_transactions(
+    conn: sqlite3.Connection, transactions: List[NftTransaction]
+) -> None:
+    """
+    Inserts the given NftTransaction objects into the database.
+    """
     cur = conn.cursor()
-    try:
-        transfers = [
-            nft_event_to_tuple(event)
-            for event in events
-            if event.event_type == EventType.TRANSFER
-        ]
-        mints = [
-            nft_event_to_tuple(event)
-            for event in events
-            if event.event_type == EventType.MINT
-        ]
+    query = insert_transaction_query("transactions")
 
-        cur.executemany(replace_query(EventType.TRANSFER), transfers)
-        cur.executemany(replace_query(EventType.MINT), mints)
+    cur.executemany(
+        query,
+        [nft_transaction_to_tuple(nft_transaction) for nft_transaction in transactions],
+    )
 
-        conn.commit()
-    except Exception as e:
-        logger.error(f"FAILED TO replace!!! :{events}")
:{events}") - conn.rollback() - raise e + conn.commit() + + +def insert_events( + conn: sqlite3.Connection, + events: list, +) -> None: + """ + Inserts the given NftApprovalForAllEvent, NftApprovalEvent, or NftTransferEvent objects into the database. + """ + cur = conn.cursor() + + nft_transfers = [] + erc20_transfers = [] + approvals = [] + approvals_for_all = [] + + for event in events: + if isinstance(event, NftApprovalEvent): + approvals.append(nft_approval_to_tuple(event)) + elif isinstance(event, NftApprovalForAllEvent): + approvals_for_all.append(nft_approval_for_all_to_tuple(event)) + elif isinstance(event, NftTransferEvent): + nft_transfers.append(nft_transfer_to_tuple(event)) + elif isinstance(event, Erc20TransferEvent): + erc20_transfers.append(erc20_nft_transfer_to_tuple(event)) + else: + raise ValueError(f"Unknown event type: {type(event)}") + + if len(nft_transfers) > 0: + query = insert_nft_transfers_query("transfers") + cur.executemany( + query, + nft_transfers, + ) + + if len(approvals) > 0: + query = insert_nft_approval_query("approvals") + cur.executemany( + query, + approvals, + ) + + if len(approvals_for_all) > 0: + query = insert_nft_approval_for_all_query("approvals_for_all") + cur.executemany(query, approvals_for_all) + + if len(erc20_transfers) > 0: + query = insert_erc20_transfer_query("erc20_transfers") + cur.executemany(query, erc20_transfers) + + conn.commit() def setup_database(conn: sqlite3.Connection) -> None: @@ -199,266 +374,16 @@ def setup_database(conn: sqlite3.Connection) -> None: """ cur = conn.cursor() - cur.execute(CREATE_NFTS_TABLE_QUERY) - cur.execute(create_events_table_query(EventType.TRANSFER)) - cur.execute(create_events_table_query(EventType.MINT)) - cur.execute(CREATE_CHECKPOINT_TABLE_QUERY) + cur.execute(create_transactions_table_query("transactions")) + cur.execute(create_approvals_table_query("approvals")) + cur.execute(create_approval_for_all_table_query("approvals_for_all")) + cur.execute(create_transfers_table_query("transfers")) + cur.execute(create_erc20_transfers_table_query("erc20_transfers")) + + cur.execute(create_blockchain_type_index_query("transactions")) + cur.execute(create_blockchain_type_index_query("approvals")) + cur.execute(create_blockchain_type_index_query("approvals_for_all")) + cur.execute(create_blockchain_type_index_query("transfers")) + cur.execute(create_blockchain_type_index_query("erc20_transfers")) conn.commit() - - -def insert_events_query(event_type: EventType) -> str: - """ - Generates a query which inserts NFT events into the appropriate events table. - """ - query = f""" -INSERT OR IGNORE INTO {event_tables[event_type]}( - event_id, - transaction_hash, - block_number, - nft_address, - token_id, - from_address, - to_address, - transaction_value, - timestamp -) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) - """ - return query - - -def nft_event_to_tuple( - event: NFTEvent, -) -> Tuple[str, str, str, str, str, str, str, str, str]: - """ - Converts an NFT event into a tuple for use with sqlite cursor executemany. This includes - dropping e.g. the event_type field. 
- """ - return ( - str(event.event_id), - str(event.transaction_hash), - str(event.block_number), - str(event.nft_address), - str(event.token_id), - str(event.from_address), - str(event.to_address), - str(event.value), - str(event.timestamp), - ) - - -def get_checkpoint_offset( - conn: sqlite3.Connection, event_type: EventType -) -> Optional[int]: - cur = conn.cursor() - response = cur.execute( - f"SELECT * from checkpoint where event_type='{event_type.value}' order by rowid desc limit 1" - ) - for row in response: - return row[1] - return None - - -def delete_checkpoints( - conn: sqlite3.Connection, event_type: EventType, commit: bool = True -) -> None: - cur = conn.cursor() - cur.execute(f"DELETE FROM checkpoint where event_type='{event_type.value}';") - if commit: - try: - conn.commit() - except: - conn.rollback() - raise - - -def insert_checkpoint(conn: sqlite3.Connection, event_type: EventType, offset: int): - query = f""" -INSERT INTO checkpoint ( - event_type, - offset -) VALUES (?, ?) - """ - cur = conn.cursor() - cur.execute(query, [event_type.value, offset]) - conn.commit() - - -def insert_address_metadata( - conn: sqlite3.Connection, metadata_list: List[NFTMetadata] -) -> None: - cur = conn.cursor() - query = f""" -INSERT INTO nfts ( - address, - name, - symbol -) VALUES (?, ?, ?) - """ - try: - nfts = [ - (metadata.address, metadata.name, metadata.symbol) - for metadata in metadata_list - ] - cur.executemany(query, nfts) - conn.commit() - except Exception as e: - logger.error(f"Failed to save :\n {metadata_list}") - conn.rollback() - raise e - - -def insert_events(conn: sqlite3.Connection, events: List[NFTEvent]) -> None: - """ - Inserts the given events into the appropriate events table in the given SQLite database. - - This method works with batches of events. - """ - cur = conn.cursor() - try: - transfers = [ - nft_event_to_tuple(event) - for event in events - if event.event_type == EventType.TRANSFER - ] - - mints = [ - nft_event_to_tuple(event) - for event in events - if event.event_type == EventType.MINT - ] - - cur.executemany(insert_events_query(EventType.TRANSFER), transfers) - cur.executemany(insert_events_query(EventType.MINT), mints) - - conn.commit() - except Exception as e: - logger.error(f"FAILED TO SAVE :{events}") - conn.rollback() - raise e - - -def import_data( - target_conn: sqlite3.Connection, - source_conn: sqlite3.Connection, - event_type: EventType, - batch_size: int = 1000, -) -> None: - """ - Imports the data correspondong to the given event type from the source database into the target - database. - - Any existing data of that type in the target database is first deleted. It is a good idea to - create a backup copy of your target database before performing this operation. 
- """ - target_cur = target_conn.cursor() - drop_backup_query = DROP_BACKUP_NFTS_TABLE_QUERY - backup_table_query = BACKUP_NFTS_TABLE_QUERY - create_table_query = CREATE_NFTS_TABLE_QUERY - source_selection_query = SELECT_NFTS_QUERY - if event_type != EventType.ERC721: - drop_backup_query = drop_backup_events_table_query(event_type) - backup_table_query = backup_events_table_query(event_type) - create_table_query = create_events_table_query(event_type) - source_selection_query = select_events_table_query(event_type) - - target_cur.execute(drop_backup_query) - target_cur.execute(backup_table_query) - target_cur.execute(create_table_query) - target_conn.commit() - - source_cur = source_conn.cursor() - source_cur.execute(source_selection_query) - - batch: List[Any] = [] - - for row in tqdm(source_cur, desc="Rows processed"): - if event_type == EventType.ERC721: - batch.append(NFTMetadata(*cast(Tuple[str, str, str], row))) - else: - # Order matches select query returned by select_events_table_query - ( - event_id, - transaction_hash, - nft_address, - token_id, - from_address, - to_address, - value, - block_number, - timestamp, - ) = cast( - Tuple[ - str, - str, - str, - str, - str, - str, - Optional[int], - Optional[int], - Optional[int], - ], - row, - ) - event = NFTEvent( - event_id=event_id, - event_type=event_type, # Original argument to this function - nft_address=nft_address, - token_id=token_id, - from_address=from_address, - to_address=to_address, - transaction_hash=transaction_hash, - value=value, - block_number=block_number, - timestamp=timestamp, - ) - batch.append(event) - - if len(batch) == batch_size: - if event_type == EventType.ERC721: - insert_address_metadata(target_conn, cast(List[NFTMetadata], batch)) - else: - insert_events(target_conn, cast(List[NFTEvent], batch)) - - if event_type == EventType.ERC721: - insert_address_metadata(target_conn, cast(List[NFTMetadata], batch)) - else: - insert_events(target_conn, cast(List[NFTEvent], batch)) - - target_cur.execute(CREATE_CHECKPOINT_TABLE_QUERY) - target_conn.commit() - - source_offset = get_checkpoint_offset(source_conn, event_type) - if source_offset is not None: - delete_checkpoints(target_conn, event_type, commit=False) - insert_checkpoint(target_conn, event_type, source_offset) - - -def filter_data( - sqlite_db: sqlite3.Connection, - start_time: Optional[int] = None, - end_time: Optional[int] = None, -): - """ - Run Deletes query depends on filters - """ - - cur = sqlite_db.cursor() - print(f"Remove by timestamp < {start_time}") - if start_time: - cur.execute(f"DELETE from transfers where timestamp < {start_time}") - print(f"Transfers filtered out: {cur.rowcount}") - sqlite_db.commit() - cur.execute(f"DELETE from mints where timestamp < {start_time}") - print(f"Mints filtered out: {cur.rowcount}") - sqlite_db.commit() - - print(f"Remove by timestamp > {end_time}") - if end_time: - cur.execute(f"DELETE from transfers where timestamp > {end_time}") - print(f"Transfers filtered out: {cur.rowcount}") - sqlite_db.commit() - cur.execute(f"DELETE from mints where timestamp > {end_time}") - print(f"Mints filtered out: {cur.rowcount}") - sqlite_db.commit() diff --git a/datasets/nfts/nfts/enrich.py b/datasets/nfts/nfts/enrich.py index 41410674..60387d2b 100644 --- a/datasets/nfts/nfts/enrich.py +++ b/datasets/nfts/nfts/enrich.py @@ -6,156 +6,4 @@ import json from tqdm import tqdm import requests -from .data import BlockBounds, EventType, NFTEvent, event_types -from .datastore import ( - get_checkpoint_offset, - get_events_for_enrich, 
- insert_address_metadata, - insert_checkpoint, - insert_events, - update_events_batch, -) - -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - - -class EthereumBatchloader: - def __init__(self, jsonrpc_url) -> None: - self.jsonrpc_url = jsonrpc_url - self.message_number = 0 - self.commands: List[Any] = [] - self.requests_banch: List[Any] = [] - - def load_blocks(self, block_list: List[int], with_transactions: bool): - """ - Request list of blocks - """ - rpc = [ - { - "jsonrpc": "2.0", - "id": index, - "method": "eth_getBlockByNumber", - "params": params_single, - } - for index, params_single in enumerate( - [[hex(block_number), with_transactions] for block_number in block_list] - ) - ] - response = self.send_json_message(rpc) - return response - - def load_transactions(self, transaction_hashes: List[str]): - """ - Request list of transactions - """ - - rpc = [ - { - "jsonrpc": "2.0", - "method": "eth_getTransactionByHash", - "id": index, - "params": [tx_hash], - } - for index, tx_hash in enumerate(transaction_hashes) - ] - response = self.send_json_message(rpc) - return response - - def send_message(self, payload): - headers = {"Content-Type": "application/json"} - - try: - r = requests.post( - self.jsonrpc_url, headers=headers, data=payload, timeout=300 - ) - except Exception as e: - print(e) - raise e - return r - - def send_json_message(self, message): - encoded_json = json.dumps(message) - raw_response = self.send_message(encoded_json.encode("utf8")) - response = raw_response.json() - return response - - -def enrich_from_web3( - nft_events: List[NFTEvent], - batch_loader: EthereumBatchloader, -) -> List[NFTEvent]: - """ - Adds block number, value, timestamp from web3 if they are None (because that transaction is missing in db) - """ - transactions_to_query = set() - indices_to_update: List[int] = [] - for index, nft_event in enumerate(nft_events): - if ( - nft_event.block_number == "None" - or nft_event.value == "None" - or nft_event.timestamp == "None" - ): - transactions_to_query.add(nft_event.transaction_hash) - indices_to_update.append(index) - - if len(transactions_to_query) == 0: - return nft_events - logger.info("Calling JSON RPC API") - jsonrpc_transactions_response = batch_loader.load_transactions( - list(transactions_to_query) - ) - - transactions_map = { - result["result"]["hash"]: ( - int(result["result"]["value"], 16), - int(result["result"]["blockNumber"], 16), - ) - for result in jsonrpc_transactions_response - } - - blocks_to_query: Set[int] = set() - for index in indices_to_update: - nft_events[index].value, nft_events[index].block_number = transactions_map[ - nft_events[index].transaction_hash - ] - blocks_to_query.add(cast(int, nft_events[index].block_number)) - - if len(blocks_to_query) == 0: - return nft_events - jsonrpc_blocks_response = batch_loader.load_blocks(list(blocks_to_query), False) - blocks_map = { - int(result["result"]["number"], 16): int(result["result"]["timestamp"], 16) - for result in jsonrpc_blocks_response - } - for index in indices_to_update: - nft_events[index].timestamp = blocks_map[cast(int, nft_event.block_number)] - - return nft_events - - -def enrich( - datastore_conn: sqlite3.Connection, - event_type: EventType, - batch_loader: EthereumBatchloader, - batch_size: int = 1000, -) -> None: - events = get_events_for_enrich(datastore_conn, event_type) - events_batch = [] - for event in tqdm(events, f"Processing events for {event_type.value} event type"): - events_batch.append(event) - if len(events_batch) == 
batch_size:
-            logger.info("Getting data from JSONrpc")
-            enriched_events = enrich_from_web3(
-                events_batch,
-                batch_loader,
-            )
-            update_events_batch(datastore_conn, enriched_events)
-            events_batch = []
-
-    logger.info("Getting data from JSONrpc")
-    enriched_events = enrich_from_web3(
-        events_batch,
-        batch_loader,
-    )
-    update_events_batch(datastore_conn, enriched_events)
+from .data import BlockBounds
diff --git a/datasets/nfts/nfts/materialize.py b/datasets/nfts/nfts/materialize.py
index 9edc2774..852b48d4 100644
--- a/datasets/nfts/nfts/materialize.py
+++ b/datasets/nfts/nfts/materialize.py
@@ -1,190 +1,215 @@
 import logging
 import sqlite3
-from typing import Any, cast, Iterator, List, Optional, Set
+from typing import Any, Dict, Type, Union, cast, Iterator, List, Optional, Set
 import json
 
 from moonstreamdb.models import (
     EthereumLabel,
-    EthereumTransaction,
-    EthereumBlock,
+    PolygonLabel,
 )
+
 from sqlalchemy import or_, and_
 from sqlalchemy.orm import Session
 from tqdm import tqdm
 from web3 import Web3
-import requests
 
-from .data import BlockBounds, EventType, NFTEvent, NFTMetadata, event_types
-from .datastore import (
-    get_checkpoint_offset,
-    insert_address_metadata,
-    insert_checkpoint,
-    insert_events,
+from .data import (
+    NftApprovalEvent,
+    NftApprovalForAllEvent,
+    NftTransaction,
+    NftTransferEvent,
+    Erc20TransferEvent,
 )
+from .datastore import insert_events, insert_transactions
 
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
 
+ERC721_LABEL = "erc721"
 
-def add_events(
-    datastore_conn: sqlite3.Connection,
-    db_session: Session,
-    event_type: EventType,
-    initial_offset=0,
-    bounds: Optional[BlockBounds] = None,
-    batch_size: int = 10,
-) -> None:
-    raw_created_at_list = (
-        db_session.query(EthereumLabel.created_at)
-        .filter(EthereumLabel.label == event_type.value)
-        .order_by(EthereumLabel.created_at.asc())
-        .distinct(EthereumLabel.created_at)
-    ).all()
-
-    created_at_list = [
-        created_at[0] for created_at in raw_created_at_list[initial_offset:]
-    ]
-
-    query = (
-        db_session.query(
-            EthereumLabel.id,
-            EthereumLabel.label,
-            EthereumLabel.address,
-            EthereumLabel.label_data,
-            EthereumLabel.transaction_hash,
-            EthereumTransaction.value,
-            EthereumTransaction.block_number,
-            EthereumBlock.timestamp,
-        )
+def _get_last_labeled_erc721_block(
+    session: Session, label_model: Type[Union[EthereumLabel, PolygonLabel]]
+) -> int:
+    last = (
+        session.query(label_model.block_number)
+        .filter(label_model.label == ERC721_LABEL)
+        .order_by(label_model.block_number.desc())
+        .first()
     )
-        .filter(EthereumLabel.label == event_type.value)
-        .outerjoin(
-            EthereumTransaction,
-            EthereumLabel.transaction_hash == EthereumTransaction.hash,
-        )
-        .outerjoin(
-            EthereumBlock,
-            EthereumTransaction.block_number == EthereumBlock.block_number,
-        )
-        .order_by(
-            EthereumLabel.created_at.asc(),
-        )
+    if last is None:
+        raise ValueError(f"No ERC721 labels found in {label_model.__tablename__} table")
+    return last[0]
 
-    if bounds is not None:
-        time_filters = [EthereumTransaction.block_number >= bounds.starting_block]
-        if bounds.ending_block is not None:
-            time_filters.append(EthereumTransaction.block_number <= bounds.ending_block)
-        bounds_filters = [EthereumTransaction.hash == None, and_(*time_filters)]
-
-        query = query.filter(or_(*bounds_filters))
-
-    pbar = tqdm(total=(len(raw_created_at_list)))
-    pbar.set_description(f"Processing created ats")
-    pbar.update(initial_offset)
-    batch_start = 0
-    batch_end = batch_start + batch_size
-    while batch_start <= len(created_at_list):
-
-        events = query.filter(
-
EthereumLabel.created_at.in_(created_at_list[batch_start : batch_end + 1])
-        ).all()
-        if not events:
-            continue
-
-        raw_events_batch = []
-        for (
-            event_id,
-            label,
-            address,
-            label_data,
-            transaction_hash,
-            value,
-            block_number,
-            timestamp,
-        ) in events:
-            raw_event = NFTEvent(
-                event_id=event_id,
-                event_type=event_types[label],
-                nft_address=address,
-                token_id=label_data["tokenId"],
-                from_address=label_data["from"],
-                to_address=label_data["to"],
-                transaction_hash=transaction_hash,
-                value=value,
-                block_number=block_number,
-                timestamp=timestamp,
-            )
-            raw_events_batch.append(raw_event)
-
-        logger.info(f"Adding {len(raw_events_batch)} to database")
-        insert_events(
-            datastore_conn, raw_events_batch
-        )  # TODO REMOVED WEB3 enrich, since node is down
-        insert_checkpoint(datastore_conn, event_type, batch_end + initial_offset)
-        pbar.update(batch_end - batch_start + 1)
-        batch_start = batch_end + 1
-        batch_end = min(batch_end + batch_size, len(created_at_list))
-
-
-def create_dataset(
-    datastore_conn: sqlite3.Connection,
-    db_session: Session,
-    event_type: EventType,
-    bounds: Optional[BlockBounds] = None,
-    batch_size: int = 10,
-) -> None:
-    """
-    Creates Moonstream NFTs dataset in the given SQLite datastore.
-    """
-    offset = get_checkpoint_offset(datastore_conn, event_type)
-    if offset is not None:
-        logger.info(f"Found checkpoint for {event_type.value}: offset = {offset}")
+def parse_transaction_label(
+    label_model: Union[EthereumLabel, PolygonLabel]
+) -> NftTransaction:
+    assert (
+        label_model.label_data["type"] == "tx_call"
+    ), "Expected label to be of type 'tx_call'"
+
+    if isinstance(label_model, EthereumLabel):
+        blockchain_type = "ethereum"
     else:
-        offset = 0
-        logger.info(f"Did not found any checkpoint for {event_type.value}")
+        blockchain_type = "polygon"
 
-    if event_type == EventType.ERC721:
-        add_contracts_metadata(datastore_conn, db_session, offset, batch_size)
-    else:
-        add_events(
-            datastore_conn,
-            db_session,
-            event_type,
-            offset,
-            bounds,
-            batch_size,
-        )
+    # TODO: "value" was not included in Polygon labels, so default to 0 when it is missing
+    value = 0
+    if label_model.label_data.get("value") is not None:
+        value = label_model.label_data["value"]
 
-
-def add_contracts_metadata(
-    datastore_conn: sqlite3.Connection,
-    db_session: Session,
-    initial_offset: int = 0,
-    batch_size: int = 1000,
-) -> None:
-    logger.info("Adding erc721 contract metadata")
-    query = (
-        db_session.query(EthereumLabel.label_data, EthereumLabel.address)
-        .filter(EthereumLabel.label == EventType.ERC721.value)
-        .order_by(EthereumLabel.created_at)
+    return NftTransaction(
+        blockchain_type=blockchain_type,
+        block_number=label_model.block_number,
+        block_timestamp=label_model.block_timestamp,
+        transaction_hash=label_model.transaction_hash,
+        contract_address=label_model.address,
+        caller_address=label_model.label_data["caller"],
+        function_name=label_model.label_data["name"],
+        function_args=label_model.label_data["args"],
+        gas_used=label_model.label_data["gasUsed"],
+        gas_price=label_model.label_data["gasPrice"],
+        value=value,
+        status=label_model.label_data["status"],
+        max_fee_per_gas=label_model.label_data.get("maxFeePerGas"),
+        max_priority_fee_per_gas=label_model.label_data.get("maxPriorityFeePerGas"),
     )
-    offset = initial_offset
-    while True:
-        events = query.offset(offset).limit(batch_size).all()
-        if not events:
-            break
-        offset += len(events)
-        events_batch: List[NFTMetadata] = []
-        for label_data, address in events:
-            events_batch.append(
-                NFTMetadata(
-                    address=address,
-                    name=label_data.get("name", None),
-                    symbol=label_data.get("symbol", None),
-                )
+def _parse_transfer_event(
+    label_model: Union[EthereumLabel, PolygonLabel]
+) -> Union[NftTransferEvent, Erc20TransferEvent]:
+    assert (
+        label_model.label_data["type"] == "event"
+    ), "Expected label to be of type 'event'"
+    assert (
+        label_model.label_data["name"] == "Transfer"
+    ), "Expected event to be named 'Transfer'"
+
+    if isinstance(label_model, EthereumLabel):
+        blockchain_type = "ethereum"
+    else:
+        blockchain_type = "polygon"
+    if label_model.label_data["args"].get("tokenId") is not None:
+        return NftTransferEvent(
+            blockchain_type=blockchain_type,
+            from_address=label_model.label_data["args"]["from"],
+            to_address=label_model.label_data["args"]["to"],
+            token_id=label_model.label_data["args"]["tokenId"],
+            log_index=label_model.log_index,
+            transaction_hash=label_model.transaction_hash,
+        )
+    else:
+        return Erc20TransferEvent(
+            blockchain_type=blockchain_type,
+            from_address=label_model.label_data["args"]["from"],
+            to_address=label_model.label_data["args"]["to"],
+            value=label_model.label_data["args"]["value"],
+            log_index=label_model.log_index,
+            transaction_hash=label_model.transaction_hash,
+        )
+
+
+def _parse_approval_event(
+    label_model: Union[EthereumLabel, PolygonLabel]
+) -> NftApprovalEvent:
+    assert (
+        label_model.label_data["type"] == "event"
+    ), "Expected label to be of type 'event'"
+    assert (
+        label_model.label_data["name"] == "Approval"
+    ), "Expected event to be named 'Approval'"
+
+    if isinstance(label_model, EthereumLabel):
+        blockchain_type = "ethereum"
+    else:
+        blockchain_type = "polygon"
+    return NftApprovalEvent(
+        blockchain_type=blockchain_type,
+        owner=label_model.label_data["args"]["owner"],
+        approved=label_model.label_data["args"]["approved"],
+        token_id=label_model.label_data["args"]["tokenId"],
+        log_index=label_model.log_index,
+        transaction_hash=label_model.transaction_hash,
+    )
+
+
+def _parse_approval_for_all_event(
+    label_model: Union[EthereumLabel, PolygonLabel]
+) -> NftApprovalForAllEvent:
+    assert (
+        label_model.label_data["type"] == "event"
+    ), "Expected label to be of type 'event'"
+    assert (
+        label_model.label_data["name"] == "ApprovalForAll"
+    ), "Expected event to be named 'ApprovalForAll'"
+
+    if isinstance(label_model, EthereumLabel):
+        blockchain_type = "ethereum"
+    else:
+        blockchain_type = "polygon"
+    return NftApprovalForAllEvent(
+        blockchain_type=blockchain_type,
+        owner=label_model.label_data["args"]["owner"],
+        operator=label_model.label_data["args"]["operator"],
+        approved=label_model.label_data["args"]["approved"],
+        log_index=label_model.log_index,
+        transaction_hash=label_model.transaction_hash,
+    )
+
+
+def parse_event(
+    label_model: Union[EthereumLabel, PolygonLabel]
+) -> Union[NftTransferEvent, Erc20TransferEvent, NftApprovalEvent, NftApprovalForAllEvent]:
+    if label_model.label_data["name"] == "Transfer":
+        return _parse_transfer_event(label_model)
+    elif label_model.label_data["name"] == "Approval":
+        return _parse_approval_event(label_model)
+    elif label_model.label_data["name"] == "ApprovalForAll":
+        return _parse_approval_for_all_event(label_model)
+    else:
+        raise ValueError(f"Unknown event name: {label_model.label_data['name']}")
+
+
+def crawl_erc721_labels(
+    db_session: Session,
+    conn: sqlite3.Connection,
+    label_model: Type[Union[EthereumLabel, PolygonLabel]],
+    start_block: int,
+    end_block: int,
+    batch_size: int = 10000,
+):
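+    """
+    Copies every row labeled "erc721" in [start_block, end_block] from the
+    Moonstream labels table into the SQLite datastore, batch_size blocks at a
+    time: "tx_call" labels become rows in the transactions table, and event
+    labels are dispatched by parse_event into the transfer/approval tables.
+    """
+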
logger.info( + f"Crawling {label_model.__tablename__} from {start_block} to {end_block}" + ) + pbar = tqdm(total=(end_block - start_block + 1)) + pbar.set_description( + f"Crawling {label_model.__tablename__} blocks {start_block}-{end_block}" + ) + current_block = start_block + while current_block <= end_block: + batch_end = min(current_block + batch_size, end_block) + logger.info(f"Crawling {current_block}-{batch_end}") + labels = db_session.query(label_model).filter( + and_( + label_model.block_number >= current_block, + label_model.block_number <= batch_end, + label_model.label == ERC721_LABEL, ) - insert_address_metadata(datastore_conn, events_batch) - insert_checkpoint(datastore_conn, EventType.ERC721, offset) - logger.info(f"Already added {offset}") + ) - logger.info(f"Added total of {offset-initial_offset} nfts metadata") + transactions = [] + events = [] + for label in labels: + + if label.label_data["type"] == "tx_call": + transactions.append(parse_transaction_label(label)) + else: + events.append(parse_event(label)) + insert_transactions(conn, transactions) + insert_events(conn, events) + logger.info(f"Found {len(events)} events and {len(transactions)} transactions") + pbar.update(batch_end - current_block + 1) + current_block = batch_end + 1
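+
+    # Close the progress bar explicitly so the final update is flushed.
+    pbar.close()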