Skip to content

Commit 3cc67c8

Browse files
committed
add ExtractQDevS3DataMeta
1 parent 4647d3d commit 3cc67c8

2 files changed

Lines changed: 242 additions & 0 deletions

File tree

backend/plugins/q_dev/impl/impl.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ func (p QDev) ScopeConfig() dal.Tabler {
8080
func (p QDev) SubTaskMetas() []plugin.SubTaskMeta {
8181
return []plugin.SubTaskMeta{
8282
tasks.CollectQDevS3FilesMeta,
83+
tasks.ExtractQDevS3DataMeta,
8384
}
8485
}
8586

Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
/*
2+
Licensed to the Apache Software Foundation (ASF) under one or more
3+
contributor license agreements. See the NOTICE file distributed with
4+
this work for additional information regarding copyright ownership.
5+
The ASF licenses this file to You under the Apache License, Version 2.0
6+
(the "License"); you may not use this file except in compliance with
7+
the License. You may obtain a copy of the License at
8+
9+
http://www.apache.org/licenses/LICENSE-2.0
10+
11+
Unless required by applicable law or agreed to in writing, software
12+
distributed under the License is distributed on an "AS IS" BASIS,
13+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
See the License for the specific language governing permissions and
15+
limitations under the License.
16+
*/
17+
18+
package tasks
19+
20+
import (
21+
"encoding/csv"
22+
"fmt"
23+
"github.com/apache/incubator-devlake/core/dal"
24+
"github.com/apache/incubator-devlake/core/errors"
25+
"github.com/apache/incubator-devlake/core/plugin"
26+
"github.com/apache/incubator-devlake/plugins/q_dev/models"
27+
"github.com/aws/aws-sdk-go/aws"
28+
"github.com/aws/aws-sdk-go/service/s3"
29+
"io"
30+
"strconv"
31+
"strings"
32+
"time"
33+
)
34+
35+
var _ plugin.SubTaskEntryPoint = ExtractQDevS3Data
36+
37+
// ExtractQDevS3Data 从S3下载CSV数据并解析
38+
func ExtractQDevS3Data(taskCtx plugin.SubTaskContext) errors.Error {
39+
data := taskCtx.GetData().(*QDevTaskData)
40+
db := taskCtx.GetDal()
41+
42+
// 查询未处理的文件元数据
43+
cursor, err := db.Cursor(
44+
dal.From(&models.QDevS3FileMeta{}),
45+
dal.Where("connection_id = ? AND processed = ?", data.Options.ConnectionId, false),
46+
)
47+
if err != nil {
48+
return errors.Default.Wrap(err, "failed to get file metadata cursor")
49+
}
50+
defer cursor.Close()
51+
52+
taskCtx.SetProgress(0, -1)
53+
54+
// 处理每个文件
55+
for cursor.Next() {
56+
fileMeta := &models.QDevS3FileMeta{}
57+
err = db.Fetch(cursor, fileMeta)
58+
if err != nil {
59+
return errors.Default.Wrap(err, "failed to fetch file metadata")
60+
}
61+
62+
// 获取文件内容
63+
getInput := &s3.GetObjectInput{
64+
Bucket: aws.String(data.S3Client.Bucket),
65+
Key: aws.String(fileMeta.S3Path),
66+
}
67+
68+
getResult, err := data.S3Client.S3.GetObject(getInput)
69+
if err != nil {
70+
return errors.Convert(err)
71+
}
72+
73+
// 处理CSV文件
74+
err = processCSVData(taskCtx, db, getResult.Body, fileMeta)
75+
if err != nil {
76+
return errors.Default.Wrap(err, fmt.Sprintf("failed to process CSV file %s", fileMeta.FileName))
77+
}
78+
79+
// 更新文件处理状态
80+
fileMeta.Processed = true
81+
now := time.Now()
82+
fileMeta.ProcessedTime = &now
83+
err = db.Update(fileMeta)
84+
if err != nil {
85+
return errors.Default.Wrap(err, "failed to update file metadata")
86+
}
87+
88+
taskCtx.IncProgress(1)
89+
}
90+
91+
return nil
92+
}
93+
94+
// 处理CSV文件
95+
func processCSVData(taskCtx plugin.SubTaskContext, db dal.Dal, reader io.ReadCloser, fileMeta *models.QDevS3FileMeta) errors.Error {
96+
defer reader.Close()
97+
98+
csvReader := csv.NewReader(reader)
99+
// 使用默认的逗号分隔符,不需要设置 Comma
100+
csvReader.LazyQuotes = true // 允许非标准引号处理
101+
csvReader.FieldsPerRecord = -1 // 允许每行字段数不同
102+
103+
// 读取标头
104+
headers, err := csvReader.Read()
105+
fmt.Printf("headers: %+v\n", headers)
106+
if err != nil {
107+
return errors.Convert(err)
108+
}
109+
110+
// 逐行读取数据
111+
for {
112+
record, err := csvReader.Read()
113+
if err == io.EOF {
114+
break
115+
}
116+
if err != nil {
117+
return errors.Convert(err)
118+
}
119+
120+
// 创建用户数据对象
121+
userData, err := createUserData(headers, record, fileMeta)
122+
if err != nil {
123+
return errors.Default.Wrap(err, "failed to create user data")
124+
}
125+
126+
// 保存到数据库
127+
err = db.Create(userData)
128+
if err != nil {
129+
return errors.Default.Wrap(err, "failed to save user data")
130+
}
131+
}
132+
133+
return nil
134+
}
135+
136+
// 从CSV记录创建用户数据对象
137+
func createUserData(headers []string, record []string, fileMeta *models.QDevS3FileMeta) (*models.QDevUserData, errors.Error) {
138+
userData := &models.QDevUserData{
139+
ConnectionId: fileMeta.ConnectionId,
140+
}
141+
142+
// 创建字段映射
143+
fieldMap := make(map[string]string)
144+
for i, header := range headers {
145+
if i < len(record) {
146+
// 打印每个header和对应的值,帮助调试
147+
fmt.Printf("Mapping header[%d]: '%s' -> '%s'\n", i, header, record[i])
148+
fieldMap[header] = record[i]
149+
// 同时添加去除空格的版本
150+
trimmedHeader := strings.TrimSpace(header)
151+
if trimmedHeader != header {
152+
fmt.Printf("Also adding trimmed header: '%s'\n", trimmedHeader)
153+
fieldMap[trimmedHeader] = record[i]
154+
}
155+
}
156+
}
157+
158+
// 设置必要字段
159+
var err error
160+
var ok bool
161+
162+
// 设置UserId
163+
userData.UserId, ok = fieldMap["UserId"]
164+
if !ok {
165+
return nil, errors.Default.New("UserId not found in CSV record")
166+
}
167+
168+
// 设置Date
169+
dateStr, ok := fieldMap["Date"]
170+
if !ok {
171+
return nil, errors.Default.New("Date not found in CSV record")
172+
}
173+
174+
userData.Date, err = parseDate(dateStr)
175+
if err != nil {
176+
return nil, errors.Default.Wrap(err, "failed to parse date")
177+
}
178+
179+
// 设置指标字段
180+
userData.CodeReview_FindingsCount = parseInt(fieldMap, "CodeReview_FindingsCount")
181+
userData.CodeReview_SucceededEventCount = parseInt(fieldMap, "CodeReview_SucceededEventCount")
182+
userData.InlineChat_AcceptanceEventCount = parseInt(fieldMap, "InlineChat_AcceptanceEventCount")
183+
userData.InlineChat_AcceptedLineAdditions = parseInt(fieldMap, "InlineChat_AcceptedLineAdditions")
184+
userData.InlineChat_AcceptedLineDeletions = parseInt(fieldMap, "InlineChat_AcceptedLineDeletions")
185+
userData.InlineChat_DismissalEventCount = parseInt(fieldMap, "InlineChat_DismissalEventCount")
186+
userData.InlineChat_DismissedLineAdditions = parseInt(fieldMap, "InlineChat_DismissedLineAdditions")
187+
userData.InlineChat_DismissedLineDeletions = parseInt(fieldMap, "InlineChat_DismissedLineDeletions")
188+
userData.InlineChat_RejectedLineAdditions = parseInt(fieldMap, "InlineChat_RejectedLineAdditions")
189+
userData.InlineChat_RejectedLineDeletions = parseInt(fieldMap, "InlineChat_RejectedLineDeletions")
190+
userData.InlineChat_RejectionEventCount = parseInt(fieldMap, "InlineChat_RejectionEventCount")
191+
userData.InlineChat_TotalEventCount = parseInt(fieldMap, "InlineChat_TotalEventCount")
192+
userData.Inline_AICodeLines = parseInt(fieldMap, "Inline_AICodeLines")
193+
userData.Inline_AcceptanceCount = parseInt(fieldMap, "Inline_AcceptanceCount")
194+
userData.Inline_SuggestionsCount = parseInt(fieldMap, "Inline_SuggestionsCount")
195+
196+
return userData, nil
197+
}
198+
199+
// 解析日期
200+
func parseDate(dateStr string) (time.Time, errors.Error) {
201+
// 尝试常见的日期格式
202+
formats := []string{
203+
"2006-01-02",
204+
"2006/01/02",
205+
"01/02/2006",
206+
"01-02-2006",
207+
time.RFC3339,
208+
}
209+
210+
for _, format := range formats {
211+
date, err := time.Parse(format, dateStr)
212+
if err == nil {
213+
return date, nil
214+
}
215+
}
216+
217+
return time.Time{}, errors.Default.New(fmt.Sprintf("failed to parse date: %s", dateStr))
218+
}
219+
220+
// 解析整数
221+
func parseInt(fieldMap map[string]string, field string) int {
222+
value, ok := fieldMap[field]
223+
if !ok {
224+
return 0
225+
}
226+
227+
intValue, err := strconv.Atoi(value)
228+
if err != nil {
229+
return 0
230+
}
231+
232+
return intValue
233+
}
234+
235+
var ExtractQDevS3DataMeta = plugin.SubTaskMeta{
236+
Name: "extractQDevS3Data",
237+
EntryPoint: ExtractQDevS3Data,
238+
EnabledByDefault: true,
239+
Description: "Extract data from S3 CSV files and save to database",
240+
DomainTypes: []string{plugin.DOMAIN_TYPE_CROSS},
241+
}

0 commit comments

Comments
 (0)