kopia lustrzana https://github.com/bugout-dev/moonstream
Working version of nb with call counter limitation
rodzic
40c9c21777
commit
e199d5bb1a
|
@ -241,7 +241,7 @@ func cli() {
|
|||
stateCLI.addAccessCmd.Parse(os.Args[2:])
|
||||
stateCLI.checkRequirements()
|
||||
|
||||
proposedUserAccess := ClientResourceData{
|
||||
proposedClientResourceData := ClientResourceData{
|
||||
UserID: stateCLI.userIDFlag,
|
||||
AccessID: stateCLI.accessIDFlag,
|
||||
Name: stateCLI.accessNameFlag,
|
||||
|
@ -257,7 +257,7 @@ func cli() {
|
|||
_, err := bugoutClient.Brood.FindUser(
|
||||
NB_CONTROLLER_TOKEN,
|
||||
map[string]string{
|
||||
"user_id": proposedUserAccess.UserID,
|
||||
"user_id": proposedClientResourceData.UserID,
|
||||
"application_id": NB_APPLICATION_ID,
|
||||
},
|
||||
)
|
||||
|
@ -265,7 +265,7 @@ func cli() {
|
|||
fmt.Printf("User does not exists, err: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
resource, err := bugoutClient.Brood.CreateResource(NB_CONTROLLER_TOKEN, NB_APPLICATION_ID, proposedUserAccess)
|
||||
resource, err := bugoutClient.Brood.CreateResource(NB_CONTROLLER_TOKEN, NB_APPLICATION_ID, proposedClientResourceData)
|
||||
if err != nil {
|
||||
fmt.Printf("Unable to create user access, err: %v\n", err)
|
||||
os.Exit(1)
|
||||
|
@ -275,7 +275,7 @@ func cli() {
|
|||
fmt.Printf("Unable to encode resource %s data interface to json, err: %v\n", resource.Id, err)
|
||||
os.Exit(1)
|
||||
}
|
||||
var newUserAccess ClientResourceData
|
||||
var newUserAccess ClientAccess
|
||||
err = json.Unmarshal(resourceData, &newUserAccess)
|
||||
if err != nil {
|
||||
fmt.Printf("Unable to decode resource %s data json to structure, err: %v\n", resource.Id, err)
|
||||
|
@ -321,44 +321,45 @@ func cli() {
|
|||
}
|
||||
|
||||
resource := resources.Resources[0]
|
||||
resource_data, err := json.Marshal(resource.ResourceData)
|
||||
resourceData, err := json.Marshal(resource.ResourceData)
|
||||
if err != nil {
|
||||
fmt.Printf("Unable to encode resource %s data interface to json, err: %v\n", resource.Id, err)
|
||||
os.Exit(1)
|
||||
}
|
||||
var currentUserAccess ClientResourceData
|
||||
err = json.Unmarshal(resource_data, ¤tUserAccess)
|
||||
|
||||
var currentClientAccess ClientAccess
|
||||
currentClientAccess.ResourceID = resource.Id
|
||||
err = json.Unmarshal(resourceData, ¤tClientAccess.ClientResourceData)
|
||||
if err != nil {
|
||||
fmt.Printf("Unable to decode resource %s data json to structure, err: %v\n", resource.Id, err)
|
||||
os.Exit(1)
|
||||
}
|
||||
currentUserAccess.ResourceID = resource.Id
|
||||
|
||||
// TODO(kompotkot): Since we are using bool flags I moved with ugly solution.
|
||||
// Let's find better one when have free time or will re-write flag Set.
|
||||
update := make(map[string]interface{})
|
||||
if stateCLI.accessNameFlag != currentUserAccess.Name && stateCLI.accessNameFlag != DEFAULT_ACCESS_NAME {
|
||||
if stateCLI.accessNameFlag != currentClientAccess.ClientResourceData.Name && stateCLI.accessNameFlag != DEFAULT_ACCESS_NAME {
|
||||
update["name"] = stateCLI.accessNameFlag
|
||||
}
|
||||
if stateCLI.accessDescriptionFlag != currentUserAccess.Description && stateCLI.accessDescriptionFlag != DEFAULT_ACCESS_DESCRIPTION {
|
||||
if stateCLI.accessDescriptionFlag != currentClientAccess.ClientResourceData.Description && stateCLI.accessDescriptionFlag != DEFAULT_ACCESS_DESCRIPTION {
|
||||
update["description"] = stateCLI.accessDescriptionFlag
|
||||
}
|
||||
if stateCLI.blockchainAccessFlag != currentUserAccess.BlockchainAccess && stateCLI.blockchainAccessFlag != DEFAULT_BLOCKCHAIN_ACCESS {
|
||||
if stateCLI.blockchainAccessFlag != currentClientAccess.ClientResourceData.BlockchainAccess && stateCLI.blockchainAccessFlag != DEFAULT_BLOCKCHAIN_ACCESS {
|
||||
update["blockchain_access"] = stateCLI.blockchainAccessFlag
|
||||
}
|
||||
if stateCLI.extendedMethodsFlag != currentUserAccess.ExtendedMethods && stateCLI.extendedMethodsFlag != DEFAULT_EXTENDED_METHODS {
|
||||
if stateCLI.extendedMethodsFlag != currentClientAccess.ClientResourceData.ExtendedMethods && stateCLI.extendedMethodsFlag != DEFAULT_EXTENDED_METHODS {
|
||||
update["extended_methods"] = stateCLI.extendedMethodsFlag
|
||||
}
|
||||
if stateCLI.PeriodDurationFlag != currentUserAccess.PeriodDuration && stateCLI.PeriodDurationFlag != DEFAULT_PERIOD_DURATION {
|
||||
if stateCLI.PeriodDurationFlag != currentClientAccess.ClientResourceData.PeriodDuration && stateCLI.PeriodDurationFlag != DEFAULT_PERIOD_DURATION {
|
||||
update["period_duration"] = stateCLI.PeriodDurationFlag
|
||||
}
|
||||
if stateCLI.MaxCallsPerPeriodFlag != currentUserAccess.MaxCallsPerPeriod && stateCLI.MaxCallsPerPeriodFlag != DEFAULT_MAX_CALLS_PER_PERIOD {
|
||||
if stateCLI.MaxCallsPerPeriodFlag != currentClientAccess.ClientResourceData.MaxCallsPerPeriod && stateCLI.MaxCallsPerPeriodFlag != DEFAULT_MAX_CALLS_PER_PERIOD {
|
||||
update["max_calls_per_period"] = stateCLI.MaxCallsPerPeriodFlag
|
||||
}
|
||||
if stateCLI.PeriodStartTsFlag != currentUserAccess.PeriodStartTs && stateCLI.PeriodStartTsFlag != 0 {
|
||||
if stateCLI.PeriodStartTsFlag != currentClientAccess.ClientResourceData.PeriodStartTs && stateCLI.PeriodStartTsFlag != 0 {
|
||||
update["period_start_ts"] = stateCLI.PeriodStartTsFlag
|
||||
}
|
||||
if stateCLI.CallsPerPeriodFlag != currentUserAccess.CallsPerPeriod && stateCLI.CallsPerPeriodFlag != 0 {
|
||||
if stateCLI.CallsPerPeriodFlag != currentClientAccess.ClientResourceData.CallsPerPeriod && stateCLI.CallsPerPeriodFlag != 0 {
|
||||
update["calls_per_period"] = stateCLI.CallsPerPeriodFlag
|
||||
}
|
||||
|
||||
|
@ -372,13 +373,13 @@ func cli() {
|
|||
fmt.Printf("Unable to update Bugout resource, err: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
|
||||
updatedResourceData, err := json.Marshal(updatedResource.ResourceData)
|
||||
if err != nil {
|
||||
fmt.Printf("Unable to encode resource %s data interface to json, err: %v\n", resource.Id, err)
|
||||
os.Exit(1)
|
||||
}
|
||||
var updatedUserAccess ClientResourceData
|
||||
var updatedUserAccess ClientAccess
|
||||
err = json.Unmarshal(updatedResourceData, &updatedUserAccess)
|
||||
if err != nil {
|
||||
fmt.Printf("Unable to decode resource %s data json to structure, err: %v\n", resource.Id, err)
|
||||
|
@ -413,7 +414,7 @@ func cli() {
|
|||
os.Exit(1)
|
||||
}
|
||||
|
||||
var userAccesses []ClientResourceData
|
||||
var userAccesses []ClientAccess
|
||||
for _, resource := range resources.Resources {
|
||||
deletedResource, err := bugoutClient.Brood.DeleteResource(NB_CONTROLLER_TOKEN, resource.Id)
|
||||
if err != nil {
|
||||
|
@ -425,7 +426,7 @@ func cli() {
|
|||
fmt.Printf("Unable to encode resource %s data interface to json, err: %v\n", resource.Id, err)
|
||||
continue
|
||||
}
|
||||
var deletedUserAccess ClientResourceData
|
||||
var deletedUserAccess ClientAccess
|
||||
err = json.Unmarshal(deletedResourceData, &deletedUserAccess)
|
||||
if err != nil {
|
||||
fmt.Printf("Unable to decode resource %s data json to structure, err: %v\n", resource.Id, err)
|
||||
|
@ -471,7 +472,7 @@ func cli() {
|
|||
os.Exit(1)
|
||||
}
|
||||
|
||||
var userAccesses []ClientResourceData
|
||||
var clientAccesses []ClientAccess
|
||||
|
||||
offset := stateCLI.offsetFlag
|
||||
if stateCLI.offsetFlag > len(resources.Resources) {
|
||||
|
@ -483,20 +484,21 @@ func cli() {
|
|||
}
|
||||
|
||||
for _, resource := range resources.Resources[offset:limit] {
|
||||
resource_data, err := json.Marshal(resource.ResourceData)
|
||||
resourceData, err := json.Marshal(resource.ResourceData)
|
||||
if err != nil {
|
||||
fmt.Printf("Unable to encode resource %s data interface to json, err: %v\n", resource.Id, err)
|
||||
continue
|
||||
}
|
||||
var userAccess ClientResourceData
|
||||
err = json.Unmarshal(resource_data, &userAccess)
|
||||
var clientAccess ClientAccess
|
||||
clientAccess.ResourceID = resource.Id
|
||||
err = json.Unmarshal(resourceData, &clientAccess.ClientResourceData)
|
||||
if err != nil {
|
||||
fmt.Printf("Unable to decode resource %s data json to structure, err: %v\n", resource.Id, err)
|
||||
continue
|
||||
}
|
||||
userAccesses = append(userAccesses, userAccess)
|
||||
clientAccesses = append(clientAccesses, clientAccess)
|
||||
}
|
||||
userAccessesJson, err := json.Marshal(userAccesses)
|
||||
userAccessesJson, err := json.Marshal(clientAccesses)
|
||||
if err != nil {
|
||||
fmt.Printf("Unable to marshal user accesses struct, err: %v\n", err)
|
||||
os.Exit(1)
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"log"
|
||||
"reflect"
|
||||
"sync"
|
||||
"time"
|
||||
|
@ -11,9 +12,19 @@ var (
|
|||
)
|
||||
|
||||
// Structure to define user access according with Brood resources
|
||||
type ClientAccess struct {
|
||||
ResourceID string
|
||||
|
||||
ClientResourceData ClientResourceData
|
||||
|
||||
LastAccessTs int64
|
||||
LastSessionAccessTs int64 // When last session with nodebalancer where started
|
||||
LastSessionCallsCounter int64
|
||||
|
||||
requestedDataSource string
|
||||
}
|
||||
|
||||
type ClientResourceData struct {
|
||||
ResourceID string `json:"resource_id"`
|
||||
|
||||
UserID string `json:"user_id"`
|
||||
AccessID string `json:"access_id"`
|
||||
Name string `json:"name"`
|
||||
|
@ -25,12 +36,50 @@ type ClientResourceData struct {
|
|||
PeriodStartTs int64 `json:"period_start_ts"`
|
||||
MaxCallsPerPeriod int64 `json:"max_calls_per_period"`
|
||||
CallsPerPeriod int64 `json:"calls_per_period"`
|
||||
}
|
||||
|
||||
LastAccessTs int64 `json:"last_access_ts"`
|
||||
LastSessionAccessTs int64 `json:"last_session_access_ts"` // When last session with nodebalancer where started
|
||||
LastSessionCallsCounter int64 `json:"last_session_calls_counter"`
|
||||
// CheckClientCallPeriodLimits returns true if limit of call requests per period is exceeded
|
||||
// If client passed this check, we will add this client to cache and let him operates until cache will be
|
||||
// cleaned with go-routine and resource will be updated
|
||||
func (ca *ClientAccess) CheckClientCallPeriodLimits(tsNow int64) bool {
|
||||
isClientAllowedToGetAccess := false
|
||||
if tsNow-ca.ClientResourceData.PeriodStartTs < ca.ClientResourceData.PeriodDuration {
|
||||
// Client operates in period
|
||||
if ca.ClientResourceData.CallsPerPeriod < ca.ClientResourceData.MaxCallsPerPeriod {
|
||||
// Client's limit of calls not reached
|
||||
isClientAllowedToGetAccess = true
|
||||
}
|
||||
} else {
|
||||
// Client period should be refreshed
|
||||
if stateCLI.enableDebugFlag {
|
||||
log.Printf("Refresh client's period_start_ts with time.now() and reset calls_per_period")
|
||||
}
|
||||
ca.ClientResourceData.CallsPerPeriod = 0
|
||||
ca.ClientResourceData.PeriodStartTs = tsNow
|
||||
isClientAllowedToGetAccess = true
|
||||
}
|
||||
return isClientAllowedToGetAccess
|
||||
}
|
||||
|
||||
dataSource string
|
||||
// UpdateClientResourceCallCounter updates Brood resource where increase calls counter to node
|
||||
// with current number of calls during last session.
|
||||
func (ca *ClientAccess) UpdateClientResourceCallCounter(tsNow int64) error {
|
||||
update := make(map[string]interface{})
|
||||
update["period_start_ts"] = ca.ClientResourceData.PeriodStartTs
|
||||
update["calls_per_period"] = ca.ClientResourceData.CallsPerPeriod + ca.LastSessionCallsCounter
|
||||
|
||||
updatedResource, err := bugoutClient.Brood.UpdateResource(
|
||||
NB_CONTROLLER_TOKEN,
|
||||
ca.ResourceID,
|
||||
update,
|
||||
[]string{},
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Printf("Resource %s updated\n", updatedResource.Id)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Node - which one node client worked with
|
||||
|
|
|
@ -30,9 +30,9 @@ var (
|
|||
NB_HEALTH_CHECK_INTERVAL = time.Millisecond * 5000
|
||||
NB_HEALTH_CHECK_CALL_TIMEOUT = time.Second * 2
|
||||
|
||||
NB_CACHE_CLEANING_INTERVAL = time.Second * 10
|
||||
NB_CACHE_ACCESS_ID_LIFETIME = int64(120)
|
||||
NB_CACHE_ACCESS_ID_SESSION_LIFETIME = int64(600)
|
||||
NB_CACHE_CLEANING_INTERVAL = time.Second * 10
|
||||
NB_CACHE_ACCESS_ID_LIFETIME = int64(120) // TODO(kompotkot): Set to 2 mins
|
||||
NB_CACHE_ACCESS_ID_SESSION_LIFETIME = int64(600) // TODO(kompotkot): Set to 10 mins
|
||||
|
||||
NB_MAX_COUNTER_NUMBER = uint64(10000000)
|
||||
|
||||
|
|
|
@ -24,7 +24,7 @@ var (
|
|||
)
|
||||
|
||||
type AccessCache struct {
|
||||
accessIds map[string]ClientResourceData
|
||||
accessIds map[string]ClientAccess
|
||||
|
||||
mux sync.RWMutex
|
||||
}
|
||||
|
@ -32,7 +32,7 @@ type AccessCache struct {
|
|||
// CreateAccessCache generates empty cache of client access
|
||||
func CreateAccessCache() {
|
||||
accessIdCache = AccessCache{
|
||||
accessIds: make(map[string]ClientResourceData),
|
||||
accessIds: make(map[string]ClientAccess),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -53,34 +53,43 @@ func (ac *AccessCache) FindAccessIdInCache(accessId string) string {
|
|||
}
|
||||
|
||||
// Update last call access timestamp and datasource for access id
|
||||
func (ac *AccessCache) UpdateAccessIdAtCache(accessId, dataSource string) {
|
||||
func (ac *AccessCache) UpdateAccessIdAtCache(accessId, requestedDataSource string, tsNow int64) {
|
||||
ac.mux.Lock()
|
||||
if accessData, ok := ac.accessIds[accessId]; ok {
|
||||
accessData.LastAccessTs = time.Now().Unix()
|
||||
accessData.dataSource = dataSource
|
||||
accessData.LastAccessTs = tsNow
|
||||
accessData.requestedDataSource = requestedDataSource
|
||||
accessData.LastSessionCallsCounter++
|
||||
|
||||
ac.accessIds[accessId] = accessData
|
||||
}
|
||||
ac.mux.Unlock()
|
||||
}
|
||||
|
||||
// Add new access id with data to cache
|
||||
func (ac *AccessCache) AddAccessIdToCache(clientResourceData ClientResourceData, dataSource string) {
|
||||
tsNow := time.Now().Unix()
|
||||
|
||||
// Add new access ID with data to cache
|
||||
func (ac *AccessCache) AddAccessIdToCache(clientAccess ClientAccess, tsNow int64) {
|
||||
ac.mux.Lock()
|
||||
ac.accessIds[clientResourceData.AccessID] = ClientResourceData{
|
||||
UserID: clientResourceData.UserID,
|
||||
AccessID: clientResourceData.AccessID,
|
||||
Name: clientResourceData.Name,
|
||||
Description: clientResourceData.Description,
|
||||
BlockchainAccess: clientResourceData.BlockchainAccess,
|
||||
ExtendedMethods: clientResourceData.ExtendedMethods,
|
||||
ac.accessIds[clientAccess.ClientResourceData.AccessID] = ClientAccess{
|
||||
ResourceID: clientAccess.ResourceID,
|
||||
|
||||
LastAccessTs: tsNow,
|
||||
LastSessionAccessTs: tsNow,
|
||||
ClientResourceData: ClientResourceData{
|
||||
UserID: clientAccess.ClientResourceData.UserID,
|
||||
AccessID: clientAccess.ClientResourceData.AccessID,
|
||||
Name: clientAccess.ClientResourceData.Name,
|
||||
Description: clientAccess.ClientResourceData.Description,
|
||||
BlockchainAccess: clientAccess.ClientResourceData.BlockchainAccess,
|
||||
ExtendedMethods: clientAccess.ClientResourceData.ExtendedMethods,
|
||||
|
||||
dataSource: dataSource,
|
||||
PeriodDuration: clientAccess.ClientResourceData.PeriodDuration,
|
||||
PeriodStartTs: clientAccess.ClientResourceData.PeriodStartTs,
|
||||
MaxCallsPerPeriod: clientAccess.ClientResourceData.MaxCallsPerPeriod,
|
||||
CallsPerPeriod: clientAccess.ClientResourceData.CallsPerPeriod,
|
||||
},
|
||||
|
||||
LastAccessTs: tsNow,
|
||||
LastSessionAccessTs: tsNow,
|
||||
LastSessionCallsCounter: 1,
|
||||
|
||||
requestedDataSource: clientAccess.requestedDataSource,
|
||||
}
|
||||
ac.mux.Unlock()
|
||||
}
|
||||
|
@ -89,19 +98,31 @@ func (ac *AccessCache) AddAccessIdToCache(clientResourceData ClientResourceData,
|
|||
func (ac *AccessCache) Cleanup() (int64, int64) {
|
||||
var removedAccessIds, totalAccessIds int64
|
||||
tsNow := time.Now().Unix()
|
||||
|
||||
ac.mux.Lock()
|
||||
for aId, aData := range ac.accessIds {
|
||||
if tsNow-aData.LastAccessTs > NB_CACHE_ACCESS_ID_LIFETIME {
|
||||
for aId, clientAccess := range ac.accessIds {
|
||||
if tsNow-clientAccess.LastAccessTs > NB_CACHE_ACCESS_ID_LIFETIME {
|
||||
// Remove clients who is not active for NB_CACHE_ACCESS_ID_LIFETIME lifetime period
|
||||
delete(ac.accessIds, aId)
|
||||
removedAccessIds++
|
||||
} else if tsNow-aData.LastSessionAccessTs > NB_CACHE_ACCESS_ID_SESSION_LIFETIME {
|
||||
err := clientAccess.UpdateClientResourceCallCounter(tsNow)
|
||||
if err != nil {
|
||||
log.Printf("Unable to update Brood resource, err: %v\n", err)
|
||||
}
|
||||
} else if tsNow-clientAccess.LastSessionAccessTs > NB_CACHE_ACCESS_ID_SESSION_LIFETIME {
|
||||
// Remove clients with too long sessions, greater then NB_CACHE_ACCESS_ID_SESSION_LIFETIME
|
||||
delete(ac.accessIds, aId)
|
||||
removedAccessIds++
|
||||
err := clientAccess.UpdateClientResourceCallCounter(tsNow)
|
||||
if err != nil {
|
||||
log.Printf("Unable to update Brood resource, err: %v\n", err)
|
||||
}
|
||||
} else {
|
||||
totalAccessIds++
|
||||
}
|
||||
}
|
||||
ac.mux.Unlock()
|
||||
|
||||
return removedAccessIds, totalAccessIds
|
||||
}
|
||||
|
||||
|
@ -112,9 +133,9 @@ func initCacheCleaning(debug bool) {
|
|||
case <-t.C:
|
||||
removedAccessIds, totalAccessIds := accessIdCache.Cleanup()
|
||||
if debug {
|
||||
log.Printf("Removed %d elements from access id cache", removedAccessIds)
|
||||
log.Printf("Removed %d clients from access cache", removedAccessIds)
|
||||
}
|
||||
log.Printf("Elements in access id cache: %d", totalAccessIds)
|
||||
log.Printf("Clients in access cache: %d", totalAccessIds)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -139,22 +160,22 @@ func extractAccessID(r *http.Request) string {
|
|||
}
|
||||
|
||||
// Extract data_source from header and query. Query takes precedence over header.
|
||||
func extractDataSource(r *http.Request) string {
|
||||
dataSource := "database"
|
||||
func extractRequestedDataSource(r *http.Request) string {
|
||||
requestedDataSource := "database"
|
||||
|
||||
dataSources := r.Header[strings.Title(NB_DATA_SOURCE_HEADER)]
|
||||
for _, h := range dataSources {
|
||||
dataSource = h
|
||||
requestedDataSources := r.Header[strings.Title(NB_DATA_SOURCE_HEADER)]
|
||||
for _, h := range requestedDataSources {
|
||||
requestedDataSource = h
|
||||
}
|
||||
|
||||
queries := r.URL.Query()
|
||||
for k, v := range queries {
|
||||
if k == "data_source" {
|
||||
dataSource = v[0]
|
||||
requestedDataSource = v[0]
|
||||
}
|
||||
}
|
||||
|
||||
return dataSource
|
||||
return requestedDataSource
|
||||
}
|
||||
|
||||
// Handle panic errors to prevent server shutdown
|
||||
|
@ -252,7 +273,7 @@ func logMiddleware(next http.Handler) http.Handler {
|
|||
}
|
||||
accessID := extractAccessID(r)
|
||||
if accessID != "" {
|
||||
dataSource := extractDataSource(r)
|
||||
dataSource := extractRequestedDataSource(r)
|
||||
logStr += fmt.Sprintf(" %s %s", dataSource, accessID)
|
||||
}
|
||||
}
|
||||
|
@ -263,30 +284,38 @@ func logMiddleware(next http.Handler) http.Handler {
|
|||
// Check access id was provided correctly and save user access configuration to request context
|
||||
func accessMiddleware(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
var currentClientAccess ClientResourceData
|
||||
var currentClientAccess ClientAccess
|
||||
|
||||
accessID := extractAccessID(r)
|
||||
dataSource := extractDataSource(r)
|
||||
requestedDataSource := extractRequestedDataSource(r)
|
||||
|
||||
if accessID == "" {
|
||||
http.Error(w, "No access id passed with request", http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
|
||||
tsNow := time.Now().Unix()
|
||||
|
||||
// If access id does not belong to internal crawlers, then check cache or find it in Bugout resources
|
||||
if accessID == NB_CONTROLLER_ACCESS_ID {
|
||||
if stateCLI.enableDebugFlag {
|
||||
log.Printf("Access id belongs to internal crawlers")
|
||||
}
|
||||
currentClientAccess = internalCrawlersAccess
|
||||
currentClientAccess.dataSource = dataSource
|
||||
} else if accessIdCache.FindAccessIdInCache(accessID) != "" {
|
||||
if stateCLI.enableDebugFlag {
|
||||
log.Printf("Access id found in cache")
|
||||
log.Printf("Access ID belongs to internal usage for user with ID %s", currentClientAccess.ClientResourceData.UserID)
|
||||
}
|
||||
currentClientAccess.requestedDataSource = requestedDataSource
|
||||
} else if accessIdCache.FindAccessIdInCache(accessID) != "" {
|
||||
currentClientAccess = accessIdCache.accessIds[accessID]
|
||||
currentClientAccess.dataSource = dataSource
|
||||
accessIdCache.UpdateAccessIdAtCache(accessID, dataSource)
|
||||
if stateCLI.enableDebugFlag {
|
||||
log.Printf("Access ID found in cache for user with ID %s", currentClientAccess.ClientResourceData.UserID)
|
||||
}
|
||||
// Check if limit of calls not exceeded
|
||||
isClientAllowedToGetAccess := currentClientAccess.CheckClientCallPeriodLimits(tsNow)
|
||||
if !isClientAllowedToGetAccess {
|
||||
http.Error(w, "User exceeded limit of calls per period", http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
currentClientAccess.requestedDataSource = requestedDataSource
|
||||
accessIdCache.UpdateAccessIdAtCache(accessID, requestedDataSource, tsNow)
|
||||
} else {
|
||||
if stateCLI.enableDebugFlag {
|
||||
log.Printf("New access id, looking at Brood resources")
|
||||
|
@ -300,33 +329,35 @@ func accessMiddleware(next http.Handler) http.Handler {
|
|||
http.Error(w, "Unable to get user with provided access identifier", http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
if len(resources.Resources) == 0 {
|
||||
resourcesLen := len(resources.Resources)
|
||||
if resourcesLen == 0 {
|
||||
http.Error(w, "User with provided access identifier not found", http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
resource_data, err := json.Marshal(resources.Resources[0].ResourceData)
|
||||
if resourcesLen > 1 {
|
||||
http.Error(w, "User with provided access identifier has several access IDs", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
resourceData, err := json.Marshal(resources.Resources[0].ResourceData)
|
||||
if err != nil {
|
||||
http.Error(w, "Unable to encode resource data interface to json", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
var clientResourceData ClientResourceData
|
||||
err = json.Unmarshal(resource_data, &clientResourceData)
|
||||
currentClientAccess.ResourceID = resources.Resources[0].Id
|
||||
currentClientAccess.requestedDataSource = requestedDataSource
|
||||
err = json.Unmarshal(resourceData, ¤tClientAccess.ClientResourceData)
|
||||
if err != nil {
|
||||
http.Error(w, "Unable to decode resource data json to structure", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
currentClientAccess = ClientResourceData{
|
||||
UserID: clientResourceData.UserID,
|
||||
AccessID: clientResourceData.AccessID,
|
||||
Name: clientResourceData.Name,
|
||||
Description: clientResourceData.Description,
|
||||
BlockchainAccess: clientResourceData.BlockchainAccess,
|
||||
ExtendedMethods: clientResourceData.ExtendedMethods,
|
||||
|
||||
dataSource: dataSource,
|
||||
// Check if limit of calls not exceeded
|
||||
isClientAllowedToGetAccess := currentClientAccess.CheckClientCallPeriodLimits(tsNow)
|
||||
if !isClientAllowedToGetAccess {
|
||||
http.Error(w, "User exceeded limit of calls per period", http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
|
||||
accessIdCache.AddAccessIdToCache(clientResourceData, dataSource)
|
||||
accessIdCache.AddAccessIdToCache(currentClientAccess, tsNow)
|
||||
}
|
||||
|
||||
ctxUser := context.WithValue(r.Context(), "currentClientAccess", currentClientAccess)
|
||||
|
|
|
@ -27,7 +27,7 @@ func pingRoute(w http.ResponseWriter, r *http.Request) {
|
|||
// lbHandler load balances the incoming requests to nodes
|
||||
func lbHandler(w http.ResponseWriter, r *http.Request) {
|
||||
currentClientAccessRaw := r.Context().Value("currentClientAccess")
|
||||
currentClientAccess, ok := currentClientAccessRaw.(ClientResourceData)
|
||||
currentClientAccess, ok := currentClientAccessRaw.(ClientAccess)
|
||||
if !ok {
|
||||
http.Error(w, "Internal server error", http.StatusInternalServerError)
|
||||
return
|
||||
|
@ -55,14 +55,14 @@ func lbHandler(w http.ResponseWriter, r *http.Request) {
|
|||
// Chose one node
|
||||
var node *Node
|
||||
cpool := GetClientPool(blockchain)
|
||||
node = cpool.GetClientNode(currentClientAccess.AccessID)
|
||||
node = cpool.GetClientNode(currentClientAccess.ClientResourceData.AccessID)
|
||||
if node == nil {
|
||||
node = blockchainPool.GetNextNode(blockchain)
|
||||
if node == nil {
|
||||
http.Error(w, "There are no nodes available", http.StatusServiceUnavailable)
|
||||
return
|
||||
}
|
||||
cpool.AddClientNode(currentClientAccess.AccessID, node)
|
||||
cpool.AddClientNode(currentClientAccess.ClientResourceData.AccessID, node)
|
||||
}
|
||||
|
||||
// Save origin path, to use in proxyErrorHandler if node will not response
|
||||
|
@ -78,7 +78,7 @@ func lbHandler(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
}
|
||||
|
||||
func lbJSONRPCHandler(w http.ResponseWriter, r *http.Request, blockchain string, node *Node, currentClientAccess ClientResourceData) {
|
||||
func lbJSONRPCHandler(w http.ResponseWriter, r *http.Request, blockchain string, node *Node, currentClientAccess ClientAccess) {
|
||||
body, err := ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
http.Error(w, "Unable to read body", http.StatusBadRequest)
|
||||
|
@ -94,12 +94,12 @@ func lbJSONRPCHandler(w http.ResponseWriter, r *http.Request, blockchain string,
|
|||
}
|
||||
|
||||
switch {
|
||||
case currentClientAccess.dataSource == "blockchain":
|
||||
if !currentClientAccess.BlockchainAccess {
|
||||
case currentClientAccess.requestedDataSource == "blockchain":
|
||||
if !currentClientAccess.ClientResourceData.BlockchainAccess {
|
||||
http.Error(w, "Access to blockchain node not allowed with provided access id", http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
if !currentClientAccess.ExtendedMethods {
|
||||
if !currentClientAccess.ClientResourceData.ExtendedMethods {
|
||||
for _, jsonrpcRequest := range jsonrpcRequests {
|
||||
_, exists := ALLOWED_METHODS[jsonrpcRequest.Method]
|
||||
if !exists {
|
||||
|
@ -115,11 +115,11 @@ func lbJSONRPCHandler(w http.ResponseWriter, r *http.Request, blockchain string,
|
|||
r.URL.Path = "/"
|
||||
node.GethReverseProxy.ServeHTTP(w, r)
|
||||
return
|
||||
case currentClientAccess.dataSource == "database":
|
||||
case currentClientAccess.requestedDataSource == "database":
|
||||
http.Error(w, "Database access under development", http.StatusInternalServerError)
|
||||
return
|
||||
default:
|
||||
http.Error(w, fmt.Sprintf("Unacceptable data source %s", currentClientAccess.dataSource), http.StatusBadRequest)
|
||||
http.Error(w, fmt.Sprintf("Unacceptable data source %s", currentClientAccess.requestedDataSource), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,7 +20,7 @@ import (
|
|||
)
|
||||
|
||||
var (
|
||||
internalCrawlersAccess ClientResourceData
|
||||
internalCrawlersAccess ClientAccess
|
||||
|
||||
// Crash reporter
|
||||
reporter *humbug.HumbugReporter
|
||||
|
@ -137,23 +137,25 @@ func Server() {
|
|||
fmt.Printf("Unable to encode resource data interface to json, err: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
var clientAccess ClientResourceData
|
||||
err = json.Unmarshal(resource_data, &clientAccess)
|
||||
var clientResourceData ClientResourceData
|
||||
err = json.Unmarshal(resource_data, &clientResourceData)
|
||||
if err != nil {
|
||||
fmt.Printf("Unable to decode resource data json to structure, err: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
internalCrawlersAccess = ClientResourceData{
|
||||
UserID: clientAccess.UserID,
|
||||
AccessID: clientAccess.AccessID,
|
||||
Name: clientAccess.Name,
|
||||
Description: clientAccess.Description,
|
||||
BlockchainAccess: clientAccess.BlockchainAccess,
|
||||
ExtendedMethods: clientAccess.ExtendedMethods,
|
||||
internalCrawlersAccess = ClientAccess{
|
||||
ClientResourceData: ClientResourceData{
|
||||
UserID: clientResourceData.UserID,
|
||||
AccessID: clientResourceData.AccessID,
|
||||
Name: clientResourceData.Name,
|
||||
Description: clientResourceData.Description,
|
||||
BlockchainAccess: clientResourceData.BlockchainAccess,
|
||||
ExtendedMethods: clientResourceData.ExtendedMethods,
|
||||
},
|
||||
}
|
||||
log.Printf(
|
||||
"Internal crawlers access set, resource id: %s, blockchain access: %t, extended methods: %t",
|
||||
resources.Resources[0].Id, clientAccess.BlockchainAccess, clientAccess.ExtendedMethods,
|
||||
resources.Resources[0].Id, clientResourceData.BlockchainAccess, clientResourceData.ExtendedMethods,
|
||||
)
|
||||
|
||||
// Fill NodeConfigList with initial nodes from environment variables
|
||||
|
|
Ładowanie…
Reference in New Issue