Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 71 additions & 2 deletions cmd/cefasdb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/CefasDb/cefasdb/internal/catalog"
"github.com/CefasDb/cefasdb/internal/cluster"
"github.com/CefasDb/cefasdb/internal/config"
"github.com/CefasDb/cefasdb/internal/identitylease"
"github.com/CefasDb/cefasdb/internal/metrics"
"github.com/CefasDb/cefasdb/internal/rebalance"
craft "github.com/CefasDb/cefasdb/internal/replication"
Expand Down Expand Up @@ -63,6 +64,13 @@ func main() {
raftLogCompMinBytes = flag.Int("raft-log-compression-min-bytes", 0, "Minimum raft log payload bytes before compression. 0 inherits config/default.")
raftLogCompMinSavingsRatio = flag.Float64("raft-log-compression-min-savings-ratio", -1, "Minimum compression savings ratio required to keep compressed payloads. Negative inherits config/default.")
raftLogCompSkipCooldown = flag.Duration("raft-log-compression-skip-cooldown", -1, "Cooldown after an unhelpful compression attempt. Negative inherits config/default; 0 disables cooldown.")
raftIdentityLeaseBackend = flag.String("raft-identity-lease-backend", "", "Raft identity lease backend: file, kubernetes, auto, or off. Empty inherits config/default.")
raftIdentityLeasePath = flag.String("raft-identity-lease-path", "", "Directory for file-backed raft identity leases. Empty defaults to -data/raft-identity.")
raftIdentityLeaseName = flag.String("raft-identity-lease-name", "", "Lease resource name for external backends. Empty derives from -raft-id.")
raftIdentityLeaseNamespace = flag.String("raft-identity-lease-namespace", "", "Kubernetes namespace for raft identity Lease objects. Empty uses in-cluster namespace.")
raftIdentityLeaseAPIURL = flag.String("raft-identity-kubernetes-api-url", "", "Kubernetes API URL for raft identity leases. Empty uses in-cluster service discovery.")
raftIdentityLeaseTTL = flag.Duration("raft-identity-lease-ttl", 0, "Raft identity lease TTL. 0 inherits config/default.")
raftIdentityLeaseRenew = flag.Duration("raft-identity-lease-renew-interval", 0, "Raft identity lease renew interval. 0 inherits config/default.")

// Storage tuning.
storageProfile = flag.String("storage-profile", "", "Pebble profile: default, balanced, write-heavy")
Expand Down Expand Up @@ -198,6 +206,10 @@ func main() {
*backupSchedulerEnabled, *backupSchedulerDisabled, *backupSchedulerDryRun,
*backupSchedulerInterval, *backupSchedulerNameTemplate, *backupSchedulerTables,
*backupSchedulerRetentionKeepLatest, *backupSchedulerRetentionMaxAge, *backupSchedulerRetentionDryRun)
bootstrapserver.OverlayRaftIdentityLeaseFlags(&cfg,
*raftIdentityLeaseBackend, *raftIdentityLeasePath, *raftIdentityLeaseName,
*raftIdentityLeaseNamespace, *raftIdentityLeaseAPIURL,
*raftIdentityLeaseTTL, *raftIdentityLeaseRenew)

// Initialise tracing first so subsequent setup gets spans on
// failure. tracingShutdown is a no-op when no endpoint is set.
Expand Down Expand Up @@ -232,6 +244,49 @@ func main() {
os.Exit(1)
}

leaseCtx, leaseCancel := context.WithCancel(context.Background())
defer leaseCancel()
leaseLost := make(chan error, 1)
var raftIdentityGuard *identitylease.Guard
if raftIdentityLeaseRequired(cfg) {
raftIdentityGuard, err = identitylease.Acquire(context.Background(), identitylease.Options{
NodeID: cfg.Cluster.SelfID,
DataDir: cfg.Data,
Backend: cfg.RaftIdentity.LeaseBackend,
LeasePath: cfg.RaftIdentity.LeasePath,
LeaseName: cfg.RaftIdentity.LeaseName,
LeaseTTL: cfg.RaftIdentity.LeaseTTL,
RenewInterval: cfg.RaftIdentity.LeaseRenewInterval,
KubernetesNamespace: cfg.RaftIdentity.LeaseNamespace,
KubernetesAPIURL: cfg.RaftIdentity.KubernetesAPIURL,
KubernetesBearerToken: "",
KubernetesCAFile: "",
HTTPClient: nil,
})
if err != nil {
logger.Error("raft identity lease", "err", err, "raftID", cfg.Cluster.SelfID, "backend", cfg.RaftIdentity.LeaseBackend)
os.Exit(1)
}
defer func() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := raftIdentityGuard.Close(ctx); err != nil {
logger.Error("release raft identity lease", "err", err, "raftID", cfg.Cluster.SelfID)
}
}()
rec := raftIdentityGuard.Record()
logger.Info("raft identity lease acquired", "raftID", rec.NodeID, "holder", rec.HolderID, "epoch", rec.Epoch, "backend", rec.Backend, "resource", rec.Resource, "expires", rec.ExpiresAt)
go raftIdentityGuard.RenewLoop(leaseCtx, func(err error) {
logger.Error("raft identity lease lost", "err", err, "raftID", cfg.Cluster.SelfID)
select {
case leaseLost <- err:
default:
}
})
} else if cfg.Cluster.SelfID != "" && cfg.RaftIdentity.LeaseBackend == identitylease.BackendOff {
logger.Info("raft identity lease disabled", "raftID", cfg.Cluster.SelfID)
}

