From d832e95f41a5af410e2c7e7c516cc525acd2b399 Mon Sep 17 00:00:00 2001
From: yhtiyar
Date: Mon, 8 Nov 2021 17:15:03 +0300
Subject: [PATCH] dataset generator for contract deployments

---
 datasets/nfts/contracts/__init__.py    |   0
 datasets/nfts/contracts/cli.py         | 100 ++++++++++++++++++++++++
 datasets/nfts/contracts/data.py        |  30 ++++++++
 datasets/nfts/contracts/datastore.py   | 101 +++++++++++++++++++++++++
 datasets/nfts/contracts/materialize.py |  99 ++++++++++++++++++++++++
 datasets/nfts/requirements.txt         |   2 +
 6 files changed, 332 insertions(+)
 create mode 100644 datasets/nfts/contracts/__init__.py
 create mode 100644 datasets/nfts/contracts/cli.py
 create mode 100644 datasets/nfts/contracts/data.py
 create mode 100644 datasets/nfts/contracts/datastore.py
 create mode 100644 datasets/nfts/contracts/materialize.py
 create mode 100644 datasets/nfts/requirements.txt

diff --git a/datasets/nfts/contracts/__init__.py b/datasets/nfts/contracts/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/datasets/nfts/contracts/cli.py b/datasets/nfts/contracts/cli.py
new file mode 100644
index 00000000..044e7d88
--- /dev/null
+++ b/datasets/nfts/contracts/cli.py
@@ -0,0 +1,100 @@
+import argparse
+import contextlib
+import logging
+import os
+import sqlite3
+from shutil import copyfile
+from typing import Optional
+
+
+from moonstreamdb.db import yield_db_session_ctx
+
+from .materialize import add_contract_deployments
+from .datastore import setup_database
+from .data import BlockBounds
+
+
+def handle_initdb(args: argparse.Namespace) -> None:
+    """
+    Handle the "initdb" subcommand: create the dataset tables in the SQLite
+    database at args.datastore.
+    """
+    with contextlib.closing(sqlite3.connect(args.datastore)) as conn:
+        setup_database(conn)
+
+
+def handle_materialize(args: argparse.Namespace) -> None:
+    """
+    Handle the "materialize" subcommand: load contract deployments through a
+    moonstreamdb session and store them in the SQLite database at args.datastore.
+
+    Raises ValueError if --end is given without --start.
+    """
+    bounds: Optional[BlockBounds] = None
+    if args.start is not None:
+        bounds = BlockBounds(starting_block=args.start, ending_block=args.end)
+    elif args.end is not None:
+        raise ValueError("You cannot set --end unless you also set --start")
+    with yield_db_session_ctx() as db_session, contextlib.closing(
+        sqlite3.connect(args.datastore)
+    ) as datastore:
+        # Pass both connections by keyword: add_contract_deployments takes the
+        # SQLite connection first and the database session second.
+        add_contract_deployments(
+            datastore_conn=datastore,
+            db_session=db_session,
+            batch_size=args.batch_size,
+            bounds=bounds,
+        )
+
+
+def generate_arg_parser() -> argparse.ArgumentParser:
+    """
+    Build the command-line parser with the "initdb" and "materialize"
+    subcommands. Each subcommand sets its handler as the "func" default.
+    """
+    parser = argparse.ArgumentParser(
+        description="Generate a database of contracts deployed on Ethereum."
+    )
+    subcommands = parser.add_subparsers(dest="subcommand", title="subcommands")
+
+    parser_initdb = subcommands.add_parser(
+        "initdb",
+        description="Initialize an SQLite datastore for contract deployments",
+    )
+    parser_initdb.add_argument(
+        "-d",
+        "--datastore",
+        required=True,
+        help="Path to SQLite database representing the dataset",
+    )
+    parser_initdb.set_defaults(func=handle_initdb)
+
+    parser_materialize = subcommands.add_parser(
+        "materialize",
+        description="Materialize the contract deployments database",
+    )
+
+    parser_materialize.add_argument(
+        "-d",
+        "--datastore",
+        required=True,
+        help="Path to SQLite database representing the dataset",
+    )
+
+    parser_materialize.add_argument(
+        "--start", type=int, default=None, help="Starting block number"
+    )
+    parser_materialize.add_argument(
+        "--end", type=int, default=None, help="Ending block number"
+    )
+    parser_materialize.add_argument(
+        "-n",
+        "--batch-size",
+        type=int,
+        default=10,
+        help="Number of events to process per batch",
+    )
+    parser_materialize.set_defaults(func=handle_materialize)
+
+    return parser
diff --git a/datasets/nfts/contracts/data.py b/datasets/nfts/contracts/data.py
new file mode 100644
index 00000000..13a0867b
--- /dev/null
+++ b/datasets/nfts/contracts/data.py
@@ -0,0 +1,30 @@
+from dataclasses import dataclass
+from typing import Optional
+
+
+@dataclass
+class ContractDeployment:
+    """
+    One contract deployment record, with the fields written to the
+    contract_deployments table.
+    """
+
+    address: str
+    block_number: int
+    transaction_hash: str
+    deployer_address: str
+    block_timestamp: int
+    gas_used: int
+    gas_price: int
+    transaction_fee: int
+
+
+@dataclass
+class BlockBounds:
+    """
+    Block number range used to filter deployments. ending_block is optional;
+    when it is None only the lower bound is applied.
+    """
+
+    starting_block: int
+    ending_block: Optional[int] = None
diff --git a/datasets/nfts/contracts/datastore.py b/datasets/nfts/contracts/datastore.py
new file mode 100644
index 00000000..452c5db4
--- /dev/null
+++ b/datasets/nfts/contracts/datastore.py
@@ -0,0 +1,101 @@
+import logging
+import sqlite3
+
+from dataclasses import dataclass
+from typing import List
+from .data import ContractDeployment
+
+
+CREATE_CONTRACT_DEPLOYMENTS_TABLE_QUERY = """
+CREATE TABLE IF NOT EXISTS contract_deployments (
+    id INTEGER PRIMARY KEY,
+    transaction_hash TEXT NOT NULL,
+    block_number INTEGER NOT NULL,
+    timestamp INTEGER NOT NULL,
+    contract_address TEXT NOT NULL,
+    deployer_address TEXT NOT NULL,
+    gas_used INTEGER NOT NULL,
+    gas_price INTEGER NOT NULL,
+    transaction_fee INTEGER NOT NULL
+    )
+"""
+
+# label is the primary key so that "INSERT OR REPLACE" in save_checkpoint
+# overwrites the row for a label instead of adding a second one.
+CREATE_CHECKPOINT_TABLE_QUERY = """CREATE TABLE IF NOT EXISTS checkpoint
+    (
+        label TEXT PRIMARY KEY,
+        offset INTEGER
+    );
+"""
+
+
+def setup_database(conn: sqlite3.Connection):
+    """
+    Create the database tables if they don't exist.
+
+    Creates the contract_deployments and checkpoint tables and commits.
+    """
+    cur = conn.cursor()
+    cur.execute(CREATE_CONTRACT_DEPLOYMENTS_TABLE_QUERY)
+    cur.execute(CREATE_CHECKPOINT_TABLE_QUERY)
+    conn.commit()
+
+
+def insert_contract_deployments(
+    conn: sqlite3.Connection, contract_deployments: List[ContractDeployment]
+):
+    """
+    Insert a list of contract deployments into the database.
+
+    All rows are written with a single executemany call and then committed.
+    """
+    cur = conn.cursor()
+    cur.executemany(
+        "INSERT INTO contract_deployments (transaction_hash, block_number, timestamp, contract_address, deployer_address, gas_used, gas_price, transaction_fee) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
+        [
+            (
+                contract_deployment.transaction_hash,
+                contract_deployment.block_number,
+                contract_deployment.block_timestamp,
+                contract_deployment.address,
+                contract_deployment.deployer_address,
+                contract_deployment.gas_used,
+                contract_deployment.gas_price,
+                contract_deployment.transaction_fee,
+            )
+            for contract_deployment in contract_deployments
+        ],
+    )
+    conn.commit()
+
+
+def load_checkpoint(conn: sqlite3.Connection, label: str) -> int:
+    """
+    Load the checkpoint with the given label.
+
+    Returns the stored offset, or 0 if there is no checkpoint for the label.
+    """
+    cur = conn.cursor()
+    cur.execute(
+        "SELECT offset FROM checkpoint WHERE label = ?",
+        (label,),
+    )
+    row = cur.fetchone()
+    if row is None:
+        return 0
+    return row[0]
+
+
+def save_checkpoint(conn: sqlite3.Connection, label: str, offset: int):
+    """
+    Save the checkpoint with the given label.
+
+    Replaces any offset previously stored for the label, then commits.
+    """
+    cur = conn.cursor()
+    cur.execute(
+        "INSERT OR REPLACE INTO checkpoint (label, offset) VALUES (?, ?)",
+        (label, offset),
+    )
+    conn.commit()
diff --git a/datasets/nfts/contracts/materialize.py b/datasets/nfts/contracts/materialize.py
new file mode 100644
index 00000000..4b8caad0
--- /dev/null
+++ b/datasets/nfts/contracts/materialize.py
@@ -0,0 +1,99 @@
+import logging
+import sqlite3
+from typing import Any, cast, Iterator, List, Optional, Set
+import json
+from tqdm import tqdm
+
+
+from moonstreamdb.models import EthereumLabel
+from moonstreamdb.db import yield_db_session_ctx
+from sqlalchemy.orm import Session
+from sqlalchemy import or_, and_
+
+from .datastore import load_checkpoint, save_checkpoint, insert_contract_deployments
+from .data import BlockBounds, ContractDeployment
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+
+def add_contract_deployments(
+    datastore_conn: sqlite3.Connection,
+    db_session: Session,
+    initial_offset=0,
+    bounds: Optional[BlockBounds] = None,
+    batch_size: int = 10,
+):
+    """
+    Get all contract deployments in a given block bound and add to sqlite3 database
+
+    Labels are read in batches keyed on their distinct created_at values and each
+    batch is written with insert_contract_deployments.
+
+    Arguments:
+    - datastore_conn: SQLite connection that receives the deployments
+    - db_session: SQLAlchemy session used to query EthereumLabel
+    - initial_offset: number of distinct created_at values to skip from the start
+    - bounds: optional block range applied to the label query
+    - batch_size: number of distinct created_at values queried per batch
+
+    Raises ValueError if batch_size is not a positive integer.
+    """
+    if batch_size <= 0:
+        raise ValueError("batch_size must be a positive integer")
+
+    raw_created_at_list = (
+        db_session.query(EthereumLabel.created_at)
+        .filter(EthereumLabel.label == "contract_deployment")
+        .order_by(EthereumLabel.created_at.asc())
+        .distinct(EthereumLabel.created_at)
+    ).all()
+
+    created_at_list = [
+        created_at[0] for created_at in raw_created_at_list[initial_offset:]
+    ]
+
+    query = db_session.query(EthereumLabel).filter(
+        EthereumLabel.label == "contract_deployment"
+    )
+
+    if bounds is not None:
+        time_filters = [EthereumLabel.block_number >= bounds.starting_block]
+        if bounds.ending_block is not None:
+            time_filters.append(EthereumLabel.block_number <= bounds.ending_block)
+        # NOTE(review): rows whose EthereumLabel.hash is NULL pass this filter
+        # regardless of the block range - confirm that this is intended.
+        bounds_filters = [EthereumLabel.hash == None, and_(*time_filters)]
+
+        query = query.filter(or_(*bounds_filters))
+
+    with tqdm(total=len(raw_created_at_list)) as pbar:
+        pbar.set_description("Processing created ats")
+        pbar.update(initial_offset)
+        # range() always advances to the next slice, so a batch that matches no
+        # labels is skipped instead of being retried forever.
+        for batch_start in range(0, len(created_at_list), batch_size):
+            created_at_batch = created_at_list[batch_start : batch_start + batch_size]
+            labels = query.filter(
+                EthereumLabel.created_at.in_(created_at_batch)
+            ).all()
+            if labels:
+                contract_deployment_batch: List[ContractDeployment] = [
+                    ContractDeployment(
+                        address=label.address,
+                        transaction_hash=label.transaction_hash,
+                        block_number=label.block_number,
+                        block_timestamp=label.block_timestamp,
+                        deployer_address=label.label_data["deployer"],
+                        gas_used=label.label_data["gasUsed"],
+                        gas_price=label.label_data["gasPrice"],
+                        transaction_fee=label.label_data["transactionFee"],
+                    )
+                    for label in labels
+                ]
+                logger.info(
+                    f"Adding {len(contract_deployment_batch)} contract deployments"
+                )
+                insert_contract_deployments(datastore_conn, contract_deployment_batch)
+            pbar.update(len(created_at_batch))
+    logger.info("Finished adding contract deployments")
diff --git a/datasets/nfts/requirements.txt b/datasets/nfts/requirements.txt
new file mode 100644
index 00000000..7c9fd710
--- /dev/null
+++ b/datasets/nfts/requirements.txt
@@ -0,0 +1,2 @@
+-e git+https://git@github.com/bugout-dev/moonstream.git@67fe019f1086c435dd3b58f1ade2778acc2167c7#egg=moonstreamdb&subdirectory=db
+tqdm