Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
db45f35
feat: add initial StreamingPull producer with gRPC support
rockneurotiko Mar 26, 2026
e7e72ac
feat: add stream reader and improve stream manager reliability
rockneurotiko Mar 27, 2026
d30d9d7
feat: add unary RPC acknowledgment pipeline
rockneurotiko Mar 30, 2026
7d3b28f
feat: implement stream draining and graceful shutdown
rockneurotiko Mar 31, 2026
81ea05e
feat: add synchronous acknowledgment and error classification
rockneurotiko Apr 1, 2026
dc35ffc
refactor: extract gRPC client abstraction and regenerate protobuf
rockneurotiko Apr 7, 2026
f3ae9e8
feat: add telemetry instrumentation with metadata
rockneurotiko Apr 8, 2026
a502960
feat: add gRPC interceptors and improve options management
rockneurotiko Apr 9, 2026
fab62fa
feat: improve draining with N:N producer-manager topology
rockneurotiko Apr 10, 2026
a625315
docs: add documentation for streaming producer modules
rockneurotiko Apr 16, 2026
4956551
Add changelog notes
rockneurotiko Apr 22, 2026
e58e174
refactor: improve streaming architecture with retry tracking and test…
rockneurotiko May 1, 2026
87b9787
build: optimize protobuf generation to subscriber service only
rockneurotiko May 1, 2026
9725fca
refine: Stop tracking integration and stress tests
rockneurotiko May 22, 2026
1700497
Merge branch 'main' into pub_sub_streaming
rockneurotiko May 26, 2026
e506ed8
Merge branch 'main' into pub_sub_streaming
rockneurotiko May 27, 2026
2d24206
Fix tests with nimble_options format
rockneurotiko May 27, 2026
c63dec8
Pull producer to pull directory and modules
rockneurotiko May 27, 2026
4c8fcec
Streaming no default producer
rockneurotiko May 27, 2026
4a44ccd
grpc_client accepts {Module, opts}
rockneurotiko May 27, 2026
00a4261
on_failure defaults to {:nack, 0}
rockneurotiko May 27, 2026
c9e8167
README and docs
rockneurotiko May 28, 2026
6498fdb
Documentation improvements
rockneurotiko May 29, 2026
9ca1b36
Mix format
rockneurotiko May 29, 2026
85c8e2f
Add more info to the README
rockneurotiko Jun 2, 2026
f7a6c9e
Upload grpc to 1.0
rockneurotiko Jun 16, 2026
7f92e3d
Merge branch 'main' into pub_sub_streaming
rockneurotiko Jun 16, 2026
12621f4
Remove lock and fix tests
rockneurotiko Jun 16, 2026
3accfcf
Protobuf 0.17 like grpc_core
rockneurotiko Jun 16, 2026
095662d
Fix unary channel's childs being killed and treating it as a channel …
rockneurotiko Jun 17, 2026
60e6752
Update deps
rockneurotiko Jun 17, 2026
95b9278
Unused castore
rockneurotiko Jun 17, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 73 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,80 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

## [2.0.0-rc.0] - unreleased

2.0 introduces a new default producer based on the gRPC StreamingPull API.
The previous HTTP pull producer is still fully supported under
`BroadwayCloudPubSub.Pull.Producer`. See the
[2.0 upgrade guide](docs/upgrade_to_2.0.md) for step-by-step migration
instructions.

### Added

- **`BroadwayCloudPubSub.Producer`**: a new Broadway producer that uses the
gRPC StreamingPull API for low-latency, push-based message delivery. This is
the recommended producer going forward.

- Persistent bidirectional gRPC stream; messages are pushed by the server
rather than fetched on demand
- Server-side flow control via `:max_outstanding_messages` and
`:max_outstanding_bytes`
- Automatic lease extension with adaptive p99 ack deadlines, preventing
premature redelivery without manual `ackDeadlineSeconds` tuning
- Batched ack/nack via a separate unary gRPC connection, independent of the
streaming connection
- Exactly-once delivery support, auto-detected from subscription properties
at runtime
- Message ordering via `:enable_message_ordering`
- Graceful shutdown with configurable drain timeout (`:drain_timeout_ms`)
- Comprehensive telemetry: stream lifecycle, ack batching, and gRPC spans
- Gun and Mint HTTP/2 adapters via the `:adapter` option
- Pub/Sub emulator support via `:grpc_endpoint` and `:use_ssl`

- **`BroadwayCloudPubSub.Streaming.Client`**: behaviour for custom gRPC client
implementations, analogous to `BroadwayCloudPubSub.Pull.Client` on the pull
side.

- **`BroadwayCloudPubSub.Streaming.GrpcClient`**: default gRPC client
implementation using the `grpc` library.

### Changed
- Bump minimum elixir to 1.15
- Modernize github actions

- Modernize GitHub Actions CI.

### Breaking changes

- Bump minimum Elixir to 1.15.

