From 9c6e4a17c3da14333a435ee2c61818cf212098b3 Mon Sep 17 00:00:00 2001 From: Benjamin Wang Date: Fri, 15 May 2026 20:53:56 +0100 Subject: [PATCH] Move session's interface method Unlock, Done and Err into interface Service A service is actually just a lock returned by NewLock. If users call the Lock or TryLock again without Unlock beforehand, then it should just return the same Grant with the same Fencing token. The Done channel is only created when the NewLock is called, it doesn't change at all each time when Lock or TryLock is called Signed-off-by: Benjamin Wang --- README.md | 26 ++++++------ examples/db/main.go | 16 +++---- examples/grpc/client/main.go | 22 +++++----- examples/grpc/resource/main.go | 2 +- examples/http/client/main.go | 24 ++++++----- lock/doc.go | 2 +- lock/fencing/token.go | 4 +- lock/grant.go | 22 ++++++++-- lock/guard/grpc.go | 6 +-- lock/guard/http.go | 5 ++- lock/service.go | 37 +++++++++++++---- lock/session.go | 76 ---------------------------------- provider/etcd/doc.go | 2 +- provider/etcd/mutex.go | 22 ++++++---- provider/etcd/options.go | 3 +- provider/etcd/provider.go | 40 +++++++++++++----- provider/etcd/session.go | 3 +- 17 files changed, 155 insertions(+), 157 deletions(-) delete mode 100644 lock/session.go diff --git a/README.md b/README.md index 1aa4874..340a925 100644 --- a/README.md +++ b/README.md @@ -60,7 +60,7 @@ if `token >= mark` the request is accepted and the mark advances; if ``` disco/ -├── lock/ # Distributed lock — Service interface, Grant, Session, errors +├── lock/ # Distributed lock — Service interface, Grant, errors │ ├── fencing/ # Token type + HTTP/gRPC transport helpers │ └── guard/ # Server-side validator: high-water mark, HTTP middleware, gRPC interceptors ├── provider/ # Backend implementations (shared across features) @@ -95,11 +95,12 @@ Client A reappears as zombie ──► writes to DB with token 34 → REJECTED ( ```go import ( + "log" "net/http" clientv3 "go.etcd.io/etcd/client/v3" "google.golang.org/grpc/metadata" - "github.com/ahrtr/disco/lock" + "github.com/ahrtr/disco/lock/fencing" etcdprovider "github.com/ahrtr/disco/provider/etcd" ) @@ -110,24 +111,25 @@ defer cli.Close() svc, _ := etcdprovider.NewLock(cli, "/locks/my-resource") defer svc.Close() -// Blocking acquire. -session, err := svc.Lock(ctx) -if err != nil { ... } -defer session.Unlock(ctx) - // React to involuntary lease loss in the background. +// Done is a property of the service lifetime, not of any individual Lock call. go func() { - <-session.Done() - log.Printf("lock lost: %v", session.Err()) + <-svc.Done() + log.Printf("lock lost: %v", svc.Err()) // stop accessing the resource }() +// Blocking acquire — returns a Grant with the fencing token and lease metadata. +grant, err := svc.Lock(ctx) +if err != nil { ... } +defer svc.Unlock(ctx) + // Stamp every resource request with the fencing token. req, _ := http.NewRequest("POST", resourceURL, body) -session.InjectHTTP(req) +fencing.InjectHTTP(req, grant.Token()) // For gRPC: -outCtx := metadata.NewOutgoingContext(ctx, session.GRPCMetadata()) +outCtx := metadata.NewOutgoingContext(ctx, fencing.ToGRPCMetadata(grant.Token())) ``` ### Resource guard (server side) @@ -157,7 +159,7 @@ if err := g.Check(incomingToken); err != nil { | Decision | Rationale | |-----------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | Cluster revision as fencing token | etcd cluster revision is globally ordered and increases on every write; the revision recorded when the lock is acquired is always strictly higher than any previous acquisition | -| Provider manages keepalive | The session's keepalive goroutine runs internally; callers watch `Session.Done()` instead of calling `Renew()` | +| Provider manages keepalive | The session's keepalive goroutine runs internally; callers watch `svc.Done()` instead of calling `Renew()` | | `Guard` high-water mark | Atomic CAS loop with no locks; accepts `token >= mark`, rejects `token < mark` | | Caller-owned etcd client | The caller creates, configures, and closes the etcd client; the provider never closes it | | No `init()` auto-registration | Providers are constructed explicitly; no hidden init-time side effects | diff --git a/examples/db/main.go b/examples/db/main.go index e831d83..0caecc5 100644 --- a/examples/db/main.go +++ b/examples/db/main.go @@ -121,23 +121,23 @@ func main() { // ── Step 1: Client A acquires the lock ──────────────────────────────────── log.Println("Client A: acquiring lock …") - sessionA, err := providerA.Lock(ctx) + grantA, err := providerA.Lock(ctx) if err != nil { log.Fatalf("client A lock: %v", err) } - log.Printf("Client A: lock acquired fencing_token=%d TTL=5s", sessionA.FencingToken()) + log.Printf("Client A: lock acquired fencing_token=%d TTL=5s", grantA.FencingToken) // ── Step 2: Client B waits for the lock in a separate goroutine ─────────── bWritten := make(chan struct{}) go func() { log.Println("Client B: waiting for the lock …") - sessionB, err := providerB.Lock(ctx) + grantB, err := providerB.Lock(ctx) if err != nil { log.Fatalf("client B lock: %v", err) } - log.Printf("Client B: lock acquired fencing_token=%d", sessionB.FencingToken()) + log.Printf("Client B: lock acquired fencing_token=%d", grantB.FencingToken) - if err := db.Write(sessionB.FencingToken(), "hello from client B"); err != nil { + if err := db.Write(grantB.Token(), "hello from client B"); err != nil { log.Printf("Client B: write rejected: %v", err) } else { data, tok := db.State() @@ -145,7 +145,7 @@ func main() { } close(bWritten) - if err := sessionB.Unlock(ctx); err != nil { + if err := providerB.Unlock(ctx); err != nil { log.Printf("client B unlock: %v", err) } else { log.Println("Client B: lock released") @@ -159,13 +159,15 @@ func main() { cancelA() // ── Step 4: Wait for Client B to write ──────────────────────────────────── + // Blocks until B has acquired the lock (after A's lease expires ~5s from + // now) and written to the DB, advancing the stored fencing token to T'. <-bWritten // ── Step 5: Client A wakes up and tries to write with its stale token ───── // The DB rejects it because A's token is lower than the stored high-water // mark — no in-memory guard or middleware needed. log.Println("Client A: woke up — attempting write with stale token …") - if err := db.Write(sessionA.FencingToken(), "hello from client A"); err != nil { + if err := db.Write(grantA.Token(), "hello from client A"); err != nil { if errors.Is(err, fencing.ErrTokenStale) { log.Printf("Client A: write rejected by DB — %v", err) } else { diff --git a/examples/grpc/client/main.go b/examples/grpc/client/main.go index a9b4a4e..2259f7d 100644 --- a/examples/grpc/client/main.go +++ b/examples/grpc/client/main.go @@ -30,6 +30,7 @@ import ( "github.com/ahrtr/disco/examples/grpc/pb" "github.com/ahrtr/disco/lock" + "github.com/ahrtr/disco/lock/fencing" etcdprovider "github.com/ahrtr/disco/provider/etcd" ) @@ -85,27 +86,27 @@ func main() { // ── Step 1: Client A acquires the lock ──────────────────────────────────── log.Println("Client A: acquiring lock …") - sessionA, err := providerA.Lock(ctx) + grantA, err := providerA.Lock(ctx) if err != nil { log.Fatalf("client A lock: %v", err) } - log.Printf("Client A: lock acquired fencing_token=%d TTL=5s", sessionA.FencingToken()) + log.Printf("Client A: lock acquired fencing_token=%d TTL=5s", grantA.FencingToken) // ── Step 2: Client B waits for the lock in a goroutine ─────────────────── bWritten := make(chan struct{}) go func() { log.Println("Client B: waiting for the lock …") - sessionB, err := providerB.Lock(ctx) + grantB, err := providerB.Lock(ctx) if err != nil { log.Fatalf("client B lock: %v", err) } - log.Printf("Client B: lock acquired fencing_token=%d", sessionB.FencingToken()) + log.Printf("Client B: lock acquired fencing_token=%d", grantB.FencingToken) log.Println("Client B: calling Resource/Write …") - doWrite(ctx, "Client B", rc, sessionB) + doWrite(ctx, "Client B", rc, grantB) close(bWritten) - if err := sessionB.Unlock(ctx); err != nil { + if err := providerB.Unlock(ctx); err != nil { log.Printf("client B unlock: %v", err) } else { log.Println("Client B: lock released") @@ -127,14 +128,13 @@ func main() { // A's grant still holds the old token in memory, but the resource server's // high-water mark is now higher. The server rejects the RPC. log.Println("Client A: woke up — calling Resource/Write with stale token …") - doWrite(ctx, "Client A", rc, sessionA) + doWrite(ctx, "Client A", rc, grantA) } -// doWrite calls pb.Resource/Write, attaching the session's fencing token as +// doWrite calls pb.Resource/Write, attaching the grant's fencing token as // gRPC metadata so the server's guard interceptor can validate it. -func doWrite(ctx context.Context, name string, rc pb.ResourceClient, session *lock.Session) { - // session.GRPCMetadata() returns metadata containing the x-fencing-token key. - outCtx := metadata.NewOutgoingContext(ctx, session.GRPCMetadata()) +func doWrite(ctx context.Context, name string, rc pb.ResourceClient, grant *lock.Grant) { + outCtx := metadata.NewOutgoingContext(ctx, fencing.ToGRPCMetadata(grant.Token())) resp, err := rc.Write(outCtx, &pb.WriteRequest{ Data: fmt.Sprintf("hello from %s", name), diff --git a/examples/grpc/resource/main.go b/examples/grpc/resource/main.go index 64f285c..421f272 100644 --- a/examples/grpc/resource/main.go +++ b/examples/grpc/resource/main.go @@ -50,6 +50,6 @@ func main() { } log.Println("grpc/resource listening on :50051") - log.Println(" pb.Resource/Write — requires x-fencing-token metadata") + log.Println(" pb.Resource/Write — requires x-fencing-token metadata") log.Fatal(srv.Serve(lis)) } diff --git a/examples/http/client/main.go b/examples/http/client/main.go index df96445..32ca275 100644 --- a/examples/http/client/main.go +++ b/examples/http/client/main.go @@ -23,11 +23,13 @@ import ( "io" "log" "net/http" + "strings" "time" clientv3 "go.etcd.io/etcd/client/v3" "github.com/ahrtr/disco/lock" + "github.com/ahrtr/disco/lock/fencing" etcdprovider "github.com/ahrtr/disco/provider/etcd" ) @@ -72,27 +74,27 @@ func main() { // ── Step 1: Client A acquires the lock ──────────────────────────────────── log.Println("Client A: acquiring lock …") - sessionA, err := providerA.Lock(ctx) + grantA, err := providerA.Lock(ctx) if err != nil { log.Fatalf("client A lock: %v", err) } - log.Printf("Client A: lock acquired fencing_token=%d TTL=5s", sessionA.FencingToken()) + log.Printf("Client A: lock acquired fencing_token=%d TTL=5s", grantA.FencingToken) // ── Step 2: Client B waits for the lock in a separate goroutine ─────────── bWritten := make(chan struct{}) go func() { log.Println("Client B: waiting for the lock …") - sessionB, err := providerB.Lock(ctx) + grantB, err := providerB.Lock(ctx) if err != nil { log.Fatalf("client B lock: %v", err) } - log.Printf("Client B: lock acquired fencing_token=%d", sessionB.FencingToken()) + log.Printf("Client B: lock acquired fencing_token=%d", grantB.FencingToken) log.Println("Client B: writing to resource …") - doWrite("Client B", sessionB) + doWrite("Client B", grantB) close(bWritten) - if err := sessionB.Unlock(ctx); err != nil { + if err := providerB.Unlock(ctx); err != nil { log.Printf("client B unlock: %v", err) } else { log.Println("Client B: lock released") @@ -115,17 +117,17 @@ func main() { // A's grant still holds the old fencing token T in memory, but the resource // server's high-water mark is now T'. Since T < T', the write is rejected. log.Println("Client A: woke up — attempting write with stale token …") - doWrite("Client A", sessionA) + doWrite("Client A", grantA) } // doWrite sends a POST /write request to the resource server with the -// session's fencing token attached via the X-Fencing-Token header. -func doWrite(name string, session *lock.Session) { +// grant's fencing token attached via the X-Fencing-Token header. +func doWrite(name string, grant *lock.Grant) { req, err := http.NewRequest(http.MethodPost, resourceWriteURL, nil) if err != nil { log.Fatalf("build request: %v", err) } - session.InjectHTTP(req) + fencing.InjectHTTP(req, grant.Token()) resp, err := http.DefaultClient.Do(req) if err != nil { @@ -133,5 +135,5 @@ func doWrite(name string, session *lock.Session) { } defer resp.Body.Close() body, _ := io.ReadAll(resp.Body) - log.Printf("%s: %s — %s", name, resp.Status, body) + log.Printf("%s: %s — %s", name, resp.Status, strings.TrimSpace(string(body))) } diff --git a/lock/doc.go b/lock/doc.go index ac64bcf..aff50bb 100644 --- a/lock/doc.go +++ b/lock/doc.go @@ -1,5 +1,5 @@ // Package lock defines the core abstractions for the disco distributed -// coordination system: the Service interface, Grant, and Session types. +// coordination system: the Service interface and the Grant type. // // Concrete backend implementations live under provider/. package lock diff --git a/lock/fencing/token.go b/lock/fencing/token.go index 2ce1e97..c288be2 100644 --- a/lock/fencing/token.go +++ b/lock/fencing/token.go @@ -5,8 +5,8 @@ import "errors" // ErrNoToken is returned when a request carries no fencing token. var ErrNoToken = errors.New("fencing: no token present in request") -// ErrTokenStale is returned by a Guard when the incoming token is lower than -// the highest token the resource has already accepted. +// ErrTokenStale is returned when an incoming token is lower than the highest +// token the resource has already accepted. var ErrTokenStale = errors.New("fencing: token is stale") // Token is a monotonically increasing integer that identifies a lock diff --git a/lock/grant.go b/lock/grant.go index aaabef9..9434fa8 100644 --- a/lock/grant.go +++ b/lock/grant.go @@ -1,14 +1,24 @@ package lock -import "time" +import ( + "time" -// Grant represents a successfully acquired lock. + "github.com/ahrtr/disco/lock/fencing" +) + +// Grant represents the metadata for a successfully acquired lock. +// +// The fencing token carried by a Grant must be attached to every request sent +// to a guarded resource (database write, external API call, etc.) so that the +// resource can reject requests from stale (zombie) clients. +// Use grant.Token() to obtain a fencing.Token and pass it to the helpers in +// the fencing package (fencing.InjectHTTP, fencing.ToGRPCMetadata, etc.). type Grant struct { // Key is the distributed lock key that was acquired. Key string // FencingToken is a monotonically increasing integer assigned at lock - // grant time. It must be attached to every resource request so the + // acquisition time. It must be attached to every resource request so the // resource can reject requests from stale (lower-token) owners. FencingToken int64 @@ -17,3 +27,9 @@ type Grant struct { // backend. ExpiresAt time.Time } + +// Token returns the fencing token as a fencing.Token, ready to pass to +// fencing.InjectHTTP, fencing.ToGRPCMetadata, or a resource's Check method. +func (g *Grant) Token() fencing.Token { + return fencing.Token(g.FencingToken) +} diff --git a/lock/guard/grpc.go b/lock/guard/grpc.go index 76a6d30..6bcb54c 100644 --- a/lock/guard/grpc.go +++ b/lock/guard/grpc.go @@ -13,9 +13,9 @@ import ( // UnaryInterceptor returns a gRPC unary server interceptor that validates the // fencing token carried in incoming metadata before calling the handler. // -// Requests missing the fencing-token metadata key are rejected with -// codes.InvalidArgument. Requests carrying a stale token are rejected with -// codes.Aborted. +// Requests with a missing or malformed fencing-token metadata key are rejected +// with codes.InvalidArgument. Requests carrying a stale token are rejected +// with codes.Aborted. // // Register it when creating the server: // diff --git a/lock/guard/http.go b/lock/guard/http.go index daed0ad..3bdd2f4 100644 --- a/lock/guard/http.go +++ b/lock/guard/http.go @@ -9,8 +9,9 @@ import ( // HTTPMiddleware is an HTTP middleware that extracts the fencing token from // every incoming request and validates it against the Guard before calling next. // -// Requests missing the X-Fencing-Token header are rejected with 400 Bad -// Request. Requests carrying a stale token are rejected with 409 Conflict. +// Requests with a missing or malformed X-Fencing-Token header are rejected +// with 400 Bad Request. Requests carrying a stale token are rejected with +// 409 Conflict. // // Use it directly as a handler wrapper: // diff --git a/lock/service.go b/lock/service.go index 57206b5..ac16b61 100644 --- a/lock/service.go +++ b/lock/service.go @@ -4,18 +4,41 @@ import "context" // Service is the single abstraction over all distributed lock backends. // -// Each Service instance is bound to a single lock key, established at -// construction time. The lease and its keepalive are managed internally; -// callers do not need to renew it. Use the Session returned by Lock or -// TryLock to monitor for involuntary lease loss via Session.Done. +// A Service instance is bound to a single lock key, established at construction +// time (e.g. etcd.NewLock). The underlying lease and its keepalive are managed +// internally; callers do not need to renew it. +// +// The Done channel and Err reflect the health of the lease — they are +// properties of the Service lifetime, not of any individual Lock call. +// Monitor Done in a background goroutine to detect involuntary lease loss: +// +// go func() { +// <-svc.Done() +// log.Println("lease lost — stop accessing guarded resources") +// }() type Service interface { // Lock acquires the distributed lock, blocking until it is available or - // ctx is canceled. - Lock(ctx context.Context) (*Session, error) + // ctx is canceled. Returns a Grant carrying the fencing token and lease + // metadata for this acquisition. + Lock(ctx context.Context) (*Grant, error) // TryLock attempts to acquire the lock without blocking. // Returns ErrLockTaken immediately if the lock is held by another owner. - TryLock(ctx context.Context) (*Session, error) + TryLock(ctx context.Context) (*Grant, error) + + // Unlock explicitly releases the lock. The underlying lease remains alive + // so Lock can be called again without creating a new Service. + Unlock(ctx context.Context) error + + // Done returns a channel that is closed when the underlying lease is lost + // (expired or revoked). Once closed, callers must immediately stop + // accessing guarded resources and must not call Lock again. + // Call Close to release backend resources. + Done() <-chan struct{} + + // Err returns ErrLockLost if the lease has been lost, nil otherwise. + // Safe to call concurrently at any point in the Service lifetime. + Err() error // Close revokes the lease and releases all backend resources. Close() error diff --git a/lock/session.go b/lock/session.go deleted file mode 100644 index bb77c68..0000000 --- a/lock/session.go +++ /dev/null @@ -1,76 +0,0 @@ -package lock - -import ( - "context" - "net/http" - - "google.golang.org/grpc/metadata" - - "github.com/ahrtr/disco/lock/fencing" -) - -// Session represents an active, leased lock ownership. -// -// The fencing token carried by this session must be attached to every request -// sent to a guarded resource (database write, external API call, etc.) so that -// the resource can reject requests from stale sessions. -// -// A Session is not safe for concurrent calls to Unlock; all other methods -// are safe for concurrent use. -type Session struct { - grant *Grant - done <-chan struct{} - unlock func(context.Context) error -} - -// NewSession creates a Session for use by Service implementations. -// done is closed when the lease is lost; unlock releases the lock. -// Callers should obtain a Session via Service.Lock or Service.TryLock. -func NewSession(grant *Grant, done <-chan struct{}, unlock func(context.Context) error) *Session { - return &Session{grant: grant, done: done, unlock: unlock} -} - -// FencingToken returns the monotonically increasing token for this lock -// generation. Attach it to every resource request via InjectHTTP or -// GRPCMetadata. -func (s *Session) FencingToken() fencing.Token { - return fencing.Token(s.grant.FencingToken) -} - -// Grant returns the underlying Grant with metadata such as the fencing -// token, lock key, and lease expiry time. -func (s *Session) Grant() *Grant { return s.grant } - -// Done returns a channel that is closed when the lease is lost. -// Once closed, callers must immediately stop accessing guarded resources. -func (s *Session) Done() <-chan struct{} { return s.done } - -// Err returns ErrLockLost if the lease has been lost, or nil if the -// session is still alive. Safe to call before or after Done() is closed. -func (s *Session) Err() error { - select { - case <-s.done: - return ErrLockLost - default: - return nil - } -} - -// Unlock explicitly releases the lock. -func (s *Session) Unlock(ctx context.Context) error { - return s.unlock(ctx) -} - -// InjectHTTP sets the X-Fencing-Token header on req. Call this before -// executing any HTTP request against a guarded resource. -func (s *Session) InjectHTTP(req *http.Request) { - fencing.InjectHTTP(req, s.FencingToken()) -} - -// GRPCMetadata returns gRPC metadata containing the fencing token. Attach it -// to the outgoing context: -// -// ctx = metadata.NewOutgoingContext(ctx, session.GRPCMetadata()) -func (s *Session) GRPCMetadata() metadata.MD { - return fencing.ToGRPCMetadata(s.FencingToken()) -} diff --git a/provider/etcd/doc.go b/provider/etcd/doc.go index 228037f..ca6e27b 100644 --- a/provider/etcd/doc.go +++ b/provider/etcd/doc.go @@ -21,5 +21,5 @@ // // The session manages its own lease keepalive goroutine. Callers do not need // to renew the lease manually; they should instead monitor the channel returned -// by Session.Done to detect involuntary lease loss. +// by Service.Done to detect involuntary lease loss. package etcd diff --git a/provider/etcd/mutex.go b/provider/etcd/mutex.go index 019ac77..b02cedf 100644 --- a/provider/etcd/mutex.go +++ b/provider/etcd/mutex.go @@ -21,12 +21,14 @@ var ( type mutex struct { s *session - pfx string - myKey string - myRev int64 - hdr *pb.ResponseHeader + pfx string // key prefix; all candidate keys are put under pfx + myKey string // this session's candidate key in etcd + myRev int64 // create revision of myKey; lowest revision is the lock holder + hdr *pb.ResponseHeader // response header from the last successful lock acquisition } +// newMutex returns a mutex for pfx backed by session s. +// All lock keys are stored under pfx + "/". func newMutex(s *session, pfx string) *mutex { return &mutex{s, pfx + "/", "", -1, nil} } @@ -42,13 +44,14 @@ func (m *mutex) tryLock(ctx context.Context) error { if err != nil { return err } - // if no key on prefix / the minimum rev is key, already hold the lock + // No key exists under the prefix, or our key has the lowest create revision: + // we are already the lock holder. ownerKey := resp.Responses[1].GetResponseRange().Kvs if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev { m.hdr = resp.Header return nil } - // Cannot lock, so delete the key + // Another session holds the lock; clean up our candidate key and return. if _, err := m.s.client.Delete(ctx, m.myKey); err != nil { return err } @@ -67,7 +70,8 @@ func (m *mutex) lock(ctx context.Context) error { if err != nil { return err } - // if no key on prefix / the minimum rev is key, already hold the lock + // No key exists under the prefix, or our key has the lowest create revision: + // we are already the lock holder. ownerKey := resp.Responses[1].GetResponseRange().Kvs if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev { m.hdr = resp.Header @@ -119,6 +123,8 @@ func (m *mutex) tryAcquire(ctx context.Context) (*v3.TxnResponse, error) { return resp, nil } +// unlock deletes this session's candidate key, releasing the lock. +// Returns errLockReleased if the key has already been deleted. func (m *mutex) unlock(ctx context.Context) error { if m.myKey == "" || m.myRev <= 0 || m.myKey == "\x00" { return errLockReleased @@ -139,6 +145,8 @@ func (m *mutex) unlock(ctx context.Context) error { // header returns the response header received from etcd on acquiring the lock. func (m *mutex) header() *pb.ResponseHeader { return m.hdr } +// waitDelete blocks until a DELETE event is observed for key at or after rev, +// or until ctx is canceled or the watch channel is closed unexpectedly. func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error { cctx, cancel := context.WithCancel(ctx) defer cancel() diff --git a/provider/etcd/options.go b/provider/etcd/options.go index 31e46a1..a78c7fb 100644 --- a/provider/etcd/options.go +++ b/provider/etcd/options.go @@ -26,7 +26,7 @@ func defaultProviderOptions() providerOptions { // WithContext sets the parent context for the session's lease keepalive loop. // When the context is cancelled the keepalive stops, the lease expires, and -// any Session obtained from this service will have its Done channel closed. +// the service's Done channel is closed. // If not set, the etcd client's own context is used. func WithContext(ctx context.Context) ProviderOption { return func(o *providerOptions) { @@ -35,6 +35,7 @@ func WithContext(ctx context.Context) ProviderOption { } // WithDefaultTTL sets the default lease TTL. Defaults to 30 s. +// Values below 5 s are clamped to 5 s by NewLock. func WithDefaultTTL(d time.Duration) ProviderOption { return func(o *providerOptions) { o.cfg.DefaultTTL = d diff --git a/provider/etcd/provider.go b/provider/etcd/provider.go index 10f4b62..d132795 100644 --- a/provider/etcd/provider.go +++ b/provider/etcd/provider.go @@ -31,6 +31,9 @@ type Provider struct { // mutex for key. Both are reused across Lock and TryLock calls for the // lifetime of the returned service. // +// The effective TTL is clamped to a minimum of 5 seconds regardless of the +// value passed via WithDefaultTTL. +// // The caller is responsible for creating, configuring, and eventually closing // the etcd client. Close revokes the session lease; it never closes the client. // @@ -67,47 +70,62 @@ func NewLock(client *clientv3.Client, key string, opts ...ProviderOption) (lock. // // The fencing token is the etcd cluster revision at the moment the lock is // acquired, a globally monotonically increasing value across the etcd cluster. -func (p *Provider) Lock(ctx context.Context) (*lock.Session, error) { +func (p *Provider) Lock(ctx context.Context) (*lock.Grant, error) { if err := p.mutex.lock(ctx); err != nil { return nil, fmt.Errorf("etcd provider: lock %q: %w", p.key, err) } - return p.newSession(), nil + return p.newGrant(), nil } // TryLock attempts to acquire the lock without blocking. // Returns lock.ErrLockTaken immediately if the lock is held by another owner. -func (p *Provider) TryLock(ctx context.Context) (*lock.Session, error) { +func (p *Provider) TryLock(ctx context.Context) (*lock.Grant, error) { if err := p.mutex.tryLock(ctx); err != nil { if errors.Is(err, errLocked) { return nil, lock.ErrLockTaken } return nil, fmt.Errorf("etcd provider: trylock %q: %w", p.key, err) } - return p.newSession(), nil + return p.newGrant(), nil } -// unlock releases the lock. The session and its lease remain alive so Lock +// Unlock releases the lock. The session and its lease remain alive so Lock // can be called again without creating a new Provider. -func (p *Provider) unlock(ctx context.Context) error { +func (p *Provider) Unlock(ctx context.Context) error { if err := p.mutex.unlock(ctx); err != nil && !errors.Is(err, errLockReleased) { return fmt.Errorf("etcd provider: unlock %q: %w", p.key, err) } return nil } +// Done returns a channel that is closed when the session lease is lost. +// The channel is created once at NewLock time and never changes. +func (p *Provider) Done() <-chan struct{} { + return p.session.donec +} + +// Err returns lock.ErrLockLost if the session lease has been lost, nil otherwise. +func (p *Provider) Err() error { + select { + case <-p.session.donec: + return lock.ErrLockLost + default: + return nil + } +} + // Close revokes the session lease, releasing any held lock. The underlying // etcd client is not closed; the caller that created it is responsible for that. func (p *Provider) Close() error { return p.session.close() } -// newSession builds a lock.Session from the current mutex state after a -// successful lock acquisition. -func (p *Provider) newSession() *lock.Session { - grant := &lock.Grant{ +// newGrant builds a lock.Grant from the current mutex state after a successful +// lock acquisition. +func (p *Provider) newGrant() *lock.Grant { + return &lock.Grant{ Key: p.key, FencingToken: p.mutex.header().Revision, ExpiresAt: time.Now().Add(time.Duration(p.session.opts.ttl) * time.Second), } - return lock.NewSession(grant, p.session.donec, p.unlock) } diff --git a/provider/etcd/session.go b/provider/etcd/session.go index b121fff..1ac3d87 100644 --- a/provider/etcd/session.go +++ b/provider/etcd/session.go @@ -9,7 +9,7 @@ import ( v3 "go.etcd.io/etcd/client/v3" ) -// session represents a lease kept alive for the lifetime of a client. +// session represents a lease kept alive for the lifetime of a Provider. type session struct { client *v3.Client opts *sessionOptions @@ -81,6 +81,7 @@ func (s *session) close() error { return err } +// sessionOptions holds the resolved configuration for a session. type sessionOptions struct { ttl int ctx context.Context