Database models for crawlers

pull/15/head
kompotkot 2021-07-26 14:03:33 +03:00
rodzic 50952d1c29
commit 826af41dcc
13 zmienionych plików z 642 dodań i 1 usunięć

2
backend/.gitignore vendored
Wyświetl plik

@ -164,4 +164,4 @@ cython_debug/
dev.env
prod.env
.moonstream
.env
.venv

169
db/.gitignore vendored 100644
Wyświetl plik

@ -0,0 +1,169 @@
# Created by https://www.toptal.com/developers/gitignore/api/python,visualstudiocode
# Edit at https://www.toptal.com/developers/gitignore?templates=python,visualstudiocode
### Python ###
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
### VisualStudioCode ###
.vscode/*
!.vscode/settings.json
!.vscode/tasks.json
!.vscode/launch.json
!.vscode/extensions.json
*.code-workspace
# Local History for Visual Studio Code
.history/
### VisualStudioCode Patch ###
# Ignore all local history of files
.history
.ionide
# End of https://www.toptal.com/developers/gitignore/api/python,visualstudiocode
# Custom
dev.env
prod.env
alembic.dev.ini
alembic.prod.ini
.db/
.venv/

89
db/alembic.ini 100644
Wyświetl plik

@ -0,0 +1,89 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts
script_location = alembic
# template used to generate migration files
# file_template = %%(rev)s_%%(slug)s
# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory.
prepend_sys_path = .
# timezone to use when rendering the date
# within the migration file as well as the filename.
# string value is passed to dateutil.tz.gettz()
# leave blank for localtime
# timezone =
# max length of characters to apply to the
# "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; this defaults
# to alembic/versions. When using multiple version
# directories, initial revisions must be specified with --version-path
# version_locations = %(here)s/bar %(here)s/bat alembic/versions
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
sqlalchemy.url = driver://user:pass@localhost/dbname
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks = black
# black.type = console_scripts
# black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

Wyświetl plik

@ -0,0 +1 @@
Generic single-database configuration.

93
db/alembic/env.py 100644
Wyświetl plik

@ -0,0 +1,93 @@
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
from db.models import Base as ExplorationBase
target_metadata = ExplorationBase.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
from db.models import EthereumBlock, EthereumTransaction, EthereumPendingTransaction
def include_symbol(tablename, schema):
return tablename in {
EthereumBlock.__tablename__,
EthereumTransaction.__tablename__,
EthereumPendingTransaction.__tablename__,
}
def run_migrations_offline():
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
version_table="alembic_exploration_version",
include_symbol=include_symbol,
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online():
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = engine_from_config(
config.get_section(config.config_ini_section),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection,
target_metadata=target_metadata,
version_table="alembic_exploration_version",
include_symbol=include_symbol,
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

Wyświetl plik

@ -0,0 +1,24 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
def upgrade():
${upgrades if upgrades else "pass"}
def downgrade():
${downgrades if downgrades else "pass"}

Wyświetl plik

@ -0,0 +1,93 @@
"""Inital for blocks and transactions
Revision ID: aa903a90b8bf
Revises:
Create Date: 2021-07-26 13:55:38.057312
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'aa903a90b8bf'
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('ethereum_blocks',
sa.Column('block_number', sa.BigInteger(), nullable=False),
sa.Column('difficulty', sa.BigInteger(), nullable=True),
sa.Column('extra_data', sa.VARCHAR(length=128), nullable=True),
sa.Column('gas_limit', sa.BigInteger(), nullable=True),
sa.Column('gas_used', sa.BigInteger(), nullable=True),
sa.Column('hash', sa.VARCHAR(length=256), nullable=True),
sa.Column('logs_bloom', sa.VARCHAR(length=1024), nullable=True),
sa.Column('miner', sa.VARCHAR(length=256), nullable=True),
sa.Column('nonce', sa.VARCHAR(length=256), nullable=True),
sa.Column('parent_hash', sa.VARCHAR(length=256), nullable=True),
sa.Column('receipt_root', sa.VARCHAR(length=256), nullable=True),
sa.Column('uncles', sa.VARCHAR(length=256), nullable=True),
sa.Column('size', sa.Integer(), nullable=True),
sa.Column('state_root', sa.VARCHAR(length=256), nullable=True),
sa.Column('timestamp', sa.BigInteger(), nullable=True),
sa.Column('total_difficulty', sa.VARCHAR(length=256), nullable=True),
sa.Column('transactions_root', sa.VARCHAR(length=256), nullable=True),
sa.Column('indexed_at', sa.DateTime(timezone=True), server_default=sa.text("TIMEZONE('utc', statement_timestamp())"), nullable=False),
sa.PrimaryKeyConstraint('block_number', name=op.f('pk_ethereum_blocks')),
sa.UniqueConstraint('block_number', name=op.f('uq_ethereum_blocks_block_number'))
)
op.create_index(op.f('ix_ethereum_blocks_timestamp'), 'ethereum_blocks', ['timestamp'], unique=False)
op.create_table('ethereum_pending_transactions',
sa.Column('hash', sa.VARCHAR(length=256), nullable=False),
sa.Column('block_number', sa.BigInteger(), nullable=False),
sa.Column('from_address', sa.VARCHAR(length=256), nullable=True),
sa.Column('to_address', sa.VARCHAR(length=256), nullable=True),
sa.Column('gas', sa.Text(), nullable=True),
sa.Column('gas_price', sa.Text(), nullable=True),
sa.Column('input', sa.Text(), nullable=True),
sa.Column('nonce', sa.VARCHAR(length=256), nullable=True),
sa.Column('transaction_index', sa.BigInteger(), nullable=True),
sa.Column('value', sa.Text(), nullable=True),
sa.Column('indexed_at', sa.DateTime(timezone=True), server_default=sa.text("TIMEZONE('utc', statement_timestamp())"), nullable=False),
sa.ForeignKeyConstraint(['block_number'], ['ethereum_blocks.block_number'], name=op.f('fk_ethereum_pending_transactions_block_number_ethereum_blocks'), ondelete='CASCADE'),
sa.PrimaryKeyConstraint('hash', name=op.f('pk_ethereum_pending_transactions')),
sa.UniqueConstraint('hash', name=op.f('uq_ethereum_pending_transactions_hash'))
)
op.create_index(op.f('ix_ethereum_pending_transactions_from_address'), 'ethereum_pending_transactions', ['from_address'], unique=False)
op.create_index(op.f('ix_ethereum_pending_transactions_to_address'), 'ethereum_pending_transactions', ['to_address'], unique=False)
op.create_table('ethereum_transactions',
sa.Column('hash', sa.VARCHAR(length=256), nullable=False),
sa.Column('block_number', sa.BigInteger(), nullable=False),
sa.Column('from_address', sa.VARCHAR(length=256), nullable=True),
sa.Column('to_address', sa.VARCHAR(length=256), nullable=True),
sa.Column('gas', sa.Text(), nullable=True),
sa.Column('gas_price', sa.Text(), nullable=True),
sa.Column('input', sa.Text(), nullable=True),
sa.Column('nonce', sa.VARCHAR(length=256), nullable=True),
sa.Column('transaction_index', sa.BigInteger(), nullable=True),
sa.Column('value', sa.Text(), nullable=True),
sa.Column('indexed_at', sa.DateTime(timezone=True), server_default=sa.text("TIMEZONE('utc', statement_timestamp())"), nullable=False),
sa.ForeignKeyConstraint(['block_number'], ['ethereum_blocks.block_number'], name=op.f('fk_ethereum_transactions_block_number_ethereum_blocks'), ondelete='CASCADE'),
sa.PrimaryKeyConstraint('hash', name=op.f('pk_ethereum_transactions')),
sa.UniqueConstraint('hash', name=op.f('uq_ethereum_transactions_hash'))
)
op.create_index(op.f('ix_ethereum_transactions_from_address'), 'ethereum_transactions', ['from_address'], unique=False)
op.create_index(op.f('ix_ethereum_transactions_to_address'), 'ethereum_transactions', ['to_address'], unique=False)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_ethereum_transactions_to_address'), table_name='ethereum_transactions')
op.drop_index(op.f('ix_ethereum_transactions_from_address'), table_name='ethereum_transactions')
op.drop_table('ethereum_transactions')
op.drop_index(op.f('ix_ethereum_pending_transactions_to_address'), table_name='ethereum_pending_transactions')
op.drop_index(op.f('ix_ethereum_pending_transactions_from_address'), table_name='ethereum_pending_transactions')
op.drop_table('ethereum_pending_transactions')
op.drop_index(op.f('ix_ethereum_blocks_timestamp'), table_name='ethereum_blocks')
op.drop_table('ethereum_blocks')
# ### end Alembic commands ###

Wyświetl plik

40
db/db/db.py 100644
Wyświetl plik

@ -0,0 +1,40 @@
"""
Exploration database connection.
"""
from contextlib import contextmanager
import os
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
EXPLORATION_DB_URI = os.environ.get("EXPLORATION_DB_URI")
if EXPLORATION_DB_URI is None:
raise ValueError("EXPLORATION_DB_URI environment variable must be set")
EXPLORATION_POOL_SIZE_RAW = os.environ.get("EXPLORATION_POOL_SIZE", 0)
try:
if EXPLORATION_POOL_SIZE_RAW is not None:
EXPLORATION_POOL_SIZE = int(EXPLORATION_POOL_SIZE_RAW)
except:
raise Exception(
f"Could not parse EXPLORATION_POOL_SIZE as int: {EXPLORATION_POOL_SIZE_RAW}"
)
# https://docs.sqlalchemy.org/en/14/core/pooling.html#sqlalchemy.pool.QueuePool
engine = create_engine(EXPLORATION_DB_URI, pool_size=EXPLORATION_POOL_SIZE)
SessionLocal = sessionmaker(bind=engine)
def yield_db_session() -> Session:
"""
Yields a database connection (created using environment variables).
As per FastAPI docs:
https://fastapi.tiangolo.com/tutorial/sql-databases/#create-a-dependency
"""
session = SessionLocal()
try:
yield session
finally:
session.close()
yield_db_session_ctx = contextmanager(yield_db_session)

131
db/db/models.py 100644
Wyświetl plik

@ -0,0 +1,131 @@
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import (
BigInteger,
Column,
DateTime,
Integer,
ForeignKey,
MetaData,
Text,
VARCHAR,
)
from sqlalchemy.sql import expression
from sqlalchemy.ext.compiler import compiles
"""
Naming conventions doc
https://docs.sqlalchemy.org/en/13/core/constraints.html#configuring-constraint-naming-conventions
"""
convention = {
"ix": "ix_%(column_0_label)s",
"uq": "uq_%(table_name)s_%(column_0_name)s",
"ck": "ck_%(table_name)s_%(constraint_name)s",
"fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
"pk": "pk_%(table_name)s",
}
metadata = MetaData(naming_convention=convention)
Base = declarative_base(metadata=metadata)
"""
Creating a utcnow function which runs on the Posgres database server when created_at and updated_at
fields are populated.
Following:
1. https://docs.sqlalchemy.org/en/13/core/compiler.html#utc-timestamp-function
2. https://www.postgresql.org/docs/current/functions-datetime.html#FUNCTIONS-DATETIME-CURRENT
3. https://stackoverflow.com/a/33532154/13659585
"""
class utcnow(expression.FunctionElement):
type = DateTime
@compiles(utcnow, "postgresql")
def pg_utcnow(element, compiler, **kwargs):
return "TIMEZONE('utc', statement_timestamp())"
class EthereumBlock(Base): # type: ignore
__tablename__ = "ethereum_blocks"
block_number = Column(
BigInteger,
primary_key=True,
unique=True,
nullable=False,
)
difficulty = Column(BigInteger)
extra_data = Column(VARCHAR(128))
gas_limit = Column(BigInteger)
gas_used = Column(BigInteger)
hash = Column(VARCHAR(256))
logs_bloom = Column(VARCHAR(1024))
miner = Column(VARCHAR(256))
nonce = Column(VARCHAR(256))
parent_hash = Column(VARCHAR(256))
receipt_root = Column(VARCHAR(256))
uncles = Column(VARCHAR(256))
size = Column(Integer)
state_root = Column(VARCHAR(256))
timestamp = Column(BigInteger, index=True)
total_difficulty = Column(VARCHAR(256))
transactions_root = Column(VARCHAR(256))
indexed_at = Column(
DateTime(timezone=True), server_default=utcnow(), nullable=False
)
class EthereumTransaction(Base): # type: ignore
__tablename__ = "ethereum_transactions"
hash = Column(
VARCHAR(256),
primary_key=True,
unique=True,
nullable=False,
)
block_number = Column(
BigInteger,
ForeignKey("ethereum_blocks.block_number", ondelete="CASCADE"),
nullable=False,
)
from_address = Column(VARCHAR(256), index=True)
to_address = Column(VARCHAR(256), index=True)
gas = Column(Text)
gas_price = Column(Text)
input = Column(Text)
nonce = Column(VARCHAR(256))
transaction_index = Column(BigInteger)
value = Column(Text)
indexed_at = Column(
DateTime(timezone=True), server_default=utcnow(), nullable=False
)
class EthereumPendingTransaction(Base): # type: ignore
__tablename__ = "ethereum_pending_transactions"
hash = Column(
VARCHAR(256),
primary_key=True,
unique=True,
nullable=False,
)
block_number = Column(
BigInteger,
ForeignKey("ethereum_blocks.block_number", ondelete="CASCADE"),
nullable=False,
)
from_address = Column(VARCHAR(256), index=True)
to_address = Column(VARCHAR(256), index=True)
gas = Column(Text)
gas_price = Column(Text)
input = Column(Text)
nonce = Column(VARCHAR(256))
transaction_index = Column(BigInteger)
value = Column(Text)
indexed_at = Column(
DateTime(timezone=True), server_default=utcnow(), nullable=False
)

Wyświetl plik

BIN
db/requirements.txt 100644

Plik binarny nie jest wyświetlany.

1
db/sample.env 100644
Wyświetl plik

@ -0,0 +1 @@
export EXPLORATION_DB_URI="<database_uri>"