Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ if `token >= mark` the request is accepted and the mark advances; if

```
disco/
├── lock/ # Distributed lock — Service interface, Grant, Session, errors
├── lock/ # Distributed lock — Service interface, Grant, errors
│ ├── fencing/ # Token type + HTTP/gRPC transport helpers
│ └── guard/ # Server-side validator: high-water mark, HTTP middleware, gRPC interceptors
├── provider/ # Backend implementations (shared across features)
Expand Down Expand Up @@ -95,11 +95,12 @@ Client A reappears as zombie ──► writes to DB with token 34 → REJECTED (

```go
import (
"log"
"net/http"

clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/grpc/metadata"
"github.com/ahrtr/disco/lock"
"github.com/ahrtr/disco/lock/fencing"
etcdprovider "github.com/ahrtr/disco/provider/etcd"
)

Expand All @@ -110,24 +111,25 @@ defer cli.Close()
svc, _ := etcdprovider.NewLock(cli, "/locks/my-resource")
defer svc.Close()

// Blocking acquire.
session, err := svc.Lock(ctx)
if err != nil { ... }
defer session.Unlock(ctx)

// React to involuntary lease loss in the background.
// Done is a property of the service lifetime, not of any individual Lock call.
go func() {
<-session.Done()
log.Printf("lock lost: %v", session.Err())
<-svc.Done()
log.Printf("lock lost: %v", svc.Err())
// stop accessing the resource
}()

// Blocking acquire — returns a Grant with the fencing token and lease metadata.
grant, err := svc.Lock(ctx)
if err != nil { ... }
defer svc.Unlock(ctx)

// Stamp every resource request with the fencing token.
req, _ := http.NewRequest("POST", resourceURL, body)
session.InjectHTTP(req)
fencing.InjectHTTP(req, grant.Token())