var (
db *pebble.DB
cat *catalog.Catalog
Expand Down Expand Up @@ -517,8 +572,15 @@ func main() {

stop := make(chan os.Signal, 1)
signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM)
<-stop
logger.Info("shutting down")
shutdownReason := "signal"
select {
case sig := <-stop:
shutdownReason = sig.String()
case err := <-leaseLost:
shutdownReason = "raft identity lease lost: " + err.Error()
}
logger.Info("shutting down", "reason", shutdownReason)
leaseCancel()
runtimeCancel()

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
Expand All @@ -529,3 +591,10 @@ func main() {
gsrv.GracefulStop()
}
}

// raftIdentityLeaseRequired reports whether a raft identity lease should be
// acquired for the given configuration.
//
// It returns false when the node has no raft id (cfg.Cluster.SelfID is
// empty) or when the lease backend is identitylease.BackendOff. Otherwise it
// returns true if cfg.Cluster.Shards is positive or cfg.Raft.Bind is set.
func raftIdentityLeaseRequired(cfg config.Config) bool {
	switch {
	case cfg.Cluster.SelfID == "":
		// No raft identity to guard.
		return false
	case cfg.RaftIdentity.LeaseBackend == identitylease.BackendOff:
		// Operator explicitly opted out of leasing.
		return false
	case cfg.Cluster.Shards > 0:
		return true
	default:
		return cfg.Raft.Bind != ""
	}
}
8 changes: 8 additions & 0 deletions dist/helm/cefas/templates/_helpers.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@
{{ include "cefas.fullname" . }}-headless
{{- end -}}

{{/*
cefas.serviceAccountName resolves the ServiceAccount name used by the chart.
When .Values.serviceAccount.create is true, it is .Values.serviceAccount.name,
falling back to the cefas.fullname template if that value is empty.
Otherwise it is .Values.serviceAccount.name, falling back to "default".
*/}}
{{- define "cefas.serviceAccountName" -}}
{{- if .Values.serviceAccount.create -}}
{{ default (include "cefas.fullname" .) .Values.serviceAccount.name }}
{{- else -}}
{{ default "default" .Values.serviceAccount.name }}
{{- end -}}
{{- end -}}

