From 5d088c32fc6674304592106db0e6b9645e90ff5b Mon Sep 17 00:00:00 2001 From: Osvaldo Andrade Date: Wed, 24 Jun 2026 23:10:30 -0300 Subject: [PATCH] Add Raft identity lease fencing --- cmd/cefasdb/main.go | 73 +- dist/helm/cefas/templates/_helpers.tpl | 8 + dist/helm/cefas/templates/lease-rbac.yaml | 32 + dist/helm/cefas/templates/statefulset.yaml | 20 + dist/helm/cefas/values.yaml | 24 + internal/bootstrap/server/flags.go | 32 + internal/bootstrap/server/flags_test.go | 18 + internal/config/config.go | 21 + internal/config/config_test.go | 26 + internal/identitylease/lease.go | 857 +++++++++++++++++++++ internal/identitylease/lease_test.go | 218 ++++++ 11 files changed, 1327 insertions(+), 2 deletions(-) create mode 100644 dist/helm/cefas/templates/lease-rbac.yaml create mode 100644 internal/identitylease/lease.go create mode 100644 internal/identitylease/lease_test.go diff --git a/cmd/cefasdb/main.go b/cmd/cefasdb/main.go index f04d011..72b9b9f 100644 --- a/cmd/cefasdb/main.go +++ b/cmd/cefasdb/main.go @@ -24,6 +24,7 @@ import ( "github.com/CefasDb/cefasdb/internal/catalog" "github.com/CefasDb/cefasdb/internal/cluster" "github.com/CefasDb/cefasdb/internal/config" + "github.com/CefasDb/cefasdb/internal/identitylease" "github.com/CefasDb/cefasdb/internal/metrics" "github.com/CefasDb/cefasdb/internal/rebalance" craft "github.com/CefasDb/cefasdb/internal/replication" @@ -63,6 +64,13 @@ func main() { raftLogCompMinBytes = flag.Int("raft-log-compression-min-bytes", 0, "Minimum raft log payload bytes before compression. 0 inherits config/default.") raftLogCompMinSavingsRatio = flag.Float64("raft-log-compression-min-savings-ratio", -1, "Minimum compression savings ratio required to keep compressed payloads. Negative inherits config/default.") raftLogCompSkipCooldown = flag.Duration("raft-log-compression-skip-cooldown", -1, "Cooldown after an unhelpful compression attempt. 
Negative inherits config/default; 0 disables cooldown.") + raftIdentityLeaseBackend = flag.String("raft-identity-lease-backend", "", "Raft identity lease backend: file, kubernetes, auto, or off. Empty inherits config/default.") + raftIdentityLeasePath = flag.String("raft-identity-lease-path", "", "Directory for file-backed raft identity leases. Empty defaults to -data/raft-identity.") + raftIdentityLeaseName = flag.String("raft-identity-lease-name", "", "Lease resource name for external backends. Empty derives from -raft-id.") + raftIdentityLeaseNamespace = flag.String("raft-identity-lease-namespace", "", "Kubernetes namespace for raft identity Lease objects. Empty uses in-cluster namespace.") + raftIdentityLeaseAPIURL = flag.String("raft-identity-kubernetes-api-url", "", "Kubernetes API URL for raft identity leases. Empty uses in-cluster service discovery.") + raftIdentityLeaseTTL = flag.Duration("raft-identity-lease-ttl", 0, "Raft identity lease TTL. 0 inherits config/default.") + raftIdentityLeaseRenew = flag.Duration("raft-identity-lease-renew-interval", 0, "Raft identity lease renew interval. 0 inherits config/default.") // Storage tuning. storageProfile = flag.String("storage-profile", "", "Pebble profile: default, balanced, write-heavy") @@ -198,6 +206,10 @@ func main() { *backupSchedulerEnabled, *backupSchedulerDisabled, *backupSchedulerDryRun, *backupSchedulerInterval, *backupSchedulerNameTemplate, *backupSchedulerTables, *backupSchedulerRetentionKeepLatest, *backupSchedulerRetentionMaxAge, *backupSchedulerRetentionDryRun) + bootstrapserver.OverlayRaftIdentityLeaseFlags(&cfg, + *raftIdentityLeaseBackend, *raftIdentityLeasePath, *raftIdentityLeaseName, + *raftIdentityLeaseNamespace, *raftIdentityLeaseAPIURL, + *raftIdentityLeaseTTL, *raftIdentityLeaseRenew) // Initialise tracing first so subsequent setup gets spans on // failure. tracingShutdown is a no-op when no endpoint is set. 
@@ -232,6 +244,49 @@ func main() { os.Exit(1) } + leaseCtx, leaseCancel := context.WithCancel(context.Background()) + defer leaseCancel() + leaseLost := make(chan error, 1) + var raftIdentityGuard *identitylease.Guard + if raftIdentityLeaseRequired(cfg) { + raftIdentityGuard, err = identitylease.Acquire(context.Background(), identitylease.Options{ + NodeID: cfg.Cluster.SelfID, + DataDir: cfg.Data, + Backend: cfg.RaftIdentity.LeaseBackend, + LeasePath: cfg.RaftIdentity.LeasePath, + LeaseName: cfg.RaftIdentity.LeaseName, + LeaseTTL: cfg.RaftIdentity.LeaseTTL, + RenewInterval: cfg.RaftIdentity.LeaseRenewInterval, + KubernetesNamespace: cfg.RaftIdentity.LeaseNamespace, + KubernetesAPIURL: cfg.RaftIdentity.KubernetesAPIURL, + KubernetesBearerToken: "", + KubernetesCAFile: "", + HTTPClient: nil, + }) + if err != nil { + logger.Error("raft identity lease", "err", err, "raftID", cfg.Cluster.SelfID, "backend", cfg.RaftIdentity.LeaseBackend) + os.Exit(1) + } + defer func() { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := raftIdentityGuard.Close(ctx); err != nil { + logger.Error("release raft identity lease", "err", err, "raftID", cfg.Cluster.SelfID) + } + }() + rec := raftIdentityGuard.Record() + logger.Info("raft identity lease acquired", "raftID", rec.NodeID, "holder", rec.HolderID, "epoch", rec.Epoch, "backend", rec.Backend, "resource", rec.Resource, "expires", rec.ExpiresAt) + go raftIdentityGuard.RenewLoop(leaseCtx, func(err error) { + logger.Error("raft identity lease lost", "err", err, "raftID", cfg.Cluster.SelfID) + select { + case leaseLost <- err: + default: + } + }) + } else if cfg.Cluster.SelfID != "" && cfg.RaftIdentity.LeaseBackend == identitylease.BackendOff { + logger.Info("raft identity lease disabled", "raftID", cfg.Cluster.SelfID) + } + var ( db *pebble.DB cat *catalog.Catalog @@ -517,8 +572,15 @@ func main() { stop := make(chan os.Signal, 1) signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM) - 
<-stop - logger.Info("shutting down") + shutdownReason := "signal" + select { + case sig := <-stop: + shutdownReason = sig.String() + case err := <-leaseLost: + shutdownReason = "raft identity lease lost: " + err.Error() + } + logger.Info("shutting down", "reason", shutdownReason) + leaseCancel() runtimeCancel() ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) @@ -529,3 +591,10 @@ func main() { gsrv.GracefulStop() } } + +func raftIdentityLeaseRequired(cfg config.Config) bool { + if cfg.Cluster.SelfID == "" || cfg.RaftIdentity.LeaseBackend == identitylease.BackendOff { + return false + } + return cfg.Cluster.Shards > 0 || cfg.Raft.Bind != "" +} diff --git a/dist/helm/cefas/templates/_helpers.tpl b/dist/helm/cefas/templates/_helpers.tpl index aa77ff0..20875e7 100644 --- a/dist/helm/cefas/templates/_helpers.tpl +++ b/dist/helm/cefas/templates/_helpers.tpl @@ -10,6 +10,14 @@ {{ include "cefas.fullname" . }}-headless {{- end -}} +{{- define "cefas.serviceAccountName" -}} +{{- if .Values.serviceAccount.create -}} +{{ default (include "cefas.fullname" .) .Values.serviceAccount.name }} +{{- else -}} +{{ default "default" .Values.serviceAccount.name }} +{{- end -}} +{{- end -}} + {{- define "cefas.labels" -}} app.kubernetes.io/name: {{ include "cefas.name" . }} app.kubernetes.io/instance: {{ .Release.Name }} diff --git a/dist/helm/cefas/templates/lease-rbac.yaml b/dist/helm/cefas/templates/lease-rbac.yaml new file mode 100644 index 0000000..dd7cb36 --- /dev/null +++ b/dist/helm/cefas/templates/lease-rbac.yaml @@ -0,0 +1,32 @@ +{{- if and .Values.raftIdentity.lease.enabled (eq .Values.raftIdentity.lease.backend "kubernetes") -}} +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: {{ include "cefas.fullname" . }}-identity-lease + labels: + {{- include "cefas.labels" . 
| nindent 4 }} +rules: + - apiGroups: + - coordination.k8s.io + resources: + - leases + verbs: + - get + - create + - update + - patch +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: {{ include "cefas.fullname" . }}-identity-lease + labels: + {{- include "cefas.labels" . | nindent 4 }} +subjects: + - kind: ServiceAccount + name: {{ include "cefas.serviceAccountName" . }} +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: {{ include "cefas.fullname" . }}-identity-lease +{{- end }} diff --git a/dist/helm/cefas/templates/statefulset.yaml b/dist/helm/cefas/templates/statefulset.yaml index 8e66a90..b7babe4 100644 --- a/dist/helm/cefas/templates/statefulset.yaml +++ b/dist/helm/cefas/templates/statefulset.yaml @@ -18,6 +18,7 @@ spec: annotations: {{- toYaml .Values.podAnnotations | nindent 8 }} spec: + serviceAccountName: {{ include "cefas.serviceAccountName" . }} securityContext: {{- toYaml .Values.securityContext | nindent 8 }} terminationGracePeriodSeconds: 30 @@ -30,11 +31,30 @@ spec: - "/etc/cefas/cefas.yaml" - "-raft-id" - "$(POD_NAME)" + {{- if .Values.raftIdentity.lease.enabled }} + - "-raft-identity-lease-backend" + - {{ .Values.raftIdentity.lease.backend | quote }} + - "-raft-identity-lease-name" + - "{{ default (include "cefas.fullname" .) 
.Values.raftIdentity.lease.namePrefix }}-$(POD_NAME)" + - "-raft-identity-lease-namespace" + - "$(POD_NAMESPACE)" + - "-raft-identity-lease-ttl" + - {{ .Values.raftIdentity.lease.ttl | quote }} + - "-raft-identity-lease-renew-interval" + - {{ .Values.raftIdentity.lease.renewInterval | quote }} + {{- else }} + - "-raft-identity-lease-backend" + - "off" + {{- end }} env: - name: POD_NAME valueFrom: fieldRef: fieldPath: metadata.name + - name: POD_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace ports: - name: http containerPort: {{ .Values.service.http }} diff --git a/dist/helm/cefas/values.yaml b/dist/helm/cefas/values.yaml index c908ce8..ae95e7d 100644 --- a/dist/helm/cefas/values.yaml +++ b/dist/helm/cefas/values.yaml @@ -11,6 +11,10 @@ image: replicaCount: 1 +serviceAccount: + create: true + name: "" + resources: requests: cpu: "200m" @@ -36,6 +40,16 @@ cluster: bootstrap: true fsyncOnCommit: false +# Guards each raft-id with a persistent lease so a stale process cannot +# keep serving after Kubernetes starts a replacement with the same id. +raftIdentity: + lease: + enabled: true + backend: kubernetes + ttl: 30s + renewInterval: 10s + namePrefix: "" + # Optional identity provider (Tikti JWKS). identity: jwksUrl: "" @@ -52,6 +66,16 @@ tracing: metrics: enabled: true +ingress: + enabled: false + className: "" + annotations: {} + prefix: / + stripPrefix: + enabled: false + hosts: [] + tls: [] + podAnnotations: prometheus.io/scrape: "true" prometheus.io/port: "8080" diff --git a/internal/bootstrap/server/flags.go b/internal/bootstrap/server/flags.go index 82cfdba..16b3b7a 100644 --- a/internal/bootstrap/server/flags.go +++ b/internal/bootstrap/server/flags.go @@ -309,6 +309,38 @@ func OverlayFlags( } } +// OverlayRaftIdentityLeaseFlags keeps the newer identity-lease flags +// out of the already-large OverlayFlags signature while preserving the +// same precedence rule: only explicitly non-zero flag values override +// env/YAML/default config. 
+func OverlayRaftIdentityLeaseFlags( + cfg *config.Config, + backend, path, name, namespace, kubernetesAPIURL string, + ttl, renewInterval time.Duration, +) { + if backend != "" { + cfg.RaftIdentity.LeaseBackend = backend + } + if path != "" { + cfg.RaftIdentity.LeasePath = path + } + if name != "" { + cfg.RaftIdentity.LeaseName = name + } + if namespace != "" { + cfg.RaftIdentity.LeaseNamespace = namespace + } + if kubernetesAPIURL != "" { + cfg.RaftIdentity.KubernetesAPIURL = kubernetesAPIURL + } + if ttl > 0 { + cfg.RaftIdentity.LeaseTTL = ttl + } + if renewInterval > 0 { + cfg.RaftIdentity.LeaseRenewInterval = renewInterval + } +} + // SplitCSVFlag splits a comma-separated CLI flag value into a trimmed // slice. Blank entries are dropped, so "a, ,b" yields ["a","b"]. func SplitCSVFlag(in string) []string { diff --git a/internal/bootstrap/server/flags_test.go b/internal/bootstrap/server/flags_test.go index 57992d7..2a59c37 100644 --- a/internal/bootstrap/server/flags_test.go +++ b/internal/bootstrap/server/flags_test.go @@ -297,6 +297,24 @@ func TestOverlayFlags_RaftGroup(t *testing.T) { } } +func TestOverlayRaftIdentityLeaseFlags(t *testing.T) { + cfg := baseCfg() + OverlayRaftIdentityLeaseFlags(&cfg, "kubernetes", "/leases", "lease-n1", "cefasdb", "https://kubernetes.default.svc", 45*time.Second, 15*time.Second) + + if cfg.RaftIdentity.LeaseBackend != "kubernetes" { + t.Errorf("LeaseBackend = %q", cfg.RaftIdentity.LeaseBackend) + } + if cfg.RaftIdentity.LeasePath != "/leases" || cfg.RaftIdentity.LeaseName != "lease-n1" || cfg.RaftIdentity.LeaseNamespace != "cefasdb" { + t.Errorf("lease identity fields = %+v", cfg.RaftIdentity) + } + if cfg.RaftIdentity.KubernetesAPIURL != "https://kubernetes.default.svc" { + t.Errorf("KubernetesAPIURL = %q", cfg.RaftIdentity.KubernetesAPIURL) + } + if cfg.RaftIdentity.LeaseTTL != 45*time.Second || cfg.RaftIdentity.LeaseRenewInterval != 15*time.Second { + t.Errorf("lease durations = %+v", cfg.RaftIdentity) + } +} + func 
TestOverlayFlags_PeerSetGroup(t *testing.T) { cfg := baseCfg() args := zeroArgs() diff --git a/internal/config/config.go b/internal/config/config.go index e4381d1..b211587 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -90,6 +90,16 @@ type Config struct { LogCompressionMinSavingsRatio float64 `yaml:"logCompressionMinSavingsRatio"` LogCompressionSkipCooldown time.Duration `yaml:"logCompressionSkipCooldown"` } `yaml:"raft"` + RaftIdentity struct { + LeaseBackend string `yaml:"leaseBackend"` + LeasePath string `yaml:"leasePath"` + LeaseName string `yaml:"leaseName"` + LeaseNamespace string `yaml:"leaseNamespace"` + LeaseTTL time.Duration `yaml:"leaseTtl"` + LeaseRenewInterval time.Duration `yaml:"leaseRenewInterval"` + KubernetesAPIURL string `yaml:"kubernetesApiUrl"` + KubernetesServiceAccount string `yaml:"kubernetesServiceAccount"` + } `yaml:"raftIdentity"` Identity struct { JwksURL string `yaml:"jwksUrl"` Issuer string `yaml:"issuer"` @@ -179,6 +189,9 @@ func Defaults() Config { c.Raft.LogCompressionMinBytes = 1024 c.Raft.LogCompressionMinSavingsRatio = 0.05 c.Raft.LogCompressionSkipCooldown = time.Second + c.RaftIdentity.LeaseBackend = "file" + c.RaftIdentity.LeaseTTL = 30 * time.Second + c.RaftIdentity.LeaseRenewInterval = 10 * time.Second c.Tracing.SampleRate = 1.0 return c } @@ -331,6 +344,14 @@ func ApplyEnv(cfg *Config) error { cfg.Raft.LogCompressionMinBytes = integer("RAFT_LOG_COMPRESSION_MIN_BYTES", cfg.Raft.LogCompressionMinBytes) cfg.Raft.LogCompressionMinSavingsRatio = flt("RAFT_LOG_COMPRESSION_MIN_SAVINGS_RATIO", cfg.Raft.LogCompressionMinSavingsRatio) cfg.Raft.LogCompressionSkipCooldown = dur("RAFT_LOG_COMPRESSION_SKIP_COOLDOWN", cfg.Raft.LogCompressionSkipCooldown) + cfg.RaftIdentity.LeaseBackend = str("RAFT_IDENTITY_LEASE_BACKEND", cfg.RaftIdentity.LeaseBackend) + cfg.RaftIdentity.LeasePath = str("RAFT_IDENTITY_LEASE_PATH", cfg.RaftIdentity.LeasePath) + cfg.RaftIdentity.LeaseName = str("RAFT_IDENTITY_LEASE_NAME", 
cfg.RaftIdentity.LeaseName) + cfg.RaftIdentity.LeaseNamespace = str("RAFT_IDENTITY_LEASE_NAMESPACE", cfg.RaftIdentity.LeaseNamespace) + cfg.RaftIdentity.LeaseTTL = dur("RAFT_IDENTITY_LEASE_TTL", cfg.RaftIdentity.LeaseTTL) + cfg.RaftIdentity.LeaseRenewInterval = dur("RAFT_IDENTITY_LEASE_RENEW_INTERVAL", cfg.RaftIdentity.LeaseRenewInterval) + cfg.RaftIdentity.KubernetesAPIURL = str("RAFT_IDENTITY_KUBERNETES_API_URL", cfg.RaftIdentity.KubernetesAPIURL) + cfg.RaftIdentity.KubernetesServiceAccount = str("RAFT_IDENTITY_KUBERNETES_SERVICE_ACCOUNT", cfg.RaftIdentity.KubernetesServiceAccount) cfg.Identity.JwksURL = str("IDENTITY_JWKS_URL", cfg.Identity.JwksURL) cfg.Identity.Issuer = str("IDENTITY_ISSUER", cfg.Identity.Issuer) diff --git a/internal/config/config_test.go b/internal/config/config_test.go index cf2adf4..bc87534 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -44,6 +44,9 @@ func TestDefaultsPopulated(t *testing.T) { if d.Raft.LogCompressionMinBytes != 1024 || d.Raft.LogCompressionMinSavingsRatio != 0.05 || d.Raft.LogCompressionSkipCooldown != time.Second { t.Errorf("raft log compression guardrail defaults not populated: %+v", d.Raft) } + if d.RaftIdentity.LeaseBackend != "file" || d.RaftIdentity.LeaseTTL != 30*time.Second || d.RaftIdentity.LeaseRenewInterval != 10*time.Second { + t.Errorf("raft identity lease defaults not populated: %+v", d.RaftIdentity) + } if d.Storage.Lanes != "auto" { t.Errorf("storage lanes default = %q", d.Storage.Lanes) } @@ -91,6 +94,12 @@ raft: logCompressionMinBytes: 2048 logCompressionMinSavingsRatio: 0.2 logCompressionSkipCooldown: 2s +raftIdentity: + leaseBackend: kubernetes + leaseName: cefasdb-geo-cefas-0 + leaseNamespace: cefasdb + leaseTtl: 45s + leaseRenewInterval: 15s identity: jwksUrl: https://tikti.example.com/jwks.json clockSkew: 45s @@ -153,6 +162,12 @@ backupScheduler: if cfg.Raft.LogCompressionMinBytes != 2048 || cfg.Raft.LogCompressionMinSavingsRatio != 0.2 || 
cfg.Raft.LogCompressionSkipCooldown != 2*time.Second { t.Fatalf("raft log compression guardrail config not loaded: %+v", cfg.Raft) } + if cfg.RaftIdentity.LeaseBackend != "kubernetes" || cfg.RaftIdentity.LeaseName != "cefasdb-geo-cefas-0" || cfg.RaftIdentity.LeaseNamespace != "cefasdb" { + t.Fatalf("raft identity lease config not loaded: %+v", cfg.RaftIdentity) + } + if cfg.RaftIdentity.LeaseTTL != 45*time.Second || cfg.RaftIdentity.LeaseRenewInterval != 15*time.Second { + t.Fatalf("raft identity lease durations not loaded: %+v", cfg.RaftIdentity) + } if cfg.Metrics.HotspotBuckets != 16 || cfg.Metrics.HotspotWriteThreshold != 42 || cfg.Metrics.HotspotLatencyThreshold != 75*time.Millisecond { t.Fatalf("hotspot metrics config not loaded: %+v", cfg.Metrics) } @@ -184,6 +199,11 @@ func TestApplyEnv(t *testing.T) { t.Setenv("CEFAS_RAFT_LOG_COMPRESSION_MIN_BYTES", "4096") t.Setenv("CEFAS_RAFT_LOG_COMPRESSION_MIN_SAVINGS_RATIO", "0.25") t.Setenv("CEFAS_RAFT_LOG_COMPRESSION_SKIP_COOLDOWN", "3s") + t.Setenv("CEFAS_RAFT_IDENTITY_LEASE_BACKEND", "kubernetes") + t.Setenv("CEFAS_RAFT_IDENTITY_LEASE_NAME", "lease-n1") + t.Setenv("CEFAS_RAFT_IDENTITY_LEASE_NAMESPACE", "cefasdb") + t.Setenv("CEFAS_RAFT_IDENTITY_LEASE_TTL", "40s") + t.Setenv("CEFAS_RAFT_IDENTITY_LEASE_RENEW_INTERVAL", "10s") t.Setenv("CEFAS_METRICS_ENABLED", "false") t.Setenv("CEFAS_METRICS_HOTSPOT_BUCKETS", "32") t.Setenv("CEFAS_METRICS_HOTSPOT_WRITE_THRESHOLD", "99") @@ -236,6 +256,12 @@ func TestApplyEnv(t *testing.T) { if cfg.Raft.LogCompressionMinBytes != 4096 || cfg.Raft.LogCompressionMinSavingsRatio != 0.25 || cfg.Raft.LogCompressionSkipCooldown != 3*time.Second { t.Errorf("raft log compression guardrail env not applied: %+v", cfg.Raft) } + if cfg.RaftIdentity.LeaseBackend != "kubernetes" || cfg.RaftIdentity.LeaseName != "lease-n1" || cfg.RaftIdentity.LeaseNamespace != "cefasdb" { + t.Errorf("raft identity lease env not applied: %+v", cfg.RaftIdentity) + } + if cfg.RaftIdentity.LeaseTTL != 40*time.Second || 
cfg.RaftIdentity.LeaseRenewInterval != 10*time.Second { + t.Errorf("raft identity lease duration env not applied: %+v", cfg.RaftIdentity) + } if cfg.Storage.ChangeLogMode != "off" { t.Errorf("storage changelog mode env not applied: %+v", cfg.Storage) } diff --git a/internal/identitylease/lease.go b/internal/identitylease/lease.go new file mode 100644 index 0000000..3232141 --- /dev/null +++ b/internal/identitylease/lease.go @@ -0,0 +1,857 @@ +package identitylease + +import ( + "context" + "crypto/rand" + "crypto/sha1" + "crypto/tls" + "crypto/x509" + "encoding/hex" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "net/url" + "os" + "path/filepath" + "strconv" + "strings" + "sync" + "syscall" + "time" +) + +const ( + BackendAuto = "auto" + BackendOff = "off" + BackendFile = "file" + BackendKubernetes = "kubernetes" + + defaultTTL = 30 * time.Second + defaultRenewInterval = 10 * time.Second + + annotationRaftID = "cefasdb.io/raft-id" + annotationEpoch = "cefasdb.io/identity-epoch" +) + +var ( + ErrLeaseHeld = errors.New("raft identity lease held") + ErrLeaseLost = errors.New("raft identity lease lost") +) + +// Options configure the per-raft-id process guard. Kubernetes fields +// are only used when Backend is "kubernetes" or auto detects an +// in-cluster service account. +type Options struct { + NodeID string + DataDir string + Backend string + LeasePath string + LeaseName string + LeaseTTL time.Duration + RenewInterval time.Duration + HolderID string + + KubernetesNamespace string + KubernetesAPIURL string + KubernetesBearerToken string + KubernetesCAFile string + HTTPClient *http.Client +} + +// Record is the durable identity lease state for a raft ServerID. 
+type Record struct { + NodeID string `json:"nodeId"` + HolderID string `json:"holderId"` + Epoch uint64 `json:"epoch"` + Backend string `json:"backend"` + Resource string `json:"resource"` + RenewedAt time.Time `json:"renewedAt"` + ExpiresAt time.Time `json:"expiresAt"` +} + +// HeldError includes the current lease owner so operators know that a +// replacement must wait for expiry or fence the old process. +type HeldError struct { + Record Record +} + +func (e *HeldError) Error() string { + rec := e.Record + return fmt.Sprintf("%v: node=%s holder=%s epoch=%d backend=%s resource=%s renewed=%s expires=%s; fence the current holder or wait for lease expiry before replacement", + ErrLeaseHeld, rec.NodeID, rec.HolderID, rec.Epoch, rec.Backend, rec.Resource, + rec.RenewedAt.Format(time.RFC3339Nano), rec.ExpiresAt.Format(time.RFC3339Nano)) +} + +func (e *HeldError) Unwrap() error { return ErrLeaseHeld } + +type backend interface { + Acquire(ctx context.Context, holderID string) (Record, error) + Renew(ctx context.Context, holderID string, prev Record) (Record, error) + Release(ctx context.Context, holderID string, prev Record) error +} + +// Guard owns the lease while the process is serving. If RenewLoop +// reports loss, callers must stop accepting reads/writes and shut down +// Raft before releasing process resources. 
+type Guard struct { + backend backend + holder string + renew time.Duration + + mu sync.RWMutex + record Record +} + +func Acquire(ctx context.Context, opts Options) (*Guard, error) { + if strings.TrimSpace(opts.NodeID) == "" { + return nil, fmt.Errorf("raft identity lease: node id is required") + } + if opts.LeaseTTL <= 0 { + opts.LeaseTTL = defaultTTL + } + if opts.RenewInterval <= 0 { + opts.RenewInterval = defaultRenewInterval + } + if opts.RenewInterval >= opts.LeaseTTL { + return nil, fmt.Errorf("raft identity lease: renew interval %s must be less than ttl %s", opts.RenewInterval, opts.LeaseTTL) + } + holder := opts.HolderID + if holder == "" { + holder = defaultHolderID(opts.NodeID) + } + be, err := newBackend(opts) + if err != nil { + return nil, err + } + rec, err := be.Acquire(ctx, holder) + if err != nil { + return nil, err + } + return &Guard{backend: be, holder: holder, renew: opts.RenewInterval, record: rec}, nil +} + +func (g *Guard) Record() Record { + if g == nil { + return Record{} + } + g.mu.RLock() + defer g.mu.RUnlock() + return g.record +} + +func (g *Guard) Epoch() uint64 { return g.Record().Epoch } + +func (g *Guard) HolderID() string { + if g == nil { + return "" + } + return g.holder +} + +func (g *Guard) RenewLoop(ctx context.Context, onLost func(error)) { + if g == nil { + return + } + t := time.NewTicker(g.renew) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return + case <-t.C: + prev := g.Record() + rec, err := g.backend.Renew(ctx, g.holder, prev) + if err != nil { + if onLost != nil { + onLost(fmt.Errorf("%w: %v", ErrLeaseLost, err)) + } + return + } + g.mu.Lock() + g.record = rec + g.mu.Unlock() + } + } +} + +func (g *Guard) Close(ctx context.Context) error { + if g == nil { + return nil + } + return g.backend.Release(ctx, g.holder, g.Record()) +} + +func newBackend(opts Options) (backend, error) { + backendName := strings.TrimSpace(opts.Backend) + if backendName == "" { + backendName = BackendFile + } + if backendName 
// defaultHolderID builds a per-process holder identity of the form
// "<nodeID>/<hostname>/<pid>/<suffix>". The suffix is 8 random bytes in
// hex; if the random source fails it falls back to the current Unix
// time in nanoseconds.
func defaultHolderID(nodeID string) string {
	// The hostname error is deliberately ignored: the id is still built
	// with whatever os.Hostname returned.
	hostname, _ := os.Hostname()
	pid := os.Getpid()

	nonce := make([]byte, 8)
	if _, err := rand.Read(nonce); err == nil {
		return fmt.Sprintf("%s/%s/%d/%s", nodeID, hostname, pid, hex.EncodeToString(nonce))
	}
	return fmt.Sprintf("%s/%s/%d/%d", nodeID, hostname, pid, time.Now().UnixNano())
}
+ } + dir = filepath.Join(data, "raft-identity") + } + return &fileBackend{nodeID: opts.NodeID, dir: dir, ttl: opts.LeaseTTL}, nil +} + +func (b *fileBackend) Acquire(ctx context.Context, holderID string) (Record, error) { + _ = ctx + if err := os.MkdirAll(b.dir, 0o755); err != nil { + return Record{}, fmt.Errorf("raft identity lease file mkdir: %w", err) + } + lockPath := filepath.Join(b.dir, fileNameForNode(b.nodeID)+".lock") + f, err := os.OpenFile(lockPath, os.O_CREATE|os.O_RDWR, 0o600) + if err != nil { + return Record{}, fmt.Errorf("raft identity lease file open: %w", err) + } + if err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB); err != nil { + _ = f.Close() + rec := b.readRecord() + if errors.Is(err, syscall.EWOULDBLOCK) || errors.Is(err, syscall.EAGAIN) { + return Record{}, &HeldError{Record: rec} + } + return Record{}, fmt.Errorf("raft identity lease file lock: %w", err) + } + now := time.Now().UTC() + rec := b.readRecord() + rec.NodeID = b.nodeID + rec.HolderID = holderID + rec.Epoch++ + if rec.Epoch == 0 { + rec.Epoch = 1 + } + rec.Backend = BackendFile + rec.Resource = b.dir + rec.RenewedAt = now + rec.ExpiresAt = now.Add(b.ttl) + if err := b.writeRecord(rec); err != nil { + _ = syscall.Flock(int(f.Fd()), syscall.LOCK_UN) + _ = f.Close() + return Record{}, err + } + b.mu.Lock() + b.file = f + b.mu.Unlock() + return rec, nil +} + +func (b *fileBackend) Renew(ctx context.Context, holderID string, prev Record) (Record, error) { + _ = ctx + b.mu.Lock() + defer b.mu.Unlock() + if b.file == nil { + return Record{}, fmt.Errorf("file lease is not held") + } + now := time.Now().UTC() + prev.NodeID = b.nodeID + prev.HolderID = holderID + prev.Backend = BackendFile + prev.Resource = b.dir + prev.RenewedAt = now + prev.ExpiresAt = now.Add(b.ttl) + if err := b.writeRecord(prev); err != nil { + return Record{}, err + } + return prev, nil +} + +func (b *fileBackend) Release(ctx context.Context, holderID string, prev Record) error { + _ = ctx + _ = 
// fileNameForNode maps a raft node id to a base file name of the form
// "<sanitized-id>-<8 hex chars>".
//
// The readable part replaces "/", "\" and ":" with "_", trims leading
// and trailing '.', '_', '-' and spaces, and is capped at 48 bytes. The
// cap is applied without splitting a multi-byte rune, so the result
// stays valid UTF-8 for valid input. If nothing readable remains the
// placeholder "node" is used. The hex suffix is the first four bytes of
// the SHA-1 of the original id; it keeps ids that sanitize to the same
// text apart and is not used for any security purpose.
func fileNameForNode(nodeID string) string {
	const maxNameBytes = 48

	sum := sha1.Sum([]byte(nodeID))
	name := strings.NewReplacer("/", "_", "\\", "_", ":", "_").Replace(nodeID)
	name = strings.Trim(name, "._- ")
	if len(name) > maxNameBytes {
		// A plain byte cut can land inside a multi-byte rune; drop the
		// partial rune instead of emitting invalid UTF-8.
		name = strings.ToValidUTF8(name[:maxNameBytes], "")
	}
	// Checked after truncation so the fallback also covers a name that
	// became empty there.
	if name == "" {
		name = "node"
	}
	return fmt.Sprintf("%s-%s", name, hex.EncodeToString(sum[:4]))
}
"" { + return nil, fmt.Errorf("raft identity lease kubernetes: namespace is required") + } + apiURL := opts.KubernetesAPIURL + if apiURL == "" { + host := os.Getenv("KUBERNETES_SERVICE_HOST") + port := os.Getenv("KUBERNETES_SERVICE_PORT") + if port == "" { + port = "443" + } + if host == "" { + return nil, fmt.Errorf("raft identity lease kubernetes: KUBERNETES_SERVICE_HOST is not set") + } + apiURL = "https://" + netJoinHostPort(host, port) + } + if _, err := url.Parse(apiURL); err != nil { + return nil, fmt.Errorf("raft identity lease kubernetes api url: %w", err) + } + token := opts.KubernetesBearerToken + if token == "" { + token = serviceAccountToken() + } + if token == "" { + return nil, fmt.Errorf("raft identity lease kubernetes: bearer token is required") + } + client := opts.HTTPClient + if client == nil { + var err error + client, err = kubernetesHTTPClient(opts.KubernetesCAFile) + if err != nil { + return nil, err + } + } + name := opts.LeaseName + if name == "" { + name = "cefas-" + dnsLabel(opts.NodeID) + } + return &kubernetesBackend{ + nodeID: opts.NodeID, + name: dnsLabel(name), + namespace: ns, + apiURL: strings.TrimRight(apiURL, "/"), + token: token, + ttl: opts.LeaseTTL, + client: client, + }, nil +} + +func (b *kubernetesBackend) Acquire(ctx context.Context, holderID string) (Record, error) { + for attempt := 0; attempt < 3; attempt++ { + lease, status, err := b.get(ctx) + if err != nil { + return Record{}, err + } + now := time.Now().UTC() + if status == http.StatusNotFound { + rec, err := b.create(ctx, holderID, now, 1) + if isConflict(err) { + continue + } + return rec, err + } + if status != http.StatusOK { + return Record{}, fmt.Errorf("raft identity lease kubernetes get: status %d", status) + } + if b.activeHeldByOther(lease, holderID, now) { + return Record{}, &HeldError{Record: b.recordFromLease(lease, now)} + } + nextEpoch := leaseEpoch(lease) + 1 + rec, err := b.update(ctx, lease, holderID, now, nextEpoch) + if isConflict(err) { + 
continue + } + return rec, err + } + return Record{}, fmt.Errorf("raft identity lease kubernetes acquire: conflicted after retries") +} + +func (b *kubernetesBackend) Renew(ctx context.Context, holderID string, prev Record) (Record, error) { + _ = prev + for attempt := 0; attempt < 3; attempt++ { + lease, status, err := b.get(ctx) + if err != nil { + return Record{}, err + } + now := time.Now().UTC() + if status == http.StatusNotFound { + return Record{}, fmt.Errorf("lease %s/%s disappeared", b.namespace, b.name) + } + if status != http.StatusOK { + return Record{}, fmt.Errorf("raft identity lease kubernetes get: status %d", status) + } + if leaseHolder(lease) != holderID { + return Record{}, &HeldError{Record: b.recordFromLease(lease, now)} + } + rec, err := b.update(ctx, lease, holderID, now, leaseEpoch(lease)) + if isConflict(err) { + continue + } + return rec, err + } + return Record{}, fmt.Errorf("raft identity lease kubernetes renew: conflicted after retries") +} + +func (b *kubernetesBackend) Release(ctx context.Context, holderID string, prev Record) error { + _ = prev + lease, status, err := b.get(ctx) + if err != nil { + return err + } + if status == http.StatusNotFound { + return nil + } + if status != http.StatusOK { + return fmt.Errorf("raft identity lease kubernetes release get: status %d", status) + } + if leaseHolder(lease) != holderID { + return nil + } + now := time.Now().UTC() + _, err = b.update(ctx, lease, "", now, leaseEpoch(lease)) + return err +} + +func (b *kubernetesBackend) activeHeldByOther(lease *k8sLease, holderID string, now time.Time) bool { + holder := leaseHolder(lease) + if holder == "" || holder == holderID { + return false + } + renewed := leaseRenewedAt(lease) + if renewed.IsZero() { + return true + } + ttl := time.Duration(leaseDurationSeconds(lease)) * time.Second + if ttl <= 0 { + ttl = b.ttl + } + return now.Before(renewed.Add(ttl)) +} + +func (b *kubernetesBackend) recordFromLease(lease *k8sLease, now time.Time) Record { + 
renewed := leaseRenewedAt(lease) + if renewed.IsZero() { + renewed = now + } + ttl := time.Duration(leaseDurationSeconds(lease)) * time.Second + if ttl <= 0 { + ttl = b.ttl + } + return Record{ + NodeID: annotation(lease, annotationRaftID, b.nodeID), + HolderID: leaseHolder(lease), + Epoch: leaseEpoch(lease), + Backend: BackendKubernetes, + Resource: b.namespace + "/" + b.name, + RenewedAt: renewed, + ExpiresAt: renewed.Add(ttl), + } +} + +func (b *kubernetesBackend) create(ctx context.Context, holderID string, now time.Time, epoch uint64) (Record, error) { + lease := newLease(b.namespace, b.name, b.nodeID, holderID, now, b.ttl, epoch) + status, err := b.do(ctx, http.MethodPost, b.collectionPath(), lease, lease) + if err != nil { + return Record{}, err + } + if status != http.StatusCreated && status != http.StatusOK { + return Record{}, statusError{status: status} + } + return b.recordFromLease(lease, now), nil +} + +func (b *kubernetesBackend) update(ctx context.Context, lease *k8sLease, holderID string, now time.Time, epoch uint64) (Record, error) { + applyLeaseUpdate(lease, b.nodeID, holderID, now, b.ttl, epoch) + status, err := b.do(ctx, http.MethodPut, b.resourcePath(), lease, lease) + if err != nil { + return Record{}, err + } + if status != http.StatusOK { + return Record{}, statusError{status: status} + } + return b.recordFromLease(lease, now), nil +} + +func (b *kubernetesBackend) get(ctx context.Context) (*k8sLease, int, error) { + var lease k8sLease + status, err := b.do(ctx, http.MethodGet, b.resourcePath(), nil, &lease) + if status == http.StatusNotFound { + return nil, status, nil + } + if err != nil { + return nil, status, err + } + return &lease, status, nil +} + +func (b *kubernetesBackend) collectionPath() string { + return fmt.Sprintf("/apis/coordination.k8s.io/v1/namespaces/%s/leases", b.namespace) +} + +func (b *kubernetesBackend) resourcePath() string { + return b.collectionPath() + "/" + b.name +} + +func (b *kubernetesBackend) do(ctx 
context.Context, method, path string, in any, out any) (int, error) { + var body io.Reader + if in != nil { + raw, err := json.Marshal(in) + if err != nil { + return 0, err + } + body = strings.NewReader(string(raw)) + } + req, err := http.NewRequestWithContext(ctx, method, b.apiURL+path, body) + if err != nil { + return 0, err + } + req.Header.Set("Accept", "application/json") + if in != nil { + req.Header.Set("Content-Type", "application/json") + } + req.Header.Set("Authorization", "Bearer "+b.token) + resp, err := b.client.Do(req) + if err != nil { + return 0, err + } + defer resp.Body.Close() + raw, readErr := io.ReadAll(resp.Body) + if resp.StatusCode >= 200 && resp.StatusCode < 300 && out != nil && len(raw) > 0 { + if err := json.Unmarshal(raw, out); err != nil { + return resp.StatusCode, err + } + } + if readErr != nil { + return resp.StatusCode, readErr + } + if resp.StatusCode >= 400 && resp.StatusCode != http.StatusNotFound && resp.StatusCode != http.StatusConflict { + return resp.StatusCode, fmt.Errorf("kubernetes api %s %s: status %d: %s", method, path, resp.StatusCode, string(raw)) + } + return resp.StatusCode, nil +} + +type statusError struct { + status int +} + +func (e statusError) Error() string { return fmt.Sprintf("kubernetes api status %d", e.status) } + +func isConflict(err error) bool { + var st statusError + return errors.As(err, &st) && st.status == http.StatusConflict +} + +type k8sLease struct { + APIVersion string `json:"apiVersion,omitempty"` + Kind string `json:"kind,omitempty"` + Metadata k8sObjectMeta `json:"metadata"` + Spec k8sLeaseSpec `json:"spec"` +} + +type k8sObjectMeta struct { + Name string `json:"name,omitempty"` + Namespace string `json:"namespace,omitempty"` + ResourceVersion string `json:"resourceVersion,omitempty"` + Labels map[string]string `json:"labels,omitempty"` + Annotations map[string]string `json:"annotations,omitempty"` +} + +type k8sLeaseSpec struct { + HolderIdentity *string `json:"holderIdentity,omitempty"` 
+ LeaseDurationSeconds *int32 `json:"leaseDurationSeconds,omitempty"` + AcquireTime *k8sTime `json:"acquireTime,omitempty"` + RenewTime *k8sTime `json:"renewTime,omitempty"` + LeaseTransitions *int32 `json:"leaseTransitions,omitempty"` +} + +type k8sTime struct { + time.Time +} + +func (t k8sTime) MarshalJSON() ([]byte, error) { + return json.Marshal(t.UTC().Format(time.RFC3339Nano)) +} + +func (t *k8sTime) UnmarshalJSON(raw []byte) error { + var s string + if err := json.Unmarshal(raw, &s); err != nil { + return err + } + if s == "" { + t.Time = time.Time{} + return nil + } + parsed, err := time.Parse(time.RFC3339Nano, s) + if err != nil { + return err + } + t.Time = parsed + return nil +} + +func newLease(namespace, name, nodeID, holderID string, now time.Time, ttl time.Duration, epoch uint64) *k8sLease { + lease := &k8sLease{ + APIVersion: "coordination.k8s.io/v1", + Kind: "Lease", + Metadata: k8sObjectMeta{ + Name: name, + Namespace: namespace, + Labels: map[string]string{ + "app.kubernetes.io/name": "cefas", + "cefasdb.io/raft-id": dnsLabel(nodeID), + }, + }, + } + applyLeaseUpdate(lease, nodeID, holderID, now, ttl, epoch) + return lease +} + +func applyLeaseUpdate(lease *k8sLease, nodeID, holderID string, now time.Time, ttl time.Duration, epoch uint64) { + oldHolder := leaseHolder(lease) + if lease.Metadata.Annotations == nil { + lease.Metadata.Annotations = map[string]string{} + } + lease.Metadata.Annotations[annotationRaftID] = nodeID + lease.Metadata.Annotations[annotationEpoch] = strconv.FormatUint(epoch, 10) + seconds := int32(ttl / time.Second) + if seconds <= 0 { + seconds = int32(defaultTTL / time.Second) + } + lease.Spec.HolderIdentity = stringPtr(holderID) + lease.Spec.LeaseDurationSeconds = &seconds + kt := &k8sTime{Time: now.UTC()} + if holderID == "" { + lease.Spec.HolderIdentity = nil + lease.Spec.RenewTime = nil + return + } + if lease.Spec.AcquireTime == nil || oldHolder != holderID { + lease.Spec.AcquireTime = kt + } + lease.Spec.RenewTime = 
kt + transitions := int32(epoch) + lease.Spec.LeaseTransitions = &transitions +} + +func leaseHolder(lease *k8sLease) string { + if lease == nil || lease.Spec.HolderIdentity == nil { + return "" + } + return *lease.Spec.HolderIdentity +} + +func leaseRenewedAt(lease *k8sLease) time.Time { + if lease == nil || lease.Spec.RenewTime == nil { + return time.Time{} + } + return lease.Spec.RenewTime.Time +} + +func leaseDurationSeconds(lease *k8sLease) int32 { + if lease == nil || lease.Spec.LeaseDurationSeconds == nil { + return 0 + } + return *lease.Spec.LeaseDurationSeconds +} + +func leaseEpoch(lease *k8sLease) uint64 { + if lease == nil { + return 0 + } + if lease.Metadata.Annotations != nil { + if v := lease.Metadata.Annotations[annotationEpoch]; v != "" { + if parsed, err := strconv.ParseUint(v, 10, 64); err == nil { + return parsed + } + } + } + if lease.Spec.LeaseTransitions != nil && *lease.Spec.LeaseTransitions > 0 { + return uint64(*lease.Spec.LeaseTransitions) + } + return 0 +} + +func annotation(lease *k8sLease, key, fallback string) string { + if lease != nil && lease.Metadata.Annotations != nil { + if v := lease.Metadata.Annotations[key]; v != "" { + return v + } + } + return fallback +} + +func stringPtr(s string) *string { + if s == "" { + return nil + } + return &s +} + +func serviceAccountNamespace() string { + if ns := os.Getenv("POD_NAMESPACE"); ns != "" { + return ns + } + raw, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace") + if err != nil { + return "" + } + return strings.TrimSpace(string(raw)) +} + +func serviceAccountToken() string { + raw, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/token") + if err != nil { + return "" + } + return strings.TrimSpace(string(raw)) +} + +func kubernetesHTTPClient(caFile string) (*http.Client, error) { + if caFile == "" { + caFile = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt" + } + pool, err := x509.SystemCertPool() + if err != nil { + pool = 
x509.NewCertPool() + } + if raw, err := os.ReadFile(caFile); err == nil && len(raw) > 0 { + pool.AppendCertsFromPEM(raw) + } + return &http.Client{ + Timeout: 5 * time.Second, + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{RootCAs: pool, MinVersion: tls.VersionTLS12}, + }, + }, nil +} + +func netJoinHostPort(host, port string) string { + if strings.Contains(host, ":") && !strings.HasPrefix(host, "[") { + return "[" + host + "]:" + port + } + return host + ":" + port +} + +func dnsLabel(s string) string { + s = strings.ToLower(s) + var b strings.Builder + lastDash := false + for _, r := range s { + ok := (r >= 'a' && r <= 'z') || (r >= '0' && r <= '9') + if ok { + b.WriteRune(r) + lastDash = false + continue + } + if !lastDash { + b.WriteByte('-') + lastDash = true + } + } + out := strings.Trim(b.String(), "-") + if out == "" { + out = "node" + } + if len(out) <= 63 { + return out + } + sum := sha1.Sum([]byte(out)) + suffix := "-" + hex.EncodeToString(sum[:4]) + return strings.Trim(out[:63-len(suffix)], "-") + suffix +} diff --git a/internal/identitylease/lease_test.go b/internal/identitylease/lease_test.go new file mode 100644 index 0000000..893032c --- /dev/null +++ b/internal/identitylease/lease_test.go @@ -0,0 +1,218 @@ +package identitylease + +import ( + "context" + "encoding/json" + "errors" + "net/http" + "net/http/httptest" + "strings" + "sync" + "testing" + "time" +) + +func TestFileLeaseExcludesDuplicateAndIncrementsEpoch(t *testing.T) { + dir := t.TempDir() + ctx := context.Background() + first, err := Acquire(ctx, Options{ + NodeID: "cefas-0", + LeasePath: dir, + Backend: BackendFile, + HolderID: "holder-a", + LeaseTTL: time.Second, + RenewInterval: 100 * time.Millisecond, + }) + if err != nil { + t.Fatalf("acquire first: %v", err) + } + if got := first.Epoch(); got != 1 { + t.Fatalf("first epoch = %d, want 1", got) + } + _, err = Acquire(ctx, Options{ + NodeID: "cefas-0", + LeasePath: dir, + Backend: BackendFile, + HolderID: "holder-b", + 
LeaseTTL: time.Second, + RenewInterval: 100 * time.Millisecond, + }) + if !errors.Is(err, ErrLeaseHeld) { + t.Fatalf("second acquire error = %v, want ErrLeaseHeld", err) + } + if err := first.Close(ctx); err != nil { + t.Fatalf("close first: %v", err) + } + second, err := Acquire(ctx, Options{ + NodeID: "cefas-0", + LeasePath: dir, + Backend: BackendFile, + HolderID: "holder-c", + LeaseTTL: time.Second, + RenewInterval: 100 * time.Millisecond, + }) + if err != nil { + t.Fatalf("acquire after close: %v", err) + } + defer second.Close(ctx) + if got := second.Epoch(); got != 2 { + t.Fatalf("second epoch = %d, want 2", got) + } +} + +func TestKubernetesLeaseBlocksActiveHolderAndAllowsExpiredTakeover(t *testing.T) { + store := &leaseStore{} + api := httptest.NewServer(store) + defer api.Close() + + ctx := context.Background() + first, err := Acquire(ctx, Options{ + NodeID: "cefas-0", + Backend: BackendKubernetes, + LeaseName: "cefas-cefas-0", + KubernetesNamespace: "cefasdb", + KubernetesAPIURL: api.URL, + KubernetesBearerToken: "token", + HTTPClient: api.Client(), + HolderID: "holder-a", + LeaseTTL: time.Second, + RenewInterval: 100 * time.Millisecond, + }) + if err != nil { + t.Fatalf("acquire first: %v", err) + } + if got := first.Epoch(); got != 1 { + t.Fatalf("epoch = %d, want 1", got) + } + + _, err = Acquire(ctx, Options{ + NodeID: "cefas-0", + Backend: BackendKubernetes, + LeaseName: "cefas-cefas-0", + KubernetesNamespace: "cefasdb", + KubernetesAPIURL: api.URL, + KubernetesBearerToken: "token", + HTTPClient: api.Client(), + HolderID: "holder-b", + LeaseTTL: time.Second, + RenewInterval: 100 * time.Millisecond, + }) + if !errors.Is(err, ErrLeaseHeld) { + t.Fatalf("duplicate acquire error = %v, want ErrLeaseHeld", err) + } + + store.expire(2 * time.Second) + second, err := Acquire(ctx, Options{ + NodeID: "cefas-0", + Backend: BackendKubernetes, + LeaseName: "cefas-cefas-0", + KubernetesNamespace: "cefasdb", + KubernetesAPIURL: api.URL, + KubernetesBearerToken: 
"token", + HTTPClient: api.Client(), + HolderID: "holder-b", + LeaseTTL: time.Second, + RenewInterval: 100 * time.Millisecond, + }) + if err != nil { + t.Fatalf("expired takeover: %v", err) + } + if got := second.Epoch(); got != 2 { + t.Fatalf("takeover epoch = %d, want 2", got) + } + + _, err = first.backend.Renew(ctx, first.HolderID(), first.Record()) + if !errors.Is(err, ErrLeaseHeld) { + t.Fatalf("stale renew error = %v, want ErrLeaseHeld", err) + } +} + +type leaseStore struct { + mu sync.Mutex + lease *k8sLease + rv int +} + +func (s *leaseStore) ServeHTTP(w http.ResponseWriter, r *http.Request) { + s.mu.Lock() + defer s.mu.Unlock() + if r.Header.Get("Authorization") != "Bearer token" { + http.Error(w, "unauthorized", http.StatusUnauthorized) + return + } + if !strings.HasPrefix(r.URL.Path, "/apis/coordination.k8s.io/v1/namespaces/cefasdb/leases") { + http.NotFound(w, r) + return + } + switch r.Method { + case http.MethodGet: + if s.lease == nil { + http.NotFound(w, r) + return + } + writeJSON(w, http.StatusOK, s.lease) + case http.MethodPost: + if s.lease != nil { + http.Error(w, "conflict", http.StatusConflict) + return + } + var lease k8sLease + if err := json.NewDecoder(r.Body).Decode(&lease); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + s.rv++ + lease.Metadata.ResourceVersion = strconvItoa(s.rv) + s.lease = &lease + writeJSON(w, http.StatusCreated, s.lease) + case http.MethodPut: + if s.lease == nil { + http.NotFound(w, r) + return + } + var lease k8sLease + if err := json.NewDecoder(r.Body).Decode(&lease); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + if lease.Metadata.ResourceVersion != s.lease.Metadata.ResourceVersion { + http.Error(w, "conflict", http.StatusConflict) + return + } + s.rv++ + lease.Metadata.ResourceVersion = strconvItoa(s.rv) + s.lease = &lease + writeJSON(w, http.StatusOK, s.lease) + default: + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + } +} + 
+func (s *leaseStore) expire(age time.Duration) { + s.mu.Lock() + defer s.mu.Unlock() + if s.lease != nil && s.lease.Spec.RenewTime != nil { + s.lease.Spec.RenewTime.Time = time.Now().UTC().Add(-age) + } +} + +func writeJSON(w http.ResponseWriter, code int, v any) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(code) + _ = json.NewEncoder(w).Encode(v) +} + +func strconvItoa(v int) string { + const digits = "0123456789" + if v == 0 { + return "0" + } + var buf [20]byte + i := len(buf) + for v > 0 { + i-- + buf[i] = digits[v%10] + v /= 10 + } + return string(buf[i:]) +}