Add test workable version.

pull/550/head
Andrey Dolgolev 2022-02-16 18:12:42 +02:00
rodzic 5485a73b98
commit 025baeb1c4
8 zmienionych plików z 101 dodań i 42 usunięć

Wyświetl plik

@ -12,6 +12,8 @@ export MOONSTREAM_MOONWORM_TASKS_JOURNAL="<Bugout_journal_with_tasks_for_moonwor
export MOONSTREAM_DATA_JOURNAL_ID="<bugout_journal_id_to_store_blockchain_data>"
export HUMBUG_TXPOOL_CLIENT_ID="<Bugout_Humbug_client_id_for_txpool_transactions_in_journal>"
export MOONSTREAM_ETHEREUM_WEB3_PROVIDER_URI="https://<connection_path_uri_to_ethereum_node>"
export MOONSTREAM_QUERIES_JOURNAL_ID="<bugout_journal_id_where_store_queries_for_executingw>"
# Set following parameters if AWS node instance and S3 smartcontracts configured
export MOONSTREAM_S3_SMARTCONTRACTS_BUCKET="<AWS_S3_bucket_to_store_smart_contracts>"

Wyświetl plik

@ -262,3 +262,7 @@ class DashboardCreate(BaseModel):
class DashboardUpdate(BaseModel):
name: Optional[str]
subscription_settings: List[DashboardMeta] = Field(default_factory=list)
class UpdateDataRequest(BaseModel):
params: Dict[str, Any] = Field(default_factory=dict)

Wyświetl plik

