kopia lustrzana https://github.com/bugout-dev/moonstream
Work with multiple configuration files
rodzic
d438801b98
commit
d5fff67abd
|
@ -11,22 +11,20 @@ probes engine clean-call-requests --db-uri "${ENGINE_DB_URI}"
|
|||
Run service with configuration:
|
||||
|
||||
```bash
|
||||
probes service --config "~/.probes/config.json"
|
||||
probes service \
|
||||
--config /home/ubuntu/.probes/engine-clean-call-requests.js
|
||||
```
|
||||
|
||||
Config example:
|
||||
|
||||
```json
|
||||
[
|
||||
{
|
||||
"name": "engine",
|
||||
{
|
||||
"application": "engine",
|
||||
"db_uri": "ENGINE_DB_URI",
|
||||
"workers": [
|
||||
{
|
||||
"db_timeout": "15s",
|
||||
"probe": {
|
||||
"name": "clean-call-requests",
|
||||
"interval": 10
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
```
|
||||
|
|
|
@ -136,18 +136,18 @@ func CreateEngineCommand() *cobra.Command {
|
|||
}
|
||||
|
||||
func CreateServiceCommand() *cobra.Command {
|
||||
var configPath string
|
||||
var configPaths []string
|
||||
|
||||
serviceCmd := &cobra.Command{
|
||||
Use: "service",
|
||||
Short: "Run workers as background asynchronous services",
|
||||
Long: `Each active worker specified in configuration will run in go-routine.`,
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
return RunService(configPath)
|
||||
return RunService(configPaths)
|
||||
},
|
||||
}
|
||||
|
||||
serviceCmd.PersistentFlags().StringVarP(&configPath, "config", "c", "", "Config path, default: ~/.workers")
|
||||
serviceCmd.PersistentFlags().StringSliceVarP(&configPaths, "config", "c", []string{}, "Config paths")
|
||||
|
||||
return serviceCmd
|
||||
}
|
||||
|
|
|
@ -6,157 +6,75 @@ import (
|
|||
"io/ioutil"
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
probes "github.com/moonstream-to/api/probes/pkg"
|
||||
engine "github.com/moonstream-to/api/probes/pkg/engine"
|
||||
)
|
||||
|
||||
var (
|
||||
DEFAULT_CONFIG_DIR_NAME = ".probes"
|
||||
DEFAULT_CONFIG_FILE_NAME = "config.json"
|
||||
)
|
||||
// Application Probe configuration
|
||||
type ApplicationProbeConfig struct {
|
||||
Application string `json:"application"`
|
||||
DbUri string `json:"db_uri"`
|
||||
DbTimeout string `json:"db_timeout"`
|
||||
|
||||
// Workers configuration
|
||||
type ServiceWorkersConfig struct {
|
||||
Name string `json:"name"`
|
||||
DbUri string `json:"db_uri"`
|
||||
DbTimeout string `json:"db_timeout"`
|
||||
Workers []probes.ServiceWorker `json:"workers"`
|
||||
Probe probes.ApplicationProbe `json:"probe"`
|
||||
}
|
||||
|
||||
func ReadConfig(configPath string) (*[]ServiceWorkersConfig, int, error) {
|
||||
totalWorkersNum := 0
|
||||
|
||||
rawBytes, err := ioutil.ReadFile(configPath)
|
||||
if err != nil {
|
||||
return nil, totalWorkersNum, err
|
||||
}
|
||||
serviceWorkersConfigTemp := &[]ServiceWorkersConfig{}
|
||||
err = json.Unmarshal(rawBytes, serviceWorkersConfigTemp)
|
||||
if err != nil {
|
||||
return nil, totalWorkersNum, err
|
||||
}
|
||||
|
||||
var serviceWorkersConfig []ServiceWorkersConfig
|
||||
for _, service := range *serviceWorkersConfigTemp {
|
||||
serviceDbUri := os.Getenv(service.DbUri)
|
||||
if serviceDbUri == "" {
|
||||
return nil, totalWorkersNum, fmt.Errorf("unable to load database URI for service %s", service.Name)
|
||||
// ReadConfig parses list of configuration file paths to list of Application Probes configs
|
||||
func ReadConfig(rawConfigPaths []string) (*[]ApplicationProbeConfig, error) {
|
||||
var configs []ApplicationProbeConfig
|
||||
for _, rawConfigPath := range rawConfigPaths {
|
||||
configPath := strings.TrimSuffix(rawConfigPath, "/")
|
||||
_, err := os.Stat(configPath)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
log.Printf("File %s not found, err: %v", configPath, err)
|
||||
continue
|
||||
}
|
||||
log.Printf("Error due checking config path %s, err: %v", configPath, err)
|
||||
continue
|
||||
}
|
||||
|
||||
var serviceWorkers []probes.ServiceWorker
|
||||
rawBytes, err := ioutil.ReadFile(configPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
configTemp := &ApplicationProbeConfig{}
|
||||
err = json.Unmarshal(rawBytes, configTemp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
dbUri := os.Getenv(configTemp.DbUri)
|
||||
if dbUri == "" {
|
||||
return nil, fmt.Errorf(
|
||||
"unable to load database URI for service %s with probe %s", configTemp.Application, configTemp.Probe.Name,
|
||||
)
|
||||
}
|
||||
|
||||
// Link worker function
|
||||
for w, worker := range service.Workers {
|
||||
switch service.Name {
|
||||
case "engine":
|
||||
engineWorker := engine.ENGINE_SUPPORTED_WORKERS[fmt.Sprintf("%s-%s", service.Name, worker.Name)]
|
||||
if engineWorker.ExecFunction == nil {
|
||||
service.Workers = append(service.Workers[:w], service.Workers[w+1:]...)
|
||||
log.Printf("Function for worker %s at service %s not found, removed from the list", worker.Name, service.Name)
|
||||
continue
|
||||
}
|
||||
serviceWorkers = append(serviceWorkers, probes.ServiceWorker{
|
||||
Name: worker.Name,
|
||||
Interval: worker.Interval,
|
||||
ExecFunction: engineWorker.ExecFunction,
|
||||
})
|
||||
log.Printf("[%s] [%s] - Registered function", service.Name, worker.Name)
|
||||
totalWorkersNum++
|
||||
default:
|
||||
service.Workers = append(service.Workers[:w], service.Workers[w+1:]...)
|
||||
log.Printf("Unsupported %s service with %s worker from the list", worker.Name, service.Name)
|
||||
switch configTemp.Application {
|
||||
case "engine":
|
||||
engineWorker := engine.ENGINE_SUPPORTED_WORKERS[fmt.Sprintf("%s-%s", configTemp.Application, configTemp.Probe.Name)]
|
||||
if engineWorker.ExecFunction == nil {
|
||||
log.Printf("Function for application %s with probe %s not found, removed from the list", configTemp.Application, configTemp.Probe.Name)
|
||||
continue
|
||||
}
|
||||
configs = append(configs, ApplicationProbeConfig{
|
||||
Application: configTemp.Application,
|
||||
DbUri: dbUri,
|
||||
DbTimeout: configTemp.DbTimeout,
|
||||
Probe: probes.ApplicationProbe{
|
||||
Interval: configTemp.Probe.Interval,
|
||||
ExecFunction: engineWorker.ExecFunction,
|
||||
},
|
||||
})
|
||||
log.Printf("[%s] [%s] - Registered function", configTemp.Application, configTemp.Probe.Name)
|
||||
default:
|
||||
log.Printf("Unsupported %s application with %s probe from the config", configTemp.Application, configTemp.Probe.Name)
|
||||
}
|
||||
serviceWorkersConfig = append(serviceWorkersConfig, ServiceWorkersConfig{
|
||||
Name: service.Name,
|
||||
DbUri: serviceDbUri,
|
||||
DbTimeout: service.DbTimeout,
|
||||
Workers: serviceWorkers,
|
||||
})
|
||||
}
|
||||
|
||||
return &serviceWorkersConfig, totalWorkersNum, nil
|
||||
}
|
||||
|
||||
type ConfigPlacement struct {
|
||||
ConfigDirPath string
|
||||
ConfigDirExists bool
|
||||
|
||||
ConfigPath string
|
||||
ConfigExists bool
|
||||
}
|
||||
|
||||
func CheckPathExists(path string) (bool, error) {
|
||||
var exists = true
|
||||
_, err := os.Stat(path)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
exists = false
|
||||
} else {
|
||||
return exists, fmt.Errorf("error due checking file path exists, err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
return exists, nil
|
||||
}
|
||||
|
||||
func GetConfigPlacement(providedPath string) (*ConfigPlacement, error) {
|
||||
var configDirPath, configPath string
|
||||
if providedPath == "" {
|
||||
homeDir, err := os.UserHomeDir()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to find user home directory, %v", err)
|
||||
}
|
||||
configDirPath = fmt.Sprintf("%s/%s", homeDir, DEFAULT_CONFIG_DIR_NAME)
|
||||
configPath = fmt.Sprintf("%s/%s", configDirPath, DEFAULT_CONFIG_FILE_NAME)
|
||||
} else {
|
||||
configPath = strings.TrimSuffix(providedPath, "/")
|
||||
configDirPath = filepath.Dir(configPath)
|
||||
}
|
||||
|
||||
configDirPathExists, err := CheckPathExists(configDirPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
configPathExists, err := CheckPathExists(configPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
configPlacement := &ConfigPlacement{
|
||||
ConfigDirPath: configDirPath,
|
||||
ConfigDirExists: configDirPathExists,
|
||||
|
||||
ConfigPath: configPath,
|
||||
ConfigExists: configPathExists,
|
||||
}
|
||||
|
||||
return configPlacement, nil
|
||||
}
|
||||
|
||||
func GenerateDefaultConfig(config *ConfigPlacement) error {
|
||||
if !config.ConfigDirExists {
|
||||
if err := os.MkdirAll(config.ConfigDirPath, os.ModePerm); err != nil {
|
||||
return fmt.Errorf("unable to create directory, %v", err)
|
||||
}
|
||||
log.Printf("Config directory created at: %s", config.ConfigDirPath)
|
||||
}
|
||||
|
||||
if !config.ConfigExists {
|
||||
tempConfig := []ServiceWorkersConfig{}
|
||||
tempConfigJson, err := json.Marshal(tempConfig)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to marshal configuration data, err: %v", err)
|
||||
}
|
||||
err = ioutil.WriteFile(config.ConfigPath, tempConfigJson, os.ModePerm)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to write default config to file %s, err: %v", config.ConfigPath, err)
|
||||
}
|
||||
log.Printf("Created default configuration at %s", config.ConfigPath)
|
||||
}
|
||||
|
||||
return nil
|
||||
return &configs, nil
|
||||
}
|
||||
|
|
|
@ -6,64 +6,47 @@ import (
|
|||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
probes "github.com/moonstream-to/api/probes/pkg"
|
||||
)
|
||||
|
||||
func RunService(configPath string) error {
|
||||
func RunService(configPaths []string) error {
|
||||
// Load configuration
|
||||
configPlacement, err := GetConfigPlacement(configPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !configPlacement.ConfigExists {
|
||||
if err := GenerateDefaultConfig(configPlacement); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
log.Printf("Loaded configuration from %s", configPlacement.ConfigPath)
|
||||
}
|
||||
|
||||
serviceConfigs, totalWorkersNum, err := ReadConfig(configPlacement.ConfigPath)
|
||||
configs, err := ReadConfig(configPaths)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to read config, err: %v", err)
|
||||
}
|
||||
|
||||
log.Printf("Loaded configurations of %d services with %d workers", len(*serviceConfigs), totalWorkersNum)
|
||||
log.Printf("Loaded configurations of %d application probes", len(*configs))
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for _, service := range *serviceConfigs {
|
||||
for _, worker := range service.Workers {
|
||||
wg.Add(1)
|
||||
go RunWorker(&wg, worker, service.Name, service.DbUri, service.DbTimeout)
|
||||
}
|
||||
for _, config := range *configs {
|
||||
wg.Add(1)
|
||||
go RunWorker(&wg, config)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func RunWorker(wg *sync.WaitGroup, worker probes.ServiceWorker, serviceName, dbUri, dbTimeout string) error {
|
||||
func RunWorker(wg *sync.WaitGroup, config ApplicationProbeConfig) error {
|
||||
defer wg.Done()
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
dbPool, err := CreateDbPool(ctx, dbUri, dbTimeout)
|
||||
dbPool, err := CreateDbPool(ctx, config.DbUri, config.DbTimeout)
|
||||
if err != nil {
|
||||
log.Printf("[%s] [%s] - unable to establish connection with database, err: %v", serviceName, worker.Name, err)
|
||||
log.Printf("[%s] [%s] - unable to establish connection with database, err: %v", config.Application, config.Probe.Name, err)
|
||||
return err
|
||||
}
|
||||
|
||||
defer dbPool.Close()
|
||||
|
||||
t := time.NewTicker(time.Duration(worker.Interval) * time.Second)
|
||||
t := time.NewTicker(time.Duration(config.Probe.Interval) * time.Second)
|
||||
for {
|
||||
select {
|
||||
case <-t.C:
|
||||
err = worker.ExecFunction(ctx, dbPool)
|
||||
err = config.Probe.ExecFunction(ctx, dbPool)
|
||||
if err != nil {
|
||||
log.Printf("[%s] [%s] - an error occurred during execution, err: %v", serviceName, worker.Name, err)
|
||||
log.Printf("[%s] [%s] - an error occurred during execution, err: %v", config.Application, config.Probe.Name, err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,9 @@
|
|||
{
|
||||
"application": "engine",
|
||||
"db_uri": "ENGINE_DB_URI",
|
||||
"db_timeout": "15s",
|
||||
"probe": {
|
||||
"name": "clean-call-requests",
|
||||
"interval": 10
|
||||
}
|
||||
}
|
|
@ -18,11 +18,15 @@ APP_DIR="${APP_DIR:-/home/ubuntu/api}"
|
|||
AWS_DEFAULT_REGION="${AWS_DEFAULT_REGION:-us-east-1}"
|
||||
SCRIPT_DIR="$(realpath $(dirname $0))"
|
||||
SECRETS_DIR="${SECRETS_DIR:-/home/ubuntu/engineapi-secrets}"
|
||||
CONFIGS_DIR="${CONFIGS_DIR:-/home/ubuntu/.probes}"
|
||||
PARAMETERS_ENV_PATH="${SECRETS_DIR}/app.env"
|
||||
|
||||
# API server service file
|
||||
PROBES_SERVICE_FILE="probes.service"
|
||||
|
||||
# Config files
|
||||
ENGINE_CLEAN_CALL_REQUESTS="engine-clean-call-requests.json"
|
||||
|
||||
set -eu
|
||||
|
||||
echo
|
||||
|
@ -52,12 +56,12 @@ echo "AWS_LOCAL_IPV4=$(ec2metadata --local-ipv4)" >> "${PARAMETERS_ENV_PATH}"
|
|||
|
||||
echo
|
||||
echo
|
||||
echo -e "${PREFIX_INFO} Replacing existing Engine API server service definition with ${ENGINE_SERVICE_FILE}"
|
||||
chmod 644 "${SCRIPT_DIR}/${ENGINE_SERVICE_FILE}"
|
||||
cp "${SCRIPT_DIR}/${ENGINE_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${ENGINE_SERVICE_FILE}"
|
||||
XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload
|
||||
XDG_RUNTIME_DIR="/run/user/1000" systemctl --user restart --no-block "${ENGINE_SERVICE_FILE}"
|
||||
|
||||
echo -e "${PREFIX_INFO} Copy configs"
|
||||
if [ ! -d "${CONFIGS_DIR}" ]; then
|
||||
mkdir "${CONFIGS_DIR}"
|
||||
echo -e "${CONFIGS_DIR} Created new configs directory"
|
||||
fi
|
||||
cp "${SCRIPT_DIR}/configs/${ENGINE_CLEAN_CALL_REQUESTS}" "${CONFIGS_DIR}/${ENGINE_CLEAN_CALL_REQUESTS}"
|
||||
|
||||
echo
|
||||
echo
|
||||
|
|
|
@ -7,7 +7,8 @@ StartLimitBurst=3
|
|||
[Service]
|
||||
WorkingDirectory=/home/ubuntu/api/probes
|
||||
EnvironmentFile=/home/ubuntu/engineapi-secrets/app.env
|
||||
ExecStart=/home/ubuntu/api/probes/probes service
|
||||
ExecStart=/home/ubuntu/api/probes/probes service \
|
||||
--config /home/ubuntu/.probes/engine-clean-call-requests.json
|
||||
Restart=on-failure
|
||||
RestartSec=15s
|
||||
SyslogIdentifier=probes
|
||||
|
|
|
@ -6,7 +6,7 @@ import (
|
|||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
)
|
||||
|
||||
type ServiceWorker struct {
|
||||
type ApplicationProbe struct {
|
||||
Name string `json:"name"`
|
||||
Description string
|
||||
LonDescription string
|
||||
|
|
|
@ -11,7 +11,7 @@ import (
|
|||
probes "github.com/moonstream-to/api/probes/pkg"
|
||||
)
|
||||
|
||||
var ENGINE_SUPPORTED_WORKERS = map[string]probes.ServiceWorker{"engine-clean-call-requests": {
|
||||
var ENGINE_SUPPORTED_WORKERS = map[string]probes.ApplicationProbe{"engine-clean-call-requests": {
|
||||
Name: "clean-call-requests",
|
||||
Description: "Clean all inactive call requests from database",
|
||||
LonDescription: "Remove records in call_requests database table with ttl value greater then now.",
|
||||
|
|
Ładowanie…
Reference in New Issue