Skip to content

Commit 40d19e0

Browse files
committed
fix(circleci): skip HTTP 500 on workflow/job collectors and filter by time range
* Skip 500 (corrupt CircleCI Server records) alongside 404 in AfterResponse hook so a single bad pipeline does not abort the entire subtask * Apply SyncPolicy.TimeAfter to workflow and job DB iterators on full sync to avoid calling the API for every historical tool-layer row Signed-off-by: Joshua Smith <jbsmith7741@gmail.com>
1 parent f2de2dc commit 40d19e0

4 files changed

Lines changed: 83 additions & 12 deletions

File tree

backend/plugins/circleci/tasks/job_collector.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,10 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error {
7373
dal.Where("connection_id = ? and project_slug = ?", data.Options.ConnectionId, data.Options.ProjectSlug),
7474
}
7575

76-
if isIncremental {
77-
clauses = append(clauses, dal.Where("created_date > ?", createdAfter))
76+
// Incremental: workflows newer than last successful collectJobs.
77+
// Full sync: workflows within SyncPolicy.TimeAfter window.
78+
if createdAfter != nil {
79+
clauses = append(clauses, dal.Where("created_date >= ?", createdAfter))
7880
}
7981

8082
db := taskCtx.GetDal()
@@ -88,7 +90,7 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error {
8890
UrlTemplate: "/v2/workflow/{{ .Input.Id }}/job",
8991
Query: BuildQueryParamsWithPageToken,
9092
ResponseParser: ParseCircleciPageTokenResp,
91-
AfterResponse: ignoreDeletedBuilds, // Ignore the 404 response if a workflow has been deleted
93+
AfterResponse: ignoreDeletedOrBrokenBuilds,
9294
},
9395
GetCreated: func(item json.RawMessage) (time.Time, errors.Error) {
9496
var job struct { // Individual job response lacks created_at field, so have to use started_at
@@ -105,7 +107,7 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error {
105107
UrlTemplate: "/v2/workflow/{{ .Input.Id }}/job", // The individual job endpoint has different fields so need to recollect all jobs for a workflow
106108
Query: BuildQueryParamsWithPageToken,
107109
ResponseParser: ParseCircleciPageTokenResp,
108-
AfterResponse: ignoreDeletedBuilds,
110+
AfterResponse: ignoreDeletedOrBrokenBuilds,
109111
},
110112
BuildInputIterator: func() (api.Iterator, errors.Error) {
111113
db := taskCtx.GetDal()

backend/plugins/circleci/tasks/shared.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -123,10 +123,15 @@ func ParseCircleciPageTokenResp(res *http.Response) ([]json.RawMessage, errors.E
123123
return data.Items, err
124124
}
125125

126-
func ignoreDeletedBuilds(res *http.Response) errors.Error {
127-
// CircleCI API will return a 404 response for a workflow/job that has been deleted
128-
// due to their data retention policy. We should ignore these errors.
129-
if res.StatusCode == http.StatusNotFound {
126+
// ignoreDeletedOrBrokenBuilds skips per-item API failures that should not
127+
// abort an entire collector subtask. 404 means the resource was deleted
128+
// (retention); 500 means the CircleCI Server record is corrupt/stuck
129+
// (e.g. pipeline exists but its /workflow endpoint errors).
130+
func ignoreDeletedOrBrokenBuilds(res *http.Response) errors.Error {
131+
switch res.StatusCode {
132+
case http.StatusNotFound:
133+
return api.ErrIgnoreAndContinue
134+
case http.StatusInternalServerError:
130135
return api.ErrIgnoreAndContinue
131136
}
132137
return nil
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
Licensed to the Apache Software Foundation (ASF) under one or more
3+
contributor license agreements. See the NOTICE file distributed with
4+
this work for additional information regarding copyright ownership.
5+
The ASF licenses this file to You under the Apache License, Version 2.0
6+
(the "License"); you may not use this file except in compliance with
7+
the License. You may obtain a copy of the License at
8+
9+
http://www.apache.org/licenses/LICENSE-2.0
10+
11+
Unless required by applicable law or agreed to in writing, software
12+
distributed under the License is distributed on an "AS IS" BASIS,
13+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
See the License for the specific language governing permissions and
15+
limitations under the License.
16+
*/
17+
18+
package tasks
19+
20+
import (
21+
"bytes"
22+
"io"
23+
"net/http"
24+
"testing"
25+
26+
"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
27+
"github.com/stretchr/testify/assert"
28+
)
29+
30+
func makeResponse(statusCode int) *http.Response {
31+
return &http.Response{
32+
StatusCode: statusCode,
33+
Body: io.NopCloser(bytes.NewBufferString("")),
34+
Request: &http.Request{},
35+
}
36+
}
37+
38+
func TestIgnoreDeletedOrBrokenBuilds(t *testing.T) {
39+
tests := []struct {
40+
name string
41+
statusCode int
42+
want error
43+
}{
44+
{"404 returns ErrIgnoreAndContinue", http.StatusNotFound, api.ErrIgnoreAndContinue},
45+
{"500 returns ErrIgnoreAndContinue", http.StatusInternalServerError, api.ErrIgnoreAndContinue},
46+
{"200 returns nil", http.StatusOK, nil},
47+
{"403 returns nil", http.StatusForbidden, nil},
48+
{"502 returns nil", http.StatusBadGateway, nil},
49+
}
50+
for _, tt := range tests {
51+
t.Run(tt.name, func(t *testing.T) {
52+
res := makeResponse(tt.statusCode)
53+
got := ignoreDeletedOrBrokenBuilds(res)
54+
if tt.want == nil {
55+
assert.Nil(t, got)
56+
} else {
57+
assert.Equal(t, tt.want, got)
58+
}
59+
})
60+
}
61+
}
62+

backend/plugins/circleci/tasks/workflow_collector.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,10 @@ func CollectWorkflows(taskCtx plugin.SubTaskContext) errors.Error {
6060
dal.Where("connection_id = ? AND project_slug = ?", data.Options.ConnectionId, data.Options.ProjectSlug),
6161
}
6262

63-
if isIncremental {
64-
clauses = append(clauses, dal.Where("created_date > ?", createdAfter))
63+
// Incremental: pipelines newer than last successful collectWorkflows.
64+
// Full sync: pipelines within SyncPolicy.TimeAfter window.
65+
if createdAfter != nil {
66+
clauses = append(clauses, dal.Where("created_date >= ?", createdAfter))
6567
}
6668

6769
db := taskCtx.GetDal()
@@ -75,7 +77,7 @@ func CollectWorkflows(taskCtx plugin.SubTaskContext) errors.Error {
7577
UrlTemplate: "/v2/pipeline/{{ .Input.Id }}/workflow",
7678
Query: BuildQueryParamsWithPageToken,
7779
ResponseParser: ParseCircleciPageTokenResp,
78-
AfterResponse: ignoreDeletedBuilds, // Ignore the 404 response if a workflow has been deleted
80+
AfterResponse: ignoreDeletedOrBrokenBuilds,
7981
},
8082
GetCreated: extractCreatedAt,
8183
},
@@ -88,7 +90,7 @@ func CollectWorkflows(taskCtx plugin.SubTaskContext) errors.Error {
8890
err := api.UnmarshalResponse(res, &data)
8991
return []json.RawMessage{data}, err
9092
},
91-
AfterResponse: ignoreDeletedBuilds,
93+
AfterResponse: ignoreDeletedOrBrokenBuilds,
9294
},
9395
BuildInputIterator: func() (api.Iterator, errors.Error) {
9496
clauses := []dal.Clause{

0 commit comments

Comments
 (0)