Added some datastore (to SQLite) functionality

Set up CLI framework, as well.
pull/304/head
Neeraj Kashyap 2021-09-26 11:14:40 -07:00
rodzic be63141b14
commit 6439324aba
5 zmienionych plików z 299 dodań i 55 usunięć

Wyświetl plik

@ -163,3 +163,4 @@ cython_debug/
.venv/
.nfts/
venv/
.secrets/

Wyświetl plik

@ -1,16 +1,99 @@
import argparse
import contextlib
import os
import sqlite3
from typing import Optional, Union
from moonstreamdb.db import yield_db_session_ctx
from web3 import Web3
from web3 import Web3, IPCProvider, HTTPProvider
from .data import event_types, nft_event
from .datastore import setup_database
from .materialize import create_dataset
def web3_connection(web3_uri: Optional[str] = None) -> Web3:
"""
Connect to the given web3 provider. You may specify a web3 provider either as a path to an IPC
socket on your filesystem or as an HTTP(S) URI to a JSON RPC provider.
If web3_uri is not provided or is set to None, this function attempts to use the default behavior
of the web3.py IPCProvider (one of the steps is looking for .ethereum/geth.ipc, but there may be others).
"""
web3_provider: Union[IPCProvider, HTTPProvider] = Web3.IPCProvider()
if web3_uri is not None:
if web3_uri.startswith("http://") or web3_uri.startswith("https://"):
web3_provider = Web3.HTTPProvider(web3_uri)
else:
web3_provider = Web3.IPCProvider(web3_uri)
web3_client = Web3(web3_provider)
return web3_client
def handle_initdb(args: argparse.Namespace) -> None:
with contextlib.closing(sqlite3.connect(args.datastore)) as conn:
setup_database(conn)
def handle_materialize(args: argparse.Namespace) -> None:
event_type = nft_event(args.type)
print(args)
with yield_db_session_ctx() as db_session, contextlib.closing(
sqlite3.connect(args.datastore)
) as moonstream_datastore:
create_dataset(moonstream_datastore, db_session, args.web3, event_type)
def main() -> None:
"""
"nfts" command handler.
When reading this code, to find the definition of any of the "nfts" subcommands, grep for comments
of the form:
# Command: nfts <subcommand>
"""
default_web3_provider = os.environ.get("MOONSTREAM_WEB3_PROVIDER")
parser = argparse.ArgumentParser(
description="Tools to work with the Moonstream NFTs dataset"
)
subcommands = parser.add_subparsers(title="Subcommands")
# Command: nfts initdb
parser_initdb = subcommands.add_parser(
"initdb",
description="Initialize an SQLite datastore for the Moonstream NFTs dataset",
)
parser_initdb.add_argument("datastore")
parser_initdb.set_defaults(func=handle_initdb)
# Command: nfts materialize
parser_materialize = subcommands.add_parser(
"materialize", description="Create/update the NFTs dataset"
)
parser_materialize.add_argument(
"-d",
"--datastore",
required=True,
help="Path to SQLite database representing the dataset",
)
parser_materialize.add_argument(
"--web3",
default=default_web3_provider,
type=web3_connection,
help=f"Web3 provider to use when collecting data directly from the Ethereum blockchain (default: {default_web3_provider})",
)
parser_materialize.add_argument(
"-t",
"--type",
choices=event_types,
help="Type of event to materialize intermediate data for",
)
parser_materialize.set_defaults(func=handle_materialize)
args = parser.parse_args()
args.func(args)
from .materialize import get_rows, EventType, preproccess
if __name__ == "__main__":
web3_path = os.environ.get("MOONSTREAM_IPC_PATH")
web3_client = Web3(Web3.HTTPProvider(web3_path))
with yield_db_session_ctx() as db_session:
rows = get_rows(db_session, EventType.TRANSFER)
rows = preproccess(db_session, web3_client, rows)
for row in rows:
print(row)
main()

Wyświetl plik

@ -0,0 +1,40 @@
"""
Data structures used in (and as part of the maintenance of) the Moonstream NFTs dataset
"""
from dataclasses import dataclass
from enum import Enum
from typing import Optional
@dataclass
class BlockBounds:
starting_block: int
ending_block: Optional[int] = None
class EventType(Enum):
TRANSFER = "nft_transfer"
MINT = "nft_mint"
event_types = {event_type.value: event_type for event_type in EventType}
def nft_event(raw_event: str) -> EventType:
try:
return event_types[raw_event]
except KeyError:
raise ValueError(f"Unknown nft event type: {raw_event}")
@dataclass
class NFTEvent:
event_type: EventType
nft_address: str
token_id: str
from_address: str
to_address: str
transaction_hash: str
value: Optional[int] = None
block_number: Optional[int] = None
timestamp: Optional[int] = None

Wyświetl plik