- **`BroadwayCloudPubSub.Producer` is now the gRPC streaming producer.** The
1.x HTTP pull producer has moved to `BroadwayCloudPubSub.Pull.Producer`.
Update the module name in your pipeline to keep the pull behaviour:

```elixir
# 1.x
{BroadwayCloudPubSub.Producer, goth: MyApp.Goth, subscription: "..."}

# 2.0, keep pull
{BroadwayCloudPubSub.Pull.Producer, goth: MyApp.Goth, subscription: "..."}

# 2.0, switch to streaming (recommended)
{BroadwayCloudPubSub.Producer, goth: MyApp.Goth, subscription: "...",
max_outstanding_messages: 1000}
```

- **`BroadwayCloudPubSub.PullClient` renamed to
`BroadwayCloudPubSub.Pull.FinchClient`.** Only affects you if you referenced
it directly

- **`BroadwayCloudPubSub.Client` behaviour renamed to
`BroadwayCloudPubSub.Pull.Client`.** Only affects you if you implemented a
custom HTTP pull client to override the `:client` option..

- **`on_failure` default changed from `:noop` to `{:nack, 0}`** in both
producers. Failed messages are now immediately made available for redelivery,
matching the behaviour of the official Google Cloud Pub/Sub client libraries.
Set `on_failure: :noop` explicitly to preserve the 1.x behaviour.


## [1.0.0] - 2026-05-26

