Add create update get endpoints.

pull/550/head
Andrey Dolgolev 2022-02-17 12:22:20 +02:00
rodzic 4dd101eb43
commit 1c741d5084
5 zmienionych plików z 283 dodań i 11 usunięć

Wyświetl plik

@ -8,8 +8,14 @@ import uuid
import boto3 # type: ignore
from bugout.data import BugoutSearchResults, BugoutSearchResult, BugoutResource
from bugout.data import (
BugoutSearchResults,
BugoutSearchResult,
BugoutResource,
BugoutResources,
)
from bugout.journal import SearchOrder
from bugout.exceptions import BugoutResponseException
from ens.utils import is_valid_ens_name # type: ignore
from eth_utils.address import is_address # type: ignore
from moonstreamdb.models import EthereumLabel
@ -30,6 +36,7 @@ from .settings import (
MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX,
MOONSTREAM_MOONWORM_TASKS_JOURNAL,
BUGOUT_RESOURCE_QUERY_RESOLVER,
)
from .settings import bugout_client as bc
@ -535,3 +542,27 @@ def apply_moonworm_tasks(
entries=entries_pack,
timeout=15,
)
def get_query_by_name(query_name: str, token: uuid.UUID) -> str:
params = {"type": BUGOUT_RESOURCE_QUERY_RESOLVER, "name": query_name}
try:
resources: BugoutResources = bc.list_resources(token=token, params=params)
except BugoutResponseException as e:
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
except Exception as e:
logger.error(f"Error get query, error: {str(e)}")
raise MoonstreamHTTPException(status_code=500, internal_error=e)
available_queries: Dict[str, str] = {
resource.resource_data["name"]: resource.resource_data["entry_id"]
for resource in resources.resources
}
if query_name not in available_queries:
raise MoonstreamHTTPException(status_code=404, detail="Query not found.")
query_id = available_queries[query_name]
return query_id

Wyświetl plik

