zdravko/internal/server/services/check.go

310 lines
7.3 KiB
Go

package services
import (
"context"
"log"
"sort"
"time"
"github.com/jmoiron/sqlx"
"github.com/mentos1386/zdravko/database/models"
internaltemporal "github.com/mentos1386/zdravko/internal/temporal"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/temporal"
"golang.org/x/exp/maps"
)
// getScheduleId derives the Temporal schedule identifier for a check by
// prefixing the check id with "check-".
func getScheduleId(id string) string {
	const schedulePrefix = "check-"
	return schedulePrefix + id
}
// CountChecks returns the number of rows in the checks table.
//
// The count is returned alongside any error produced by the query.
func CountChecks(ctx context.Context, db *sqlx.DB) (int, error) {
	total := 0
	if err := db.GetContext(ctx, &total, "SELECT COUNT(*) FROM checks"); err != nil {
		return total, err
	}
	return total, nil
}
// GetCheckState reports whether the Temporal schedule belonging to the check
// with the given id is paused or active.
//
// It describes the schedule named by getScheduleId(id). If describing fails,
// models.CheckStateUnknown is returned together with the error. Otherwise the
// result is models.CheckStatePaused when the schedule state is flagged as
// paused, and models.CheckStateActive when it is not.
func GetCheckState(ctx context.Context, temporal client.Client, id string) (models.CheckState, error) {
	description, err := temporal.ScheduleClient().GetHandle(ctx, getScheduleId(id)).Describe(ctx)
	if err != nil {
		return models.CheckStateUnknown, err
	}

	if paused := description.Schedule.State.Paused; !paused {
		return models.CheckStateActive, nil
	}
	return models.CheckStatePaused, nil
}
// SetCheckState pauses or unpauses the Temporal schedule belonging to the
// check with the given id.
//
// models.CheckStateActive unpauses the schedule and models.CheckStatePaused
// pauses it; the error from the schedule call is returned. Any other state is
// ignored and nil is returned without touching the schedule.
func SetCheckState(ctx context.Context, temporal client.Client, id string, state models.CheckState) error {
	handle := temporal.ScheduleClient().GetHandle(ctx, getScheduleId(id))

	switch state {
	case models.CheckStateActive:
		return handle.Unpause(ctx, client.ScheduleUnpauseOptions{Note: "Unpaused by user"})
	case models.CheckStatePaused:
		return handle.Pause(ctx, client.SchedulePauseOptions{Note: "Paused by user"})
	default:
		// Nothing to do for states other than active/paused.
		return nil
	}
}
// CreateCheck inserts a new row into the checks table, binding the id, name,
// script, schedule and filter columns from the named fields of check.
func CreateCheck(ctx context.Context, db *sqlx.DB, check *models.Check) error {
	const insertCheck = `INSERT INTO checks (id, name, script, schedule, filter)
VALUES (:id, :name, :script, :schedule, :filter)`

	if _, err := db.NamedExecContext(ctx, insertCheck, check); err != nil {
		return err
	}
	return nil
}
// UpdateCheck overwrites the script, schedule and filter columns of the checks
// row whose id matches check. The name column is not part of the update.
func UpdateCheck(ctx context.Context, db *sqlx.DB, check *models.Check) error {
	const updateCheck = `UPDATE checks SET script=:script, schedule=:schedule, filter=:filter WHERE id=:id`

	if _, err := db.NamedExecContext(ctx, updateCheck, check); err != nil {
		return err
	}
	return nil
}
// DeleteCheck removes the row with the given id from the checks table.
func DeleteCheck(ctx context.Context, db *sqlx.DB, id string) error {
	const deleteCheck = "DELETE FROM checks WHERE id=$1"

	if _, err := db.ExecContext(ctx, deleteCheck, id); err != nil {
		return err
	}
	return nil
}
// UpdateCheckWorkerGroups replaces the set of worker groups linked to check.
//
// Inside a single transaction it deletes every check_worker_groups row for
// check.Id and then inserts one row per entry of workerGroups. The transaction
// is committed only if every statement succeeds; the first failing statement's
// error is returned and the transaction is rolled back.
func UpdateCheckWorkerGroups(ctx context.Context, db *sqlx.DB, check *models.Check, workerGroups []*models.WorkerGroup) error {
	tx, err := db.BeginTxx(ctx, nil)
	if err != nil {
		return err
	}
	// A deferred rollback covers every exit path, including a panic while
	// iterating workerGroups. Once Commit has run, Rollback only reports that
	// the transaction is already done, so its result is deliberately ignored.
	defer func() {
		_ = tx.Rollback()
	}()

	if _, err := tx.ExecContext(ctx,
		"DELETE FROM check_worker_groups WHERE check_id=$1",
		check.Id,
	); err != nil {
		return err
	}

	for _, group := range workerGroups {
		if _, err := tx.ExecContext(ctx,
			"INSERT INTO check_worker_groups (check_id, worker_group_id) VALUES ($1, $2)",
			check.Id,
			group.Id,
		); err != nil {
			return err
		}
	}

	return tx.Commit()
}
// GetCheck loads the checks row with the given id.
//
// The returned pointer is never nil; when the query fails it refers to
// whatever the query left in the struct and the error is returned with it.
func GetCheck(ctx context.Context, db *sqlx.DB, id string) (*models.Check, error) {
	const selectCheck = "SELECT * FROM checks WHERE id=$1"

	var check models.Check
	err := db.GetContext(ctx, &check, selectCheck, id)
	return &check, err
}
// GetCheckWithWorkerGroups loads the check with the given id together with the
// names of the worker groups linked to it through check_worker_groups.
//
// Because of the LEFT OUTER JOINs the query yields one row per linked worker
// group, with a NULL worker group name when no group matched. The check columns
// repeat on every row; each non-NULL name is appended to WorkerGroups.
//
// When no row matches id, a zero-valued CheckWithWorkerGroups is returned with
// a nil error. A query, scan or row-iteration failure returns a nil check and
// the error.
func GetCheckWithWorkerGroups(ctx context.Context, db *sqlx.DB, id string) (*models.CheckWithWorkerGroups, error) {
	rows, err := db.QueryContext(ctx,
		`
SELECT
checks.id,
checks.name,
checks.script,
checks.schedule,
checks.created_at,
checks.updated_at,
checks.filter,
worker_groups.name as worker_group_name
FROM checks
LEFT OUTER JOIN check_worker_groups ON checks.id = check_worker_groups.check_id
LEFT OUTER JOIN worker_groups ON check_worker_groups.worker_group_id = worker_groups.id
WHERE checks.id=$1
ORDER BY checks.name
`,
		id,
	)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	check := &models.CheckWithWorkerGroups{}
	for rows.Next() {
		// Nullable: the outer join produces NULL when no worker group matched.
		var workerGroupName *string
		if err := rows.Scan(
			&check.Id,
			&check.Name,
			&check.Script,
			&check.Schedule,
			&check.CreatedAt,
			&check.UpdatedAt,
			&check.Filter,
			&workerGroupName,
		); err != nil {
			return nil, err
		}
		if workerGroupName != nil {
			check.WorkerGroups = append(check.WorkerGroups, *workerGroupName)
		}
	}
	// rows.Next also returns false when iteration stops because of an error,
	// so the loop ending is not proof that every row was read.
	if err := rows.Err(); err != nil {
		return nil, err
	}

	return check, nil
}
// GetChecks returns every row of the checks table ordered by name.
//
// The returned slice is non-nil even when the table is empty; it is returned
// together with any error from the query.
func GetChecks(ctx context.Context, db *sqlx.DB) ([]*models.Check, error) {
	const selectChecks = "SELECT * FROM checks ORDER BY name"

	checks := make([]*models.Check, 0)
	if err := db.SelectContext(ctx, &checks, selectChecks); err != nil {
		return checks, err
	}
	return checks, nil
}
// GetChecksWithWorkerGroups returns every check together with the names of the
// worker groups linked to it through check_worker_groups.
//
// Because of the LEFT OUTER JOINs the query yields one row per (check, worker
// group) pair, with a NULL worker group name when no group matched. Rows that
// share a check id are merged into a single entry whose WorkerGroups collects
// the non-NULL names; a check without groups keeps a nil WorkerGroups.
//
// The result is sorted by name, with the id as a tie-breaker so the order does
// not depend on map iteration. A query, scan or row-iteration failure returns a
// nil slice and the error.
func GetChecksWithWorkerGroups(ctx context.Context, db *sqlx.DB) ([]*models.CheckWithWorkerGroups, error) {
	rows, err := db.QueryContext(ctx,
		`
SELECT
checks.id,
checks.name,
checks.script,
checks.schedule,
checks.created_at,
checks.updated_at,
checks.filter,
worker_groups.name as worker_group_name
FROM checks
LEFT OUTER JOIN check_worker_groups ON checks.id = check_worker_groups.check_id
LEFT OUTER JOIN worker_groups ON check_worker_groups.worker_group_id = worker_groups.id
ORDER BY checks.name
`)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	checksByID := map[string]*models.CheckWithWorkerGroups{}
	for rows.Next() {
		row := &models.CheckWithWorkerGroups{}
		// Nullable: the outer join produces NULL when no worker group matched.
		var workerGroupName *string
		if err := rows.Scan(
			&row.Id,
			&row.Name,
			&row.Script,
			&row.Schedule,
			&row.CreatedAt,
			&row.UpdatedAt,
			&row.Filter,
			&workerGroupName,
		); err != nil {
			return nil, err
		}

		// The first row seen for a check becomes its entry; later rows for the
		// same id only contribute their worker group name.
		check, seen := checksByID[row.Id]
		if !seen {
			check = row
			checksByID[row.Id] = check
		}
		if workerGroupName != nil {
			check.WorkerGroups = append(check.WorkerGroups, *workerGroupName)
		}
	}
	// rows.Next also returns false when iteration stops because of an error,
	// so the loop ending is not proof that every row was read.
	if err := rows.Err(); err != nil {
		return nil, err
	}

	checksWithWorkerGroups := maps.Values(checksByID)
	sort.SliceStable(checksWithWorkerGroups, func(i, j int) bool {
		a, b := checksWithWorkerGroups[i], checksWithWorkerGroups[j]
		if a.Name != b.Name {
			return a.Name < b.Name
		}
		return a.Id < b.Id
	})
	return checksWithWorkerGroups, nil
}
// DeleteCheckSchedule deletes the Temporal schedule named by getScheduleId(id)
// and returns the error from that call.
func DeleteCheckSchedule(ctx context.Context, t client.Client, id string) error {
	return t.ScheduleClient().GetHandle(ctx, getScheduleId(id)).Delete(ctx)
}
// CreateOrUpdateCheckSchedule ensures a Temporal schedule exists for check and
// then triggers it once.
//
// The schedule id and the workflow action id are both getScheduleId(check.Id).
// The schedule runs on the check's cron expression with ten seconds of jitter
// and starts the check workflow with the check's filter, script, id and the
// ids of workerGroups, retrying up to three attempts.
//
// If describing the schedule succeeds, its spec and action are replaced while
// the policy and state reported by Temporal are carried over unchanged;
// otherwise a new schedule is created. The first error from update, create or
// trigger is returned.
//
// NOTE(review): every Describe error, not only "not found", leads to the
// create path — confirm this is intended.
func CreateOrUpdateCheckSchedule(
	ctx context.Context,
	t client.Client,
	check *models.Check,
	workerGroups []*models.WorkerGroup,
) error {
	log.Println("Creating or Updating Check Schedule")

	scheduleID := getScheduleId(check.Id)

	groupIDs := make([]string, 0, len(workerGroups))
	for _, wg := range workerGroups {
		groupIDs = append(groupIDs, wg.Id)
	}

	options := client.ScheduleOptions{
		ID: scheduleID,
		Spec: client.ScheduleSpec{
			CronExpressions: []string{check.Schedule},
			Jitter:          10 * time.Second,
		},
		Action: &client.ScheduleWorkflowAction{
			ID:        scheduleID,
			Workflow:  internaltemporal.WorkflowCheckName,
			TaskQueue: internaltemporal.TEMPORAL_SERVER_QUEUE,
			Args: []interface{}{
				internaltemporal.WorkflowCheckParam{
					Filter:         check.Filter,
					Script:         check.Script,
					CheckId:        check.Id,
					WorkerGroupIds: groupIDs,
				},
			},
			RetryPolicy: &temporal.RetryPolicy{
				MaximumAttempts: 3,
			},
		},
	}

	handle := t.ScheduleClient().GetHandle(ctx, scheduleID)

	if _, describeErr := handle.Describe(ctx); describeErr != nil {
		// Describe failed: treat the schedule as missing and create it.
		created, err := t.ScheduleClient().Create(ctx, options)
		if err != nil {
			return err
		}
		handle = created
	} else {
		// Schedule exists: swap in the new spec and action, keep the rest.
		replace := func(input client.ScheduleUpdateInput) (*client.ScheduleUpdate, error) {
			current := input.Description.Schedule
			return &client.ScheduleUpdate{
				Schedule: &client.Schedule{
					Spec:   &options.Spec,
					Action: options.Action,
					Policy: current.Policy,
					State:  current.State,
				},
			}, nil
		}
		if err := handle.Update(ctx, client.ScheduleUpdateOptions{DoUpdate: replace}); err != nil {
			return err
		}
	}

	return handle.Trigger(ctx, client.ScheduleTriggerOptions{})
}