Add creat query command.

Add additional re request update logic.
Add volume query.
pull/693/head
Andrey 2022-11-15 18:57:35 +02:00
rodzic 5e100228cd
commit 4808d600de
3 zmienionych plików z 150 dodań i 10 usunięć

Wyświetl plik

@ -44,17 +44,28 @@ def recive_S3_data_from_query(
while keep_going:
time.sleep(time_await)
data_response = requests.get(
data_url.url,
headers={"If-Modified-Since": if_modified_since},
timeout=10,
)
try:
data_response = requests.get(
data_url.url,
headers={"If-Modified-Since": if_modified_since},
timeout=5,
)
except Exception as e:
print(e)
continue
if data_response.status_code == 200:
break
repeat += 1
if repeat == max_retries // 2:
data_url = client.exec_query(
token=token,
name=query_name,
params=params,
)
if repeat > max_retries:
print("Too many retries")
break
@ -95,6 +106,22 @@ def generate_report(
)
def create_user_query(
client: Moonstream,
token: Union[str, UUID],
query_name: str,
query: str,
):
"""
Create a user query.
"""
try:
client.create_query(token=token, name=query_name, query=query)
except Exception as err:
print(f"Cant create user query: {query_name}. End with error: {err}")
def delete_user_query(client: Moonstream, token: str, query_name: str):
"""
Delete the user's queries.
@ -176,10 +203,6 @@ def run_tokenomics_queries_handler(args: argparse.Namespace):
client = Moonstream()
# for query in client.list_queries(
# token=args.moonstream_token,
# ).queries:
query_name = "erc20_721_volume"
### Run voluem query
@ -220,6 +243,29 @@ def run_tokenomics_queries_handler(args: argparse.Namespace):
key=f'{query_name}/{address}/{range["time_range"].replace(" ","_")}/data.json',
)
# volume change of erc20 and erc721
query_name = "volume_change"
for address, type in addresess_erc20_721.items():
for range in ranges:
params: Dict[str, Any] = {
"address": address,
"type": type,
"time_range": range["time_range"],
}
generate_report(
client=client,
token=args.moonstream_token,
query_name=query_name,
params=params,
bucket_prefix=CUSTOM_CRAWLER_S3_BUCKET_PREFIX,
bucket=CUSTOM_CRAWLER_S3_BUCKET,
key=f'{query_name}/{address}/{range["time_range"].replace(" ","_")}/data.json',
)
query_name = "erc1155_volume"
# volume of erc1155
@ -407,6 +453,24 @@ def delete_user_query_handler(args: argparse.Namespace):
delete_user_query(client=client, token=args.moonstream_token, query_name=args.name)
def create_user_query_handler(args: argparse.Namespace):
"""
Create the user's queries.
"""
client = Moonstream()
for query in tokenomics_queries:
if query["name"] == args.name:
create_user_query(
client=client,
token=args.moonstream_token,
query_name=query["name"],
query=query["query"],
)
def generate_game_bank_report(args: argparse.Namespace):
"""
han
@ -524,6 +588,20 @@ def main():
delete_query.set_defaults(func=delete_user_query_handler)
create_query = queries_subparsers.add_parser(
"create",
help="Create all predifind query",
description="Create all predifind query",
)
create_query.add_argument(
"--name",
required=True,
type=str,
)
create_query.set_defaults(func=create_user_query_handler)
cu_bank_parser = cu_reports_subparsers.add_parser(
"generate-reports",
help="Generate cu-bank state reports",

Wyświetl plik

@ -203,6 +203,60 @@ cu_bank_queries = [
tokenomics_queries = [
{
"name": "volume_change",
"query": """
with all_transfers as (
select
transaction_hash,
CASE
WHEN type: ='NFT' THEN 1
ELSE (label_data->'args'->>'value')::decimal
END as value,
block_timestamp as block_timestamp
from polygon_labels
where label='moonworm-alpha'
and address= :address
and label_data->>'name'='Transfer'
), after_range_transfer as (
select
*
FROM
all_transfers
where block_timestamp >= extract(epoch from now() - interval :time_range)::int
), current_volume as (
SELECT
sum(all_transfers.value) as value,
sum(
CASE
WHEN to_address in ('0xF715bEb51EC8F63317d66f491E37e7BB048fCc2d','0xfede379e48C873C75F3cc0C81F7C784aD730a8F7','0x00000000006c3852cbef3e08e8df289169ede581')
THEN 1
else 0
END
) as os_sales
from all_transfers
LEFT JOIN polygon_transactions ON all_transfers.transaction_hash = polygon_transactions.hash
), volume_different as (
select
sum(after_range_transfer.value) as value,
sum(
CASE
WHEN to_address in ('0xF715bEb51EC8F63317d66f491E37e7BB048fCc2d','0xfede379e48C873C75F3cc0C81F7C784aD730a8F7','0x00000000006c3852cbef3e08e8df289169ede581')
THEN 1
else 0
END
) as os_sales
from after_range_transfer
LEFT JOIN polygon_transactions ON after_range_transfer.transaction_hash = polygon_transactions.hash
)
SELECT
volume_different.value as diff,
volume_different.os_sales as os_diff,
current_volume.value as current,
current_volume.os_sales as os_current
from current_volume, volume_different
""",
},
{
"name": "erc20_721_volume",
"query": """

Wyświetl plik

@ -200,6 +200,14 @@ multicall_contracts: Dict[AvailableBlockchainType, str] = {
CUSTOM_CRAWLER_S3_BUCKET = os.environ.get(
"CUSTOM_CRAWLER_S3_BUCKET", ""
) # S3 bucket for storing custom crawler data
if CUSTOM_CRAWLER_S3_BUCKET == "":
raise ValueError(
"CUSTOM_CRAWLER_S3_BUCKET environment variable must be set"
)
CUSTOM_CRAWLER_S3_BUCKET_PREFIX = os.environ.get(
"CUSTOM_CRAWLER_S3_BUCKET_PREFIX", "dev"
)
)