Expand Down
154 changes: 142 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,43 +6,173 @@ A Google Cloud Pub/Sub connector for [Broadway](https://github.com/dashbitco/bro

Documentation can be found at [https://hexdocs.pm/broadway_cloud_pub_sub](https://hexdocs.pm/broadway_cloud_pub_sub).

This project provides:

* `BroadwayCloudPubSub.Producer` - A GenStage producer that continuously receives messages from a Pub/Sub subscription acknowledges them after being successfully processed.
* `BroadwayCloudPubSub.Client` - A generic behaviour to implement Pub/Sub clients.
* `BroadwayCloudPubSub.PullClient` - Default REST client used by `BroadwayCloudPubSub.Producer`.
## What's in the box

* `BroadwayCloudPubSub.Producer`: Broadway producer using the gRPC
[StreamingPull][gcp-streamingpull] API. Messages are pushed by the server over
a persistent bidirectional stream, giving low latency and high throughput with
automatic lease extension and server-side flow control. **This is the
recommended producer**, in line with Google's own [guidance][gcp-streamingpull]
that StreamingPull is what their first-party client libraries use "where
possible".
* `BroadwayCloudPubSub.Pull.Producer`: Broadway producer using the unary HTTP
[Pull][gcp-pull-api] API. Retained for environments where gRPC is unavailable
or undesired, and for the cases Google lists as Pull-only: when you need
strict control over the number of messages pulled per request, tight control
over client memory and CPU, or when your subscriber acts as a proxy to
another pull-oriented system.
* `BroadwayCloudPubSub.Streaming.Client`: Behaviour for custom gRPC client implementations.
* `BroadwayCloudPubSub.Pull.Client`: Behaviour for custom HTTP pull client implementations.

[gcp-streamingpull]: https://cloud.google.com/pubsub/docs/pull#streamingpull_api
[gcp-pull-api]: https://cloud.google.com/pubsub/docs/pull#pull_api

## Installation

Add `:broadway_cloud_pub_sub` to the list of dependencies in `mix.exs`:
Add `:broadway_cloud_pub_sub` to your dependencies, along with an HTTP/2 adapter
for `:grpc`:

```elixir
def deps do
[
{:broadway_cloud_pub_sub, "~> 1.0"},
{:goth, "~> 1.3"}
{:broadway_cloud_pub_sub, "~> 2.0"},
{:goth, "~> 1.3"},
{:grpc, "~> 1.0"},
{:protobuf, "~> 0.12"},
# Pick one HTTP/2 adapter:
{:gun, "~> 2.0"},
# or
# {:mint, "~> 1.5"},
# {:castore, "~> 1.0"}
]
end
```

> Note the [goth](https://hexdocs.pm/goth) package, which handles Google Authentication, is required for the default token generator.
> The [goth](https://hexdocs.pm/goth) package handles Google Authentication and
> is required for the default token generator.
>
> The `grpc` and `protobuf` packages are required by
> `BroadwayCloudPubSub.Producer`. You must pick one HTTP/2 adapter for the gRPC
> connection and add it to your `mix.exs`: either `:gun`, or `:mint` together
> with `:castore`.
>
> If you only use `BroadwayCloudPubSub.Pull.Producer` you may omit `:grpc`,
> `:protobuf`, and the adapter packages.

## Usage

Configure Broadway with one or more producers using `BroadwayCloudPubSub.Producer`:

```elixir
Broadway.start_link(MyBroadway,
name: MyBroadway,
producer: [
module: {BroadwayCloudPubSub.Producer,
goth: MyGoth,
subscription: "projects/my-project/subscriptions/my-subscription",
max_outstanding_messages: 1000
}
],
processors: [default: [concurrency: 10]]
)
```

See `BroadwayCloudPubSub.Producer` for the full option reference, including flow
control, reconnection backoff, graceful shutdown, and telemetry.

### HTTP/2 adapter

The producer supports two adapters. Both are optional dependencies of `:grpc`,
so you select one by adding it to your application's `mix.exs` (see
[Installation](#installation)).

- `:gun` (default): [Gun](https://github.com/ninenines/gun) HTTP/2 client.
Add `{:gun, "~> 2.0"}` to your deps.
- `:mint`: [Mint](https://github.com/elixir-mint/mint) HTTP/2 client.
Add `{:mint, "~> 1.5"}` and `{:castore, "~> 1.0"}` to your deps.

Then select the adapter in your producer config:

```elixir
{BroadwayCloudPubSub.Producer,
goth: MyGoth,
subscription: "projects/my-project/subscriptions/my-subscription",
adapter: :mint}
```

### Using the HTTP pull producer

If gRPC is not available in your environment or you prefer to use the HTTP pull method, use `BroadwayCloudPubSub.Pull.Producer`:

```elixir
Broadway.start_link(MyBroadway,
name: MyBroadway,
producer: [
module: {BroadwayCloudPubSub.Pull.Producer,
goth: MyGoth,
subscription: "projects/my-project/subscriptions/my-subscription"
}
]
],
processors: [default: [concurrency: 10]]
)
```

### Upgrading from 1.x

> **2.0 is a major release with breaking changes.** The three-line summary is
> below; the [full upgrade guide](docs/upgrade_to_2.0.md) has step-by-step
> instructions, option mapping tables, and rationale for every change.

#### Breaking change 1: [`BroadwayCloudPubSub.Producer` is now the gRPC streaming producer](docs/upgrade_to_2.0.md#1-new-default-producer)

The biggest change: the module name `BroadwayCloudPubSub.Producer` now refers to
the **new gRPC StreamingPull producer**. The 1.x HTTP pull producer lives on
under `BroadwayCloudPubSub.Pull.Producer`.

```elixir
# 1.x — HTTP pull producer
{BroadwayCloudPubSub.Producer, goth: MyApp.Goth, subscription: "..."}

# 2.0 option A — switch to streaming (recommended, lower latency)
{BroadwayCloudPubSub.Producer,
goth: MyApp.Goth,
subscription: "...",
max_outstanding_messages: 1000}

# 2.0 option B — keep HTTP pull, one-line change
{BroadwayCloudPubSub.Pull.Producer, goth: MyApp.Goth, subscription: "..."}
```

The streaming producer requires `:grpc`, `:protobuf`, and an HTTP/2 adapter
(`:gun` or `:mint` + `:castore`). If you stay on the pull producer those
packages are not needed.

#### Breaking change 2: [two modules renamed](docs/upgrade_to_2.0.md#2-broadwaycloudpubsubpullclient-renamed) (only if referenced directly)

| 1.x | 2.0 |
|---|---|
| `BroadwayCloudPubSub.PullClient` | `BroadwayCloudPubSub.Pull.FinchClient` |
| `BroadwayCloudPubSub.Client` (behaviour) | `BroadwayCloudPubSub.Pull.Client` |

These only matter if you passed the module explicitly (e.g. `client:
BroadwayCloudPubSub.PullClient`) or implemented a custom pull client with
`@behaviour BroadwayCloudPubSub.Client`.

#### Breaking change 3: [`on_failure` default changed from `:noop` to `{:nack, 0}`](docs/upgrade_to_2.0.md#4-on_failure-default-changed-noop--nack-0)

Failed messages are now immediately made available for redelivery instead of
waiting for the subscription's `ackDeadlineSeconds` to expire. This matches the
behaviour of Google's own first-party client libraries.

To keep the 1.x behaviour, set the option explicitly:

```elixir
{BroadwayCloudPubSub.Pull.Producer,
goth: MyApp.Goth,
subscription: "...",
on_failure: :noop}
```

See the [full upgrade guide](docs/upgrade_to_2.0.md) for all details.

## License

Copyright 2019 Michael Crumm \
Expand Down
22 changes: 22 additions & 0 deletions buf.gen.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
version: v2
clean: true
managed:
enabled: true
inputs:
- zip_archive: https://github.com/googleapis/googleapis/archive/refs/heads/master.zip
strip_components: 1
paths:
- google/pubsub/v1
plugins:
- local: protoc-gen-elixir
out: lib/broadway_cloud_pub_sub/proto
opt:
- paths=source_relative
- include_docs=true
- plugins=grpc
# To add a new RPC in the future, append its fully-qualified method path
# here and re-run: buf generate
types:
- google.pubsub.v1.Subscriber.StreamingPull
- google.pubsub.v1.Subscriber.Acknowledge
- google.pubsub.v1.Subscriber.ModifyAckDeadline
1 change: 1 addition & 0 deletions buf.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
version: v2
Loading
Loading