2023-06-06 12:11:49 +00:00
from datetime import datetime
from collections import Counter
2023-08-31 15:04:23 +00:00
import json
from typing import List , Any , Optional , Dict , Union , Tuple , cast
2023-06-06 12:11:49 +00:00
import uuid
import logging
2023-08-31 15:04:23 +00:00
from bugout . data import BugoutResource , BugoutSearchResult
2023-06-06 12:11:49 +00:00
from eth_typing import Address
from hexbytes import HexBytes
import requests # type: ignore
from sqlalchemy . dialects . postgresql import insert
from sqlalchemy . orm import Session
2023-11-20 13:34:47 +00:00
from sqlalchemy import func , text , or_ , and_ , Subquery
2023-08-12 06:14:33 +00:00
from sqlalchemy . engine import Row
2023-06-06 12:11:49 +00:00
from web3 import Web3
from web3 . types import ChecksumAddress
2023-08-31 16:32:20 +00:00
from . data import Score , LeaderboardScore , LeaderboardConfigUpdate , LeaderboardConfig
2023-06-06 12:11:49 +00:00
from . contracts import Dropper_interface , ERC20_interface , Terminus_interface
from . models import (
DropperClaimant ,
DropperContract ,
DropperClaim ,
Leaderboard ,
LeaderboardScores ,
2023-11-20 08:51:31 +00:00
LeaderboardVersion ,
2023-06-06 12:11:49 +00:00
)
from . import signatures
from . settings import (
2023-08-31 15:04:23 +00:00
bugout_client as bc ,
2023-06-06 12:11:49 +00:00
BLOCKCHAIN_WEB3_PROVIDERS ,
LEADERBOARD_RESOURCE_TYPE ,
MOONSTREAM_APPLICATION_ID ,
MOONSTREAM_ADMIN_ACCESS_TOKEN ,
2023-09-01 04:06:11 +00:00
MOONSTREAM_LEADERBOARD_CONFIGURATION_JOURNAL_ID ,
2023-06-06 12:11:49 +00:00
)
2023-07-05 18:49:43 +00:00
class LeaderboardsResourcesNotFound ( Exception ) :
pass
2023-06-06 12:11:49 +00:00
class AuthorizationError ( Exception ) :
pass
class DropWithNotSettedBlockDeadline ( Exception ) :
pass
class DublicateClaimantError ( Exception ) :
pass
class DuplicateLeaderboardAddressError ( Exception ) :
def __init__ ( self , message , duplicates ) :
super ( DuplicateLeaderboardAddressError , self ) . __init__ ( message )
self . message = message
self . duplicates = duplicates
class LeaderboardIsEmpty ( Exception ) :
pass
class LeaderboardDeleteScoresError ( Exception ) :
pass
2023-08-06 13:55:56 +00:00
class LeaderboardCreateError ( Exception ) :
pass
class LeaderboardUpdateError ( Exception ) :
pass
class LeaderboardDeleteError ( Exception ) :
pass
2023-08-31 15:04:23 +00:00
class LeaderboardConfigNotFound ( Exception ) :
pass
class LeaderboardConfigAlreadyActive ( Exception ) :
pass
class LeaderboardConfigAlreadyInactive ( Exception ) :
pass
2023-11-20 08:51:31 +00:00
class LeaderboardVersionNotFound ( Exception ) :
pass
2023-06-06 12:11:49 +00:00
BATCH_SIGNATURE_PAGE_SIZE = 500
logger = logging . getLogger ( __name__ )
def create_dropper_contract (
db_session : Session ,
blockchain : Optional [ str ] ,
dropper_contract_address ,
title ,
description ,
image_uri ,
) :
"""
Create a new dropper contract .
"""
dropper_contract = DropperContract (
blockchain = blockchain ,
address = Web3 . toChecksumAddress ( dropper_contract_address ) ,
title = title ,
description = description ,
image_uri = image_uri ,
)
db_session . add ( dropper_contract )
db_session . commit ( )
return dropper_contract
def delete_dropper_contract (
db_session : Session , blockchain : Optional [ str ] , dropper_contract_address
) :
dropper_contract = (
db_session . query ( DropperContract )
. filter (
DropperContract . address == Web3 . toChecksumAddress ( dropper_contract_address )
)
. filter ( DropperContract . blockchain == blockchain )
. one ( )
)
db_session . delete ( dropper_contract )
db_session . commit ( )
return dropper_contract
def list_dropper_contracts (
db_session : Session , blockchain : Optional [ str ]
) - > List [ Dict [ str , Any ] ] :
"""
List all dropper contracts
"""
dropper_contracts = [ ]
dropper_contracts = db_session . query ( DropperContract )
if blockchain :
dropper_contracts = dropper_contracts . filter (
DropperContract . blockchain == blockchain
)
return dropper_contracts
def get_dropper_contract_by_id (
db_session : Session , dropper_contract_id : uuid . UUID
) - > DropperContract :
"""
Get a dropper contract by its ID
"""
query = db_session . query ( DropperContract ) . filter (
DropperContract . id == dropper_contract_id
)
return query . one ( )
def list_drops_terminus ( db_session : Session , blockchain : Optional [ str ] = None ) :
"""
List distinct of terminus addressess
"""
terminus = (
db_session . query (
DropperClaim . terminus_address ,
DropperClaim . terminus_pool_id ,
DropperContract . blockchain ,
)
. join ( DropperContract )
. filter ( DropperClaim . terminus_address . isnot ( None ) )
. filter ( DropperClaim . terminus_pool_id . isnot ( None ) )
)
if blockchain :
terminus = terminus . filter ( DropperContract . blockchain == blockchain )
terminus = terminus . distinct (
DropperClaim . terminus_address , DropperClaim . terminus_pool_id
)
return terminus
def list_drops_blockchains ( db_session : Session ) :
"""
List distinct of blockchains
"""
blockchains = (
db_session . query ( DropperContract . blockchain )
. filter ( DropperContract . blockchain . isnot ( None ) )
. distinct ( DropperContract . blockchain )
)
return blockchains
def list_claims ( db_session : Session , dropper_contract_id , active = True ) :
"""
List all claims
"""
claims = (
db_session . query (
DropperClaim . id ,
DropperClaim . title ,
DropperClaim . description ,
DropperClaim . terminus_address ,
DropperClaim . terminus_pool_id ,
DropperClaim . claim_block_deadline ,
)
. filter ( DropperClaim . dropper_contract_id == dropper_contract_id )
. filter ( DropperClaim . active == active )
. all ( )
)
return claims
def delete_claim ( db_session : Session , dropper_claim_id ) :
"""
Delete a claim
"""
claim = (
2023-08-06 13:55:56 +00:00
db_session . query ( DropperClaim ) . filter ( DropperClaim . id == dropper_claim_id ) . one ( ) # type: ignore
2023-06-06 12:11:49 +00:00
)
db_session . delete ( claim )
db_session . commit ( )
return claim
def create_claim (
db_session : Session ,
dropper_contract_id : uuid . UUID ,
claim_id : Optional [ int ] = None ,
title : Optional [ str ] = None ,
description : Optional [ str ] = None ,
terminus_address : Optional [ ChecksumAddress ] = None ,
terminus_pool_id : Optional [ int ] = None ,
claim_block_deadline : Optional [ int ] = None ,
) :
"""
Create a new dropper claim .
"""
# get the dropper contract
dropper_contract = (
db_session . query ( DropperContract )
. filter ( DropperContract . id == dropper_contract_id )
. one ( )
)
dropper_claim = DropperClaim (
dropper_contract_id = dropper_contract . id ,
claim_id = claim_id ,
title = title ,
description = description ,
terminus_address = terminus_address ,
terminus_pool_id = terminus_pool_id ,
claim_block_deadline = claim_block_deadline ,
)
db_session . add ( dropper_claim )
db_session . commit ( )
db_session . refresh ( dropper_claim ) # refresh the object to get the id
return dropper_claim
def activate_drop ( db_session : Session , dropper_claim_id : uuid . UUID ) :
"""
Activate a claim
"""
claim = (
2023-08-06 13:55:56 +00:00
db_session . query ( DropperClaim ) . filter ( DropperClaim . id == dropper_claim_id ) . one ( ) # type: ignore
2023-06-06 12:11:49 +00:00
)
claim . active = True
db_session . commit ( )
return claim
def deactivate_drop ( db_session : Session , dropper_claim_id : uuid . UUID ) :
"""
Activate a claim
"""
claim = (
2023-08-06 13:55:56 +00:00
db_session . query ( DropperClaim ) . filter ( DropperClaim . id == dropper_claim_id ) . one ( ) # type: ignore
2023-06-06 12:11:49 +00:00
)
claim . active = False
db_session . commit ( )
return claim
def update_drop (
db_session : Session ,
dropper_claim_id : uuid . UUID ,
title : Optional [ str ] = None ,
description : Optional [ str ] = None ,
terminus_address : Optional [ str ] = None ,
terminus_pool_id : Optional [ int ] = None ,
claim_block_deadline : Optional [ int ] = None ,
claim_id : Optional [ int ] = None ,
address : Optional [ str ] = None ,
) :
"""
Update a claim
"""
claim = (
2023-08-06 13:55:56 +00:00
db_session . query ( DropperClaim ) . filter ( DropperClaim . id == dropper_claim_id ) . one ( ) # type: ignore
2023-06-06 12:11:49 +00:00
)
if title :
claim . title = title
if description :
claim . description = description
if terminus_address or terminus_pool_id :
ensure_dropper_contract_owner ( db_session , claim . dropper_contract_id , address )
if terminus_address :
terminus_address = Web3 . toChecksumAddress ( terminus_address )
claim . terminus_address = terminus_address
if terminus_pool_id :
claim . terminus_pool_id = terminus_pool_id
if claim_block_deadline :
claim . claim_block_deadline = claim_block_deadline
if claim_id :
claim . claim_id = claim_id
db_session . commit ( )
return claim
def get_free_drop_number_in_range (
db_session : Session , dropper_contract_id : uuid . UUID , start : Optional [ int ] , end : int
) :
"""
Return list of free drops number in range
"""
if start is None :
start = 0
drops = (
db_session . query ( DropperClaim )
. filter ( DropperClaim . dropper_contract_id == dropper_contract_id )
. filter ( DropperClaim . claim_id > = start )
. filter ( DropperClaim . claim_id < = end )
. all ( )
)
free_numbers = list ( range ( start , end + 1 ) )
for drop in drops :
free_numbers . remove ( drop . claim_id )
return drops
def add_claimants ( db_session : Session , dropper_claim_id , claimants , added_by ) :
"""
Add a claimants to a claim
"""
# On conflict requirements https://stackoverflow.com/questions/42022362/no-unique-or-exclusion-constraint-matching-the-on-conflict
claimant_objects = [ ]
addresses = [ Web3 . toChecksumAddress ( claimant . address ) for claimant in claimants ]
if len ( claimants ) > len ( set ( addresses ) ) :
raise DublicateClaimantError ( " Duplicate claimants " )
for claimant in claimants :
claimant_objects . append (
{
" dropper_claim_id " : dropper_claim_id ,
" address " : Web3 . toChecksumAddress ( claimant . address ) ,
" amount " : 0 ,
" raw_amount " : str ( claimant . amount ) ,
" added_by " : added_by ,
" created_at " : datetime . now ( ) ,
" updated_at " : datetime . now ( ) ,
}
)
insert_statement = insert ( DropperClaimant ) . values ( claimant_objects )
result_stmt = insert_statement . on_conflict_do_update (
index_elements = [ DropperClaimant . address , DropperClaimant . dropper_claim_id ] ,
set_ = dict (
amount = insert_statement . excluded . amount ,
raw_amount = insert_statement . excluded . raw_amount ,
added_by = insert_statement . excluded . added_by ,
updated_at = datetime . now ( ) ,
) ,
)
db_session . execute ( result_stmt )
db_session . commit ( )
return claimant_objects
def transform_claim_amount (
db_session : Session , dropper_claim_id : uuid . UUID , db_amount : int
) - > int :
claim = (
db_session . query (
DropperClaim . claim_id , DropperContract . address , DropperContract . blockchain
)
. join ( DropperContract , DropperContract . id == DropperClaim . dropper_contract_id )
. filter ( DropperClaim . id == dropper_claim_id )
. one ( )
)
dropper_contract = Dropper_interface . Contract (
BLOCKCHAIN_WEB3_PROVIDERS [ claim . blockchain ] , claim . address
)
claim_info = dropper_contract . getClaim ( claim . claim_id ) . call ( )
if claim_info [ 0 ] != 20 :
return db_amount
erc20_contract = ERC20_interface . Contract (
BLOCKCHAIN_WEB3_PROVIDERS [ claim . blockchain ] , claim_info [ 1 ]
)
decimals = int ( erc20_contract . decimals ( ) . call ( ) )
return db_amount * ( 10 * * decimals )
def batch_transform_claim_amounts (
db_session : Session , dropper_claim_id : uuid . UUID , db_amounts : List [ int ]
) - > List [ int ] :
claim = (
db_session . query (
DropperClaim . claim_id , DropperContract . address , DropperContract . blockchain
)
. join ( DropperContract , DropperContract . id == DropperClaim . dropper_contract_id )
. filter ( DropperClaim . id == dropper_claim_id )
. one ( )
)
dropper_contract = Dropper_interface . Contract (
BLOCKCHAIN_WEB3_PROVIDERS [ claim . blockchain ] , claim . address
)
claim_info = dropper_contract . getClaim ( claim . claim_id )
if claim_info [ 0 ] != 20 :
return db_amounts
erc20_contract = ERC20_interface . Contract ( claim . blockchain , claim_info [ 1 ] )
decimals = int ( erc20_contract . decimals ( ) . call ( ) )
return [ db_amount * ( 10 * * decimals ) for db_amount in db_amounts ]
def get_claimants (
db_session : Session ,
dropper_claim_id : uuid . UUID ,
amount : Optional [ int ] = None ,
added_by : Optional [ str ] = None ,
address : Optional [ ChecksumAddress ] = None ,
limit = None ,
offset = None ,
) :
"""
Search for a claimant by address
"""
claimants_query = db_session . query (
DropperClaimant . address ,
DropperClaimant . amount ,
DropperClaimant . added_by ,
DropperClaimant . raw_amount ,
) . filter ( DropperClaimant . dropper_claim_id == dropper_claim_id )
if amount :
claimants_query = claimants_query . filter ( DropperClaimant . amount == amount )
if added_by :
claimants_query = claimants_query . filter ( DropperClaimant . added_by == added_by )
if address :
claimants_query = claimants_query . filter ( DropperClaimant . address == address )
if limit :
claimants_query = claimants_query . limit ( limit )
if offset :
claimants_query = claimants_query . offset ( offset )
return claimants_query . all ( )
def get_claimant ( db_session : Session , dropper_claim_id , address ) :
"""
Search for a claimant by address
"""
claimant_query = (
db_session . query (
DropperClaimant . id . label ( " dropper_claimant_id " ) ,
DropperClaimant . address ,
DropperClaimant . amount ,
DropperClaimant . raw_amount ,
DropperClaimant . signature ,
DropperClaim . id . label ( " dropper_claim_id " ) ,
DropperClaim . claim_id ,
DropperClaim . active ,
DropperClaim . claim_block_deadline ,
DropperClaim . title ,
DropperClaim . description ,
DropperContract . address . label ( " dropper_contract_address " ) ,
( DropperClaim . updated_at < DropperClaimant . updated_at ) . label (
" is_recent_signature "
) ,
DropperContract . blockchain . label ( " blockchain " ) ,
)
. join ( DropperClaim , DropperClaimant . dropper_claim_id == DropperClaim . id )
. join ( DropperContract , DropperClaim . dropper_contract_id == DropperContract . id )
. filter ( DropperClaimant . dropper_claim_id == dropper_claim_id )
. filter ( DropperClaimant . address == Web3 . toChecksumAddress ( address ) )
)
return claimant_query . one ( )
def get_claimant_drops (
db_session : Session ,
blockchain : str ,
address ,
current_block_number = None ,
limit = None ,
offset = None ,
) :
"""
Search for a claimant by address
"""
claimant_query = (
db_session . query (
DropperClaimant . id . label ( " dropper_claimant_id " ) ,
DropperClaimant . address ,
DropperClaimant . amount ,
DropperClaimant . raw_amount ,
DropperClaimant . signature ,
DropperClaim . id . label ( " dropper_claim_id " ) ,
DropperClaim . claim_id ,
DropperClaim . active ,
DropperClaim . claim_block_deadline ,
DropperClaim . title ,
DropperClaim . description ,
DropperContract . address . label ( " dropper_contract_address " ) ,
DropperContract . blockchain ,
( DropperClaim . updated_at < DropperClaimant . updated_at ) . label (
" is_recent_signature "
) ,
)
. join ( DropperClaim , DropperClaimant . dropper_claim_id == DropperClaim . id )
. join ( DropperContract , DropperClaim . dropper_contract_id == DropperContract . id )
. filter ( DropperClaim . active == True )
. filter ( DropperClaimant . address == address )
. filter ( DropperContract . blockchain == blockchain )
)
if current_block_number :
logger . info ( " Trying to filter block number " + str ( current_block_number ) )
claimant_query = claimant_query . filter (
DropperClaim . claim_block_deadline > current_block_number
)
claimant_query . order_by ( DropperClaimant . created_at . asc ( ) )
if limit :
claimant_query = claimant_query . limit ( limit )
if offset :
claimant_query = claimant_query . offset ( offset )
return claimant_query . all ( )
def get_terminus_claims (
db_session : Session ,
blockchain : str ,
terminus_address : ChecksumAddress ,
terminus_pool_id : int ,
dropper_contract_address : Optional [ ChecksumAddress ] = None ,
active : Optional [ bool ] = None ,
limit : Optional [ int ] = None ,
offset : Optional [ int ] = None ,
) :
"""
Search for a claimant by address
"""
query = (
db_session . query (
DropperClaim . id ,
DropperClaim . title ,
DropperClaim . description ,
DropperClaim . terminus_address ,
DropperClaim . terminus_pool_id ,
DropperClaim . claim_block_deadline ,
DropperClaim . claim_id ,
DropperClaim . active ,
DropperContract . address . label ( " dropper_contract_address " ) ,
)
. join ( DropperContract )
. filter ( DropperClaim . terminus_address == terminus_address )
. filter ( DropperClaim . terminus_pool_id == terminus_pool_id )
. filter ( DropperContract . blockchain == blockchain )
)
if dropper_contract_address :
query = query . filter ( DropperContract . address == dropper_contract_address )
if active :
query = query . filter ( DropperClaim . active == active )
# TODO: add ordering in all pagination queries
query = query . order_by ( DropperClaim . created_at . asc ( ) )
if limit :
query = query . limit ( limit )
if offset :
query = query . offset ( offset )
return query
def get_drop ( db_session : Session , dropper_claim_id : uuid . UUID ) :
"""
Return particular drop
"""
drop = (
2023-08-06 13:55:56 +00:00
db_session . query ( DropperClaim ) . filter ( DropperClaim . id == dropper_claim_id ) . one ( ) # type: ignore
2023-06-06 12:11:49 +00:00
)
return drop
def get_claims (
db_session : Session ,
blockchain : str ,
dropper_contract_address : Optional [ ChecksumAddress ] = None ,
claimant_address : Optional [ ChecksumAddress ] = None ,
terminus_address : Optional [ ChecksumAddress ] = None ,
terminus_pool_id : Optional [ int ] = None ,
active : Optional [ bool ] = None ,
limit : Optional [ int ] = None ,
offset : Optional [ int ] = None ,
) :
"""
Search for a claimant by address
"""
query = (
db_session . query (
DropperClaim . id ,
DropperClaim . title ,
DropperClaim . description ,
DropperClaim . terminus_address ,
DropperClaim . terminus_pool_id ,
DropperClaim . claim_block_deadline ,
DropperClaim . claim_id ,
DropperClaim . active ,
DropperClaimant . amount ,
DropperContract . address . label ( " dropper_contract_address " ) ,
)
. join ( DropperContract )
. join ( DropperClaimant )
. filter ( DropperContract . blockchain == blockchain )
)
if dropper_contract_address :
query = query . filter ( DropperContract . address == dropper_contract_address )
if claimant_address :
query = query . filter ( DropperClaimant . address == claimant_address )
if terminus_address :
query = query . filter ( DropperClaim . terminus_address == terminus_address )
if terminus_pool_id :
query = query . filter ( DropperClaim . terminus_pool_id == terminus_pool_id )
if active :
query = query . filter ( DropperClaim . active == active )
query = query . order_by ( DropperClaim . created_at . asc ( ) )
if limit :
query = query . limit ( limit )
if offset :
query = query . offset ( offset )
return query
def get_drops (
db_session : Session ,
blockchain : str ,
dropper_contract_address : Optional [ ChecksumAddress ] = None ,
drop_number : Optional [ int ] = None ,
terminus_address : Optional [ ChecksumAddress ] = None ,
terminus_pool_id : Optional [ int ] = None ,
active : Optional [ bool ] = None ,
limit : Optional [ int ] = None ,
offset : Optional [ int ] = None ,
) :
"""
Get drops
"""
query = (
db_session . query (
DropperClaim . id ,
DropperClaim . title ,
DropperClaim . description ,
DropperClaim . terminus_address ,
DropperClaim . terminus_pool_id ,
DropperClaim . claim_block_deadline ,
DropperClaim . claim_id . label ( " drop_number " ) ,
DropperClaim . active ,
DropperContract . address . label ( " dropper_contract_address " ) ,
)
. join ( DropperContract )
. filter ( DropperContract . blockchain == blockchain )
)
if dropper_contract_address :
query = query . filter ( DropperContract . address == dropper_contract_address )
if drop_number :
query = query . filter ( DropperClaim . claim_id == drop_number )
if terminus_address :
query = query . filter ( DropperClaim . terminus_address == terminus_address )
if terminus_pool_id :
query = query . filter ( DropperClaim . terminus_pool_id == terminus_pool_id )
if active :
query = query . filter ( DropperClaim . active == active )
query = query . order_by ( DropperClaim . created_at . desc ( ) )
if limit :
query = query . limit ( limit )
if offset :
query = query . offset ( offset )
return query
def get_claim_admin_pool (
db_session : Session ,
dropper_claim_id : uuid . UUID ,
) - > Any :
"""
Search for a claimant by address
"""
query = (
db_session . query (
DropperContract . blockchain ,
DropperClaim . terminus_address ,
DropperClaim . terminus_pool_id ,
)
. join ( DropperContract )
. filter ( DropperClaim . id == dropper_claim_id )
)
return query . one ( )
def ensure_admin_token_holder (
db_session : Session , dropper_claim_id : uuid . UUID , address : ChecksumAddress
) - > bool :
blockchain , terminus_address , terminus_pool_id = get_claim_admin_pool (
db_session = db_session , dropper_claim_id = dropper_claim_id
)
terminus = Terminus_interface . Contract (
BLOCKCHAIN_WEB3_PROVIDERS [ blockchain ] , terminus_address
)
balance = terminus . balanceOf ( address , terminus_pool_id ) . call ( )
if balance == 0 :
raise AuthorizationError (
f " Address has insufficient balance in Terminus pool: address= { address } , blockchain= { blockchain } , terminus_address= { terminus_address } , terminus_pool_id= { terminus_pool_id } "
)
return True
def ensure_dropper_contract_owner (
db_session : Session , dropper_contract_id : uuid . UUID , address : ChecksumAddress
) - > bool :
dropper_contract_info = get_dropper_contract_by_id (
db_session = db_session , dropper_contract_id = dropper_contract_id
)
dropper = Dropper_interface . Contract (
BLOCKCHAIN_WEB3_PROVIDERS [ dropper_contract_info . blockchain ] ,
dropper_contract_info . address ,
)
dropper_owner_address = dropper . owner ( ) . call ( )
if address != Web3 . toChecksumAddress ( dropper_owner_address ) :
raise AuthorizationError (
f " Given address is not the owner of the given dropper contract: address= { address } , blockchain= { dropper_contract_info . blockchain } , dropper_address= { dropper_contract_info . address } "
)
return True
def delete_claimants ( db_session : Session , dropper_claim_id , addresses ) :
"""
Delete all claimants for a claim
"""
normalize_addresses = [ Web3 . toChecksumAddress ( address ) for address in addresses ]
was_deleted = [ ]
deleted_addresses = (
db_session . query ( DropperClaimant )
. filter ( DropperClaimant . dropper_claim_id == dropper_claim_id )
. filter ( DropperClaimant . address . in_ ( normalize_addresses ) )
)
for deleted_address in deleted_addresses :
was_deleted . append ( deleted_address . address )
db_session . delete ( deleted_address )
db_session . commit ( )
return was_deleted
def refetch_drop_signatures (
db_session : Session , dropper_claim_id : uuid . UUID , added_by : str
) :
"""
Refetch signatures for drop
"""
claim = (
db_session . query (
DropperClaim . claim_id ,
DropperClaim . claim_block_deadline ,
DropperContract . address ,
DropperContract . blockchain ,
)
. join ( DropperContract , DropperClaim . dropper_contract_id == DropperContract . id )
. filter ( DropperClaim . id == dropper_claim_id )
2023-08-06 13:55:56 +00:00
) . one ( ) # type: ignore
2023-06-06 12:11:49 +00:00
if claim . claim_block_deadline is None :
raise DropWithNotSettedBlockDeadline (
f " Claim block deadline is not set for dropper claim: { dropper_claim_id } "
)
outdated_signatures = (
db_session . query (
DropperClaim . claim_id ,
DropperClaimant . address ,
DropperClaimant . amount ,
)
. join ( DropperClaimant , DropperClaimant . dropper_claim_id == DropperClaim . id )
. filter ( DropperClaim . id == dropper_claim_id )
. filter (
or_ (
DropperClaimant . updated_at < DropperClaim . updated_at ,
DropperClaimant . signature == None ,
)
)
) . limit ( BATCH_SIGNATURE_PAGE_SIZE )
current_offset = 0
users_hashes = { }
users_amount = { }
hashes_signature = { }
dropper_contract = Dropper_interface . Contract (
BLOCKCHAIN_WEB3_PROVIDERS [ claim . blockchain ] , claim . address
)
while True :
signature_requests = [ ]
page = outdated_signatures . offset ( current_offset ) . all ( )
claim_amounts = [ outdated_signature . amount for outdated_signature in page ]
transformed_claim_amounts = batch_transform_claim_amounts (
db_session , dropper_claim_id , claim_amounts
)
for outdated_signature , transformed_claim_amount in zip (
page , transformed_claim_amounts
) :
message_hash_raw = dropper_contract . claimMessageHash (
claim . claim_id ,
outdated_signature . address ,
claim . claim_block_deadline ,
int ( transformed_claim_amount ) ,
) . call ( )
message_hash = HexBytes ( message_hash_raw ) . hex ( )
signature_requests . append ( message_hash )
users_hashes [ outdated_signature . address ] = message_hash
users_amount [ outdated_signature . address ] = outdated_signature . amount
message_hashes = [ signature_request for signature_request in signature_requests ]
signed_messages = signatures . DROP_SIGNER . batch_sign_message ( message_hashes )
hashes_signature . update ( signed_messages )
if len ( page ) == 0 :
break
current_offset + = BATCH_SIGNATURE_PAGE_SIZE
claimant_objects = [ ]
for address , hash in users_hashes . items ( ) :
claimant_objects . append (
{
" dropper_claim_id " : dropper_claim_id ,
" address " : address ,
" signature " : hashes_signature [ hash ] ,
" amount " : users_amount [ address ] ,
" added_by " : added_by ,
" created_at " : datetime . now ( ) ,
" updated_at " : datetime . now ( ) ,
}
)
insert_statement = insert ( DropperClaimant ) . values ( claimant_objects )
result_stmt = insert_statement . on_conflict_do_update (
index_elements = [ DropperClaimant . address , DropperClaimant . dropper_claim_id ] ,
set_ = dict (
signature = insert_statement . excluded . signature ,
updated_at = datetime . now ( ) ,
) ,
)
db_session . execute ( result_stmt )
db_session . commit ( )
return claimant_objects
2023-11-20 08:51:31 +00:00
def leaderboard_version_filter (
db_session : Session ,
leaderboard_id : uuid . UUID ,
version_number : Optional [ int ] = None ,
) - > Union [ Subquery , int ] :
# Subquery to get the latest version number for the given leaderboard
2023-11-20 13:34:47 +00:00
if version_number is None :
2023-11-20 08:51:31 +00:00
latest_version = (
db_session . query ( func . max ( LeaderboardVersion . version_number ) ) . filter (
LeaderboardVersion . leaderboard_id == leaderboard_id ,
LeaderboardVersion . published == True ,
)
) . scalar_subquery ( )
else :
latest_version = version_number
return latest_version
2023-11-20 13:34:47 +00:00
def get_leaderboard_total_count (
db_session : Session , leaderboard_id , version_number : Optional [ int ] = None
) - > int :
2023-06-06 12:11:49 +00:00
"""
2023-11-20 13:34:47 +00:00
Get the total number of position in the leaderboard
2023-06-06 12:11:49 +00:00
"""
2023-11-20 13:34:47 +00:00
latest_version = leaderboard_version_filter (
db_session = db_session ,
leaderboard_id = leaderboard_id ,
version_number = version_number ,
2023-06-06 12:11:49 +00:00
)
2023-11-20 13:34:47 +00:00
total_count = (
db_session . query ( func . count ( LeaderboardScores . id ) )
. join (
LeaderboardVersion ,
and_ (
LeaderboardVersion . leaderboard_id == LeaderboardScores . leaderboard_id ,
LeaderboardVersion . version_number
== LeaderboardScores . leaderboard_version_number ,
) ,
)
. filter (
LeaderboardVersion . published == True ,
LeaderboardVersion . version_number == latest_version ,
)
. filter ( LeaderboardScores . leaderboard_id == leaderboard_id )
) . scalar ( )
return total_count
2023-06-06 12:11:49 +00:00
2023-08-12 06:14:33 +00:00
def get_leaderboard_info (
2023-11-20 08:51:31 +00:00
db_session : Session , leaderboard_id : uuid . UUID , version_number : Optional [ int ] = None
2023-08-12 06:14:33 +00:00
) - > Row [ Tuple [ uuid . UUID , str , str , int , Optional [ datetime ] ] ] :
2023-07-05 18:49:43 +00:00
"""
2023-08-06 13:55:56 +00:00
Get the leaderboard from the database with users count
2023-07-05 18:49:43 +00:00
"""
2023-11-20 08:51:31 +00:00
latest_version = leaderboard_version_filter (
db_session = db_session ,
leaderboard_id = leaderboard_id ,
version_number = version_number ,
)
2023-11-20 13:34:47 +00:00
2023-12-20 02:51:20 +00:00
query = (
2023-08-06 13:55:56 +00:00
db_session . query (
Leaderboard . id ,
Leaderboard . title ,
Leaderboard . description ,
func . count ( LeaderboardScores . id ) . label ( " users_count " ) ,
func . max ( LeaderboardScores . updated_at ) . label ( " last_update " ) ,
)
2023-08-12 06:14:33 +00:00
. join (
2023-12-20 02:51:20 +00:00
LeaderboardVersion ,
and_ (
LeaderboardVersion . leaderboard_id == Leaderboard . id ,
LeaderboardVersion . published == True ,
) ,
2023-08-12 06:14:33 +00:00
isouter = True ,
)
2023-11-20 08:51:31 +00:00
. join (
2023-12-20 02:51:20 +00:00
LeaderboardScores ,
2023-11-20 13:34:47 +00:00
and_ (
2023-12-20 02:51:20 +00:00
LeaderboardScores . leaderboard_id == Leaderboard . id ,
LeaderboardScores . leaderboard_version_number
== LeaderboardVersion . version_number ,
2023-11-20 13:34:47 +00:00
) ,
2023-11-20 08:51:31 +00:00
isouter = True ,
)
. filter (
2023-12-20 02:51:20 +00:00
or_ (
LeaderboardVersion . published == None ,
LeaderboardVersion . version_number == latest_version ,
)
2023-11-20 08:51:31 +00:00
)
2023-08-06 13:55:56 +00:00
. filter ( Leaderboard . id == leaderboard_id )
. group_by ( Leaderboard . id , Leaderboard . title , Leaderboard . description )
2023-12-20 02:51:20 +00:00
)
leaderboard = query . one ( )
2023-07-05 18:49:43 +00:00
return leaderboard
2023-08-06 13:55:56 +00:00
def get_leaderboard_scores_changes (
db_session : Session , leaderboard_id : uuid . UUID
2023-08-12 06:14:33 +00:00
) - > List [ Row [ Tuple [ int , datetime ] ] ] :
2023-08-06 13:55:56 +00:00
"""
Return the leaderboard scores changes timeline changes of leaderboard scores
"""
leaderboard_scores_changes = (
db_session . query (
func . count ( LeaderboardScores . address ) . label ( " players_count " ) ,
# func.extract("epoch", LeaderboardScores.updated_at).label("timestamp"),
LeaderboardScores . updated_at . label ( " date " ) ,
)
. filter ( LeaderboardScores . leaderboard_id == leaderboard_id )
. group_by ( LeaderboardScores . updated_at )
. order_by ( LeaderboardScores . updated_at . desc ( ) )
2023-08-12 06:14:33 +00:00
) . all ( )
2023-08-06 13:55:56 +00:00
return leaderboard_scores_changes
def get_leaderboard_scores_by_timestamp (
db_session : Session ,
leaderboard_id : uuid . UUID ,
date : datetime ,
limit : int ,
offset : int ,
2023-08-12 06:14:33 +00:00
) - > List [ LeaderboardScores ] :
2023-08-06 13:55:56 +00:00
"""
Return the leaderboard scores by timestamp
"""
leaderboard_scores = (
db_session . query (
LeaderboardScores . leaderboard_id ,
LeaderboardScores . address ,
LeaderboardScores . score ,
LeaderboardScores . points_data ,
)
. filter ( LeaderboardScores . leaderboard_id == leaderboard_id )
. filter ( LeaderboardScores . updated_at == date )
. order_by ( LeaderboardScores . score . desc ( ) )
. limit ( limit )
. offset ( offset )
)
return leaderboard_scores
2023-07-05 18:49:43 +00:00
def get_leaderboards (
db_session : Session ,
token : Union [ str , uuid . UUID ] ,
2023-07-06 05:51:46 +00:00
) - > List [ Leaderboard ] :
2023-07-05 18:49:43 +00:00
"""
Get the leaderboards resources
"""
user_resources = bc . list_resources (
token = token ,
params = { " type " : " leaderboard " } ,
)
if len ( user_resources . resources ) == 0 :
raise LeaderboardsResourcesNotFound ( f " Leaderboard not found for token " )
leaderboards_ids = [ ]
for resource in user_resources . resources :
leaderboard_id = resource . resource_data [ " leaderboard_id " ]
leaderboards_ids . append ( leaderboard_id )
leaderboards = (
db_session . query ( Leaderboard ) . filter ( Leaderboard . id . in_ ( leaderboards_ids ) ) . all ( )
)
return leaderboards
2023-06-06 12:11:49 +00:00
def get_position (
2023-11-20 08:51:31 +00:00
db_session : Session ,
leaderboard_id ,
address ,
window_size ,
limit : int ,
offset : int ,
version_number : Optional [ int ] = None ,
2023-08-12 06:14:33 +00:00
) - > List [ Row [ Tuple [ str , int , int , int , Any ] ] ] :
2023-06-06 12:11:49 +00:00
"""
Return position by address with window size
"""
2023-11-20 08:51:31 +00:00
latest_version = leaderboard_version_filter (
db_session = db_session ,
leaderboard_id = leaderboard_id ,
version_number = version_number ,
)
query = (
db_session . query (
LeaderboardScores . address ,
LeaderboardScores . score ,
LeaderboardScores . points_data . label ( " points_data " ) ,
func . rank ( ) . over ( order_by = LeaderboardScores . score . desc ( ) ) . label ( " rank " ) ,
func . row_number ( )
. over ( order_by = LeaderboardScores . score . desc ( ) )
. label ( " number " ) ,
)
. join (
LeaderboardVersion ,
2023-11-20 13:34:47 +00:00
and_ (
LeaderboardVersion . leaderboard_id == LeaderboardScores . leaderboard_id ,
LeaderboardVersion . version_number
== LeaderboardScores . leaderboard_version_number ,
) ,
2023-11-20 08:51:31 +00:00
)
. filter (
LeaderboardVersion . published == True ,
LeaderboardVersion . version_number == latest_version ,
)
. filter ( LeaderboardScores . leaderboard_id == leaderboard_id )
)
2023-06-06 12:11:49 +00:00
ranked_leaderboard = query . cte ( name = " ranked_leaderboard " )
query = db_session . query (
ranked_leaderboard . c . address ,
ranked_leaderboard . c . score ,
ranked_leaderboard . c . rank ,
ranked_leaderboard . c . number ,
) . filter (
ranked_leaderboard . c . address == address ,
)
my_position = query . cte ( name = " my_position " )
# get the position with the window size
query = db_session . query (
ranked_leaderboard . c . address ,
ranked_leaderboard . c . score ,
ranked_leaderboard . c . rank ,
ranked_leaderboard . c . number ,
ranked_leaderboard . c . points_data ,
) . filter (
ranked_leaderboard . c . number . between ( # taking off my hat!
my_position . c . number - window_size ,
my_position . c . number + window_size ,
)
)
if limit :
query = query . limit ( limit )
if offset :
query = query . offset ( offset )
return query . all ( )
def get_leaderboard_positions (
2023-11-20 08:51:31 +00:00
db_session : Session ,
leaderboard_id ,
limit : int ,
offset : int ,
version_number : Optional [ int ] = None ,
2023-08-12 06:14:33 +00:00
) - > List [ Row [ Tuple [ uuid . UUID , str , int , str , int ] ] ] :
2023-06-06 12:11:49 +00:00
"""
Get the leaderboard positions
"""
2023-11-20 08:51:31 +00:00
# get public leaderboard scores with max version
latest_version = leaderboard_version_filter (
db_session = db_session ,
leaderboard_id = leaderboard_id ,
version_number = version_number ,
)
# Main query
2023-06-06 12:11:49 +00:00
query = (
db_session . query (
LeaderboardScores . id ,
LeaderboardScores . address ,
LeaderboardScores . score ,
LeaderboardScores . points_data ,
func . rank ( ) . over ( order_by = LeaderboardScores . score . desc ( ) ) . label ( " rank " ) ,
)
2023-11-20 08:51:31 +00:00
. join (
LeaderboardVersion ,
2023-11-20 13:34:47 +00:00
and_ (
LeaderboardVersion . leaderboard_id == LeaderboardScores . leaderboard_id ,
LeaderboardVersion . version_number
== LeaderboardScores . leaderboard_version_number ,
) ,
2023-11-20 08:51:31 +00:00
)
2023-06-06 12:11:49 +00:00
. filter ( LeaderboardScores . leaderboard_id == leaderboard_id )
2023-11-20 08:51:31 +00:00
. filter ( LeaderboardVersion . published == True )
. filter ( LeaderboardVersion . version_number == latest_version )
2023-06-06 12:11:49 +00:00
)
if limit :
query = query . limit ( limit )
if offset :
query = query . offset ( offset )
return query
2023-08-12 06:14:33 +00:00
def get_qurtiles (
2023-11-20 08:51:31 +00:00
db_session : Session , leaderboard_id , version_number : Optional [ int ] = None
2023-08-12 06:14:33 +00:00
) - > Tuple [ Row [ Tuple [ str , float , int ] ] , . . . ] :
2023-06-06 12:11:49 +00:00
"""
Get the leaderboard qurtiles
https : / / docs . sqlalchemy . org / en / 14 / core / functions . html #sqlalchemy.sql.functions.percentile_disc
"""
2023-11-20 08:51:31 +00:00
latest_version = leaderboard_version_filter (
db_session = db_session ,
leaderboard_id = leaderboard_id ,
version_number = version_number ,
)
query = (
db_session . query (
LeaderboardScores . address ,
LeaderboardScores . score ,
func . rank ( ) . over ( order_by = LeaderboardScores . score . desc ( ) ) . label ( " rank " ) ,
)
. join (
LeaderboardVersion ,
2023-11-20 13:34:47 +00:00
and_ (
LeaderboardVersion . leaderboard_id == LeaderboardScores . leaderboard_id ,
LeaderboardVersion . version_number
== LeaderboardScores . leaderboard_version_number ,
) ,
2023-11-20 08:51:31 +00:00
)
. filter (
LeaderboardVersion . published == True ,
LeaderboardVersion . version_number == latest_version ,
)
. filter ( LeaderboardScores . leaderboard_id == leaderboard_id )
)
2023-06-06 12:11:49 +00:00
ranked_leaderboard = query . cte ( name = " ranked_leaderboard " )
current_count = db_session . query ( ranked_leaderboard ) . count ( )
if current_count == 0 :
raise LeaderboardIsEmpty ( f " Leaderboard { leaderboard_id } is empty " )
index_75 = int ( current_count / 4 )
index_50 = int ( current_count / 2 )
index_25 = int ( current_count * 3 / 4 )
q1 = db_session . query ( ranked_leaderboard ) . limit ( 1 ) . offset ( index_25 ) . first ( )
q2 = db_session . query ( ranked_leaderboard ) . limit ( 1 ) . offset ( index_50 ) . first ( )
q3 = db_session . query ( ranked_leaderboard ) . limit ( 1 ) . offset ( index_75 ) . first ( )
return q1 , q2 , q3
2023-11-20 08:51:31 +00:00
def get_ranks (
db_session : Session , leaderboard_id , version_number : Optional [ int ] = None
) - > List [ Row [ Tuple [ int , int , int ] ] ] :
2023-06-06 12:11:49 +00:00
"""
Get the leaderboard rank buckets ( rank , size , score )
"""
2023-11-20 08:51:31 +00:00
latest_version = leaderboard_version_filter (
db_session = db_session ,
leaderboard_id = leaderboard_id ,
version_number = version_number ,
)
query = (
db_session . query (
LeaderboardScores . id ,
LeaderboardScores . address ,
LeaderboardScores . score ,
LeaderboardScores . points_data ,
func . rank ( ) . over ( order_by = LeaderboardScores . score . desc ( ) ) . label ( " rank " ) ,
)
. join (
LeaderboardVersion ,
2023-11-20 13:34:47 +00:00
and_ (
LeaderboardVersion . leaderboard_id == LeaderboardScores . leaderboard_id ,
LeaderboardVersion . version_number
== LeaderboardScores . leaderboard_version_number ,
) ,
2023-11-20 08:51:31 +00:00
)
. filter (
LeaderboardVersion . published == True ,
LeaderboardVersion . version_number == latest_version ,
)
. filter ( LeaderboardScores . leaderboard_id == leaderboard_id )
)
2023-06-06 12:11:49 +00:00
ranked_leaderboard = query . cte ( name = " ranked_leaderboard " )
ranks = db_session . query (
ranked_leaderboard . c . rank ,
func . count ( ranked_leaderboard . c . id ) . label ( " size " ) ,
ranked_leaderboard . c . score ,
) . group_by ( ranked_leaderboard . c . rank , ranked_leaderboard . c . score )
return ranks
def get_rank (
db_session : Session ,
leaderboard_id : uuid . UUID ,
rank : int ,
limit : Optional [ int ] = None ,
offset : Optional [ int ] = None ,
2023-11-20 08:51:31 +00:00
version_number : Optional [ int ] = None ,
2023-08-12 06:14:33 +00:00
) - > List [ Row [ Tuple [ uuid . UUID , str , int , str , int ] ] ] :
2023-06-06 12:11:49 +00:00
"""
Get bucket in leaderboard by rank
"""
2023-11-20 08:51:31 +00:00
latest_version = leaderboard_version_filter (
db_session = db_session ,
leaderboard_id = leaderboard_id ,
version_number = version_number ,
)
2023-06-06 12:11:49 +00:00
query = (
db_session . query (
LeaderboardScores . id ,
LeaderboardScores . address ,
LeaderboardScores . score ,
LeaderboardScores . points_data ,
func . rank ( ) . over ( order_by = LeaderboardScores . score . desc ( ) ) . label ( " rank " ) ,
)
2023-11-20 08:51:31 +00:00
. join (
LeaderboardVersion ,
2023-11-20 13:34:47 +00:00
and_ (
LeaderboardVersion . leaderboard_id == LeaderboardScores . leaderboard_id ,
LeaderboardVersion . version_number
== LeaderboardScores . leaderboard_version_number ,
) ,
2023-11-20 08:51:31 +00:00
)
. filter (
LeaderboardVersion . published == True ,
LeaderboardVersion . version_number == latest_version ,
)
2023-06-06 12:11:49 +00:00
. filter ( LeaderboardScores . leaderboard_id == leaderboard_id )
. order_by ( text ( " rank asc, id asc " ) )
)
ranked_leaderboard = query . cte ( name = " ranked_leaderboard " )
positions = db_session . query ( ranked_leaderboard ) . filter (
ranked_leaderboard . c . rank == rank
)
if limit :
positions = positions . limit ( limit )
if offset :
positions = positions . offset ( offset )
return positions
2023-08-06 13:55:56 +00:00
def create_leaderboard (
db_session : Session ,
title : str ,
description : Optional [ str ] ,
2023-08-12 06:18:19 +00:00
token : Optional [ Union [ uuid . UUID , str ] ] = None ,
2023-08-12 06:14:33 +00:00
) - > Leaderboard :
2023-06-06 12:11:49 +00:00
"""
Create a leaderboard
"""
2023-08-12 06:14:33 +00:00
if not token :
token = uuid . UUID ( MOONSTREAM_ADMIN_ACCESS_TOKEN )
2023-08-06 13:55:56 +00:00
try :
leaderboard = Leaderboard ( title = title , description = description )
db_session . add ( leaderboard )
db_session . commit ( )
resource = create_leaderboard_resource (
leaderboard_id = str ( leaderboard . id ) ,
token = token ,
)
leaderboard . resource_id = resource . id
db_session . commit ( )
except Exception as e :
db_session . rollback ( )
logger . error ( f " Error creating leaderboard: { e } " )
raise LeaderboardCreateError ( f " Error creating leaderboard: { e } " )
return leaderboard
def delete_leaderboard (
db_session : Session , leaderboard_id : uuid . UUID , token : uuid . UUID
2023-08-12 06:14:33 +00:00
) - > Leaderboard :
2023-08-06 13:55:56 +00:00
"""
Delete a leaderboard
"""
try :
leaderboard = (
db_session . query ( Leaderboard ) . filter ( Leaderboard . id == leaderboard_id ) . one ( ) # type: ignore
)
if leaderboard . resource_id is not None :
try :
bc . delete_resource (
token = token ,
resource_id = leaderboard . resource_id ,
)
except Exception as e :
logger . error ( f " Error deleting leaderboard resource: { e } " )
2023-08-12 06:14:33 +00:00
else :
logger . error (
f " Leaderboard { leaderboard_id } has no resource id. Skipping. Better delete it manually. "
)
2023-08-06 13:55:56 +00:00
db_session . delete ( leaderboard )
db_session . commit ( )
except Exception as e :
db_session . rollback ( )
logger . error ( e )
raise LeaderboardDeleteError ( f " Error deleting leaderboard: { e } " )
return leaderboard
def update_leaderboard (
db_session : Session ,
leaderboard_id : uuid . UUID ,
title : Optional [ str ] ,
description : Optional [ str ] ,
2023-08-12 06:14:33 +00:00
) - > Leaderboard :
2023-08-06 13:55:56 +00:00
"""
Update a leaderboard
"""
leaderboard = (
db_session . query ( Leaderboard ) . filter ( Leaderboard . id == leaderboard_id ) . one ( ) # type: ignore
)
if title is not None :
leaderboard . title = title
if description is not None :
leaderboard . description = description
2023-06-06 12:11:49 +00:00
db_session . commit ( )
2023-08-06 13:55:56 +00:00
return leaderboard
2023-06-06 12:11:49 +00:00
2023-08-12 06:14:33 +00:00
def get_leaderboard_by_id ( db_session : Session , leaderboard_id ) - > Leaderboard :
2023-06-06 12:11:49 +00:00
"""
Get the leaderboard by id
"""
2023-08-06 13:55:56 +00:00
return db_session . query ( Leaderboard ) . filter ( Leaderboard . id == leaderboard_id ) . one ( ) # type: ignore
2023-06-06 12:11:49 +00:00
2023-08-12 06:14:33 +00:00
def get_leaderboard_by_title ( db_session : Session , title ) - > Leaderboard :
2023-06-06 12:11:49 +00:00
"""
Get the leaderboard by title
"""
2023-08-06 13:55:56 +00:00
return db_session . query ( Leaderboard ) . filter ( Leaderboard . title == title ) . one ( ) # type: ignore
2023-06-06 12:11:49 +00:00
2023-08-12 06:14:33 +00:00
def list_leaderboards (
db_session : Session , limit : int , offset : int
) - > List [ Row [ Tuple [ uuid . UUID , str , str ] ] ] :
2023-06-06 12:11:49 +00:00
"""
List all leaderboards
"""
query = db_session . query ( Leaderboard . id , Leaderboard . title , Leaderboard . description )
if limit :
query = query . limit ( limit )
if offset :
query = query . offset ( offset )
return query . all ( )
def add_scores (
db_session : Session ,
leaderboard_id : uuid . UUID ,
scores : List [ Score ] ,
2023-11-20 08:51:31 +00:00
version_number : int ,
2023-06-06 12:11:49 +00:00
normalize_addresses : bool = True ,
) :
"""
Add scores to the leaderboard
"""
leaderboard_scores = [ ]
normalizer_fn = Web3 . toChecksumAddress
if not normalize_addresses :
normalizer_fn = lambda x : x # type: ignore
addresses = [ score . address for score in scores ]
if len ( addresses ) != len ( set ( addresses ) ) :
duplicates = [ key for key , value in Counter ( addresses ) . items ( ) if value > 1 ]
raise DuplicateLeaderboardAddressError ( " Dublicated addresses " , duplicates )
for score in scores :
leaderboard_scores . append (
{
" leaderboard_id " : leaderboard_id ,
" address " : normalizer_fn ( score . address ) ,
" score " : score . score ,
" points_data " : score . points_data ,
2023-11-20 08:51:31 +00:00
" leaderboard_version_number " : version_number ,
2023-06-06 12:11:49 +00:00
}
)
insert_statement = insert ( LeaderboardScores ) . values ( leaderboard_scores )
result_stmt = insert_statement . on_conflict_do_update (
2023-11-20 13:34:47 +00:00
index_elements = [
LeaderboardScores . address ,
LeaderboardScores . leaderboard_id ,
LeaderboardScores . leaderboard_version_number ,
] ,
2023-06-06 12:11:49 +00:00
set_ = dict (
score = insert_statement . excluded . score ,
points_data = insert_statement . excluded . points_data ,
updated_at = datetime . now ( ) ,
) ,
)
try :
db_session . execute ( result_stmt )
db_session . commit ( )
except :
db_session . rollback ( )
return leaderboard_scores
2023-11-10 17:20:26 +00:00
# leaderboard access actions
2023-06-06 12:11:49 +00:00
def create_leaderboard_resource (
2023-08-12 06:14:33 +00:00
leaderboard_id : str , token : Union [ Optional [ uuid . UUID ] , str ] = None
2023-06-06 12:11:49 +00:00
) - > BugoutResource :
resource_data : Dict [ str , Any ] = {
" type " : LEADERBOARD_RESOURCE_TYPE ,
" leaderboard_id " : leaderboard_id ,
}
if token is None :
token = MOONSTREAM_ADMIN_ACCESS_TOKEN
2023-08-06 13:55:56 +00:00
try :
resource = bc . create_resource (
token = token ,
application_id = MOONSTREAM_APPLICATION_ID ,
resource_data = resource_data ,
timeout = 10 ,
)
except Exception as e :
raise LeaderboardCreateError ( f " Error creating leaderboard resource: { e } " )
2023-06-06 12:11:49 +00:00
return resource
def assign_resource (
db_session : Session ,
leaderboard_id : uuid . UUID ,
2023-08-06 13:55:56 +00:00
user_token : Union [ uuid . UUID , str ] ,
2023-06-06 12:11:49 +00:00
resource_id : Optional [ uuid . UUID ] = None ,
) :
"""
Assign a resource handler to a leaderboard
"""
leaderboard = (
2023-08-06 13:55:56 +00:00
db_session . query ( Leaderboard ) . filter ( Leaderboard . id == leaderboard_id ) . one ( ) # type: ignore
2023-06-06 12:11:49 +00:00
)
if resource_id is not None :
leaderboard . resource_id = resource_id
else :
# Create resource via admin token
resource = create_leaderboard_resource (
2023-08-06 13:55:56 +00:00
leaderboard_id = str ( leaderboard_id ) ,
token = user_token ,
2023-06-06 12:11:49 +00:00
)
leaderboard . resource_id = resource . id
db_session . commit ( )
db_session . flush ( )
return leaderboard . resource_id
def list_leaderboards_resources (
db_session : Session ,
) :
"""
List all leaderboards resources
"""
query = db_session . query ( Leaderboard . id , Leaderboard . title , Leaderboard . resource_id )
return query . all ( )
2023-08-31 15:04:23 +00:00
def get_leaderboard_config_entry (
leaderboard_id : uuid . UUID ,
) - > BugoutSearchResult :
2023-08-31 16:32:20 +00:00
query = f " #leaderboard_id: { leaderboard_id } "
2023-08-31 15:04:23 +00:00
configs = bc . search (
token = MOONSTREAM_ADMIN_ACCESS_TOKEN ,
2023-09-01 04:06:11 +00:00
journal_id = MOONSTREAM_LEADERBOARD_CONFIGURATION_JOURNAL_ID ,
2023-08-31 16:32:20 +00:00
query = query ,
2023-08-31 15:04:23 +00:00
limit = 1 ,
)
results = cast ( List [ BugoutSearchResult ] , configs . results )
if len ( configs . results ) == 0 or results [ 0 ] . content is None :
raise LeaderboardConfigNotFound (
f " Leaderboard config not found for { leaderboard_id } "
)
return results [ 0 ]
def get_leaderboard_config (
leaderboard_id : uuid . UUID ,
) - > Dict [ str , Any ] :
"""
Return leaderboard config from leaderboard generator journal
"""
entry = get_leaderboard_config_entry ( leaderboard_id )
2023-08-31 16:32:20 +00:00
content = json . loads ( entry . content ) # type: ignore
if " status:active " not in entry . tags :
content [ " leaderboard_auto_update_active " ] = False
else :
content [ " leaderboard_auto_update_active " ] = True
return content
2023-08-31 15:04:23 +00:00
def update_leaderboard_config (
leaderboard_id : uuid . UUID , config : LeaderboardConfigUpdate
) - > Dict [ str , Any ] :
"""
Update leaderboard config in leaderboard generator journal
"""
entry_config = get_leaderboard_config_entry ( leaderboard_id )
current_config = LeaderboardConfig ( * * json . loads ( entry_config . content ) ) # type: ignore
new_params = config . params
for key , value in new_params . items ( ) :
if key not in current_config . params :
continue
current_config . params [ key ] = value
# we replace values of parameters that are not None
entry = bc . update_entry_content (
token = MOONSTREAM_ADMIN_ACCESS_TOKEN ,
2023-09-01 04:06:11 +00:00
journal_id = MOONSTREAM_LEADERBOARD_CONFIGURATION_JOURNAL_ID ,
2023-08-31 15:04:23 +00:00
title = entry_config . title ,
entry_id = entry_config . entry_url . split ( " / " ) [ - 1 ] ,
content = json . dumps ( current_config . dict ( ) ) ,
)
2023-08-31 16:32:20 +00:00
new_config = json . loads ( entry . content )
if " status:active " not in entry . tags :
new_config [ " leaderboard_auto_update_active " ] = False
else :
new_config [ " leaderboard_auto_update_active " ] = True
return new_config
2023-08-31 15:04:23 +00:00
def activate_leaderboard_config (
leaderboard_id : uuid . UUID ,
) :
"""
Add tag status : active to leaderboard config journal entry
"""
entry_config = get_leaderboard_config_entry ( leaderboard_id )
if " status:active " in entry_config . tags :
raise LeaderboardConfigAlreadyActive (
f " Leaderboard config { leaderboard_id } already active "
)
bc . create_tags (
token = MOONSTREAM_ADMIN_ACCESS_TOKEN ,
2023-09-01 04:06:11 +00:00
journal_id = MOONSTREAM_LEADERBOARD_CONFIGURATION_JOURNAL_ID ,
2023-08-31 15:04:23 +00:00
entry_id = entry_config . entry_url . split ( " / " ) [ - 1 ] ,
tags = [ " status:active " ] ,
)
def deactivate_leaderboard_config (
leaderboard_id : uuid . UUID ,
) :
"""
Remove tag status : active from leaderboard config journal entry
"""
entry_config = get_leaderboard_config_entry ( leaderboard_id )
if " status:active " not in entry_config . tags :
raise LeaderboardConfigAlreadyInactive (
f " Leaderboard config { leaderboard_id } not active "
)
2023-08-31 16:32:20 +00:00
bc . delete_tag (
2023-08-31 15:04:23 +00:00
token = MOONSTREAM_ADMIN_ACCESS_TOKEN ,
2023-09-01 04:06:11 +00:00
journal_id = MOONSTREAM_LEADERBOARD_CONFIGURATION_JOURNAL_ID ,
2023-08-31 16:32:20 +00:00
entry_id = entry_config . entry_url . split ( " / " ) [ - 1 ] ,
tag = " status:active " ,
2023-08-31 15:04:23 +00:00
)
2023-08-12 06:14:33 +00:00
def revoke_resource (
db_session : Session , leaderboard_id : uuid . UUID
) - > Optional [ uuid . UUID ] :
2023-06-06 12:11:49 +00:00
"""
Revoke a resource handler to a leaderboard
"""
# TODO(ANDREY): Delete resource via admin token
leaderboard = (
2023-08-06 13:55:56 +00:00
db_session . query ( Leaderboard ) . filter ( Leaderboard . id == leaderboard_id ) . one ( ) # type: ignore
2023-06-06 12:11:49 +00:00
)
if leaderboard . resource_id is None :
raise Exception ( " Leaderboard does not have a resource " )
leaderboard . resource_id = None
db_session . commit ( )
db_session . flush ( )
return leaderboard . resource_id
def check_leaderboard_resource_permissions (
db_session : Session , leaderboard_id : uuid . UUID , token : uuid . UUID
2023-08-12 06:14:33 +00:00
) - > bool :
2023-06-06 12:11:49 +00:00
"""
Check if the user has permissions to access the leaderboard
"""
leaderboard = (
2023-08-06 13:55:56 +00:00
db_session . query ( Leaderboard ) . filter ( Leaderboard . id == leaderboard_id ) . one ( ) # type: ignore
2023-06-06 12:11:49 +00:00
)
permission_url = f " { bc . brood_url } /resources/ { leaderboard . resource_id } /holders "
headers = {
" Authorization " : f " Bearer { token } " ,
}
2023-08-31 16:32:20 +00:00
2023-06-06 12:11:49 +00:00
# If user don't have at least read permission return 404
result = requests . get ( url = permission_url , headers = headers , timeout = 10 )
if result . status_code == 200 :
return True
return False
2023-11-20 08:51:31 +00:00
def get_leaderboard_version (
db_session : Session , leaderboard_id : uuid . UUID , version_number : int
) - > LeaderboardVersion :
"""
Get the leaderboard version by id
"""
return (
db_session . query ( LeaderboardVersion )
. filter ( LeaderboardVersion . leaderboard_id == leaderboard_id )
. filter ( LeaderboardVersion . version_number == version_number )
. one ( )
)
def create_leaderboard_version (
db_session : Session ,
leaderboard_id : uuid . UUID ,
version_number : Optional [ int ] = None ,
publish : bool = False ,
) - > LeaderboardVersion :
"""
Create a leaderboard version
"""
if version_number is None :
latest_version_result = (
db_session . query ( func . max ( LeaderboardVersion . version_number ) )
. filter ( LeaderboardVersion . leaderboard_id == leaderboard_id )
. one ( )
)
latest_version = latest_version_result [ 0 ]
if latest_version is None :
version_number = 0
else :
version_number = latest_version + 1
leaderboard_version = LeaderboardVersion (
leaderboard_id = leaderboard_id ,
version_number = version_number ,
published = publish ,
)
db_session . add ( leaderboard_version )
db_session . commit ( )
return leaderboard_version
def change_publish_leaderboard_version_status (
db_session : Session , leaderboard_id : uuid . UUID , version_number : int , published : bool
) - > LeaderboardVersion :
"""
Publish a leaderboard version
"""
leaderboard_version = (
db_session . query ( LeaderboardVersion )
. filter ( LeaderboardVersion . leaderboard_id == leaderboard_id )
. filter ( LeaderboardVersion . version_number == version_number )
. one ( )
)
leaderboard_version . published = published
db_session . commit ( )
return leaderboard_version
def get_leaderboard_versions (
db_session : Session , leaderboard_id : uuid . UUID
) - > List [ LeaderboardVersion ] :
"""
Get all leaderboard versions
"""
return (
db_session . query ( LeaderboardVersion )
. filter ( LeaderboardVersion . leaderboard_id == leaderboard_id )
. all ( )
)
def delete_leaderboard_version (
db_session : Session , leaderboard_id : uuid . UUID , version_number : int
) - > LeaderboardVersion :
"""
Delete a leaderboard version
"""
leaderboard_version = (
db_session . query ( LeaderboardVersion )
. filter ( LeaderboardVersion . leaderboard_id == leaderboard_id )
. filter ( LeaderboardVersion . version_number == version_number )
. one ( )
)
db_session . delete ( leaderboard_version )
db_session . commit ( )
return leaderboard_version
def get_leaderboard_version_scores (
db_session : Session ,
leaderboard_id : uuid . UUID ,
version_number : int ,
limit : int ,
offset : int ,
) - > List [ LeaderboardScores ] :
"""
Get the leaderboard scores by version number
"""
query = (
db_session . query (
LeaderboardScores . id ,
LeaderboardScores . address . label ( " address " ) ,
LeaderboardScores . score . label ( " score " ) ,
LeaderboardScores . points_data . label ( " points_data " ) ,
func . rank ( ) . over ( order_by = LeaderboardScores . score . desc ( ) ) . label ( " rank " ) ,
)
. filter ( LeaderboardScores . leaderboard_id == leaderboard_id )
. filter ( LeaderboardScores . leaderboard_version_number == version_number )
)
if limit :
query = query . limit ( limit )
if offset :
query = query . offset ( offset )
return query
2023-11-20 09:19:59 +00:00
def delete_previous_versions (
db_session : Session ,
leaderboard_id : uuid . UUID ,
threshold_version_number : int ,
) - > int :
"""
Delete old leaderboard versions
"""
versions_to_delete = (
db_session . query ( LeaderboardVersion )
. filter ( LeaderboardVersion . leaderboard_id == leaderboard_id )
. filter ( LeaderboardVersion . version_number < threshold_version_number )
)
num_deleted = versions_to_delete . delete ( synchronize_session = False )
db_session . commit ( )
return num_deleted