Merge pull request #813 from moonstream-to/workers-clean-call-requests

Workers with call_requests clean action
pull/835/head
Sergei Sumarokov 2023-07-05 05:16:30 -07:00 zatwierdzone przez GitHub
commit 12376013ae
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 4AEE18F83AFDEB23
18 zmienionych plików z 658 dodań i 1 usunięć

Wyświetl plik

@ -45,7 +45,7 @@ echo
echo -e "${PREFIX_INFO} Building executable load balancer for nodes script with Go"
EXEC_DIR=$(pwd)
cd "${APP_DIR}/nodebalancer"
HOME=/home/ubuntu /usr/local/go/bin/go build -o "${APP_DIR}/nodebalancer/nodebalancer" "${APP_DIR}/nodebalancer/cmd/nodebalancer/*.go"
HOME=/home/ubuntu /usr/local/go/bin/go build -o "${APP_DIR}/nodebalancer/nodebalancer" "${APP_DIR}/nodebalancer/cmd/nodebalancer/"
cd "${EXEC_DIR}"
echo

65
probes/.gitignore vendored 100644
Wyświetl plik

@ -0,0 +1,65 @@
# Created by https://www.toptal.com/developers/gitignore/api/visualstudiocode,go
# Edit at https://www.toptal.com/developers/gitignore?templates=visualstudiocode,go
### Go ###
# If you prefer the allow list template instead of the deny list, see community template:
# https://github.com/github/gitignore/blob/main/community/Golang/Go.AllowList.gitignore
#
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib
# Test binary, built with `go test -c`
*.test
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
# Dependency directories (remove the comment below to include it)
# vendor/
# Go workspace file
go.work
### Go Patch ###
/vendor/
/Godeps/
### VisualStudioCode ###
.vscode/*
!.vscode/settings.json
!.vscode/tasks.json
!.vscode/launch.json
!.vscode/extensions.json
!.vscode/*.code-snippets
# Local History for Visual Studio Code
.history/
# Built Visual Studio Code Extensions
*.vsix
### VisualStudioCode Patch ###
# Ignore all local history of files
.history
.ionide
# Support for Project snippet scope
.vscode/*.code-snippets
# Ignore code-workspaces
*.code-workspace
# End of https://www.toptal.com/developers/gitignore/api/visualstudiocode,go
# Custom
.secrets/*
dev.env
prod.env
test.env
.venv
probes_dev

30
probes/README.md 100644
Wyświetl plik

@ -0,0 +1,30 @@
# probes
Running multiple operations simultaneously under one application.
Execute one command:
```bash
probes engine clean-call-requests --db-uri "${ENGINE_DB_URI}"
```
Run service with configuration:
```bash
probes service \
--config /home/ubuntu/.probes/engine-clean-call-requests.js
```
Config example:
```json
{
"application": "engine",
"db_uri": "ENGINE_DB_URI",
"db_timeout": "15s",
"probe": {
"name": "clean-call-requests",
"interval": 10
}
}
```

Wyświetl plik

@ -0,0 +1,153 @@
package main
import (
"context"
"fmt"
"github.com/spf13/cobra"
probes "github.com/moonstream-to/api/probes/pkg"
engine "github.com/moonstream-to/api/probes/pkg/engine"
)
func CreateRootCommand() *cobra.Command {
// rootCmd represents the base command when called without any subcommands
rootCmd := &cobra.Command{
Use: "workers",
Short: "Autonomous workers for moonstream services",
Long: `workers is a CLI that allows you to run multiple operations according to Moonstream services.
workers currently supports services:
- Engine
`,
Run: func(cmd *cobra.Command, args []string) {},
}
versionCmd := CreateVersionCommand()
engineCmd := CreateEngineCommand()
serviceCmd := CreateServiceCommand()
rootCmd.AddCommand(versionCmd, engineCmd, serviceCmd)
completionCmd := CreateCompletionCommand(rootCmd)
rootCmd.AddCommand(completionCmd)
return rootCmd
}
func CreateCompletionCommand(rootCmd *cobra.Command) *cobra.Command {
completionCmd := &cobra.Command{
Use: "completion",
Short: "Generate shell completion scripts for workers",
Long: `Generate shell completion scripts for workers.
The command for each shell will print a completion script to stdout. You can source this script to get
completions in your current shell session. You can add this script to the completion directory for your
shell to get completions for all future sessions.
For example, to activate bash completions in your current shell:
$ . <(workers completion bash)
To add workers completions for all bash sessions:
$ workers completion bash > /etc/bash_completion.d/workers_completions`,
}
bashCompletionCmd := &cobra.Command{
Use: "bash",
Short: "bash completions for workers",
Run: func(cmd *cobra.Command, args []string) {
rootCmd.GenBashCompletion(cmd.OutOrStdout())
},
}
zshCompletionCmd := &cobra.Command{
Use: "zsh",
Short: "zsh completions for workers",
Run: func(cmd *cobra.Command, args []string) {
rootCmd.GenZshCompletion(cmd.OutOrStdout())
},
}
fishCompletionCmd := &cobra.Command{
Use: "fish",
Short: "fish completions for workers",
Run: func(cmd *cobra.Command, args []string) {
rootCmd.GenFishCompletion(cmd.OutOrStdout(), true)
},
}
powershellCompletionCmd := &cobra.Command{
Use: "powershell",
Short: "powershell completions for workers",
Run: func(cmd *cobra.Command, args []string) {
rootCmd.GenPowerShellCompletion(cmd.OutOrStdout())
},
}
completionCmd.AddCommand(bashCompletionCmd, zshCompletionCmd, fishCompletionCmd, powershellCompletionCmd)
return completionCmd
}
func CreateVersionCommand() *cobra.Command {
versionCmd := &cobra.Command{
Use: "version",
Short: "Print the version number of workers",
Long: `All software has versions. This is workers's.`,
Run: func(cmd *cobra.Command, args []string) {
cmd.Println(probes.VERSION)
},
}
return versionCmd
}
func CreateEngineCommand() *cobra.Command {
engineCommand := &cobra.Command{
Use: "engine",
Short: "Engine workers and more",
}
var dbUri string
var dbTimeout string
engineCommand.PersistentFlags().StringVarP(&dbUri, "db-uri", "d", "", "Database URI")
engineCommand.PersistentFlags().StringVarP(&dbTimeout, "db-timeout", "t", "", "Database timeout (format: 10s)")
engineCommand.MarkFlagRequired("db-uri")
for _, sc := range engine.ENGINE_SUPPORTED_WORKERS {
tempCommand := &cobra.Command{
Use: sc.Name,
Short: sc.Description,
Long: sc.LonDescription,
RunE: func(cmd *cobra.Command, args []string) error {
ctx := context.Background()
dbPool, err := CreateDbPool(ctx, dbUri, dbTimeout)
if err != nil {
return fmt.Errorf("database connection error: %v", err)
}
defer dbPool.Close()
return sc.ExecFunction(ctx, dbPool)
},
}
engineCommand.AddCommand(tempCommand)
}
return engineCommand
}
func CreateServiceCommand() *cobra.Command {
var configPaths []string
serviceCmd := &cobra.Command{
Use: "service",
Short: "Run workers as background asynchronous services",
Long: `Each active worker specified in configuration will run in go-routine.`,
RunE: func(cmd *cobra.Command, args []string) error {
return RunService(configPaths)
},
}
serviceCmd.PersistentFlags().StringSliceVarP(&configPaths, "config", "c", []string{}, "Config paths")
return serviceCmd
}

Wyświetl plik

@ -0,0 +1,80 @@
package main
import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"os"
"strings"
probes "github.com/moonstream-to/api/probes/pkg"
engine "github.com/moonstream-to/api/probes/pkg/engine"
)
// Application Probe configuration
type ApplicationProbeConfig struct {
Application string `json:"application"`
DbUri string `json:"db_uri"`
DbTimeout string `json:"db_timeout"`
Probe probes.ApplicationProbe `json:"probe"`
}
// ReadConfig parses list of configuration file paths to list of Application Probes configs
func ReadConfig(rawConfigPaths []string) (*[]ApplicationProbeConfig, error) {
var configs []ApplicationProbeConfig
for _, rawConfigPath := range rawConfigPaths {
configPath := strings.TrimSuffix(rawConfigPath, "/")
_, err := os.Stat(configPath)
if err != nil {
if os.IsNotExist(err) {
log.Printf("File %s not found, err: %v", configPath, err)
continue
}
log.Printf("Error due checking config path %s, err: %v", configPath, err)
continue
}
rawBytes, err := ioutil.ReadFile(configPath)
if err != nil {
return nil, err
}
configTemp := &ApplicationProbeConfig{}
err = json.Unmarshal(rawBytes, configTemp)
if err != nil {
return nil, err
}
dbUri := os.Getenv(configTemp.DbUri)
if dbUri == "" {
return nil, fmt.Errorf(
"unable to load database URI for service %s with probe %s", configTemp.Application, configTemp.Probe.Name,
)
}
// Link worker function
switch configTemp.Application {
case "engine":
engineWorker := engine.ENGINE_SUPPORTED_WORKERS[fmt.Sprintf("%s-%s", configTemp.Application, configTemp.Probe.Name)]
if engineWorker.ExecFunction == nil {
log.Printf("Function for application %s with probe %s not found, removed from the list", configTemp.Application, configTemp.Probe.Name)
continue
}
configs = append(configs, ApplicationProbeConfig{
Application: configTemp.Application,
DbUri: dbUri,
DbTimeout: configTemp.DbTimeout,
Probe: probes.ApplicationProbe{
Interval: configTemp.Probe.Interval,
ExecFunction: engineWorker.ExecFunction,
},
})
log.Printf("[%s] [%s] - Registered function", configTemp.Application, configTemp.Probe.Name)
default:
log.Printf("Unsupported %s application with %s probe from the config", configTemp.Application, configTemp.Probe.Name)
}
}
return &configs, nil
}

Wyświetl plik

@ -0,0 +1,28 @@
package main
import (
"context"
"fmt"
"time"
"github.com/jackc/pgx/v5/pgxpool"
)
func CreateDbPool(ctx context.Context, dbUri string, timeout string) (*pgxpool.Pool, error) {
conf, err := pgxpool.ParseConfig(dbUri)
if err != nil {
return nil, fmt.Errorf("unable to parse database connection string, err: %v", err)
}
ctDuration, err := time.ParseDuration(timeout)
if err != nil {
return nil, fmt.Errorf("unable to parse connect timeout duration, err: %v", err)
}
conf.ConnConfig.ConnectTimeout = ctDuration
dbPool, err := pgxpool.NewWithConfig(ctx, conf)
if err != nil {
return nil, fmt.Errorf("Unable to establish connection with database, err: %v", err)
}
return dbPool, nil
}

Wyświetl plik

@ -0,0 +1,15 @@
package main
import (
"fmt"
"os"
)
func main() {
command := CreateRootCommand()
err := command.Execute()
if err != nil {
fmt.Println(err.Error())
os.Exit(1)
}
}

Wyświetl plik

@ -0,0 +1,54 @@
package main
import (
"context"
"fmt"
"log"
"sync"
"time"
)
func RunService(configPaths []string) error {
// Load configuration
configs, err := ReadConfig(configPaths)
if err != nil {
return fmt.Errorf("unable to read config, err: %v", err)
}
log.Printf("Loaded configurations of %d application probes", len(*configs))
var wg sync.WaitGroup
for _, config := range *configs {
wg.Add(1)
go RunWorker(&wg, config)
}
wg.Wait()
return nil
}
func RunWorker(wg *sync.WaitGroup, config ApplicationProbeConfig) error {
defer wg.Done()
ctx := context.Background()
dbPool, err := CreateDbPool(ctx, config.DbUri, config.DbTimeout)
if err != nil {
log.Printf("[%s] [%s] - unable to establish connection with database, err: %v", config.Application, config.Probe.Name, err)
return err
}
defer dbPool.Close()
t := time.NewTicker(time.Duration(config.Probe.Interval) * time.Second)
for {
select {
case <-t.C:
err = config.Probe.ExecFunction(ctx, dbPool)
if err != nil {
log.Printf("[%s] [%s] - an error occurred during execution, err: %v", config.Application, config.Probe.Name, err)
continue
}
}
}
}

Wyświetl plik

@ -0,0 +1,9 @@
{
"application": "engine",
"db_uri": "ENGINE_DB_URI",
"db_timeout": "15s",
"probe": {
"name": "clean-call-requests",
"interval": 10
}
}

Wyświetl plik

@ -0,0 +1,80 @@
#!/usr/bin/env bash
# Deployment script - intended to run on Moonstream API server
# Colors
C_RESET='\033[0m'
C_RED='\033[1;31m'
C_GREEN='\033[1;32m'
C_YELLOW='\033[1;33m'
# Logs
PREFIX_INFO="${C_GREEN}[INFO]${C_RESET} [$(date +%d-%m\ %T)]"
PREFIX_WARN="${C_YELLOW}[WARN]${C_RESET} [$(date +%d-%m\ %T)]"
PREFIX_CRIT="${C_RED}[CRIT]${C_RESET} [$(date +%d-%m\ %T)]"
# Main
APP_DIR="${APP_DIR:-/home/ubuntu/api}"
AWS_DEFAULT_REGION="${AWS_DEFAULT_REGION:-us-east-1}"
SCRIPT_DIR="$(realpath $(dirname $0))"
SECRETS_DIR="${SECRETS_DIR:-/home/ubuntu/probes-secrets}"
CONFIGS_DIR="${CONFIGS_DIR:-/home/ubuntu/.probes}"
PARAMETERS_ENV_PATH="${SECRETS_DIR}/app.env"
# API server service file
PROBES_SERVICE_FILE="probes.service"
# Config files
ENGINE_CLEAN_CALL_REQUESTS="engine-clean-call-requests.json"
set -eu
echo
echo
echo -e "${PREFIX_INFO} Install checkenv"
HOME=/home/ubuntu /usr/local/go/bin/go install github.com/bugout-dev/checkenv@latest
echo
echo
echo -e "${PREFIX_INFO} Retrieving deployment parameters"
if [ ! -d "${SECRETS_DIR}" ]; then
mkdir "${SECRETS_DIR}"
echo -e "${PREFIX_WARN} Created new secrets directory"
fi
AWS_DEFAULT_REGION="${AWS_DEFAULT_REGION}" /home/ubuntu/go/bin/checkenv show aws_ssm+probes:true > "${PARAMETERS_ENV_PATH}"
chmod 0640 "${PARAMETERS_ENV_PATH}"
echo
echo
echo -e "${PREFIX_INFO} Add AWS default region to parameters"
echo "AWS_DEFAULT_REGION=${AWS_DEFAULT_REGION}" >> "${PARAMETERS_ENV_PATH}"
echo
echo
echo -e "${PREFIX_INFO} Add instance local IP to parameters"
echo "AWS_LOCAL_IPV4=$(ec2metadata --local-ipv4)" >> "${PARAMETERS_ENV_PATH}"
echo
echo
echo -e "${PREFIX_INFO} Copy configs"
if [ ! -d "${CONFIGS_DIR}" ]; then
mkdir "${CONFIGS_DIR}"
echo -e "${CONFIGS_DIR} Created new configs directory"
fi
cp "${SCRIPT_DIR}/configs/${ENGINE_CLEAN_CALL_REQUESTS}" "${CONFIGS_DIR}/${ENGINE_CLEAN_CALL_REQUESTS}"
echo
echo
echo -e "${PREFIX_INFO} Building executable probes application with Go"
EXEC_DIR=$(pwd)
cd "${APP_DIR}/probes"
HOME=/home/ubuntu /usr/local/go/bin/go build -o "${APP_DIR}/probes/probes" "${APP_DIR}/probes/cmd/probes/"
cd "${EXEC_DIR}"
echo
echo
echo -e "${PREFIX_INFO} Replacing existing probes service and timer with: ${PROBES_SERVICE_FILE}"
chmod 644 "${SCRIPT_DIR}/${PROBES_SERVICE_FILE}"
cp "${SCRIPT_DIR}/${PROBES_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${PROBES_SERVICE_FILE}"
XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload
XDG_RUNTIME_DIR="/run/user/1000" systemctl --user restart "${PROBES_SERVICE_FILE}"

Wyświetl plik

@ -0,0 +1,16 @@
[Unit]
Description=Run probes service
After=network.target
StartLimitIntervalSec=300
StartLimitBurst=3
[Service]
WorkingDirectory=/home/ubuntu/api/probes
EnvironmentFile=/home/ubuntu/probes-secrets/app.env
ExecStart=/home/ubuntu/api/probes/probes service --config /home/ubuntu/.probes/engine-clean-call-requests.json
Restart=on-failure
RestartSec=15s
SyslogIdentifier=probes
[Install]
WantedBy=multi-user.target

10
probes/dev.sh 100755
Wyświetl plik

@ -0,0 +1,10 @@
#!/usr/bin/env sh
# Compile application and run with provided arguments
set -e
PROGRAM_NAME="probes_dev"
go build -o "$PROGRAM_NAME" cmd/probes/*.go
./"$PROGRAM_NAME" "$@"

19
probes/go.mod 100644
Wyświetl plik

@ -0,0 +1,19 @@
module github.com/moonstream-to/api/probes
go 1.19
require (
github.com/jackc/pgx/v5 v5.3.1
github.com/spf13/cobra v1.7.0
)
require (
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/puddle/v2 v2.2.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/crypto v0.6.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/text v0.7.0 // indirect
)

34
probes/go.sum 100644
Wyświetl plik

@ -0,0 +1,34 @@
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.3.1 h1:Fcr8QJ1ZeLi5zsPZqQeUZhNhxfkkKBOgJuYkJHoBOtU=
github.com/jackc/pgx/v5 v5.3.1/go.mod h1:t3JDKnCBlYIc0ewLF0Q7B8MXmoIaBOZj/ic7iHozM/8=
github.com/jackc/puddle/v2 v2.2.0 h1:RdcDk92EJBuBS55nQMMYFXTxwstHug4jkhT5pq8VxPk=
github.com/jackc/puddle/v2 v2.2.0/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/spf13/cobra v1.7.0 h1:hyqWnYt1ZQShIddO5kBpj3vu05/++x6tJ6dg8EC572I=
github.com/spf13/cobra v1.7.0/go.mod h1:uLxZILRyS/50WlhOIKD7W6V5bgeIt+4sICxh6uRMrb0=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
golang.org/x/crypto v0.6.0 h1:qfktjS5LUO+fFKeJXZ+ikTRijMmljikvG68fpMMruSc=
golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

16
probes/pkg/data.go 100644
Wyświetl plik

@ -0,0 +1,16 @@
package probes
import (
"context"
"github.com/jackc/pgx/v5/pgxpool"
)
type ApplicationProbe struct {
Name string `json:"name"`
Description string
LonDescription string
Interval int `json:"interval"`
ExecFunction func(context.Context, *pgxpool.Pool) error
}

Wyświetl plik

@ -0,0 +1,44 @@
package engine
import (
"context"
"fmt"
"log"
"time"
"github.com/jackc/pgx/v5/pgxpool"
probes "github.com/moonstream-to/api/probes/pkg"
)
var ENGINE_SUPPORTED_WORKERS = map[string]probes.ApplicationProbe{"engine-clean-call-requests": {
Name: "clean-call-requests",
Description: "Clean all inactive call requests from database",
LonDescription: "Remove records in call_requests database table with ttl value greater then now.",
ExecFunction: CleanCallRequestsExec,
}}
type CallRequest struct {
Id string `json:"id"`
RegisteredContractIid string `json:"registered_contract_id"`
Caller string `json:"caller"`
MoonstreamUserId string `json:"moonstream_user_id"`
Method string `json:"method"`
Parameters interface{} `json:"parameters"`
ExpiresAt time.Time `json:"expires_at"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
func CleanCallRequestsExec(ctx context.Context, dbPool *pgxpool.Pool) error {
tag, err := dbPool.Exec(
ctx,
"DELETE FROM call_requests WHERE expires_at <= NOW() - INTERVAL '1 minute';",
)
if err != nil {
return fmt.Errorf("delete execution failed, err: %v", err)
}
log.Printf("[engine] [clean-call-requests] - Deleted %d call requests", tag.RowsAffected())
return nil
}

Wyświetl plik

@ -0,0 +1,3 @@
package probes
const VERSION string = "0.0.1"

Wyświetl plik

@ -0,0 +1 @@
export ENGINE_DB_URI="postgresql://<username>:<password>@<db_host>:<db_port>/<db_name>"