@ -3,10 +3,12 @@ Pydantic schemas for the Moonstream HTTP API
"""
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional, Union
from typing import Any, Dict, List, Optional, Union, Literal
from uuid import UUID
from xmlrpc.client import Boolean
from pydantic import BaseModel, Field
from sqlalchemy import false
USER_ONBOARDING_STATE = "onboarding_state"
@ -266,3 +268,12 @@ class DashboardUpdate(BaseModel):
class UpdateDataRequest(BaseModel):
params: Dict[str, Any] = Field(default_factory=dict)
class UpdateQueryRequest(BaseModel):
query: str
class PreapprovedQuery(BaseModel):
query: str
public: bool = False

Wyświetl plik

@ -2,19 +2,25 @@
The Moonstream queries HTTP API
"""
import logging
from typing import Any, Dict, Optional
from typing import Any, Dict, List, Optional
from fastapi import APIRouter, Body
import boto3 # type: ignore
from bugout.data import BugoutResources
from bugout.exceptions import BugoutResponseException
from fastapi import APIRouter, Body, Request
import requests
from .. import data
from ..actions import get_query_by_name
from ..middleware import MoonstreamHTTPException
from ..settings import (
MOONSTREAM_ADMIN_ACCESS_TOKEN,
MOONSTREAM_QUERIES_JOURNAL_ID,
MOONSTREAM_APPLICATION_ID,
MOONSTREAM_CRAWLERS_SERVER_URL,
MOONSTREAM_CRAWLERS_SERVER_PORT,
MOONSTREAM_QUERIES_BUCKET,
MOONSTREAM_QUERIES_JOURNAL_ID,
BUGOUT_RESOURCE_QUERY_RESOLVER,
)
from ..settings import bugout_client as bc
@ -26,19 +32,178 @@ router = APIRouter(
)
@router.post("/{query_id}/update", tags=["queries"])
BUGOUT_RESOURCE_QUERY_RESOLVER = "query_name_resolver"
@router.post("/{query_name}", tags=["queries"])
async def create_query_handler(
request: Request, query_name: str, query_applied: data.PreapprovedQuery = Body(...)
) -> Any:
"""
Create query in bugout journal
"""
token = request.state.token
user = request.state.user
# Check already existed queries
params = {
"type": BUGOUT_RESOURCE_QUERY_RESOLVER,
}
try:
resources: BugoutResources = bc.list_resources(token=token, params=params)
except BugoutResponseException as e:
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
except Exception as e:
logger.error(
f"Error listing subscriptions for user ({request.user.id}) with token ({request.state.token}), error: {str(e)}"
)
raise MoonstreamHTTPException(status_code=500, internal_error=e)
used_queries: List[str] = [
resource.resource_data["name"] for resource in resources.resources
]
if query_name in used_queries:
raise MoonstreamHTTPException(
status_code=404,
detail=f"Provided query name already use. Please remove it or use PUT /{query_name}",
)
try:
# Put query to journal
entry = bc.create_entry(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=MOONSTREAM_QUERIES_JOURNAL_ID,
title=f"Query:{query_name}",
tags=["type:query"],
content=query_applied.query,
)
except BugoutResponseException as e:
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
except Exception as e:
logger.error(f"Error creating query entry: {str(e)}")
raise MoonstreamHTTPException(status_code=500, internal_error=e)
try:
# create resource query_name_resolver
bc.create_resource(
token=token,
application_id=MOONSTREAM_APPLICATION_ID,
resource_data={
"type": BUGOUT_RESOURCE_QUERY_RESOLVER,
"user_id": str(user.id),
"name": query_name,
"entry_id": str(entry.id),
},
)
except BugoutResponseException as e:
logger.error(f"Error creating name resolving resource: {str(e)}")
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
except Exception as e:
logger.error(f"Error creating name resolving resource: {str(e)}")
raise MoonstreamHTTPException(status_code=500, internal_error=e)
try:
bc.update_tags(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=MOONSTREAM_QUERIES_JOURNAL_ID,
entry_id=entry.id,
tags=[f"query_id:{entry.id}", f"preapprove"],
)
except BugoutResponseException as e:
logger.error(f"Error in applind tags to query entry: {str(e)}")
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
except Exception as e:
logger.error(f"Error in applind tags to query entry: {str(e)}")
raise MoonstreamHTTPException(status_code=500, internal_error=e)
return True
@router.get("/{query_name}/query", tags=["queries"])
async def get_query_handler(request: Request, query_name: str) -> Any:
token = request.state.token
query_id = get_query_by_name(query_name, token)
try:
entry = bc.get_entry(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=MOONSTREAM_QUERIES_JOURNAL_ID,
entry_id=query_id,
)
except BugoutResponseException as e:
logger.error(f"Error in updating query: {str(e)}")
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
except Exception as e:
logger.error(f"Error in updating query: {e}")
raise MoonstreamHTTPException(status_code=500, internal_error=e)
return entry
@router.put("/{query_name}", tags=["queries"])
async def update_query_handler(
request: Request,
query_name: str,
request_update: data.UpdateQueryRequest = Body(...),
) -> Any:
token = request.state.token
query_id = get_query_by_name(query_name, token)
try:
entry = bc.update_entry_content(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=MOONSTREAM_QUERIES_JOURNAL_ID,
entry_id=query_id,
title=query_name,
content=request_update.query,
tags=["preapprove"],
)
except BugoutResponseException as e:
logger.error(f"Error in updating query: {str(e)}")
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
except Exception as e:
logger.error(f"Error in updating query: {e}")
raise MoonstreamHTTPException(status_code=500, internal_error=e)
return entry
@router.post("/{query_name}/update_data", tags=["queries"])
async def update_query_data_handler(
query_id: str, request_update: data.UpdateDataRequest = Body(...)
request: Request,
query_name: str,
request_update: data.UpdateDataRequest = Body(...),
) -> Optional[Dict[str, Any]]:
"""
Request update data on S3 bucket
"""
token = request.state.token
query_id = get_query_by_name(query_name, token)
try:
entries = bc.search(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=MOONSTREAM_QUERIES_JOURNAL_ID,
query=f"#approved #query:{query_id}",
query=f"#approved ! #query_id:{query_id}",
limit=1,
timeout=5,
)
@ -73,4 +238,55 @@ async def update_query_data_handler(
except Exception as e:
logger.error(f"Error in send generate query data task: {e}")
raise MoonstreamHTTPException(status_code=500, internal_error=e)
return None
raise MoonstreamHTTPException(status_code=403, detail="Query not approved yet.")
@router.get("/{query_name}", tags=["queries"])
async def get_access_link_handler(
request: Request,
query_name: str,
) -> str:
"""
Request update data on S3 bucket
"""
# get real connect to query_id
token = request.state.token
query_id = get_query_by_name(query_name, token)
s3 = boto3.client("s3")
try:
entries = bc.search(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=MOONSTREAM_QUERIES_JOURNAL_ID,
query=f"#approved #query_id:{query_id}",
limit=1,
timeout=5,
)
if entries.results and entries.results[0].content:
tags = entries.results[0].tags
file_type = "json"
if "ext:csv" in tags:
file_type = "csv"
stats_presigned_url = s3.generate_presigned_url(
"get_object",
Params={
"Bucket": MOONSTREAM_QUERIES_BUCKET,
"Key": f"queries/{query_id}/data.{file_type}",
},
ExpiresIn=300000,
HttpMethod="GET",
)
return stats_presigned_url
except Exception as e:
logger.error(f"Error in send generate query data task: {e}")
raise MoonstreamHTTPException(status_code=500, internal_error=e)
raise MoonstreamHTTPException(status_code=403, detail="Query not approved yet.")

Wyświetl plik

@ -7,6 +7,8 @@ BUGOUT_BROOD_URL = os.environ.get("BUGOUT_BROOD_URL", "https://auth.bugout.dev")
BUGOUT_SPIRE_URL = os.environ.get("BUGOUT_SPIRE_URL", "https://spire.bugout.dev")
BUGOUT_RESOURCE_QUERY_RESOLVER = "query_name_resolver"
bugout_client = Bugout(brood_api_url=BUGOUT_BROOD_URL, spire_api_url=BUGOUT_SPIRE_URL)
BUGOUT_REQUEST_TIMEOUT_SECONDS = 5
@ -99,3 +101,7 @@ if MOONSTREAM_ETHEREUM_WEB3_PROVIDER_URI == "":
raise ValueError(
"MOONSTREAM_ETHEREUM_WEB3_PROVIDER_URI environment variable must be set"
)
MOONSTREAM_QUERIES_BUCKET = os.environ.get("MOONSTREAM_QUERIES_BUCKET", "")
if MOONSTREAM_QUERIES_BUCKET == "":
raise ValueError("MOONSTREAM_QUERIES_BUCKET environment variable must be set")

Wyświetl plik

@ -11,6 +11,7 @@ from uuid import UUID
import boto3 # type: ignore
from fastapi import FastAPI, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy import text
from bugout.data import BugoutResource, BugoutResources
@ -180,6 +181,13 @@ async def queries_data_update_handler(
try:
# test statement params
t = text(request.query)
t = t.bindparams(**request.params)
t.compile(compile_kwargs={"literal_binds": True})
print(t.compile(compile_kwargs={"literal_binds": True}))
background_tasks.add_task(
queries.data_generate,
bucket=MOONSTREAM_QUERIES_BUCKET,
@ -199,7 +207,7 @@ async def queries_data_update_handler(
"Bucket": MOONSTREAM_QUERIES_BUCKET,
"Key": f"queries/{query_id}/data.{request.file_type}",
},
ExpiresIn=300000,
ExpiresIn=43200, # 12 hours
HttpMethod="GET",
)