This guide walks through implementing a new database platform for db-toolkit. The PostgreSQL implementation (platform/postgres/) is the reference — every code example here comes directly from it.
```
    Reader             BatchStream              Writer
┌────────────┐       ┌──────────────┐       ┌─────────────┐
│ DB → Arrow │──────▶│ Schema()     │──────▶│ Arrow → DB  │
│ Schema()   │       │ Next(ctx)    │       │             │
│ Read(ctx)  │       │ Close()      │       │ WriteStream │
└────────────┘       └──────────────┘       └─────────────┘
```
A Reader connects to a database, derives an Arrow schema eagerly (at construction time), and produces a BatchStream lazily (at Read() time). A Writer consumes a BatchStream and inserts records into a target table. The dio.Copy helper wires the two together.
Platforms self-register via Go's init() mechanism:
```go
// platform/postgres/postgres.go
func init() {
	platform.Register("postgres", &Platform{})
}
```

Consumers activate a platform with a blank import:

```go
import _ "github.com/rnestertsov/db-toolkit/platform/postgres"
```

Then retrieve it at runtime:

```go
pg, err := platform.Get("postgres")
```

The registry is thread-safe (guarded by a sync.RWMutex). Duplicate registrations and nil platforms cause a panic at startup: fail fast, fail loudly.
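A registry with these properties can be quite small. The sketch below is a hypothetical, dependency-free version. Platform is reduced to an empty interface, and the package-level map and the error text are assumptions. It is not the actual platform package source.

```go
package main

import (
	"fmt"
	"sync"
)

// Platform is a stand-in for platform.Platform; the real interface has
// OpenConnection, omitted here to keep the sketch dependency-free.
type Platform interface{}

var (
	mu        sync.RWMutex
	platforms = map[string]Platform{}
)

// Register panics on a nil interface value or a duplicate name, so
// misconfiguration surfaces during init() rather than on first use.
func Register(name string, p Platform) {
	mu.Lock()
	defer mu.Unlock()
	if p == nil {
		panic("platform: Register called with nil platform for " + name)
	}
	if _, dup := platforms[name]; dup {
		panic("platform: Register called twice for " + name)
	}
	platforms[name] = p
}

// Get looks up a registered platform; concurrent readers share the lock.
func Get(name string) (Platform, error) {
	mu.RLock()
	defer mu.RUnlock()
	p, ok := platforms[name]
	if !ok {
		return nil, fmt.Errorf("platform %q not registered (missing blank import?)", name)
	}
	return p, nil
}
```

Because Register runs from init(), a duplicate name crashes the program before main starts. That is the fail-fast behavior described above.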
Schema derivation happens eagerly, when the Reader is created (in Postgres, Connection.Query prepares the statement), so callers know the Arrow schema before any data flows. Query execution is deferred to Read(), which enables filter pushdown: filters arrive as ReadOptions and are folded into the SQL before the query runs.
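Here is a minimal functional-options setup that shows how filters can travel as options. ApplyOptions and cfg.Filters match names used by the postgres reader later in this guide. ReadConfig, WithFilters, WithColumns, and the string-typed Filter are hypothetical stand-ins for whatever dio actually defines.

```go
package main

// Filter is a stand-in; dio.Filter is a tree of typed predicates.
type Filter string

// ReadConfig collects everything the caller asked for at Read() time.
type ReadConfig struct {
	Columns []string // projection (hypothetical field name)
	Filters []Filter
}

// ReadOption mutates a ReadConfig; options are applied in call order.
type ReadOption func(*ReadConfig)

// WithFilters appends predicates that the reader may push into SQL.
func WithFilters(fs ...Filter) ReadOption {
	return func(c *ReadConfig) { c.Filters = append(c.Filters, fs...) }
}

// WithColumns restricts the columns the caller wants back.
func WithColumns(cols ...string) ReadOption {
	return func(c *ReadConfig) { c.Columns = append(c.Columns, cols...) }
}

// ApplyOptions folds the options into a zero-valued config.
func ApplyOptions(opts []ReadOption) ReadConfig {
	var cfg ReadConfig
	for _, o := range opts {
		o(&cfg)
	}
	return cfg
}
```

Options are plain functions, so a reader can accept new settings later without changing the Read signature.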
Create platform/<name>/ with the following files. The postgres package is the template:
| File | Role |
|---|---|
| `<name>.go` | Platform struct, `init()` registration, `OpenConnection` |
| `<name>_connection.go` | Connection struct, `Query`, `QueryRow`, `Close` |
| `config.go` | ConnectionConfig struct, `ConnectionString()` |
| `reader.go` | `QueryReader`, `queryStream`, type mapping (`oidToArrowType` equivalent), `appendValue` |
| `writer.go` | `Writer`, `extractValue`, bulk insert adapter |
| `filter.go` | Filter-to-SQL conversion (optional, but strongly recommended) |
| `<name>_test.go` | Reader integration tests |
| `<name>_test_util.go` | `StartTestDB`, container helpers |
| `config_test.go` | Config unit tests |
| `filter_test.go` | Filter unit and integration tests |
| `writer_test.go` | Writer integration tests |
```go
// platform/platform.go
type Platform interface {
	OpenConnection(ctx context.Context, config ConnectionConfig) (Connection, error)
}
```

Your implementation opens a database connection and returns a Connection. Type-assert the generic ConnectionConfig to your concrete type:
```go
// platform/postgres/postgres.go
type Platform struct{}

var _ platform.Platform = (*Platform)(nil)

func (p *Platform) OpenConnection(ctx context.Context, cfg platform.ConnectionConfig) (platform.Connection, error) {
	pgCfg, ok := cfg.(ConnectionConfig)
	if !ok {
		panic("invalid config type; expected postgres.ConnectionConfig")
	}
	// ... open connection, register custom types, return &Connection{...}
}
```

```go
// platform/connection.go
type ConnectionConfig interface {
	ConnectionString() string
}
```

Build a struct with all connection parameters. ConnectionString() assembles them into the driver's expected format:
```go
// platform/postgres/config.go
type ConnectionConfig struct {
	Name    string
	User    string
	Host    string
	Port    string
	SSLMode string
	// ... other fields
}

func (c ConnectionConfig) ConnectionString() string {
	// Build a postgres:// URL using net/url
}
```

```go
// platform/connection.go
type Connection interface {
	Query(ctx context.Context, query string) (dio.Reader, error)
	QueryRow(ctx context.Context, query string) Row
	Close(ctx context.Context) error
}

type Row interface {
	Scan(dest ...any) error
}
```

Query is the primary method. It prepares the query, derives the Arrow schema from the result metadata, and returns a Reader. The postgres implementation uses pgx.Conn.Prepare() to obtain column metadata without executing the query:
```go
// platform/postgres/postgres_connection.go
func (c *Connection) Query(ctx context.Context, query string) (dio.Reader, error) {
	sd, err := c.conn.Prepare(ctx, "", query)
	if err != nil {
		return nil, err
	}
	schema := schemaFromFieldDescriptions(sd.Fields)
	return &QueryReader{
		conn:      c.conn,
		baseQuery: query,
		schema:    schema,
		batchSize: defaultBatchSize,
	}, nil
}
```

Expose the underlying driver connection for platform-specific operations (e.g., creating tables, bulk insert):
```go
func (c *Connection) Conn() *pgx.Conn {
	return c.conn
}
```

```go
// dio/dio.go
type Reader interface {
	Schema() *arrow.Schema
	Read(ctx context.Context, opts ...ReadOption) (BatchStream, error)
}
```

Key semantics:

- `Schema()` returns the schema derived at construction time (eager).
- `Read()` executes the query and returns a `BatchStream`. It is read-once: a second call returns `dio.NewEmptyStream(schema)`.
- `ReadOption`s carry projection and filter configuration.
```go
// platform/postgres/reader.go
func (q *QueryReader) Read(ctx context.Context, opts ...dio.ReadOption) (dio.BatchStream, error) {
	if q.read {
		return dio.NewEmptyStream(q.schema), nil
	}
	q.read = true

	cfg := dio.ApplyOptions(opts)
	query := q.baseQuery
	var args []any
	if len(cfg.Filters) > 0 {
		whereClause, filterArgs, err := filtersToSQL(q.schema, cfg.Filters, 1)
		if err != nil {
			return nil, fmt.Errorf("build filter SQL: %w", err)
		}
		query = fmt.Sprintf("SELECT * FROM (%s) AS _sub WHERE %s", q.baseQuery, whereClause)
		args = filterArgs
	}

	rows, err := q.conn.Query(ctx, query, args...)
	if err != nil {
		return nil, fmt.Errorf("execute query: %w", err)
	}
	return &queryStream{rows: rows, schema: q.schema, batchSize: q.batchSize}, nil
}
```

```go
// dio/dio.go
type BatchStream interface {
	Schema() *arrow.Schema
	Next(ctx context.Context) (arrow.RecordBatch, error)
	Close() error
}
```

Next() returns the next batch of rows as an Arrow record. Implementations return io.EOF once the stream is exhausted. The caller is responsible for calling Release() on each returned record.
```go
// dio/dio.go
type Writer interface {
	WriteStream(ctx context.Context, s BatchStream) error
	Close() error
}
```

The postgres writer uses pgx's CopyFrom for bulk inserts, adapting the Arrow stream into a pgx.CopyFromSource:
```go
// platform/postgres/writer.go
func (w *Writer) WriteStream(ctx context.Context, s dio.BatchStream) error {
	if w.closed {
		return fmt.Errorf("writer is closed")
	}
	columnNames := w.opts.ColumnNames
	if len(columnNames) == 0 {
		for _, f := range s.Schema().Fields() {
			columnNames = append(columnNames, f.Name)
		}
	}
	src := &arrowCopySource{ctx: ctx, stream: s, schema: s.Schema()}
	_, err := w.conn.Conn().CopyFrom(ctx, pgx.Identifier{w.opts.TableName}, columnNames, src)
	return err
}
```

```go
// dio/dio.go
type ReaderFrom interface {
	ReadFrom(ctx context.Context, r Reader) error
}
```

dio.Copy() checks whether the Writer implements ReaderFrom before falling back to the generic stream path. If your platform can optimize the read-write loop (e.g., with a database-native transfer), implement this interface on your Writer.
Every concrete type should have a compile-time check. This catches interface drift at build time, not runtime:
```go
var _ platform.Platform = (*Platform)(nil)
var _ platform.Connection = (*Connection)(nil)
var _ dio.Reader = (*QueryReader)(nil)
var _ dio.FilterClassifier = (*QueryReader)(nil)
var _ dio.BatchStream = (*queryStream)(nil)
var _ dio.Writer = (*Writer)(nil)
```

Each database has its own type system, so you need a mapping function that converts database column types to Arrow types.
In postgres, oidToArrowType(oid uint32, typmod int32) arrow.DataType maps OIDs to Arrow types using the type modifier for precision/scale:
```go
func oidToArrowType(oid uint32, typmod int32) arrow.DataType {
	switch oid {
	case pgtype.Int2OID:
		return arrow.PrimitiveTypes.Int16
	case pgtype.Int4OID:
		return arrow.PrimitiveTypes.Int32
	case pgtype.Int8OID:
		return arrow.PrimitiveTypes.Int64
	case pgtype.Float4OID:
		return arrow.PrimitiveTypes.Float32
	case pgtype.Float8OID:
		return arrow.PrimitiveTypes.Float64
	case pgtype.TextOID, pgtype.VarcharOID, pgtype.BPCharOID:
		return arrow.BinaryTypes.String
	case pgtype.BoolOID:
		return &arrow.BooleanType{}
	case pgtype.ByteaOID:
		return arrow.BinaryTypes.Binary
	case pgtype.DateOID:
		return arrow.PrimitiveTypes.Date32
	case pgtype.TimestampOID:
		return &arrow.TimestampType{Unit: timestampUnit(typmod)}
	case pgtype.TimestamptzOID:
		return &arrow.TimestampType{Unit: timestampUnit(typmod), TimeZone: "UTC"}
	case pgtype.NumericOID:
		return numericArrowType(typmod)
	// ... more mappings
	default:
		return arrow.BinaryTypes.String
	}
}
```

| Concern | Guidance |
|---|---|
| Precision/scale | If the DB type carries precision info (e.g., NUMERIC(10,2)), use it to pick Decimal128 vs String. Postgres falls back to String for unbounded NUMERIC or precision > 38. |
| Timestamps | Preserve the time unit (second/millisecond/microsecond/nanosecond) from the DB type modifier. |
| Unknown types | Fall back to arrow.BinaryTypes.String. Always have a default case — never panic on an unrecognized type. |
| Interval/UUID/JSON | Types without a clean Arrow equivalent map to String. The text representation is portable. |
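Both the precision/scale and timestamp rows depend on decoding the type modifier. In Postgres, a NUMERIC(p,s) column reports typmod ((p << 16) | s) + 4, where 4 is VARHDRSZ. An unconstrained NUMERIC reports -1. timestamp(n) reports n fractional-second digits (0 to 6), or -1 when unspecified. The helpers below sketch how numericArrowType and timestampUnit might decode those values. They return descriptive strings instead of Arrow types so the example stays dependency-free, and the exact unit buckets and fallback rules are assumptions.

```go
package main

import "fmt"

const varhdrsz = 4 // PostgreSQL's VARHDRSZ, folded into NUMERIC typmods

// decodeNumericTypmod returns precision and scale, or ok=false for an
// unconstrained NUMERIC (typmod -1 or anything below VARHDRSZ).
func decodeNumericTypmod(typmod int32) (precision, scale int32, ok bool) {
	if typmod < varhdrsz {
		return 0, 0, false
	}
	t := typmod - varhdrsz
	precision = (t >> 16) & 0xffff
	scale = ((t & 0x7ff) ^ 1024) - 1024 // 11-bit signed scale (negative allowed since PG 15)
	return precision, scale, true
}

// numericTarget picks Decimal128 when precision fits in 38 digits and the
// scale is in 0..precision; anything else conservatively becomes a string.
func numericTarget(typmod int32) string {
	p, s, ok := decodeNumericTypmod(typmod)
	if !ok || p < 1 || p > 38 || s < 0 || s > p {
		return "string"
	}
	return fmt.Sprintf("decimal128(%d, %d)", p, s)
}

// timestampUnit maps timestamp(n) precision to an Arrow time unit name.
// -1 (no explicit precision) falls through to microseconds, Postgres's
// storage resolution.
func timestampUnit(typmod int32) string {
	switch {
	case typmod == 0:
		return "s"
	case typmod >= 1 && typmod <= 3:
		return "ms"
	default:
		return "us"
	}
}
```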
When scanning rows, convert each Go value returned by the driver into an append on the matching Arrow builder:

```go
func appendValue(b array.Builder, dt arrow.DataType, v any) error {
	if v == nil {
		b.AppendNull()
		return nil
	}
	switch dt.ID() {
	case arrow.INT16:
		b.(*array.Int16Builder).Append(v.(int16))
	case arrow.INT32:
		b.(*array.Int32Builder).Append(v.(int32))
	case arrow.STRING:
		if err := appendStringValue(b.(*array.StringBuilder), v); err != nil {
			return err
		}
	case arrow.DECIMAL128:
		if err := appendDecimal128Value(b.(*array.Decimal128Builder), dt.(*arrow.Decimal128Type), v); err != nil {
			return err
		}
	// ... all mapped types
	default:
		return fmt.Errorf("unsupported arrow type: %s", dt)
	}
	return nil
}
```

Key considerations:

- Null handling: always check for `nil` first and call `AppendNull()`.
- Type assertion safety: the driver decides which Go type you receive, and an unchecked assertion such as `v.(int16)` panics on a mismatch. Be precise about what your driver returns for each column type.
- Edge cases: handle database-specific sentinels (e.g., Postgres `infinity` and `-infinity` dates map to `math.MaxInt32` and `math.MinInt32`).
When writing Arrow data to the database, extract Go values that the database driver accepts.
```go
func extractValue(col arrow.Array, row int) (any, error) {
	if col.IsNull(row) {
		return nil, nil
	}
	switch c := col.(type) {
	case *array.Int16:
		return c.Value(row), nil
	case *array.Int32:
		return c.Value(row), nil
	case *array.String:
		return c.Value(row), nil
	case *array.Date32:
		days := c.Value(row)
		return time.Unix(int64(days)*86400, 0).UTC(), nil
	case *array.Decimal128:
		dt := c.DataType().(*arrow.Decimal128Type)
		val := c.Value(row)
		return pgtype.Numeric{Int: val.BigInt(), Exp: -dt.Scale, Valid: true}, nil
	// ... all supported types
	default:
		return nil, fmt.Errorf("unsupported array type: %T", col)
	}
}
```

The writer should also handle Arrow types that have no direct DB equivalent but can be coerced:
| Arrow Type | Postgres Coercion |
|---|---|
| `Int8` | `int16` (widen) |
| `Uint8` | `int16` (widen) |
| `Uint16` | `int32` (widen) |
| `Uint64` | `pgtype.Numeric` (arbitrary precision) |
| `Float16` | `float32` (widen) |
| `LargeString` | `string` (pass-through) |
| `LargeBinary` | `[]byte` (pass-through) |
| `FixedSizeBinary` | `[]byte` (pass-through) |
| `Date64` | `time.Time` via `ToTime()` |
| `Time32` | `pgtype.Time` (unit conversion to microseconds) |
| `Duration` | `pgtype.Interval` (unit conversion to microseconds) |
| `Decimal256` | `pgtype.Numeric` via `BigInt()` |
| `Null` | `nil` |
| `Dictionary` | Recursive resolve via `GetValueIndex` + `extractValue(Dictionary(), idx)` |
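Most rows in this table are either lossless widening or a unit rescale. The helpers below sketch both kinds of arithmetic. The timeUnit enum and the function names are hypothetical; arrow-go has its own arrow.TimeUnit. Truncating sub-microsecond values toward zero is one possible policy and not necessarily what the postgres writer does.

```go
package main

import "math/big"

// timeUnit mirrors Arrow's four time units (hypothetical local enum).
type timeUnit int

const (
	second timeUnit = iota
	millisecond
	microsecond
	nanosecond
)

// toMicros rescales a Time32/Time64/Duration value to microseconds, the
// resolution Postgres uses for time and interval. Nanoseconds are truncated
// toward zero; there is no overflow check for extreme second values.
func toMicros(v int64, u timeUnit) int64 {
	switch u {
	case second:
		return v * 1_000_000
	case millisecond:
		return v * 1_000
	case nanosecond:
		return v / 1_000
	default: // already microseconds
		return v
	}
}

// uint64ToNumeric carries a Uint64 into an arbitrary-precision integer,
// because values above math.MaxInt64 do not fit in Postgres bigint.
func uint64ToNumeric(v uint64) *big.Int {
	return new(big.Int).SetUint64(v)
}

// Widening is lossless: every uint8 fits in int16 (smallint) and every
// uint16 fits in int32 (integer).
func widenUint8(v uint8) int16   { return int16(v) }
func widenUint16(v uint16) int32 { return int32(v) }
```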
For types that cannot be coerced (e.g., List, Struct, Map), return a clear error. extractValue sees only the array, so its default case names the Arrow type. Wrap that error with the column name and index at the call site, where the schema is available:

```go
default:
	return nil, fmt.Errorf("unsupported array type: %T", col)
```

A few Arrow Go API pitfalls worth knowing up front:

- `array.Null.IsNull(i)` returns `false` because a Null array has no validity bitmap. Handle `*array.Null` explicitly before the general null check.
- `arrow.PrimitiveTypes` includes `Date32` and `Date64` but not `Float16`; use `&arrow.Float16Type{}`.
- There is no `array.NewLargeBinaryBuilder`. Use `array.NewBinaryBuilder(alloc, arrow.BinaryTypes.LargeBinary)`, then call `.NewLargeBinaryArray()`.
- `array.NewLargeStringBuilder(alloc)` does exist.
- Dictionary arrays: use `c.GetValueIndex(row)` to get the physical index, then extract recursively from `c.Dictionary()`.
If your database supports SQL WHERE clauses (most do), implementing filter pushdown lets the database do the filtering instead of the client.
Your reader should implement this optional interface:
```go
// dio/dio.go
type FilterClassifier interface {
	ClassifyFilters(filters []Filter) []FilterSupport
}
```

Return a FilterSupport for each filter:
| Value | Meaning |
|---|---|
| `FilterExact` | The DB guarantees that every returned row satisfies this filter. The caller can skip post-filtering. |
| `FilterInexact` | The DB uses the filter to skip data but may still return non-matching rows (e.g., Parquet row-group pruning), so the caller must post-filter. |
| `FilterUnsupported` | The DB cannot handle this filter. |
For SQL databases, all standard comparison filters are typically FilterExact:
```go
func (q *QueryReader) ClassifyFilters(filters []dio.Filter) []dio.FilterSupport {
	result := make([]dio.FilterSupport, len(filters))
	for i, f := range filters {
		result[i] = classifyFilter(f)
	}
	return result
}

func classifyFilter(f dio.Filter) dio.FilterSupport {
	switch v := f.(type) {
	case dio.Eq, dio.Neq, dio.Gt, dio.GtEq, dio.Lt, dio.LtEq,
		dio.IsNull, dio.IsNotNull, dio.In:
		return dio.FilterExact
	case dio.And:
		// A conjunction is pushed down only if every child can be.
		for _, child := range v.Filters {
			if classifyFilter(child) == dio.FilterUnsupported {
				return dio.FilterUnsupported
			}
		}
		return dio.FilterExact
	case dio.Or:
		// Same recursive check: one unsupported branch blocks pushdown.
		for _, child := range v.Filters {
			if classifyFilter(child) == dio.FilterUnsupported {
				return dio.FilterUnsupported
			}
		}
		return dio.FilterExact
	case dio.Not:
		return classifyFilter(v.Child)
	default:
		return dio.FilterUnsupported
	}
}
```

Convert the dio.Filter tree into parameterized SQL. Use positional parameters ($1, $2, ...) or your driver's equivalent.
```go
func filterToSQL(schema *arrow.Schema, f dio.Filter, argOffset int) (string, []any, int, error) {
	switch v := f.(type) {
	case dio.Eq:
		return comparisonSQL(schema, v.Column, "=", v.Value, argOffset)
	case dio.In:
		return inSQL(schema, v.Column, v.Values, argOffset)
	case dio.IsNull:
		col, err := columnName(schema, v.Column)
		if err != nil {
			return "", nil, 0, err
		}
		return col + " IS NULL", nil, argOffset, nil
	case dio.And:
		return logicalSQL(schema, v.Filters, "AND", argOffset)
	// ... etc
	default:
		// Reject unknown filters explicitly; this also gives the function
		// the terminating return it needs to compile.
		return "", nil, 0, fmt.Errorf("unsupported filter type: %T", f)
	}
}
```

The filter clause wraps the base query as a subquery:

```go
query = fmt.Sprintf("SELECT * FROM (%s) AS _sub WHERE %s", q.baseQuery, whereClause)
```

Always use parameterized queries. Never interpolate filter values into the SQL string.
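Here is a self-contained sketch of the recursive conversion plus the subquery wrapping, to show how the placeholder numbering works. The filter structs are simplified stand-ins for the dio types. quoteIdent is a hypothetical helper that double-quotes identifiers the way Postgres expects, and toSQL, joinSQL, and wrap are illustrative names. filterToSQL above resolves columns through columnName(schema, ...); this sketch skips that schema validation.

```go
package main

import (
	"fmt"
	"strings"
)

// Stand-in filter types (hypothetical, modeled on the dio filters above).
type Filter any

type Eq struct {
	Column string
	Value  any
}
type In struct {
	Column string
	Values []any
}
type IsNull struct{ Column string }
type And struct{ Filters []Filter }
type Or struct{ Filters []Filter }
type Not struct{ Child Filter }

// quoteIdent wraps an identifier in double quotes, doubling embedded quotes.
func quoteIdent(s string) string {
	return `"` + strings.ReplaceAll(s, `"`, `""`) + `"`
}

// toSQL renders f with placeholders starting at $next and returns the SQL,
// the bound values, and the next free placeholder number.
func toSQL(f Filter, next int) (string, []any, int, error) {
	switch v := f.(type) {
	case Eq:
		return fmt.Sprintf("%s = $%d", quoteIdent(v.Column), next), []any{v.Value}, next + 1, nil
	case In:
		if len(v.Values) == 0 {
			return "FALSE", nil, next, nil // an empty IN list matches nothing
		}
		ph := make([]string, len(v.Values))
		for i := range v.Values {
			ph[i] = fmt.Sprintf("$%d", next+i)
		}
		return fmt.Sprintf("%s IN (%s)", quoteIdent(v.Column), strings.Join(ph, ", ")), v.Values, next + len(v.Values), nil
	case IsNull:
		return quoteIdent(v.Column) + " IS NULL", nil, next, nil
	case And:
		return joinSQL(v.Filters, " AND ", "TRUE", next)
	case Or:
		return joinSQL(v.Filters, " OR ", "FALSE", next)
	case Not:
		s, args, n, err := toSQL(v.Child, next)
		if err != nil {
			return "", nil, 0, err
		}
		return "NOT (" + s + ")", args, n, nil
	default:
		return "", nil, 0, fmt.Errorf("unsupported filter type: %T", f)
	}
}

// joinSQL renders children left to right so placeholder numbers stay
// sequential; an empty list becomes the operator's identity (TRUE/FALSE).
func joinSQL(fs []Filter, op, empty string, next int) (string, []any, int, error) {
	if len(fs) == 0 {
		return empty, nil, next, nil
	}
	parts := make([]string, 0, len(fs))
	var args []any
	for _, f := range fs {
		s, a, n, err := toSQL(f, next)
		if err != nil {
			return "", nil, 0, err
		}
		parts = append(parts, s)
		args = append(args, a...)
		next = n
	}
	return "(" + strings.Join(parts, op) + ")", args, next, nil
}

// wrap applies the WHERE clause to the base query as a subquery.
func wrap(base string, f Filter) (string, []any, error) {
	where, args, _, err := toSQL(f, 1)
	if err != nil {
		return "", nil, err
	}
	return fmt.Sprintf("SELECT * FROM (%s) AS _sub WHERE %s", base, where), args, nil
}
```

Threading the next free placeholder number through every recursive call keeps $1, $2, ... aligned with the args slice, however deeply And/Or/Not nest. Only values become arguments. Column names are emitted as quoted identifiers, never as bound parameters.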
Every platform implementation MUST have all of the following test categories. No exceptions.
Set up testcontainers for your database. Follow the StartTestDB pattern:
```go
// platform/<name>/<name>_test_util.go
type <Name>Container struct {
	instance testcontainers.Container
}

func New<Name>Container(t *testing.T) *<Name>Container {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	req := testcontainers.ContainerRequest{
		Image:        "postgres:14.5-alpine", // Use your DB's image
		ExposedPorts: []string{"5432/tcp"},   // Your DB's port
		AutoRemove:   true,
		Env: map[string]string{
			"POSTGRES_USER":     "postgres",
			"POSTGRES_PASSWORD": "postgres",
			"POSTGRES_DB":       "postgres",
		},
		WaitingFor: wait.ForListeningPort("5432/tcp"),
	}
	container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
		ContainerRequest: req,
		Started:          true,
	})
	if err != nil {
		t.Fatal(err)
	}
	return &<Name>Container{instance: container}
}

func (db *<Name>Container) ConnectionConfig(t *testing.T) ConnectionConfig {
	// Return a ConnectionConfig pointing at the container
	// (e.g., host and port from db.instance.Host(ctx) and db.instance.MappedPort(ctx, "5432/tcp"))
}

func (db *<Name>Container) Close(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()
	err := db.instance.Terminate(ctx)
	if err != nil {
		t.Fatal(err)
	}
}

func StartTestDB(t *testing.T) ConnectionConfig {
	t.Helper()
	container := New<Name>Container(t)
	t.Cleanup(func() {
		container.Close(t)
	})
	return container.ConnectionConfig(t)
}
```

Usage in tests:
```go
func TestMyPlatform_SomeFeature(t *testing.T) {
	config := myplatform.StartTestDB(t)
	ctx := context.Background()

	p := &myplatform.Platform{}
	conn, err := p.OpenConnection(ctx, config)
	require.NoError(t, err)
	defer conn.Close(ctx)

	// ... test logic
}
```

`TestConfig_ConnectionString`: verify that ConnectionConfig.ConnectionString() produces correct connection strings for various input combinations (defaults, overrides, optional fields):
```go
func TestConfig_ConnectionString(t *testing.T) {
	t.Run("defaults", func(t *testing.T) { /* ... */ })
	t.Run("all_fields", func(t *testing.T) { /* ... */ })
	t.Run("password_escaping", func(t *testing.T) { /* ... */ })
}
```

`TestFilterToSQL`: a white-box test of filter-to-SQL generation, with one subtest per filter type:
```go
func TestFilterToSQL(t *testing.T) {
	t.Run("Eq", func(t *testing.T) { /* ... */ })
	t.Run("In", func(t *testing.T) { /* ... */ })
	t.Run("And_nested", func(t *testing.T) { /* ... */ })
	// ... etc
}
```

All of the remaining tests run against a real database via testcontainers.
`Test<Platform>_DataTypeConversion`: one subtest per supported database type. Each subtest:

- Creates a table with one column of the target type (or selects a typed literal, as the postgres test below does)
- Inserts a known value via SQL
- Reads it back via `QueryReader`
- Asserts that the Arrow schema field type matches expectations
- Asserts that the value read matches the value inserted
```go
func TestPostgres_DataTypeConversion(t *testing.T) {
	config := postgres.StartTestDB(t)
	// ... setup connection

	tcases := []struct {
		name   string
		query  string
		fields []arrow.Field
		check  func(t *testing.T, rec arrow.Record)
	}{
		{name: "int2", query: "SELECT 1::int2 AS v", /* ... */},
		{name: "int4", query: "SELECT 42::int4 AS v", /* ... */},
		// ... one per type
	}

	for _, tc := range tcases {
		t.Run(tc.name, func(t *testing.T) {
			reader, err := conn.Query(ctx, tc.query)
			// ... assert schema, read stream, check values
		})
	}
}
```

`Test<Platform>_MultipleColumns`: verify that multi-column queries produce the correct schema and values.
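`Test<Platform>_MultipleRows`: verify batching behavior across many rows, including results that span more than one batch.

`Test<Platform>_EmptyResult`: verify that a query returning zero rows produces a stream whose Next() immediately returns io.EOF.

`Test<Platform>_NullHandling`: verify NULL values in every supported type. The Arrow builder must produce a null at the correct position.

`Test<Platform>_ReadOnceSemantics`: verify that calling Read() a second time returns an empty stream (via dio.NewEmptyStream).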
`Test<Platform>_Writer_RoundTrip`:

- Create a table
- Generate test data using `dio/gen`
- Write it via the platform's `Writer`
- Read it back via `QueryReader`
- Assert that the row count matches
```go
func TestPostgres_Writer_RoundTrip(t *testing.T) {
	// ... setup
	r := gen.NewReader(gen.TableSpec{
		Columns: []gen.ColumnSpec{
			{Name: "id", Type: arrow.PrimitiveTypes.Int64, Distribution: &gen.Sequential{Start: 1, Step: 1}},
			{Name: "name", Type: arrow.BinaryTypes.String, Distribution: &gen.StringPool{Cardinality: 10, MinLen: 3, MaxLen: 8}},
		},
		RowCount:  100,
		BatchSize: 25,
	})
	// ... write, read back, assert
}
```

`Test<Platform>_Writer_EmptyStream`: writing dio.NewEmptyStream(schema) must succeed without error.
`Test<Platform>_Writer_CloseIdempotent`: call Close() twice. The second call must neither panic nor return an error.
`Test<Platform>_Writer_AllTypesRoundTrip`: write and read back every supported Arrow type with known values. Construct the Arrow arrays manually with builders for precise control over the test data.

`Test<Platform>_Writer_CoercibleTypes`: test Arrow types that don't map directly to a DB type but can be coerced (e.g., Int8 -> int16, Float16 -> float32). Use one subtest per coercible type:

```go
func TestPostgres_Writer_CoercibleTypes(t *testing.T) {
	// ... setup
	t.Run("int8_to_smallint", func(t *testing.T) { /* ... */ })
	t.Run("uint8_to_smallint", func(t *testing.T) { /* ... */ })
	t.Run("float16_to_real", func(t *testing.T) { /* ... */ })
	// ...
}
```

`Test<Platform>_Writer_UnsupportedTypes`: verify that complex Arrow types (List, Struct, Map) produce clear error messages that include column context.
`TestQueryReader_FilterPushdown`: test each filter type (Eq, Neq, Gt, GtEq, Lt, LtEq, IsNull, IsNotNull, In, And, Or, Not) against real data. Insert known rows, apply a filter, and verify the result set.

`TestQueryReader_FilterClassifier`: verify that ClassifyFilters returns FilterExact for supported filters and FilterUnsupported for unknown ones.

`TestQueryReader_FilterPreservesReadOnce`: a filtered Read() still obeys read-once semantics, so a second call returns an empty stream.

`TestQueryReader_FilterVerifiesValues`: verify exact values in filtered results, not just row counts.
| Test Function | Category | Requires Container |
|---|---|---|
| `TestConfig_ConnectionString` | Unit | No |
| `TestFilterToSQL` | Unit | No |
| `TestFiltersToSQL` | Unit | No |
| `Test<Platform>_DataTypeConversion` | Integration: Reader | Yes |
| `Test<Platform>_MultipleColumns` | Integration: Reader | Yes |
| `Test<Platform>_MultipleRows` | Integration: Reader | Yes |
| `Test<Platform>_EmptyResult` | Integration: Reader | Yes |
| `Test<Platform>_NullHandling` | Integration: Reader | Yes |
| `Test<Platform>_ReadOnceSemantics` | Integration: Reader | Yes |
| `Test<Platform>_Writer_RoundTrip` | Integration: Writer | Yes |
| `Test<Platform>_Writer_EmptyStream` | Integration: Writer | Yes |
| `Test<Platform>_Writer_CloseIdempotent` | Integration: Writer | Yes |
| `Test<Platform>_Writer_AllTypesRoundTrip` | Integration: Writer | Yes |
| `Test<Platform>_Writer_CoercibleTypes` | Integration: Writer | Yes |
| `Test<Platform>_Writer_UnsupportedTypes` | Integration: Writer | Yes |
| `TestQueryReader_FilterPushdown` | Integration: Filter | Yes |
| `TestQueryReader_FilterClassifier` | Integration: Filter | Yes |
| `TestQueryReader_FilterPreservesReadOnce` | Integration: Filter | Yes |
| `TestQueryReader_FilterVerifiesValues` | Integration: Filter | Yes |
Copy this into your PR description:
```markdown
### Platform Implementation Checklist

**Package structure:**
- [ ] `platform/<name>/<name>.go` — Platform struct + `init()` registration
- [ ] `platform/<name>/<name>_connection.go` — Connection struct
- [ ] `platform/<name>/config.go` — ConnectionConfig
- [ ] `platform/<name>/reader.go` — QueryReader + queryStream + type mapping
- [ ] `platform/<name>/writer.go` — Writer + extractValue
- [ ] `platform/<name>/filter.go` — Filter pushdown (or documented reason for omission)

**Compile-time interface assertions:**
- [ ] `var _ platform.Platform = (*Platform)(nil)`
- [ ] `var _ platform.Connection = (*Connection)(nil)`
- [ ] `var _ dio.Reader = (*QueryReader)(nil)`
- [ ] `var _ dio.BatchStream = (*queryStream)(nil)`
- [ ] `var _ dio.Writer = (*Writer)(nil)`
- [ ] `var _ dio.FilterClassifier = (*QueryReader)(nil)` (if filter pushdown implemented)

**Schema derivation:**
- [ ] Type mapping function covers all common DB types
- [ ] Unknown types fall back to `arrow.BinaryTypes.String`
- [ ] Precision/scale handling for decimal types

**Value conversion:**
- [ ] `appendValue` covers all mapped Arrow types
- [ ] `extractValue` covers all mapped Arrow types + coercible types
- [ ] Null handling in both directions
- [ ] Unsupported types produce clear errors with column context

**Unit tests:**
- [ ] `TestConfig_ConnectionString`
- [ ] `TestFilterToSQL` (if applicable)

**Integration tests — Reader:**
- [ ] `Test<Platform>_DataTypeConversion` (one subtest per type)
- [ ] `Test<Platform>_MultipleColumns`
- [ ] `Test<Platform>_MultipleRows`
- [ ] `Test<Platform>_EmptyResult`
- [ ] `Test<Platform>_NullHandling`
- [ ] `Test<Platform>_ReadOnceSemantics`

**Integration tests — Writer:**
- [ ] `Test<Platform>_Writer_RoundTrip`
- [ ] `Test<Platform>_Writer_EmptyStream`
- [ ] `Test<Platform>_Writer_CloseIdempotent`
- [ ] `Test<Platform>_Writer_AllTypesRoundTrip`
- [ ] `Test<Platform>_Writer_CoercibleTypes`
- [ ] `Test<Platform>_Writer_UnsupportedTypes`

**Integration tests — Filter (if applicable):**
- [ ] `TestQueryReader_FilterPushdown`
- [ ] `TestQueryReader_FilterClassifier`
- [ ] `TestQueryReader_FilterPreservesReadOnce`
- [ ] `TestQueryReader_FilterVerifiesValues`

**Infrastructure:**
- [ ] `StartTestDB(t)` using testcontainers
- [ ] Container cleanup via `t.Cleanup()`
- [ ] `singleRecordStream` helper (or equivalent) for writer tests
```