Skip to content

Commit f911ad5

Browse files
committed
Implement .LockForUpdate() using PostgreSQL's FOR UPDATE SKIP LOCKED pattern
1 parent e883b49 commit f911ad5

1 file changed

Lines changed: 38 additions & 0 deletions

File tree

table.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,3 +172,41 @@ func (t *Table[T, TP, ID]) WithTx(tx pgx.Tx) *Table[T, TP, ID] {
172172
Name: t.Name,
173173
}
174174
}
175+
176+
// LockForUpdate locks 0..limit records using PostgreSQL's FOR UPDATE SKIP LOCKED pattern
177+
// for safe concurrent processing where each record is processed exactly once.
178+
// Complete updateFn() quickly to avoid holding the transaction. For long-running work:
179+
// update status to "processing" and return early, then process asynchronously.
180+
func (t *Table[T, TP, ID]) LockForUpdate(ctx context.Context, cond sq.Sqlizer, orderBy []string, limit uint64, updateFn func([]TP)) error {
181+
return pgx.BeginFunc(ctx, t.DB.Conn, func(pgTx pgx.Tx) error {
182+
if len(orderBy) == 0 {
183+
orderBy = []string{t.IDColumn}
184+
}
185+
186+
tx := t.WithTx(pgTx)
187+
188+
q := tx.SQL.
189+
Select("*").
190+
From(t.Name).
191+
Where(cond).
192+
OrderBy(orderBy...).
193+
Limit(limit).
194+
Suffix("FOR UPDATE SKIP LOCKED")
195+
196+
var records []TP
197+
if err := tx.Query.GetAll(ctx, q, &records); err != nil {
198+
return fmt.Errorf("select for update skip locked: %w", err)
199+
}
200+
201+
updateFn(records)
202+
203+
for _, record := range records {
204+
q := tx.SQL.UpdateRecord(record, sq.Eq{t.IDColumn: record.GetID()}, t.Name)
205+
if _, err := tx.Query.Exec(ctx, q); err != nil {
206+
return fmt.Errorf("update record: %w", err)
207+
}
208+
}
209+
210+
return nil
211+
})
212+
}

0 commit comments

Comments
 (0)