{{- define "cefas.labels" -}}
app.kubernetes.io/name: {{ include "cefas.name" . }}
app.kubernetes.io/instance: {{ .Release.Name }}
Expand Down
32 changes: 32 additions & 0 deletions dist/helm/cefas/templates/lease-rbac.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{{- if and .Values.raftIdentity.lease.enabled (eq .Values.raftIdentity.lease.backend "kubernetes") -}}
# Rendered only when the raft identity lease is enabled with the
# "kubernetes" backend. The Role grants access to coordination.k8s.io
# Lease objects in the release namespace.
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: {{ include "cefas.fullname" . }}-identity-lease
  labels:
    {{- include "cefas.labels" . | nindent 4 }}
rules:
  - apiGroups: ["coordination.k8s.io"]
    resources: ["leases"]
    verbs: ["get", "create", "update", "patch"]
---
# Binds the identity-lease Role to the ServiceAccount named by the
# cefas.serviceAccountName template.
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: {{ include "cefas.fullname" . }}-identity-lease
  labels:
    {{- include "cefas.labels" . | nindent 4 }}
subjects:
  - kind: ServiceAccount
    name: {{ include "cefas.serviceAccountName" . }}
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: {{ include "cefas.fullname" . }}-identity-lease
{{- end }}
20 changes: 20 additions & 0 deletions dist/helm/cefas/templates/statefulset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ spec:
annotations:
{{- toYaml .Values.podAnnotations | nindent 8 }}
spec:
serviceAccountName: {{ include "cefas.serviceAccountName" . }}
securityContext:
{{- toYaml .Values.securityContext | nindent 8 }}
terminationGracePeriodSeconds: 30
Expand All @@ -30,11 +31,30 @@ spec:
- "/etc/cefas/cefas.yaml"
- "-raft-id"
- "$(POD_NAME)"
{{- if .Values.raftIdentity.lease.enabled }}
- "-raft-identity-lease-backend"
- {{ .Values.raftIdentity.lease.backend | quote }}
- "-raft-identity-lease-name"
- "{{ default (include "cefas.fullname" .) .Values.raftIdentity.lease.namePrefix }}-$(POD_NAME)"
- "-raft-identity-lease-namespace"
- "$(POD_NAMESPACE)"
- "-raft-identity-lease-ttl"
- {{ .Values.raftIdentity.lease.ttl | quote }}
- "-raft-identity-lease-renew-interval"
- {{ .Values.raftIdentity.lease.renewInterval | quote }}
{{- else }}
- "-raft-identity-lease-backend"
- "off"
{{- end }}
env:
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: POD_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
ports:
- name: http
containerPort: {{ .Values.service.http }}
Expand Down
24 changes: 24 additions & 0 deletions dist/helm/cefas/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ image:

replicaCount: 1

serviceAccount:
create: true
name: ""

resources:
requests:
cpu: "200m"
Expand All @@ -36,6 +40,16 @@ cluster:
bootstrap: true
fsyncOnCommit: false

# Guards each raft-id with a persistent lease so a stale process cannot
# keep serving after Kubernetes starts a replacement with the same id.
raftIdentity:
lease:
enabled: true
backend: kubernetes
ttl: 30s
renewInterval: 10s
namePrefix: ""

# Optional identity provider (Tikti JWKS).
identity:
jwksUrl: ""
Expand All @@ -52,6 +66,16 @@ tracing:
metrics:
enabled: true

ingress:
enabled: false
className: ""
annotations: {}
prefix: /
stripPrefix:
enabled: false
hosts: []
tls: []

podAnnotations:
prometheus.io/scrape: "true"
prometheus.io/port: "8080"
Expand Down
32 changes: 32 additions & 0 deletions internal/bootstrap/server/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,38 @@ func OverlayFlags(
}
}

