Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
# Added by goreleaser init:
dist/
cmd/ad-runtime-utils/ad-runtime-utils
13 changes: 12 additions & 1 deletion README.MD
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,15 @@ Default runtimes:
Service <service-name>:
<runtime>: <path or "error: <message>">
```
```

### 4. Starting a Service

When --start and/or --supervise flags are provided --service is a required argument.

- It starts the executable `services.<service-name>.executable` with the specified arguments from `services.<service-name>.executable_args`.

- If the `--supervise` flag is provided, it will run the health checks, defined in `services.<service-name>.health_checks` on service start to make sure the service is operational. If any of the checks fail the service will be stopped. `Type=notify` should be used in the systemd unit(see example in `examples/systemd` directory).

- While using `--supervise` and health checks, make sure that systemd service has enough `TimeoutStartSec`. ideally should be a combined timeout of all health checks.

63 changes: 63 additions & 0 deletions cmd/ad-runtime-utils/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@ import (
"flag"
"fmt"
"io"
"os"
"strings"

"github.com/arenadata/ad-runtime-utils/internal/config"
"github.com/arenadata/ad-runtime-utils/internal/detect"
"github.com/arenadata/ad-runtime-utils/internal/exec"
"github.com/coreos/go-systemd/v22/daemon"
)

// exit codes.
Expand All @@ -27,6 +30,8 @@ func Run(args []string, stdout, stderr io.Writer) int {
listAll := fs.Bool("list", false, "List all detected runtimes (default + services)")
fs.BoolVar(listAll, "l", false, "shorthand for --list")
printCACerts := fs.Bool("print-cacerts", false, "When used with --runtime=java, prints the cacerts path and exits")
start := fs.Bool("start", false, "Start the service. Use with simple/exec services")
supervise := fs.Bool("supervise", false, "Supervise the service. Use with notify systemd services")

if err := fs.Parse(args); err != nil {
return exitParseError
Expand Down Expand Up @@ -76,6 +81,15 @@ func Run(args []string, stdout, stderr io.Writer) int {
}

envName := detectEnvName(cfg, *service, *runtime)

if *start {
if err = startService(*service, envName, path, *cfg, *supervise); err != nil {
fmt.Fprintf(stderr, "start service failed: %v\n", err)
return exitUserError
}
return exitOK
}

fmt.Fprintf(stdout, "export %s=%s\n", envName, path)
return exitOK
}
Expand Down Expand Up @@ -124,3 +138,52 @@ func runList(cfg *config.Config, stdout, stderr io.Writer) int {
}
return exitOK
}

func startService(service string, envName string, envPath string, cfg config.Config, supervise bool) error {
srvConfig, ok := cfg.Services[service]
if !ok {
return fmt.Errorf("service %s not found in config", service)
}
// Append the env for the runtime (eg. JAVA_HOME)
if srvConfig.EnvVars == nil {
srvConfig.EnvVars = make(map[string]string)
}
srvConfig.EnvVars[envName] = envPath
if !supervise {
return exec.RunExecutable(srvConfig.Executable, srvConfig.ExecutableArgs, srvConfig.EnvVars)
}
process, err := exec.RunExecutableAsync(srvConfig.Executable, srvConfig.ExecutableArgs, srvConfig.EnvVars)
if err != nil {
return err
}
// Run the health checks
for _, checkCfg := range srvConfig.HealthChecks {
switch checkCfg.Type {
case exec.PortHealthCheckType:
portheck := exec.PortHealthCheck{
PID: process.Process.Pid,
Config: checkCfg,
}
if err = portheck.Check(); err != nil {
if err = process.Process.Signal(os.Interrupt); err != nil {
return fmt.Errorf("failed to send interrupt signal to process: %w", err)
}
return fmt.Errorf("health check failed: %w", err)
}
default:
if err = process.Process.Signal(os.Interrupt); err != nil {
return fmt.Errorf("failed to send interrupt signal to process: %w", err)
}
return fmt.Errorf("unknown health check type: %s", checkCfg.Type)
}
}
// Notify systemd daemon that service has started
if _, err = daemon.SdNotify(false, daemon.SdNotifyReady); err != nil {
fmt.Fprintf(os.Stderr, "systemd notification failed: %v\n", err)
}
// TODO: Replace this with an actual supervisor loop
if err = process.Wait(); err != nil {
return err
}
return nil
}
2 changes: 1 addition & 1 deletion configs/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,4 @@ services:
FLINK2:
path: /etc/flink2/conf/flink2-java.yaml
OZONE:
path: /etc/ozone/conf/ozone-java.yaml
path: /etc/ozone/conf/ozone-java.yaml
17 changes: 17 additions & 0 deletions examples/config/kafka.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
executable: bin/kafka-server-start.sh
executable_args:
- config/server.properties
runtimes:
java:
version: "21"
health_checks:
- type: port
params:
port: 9092
timeout: 20
protocol: tcp
- type: port
params:
port: 9093
timeout: 20
protocol: tcp
10 changes: 10 additions & 0 deletions examples/systemd/kafka.service
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[Unit]
Description=Test Sleep

[Service]
Type=notify
ExecStart=bin/ad-runtime-utils --config configs/config.yaml --service kafka --runtime java --start --supervise
TimeoutStartSec=120

[Install]
WantedBy=multi-user.target
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,7 @@ module github.com/arenadata/ad-runtime-utils

go 1.24.4

