feat: refactor temporal and make it work on fly

This commit is contained in:
Tine 2024-02-13 21:52:10 +01:00
parent b516c979b3
commit 4a9d684108
Signed by: mentos1386
SSH key fingerprint: SHA256:MNtTsLbihYaWF8j1fkOHfkKNlnN1JQfxEU/rBU8nCGw
13 changed files with 163 additions and 124 deletions

View file

@ -16,7 +16,7 @@ func main() {
r := mux.NewRouter()
db, query, err := internal.ConnectToDatabase(config.SQLITE_DB_PATH)
db, query, err := internal.ConnectToDatabase(config.ZDRAVKO_DATABASE_PATH)
if err != nil {
log.Fatal(err)
}

View file

@ -1,132 +1,52 @@
package main
import (
"context"
"log"
"os"
"path/filepath"
"go.temporal.io/server/common/config"
"go.temporal.io/server/schema/sqlite"
"go.temporal.io/server/temporal"
"code.tjo.space/mentos1386/zdravko/internal"
t "code.tjo.space/mentos1386/zdravko/pkg/temporal"
"go.temporal.io/server/common/authorization"
tlog "go.temporal.io/server/common/log"
uiserver "github.com/temporalio/ui-server/v2/server"
uiconfig "github.com/temporalio/ui-server/v2/server/config"
uiserveroptions "github.com/temporalio/ui-server/v2/server/server_options"
)
func backendServer() {
cfg := t.NewConfig()
func backendServer(config *internal.Config) {
serverConfig := t.NewServerConfig(config)
logger := tlog.NewZapLogger(tlog.BuildZapLogger(tlog.Config{
Stdout: true,
Level: "info",
OutputFile: "",
}))
sqlConfig := cfg.Persistence.DataStores[t.PersistenceStoreName].SQL
// Apply migrations if file does not already exist
if _, err := os.Stat(sqlConfig.DatabaseName); os.IsNotExist(err) {
// Check if any of the parent dirs are missing
dir := filepath.Dir(sqlConfig.DatabaseName)
if _, err := os.Stat(dir); err != nil {
log.Fatal(err)
}
if err := sqlite.SetupSchema(sqlConfig); err != nil {
log.Fatal(err)
}
}
// Pre-create namespaces
var namespaces []*sqlite.NamespaceConfig
for _, ns := range []string{"default"} {
namespaces = append(namespaces, sqlite.NewNamespaceConfig(cfg.ClusterMetadata.CurrentClusterName, ns, false))
}
if err := sqlite.CreateNamespaces(sqlConfig, namespaces...); err != nil {
log.Fatal(err)
}
authorizer, err := authorization.GetAuthorizerFromConfig(&cfg.Global.Authorization)
server, err := t.NewServer(serverConfig)
if err != nil {
log.Fatal(err)
log.Fatalf("Unable to create server: %v", err)
}
claimMapper, err := authorization.GetClaimMapperFromConfig(&cfg.Global.Authorization, logger)
err = server.Start()
if err != nil {
log.Fatal(err)
log.Fatalf("Unable to start server: %v", err)
}
ctx := context.Background()
interruptChan := make(chan interface{}, 1)
go func() {
if doneChan := ctx.Done(); doneChan != nil {
s := <-doneChan
interruptChan <- s
} else {
s := <-temporal.InterruptCh()
interruptChan <- s
}
}()
temporal, err := temporal.NewServer(
temporal.WithConfig(cfg),
temporal.ForServices(temporal.DefaultServices),
temporal.WithLogger(logger),
temporal.WithAuthorizer(authorizer),
temporal.WithClaimMapper(func(cfg *config.Config) authorization.ClaimMapper {
return claimMapper
}),
temporal.InterruptOn(interruptChan),
)
err = server.Stop()
if err != nil {
log.Fatal(err)
log.Fatalf("Unable to stop server: %v", err)
}
log.Println("Starting temporal server")
if err := temporal.Start(); err != nil {
panic(err)
}
err = temporal.Stop()
if err != nil {
panic(err)
}
}
func frontendServer() {
cfg := &uiconfig.Config{
Host: "0.0.0.0",
Port: 8223,
TemporalGRPCAddress: "localhost:7233",
EnableUI: true,
PublicPath: "/temporal",
Codec: uiconfig.Codec{
Endpoint: "",
},
CORS: uiconfig.CORS{
CookieInsecure: true,
},
func frontendServer(config *internal.Config) {
uiConfig := t.NewUiConfig(config)
uiServer, err := t.NewUiServer(uiConfig)
if err != nil {
log.Fatalf("Unable to create UI server: %v", err)
}
server := uiserver.NewServer(uiserveroptions.WithConfigProvider(cfg))
log.Println("Starting temporal ui server")
if err := server.Start(); err != nil {
panic(err)
err = uiServer.Start()
if err != nil {
log.Fatalf("Unable to start UI server: %v", err)
}
server.Stop()
uiServer.Stop()
}
func main() {
config := internal.NewConfig()
go func() {
frontendServer()
frontendServer(config)
}()
backendServer()
backendServer(config)
}

View file

@ -3,7 +3,8 @@ PORT=8000
ROOT_URL=http://localhost:8000
# SQLite
SQLITE_DB_PATH=zdravko.db
ZDRAVKO_DATABASE_PATH=zdravko.db
TEMPORAL_DATABASE_PATH=temporal.db
# Session
SESSION_SECRET=your_secret

View file

@ -10,17 +10,19 @@ primary_region = 'waw'
[env]
PORT = '8080'
ROOT_URL = 'https://zdravko.fly.dev'
SQLITE_DB_PATH = 'zdravko.db'
# Other are defined in secrets
OAUTH2_SCOPES = 'openid,profile,email'
OAUTH2_ENDPOINT_TOKEN_URL = 'https://id.tjo.space/application/o/token/'
OAUTH2_ENDPOINT_AUTH_URL = 'https://id.tjo.space/application/o/authorize/'
OAUTH2_ENDPOINT_USER_INFO_URL = 'https://id.tjo.space/application/o/userinfo/'
OAUTH2_ENDPOINT_LOGOUT_URL = 'https://id.tjo.space/application/o/zdravko-development/end-session/'
TEMPORAL_UI_HOST = 'temporal.process.zdravko.internal:8223'
TEMPORAL_SERVER_HOST = 'temporal.process.zdravko.internal:7233'
[processes]
server = "server"
#worker = "worker"
temporal = "temporal"
[http_service]
processes = ["server"]

5
go.mod
View file

@ -3,13 +3,13 @@ module code.tjo.space/mentos1386/zdravko
go 1.21.6
require (
github.com/glebarez/sqlite v1.10.0
github.com/gorilla/mux v1.8.1
github.com/gorilla/sessions v1.2.2
github.com/temporalio/ui-server/v2 v2.23.0
go.temporal.io/sdk v1.26.0-rc.2
go.temporal.io/server v1.22.4
golang.org/x/oauth2 v0.17.0
gorm.io/driver/sqlite v1.5.0
gorm.io/gen v0.3.25
gorm.io/gorm v1.25.7
gorm.io/plugin/dbresolver v1.5.0
@ -39,7 +39,6 @@ require (
github.com/emirpasic/gods v1.18.1 // indirect
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/glebarez/go-sqlite v1.22.0 // indirect
github.com/go-logr/logr v1.3.0 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-sql-driver/mysql v1.7.1 // indirect
@ -75,6 +74,7 @@ require (
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-sqlite3 v1.14.16 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/ncruces/go-strftime v0.1.9 // indirect
github.com/olivere/elastic/v7 v7.0.32 // indirect
@ -131,6 +131,7 @@ require (
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.17.0 // indirect
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
google.golang.org/api v0.155.0 // indirect
google.golang.org/appengine v1.6.8 // indirect
google.golang.org/genproto v0.0.0-20240125205218-1f4bbc51befe // indirect

4
go.sum
View file

@ -85,10 +85,6 @@ github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSw
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/glebarez/go-sqlite v1.22.0 h1:uAcMJhaA6r3LHMTFgP0SifzgXg46yJkgxqyuyec+ruQ=
github.com/glebarez/go-sqlite v1.22.0/go.mod h1:PlBIdHe0+aUEFn+r2/uthrWq4FxbzugL0L8Li6yQJbc=
github.com/glebarez/sqlite v1.10.0 h1:u4gt8y7OND/cCei/NMHmfbLxF6xP2wgKcT/BJf2pYkc=
github.com/glebarez/sqlite v1.10.0/go.mod h1:IJ+lfSOmiekhQsFTJRx/lHtGYmCdtAiTaf5wI9u5uHA=
github.com/go-faker/faker/v4 v4.2.0 h1:dGebOupKwssrODV51E0zbMrv5e2gO9VWSLNC1WDCpWg=
github.com/go-faker/faker/v4 v4.2.0/go.mod h1:F/bBy8GH9NxOxMInug5Gx4WYeG6fHJZ8Ol/dhcpRub4=
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=

View file

@ -9,8 +9,6 @@ type Config struct {
PORT string
ROOT_URL string // Needed for oauth2 redirect
SQLITE_DB_PATH string
SESSION_SECRET string
OAUTH2_CLIENT_ID string
@ -20,6 +18,13 @@ type Config struct {
OAUTH2_ENDPOINT_AUTH_URL string
OAUTH2_ENDPOINT_USER_INFO_URL string
OAUTH2_ENDPOINT_LOGOUT_URL string
ZDRAVKO_DATABASE_PATH string
TEMPORAL_DATABASE_PATH string
TEMPORAL_LISTEN_ADDRESS string
TEMPORAL_UI_HOST string
TEMPORAL_SERVER_HOST string
}
func getEnv(key, fallback string) string {
@ -41,15 +46,21 @@ func NewConfig() *Config {
PORT: getEnv("PORT", "8000"),
ROOT_URL: getEnvRequired("ROOT_URL"),
SQLITE_DB_PATH: getEnv("SQLITE_DB_PATH", "zdravko.db"),
SESSION_SECRET: getEnvRequired("SESSION_SECRET"),
OAUTH2_CLIENT_ID: getEnvRequired("OAUTH2_CLIENT_ID"),
OAUTH2_CLIENT_SECRET: getEnvRequired("OAUTH2_CLIENT_SECRET"),
OAUTH2_SCOPES: strings.Split(getEnvRequired("OAUTH2_SCOPES"), ","),
OAUTH2_SCOPES: strings.Split(getEnv("OAUTH2_SCOPES", "openid,profile,email"), ","),
OAUTH2_ENDPOINT_TOKEN_URL: getEnvRequired("OAUTH2_ENDPOINT_TOKEN_URL"),
OAUTH2_ENDPOINT_AUTH_URL: getEnvRequired("OAUTH2_ENDPOINT_AUTH_URL"),
OAUTH2_ENDPOINT_USER_INFO_URL: getEnvRequired("OAUTH2_ENDPOINT_USER_INFO_URL"),
OAUTH2_ENDPOINT_LOGOUT_URL: getEnvRequired("OAUTH2_ENDPOINT_LOGOUT_URL"),
ZDRAVKO_DATABASE_PATH: getEnv("ZDRAVKO_DATABASE_PATH", "zdravko.db"),
TEMPORAL_DATABASE_PATH: getEnv("TEMPORAL_DATABASE_PATH", "temporal.db"),
TEMPORAL_LISTEN_ADDRESS: getEnv("TEMPORAL_LISTEN_ADDRESS", "0.0.0.0"),
TEMPORAL_UI_HOST: getEnv("TEMPORAL_UI_HOST", "localhost:8223"),
TEMPORAL_SERVER_HOST: getEnv("TEMPORAL_SERVER_HOST", "localhost:7233"),
}
}

View file

@ -3,7 +3,7 @@ package internal
import (
"code.tjo.space/mentos1386/zdravko/internal/models"
"code.tjo.space/mentos1386/zdravko/internal/models/query"
"github.com/glebarez/sqlite"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
)

View file

@ -10,7 +10,7 @@ var customTransport = http.DefaultTransport
func (h *BaseHandler) Temporal(w http.ResponseWriter, r *http.Request, user *AuthenticatedUser) {
// Create a new HTTP request with the same method, URL, and body as the original request
targetURL := r.URL
targetURL.Host = "localhost:8223"
targetURL.Host = h.config.TEMPORAL_UI_HOST
targetURL.Scheme = "http"
proxyReq, err := http.NewRequest(r.Method, targetURL.String(), r.Body)

View file

@ -4,6 +4,7 @@ import (
"fmt"
"time"
"code.tjo.space/mentos1386/zdravko/internal"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/config"
"go.temporal.io/server/common/persistence/sql/sqlplugin/sqlite"
@ -19,7 +20,7 @@ const HistoryPort = 7234
const MatchingPort = 7235
const WorkerPort = 7236
func NewConfig() *config.Config {
func NewServerConfig(cfg *internal.Config) *config.Config {
return &config.Config{
Persistence: config.Persistence{
DataStores: map[string]config.DataStore{
@ -28,7 +29,7 @@ func NewConfig() *config.Config {
ConnectAttributes: map[string]string{
"mode": "rwc",
},
DatabaseName: "temporal.db",
DatabaseName: cfg.TEMPORAL_DATABASE_PATH,
},
},
},
@ -49,7 +50,7 @@ func NewConfig() *config.Config {
GRPCPort: FrontendPort,
MembershipPort: FrontendPort + 100,
BindOnLocalHost: false,
BindOnIP: "0.0.0.0",
BindOnIP: cfg.TEMPORAL_LISTEN_ADDRESS,
},
},
"history": {

78
pkg/temporal/server.go Normal file
View file

@ -0,0 +1,78 @@
package temporal
import (
"context"
"os"
"path/filepath"
"go.temporal.io/server/common/authorization"
"go.temporal.io/server/common/config"
"go.temporal.io/server/common/log"
"go.temporal.io/server/schema/sqlite"
t "go.temporal.io/server/temporal"
)
func NewServer(cfg *config.Config) (t.Server, error) {
logger := log.NewZapLogger(log.BuildZapLogger(log.Config{
Stdout: true,
Level: "info",
OutputFile: "",
}))
sqlConfig := cfg.Persistence.DataStores[PersistenceStoreName].SQL
// Apply migrations if file does not already exist
if _, err := os.Stat(sqlConfig.DatabaseName); os.IsNotExist(err) {
// Check if any of the parent dirs are missing
dir := filepath.Dir(sqlConfig.DatabaseName)
if _, err := os.Stat(dir); err != nil {
return nil, err
}
if err := sqlite.SetupSchema(sqlConfig); err != nil {
return nil, err
}
}
// Pre-create namespaces
var namespaces []*sqlite.NamespaceConfig
for _, ns := range []string{"default"} {
namespaces = append(namespaces, sqlite.NewNamespaceConfig(cfg.ClusterMetadata.CurrentClusterName, ns, false))
}
if err := sqlite.CreateNamespaces(sqlConfig, namespaces...); err != nil {
return nil, err
}
authorizer, err := authorization.GetAuthorizerFromConfig(&cfg.Global.Authorization)
if err != nil {
return nil, err
}
claimMapper, err := authorization.GetClaimMapperFromConfig(&cfg.Global.Authorization, logger)
if err != nil {
return nil, err
}
ctx := context.Background()
interruptChan := make(chan interface{}, 1)
go func() {
if doneChan := ctx.Done(); doneChan != nil {
s := <-doneChan
interruptChan <- s
} else {
s := <-t.InterruptCh()
interruptChan <- s
}
}()
return t.NewServer(
t.WithConfig(cfg),
t.ForServices(t.DefaultServices),
t.WithLogger(logger),
t.WithAuthorizer(authorizer),
t.WithClaimMapper(func(cfg *config.Config) authorization.ClaimMapper {
return claimMapper
}),
t.InterruptOn(interruptChan),
)
}

29
pkg/temporal/ui.go Normal file
View file

@ -0,0 +1,29 @@
package temporal
import (
"code.tjo.space/mentos1386/zdravko/internal"
"github.com/temporalio/ui-server/v2/server"
"github.com/temporalio/ui-server/v2/server/config"
"github.com/temporalio/ui-server/v2/server/server_options"
)
func NewUiConfig(cfg *internal.Config) *config.Config {
return &config.Config{
Host: cfg.TEMPORAL_LISTEN_ADDRESS,
Port: 8223,
TemporalGRPCAddress: cfg.TEMPORAL_SERVER_HOST,
EnableUI: true,
PublicPath: "/temporal",
Codec: config.Codec{
Endpoint: "",
},
CORS: config.CORS{
CookieInsecure: true,
},
}
}
func NewUiServer(cfg *config.Config) (*server.Server, error) {
s := server.NewServer(server_options.WithConfigProvider(cfg))
return s, nil
}

View file

@ -16,7 +16,7 @@ func main() {
FieldNullable: true,
})
db, _, _ := internal.ConnectToDatabase(config.SQLITE_DB_PATH)
db, _, _ := internal.ConnectToDatabase(config.ZDRAVKO_DATABASE_PATH)
// Use the above `*gorm.DB` instance to initialize the generator,
// which is required to generate structs from db when using `GenerateModel/GenerateModelAs`