// OverlayRaftIdentityLeaseFlags applies the raft identity-lease CLI flags on
// top of cfg.RaftIdentity. It is kept separate from OverlayFlags so that
// already-large signature does not grow further, while following the same
// precedence rule: a flag only replaces the env/YAML/default value when it
// was explicitly set.
//
// String flags (backend, path, name, namespace, kubernetesAPIURL) override
// when non-empty. Duration flags (ttl, renewInterval) override when strictly
// positive. Any other value leaves the corresponding cfg field untouched.
func OverlayRaftIdentityLeaseFlags(
	cfg *config.Config,
	backend, path, name, namespace, kubernetesAPIURL string,
	ttl, renewInterval time.Duration,
) {
	// Each entry pairs a flag value with the assignment it triggers. cfg is
	// only dereferenced inside the closure, i.e. when an override happens.
	stringOverrides := []struct {
		value string
		apply func(string)
	}{
		{backend, func(v string) { cfg.RaftIdentity.LeaseBackend = v }},
		{path, func(v string) { cfg.RaftIdentity.LeasePath = v }},
		{name, func(v string) { cfg.RaftIdentity.LeaseName = v }},
		{namespace, func(v string) { cfg.RaftIdentity.LeaseNamespace = v }},
		{kubernetesAPIURL, func(v string) { cfg.RaftIdentity.KubernetesAPIURL = v }},
	}
	for _, o := range stringOverrides {
		if o.value == "" {
			continue
		}
		o.apply(o.value)
	}

	durationOverrides := []struct {
		value time.Duration
		apply func(time.Duration)
	}{
		{ttl, func(d time.Duration) { cfg.RaftIdentity.LeaseTTL = d }},
		{renewInterval, func(d time.Duration) { cfg.RaftIdentity.LeaseRenewInterval = d }},
	}
	for _, o := range durationOverrides {
		if o.value <= 0 {
			continue
		}
		o.apply(o.value)
	}
}

// SplitCSVFlag splits a comma-separated CLI flag value into a trimmed
// slice. Blank entries are dropped, so "a, ,b" yields ["a","b"].
func SplitCSVFlag(in string) []string {
Expand Down
18 changes: 18 additions & 0 deletions internal/bootstrap/server/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,24 @@ func TestOverlayFlags_RaftGroup(t *testing.T) {
}
}

// TestOverlayRaftIdentityLeaseFlags checks both halves of the overlay
// contract: explicitly set flags replace the config values, and unset flags
// (empty strings, zero or negative durations) leave the config untouched.
func TestOverlayRaftIdentityLeaseFlags(t *testing.T) {
	cfg := baseCfg()
	OverlayRaftIdentityLeaseFlags(&cfg, "kubernetes", "/leases", "lease-n1", "cefasdb", "https://kubernetes.default.svc", 45*time.Second, 15*time.Second)

	if cfg.RaftIdentity.LeaseBackend != "kubernetes" {
		t.Errorf("LeaseBackend = %q", cfg.RaftIdentity.LeaseBackend)
	}
	if cfg.RaftIdentity.LeasePath != "/leases" || cfg.RaftIdentity.LeaseName != "lease-n1" || cfg.RaftIdentity.LeaseNamespace != "cefasdb" {
		t.Errorf("lease identity fields = %+v", cfg.RaftIdentity)
	}
	if cfg.RaftIdentity.KubernetesAPIURL != "https://kubernetes.default.svc" {
		t.Errorf("KubernetesAPIURL = %q", cfg.RaftIdentity.KubernetesAPIURL)
	}
	if cfg.RaftIdentity.LeaseTTL != 45*time.Second || cfg.RaftIdentity.LeaseRenewInterval != 15*time.Second {
		t.Errorf("lease durations = %+v", cfg.RaftIdentity)
	}

	// Inherit path: flags left at their zero value must not clobber what
	// is already in the config. Snapshot the populated section and overlay
	// "nothing" on top of it.
	want := cfg.RaftIdentity
	OverlayRaftIdentityLeaseFlags(&cfg, "", "", "", "", "", 0, 0)
	if cfg.RaftIdentity != want {
		t.Errorf("zero-valued flags changed config: got %+v, want %+v", cfg.RaftIdentity, want)
	}

	// Negative durations are treated as unset as well (only > 0 overrides).
	OverlayRaftIdentityLeaseFlags(&cfg, "", "", "", "", "", -time.Second, -time.Second)
	if cfg.RaftIdentity != want {
		t.Errorf("negative durations changed config: got %+v, want %+v", cfg.RaftIdentity, want)
	}
}

