diff --git a/README.md b/README.md index 1aa4874..340a925 100644 --- a/README.md +++ b/README.md @@ -60,7 +60,7 @@ if `token >= mark` the request is accepted and the mark advances; if ``` disco/ -├── lock/ # Distributed lock — Service interface, Grant, Session, errors +├── lock/ # Distributed lock — Service interface, Grant, errors │ ├── fencing/ # Token type + HTTP/gRPC transport helpers │ └── guard/ # Server-side validator: high-water mark, HTTP middleware, gRPC interceptors ├── provider/ # Backend implementations (shared across features) @@ -95,11 +95,12 @@ Client A reappears as zombie ──► writes to DB with token 34 → REJECTED ( ```go import ( + "log" "net/http" clientv3 "go.etcd.io/etcd/client/v3" "google.golang.org/grpc/metadata" - "github.com/ahrtr/disco/lock" + "github.com/ahrtr/disco/lock/fencing" etcdprovider "github.com/ahrtr/disco/provider/etcd" ) @@ -110,24 +111,25 @@ defer cli.Close() svc, _ := etcdprovider.NewLock(cli, "/locks/my-resource") defer svc.Close() -// Blocking acquire. -session, err := svc.Lock(ctx) -if err != nil { ... } -defer session.Unlock(ctx) - // React to involuntary lease loss in the background. +// Done is a property of the service lifetime, not of any individual Lock call. go func() { - <-session.Done() - log.Printf("lock lost: %v", session.Err()) + <-svc.Done() + log.Printf("lock lost: %v", svc.Err()) // stop accessing the resource }() +// Blocking acquire — returns a Grant with the fencing token and lease metadata. +grant, err := svc.Lock(ctx) +if err != nil { ... } +defer svc.Unlock(ctx) + // Stamp every resource request with the fencing token. req, _ := http.NewRequest("POST", resourceURL, body) -session.InjectHTTP(req) +fencing.InjectHTTP(req, grant.Token()) // For gRPC: -outCtx := metadata.NewOutgoingContext(ctx, session.GRPCMetadata()) +outCtx := metadata.NewOutgoingContext(ctx, fencing.ToGRPCMetadata(grant.Token())) ``` ### Resource guard (server side) @@ -157,7 +159,7 @@ if err := g.Check(incomingToken); err != nil { | Decision | Rationale | |-----------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | Cluster revision as fencing token | etcd cluster revision is globally ordered and increases on every write; the revision recorded when the lock is acquired is always strictly higher than any previous acquisition | -| Provider manages keepalive | The session's keepalive goroutine runs internally; callers watch `Session.Done()` instead of calling `Renew()` | +| Provider manages keepalive | The session's keepalive goroutine runs internally; callers watch `svc.Done()` instead of calling `Renew()` | | `Guard` high-water mark | Atomic CAS loop with no locks; accepts `token >= mark`, rejects `token < mark` | | Caller-owned etcd client | The caller creates, configures, and closes the etcd client; the provider never closes it | | No `init()` auto-registration | Providers are constructed explicitly; no hidden init-time side effects | diff --git a/examples/db/main.go b/examples/db/main.go index e831d83..0caecc5 100644 --- a/examples/db/main.go +++ b/examples/db/main.go @@ -121,23 +121,23 @@ func main() { // ── Step 1: Client A acquires the lock ──────────────────────────────────── log.Println("Client A: acquiring lock …") - sessionA, err := providerA.Lock(ctx) + grantA, err := providerA.Lock(ctx) if err != nil { log.Fatalf("client A lock: %v", err) } - log.Printf("Client A: lock acquired fencing_token=%d TTL=5s", sessionA.FencingToken()) + log.Printf("Client A: lock acquired fencing_token=%d TTL=5s", grantA.FencingToken) // ── Step 2: Client B waits for the lock in a separate goroutine ─────────── bWritten := make(chan struct{}) go func() { log.Println("Client B: waiting for the lock …") - sessionB, err := providerB.Lock(ctx) + grantB, err := providerB.Lock(ctx) if err != nil { log.Fatalf("client B lock: %v", err) } - log.Printf("Client B: lock acquired fencing_token=%d", sessionB.FencingToken()) + log.Printf("Client B: lock acquired fencing_token=%d", grantB.FencingToken) - if err := db.Write(sessionB.FencingToken(), "hello from client B"); err != nil { + if err := db.Write(grantB.Token(), "hello from client B"); err != nil { log.Printf("Client B: write rejected: %v", err) } else { data, tok := db.State() @@ -145,7 +145,7 @@ func main() { } close(bWritten) - if err := sessionB.Unlock(ctx); err != nil { + if err := providerB.Unlock(ctx); err != nil { log.Printf("client B unlock: %v", err) } else { log.Println("Client B: lock released") @@ -159,13 +159,15 @@ func main() { cancelA() // ── Step 4: Wait for Client B to write ──────────────────────────────────── + // Blocks until B has acquired the lock (after A's lease expires ~5s from + // now) and written to the DB, advancing the stored fencing token to T'. <-bWritten // ── Step 5: Client A wakes up and tries to write with its stale token ───── // The DB rejects it because A's token is lower than the stored high-water // mark — no in-memory guard or middleware needed. log.Println("Client A: woke up — attempting write with stale token …") - if err := db.Write(sessionA.FencingToken(), "hello from client A"); err != nil { + if err := db.Write(grantA.Token(), "hello from client A"); err != nil { if errors.Is(err, fencing.ErrTokenStale) { log.Printf("Client A: write rejected by DB — %v", err) } else { diff --git a/examples/grpc/client/main.go b/examples/grpc/client/main.go index a9b4a4e..2259f7d 100644 --- a/examples/grpc/client/main.go +++ b/examples/grpc/client/main.go @@ -30,6 +30,7 @@ import ( "github.com/ahrtr/disco/examples/grpc/pb" "github.com/ahrtr/disco/lock" + "github.com/ahrtr/disco/lock/fencing" etcdprovider "github.com/ahrtr/disco/provider/etcd" ) @@ -85,27 +86,27 @@ func main() { // ── Step 1: Client A acquires the lock ──────────────────────────────────── log.Println("Client A: acquiring lock …") - sessionA, err := providerA.Lock(ctx) + grantA, err := providerA.Lock(ctx) if err != nil { log.Fatalf("client A lock: %v", err) } - log.Printf("Client A: lock acquired fencing_token=%d TTL=5s", sessionA.FencingToken()) + log.Printf("Client A: lock acquired fencing_token=%d TTL=5s", grantA.FencingToken) // ── Step 2: Client B waits for the lock in a goroutine ─────────────────── bWritten := make(chan struct{}) go func() { log.Println("Client B: waiting for the lock …") - sessionB, err := providerB.Lock(ctx) + grantB, err := providerB.Lock(ctx) if err != nil { log.Fatalf("client B lock: %v", err) } - log.Printf("Client B: lock acquired fencing_token=%d", sessionB.FencingToken()) + log.Printf("Client B: lock acquired fencing_token=%d", grantB.FencingToken) log.Println("Client B: calling Resource/Write …") - doWrite(ctx, "Client B", rc, sessionB) + doWrite(ctx, "Client B", rc, grantB) close(bWritten) - if err := sessionB.Unlock(ctx); err != nil { + if err := providerB.Unlock(ctx); err != nil { log.Printf("client B unlock: %v", err) } else { log.Println("Client B: lock released") @@ -127,14 +128,13 @@ func main() { // A's grant still holds the old token in memory, but the resource server's // high-water mark is now higher. The server rejects the RPC. log.Println("Client A: woke up — calling Resource/Write with stale token …") - doWrite(ctx, "Client A", rc, sessionA) + doWrite(ctx, "Client A", rc, grantA) } -// doWrite calls pb.Resource/Write, attaching the session's fencing token as +// doWrite calls pb.Resource/Write, attaching the grant's fencing token as // gRPC metadata so the server's guard interceptor can validate it. -func doWrite(ctx context.Context, name string, rc pb.ResourceClient, session *lock.Session) { - // session.GRPCMetadata() returns metadata containing the x-fencing-token key. - outCtx := metadata.NewOutgoingContext(ctx, session.GRPCMetadata()) +func doWrite(ctx context.Context, name string, rc pb.ResourceClient, grant *lock.Grant) { + outCtx := metadata.NewOutgoingContext(ctx, fencing.ToGRPCMetadata(grant.Token())) resp, err := rc.Write(outCtx, &pb.WriteRequest{ Data: fmt.Sprintf("hello from %s", name), diff --git a/examples/grpc/resource/main.go b/examples/grpc/resource/main.go index 64f285c..421f272 100644 --- a/examples/grpc/resource/main.go +++ b/examples/grpc/resource/main.go @@ -50,6 +50,6 @@ func main() { } log.Println("grpc/resource listening on :50051") - log.Println(" pb.Resource/Write — requires x-fencing-token metadata") + log.Println(" pb.Resource/Write — requires x-fencing-token metadata") log.Fatal(srv.Serve(lis)) } diff --git a/examples/http/client/main.go b/examples/http/client/main.go index df96445..32ca275 100644 --- a/examples/http/client/main.go +++ b/examples/http/client/main.go @@ -23,11 +23,13 @@ import ( "io" "log" "net/http" + "strings" "time" clientv3 "go.etcd.io/etcd/client/v3" "github.com/ahrtr/disco/lock" + "github.com/ahrtr/disco/lock/fencing" etcdprovider "github.com/ahrtr/disco/provider/etcd" ) @@ -72,27 +74,27 @@ func main() { // ── Step 1: Client A acquires the lock ──────────────────────────────────── log.Println("Client A: acquiring lock …") - sessionA, err := providerA.Lock(ctx) + grantA, err := providerA.Lock(ctx) if err != nil { log.Fatalf("client A lock: %v", err) } - log.Printf("Client A: lock acquired fencing_token=%d TTL=5s", sessionA.FencingToken()) + log.Printf("Client A: lock acquired fencing_token=%d TTL=5s", grantA.FencingToken) // ── Step 2: Client B waits for the lock in a separate goroutine ─────────── bWritten := make(chan struct{}) go func() { log.Println("Client B: waiting for the lock …") - sessionB, err := providerB.Lock(ctx) + grantB, err := providerB.Lock(ctx) if err != nil { log.Fatalf("client B lock: %v", err) } - log.Printf("Client B: lock acquired fencing_token=%d", sessionB.FencingToken()) + log.Printf("Client B: lock acquired fencing_token=%d", grantB.FencingToken) log.Println("Client B: writing to resource …") - doWrite("Client B", sessionB) + doWrite("Client B", grantB) close(bWritten) - if err := sessionB.Unlock(ctx); err != nil { + if err := providerB.Unlock(ctx); err != nil { log.Printf("client B unlock: %v", err) } else { log.Println("Client B: lock released") @@ -115,17 +117,17 @@ func main() { // A's grant still holds the old fencing token T in memory, but the resource // server's high-water mark is now T'. Since T < T', the write is rejected. log.Println("Client A: woke up — attempting write with stale token …") - doWrite("Client A", sessionA) + doWrite("Client A", grantA) } // doWrite sends a POST /write request to the resource server with the -// session's fencing token attached via the X-Fencing-Token header. -func doWrite(name string, session *lock.Session) { +// grant's fencing token attached via the X-Fencing-Token header. +func doWrite(name string, grant *lock.Grant) { req, err := http.NewRequest(http.MethodPost, resourceWriteURL, nil) if err != nil { log.Fatalf("build request: %v", err) } - session.InjectHTTP(req) + fencing.InjectHTTP(req, grant.Token()) resp, err := http.DefaultClient.Do(req) if err != nil { @@ -133,5 +135,5 @@ func doWrite(name string, session *lock.Session) { } defer resp.Body.Close() body, _ := io.ReadAll(resp.Body) - log.Printf("%s: %s — %s", name, resp.Status, body) + log.Printf("%s: %s — %s", name, resp.Status, strings.TrimSpace(string(body))) } diff --git a/lock/doc.go b/lock/doc.go index ac64bcf..aff50bb 100644 --- a/lock/doc.go +++ b/lock/doc.go @@ -1,5 +1,5 @@ // Package lock defines the core abstractions for the disco distributed -// coordination system: the Service interface, Grant, and Session types. +// coordination system: the Service interface and the Grant type. // // Concrete backend implementations live under provider/. package lock diff --git a/lock/fencing/token.go b/lock/fencing/token.go index 2ce1e97..c288be2 100644 --- a/lock/fencing/token.go +++ b/lock/fencing/token.go @@ -5,8 +5,8 @@ import "errors" // ErrNoToken is returned when a request carries no fencing token. var ErrNoToken = errors.New("fencing: no token present in request") -// ErrTokenStale is returned by a Guard when the incoming token is lower than -// the highest token the resource has already accepted. +// ErrTokenStale is returned when an incoming token is lower than the highest +// token the resource has already accepted. var ErrTokenStale = errors.New("fencing: token is stale") // Token is a monotonically increasing integer that identifies a lock diff --git a/lock/grant.go b/lock/grant.go index aaabef9..9434fa8 100644 --- a/lock/grant.go +++ b/lock/grant.go @@ -1,14 +1,24 @@ package lock -import "time" +import ( + "time" -// Grant represents a successfully acquired lock. + "github.com/ahrtr/disco/lock/fencing" +) + +// Grant represents the metadata for a successfully acquired lock. +// +// The fencing token carried by a Grant must be attached to every request sent +// to a guarded resource (database write, external API call, etc.) so that the +// resource can reject requests from stale (zombie) clients. +// Use grant.Token() to obtain a fencing.Token and pass it to the helpers in +// the fencing package (fencing.InjectHTTP, fencing.ToGRPCMetadata, etc.). type Grant struct { // Key is the distributed lock key that was acquired. Key string // FencingToken is a monotonically increasing integer assigned at lock - // grant time. It must be attached to every resource request so the + // acquisition time. It must be attached to every resource request so the // resource can reject requests from stale (lower-token) owners. FencingToken int64 @@ -17,3 +27,9 @@ type Grant struct { // backend. ExpiresAt time.Time } + +// Token returns the fencing token as a fencing.Token, ready to pass to +// fencing.InjectHTTP, fencing.ToGRPCMetadata, or a resource's Check method. +func (g *Grant) Token() fencing.Token { + return fencing.Token(g.FencingToken) +} diff --git a/lock/guard/grpc.go b/lock/guard/grpc.go index 76a6d30..6bcb54c 100644 --- a/lock/guard/grpc.go +++ b/lock/guard/grpc.go @@ -13,9 +13,9 @@ import ( // UnaryInterceptor returns a gRPC unary server interceptor that validates the // fencing token carried in incoming metadata before calling the handler. // -// Requests missing the fencing-token metadata key are rejected with -// codes.InvalidArgument. Requests carrying a stale token are rejected with -// codes.Aborted. +// Requests with a missing or malformed fencing-token metadata key are rejected +// with codes.InvalidArgument. Requests carrying a stale token are rejected +// with codes.Aborted. // // Register it when creating the server: // diff --git a/lock/guard/http.go b/lock/guard/http.go index daed0ad..3bdd2f4 100644 --- a/lock/guard/http.go +++ b/lock/guard/http.go @@ -9,8 +9,9 @@ import ( // HTTPMiddleware is an HTTP middleware that extracts the fencing token from // every incoming request and validates it against the Guard before calling next. // -// Requests missing the X-Fencing-Token header are rejected with 400 Bad -// Request. Requests carrying a stale token are rejected with 409 Conflict. +// Requests with a missing or malformed X-Fencing-Token header are rejected +// with 400 Bad Request. Requests carrying a stale token are rejected with +// 409 Conflict. // // Use it directly as a handler wrapper: // diff --git a/lock/service.go b/lock/service.go index 57206b5..ac16b61 100644 --- a/lock/service.go +++ b/lock/service.go @@ -4,18 +4,41 @@ import "context" // Service is the single abstraction over all distributed lock backends. // -// Each Service instance is bound to a single lock key, established at -// construction time. The lease and its keepalive are managed internally; -// callers do not need to renew it. Use the Session returned by Lock or -// TryLock to monitor for involuntary lease loss via Session.Done. +// A Service instance is bound to a single lock key, established at construction +// time (e.g. etcd.NewLock). The underlying lease and its keepalive are managed +// internally; callers do not need to renew it. +// +// The Done channel and Err reflect the health of the lease — they are +// properties of the Service lifetime, not of any individual Lock call. +// Monitor Done in a background goroutine to detect involuntary lease loss: +// +// go func() { +// <-svc.Done() +// log.Println("lease lost — stop accessing guarded resources") +// }() type Service interface { // Lock acquires the distributed lock, blocking until it is available or - // ctx is canceled. - Lock(ctx context.Context) (*Session, error) + // ctx is canceled. Returns a Grant carrying the fencing token and lease + // metadata for this acquisition. + Lock(ctx context.Context) (*Grant, error) // TryLock attempts to acquire the lock without blocking. // Returns ErrLockTaken immediately if the lock is held by another owner. - TryLock(ctx context.Context) (*Session, error) + TryLock(ctx context.Context) (*Grant, error) + + // Unlock explicitly releases the lock. The underlying lease remains alive + // so Lock can be called again without creating a new Service. + Unlock(ctx context.Context) error + + // Done returns a channel that is closed when the underlying lease is lost + // (expired or revoked). Once closed, callers must immediately stop + // accessing guarded resources and must not call Lock again. + // Call Close to release backend resources. + Done() <-chan struct{} + + // Err returns ErrLockLost if the lease has been lost, nil otherwise. + // Safe to call concurrently at any point in the Service lifetime. + Err() error // Close revokes the lease and releases all backend resources. Close() error diff --git a/lock/session.go b/lock/session.go deleted file mode 100644 index bb77c68..0000000 --- a/lock/session.go +++ /dev/null @@ -1,76 +0,0 @@ -package lock - -import ( - "context" - "net/http" - - "google.golang.org/grpc/metadata" - - "github.com/ahrtr/disco/lock/fencing" -) - -// Session represents an active, leased lock ownership. -// -// The fencing token carried by this session must be attached to every request -// sent to a guarded resource (database write, external API call, etc.) so that -// the resource can reject requests from stale sessions. -// -// A Session is not safe for concurrent calls to Unlock; all other methods -// are safe for concurrent use. -type Session struct { - grant *Grant - done <-chan struct{} - unlock func(context.Context) error -} - -// NewSession creates a Session for use by Service implementations. -// done is closed when the lease is lost; unlock releases the lock. -// Callers should obtain a Session via Service.Lock or Service.TryLock. -func NewSession(grant *Grant, done <-chan struct{}, unlock func(context.Context) error) *Session { - return &Session{grant: grant, done: done, unlock: unlock} -} - -// FencingToken returns the monotonically increasing token for this lock -// generation. Attach it to every resource request via InjectHTTP or -// GRPCMetadata. -func (s *Session) FencingToken() fencing.Token { - return fencing.Token(s.grant.FencingToken) -} - -// Grant returns the underlying Grant with metadata such as the fencing -// token, lock key, and lease expiry time. -func (s *Session) Grant() *Grant { return s.grant } - -// Done returns a channel that is closed when the lease is lost. -// Once closed, callers must immediately stop accessing guarded resources. -func (s *Session) Done() <-chan struct{} { return s.done } - -// Err returns ErrLockLost if the lease has been lost, or nil if the -// session is still alive. Safe to call before or after Done() is closed. -func (s *Session) Err() error { - select { - case <-s.done: - return ErrLockLost - default: - return nil - } -} - -// Unlock explicitly releases the lock. -func (s *Session) Unlock(ctx context.Context) error { - return s.unlock(ctx) -} - -// InjectHTTP sets the X-Fencing-Token header on req. Call this before -// executing any HTTP request against a guarded resource. -func (s *Session) InjectHTTP(req *http.Request) { - fencing.InjectHTTP(req, s.FencingToken()) -} - -// GRPCMetadata returns gRPC metadata containing the fencing token. Attach it -// to the outgoing context: -// -// ctx = metadata.NewOutgoingContext(ctx, session.GRPCMetadata()) -func (s *Session) GRPCMetadata() metadata.MD { - return fencing.ToGRPCMetadata(s.FencingToken()) -} diff --git a/provider/etcd/doc.go b/provider/etcd/doc.go index 228037f..ca6e27b 100644 --- a/provider/etcd/doc.go +++ b/provider/etcd/doc.go @@ -21,5 +21,5 @@ // // The session manages its own lease keepalive goroutine. Callers do not need // to renew the lease manually; they should instead monitor the channel returned -// by Session.Done to detect involuntary lease loss. +// by Service.Done to detect involuntary lease loss. package etcd diff --git a/provider/etcd/mutex.go b/provider/etcd/mutex.go index 019ac77..b02cedf 100644 --- a/provider/etcd/mutex.go +++ b/provider/etcd/mutex.go @@ -21,12 +21,14 @@ var ( type mutex struct { s *session - pfx string - myKey string - myRev int64 - hdr *pb.ResponseHeader + pfx string // key prefix; all candidate keys are put under pfx + myKey string // this session's candidate key in etcd + myRev int64 // create revision of myKey; lowest revision is the lock holder + hdr *pb.ResponseHeader // response header from the last successful lock acquisition } +// newMutex returns a mutex for pfx backed by session s. +// All lock keys are stored under pfx + "/". func newMutex(s *session, pfx string) *mutex { return &mutex{s, pfx + "/", "", -1, nil} } @@ -42,13 +44,14 @@ func (m *mutex) tryLock(ctx context.Context) error { if err != nil { return err } - // if no key on prefix / the minimum rev is key, already hold the lock + // No key exists under the prefix, or our key has the lowest create revision: + // we are already the lock holder. ownerKey := resp.Responses[1].GetResponseRange().Kvs if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev { m.hdr = resp.Header return nil } - // Cannot lock, so delete the key + // Another session holds the lock; clean up our candidate key and return. if _, err := m.s.client.Delete(ctx, m.myKey); err != nil { return err } @@ -67,7 +70,8 @@ func (m *mutex) lock(ctx context.Context) error { if err != nil { return err } - // if no key on prefix / the minimum rev is key, already hold the lock + // No key exists under the prefix, or our key has the lowest create revision: + // we are already the lock holder. ownerKey := resp.Responses[1].GetResponseRange().Kvs if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev { m.hdr = resp.Header @@ -119,6 +123,8 @@ func (m *mutex) tryAcquire(ctx context.Context) (*v3.TxnResponse, error) { return resp, nil } +// unlock deletes this session's candidate key, releasing the lock. +// Returns errLockReleased if the key has already been deleted. func (m *mutex) unlock(ctx context.Context) error { if m.myKey == "" || m.myRev <= 0 || m.myKey == "\x00" { return errLockReleased @@ -139,6 +145,8 @@ func (m *mutex) unlock(ctx context.Context) error { // header returns the response header received from etcd on acquiring the lock. func (m *mutex) header() *pb.ResponseHeader { return m.hdr } +// waitDelete blocks until a DELETE event is observed for key at or after rev, +// or until ctx is canceled or the watch channel is closed unexpectedly. func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error { cctx, cancel := context.WithCancel(ctx) defer cancel() diff --git a/provider/etcd/options.go b/provider/etcd/options.go index 31e46a1..a78c7fb 100644 --- a/provider/etcd/options.go +++ b/provider/etcd/options.go @@ -26,7 +26,7 @@ func defaultProviderOptions() providerOptions { // WithContext sets the parent context for the session's lease keepalive loop. // When the context is cancelled the keepalive stops, the lease expires, and -// any Session obtained from this service will have its Done channel closed. +// the service's Done channel is closed. // If not set, the etcd client's own context is used. func WithContext(ctx context.Context) ProviderOption { return func(o *providerOptions) { @@ -35,6 +35,7 @@ func WithContext(ctx context.Context) ProviderOption { } // WithDefaultTTL sets the default lease TTL. Defaults to 30 s. +// Values below 5 s are clamped to 5 s by NewLock. func WithDefaultTTL(d time.Duration) ProviderOption { return func(o *providerOptions) { o.cfg.DefaultTTL = d diff --git a/provider/etcd/provider.go b/provider/etcd/provider.go index 10f4b62..d132795 100644 --- a/provider/etcd/provider.go +++ b/provider/etcd/provider.go @@ -31,6 +31,9 @@ type Provider struct { // mutex for key. Both are reused across Lock and TryLock calls for the // lifetime of the returned service. // +// The effective TTL is clamped to a minimum of 5 seconds regardless of the +// value passed via WithDefaultTTL. +// // The caller is responsible for creating, configuring, and eventually closing // the etcd client. Close revokes the session lease; it never closes the client. // @@ -67,47 +70,62 @@ func NewLock(client *clientv3.Client, key string, opts ...ProviderOption) (lock. // // The fencing token is the etcd cluster revision at the moment the lock is // acquired, a globally monotonically increasing value across the etcd cluster. -func (p *Provider) Lock(ctx context.Context) (*lock.Session, error) { +func (p *Provider) Lock(ctx context.Context) (*lock.Grant, error) { if err := p.mutex.lock(ctx); err != nil { return nil, fmt.Errorf("etcd provider: lock %q: %w", p.key, err) } - return p.newSession(), nil + return p.newGrant(), nil } // TryLock attempts to acquire the lock without blocking. // Returns lock.ErrLockTaken immediately if the lock is held by another owner. -func (p *Provider) TryLock(ctx context.Context) (*lock.Session, error) { +func (p *Provider) TryLock(ctx context.Context) (*lock.Grant, error) { if err := p.mutex.tryLock(ctx); err != nil { if errors.Is(err, errLocked) { return nil, lock.ErrLockTaken } return nil, fmt.Errorf("etcd provider: trylock %q: %w", p.key, err) } - return p.newSession(), nil + return p.newGrant(), nil } -// unlock releases the lock. The session and its lease remain alive so Lock +// Unlock releases the lock. The session and its lease remain alive so Lock // can be called again without creating a new Provider. -func (p *Provider) unlock(ctx context.Context) error { +func (p *Provider) Unlock(ctx context.Context) error { if err := p.mutex.unlock(ctx); err != nil && !errors.Is(err, errLockReleased) { return fmt.Errorf("etcd provider: unlock %q: %w", p.key, err) } return nil } +// Done returns a channel that is closed when the session lease is lost. +// The channel is created once at NewLock time and never changes. +func (p *Provider) Done() <-chan struct{} { + return p.session.donec +} + +// Err returns lock.ErrLockLost if the session lease has been lost, nil otherwise. +func (p *Provider) Err() error { + select { + case <-p.session.donec: + return lock.ErrLockLost + default: + return nil + } +} + // Close revokes the session lease, releasing any held lock. The underlying // etcd client is not closed; the caller that created it is responsible for that. func (p *Provider) Close() error { return p.session.close() } -// newSession builds a lock.Session from the current mutex state after a -// successful lock acquisition. -func (p *Provider) newSession() *lock.Session { - grant := &lock.Grant{ +// newGrant builds a lock.Grant from the current mutex state after a successful +// lock acquisition. +func (p *Provider) newGrant() *lock.Grant { + return &lock.Grant{ Key: p.key, FencingToken: p.mutex.header().Revision, ExpiresAt: time.Now().Add(time.Duration(p.session.opts.ttl) * time.Second), } - return lock.NewSession(grant, p.session.donec, p.unlock) } diff --git a/provider/etcd/session.go b/provider/etcd/session.go index b121fff..1ac3d87 100644 --- a/provider/etcd/session.go +++ b/provider/etcd/session.go @@ -9,7 +9,7 @@ import ( v3 "go.etcd.io/etcd/client/v3" ) -// session represents a lease kept alive for the lifetime of a client. +// session represents a lease kept alive for the lifetime of a Provider. type session struct { client *v3.Client opts *sessionOptions @@ -81,6 +81,7 @@ func (s *session) close() error { return err } +// sessionOptions holds the resolved configuration for a session. type sessionOptions struct { ttl int ctx context.Context