kopia lustrzana https://github.com/bugout-dev/moonstream
Add CLI for cleaning the database.
rodzic
e69d81d1fb
commit
a4a1e7833f
|
@ -1,6 +1,6 @@
|
|||
import argparse
|
||||
import logging
|
||||
from typing import Optional
|
||||
from typing import Optional, Literal
|
||||
from uuid import UUID
|
||||
|
||||
from moonstreamdb.blockchain import AvailableBlockchainType
|
||||
|
@ -536,6 +536,49 @@ def main() -> None:
|
|||
)
|
||||
historical_crawl_parser.set_defaults(func=handle_historical_crawl)
|
||||
|
||||
database_cli = subparsers.add_parser("database", help="Database operations")
|
||||
database_cli.add_argument(
|
||||
"--blockchain-type",
|
||||
"-b",
|
||||
type=str,
|
||||
help=f"Available blockchain types: {[member.value for member in AvailableBlockchainType]}",
|
||||
)
|
||||
|
||||
database_cli.set_defaults(func=lambda _: database_cli.print_help())
|
||||
|
||||
database_cli_subparsers = database_cli.add_subparsers()
|
||||
|
||||
deduplicate_parser = database_cli_subparsers.add_parser(
|
||||
"deduplicate",
|
||||
help="Deduplicate database records",
|
||||
)
|
||||
|
||||
deduplicate_parser.add_argument(
|
||||
"--table",
|
||||
"-t",
|
||||
type=str,
|
||||
choices=["blocks", "labels", "transactions"],
|
||||
required=True,
|
||||
help="Table type to deduplicate",
|
||||
)
|
||||
|
||||
deduplicate_parser.add_argument(
|
||||
"--label",
|
||||
"-l",
|
||||
type=str,
|
||||
required=False,
|
||||
help="Label to deduplicate",
|
||||
)
|
||||
|
||||
deduplicate_parser.add_argument(
|
||||
"--type",
|
||||
"-y",
|
||||
type=str,
|
||||
choices=["event", "function"],
|
||||
required=True,
|
||||
help="Type to deduplicate",
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
args.func(args)
|
||||
|
||||
|
|
|
@ -1,11 +1,13 @@
|
|||
import logging
|
||||
import json
|
||||
from typing import Dict, List, Optional
|
||||
from typing import Dict, List, Optional, Literal
|
||||
|
||||
from moonstreamdb.blockchain import AvailableBlockchainType, get_label_model
|
||||
from moonstreamdb.models import Base
|
||||
from moonworm.crawler.function_call_crawler import ContractFunctionCall # type: ignore
|
||||
from sqlalchemy.orm import Session
|
||||
from sqlalchemy.sql import text
|
||||
|
||||
|
||||
from ..settings import CRAWLER_LABEL
|
||||
from .event_crawler import Event
|
||||
|
@ -218,3 +220,137 @@ def add_function_calls_to_session(
|
|||
|
||||
logger.info(f"Saving {len(labels_to_save)} labels to session")
|
||||
db_session.add_all(labels_to_save)
|
||||
|
||||
|
||||
def deduplicate_records(
    db_session: Session,
    blockchain_type: AvailableBlockchainType,
    table: Literal["labels", "transactions", "blocks"],
    label_name: Optional[str] = None,
    label_type: Optional[str] = None,
) -> None:
    """
    Remove duplicate records from a blockchain table.

    Only the labels table is supported at the moment. For every address that
    has rows with the given label name and label type, one row is kept per
    deduplication key and the remaining rows are deleted. The deletion is
    committed after each address.

    Deduplication keys:
      - "event":   (transaction_hash, log_index)
      - "tx_call": (transaction_hash)

    Args:
        db_session: SQLAlchemy session used for the queries and commits.
        blockchain_type: Blockchain whose label model/table is cleaned.
        table: Which kind of table to deduplicate.
        label_name: Value of the ``label`` column to restrict to (labels only).
        label_type: Value of ``label_data->>'type'`` to restrict to (labels
            only). "function" is accepted as an alias of "tx_call".

    Raises:
        NotImplementedError: If ``table`` is "blocks" or "transactions".
        ValueError: If ``table`` is unknown, if ``label_name`` or
            ``label_type`` is missing, or if ``label_type`` is unsupported.
    """

    if table in ("blocks", "transactions"):
        raise NotImplementedError(f"Deduplication for {table} is not implemented yet")

    if table != "labels":
        raise ValueError(f"Unknown table: {table}")

    if label_name is None or label_type is None:
        raise ValueError(
            "label_name and label_type are required for deduplication of labels"
        )

    # The CLI offers "function" as a type choice, while function-call rows are
    # matched by label_data type "tx_call"; accept both spellings.
    normalized_type = "tx_call" if label_type == "function" else label_type

    # Columns that identify "the same" label for each supported label type.
    distinct_columns = {
        "event": "transaction_hash, log_index",
        "tx_call": "transaction_hash",
    }.get(normalized_type)
    if distinct_columns is None:
        raise ValueError(
            f"Unsupported label_type for deduplication of labels: {label_type}"
        )

    label_model = get_label_model(blockchain_type)
    # Use the real table name of the model for this blockchain; the ``table``
    # argument is only a selector ("labels"), not a database table name.
    labels_table = label_model.__tablename__

    addresses = (
        db_session.query(label_model.address.label("address"))
        .filter(label_model.label == label_name)
        .filter(label_model.label_data["type"] == normalized_type)
        .distinct()
        .all()
    )  # can take a while

    # Only identifiers taken from the model and the fixed mapping above are
    # interpolated; all user-supplied values are passed as bound parameters.
    # DISTINCT ON keeps the first row of each key group according to ORDER BY,
    # i.e. the one with the lowest block_number, then the earliest created_at.
    # The DELETE repeats the label type filter so that rows of other label
    # types for the same label and address are never touched.
    delete_duplicates = text(
        f"""
        WITH kept_labels AS (
            SELECT DISTINCT ON ({distinct_columns})
                id
            FROM
                {labels_table}
            WHERE
                label = :label
                AND address = :address
                AND label_data->>'type' = :label_type
            ORDER BY
                {distinct_columns},
                block_number ASC,
                created_at ASC
        )
        DELETE FROM
            {labels_table}
        WHERE
            label = :label
            AND address = :address
            AND label_data->>'type' = :label_type
            AND id NOT IN (SELECT id FROM kept_labels)
        """
    )

    for address_row in addresses:
        address = address_row[0]

        result = db_session.execute(
            delete_duplicates,
            {"address": address, "label": label_name, "label_type": normalized_type},
        )
        # Read the count before committing so it does not depend on cursor state.
        deleted_count = result.rowcount

        # Commit per address to keep each transaction small.
        db_session.commit()

        logger.info(
            f"Deleted {deleted_count} duplicate labels for address {address}"
        )
|
||||
|
|
Ładowanie…
Reference in New Issue