zdravko/internal/server/services/worker_group.go

167 lines
4 KiB
Go
Raw Permalink Normal View History

2024-02-24 21:07:49 +00:00
package services
import (
"context"
"github.com/mentos1386/zdravko/database/models"
2024-02-27 11:04:05 +00:00
"github.com/jmoiron/sqlx"
"go.temporal.io/api/enums/v1"
"go.temporal.io/sdk/client"
2024-02-27 11:04:05 +00:00
"golang.org/x/exp/maps"
2024-02-24 21:07:49 +00:00
)
2024-02-29 22:42:56 +00:00
func CountWorkerGroups(ctx context.Context, db *sqlx.DB) (int, error) {
var count int
err := db.GetContext(ctx, &count, "SELECT COUNT(*) FROM worker_groups")
return count, err
}
func GetActiveWorkers(ctx context.Context, workerGroupId string, temporal client.Client) ([]string, error) {
response, err := temporal.DescribeTaskQueue(ctx, workerGroupId, enums.TASK_QUEUE_TYPE_ACTIVITY)
if err != nil {
return make([]string, 0), err
}
workers := make([]string, len(response.Pollers))
for i, poller := range response.Pollers {
workers[i] = poller.Identity
}
return workers, nil
}
2024-02-27 11:04:05 +00:00
func CreateWorkerGroup(ctx context.Context, db *sqlx.DB, workerGroup *models.WorkerGroup) error {
_, err := db.NamedExecContext(ctx,
2024-02-29 22:42:56 +00:00
"INSERT INTO worker_groups (id, name) VALUES (:id, :name)",
2024-02-27 11:04:05 +00:00
workerGroup,
)
return err
}
2024-02-24 21:07:49 +00:00
2024-02-29 22:42:56 +00:00
func DeleteWorkerGroup(ctx context.Context, db *sqlx.DB, id string) error {
_, err := db.ExecContext(ctx,
2024-02-29 22:42:56 +00:00
"DELETE FROM worker_groups WHERE id = $1",
id,
)
return err
}
2024-02-27 11:04:05 +00:00
func GetWorkerGroups(ctx context.Context, db *sqlx.DB) ([]*models.WorkerGroup, error) {
var workerGroups []*models.WorkerGroup
err := db.SelectContext(ctx, &workerGroups,
2024-02-29 22:42:56 +00:00
"SELECT * FROM worker_groups ORDER BY name",
2024-02-27 11:04:05 +00:00
)
return workerGroups, err
}
2024-02-24 21:07:49 +00:00
func GetWorkerGroupsWithChecks(ctx context.Context, db *sqlx.DB) ([]*models.WorkerGroupWithChecks, error) {
2024-02-27 11:04:05 +00:00
rows, err := db.QueryContext(ctx,
`
SELECT
2024-02-29 22:42:56 +00:00
worker_groups.id,
2024-02-27 11:04:05 +00:00
worker_groups.name,
worker_groups.created_at,
worker_groups.updated_at,
checks.name as check_name
2024-02-27 11:04:05 +00:00
FROM worker_groups
LEFT OUTER JOIN check_worker_groups ON worker_groups.id = check_worker_groups.worker_group_id
LEFT OUTER JOIN checks ON check_worker_groups.check_id = checks.id
ORDER BY worker_groups.name
2024-02-27 11:04:05 +00:00
`)
2024-02-24 21:07:49 +00:00
if err != nil {
return nil, err
}
2024-02-27 11:04:05 +00:00
defer rows.Close()
workerGroups := map[string]*models.WorkerGroupWithChecks{}
2024-02-24 21:07:49 +00:00
2024-02-27 11:04:05 +00:00
for rows.Next() {
workerGroup := &models.WorkerGroupWithChecks{}
2024-02-27 11:04:05 +00:00
var checkName *string
2024-02-27 11:04:05 +00:00
err = rows.Scan(
2024-02-29 22:42:56 +00:00
&workerGroup.Id,
2024-02-27 11:04:05 +00:00
&workerGroup.Name,
&workerGroup.CreatedAt,
&workerGroup.UpdatedAt,
&checkName,
2024-02-27 11:04:05 +00:00
)
if err != nil {
return nil, err
}
if checkName != nil {
checks := []string{}
2024-02-29 22:42:56 +00:00
if workerGroups[workerGroup.Id] != nil {
checks = workerGroups[workerGroup.Id].Checks
2024-02-27 11:04:05 +00:00
}
workerGroup.Checks = append(checks, *checkName)
2024-02-27 11:04:05 +00:00
}
2024-02-29 22:42:56 +00:00
workerGroups[workerGroup.Id] = workerGroup
2024-02-27 11:04:05 +00:00
}
return maps.Values(workerGroups), err
2024-02-24 21:07:49 +00:00
}
2024-02-29 22:42:56 +00:00
func GetWorkerGroupsById(ctx context.Context, db *sqlx.DB, ids []string) ([]*models.WorkerGroup, error) {
2024-02-27 11:04:05 +00:00
var workerGroups []*models.WorkerGroup
err := db.SelectContext(ctx, &workerGroups,
2024-02-29 22:42:56 +00:00
"SELECT * FROM worker_groups WHERE id = ANY($1)",
ids,
2024-02-27 11:04:05 +00:00
)
return workerGroups, err
2024-02-24 21:07:49 +00:00
}
2024-02-29 22:42:56 +00:00
func GetWorkerGroup(ctx context.Context, db *sqlx.DB, id string) (*models.WorkerGroup, error) {
2024-02-27 11:04:05 +00:00
var workerGroup models.WorkerGroup
err := db.GetContext(ctx, &workerGroup,
2024-02-29 22:42:56 +00:00
"SELECT * FROM worker_groups WHERE id = $1",
id,
2024-02-27 11:04:05 +00:00
)
return &workerGroup, err
2024-02-24 21:07:49 +00:00
}
func GetWorkerGroupWithChecks(ctx context.Context, db *sqlx.DB, id string) (*models.WorkerGroupWithChecks, error) {
2024-02-27 11:04:05 +00:00
rows, err := db.QueryContext(ctx,
`
SELECT
2024-02-29 22:42:56 +00:00
worker_groups.id,
2024-02-27 11:04:05 +00:00
worker_groups.name,
worker_groups.created_at,
worker_groups.updated_at,
checks.name as check_name
2024-02-27 11:04:05 +00:00
FROM worker_groups
LEFT OUTER JOIN check_worker_groups ON worker_groups.id = check_worker_groups.worker_group_id
LEFT OUTER JOIN checks ON check_worker_groups.check_id = checks.id
2024-02-29 22:42:56 +00:00
WHERE worker_groups.id=$1
2024-02-27 11:04:05 +00:00
`,
2024-02-29 22:42:56 +00:00
id,
2024-02-27 11:04:05 +00:00
)
if err != nil {
return nil, err
}
defer rows.Close()
workerGroup := &models.WorkerGroupWithChecks{}
2024-02-27 11:04:05 +00:00
for rows.Next() {
var checkName *string
2024-02-27 11:04:05 +00:00
err = rows.Scan(
2024-02-29 22:42:56 +00:00
&workerGroup.Id,
2024-02-27 11:04:05 +00:00
&workerGroup.Name,
&workerGroup.CreatedAt,
&workerGroup.UpdatedAt,
&checkName,
2024-02-27 11:04:05 +00:00
)
if err != nil {
return nil, err
}
if checkName != nil {
workerGroup.Checks = append(workerGroup.Checks, *checkName)
2024-02-27 11:04:05 +00:00
}
}
return workerGroup, err
2024-02-24 21:07:49 +00:00
}