Skip to content

Commit 23a2cdb

Browse files
committed
Fix pg cron
1 parent b2b013b commit 23a2cdb

2 files changed

Lines changed: 82 additions & 1 deletion

File tree

internal/controller/dataflowcron_controller.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,19 +39,22 @@ const (
3939
//+kubebuilder:rbac:groups=dataflow.dataflow.io,resources=dataflowcrons/finalizers,verbs=update
4040
//+kubebuilder:rbac:groups=batch,resources=cronjobs;jobs,verbs=get;list;watch;create;update;patch;delete
4141
//+kubebuilder:rbac:groups="",resources=configmaps,verbs=get;list;watch;create;update;patch;delete
42+
//+kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch
4243
//+kubebuilder:rbac:groups="",resources=events,verbs=create;patch
4344

4445
type DataFlowCronReconciler struct {
4546
client.Client
4647
Scheme *runtime.Scheme
4748
processorImage string
49+
secretResolver *SecretResolver
4850
}
4951

5052
func NewDataFlowCronReconciler(client client.Client, scheme *runtime.Scheme) *DataFlowCronReconciler {
5153
return &DataFlowCronReconciler{
5254
Client: client,
5355
Scheme: scheme,
5456
processorImage: runtimeimage.ProcessorImage(),
57+
secretResolver: NewSecretResolver(client),
5558
}
5659
}
5760

@@ -82,7 +85,11 @@ func (r *DataFlowCronReconciler) Reconcile(ctx context.Context, req ctrl.Request
8285
}
8386

8487
func (r *DataFlowCronReconciler) reconcileSpecConfigMap(ctx context.Context, dfc *dataflowv1.DataFlowCron) error {
85-
specJSON, err := json.Marshal(dfc.Spec.DataFlowSpec)
88+
resolvedSpec, err := r.secretResolver.ResolveDataFlowSpec(ctx, dfc.Namespace, &dfc.Spec.DataFlowSpec)
89+
if err != nil {
90+
return fmt.Errorf("resolve secrets: %w", err)
91+
}
92+
specJSON, err := json.Marshal(resolvedSpec)
8693
if err != nil {
8794
return fmt.Errorf("marshal spec: %w", err)
8895
}

internal/controller/dataflowcron_controller_test.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package controller
22

33
import (
44
"context"
5+
"encoding/json"
56
"testing"
67

78
batchv1 "k8s.io/api/batch/v1"
@@ -15,6 +16,8 @@ import (
1516

1617
dataflowv1 "github.com/dataflow-operator/dataflow/api/v1"
1718
"github.com/dataflow-operator/dataflow/pkg/k8snames"
19+
"github.com/stretchr/testify/assert"
20+
"github.com/stretchr/testify/require"
1821
)
1922

2023
func TestDataFlowCronReconcile_CreatesConfigMapAndCronJob(t *testing.T) {
@@ -152,3 +155,74 @@ func TestDataFlowCronReconcile_CreatesFirstTriggerJobAfterProcessor(t *testing.T
152155
t.Fatalf("expected first trigger job to be created after processor")
153156
}
154157
}
158+
159+
func TestDataFlowCronReconcile_ResolvesSecretsInConfigMap(t *testing.T) {
160+
scheme := runtime.NewScheme()
161+
require.NoError(t, dataflowv1.AddToScheme(scheme))
162+
require.NoError(t, clientgoscheme.AddToScheme(scheme))
163+
164+
const wantDSN = "postgres://user:pass@host:5432/db"
165+
secret := &corev1.Secret{
166+
ObjectMeta: metav1.ObjectMeta{Name: "db-env-vars", Namespace: "default"},
167+
Data: map[string][]byte{
168+
"pg-source": []byte(wantDSN),
169+
"pg-sink": []byte(wantDSN),
170+
},
171+
}
172+
173+
dfc := &dataflowv1.DataFlowCron{
174+
ObjectMeta: metav1.ObjectMeta{Name: "cron-secrets", Namespace: "default"},
175+
Spec: dataflowv1.DataFlowCronSpec{
176+
Schedule: "0 0 * * *",
177+
DataFlowSpec: dataflowv1.DataFlowSpec{
178+
Source: dataflowv1.SourceSpec{
179+
Type: "postgresql",
180+
Config: mustConfig(dataflowv1.PostgreSQLSourceSpec{
181+
Table: "price.price",
182+
ConnectionStringSecretRef: &dataflowv1.SecretRef{
183+
Name: "db-env-vars",
184+
Key: "pg-source",
185+
},
186+
}),
187+
},
188+
Sink: dataflowv1.SinkSpec{
189+
Type: "postgresql",
190+
Config: mustConfig(dataflowv1.PostgreSQLSinkSpec{
191+
Table: "public.price_target",
192+
ConnectionStringSecretRef: &dataflowv1.SecretRef{
193+
Name: "db-env-vars",
194+
Key: "pg-sink",
195+
},
196+
}),
197+
},
198+
},
199+
},
200+
}
201+
202+
c := fake.NewClientBuilder().
203+
WithScheme(scheme).
204+
WithStatusSubresource(&dataflowv1.DataFlowCron{}).
205+
WithObjects(secret, dfc).
206+
Build()
207+
r := NewDataFlowCronReconciler(c, scheme)
208+
_, err := r.Reconcile(context.Background(), ctrl.Request{
209+
NamespacedName: types.NamespacedName{Name: "cron-secrets", Namespace: "default"},
210+
})
211+
require.NoError(t, err)
212+
213+
var cm corev1.ConfigMap
214+
require.NoError(t, c.Get(context.Background(), types.NamespacedName{
215+
Name: k8snames.CronSpecConfigMap("cron-secrets"), Namespace: "default",
216+
}, &cm))
217+
218+
var spec dataflowv1.DataFlowSpec
219+
require.NoError(t, json.Unmarshal([]byte(cm.Data["spec.json"]), &spec))
220+
221+
sourceCfg, err := spec.Source.GetPostgreSQLConfig()
222+
require.NoError(t, err)
223+
assert.Equal(t, wantDSN, sourceCfg.ConnectionString)
224+
225+
sinkCfg, err := spec.Sink.GetPostgreSQLConfig()
226+
require.NoError(t, err)
227+
assert.Equal(t, wantDSN, sinkCfg.ConnectionString)
228+
}

0 commit comments

Comments
 (0)