require github.com/goccy/go-yaml v1.18.0
require (
github.com/coreos/go-systemd/v22 v22.6.0
github.com/goccy/go-yaml v1.18.0
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
github.com/coreos/go-systemd/v22 v22.6.0 h1:aGVa/v8B7hpb0TKl0MWoAavPDmHvobFe5R5zn0bCJWo=
github.com/coreos/go-systemd/v22 v22.6.0/go.mod h1:iG+pp635Fo7ZmV/j14KUcmEyWF+0X7Lua8rrTWzYgWU=
github.com/goccy/go-yaml v1.18.0 h1:8W7wMFS12Pcas7KU+VVkaiCng+kG8QiFeFwzFb+rwuw=
github.com/goccy/go-yaml v1.18.0/go.mod h1:XBurs7gK8ATbW4ZPGKgcbrY1Br56PdM69F7LkFRi1kA=
120 changes: 94 additions & 26 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package config
import (
"fmt"
"os"
"strings"

"github.com/goccy/go-yaml"
)
Expand All @@ -14,9 +15,19 @@ type RuntimeSetting struct {
Paths []string `yaml:"paths,omitempty"`
}

type HealthCheckConfig struct {
Type string `yaml:"type"`
Params map[string]any `yaml:"params,omitempty"`
}

type ServiceConfig struct {
Runtimes map[string]RuntimeSetting `yaml:"runtimes,omitempty"`
Path string `yaml:"path,omitempty"`
Runtimes map[string]RuntimeSetting `yaml:"runtimes,omitempty"`
Path string `yaml:"path,omitempty"`
Executable string `yaml:"executable,omitempty"`
ExecutableArgs []string `yaml:"executable_args,omitempty"`
EnvVars map[string]string `yaml:"env_vars,omitempty"`
EnvVarsFile string `yaml:"env_vars_file,omitempty"`
HealthChecks []HealthCheckConfig `yaml:"health_checks,omitempty"`
}

type Config struct {
Expand All @@ -31,6 +42,30 @@ type Config struct {
Services map[string]ServiceConfig `yaml:"services"`
}

func (h *HealthCheckConfig) ParamToInt(name string) (int, error) {
val, found := h.Params[name]
if !found {
return 0, fmt.Errorf("health check param %q not found", name)
}
intVal, ok := val.(int)
if !ok {
return 0, fmt.Errorf("health check param %q is not an integer", name)
}
return intVal, nil
}

func (h *HealthCheckConfig) ParamToString(name string) (string, error) {
val, found := h.Params[name]
if !found {
return "", fmt.Errorf("health check param %q not found", name)
}
strVal, ok := val.(string)
if !ok {
return "", fmt.Errorf("health check param %q is not a string", name)
}
return strVal, nil
}

func Load(path string) (*Config, error) {
data, readErr := os.ReadFile(path)
if readErr != nil {
Expand All @@ -44,37 +79,70 @@ func Load(path string) (*Config, error) {
}

for name, svc := range cfg.Services {
if svc.Path == "" {
continue
}

extData, readExtErr := os.ReadFile(svc.Path)
if readExtErr != nil {
if os.IsNotExist(readExtErr) {
continue
// Load Service config from file if specified
if svc.Path != "" {
if extCfg, fullCfgErr := parseExternalConfig(svc.Path); fullCfgErr == nil {
if extSvc, found := extCfg.Services[name]; found {
cfg.Services[name] = extSvc
continue
}
}
return nil, fmt.Errorf("read service config %q: %w", svc.Path, readExtErr)
}

var extCfg Config
fullCfgErr := yaml.UnmarshalWithOptions(extData, &extCfg, yaml.Strict())
if fullCfgErr == nil {
if extSvc, found := extCfg.Services[name]; found && len(extSvc.Runtimes) > 0 {
svc.Runtimes = extSvc.Runtimes
cfg.Services[name] = svc
continue
// Load Service from external file if specified
ext, err := parseExternalServiceConfig(svc.Path)
if err != nil {
return nil, fmt.Errorf("parse external service config %q: %w", svc.Path, err)
}
// Replace ServiceConfig with the loaded one
svc = ext
// Keep the path to the Service config file for future references
svc.Path = path
}

var ext ServiceConfig
fallbackErr := yaml.UnmarshalWithOptions(extData, &ext, yaml.Strict())
if fallbackErr != nil {
return nil, fmt.Errorf("parse service config %q: %w", svc.Path, fallbackErr)
}

svc.Runtimes = ext.Runtimes
cfg.Services[name] = svc
}

return &cfg, nil
}

func parseExternalConfig(path string) (Config, error) {
var cfg Config

extData, err := os.ReadFile(path)
if err != nil {
if os.IsNotExist(err) {
return cfg, nil
}
return cfg, fmt.Errorf("read service config %q: %w", path, err)
}

if err = yaml.UnmarshalWithOptions(extData, &cfg, yaml.Strict()); err == nil {
return cfg, nil
}
return cfg, nil
}

func parseExternalServiceConfig(path string) (ServiceConfig, error) {
var cfg ServiceConfig
extData, err := os.ReadFile(path)
if err != nil {
if os.IsNotExist(err) {
return cfg, nil
}
return cfg, fmt.Errorf("read service config %q: %w", path, err)
}
if err = yaml.UnmarshalWithOptions(extData, &cfg, yaml.Strict()); err != nil {
return cfg, fmt.Errorf("parse service config %q: %w", path, err)
}
// Abbomination to support env files sourcing
// It changes exec to bash and adds source command and the original executable as the args to bash
if cfg.EnvVarsFile != "" {
argsString := strings.Join(cfg.ExecutableArgs, " ")
cfg.ExecutableArgs = []string{
"-c",
fmt.Sprintf("source %s; %s %s", cfg.EnvVarsFile, cfg.Executable, argsString),
}
cfg.Executable = "bash"
}
return cfg, nil
}
Loading