// For gRPC:
outCtx := metadata.NewOutgoingContext(ctx, session.GRPCMetadata())
outCtx := metadata.NewOutgoingContext(ctx, fencing.ToGRPCMetadata(grant.Token()))
```

### Resource guard (server side)
Expand Down Expand Up @@ -157,7 +159,7 @@ if err := g.Check(incomingToken); err != nil {
| Decision | Rationale |
|-----------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Cluster revision as fencing token | etcd cluster revision is globally ordered and increases on every write; the revision recorded when the lock is acquired is always strictly higher than any previous acquisition |
| Provider manages keepalive | The session's keepalive goroutine runs internally; callers watch `Session.Done()` instead of calling `Renew()` |
| Provider manages keepalive | The session's keepalive goroutine runs internally; callers watch `svc.Done()` instead of calling `Renew()` |
| `Guard` high-water mark | Atomic CAS loop with no locks; accepts `token >= mark`, rejects `token < mark` |
| Caller-owned etcd client | The caller creates, configures, and closes the etcd client; the provider never closes it |
| No `init()` auto-registration | Providers are constructed explicitly; no hidden init-time side effects |
Expand Down
16 changes: 9 additions & 7 deletions examples/db/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,31 +121,31 @@ func main() {

// ── Step 1: Client A acquires the lock ────────────────────────────────────
log.Println("Client A: acquiring lock …")
sessionA, err := providerA.Lock(ctx)
grantA, err := providerA.Lock(ctx)
if err != nil {
log.Fatalf("client A lock: %v", err)
}
log.Printf("Client A: lock acquired fencing_token=%d TTL=5s", sessionA.FencingToken())
log.Printf("Client A: lock acquired fencing_token=%d TTL=5s", grantA.FencingToken)

// ── Step 2: Client B waits for the lock in a separate goroutine ───────────
bWritten := make(chan struct{})
go func() {
log.Println("Client B: waiting for the lock …")
sessionB, err := providerB.Lock(ctx)
grantB, err := providerB.Lock(ctx)
if err != nil {
log.Fatalf("client B lock: %v", err)
}
log.Printf("Client B: lock acquired fencing_token=%d", sessionB.FencingToken())
log.Printf("Client B: lock acquired fencing_token=%d", grantB.FencingToken)

if err := db.Write(sessionB.FencingToken(), "hello from client B"); err != nil {
if err := db.Write(grantB.Token(), "hello from client B"); err != nil {
log.Printf("Client B: write rejected: %v", err)
} else {
data, tok := db.State()
log.Printf("Client B: write accepted — data=%q db_token=%d", data, tok)
}
close(bWritten)

if err := sessionB.Unlock(ctx); err != nil {
if err := providerB.Unlock(ctx); err != nil {
log.Printf("client B unlock: %v", err)
} else {
log.Println("Client B: lock released")
Expand All @@ -159,13 +159,15 @@ func main() {
cancelA()

// ── Step 4: Wait for Client B to write ────────────────────────────────────
// Blocks until B has acquired the lock (after A's lease expires ~5s from
// now) and written to the DB, advancing the stored fencing token to T'.
<-bWritten

// ── Step 5: Client A wakes up and tries to write with its stale token ─────
// The DB rejects it because A's token is lower than the stored high-water
// mark — no in-memory guard or middleware needed.
log.Println("Client A: woke up — attempting write with stale token …")
if err := db.Write(sessionA.FencingToken(), "hello from client A"); err != nil {
if err := db.Write(grantA.Token(), "hello from client A"); err != nil {
if errors.Is(err, fencing.ErrTokenStale) {
log.Printf("Client A: write rejected by DB — %v", err)
} else {
Expand Down
22 changes: 11 additions & 11 deletions examples/grpc/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

"github.com/ahrtr/disco/examples/grpc/pb"
"github.com/ahrtr/disco/lock"
"github.com/ahrtr/disco/lock/fencing"
etcdprovider "github.com/ahrtr/disco/provider/etcd"
)

Expand Down Expand Up @@ -85,27 +86,27 @@ func main() {

// ── Step 1: Client A acquires the lock ────────────────────────────────────
log.Println("Client A: acquiring lock …")
sessionA, err := providerA.Lock(ctx)
grantA, err := providerA.Lock(ctx)
if err != nil {
log.Fatalf("client A lock: %v", err)
}
log.Printf("Client A: lock acquired fencing_token=%d TTL=5s", sessionA.FencingToken())
log.Printf("Client A: lock acquired fencing_token=%d TTL=5s", grantA.FencingToken)

// ── Step 2: Client B waits for the lock in a goroutine ───────────────────
bWritten := make(chan struct{})
go func() {
log.Println("Client B: waiting for the lock …")
sessionB, err := providerB.Lock(ctx)
grantB, err := providerB.Lock(ctx)
if err != nil {
log.Fatalf("client B lock: %v", err)
}
log.Printf("Client B: lock acquired fencing_token=%d", sessionB.FencingToken())
log.Printf("Client B: lock acquired fencing_token=%d", grantB.FencingToken)

log.Println("Client B: calling Resource/Write …")
doWrite(ctx, "Client B", rc, sessionB)
doWrite(ctx, "Client B", rc, grantB)
close(bWritten)

if err := sessionB.Unlock(ctx); err != nil {
if err := providerB.Unlock(ctx); err != nil {
log.Printf("client B unlock: %v", err)
} else {
log.Println("Client B: lock released")
Expand All @@ -127,14 +128,13 @@ func main() {
// A's grant still holds the old token in memory, but the resource server's
// high-water mark is now higher. The server rejects the RPC.
log.Println("Client A: woke up — calling Resource/Write with stale token …")
doWrite(ctx, "Client A", rc, sessionA)
doWrite(ctx, "Client A", rc, grantA)
}

// doWrite calls pb.Resource/Write, attaching the session's fencing token as
// doWrite calls pb.Resource/Write, attaching the grant's fencing token as
// gRPC metadata so the server's guard interceptor can validate it.
func doWrite(ctx context.Context, name string, rc pb.ResourceClient, session *lock.Session) {
// session.GRPCMetadata() returns metadata containing the x-fencing-token key.
outCtx := metadata.NewOutgoingContext(ctx, session.GRPCMetadata())
func doWrite(ctx context.Context, name string, rc pb.ResourceClient, grant *lock.Grant) {
outCtx := metadata.NewOutgoingContext(ctx, fencing.ToGRPCMetadata(grant.Token()))

resp, err := rc.Write(outCtx, &pb.WriteRequest{
Data: fmt.Sprintf("hello from %s", name),
Expand Down
2 changes: 1 addition & 1 deletion examples/grpc/resource/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,6 @@ func main() {
}

log.Println("grpc/resource listening on :50051")
log.Println(" pb.Resource/Write — requires x-fencing-token metadata")
log.Println(" pb.Resource/Write — requires x-fencing-token metadata")
log.Fatal(srv.Serve(lis))
}
24 changes: 13 additions & 11 deletions examples/http/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ import (
"io"
"log"
"net/http"
"strings"
"time"

clientv3 "go.etcd.io/etcd/client/v3"

"github.com/ahrtr/disco/lock"
"github.com/ahrtr/disco/lock/fencing"
etcdprovider "github.com/ahrtr/disco/provider/etcd"
)

Expand Down Expand Up @@ -72,27 +74,27 @@ func main() {

// ── Step 1: Client A acquires the lock ────────────────────────────────────
log.Println("Client A: acquiring lock …")
sessionA, err := providerA.Lock(ctx)
grantA, err := providerA.Lock(ctx)
if err != nil {
log.Fatalf("client A lock: %v", err)
}
log.Printf("Client A: lock acquired fencing_token=%d TTL=5s", sessionA.FencingToken())
log.Printf("Client A: lock acquired fencing_token=%d TTL=5s", grantA.FencingToken)

// ── Step 2: Client B waits for the lock in a separate goroutine ───────────
bWritten := make(chan struct{})
go func() {
log.Println("Client B: waiting for the lock …")
sessionB, err := providerB.Lock(ctx)
grantB, err := providerB.Lock(ctx)
if err != nil {
log.Fatalf("client B lock: %v", err)
}
log.Printf("Client B: lock acquired fencing_token=%d", sessionB.FencingToken())
log.Printf("Client B: lock acquired fencing_token=%d", grantB.FencingToken)

log.Println("Client B: writing to resource …")
doWrite("Client B", sessionB)
doWrite("Client B", grantB)
close(bWritten)

if err := sessionB.Unlock(ctx); err != nil {
if err := providerB.Unlock(ctx); err != nil {
log.Printf("client B unlock: %v", err)
} else {
log.Println("Client B: lock released")
Expand All @@ -115,23 +117,23 @@ func main() {
// A's grant still holds the old fencing token T in memory, but the resource
// server's high-water mark is now T'. Since T < T', the write is rejected.
log.Println("Client A: woke up — attempting write with stale token …")
doWrite("Client A", sessionA)
doWrite("Client A", grantA)
}

// doWrite sends a POST /write request to the resource server with the
// session's fencing token attached via the X-Fencing-Token header.
func doWrite(name string, session *lock.Session) {
// grant's fencing token attached via the X-Fencing-Token header.
func doWrite(name string, grant *lock.Grant) {
req, err := http.NewRequest(http.MethodPost, resourceWriteURL, nil)
if err != nil {
log.Fatalf("build request: %v", err)
}
session.InjectHTTP(req)
fencing.InjectHTTP(req, grant.Token())

resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Fatalf("%s: do request: %v", name, err)
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
log.Printf("%s: %s — %s", name, resp.Status, body)
log.Printf("%s: %s — %s", name, resp.Status, strings.TrimSpace(string(body)))
}
2 changes: 1 addition & 1 deletion lock/doc.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Package lock defines the core abstractions for the disco distributed
// coordination system: the Service interface, Grant, and Session types.
// coordination system: the Service interface and the Grant type.
//
// Concrete backend implementations live under provider/.
package lock
4 changes: 2 additions & 2 deletions lock/fencing/token.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import "errors"
// ErrNoToken is returned when a request carries no fencing token.
var ErrNoToken = errors.New("fencing: no token present in request")

// ErrTokenStale is returned by a Guard when the incoming token is lower than
// the highest token the resource has already accepted.
// ErrTokenStale is returned when an incoming token is lower than the highest
// token the resource has already accepted.
var ErrTokenStale = errors.New("fencing: token is stale")

// Token is a monotonically increasing integer that identifies a lock
Expand Down
22 changes: 19 additions & 3 deletions lock/grant.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,24 @@
package lock

import "time"
import (
"time"

// Grant represents a successfully acquired lock.
"github.com/ahrtr/disco/lock/fencing"
)

// Grant represents the metadata for a successfully acquired lock.
//
// The fencing token carried by a Grant must be attached to every request sent
// to a guarded resource (database write, external API call, etc.) so that the
// resource can reject requests from stale (zombie) clients.
// Use grant.Token() to obtain a fencing.Token and pass it to the helpers in
// the fencing package (fencing.InjectHTTP, fencing.ToGRPCMetadata, etc.).
type Grant struct {
// Key is the distributed lock key that was acquired.
Key string

// FencingToken is a monotonically increasing integer assigned at lock
// grant time. It must be attached to every resource request so the
// acquisition time. It must be attached to every resource request so the
// resource can reject requests from stale (lower-token) owners.
FencingToken int64

Expand All @@ -17,3 +27,9 @@ type Grant struct {
// backend.
ExpiresAt time.Time
}

// Token returns the fencing token as a fencing.Token, ready to pass to
// fencing.InjectHTTP, fencing.ToGRPCMetadata, or a resource's Check method.
func (g *Grant) Token() fencing.Token {
return fencing.Token(g.FencingToken)
}
6 changes: 3 additions & 3 deletions lock/guard/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import (
// UnaryInterceptor returns a gRPC unary server interceptor that validates the
// fencing token carried in incoming metadata before calling the handler.
//
// Requests missing the fencing-token metadata key are rejected with
// codes.InvalidArgument. Requests carrying a stale token are rejected with
// codes.Aborted.
// Requests with a missing or malformed fencing-token metadata key are rejected
// with codes.InvalidArgument. Requests carrying a stale token are rejected
// with codes.Aborted.
//
// Register it when creating the server:
//
Expand Down
5 changes: 3 additions & 2 deletions lock/guard/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ import (
// HTTPMiddleware is an HTTP middleware that extracts the fencing token from
// every incoming request and validates it against the Guard before calling next.
//
// Requests missing the X-Fencing-Token header are rejected with 400 Bad
// Request. Requests carrying a stale token are rejected with 409 Conflict.
// Requests with a missing or malformed X-Fencing-Token header are rejected
// with 400 Bad Request. Requests carrying a stale token are rejected with
// 409 Conflict.
//
// Use it directly as a handler wrapper:
//
Expand Down
37 changes: 30 additions & 7 deletions lock/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,41 @@ import "context"

// Service is the single abstraction over all distributed lock backends.
//
// Each Service instance is bound to a single lock key, established at
// construction time. The lease and its keepalive are managed internally;
// callers do not need to renew it. Use the Session returned by Lock or
// TryLock to monitor for involuntary lease loss via Session.Done.
// A Service instance is bound to a single lock key, established at construction
// time (e.g. etcd.NewLock). The underlying lease and its keepalive are managed
// internally; callers do not need to renew it.
//
// The Done channel and Err reflect the health of the lease — they are
// properties of the Service lifetime, not of any individual Lock call.
// Monitor Done in a background goroutine to detect involuntary lease loss:
//
// go func() {
// <-svc.Done()
// log.Println("lease lost — stop accessing guarded resources")
// }()
type Service interface {
// Lock acquires the distributed lock, blocking until it is available or
// ctx is canceled.
Lock(ctx context.Context) (*Session, error)
// ctx is canceled. Returns a Grant carrying the fencing token and lease
// metadata for this acquisition.
Lock(ctx context.Context) (*Grant, error)

// TryLock attempts to acquire the lock without blocking.
// Returns ErrLockTaken immediately if the lock is held by another owner.
TryLock(ctx context.Context) (*Session, error)
TryLock(ctx context.Context) (*Grant, error)

// Unlock explicitly releases the lock. The underlying lease remains alive
// so Lock can be called again without creating a new Service.
Unlock(ctx context.Context) error

// Done returns a channel that is closed when the underlying lease is lost
// (expired or revoked). Once closed, callers must immediately stop
// accessing guarded resources and must not call Lock again.
// Call Close to release backend resources.
Done() <-chan struct{}

// Err returns ErrLockLost if the lease has been lost, nil otherwise.
// Safe to call concurrently at any point in the Service lifetime.
Err() error

// Close revokes the lease and releases all backend resources.
Close() error
Expand Down
Loading