@ -1,16 +1,14 @@
"""
The Moonstream subscriptions HTTP API
The Moonstream queries HTTP API
"""
import logging
from typing import Any, Dict, List, Optional
from typing import Any, Dict, Optional
from bugout.data import BugoutResource
from fastapi import APIRouter, Depends, Query, Request
from fastapi import APIRouter, Body
import requests
from sqlalchemy.orm import Session
from moonstreamdb import db
from .. import data
from ..middleware import MoonstreamHTTPException
from ..settings import (
MOONSTREAM_ADMIN_ACCESS_TOKEN,
@ -23,36 +21,44 @@ from ..settings import bugout_client as bc
logger = logging.getLogger(__name__)
router = APIRouter(
prefix="/queries",
)
router = APIRouter(prefix="/queries",)
@router.post("/{query_id}/update", tags=["queries"])
async def update_query_data_handler(
request: Request,
query_id: str = Query(...),
query_id: str, request_update: data.UpdateDataRequest = Body(...)
) -> Optional[Dict[str, Any]]:
"""
Request update data on S3 bucket
"""
token = request.state.token
try:
entries = bc.search(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=MOONSTREAM_QUERIES_JOURNAL_ID,
query=f"#approved #query:{query_id} #user_token:{token}",
query=f"#approved #query:{query_id}",
limit=1,
timeout=5,
)
if entries.results and entries.results[0].content:
content = entries.results[0].content
tags = entries.results[0].tags
file_type = "json"
if "ext:csv" in tags:
file_type = "csv"
responce = requests.post(
f"{MOONSTREAM_CRAWLERS_SERVER_URL}:{MOONSTREAM_CRAWLERS_SERVER_PORT}/jobs/query_update",
json=content,
f"{MOONSTREAM_CRAWLERS_SERVER_URL}:{MOONSTREAM_CRAWLERS_SERVER_PORT}/jobs/{query_id}/query_update",
json={
"query": content,
"params": request_update.params,
"file_type": file_type,
},
timeout=5,
)
if responce.status_code != 200:
@ -63,6 +69,6 @@ async def update_query_data_handler(
return responce.json()
except Exception as e:
logger.error("Unable to get events")
logger.error(f"Error in send generate query data task: {e}")
raise MoonstreamHTTPException(status_code=500, internal_error=e)
return None

Wyświetl plik

@ -21,6 +21,7 @@ from .settings import (
ORIGINS,
bugout_client as bc,
BUGOUT_RESOURCE_TYPE_SUBSCRIPTION,
MOONSTREAM_QUERIES_BUCKET,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX,
)
from .version import MOONCRAWL_VERSION
@ -80,17 +81,14 @@ async def now_handler() -> data.NowResponse:
@app.post("/jobs/stats_update", tags=["jobs"])
async def status_handler(
stats_update: data.StatsUpdateRequest,
background_tasks: BackgroundTasks,
stats_update: data.StatsUpdateRequest, background_tasks: BackgroundTasks,
):
"""
Update dashboard endpoint create are tasks for update.
"""
dashboard_resource: BugoutResource = bc.get_resource(
token=stats_update.token,
resource_id=stats_update.dashboard_id,
timeout=10,
token=stats_update.token, resource_id=stats_update.dashboard_id, timeout=10,
)
# get all user subscriptions
@ -170,9 +168,7 @@ async def status_handler(
@app.post("/jobs/{query_id}/query_update", tags=["jobs"])
async def queries_data_update_handler(
query_id: str,
query: Any,
background_tasks: BackgroundTasks,
query_id: str, request: data.QueryDataUpdate, background_tasks: BackgroundTasks,
) -> Dict[str, Any]:
s3_client = boto3.client("s3")
@ -181,9 +177,11 @@ async def queries_data_update_handler(
background_tasks.add_task(
queries.data_generate,
bucket="queries_bucket",
key=f"queries/{query_id}/data.json",
query=query,
bucket=MOONSTREAM_QUERIES_BUCKET,
query_id=f"{query_id}",
file_type=request.file_type,
query=request.query,
params=request.params,
)
except Exception as e:
@ -193,8 +191,8 @@ async def queries_data_update_handler(
stats_presigned_url = s3_client.generate_presigned_url(
"get_object",
Params={
"Bucket": "queries_bucket",
"Key": f"queries/{query_id}/data.json",
"Bucket": MOONSTREAM_QUERIES_BUCKET,
"Key": f"queries/{query_id}/data.{request.file_type}",
},
ExpiresIn=300000,
HttpMethod="GET",

Wyświetl plik

@ -1,9 +1,9 @@
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import List
from typing import List, Any, Dict
from pydantic import BaseModel
from pydantic import BaseModel, Field
class AvailableBlockchainType(Enum):
@ -47,3 +47,10 @@ class NowResponse(BaseModel):
"""
epoch_time: float
class QueryDataUpdate(BaseModel):
file_type: str
query: str
params: Dict[str, Any] = Field(default_factory=dict)

Wyświetl plik

@ -84,3 +84,9 @@ if MOONSTREAM_MOONWORM_TASKS_JOURNAL == "":
raise ValueError(
"MOONSTREAM_MOONWORM_TASKS_JOURNAL environment variable must be set"
)
MOONSTREAM_QUERIES_BUCKET = os.environ.get("MOONSTREAM_QUERIES_BUCKET", "")
if MOONSTREAM_QUERIES_BUCKET == "":
raise ValueError("MOONSTREAM_QUERIES_BUCKET environment variable must be set")

Wyświetl plik

@ -1,8 +1,11 @@
import json
import logging
from typing import Any
from typing import Any, Dict, Optional
from io import StringIO
import csv
import boto3 # type: ignore
import boto3
from moonstreamdb.db import yield_db_session_ctx
@ -10,13 +13,10 @@ logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def push_statistics(data: Any, key: str, bucket: str) -> None:
def push_statistics(s3: Any, data: Any, key: str, bucket: str) -> None:
result_bytes = json.dumps(data).encode("utf-8")
s3 = boto3.client("s3")
s3.put_object(
Body=result_bytes,
Body=data,
Bucket=bucket,
Key=key,
ContentType="application/json",
@ -26,9 +26,44 @@ def push_statistics(data: Any, key: str, bucket: str) -> None:
logger.info(f"Statistics push to bucket: s3://{bucket}/{key}")
def data_generate(bucket: str, key: str, query: str):
def data_generate(
bucket: str,
query_id: str,
file_type: str,
query: str,
params: Optional[Dict[str, Any]],
):
"""
Generate query and push it to S3
"""
s3 = boto3.client("s3")
with yield_db_session_ctx() as db_session:
push_statistics(data=db_session.execute(query).all(), key=key, bucket=bucket)
if file_type == "csv":
csv_buffer = StringIO()
csv_writer = csv.writer(csv_buffer, delimiter=";")
# engine.execution_options(stream_results=True)
result = db_session.execute(query, params).keys()
csv_writer.writerow(result.keys())
csv_writer.writerows(result.fetchAll())
push_statistics(
s3=s3,
data=csv_buffer.getvalue().encode("utf-8"),
key=f"queries/{query_id}/data.{file_type}",
bucket=bucket,
)
else:
data = json.dumps(
[dict(row) for row in db_session.execute(query, params)]
).encode("utf-8")
push_statistics(
s3=s3,
data=data,
key=f"queries/{query_id}/data.{file_type}",
bucket=bucket,
)

Wyświetl plik

@ -15,3 +15,4 @@ export MOONSTREAM_DATA_JOURNAL_ID="<Bugout journal id for moonstream>"
export MOONSTREAM_ADMIN_ACCESS_TOKEN="<Bugout access token for moonstream>"
export NFT_HUMBUG_TOKEN="<Token for nft crawler>"
export MOONSTREAM_MOONWORM_TASKS_JOURNAL="<journal_with_tasks_for_moonworm_crawler>"
export MOONSTREAM_QUERIES_BUCKET="<bucket for queries data>"