@ -0,0 +1,109 @@
"""
This module provides tools to interact with and maintain a SQLite database which acts/should act as
a datastore for a Moonstream NFTs dataset.
"""
import sqlite3
from typing import Any, List, Tuple
from .data import EventType, NFTEvent
event_tables = {EventType.TRANSFER: "transfers", EventType.MINT: "mints"}
CREATE_NFTS_TABLE_QUERY = """CREATE TABLE nfts
(
address TEXT NOT NULL UNIQUE ON CONFLICT FAIL,
name TEXT,
symbol TEXT
);
"""
def create_events_table_query(event_type: EventType) -> str:
creation_query = f"""
CREATE TABLE {event_tables[event_type]}
(
transaction_hash TEXT,
block_number INTEGER,
nft_address TEXT REFERENCES nfts(address),
token_id TEXT,
from_address TEXT,
to_address TEXT,
value INT,
timestamp INT
);
"""
return creation_query
def setup_database(conn: sqlite3.Connection) -> None:
"""
Sets up the schema of the Moonstream NFTs dataset in the given SQLite database.
"""
cur = conn.cursor()
cur.execute(CREATE_NFTS_TABLE_QUERY)
cur.execute(create_events_table_query(EventType.TRANSFER))
cur.execute(create_events_table_query(EventType.MINT))
conn.commit()
def insert_events_query(event_type: EventType) -> str:
"""
Generates a query which inserts NFT events into the appropriate events table.
"""
query = f"""
INSERT INTO {event_tables[event_type]}(
transaction_hash,
block_number,
nft_address,
token_id,
from_address,
to_address,
value,
timestamp
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
"""
return query
def nft_event_to_tuple(event: NFTEvent) -> Tuple[Any]:
"""
Converts an NFT event into a tuple for use with sqlite cursor executemany. This includes
dropping e.g. the event_type field.
"""
return (
event.transaction_hash,
event.block_number,
event.nft_address,
event.token_id,
event.from_address,
event.to_address,
int(event.value),
event.timestamp,
)
def insert_events(conn: sqlite3.Connection, events: List[NFTEvent]) -> None:
"""
Inserts the given events into the appropriate events table in the given SQLite database.
This method works with batches of events.
"""
cur = conn.cursor()
try:
transfers = [
nft_event_to_tuple(event)
for event in events
if event.event_type == EventType.TRANSFER
]
cur.executemany(insert_events_query(EventType.TRANSFER), transfers)
mints = [
nft_event_to_tuple(event)
for event in events
if event.event_type == EventType.MINT
]
cur.executemany(insert_events_query(EventType.MINT), mints)
conn.commit()
except Exception as e:
conn.rollback()
raise e

Wyświetl plik

@ -1,6 +1,5 @@
from dataclasses import dataclass
from enum import Enum
from typing import List, Optional
import sqlite3
from typing import Iterator, List, Optional
from moonstreamdb.models import (
EthereumAddress,
@ -9,52 +8,33 @@ from moonstreamdb.models import (
EthereumBlock,
)
from sqlalchemy.orm import Session
from tqdm import tqdm
from web3 import Web3
@dataclass
class BlockBounds:
starting_block: int
ending_block: Optional[int] = None
from .data import BlockBounds, EventType, NFTEvent, event_types
from .datastore import insert_events
class EventType(Enum):
TRANSFER = "nft_transfer"
MINT = "nft_mint"
@dataclass
class NFTEvent:
event_type: EventType
nft_address: str
token_id: str
from_address: str
to_address: str
transaction_hash: str
value: Optional[int] = None
block_number: Optional[int] = None
timestamp: Optional[int] = None
def preproccess(
db_session: Session, web3_client: Web3, nft_events: List[NFTEvent]
) -> List[NFTEvent]:
def enrich_from_web3(web3_client: Web3, nft_event: NFTEvent) -> NFTEvent:
"""
Adds block number, value, timestamp from web3 if they are None (because that transaction is missing in db)
"""
for nft_event in nft_events:
if nft_event.block_number is None:
transaction = web3_client.eth.get_transaction(nft_event.transaction_hash)
nft_event.value = transaction["value"]
nft_event.block_number = transaction["blockNumber"]
block = web3_client.eth.get_block(transaction["blockNumber"])
nft_event.timestamp = block["timestamp"]
return nft_events
if (
nft_event.block_number is None
or nft_event.value is None
or nft_event.timestamp is None
):
transaction = web3_client.eth.get_transaction(nft_event.transaction_hash)
nft_event.value = transaction["value"]
nft_event.block_number = transaction["blockNumber"]
block = web3_client.eth.get_block(transaction["blockNumber"])
nft_event.timestamp = block["timestamp"]
return nft_event
def get_rows(
def get_events_from_db(
db_session: Session, event_type: EventType, bounds: Optional[BlockBounds] = None
) -> List[NFTEvent]:
) -> Iterator[NFTEvent]:
query = (
db_session.query(
EthereumLabel.label,
@ -75,11 +55,20 @@ def get_rows(
EthereumTransaction.block_number == EthereumBlock.block_number,
)
.filter(EthereumLabel.label == event_type.value)
.limit(10)
.limit(5)
)
return [
NFTEvent(
event_type=label,
for (
label,
address,
label_data,
transaction_hash,
value,
block_number,
timestamp,
) in query:
yield NFTEvent(
event_type=event_types[label],
nft_address=address,
token_id=label_data["tokenId"],
from_address=label_data["from"],
@ -89,5 +78,27 @@ def get_rows(
block_number=block_number,
timestamp=timestamp,
)
for label, address, label_data, transaction_hash, value, block_number, timestamp in query
]
def create_dataset(
datastore_conn: sqlite3.Connection,
db_session: Session,
web3_client: Web3,
event_type: EventType,
batch_size: int = 1000,
) -> None:
"""
Creates Moonstream NFTs dataset in the given SQLite datastore.
"""
events = map(
lambda e: enrich_from_web3(web3_client, e),
get_events_from_db(db_session, event_type),
)
events_batch: List[NFTEvent] = []
for event in tqdm(events):
print(event)
events_batch.append(event)
if len(events_batch) == batch_size:
insert_events(datastore_conn, events_batch)
events_batch = []
insert_events(datastore_conn, events_batch)