func TestOverlayFlags_PeerSetGroup(t *testing.T) {
cfg := baseCfg()
args := zeroArgs()
Expand Down
21 changes: 21 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,16 @@ type Config struct {
LogCompressionMinSavingsRatio float64 `yaml:"logCompressionMinSavingsRatio"`
LogCompressionSkipCooldown time.Duration `yaml:"logCompressionSkipCooldown"`
} `yaml:"raft"`
RaftIdentity struct {
LeaseBackend string `yaml:"leaseBackend"`
LeasePath string `yaml:"leasePath"`
LeaseName string `yaml:"leaseName"`
LeaseNamespace string `yaml:"leaseNamespace"`
LeaseTTL time.Duration `yaml:"leaseTtl"`
LeaseRenewInterval time.Duration `yaml:"leaseRenewInterval"`
KubernetesAPIURL string `yaml:"kubernetesApiUrl"`
KubernetesServiceAccount string `yaml:"kubernetesServiceAccount"`
} `yaml:"raftIdentity"`
Identity struct {
JwksURL string `yaml:"jwksUrl"`
Issuer string `yaml:"issuer"`
Expand Down Expand Up @@ -179,6 +189,9 @@ func Defaults() Config {
c.Raft.LogCompressionMinBytes = 1024
c.Raft.LogCompressionMinSavingsRatio = 0.05
c.Raft.LogCompressionSkipCooldown = time.Second
c.RaftIdentity.LeaseBackend = "file"
c.RaftIdentity.LeaseTTL = 30 * time.Second
c.RaftIdentity.LeaseRenewInterval = 10 * time.Second
c.Tracing.SampleRate = 1.0
return c
}
Expand Down Expand Up @@ -331,6 +344,14 @@ func ApplyEnv(cfg *Config) error {
cfg.Raft.LogCompressionMinBytes = integer("RAFT_LOG_COMPRESSION_MIN_BYTES", cfg.Raft.LogCompressionMinBytes)
cfg.Raft.LogCompressionMinSavingsRatio = flt("RAFT_LOG_COMPRESSION_MIN_SAVINGS_RATIO", cfg.Raft.LogCompressionMinSavingsRatio)
cfg.Raft.LogCompressionSkipCooldown = dur("RAFT_LOG_COMPRESSION_SKIP_COOLDOWN", cfg.Raft.LogCompressionSkipCooldown)
cfg.RaftIdentity.LeaseBackend = str("RAFT_IDENTITY_LEASE_BACKEND", cfg.RaftIdentity.LeaseBackend)
cfg.RaftIdentity.LeasePath = str("RAFT_IDENTITY_LEASE_PATH", cfg.RaftIdentity.LeasePath)
cfg.RaftIdentity.LeaseName = str("RAFT_IDENTITY_LEASE_NAME", cfg.RaftIdentity.LeaseName)
cfg.RaftIdentity.LeaseNamespace = str("RAFT_IDENTITY_LEASE_NAMESPACE", cfg.RaftIdentity.LeaseNamespace)
cfg.RaftIdentity.LeaseTTL = dur("RAFT_IDENTITY_LEASE_TTL", cfg.RaftIdentity.LeaseTTL)
cfg.RaftIdentity.LeaseRenewInterval = dur("RAFT_IDENTITY_LEASE_RENEW_INTERVAL", cfg.RaftIdentity.LeaseRenewInterval)
cfg.RaftIdentity.KubernetesAPIURL = str("RAFT_IDENTITY_KUBERNETES_API_URL", cfg.RaftIdentity.KubernetesAPIURL)
cfg.RaftIdentity.KubernetesServiceAccount = str("RAFT_IDENTITY_KUBERNETES_SERVICE_ACCOUNT", cfg.RaftIdentity.KubernetesServiceAccount)

cfg.Identity.JwksURL = str("IDENTITY_JWKS_URL", cfg.Identity.JwksURL)
cfg.Identity.Issuer = str("IDENTITY_ISSUER", cfg.Identity.Issuer)
Expand Down
Loading
Loading