diff --git a/TODO.md b/TODO.md new file mode 100644 index 0000000..e6a2077 --- /dev/null +++ b/TODO.md @@ -0,0 +1,20 @@ +## TODO1. Alibaba Cloud NoSQL(MongoDB) + +- Alibaba Cloud NoSQL (MongoDB) 지원 추가 + +### 적용 기능 범위 + +- Generate +- Backup +- Restore +- Migration + +### 적용 범위 + +- Web UI +- Backend API + +### 참고사항 + +- Alibaba Cloud MongoDB 는 NCloud 에서 제공하는 MongoDB 와 동일 한 형식으로 제공 + diff --git a/config/provider.go b/config/provider.go index 406175f..933c958 100644 --- a/config/provider.go +++ b/config/provider.go @@ -102,6 +102,25 @@ func NewNCPMongoDBClient(username, password, host string, port int) (*mongo.Clie return mongo.Connect(context.Background(), newNCPMongoDBConfig(username, password, host, port)) } +func newAlibabaMongoDBConfig(username, password, host string, port int) *options.ClientOptions { + dc := true + return &options.ClientOptions{ + Auth: &options.Credential{ + Username: username, + Password: password, + }, + Direct: &dc, + Hosts: []string{fmt.Sprintf("%s:%d", host, port)}, + } +} + +func NewAlibabaMongoDBClient(username, password, host string, port int) (*mongo.Client, error) { + if err := validateInputs(&username, &password, &host, &port); err != nil { + return nil, err + } + return mongo.Connect(context.Background(), newAlibabaMongoDBConfig(username, password, host, port)) +} + func NewS3Client(accesskey, secretkey, region string) (*s3.Client, error) { cfg, err := newAWSConfig(accesskey, secretkey, region) if err != nil { @@ -160,15 +179,13 @@ func NewGCPClient(credentialsJson string) (*storage.Client, error) { return client, nil } -func NewAlibabaClient(endpoint, region, accessKey, secretKey string) (*oss.Client, error) { - if endpoint == "" { - return nil, errors.New("endpoint is required") - } +func NewAlibabaClient(region, accessKey, secretKey string) (*oss.Client, error) { + if accessKey == "" || secretKey == "" { return nil, errors.New("accessKey and secretKey are required") } cfg := oss.LoadDefaultConfig(). - WithEndpoint(endpoint). + WithEndpoint("https://oss-" + region + ".aliyuncs.com"). WithCredentialsProvider(osscred.NewStaticCredentialsProvider(accessKey, secretKey)). WithRegion(region). WithRetryMaxAttempts(5) diff --git a/data/var/run/data-manager/task/task.json b/data/var/run/data-manager/task/task.json index f890e2c..fb968c1 100644 --- a/data/var/run/data-manager/task/task.json +++ b/data/var/run/data-manager/task/task.json @@ -5421,17 +5421,19 @@ "projectId": "" }, "sourceFilter": { - "prefix": "datamold-dummy2244858493/sql/", + "path": "", + "pathExcludeYn": "", "contains": null, + "containExcludeYn": "", "suffixes": null, "exact": [ "LibraryManagement_999.sql" ], - "regex": "", "minSize": null, "maxSize": null, "modifiedAfter": null, - "modifiedBefore": null + "modifiedBefore": null, + "sizeFilteringUnit": "" } }, { @@ -5493,17 +5495,19 @@ "projectId": "" }, "sourceFilter": { - "prefix": "datamold-dummy2244858493/sql/", + "path": "", + "pathExcludeYn": "", "contains": null, + "containExcludeYn": "", "suffixes": null, "exact": [ "LibraryManagement_999.sql" ], - "regex": "", "minSize": null, "maxSize": null, "modifiedAfter": null, - "modifiedBefore": null + "modifiedBefore": null, + "sizeFilteringUnit": "" } }, { @@ -5565,17 +5569,19 @@ "projectId": "" }, "sourceFilter": { - "prefix": "datamold-dummy2244858493/sql/", + "path": "", + "pathExcludeYn": "", "contains": null, + "containExcludeYn": "", "suffixes": null, "exact": [ "LibraryManagement_999.sql" ], - "regex": "", "minSize": null, "maxSize": null, "modifiedAfter": null, - "modifiedBefore": null + "modifiedBefore": null, + "sizeFilteringUnit": "" } }, { @@ -5637,17 +5643,19 @@ "projectId": "" }, "sourceFilter": { - "prefix": "datamold-dummy2244858493/sql/", + "path": "", + "pathExcludeYn": "", "contains": null, + "containExcludeYn": "", "suffixes": null, "exact": [ "LibraryManagement_999.sql" ], - "regex": "", "minSize": null, "maxSize": null, "modifiedAfter": null, - "modifiedBefore": null + "modifiedBefore": null, + "sizeFilteringUnit": "" } }, { @@ -5709,17 +5717,19 @@ "projectId": "" }, "sourceFilter": { - "prefix": "datamold-dummy2244858493/sql/", + "path": "", + "pathExcludeYn": "", "contains": null, + "containExcludeYn": "", "suffixes": null, "exact": [ "LibraryManagement_999.sql" ], - "regex": "", "minSize": null, "maxSize": null, "modifiedAfter": null, - "modifiedBefore": null + "modifiedBefore": null, + "sizeFilteringUnit": "" } }, { @@ -5781,17 +5791,19 @@ "projectId": "" }, "sourceFilter": { - "prefix": "datamold-dummy2244858493/sql/", + "path": "", + "pathExcludeYn": "", "contains": null, + "containExcludeYn": "", "suffixes": null, "exact": [ "LibraryManagement_999.sql" ], - "regex": "", "minSize": null, "maxSize": null, "modifiedAfter": null, - "modifiedBefore": null + "modifiedBefore": null, + "sizeFilteringUnit": "" } }, { @@ -5853,19 +5865,21 @@ "projectId": "" }, "sourceFilter": { - "prefix": "datamold-dummy2244858493/sql/", + "path": "", + "pathExcludeYn": "", "contains": null, + "containExcludeYn": "", "suffixes": null, "exact": [ "LibraryManagement_991.sql", "LibraryManagement_993.sql", "LibraryManagement_994.sql" ], - "regex": "", "minSize": null, "maxSize": null, "modifiedAfter": null, - "modifiedBefore": null + "modifiedBefore": null, + "sizeFilteringUnit": "" } }, { @@ -5927,19 +5941,21 @@ "projectId": "" }, "sourceFilter": { - "prefix": "datamold-dummy2244858493/sql/", + "path": "", + "pathExcludeYn": "", "contains": null, + "containExcludeYn": "", "suffixes": null, "exact": [ "LibraryManagement_991.sql", "LibraryManagement_993.sql", "LibraryManagement_994.sql" ], - "regex": "", "minSize": null, "maxSize": null, "modifiedAfter": null, - "modifiedBefore": null + "modifiedBefore": null, + "sizeFilteringUnit": "" } }, { @@ -6001,15 +6017,17 @@ "projectId": "" }, "sourceFilter": { - "prefix": "datamold-dummy2244858493/sql/", + "path": "", + "pathExcludeYn": "", "contains": null, + "containExcludeYn": "", "suffixes": null, "exact": null, - "regex": "", "minSize": null, "maxSize": null, "modifiedAfter": null, - "modifiedBefore": null + "modifiedBefore": null, + "sizeFilteringUnit": "" } }, { @@ -6248,15 +6266,17 @@ "projectId": "" }, "sourceFilter": { - "prefix": "datamold-dummy2244858493/test1234/", + "path": "", + "pathExcludeYn": "", "contains": null, + "containExcludeYn": "", "suffixes": null, "exact": null, - "regex": "", "minSize": null, "maxSize": null, "modifiedAfter": null, - "modifiedBefore": null + "modifiedBefore": null, + "sizeFilteringUnit": "" } }, { @@ -6318,15 +6338,17 @@ "projectId": "" }, "sourceFilter": { - "prefix": "test1234/", + "path": "", + "pathExcludeYn": "", "contains": null, + "containExcludeYn": "", "suffixes": null, "exact": null, - "regex": "", "minSize": null, "maxSize": null, "modifiedAfter": null, - "modifiedBefore": null + "modifiedBefore": null, + "sizeFilteringUnit": "" } }, { @@ -6388,15 +6410,17 @@ "projectId": "" }, "sourceFilter": { - "prefix": "datamold-dummy2244858493/sql", + "path": "", + "pathExcludeYn": "", "contains": null, + "containExcludeYn": "", "suffixes": null, "exact": null, - "regex": "", "minSize": null, "maxSize": null, "modifiedAfter": null, - "modifiedBefore": null + "modifiedBefore": null, + "sizeFilteringUnit": "" } }, { @@ -6458,15 +6482,17 @@ "projectId": "" }, "sourceFilter": { - "prefix": "datamold-dummy2244858493/sql", + "path": "", + "pathExcludeYn": "", "contains": null, + "containExcludeYn": "", "suffixes": null, "exact": null, - "regex": "", "minSize": null, "maxSize": null, "modifiedAfter": null, - "modifiedBefore": null + "modifiedBefore": null, + "sizeFilteringUnit": "" } }, { @@ -6528,19 +6554,21 @@ "projectId": "" }, "sourceFilter": { - "prefix": "datamold-dummy2244858493/sql", + "path": "", + "pathExcludeYn": "", "contains": null, + "containExcludeYn": "", "suffixes": null, "exact": [ "LibraryManagement_991.sql", "LibraryManagement_993.sql", "LibraryManagement_994.sql" ], - "regex": "", "minSize": null, "maxSize": null, "modifiedAfter": null, - "modifiedBefore": null + "modifiedBefore": null, + "sizeFilteringUnit": "" } }, { @@ -6602,15 +6630,17 @@ "projectId": "" }, "sourceFilter": { - "prefix": "datamold-dummy2244858493/sql", + "path": "", + "pathExcludeYn": "", "contains": null, + "containExcludeYn": "", "suffixes": null, "exact": null, - "regex": "", "minSize": null, "maxSize": null, "modifiedAfter": null, - "modifiedBefore": null + "modifiedBefore": null, + "sizeFilteringUnit": "" } }, { @@ -6672,17 +6702,19 @@ "projectId": "" }, "sourceFilter": { - "prefix": "datamold-dummy2244858493/sql", + "path": "", + "pathExcludeYn": "", "contains": null, + "containExcludeYn": "", "suffixes": null, "exact": [ "LibraryManagement_991.sql" ], - "regex": "", "minSize": null, "maxSize": null, "modifiedAfter": null, - "modifiedBefore": null + "modifiedBefore": null, + "sizeFilteringUnit": "" } }, { @@ -6744,15 +6776,17 @@ "projectId": "" }, "sourceFilter": { - "prefix": "datamold-dummy2244858493/sql", + "path": "", + "pathExcludeYn": "", "contains": null, + "containExcludeYn": "", "suffixes": null, "exact": null, - "regex": "", "minSize": null, "maxSize": null, "modifiedAfter": null, - "modifiedBefore": null + "modifiedBefore": null, + "sizeFilteringUnit": "" } }, { @@ -6814,17 +6848,794 @@ "projectId": "" }, "sourceFilter": { - "prefix": "", + "path": "", + "pathExcludeYn": "", "contains": null, + "containExcludeYn": "", "suffixes": [ "png" ], "exact": null, - "regex": "", "minSize": null, "maxSize": null, "modifiedAfter": null, - "modifiedBefore": null + "modifiedBefore": null, + "sizeFilteringUnit": "" + } + }, + { + "meta": { + "serviceType": "nrdbms", + "taskType": "generate", + "taskId": "-task-0-20260417-154555" + }, + "dummy": { + "dummyPath": "/var/folders/8d/dykp7pg5693fhf5_72hr16540000gn/T/datamold-dummy681617525", + "checkSQL": false, + "checkCSV": false, + "checkTXT": false, + "checkPNG": false, + "checkGIF": false, + "checkZIP": false, + "checkJSON": false, + "checkXML": false, + "checkServerJSON": true, + "checkServerSQL": false, + "sizeSQL": "", + "sizeCSV": "", + "sizeTXT": "", + "sizePNG": "", + "sizeGIF": "", + "sizeZIP": "", + "sizeJSON": "", + "sizeXML": "", + "sizeServerJSON": "1", + "sizeServerSQL": "" + }, + "sourcePoint": { + "provider": "", + "region": "", + "credentialId": 0, + "path": "", + "bucket": "", + "endpoint": "", + "host": "", + "port": "", + "username": "", + "password": "", + "databaseName": "", + "databaseId": "", + "projectId": "" + }, + "targetPoint": { + "provider": "alibaba", + "region": "ap-northeast-1", + "credentialId": 1, + "path": "", + "bucket": "", + "endpoint": "https://oss-ap-northeast-1.aliyuncs.com", + "host": "dds-mj76dbe602390fa42.mongodb.ap-northeast-2.rds.aliyuncs.com", + "port": "3717", + "username": "root", + "password": "ㅂwe123!@#", + "databaseName": "admin", + "databaseId": "", + "projectId": "" + } + }, + { + "meta": { + "serviceType": "nrdbms", + "taskType": "generate", + "taskId": "-task-0-20260417-154747" + }, + "dummy": { + "dummyPath": "/var/folders/8d/dykp7pg5693fhf5_72hr16540000gn/T/datamold-dummy1196488898", + "checkSQL": false, + "checkCSV": false, + "checkTXT": false, + "checkPNG": false, + "checkGIF": false, + "checkZIP": false, + "checkJSON": false, + "checkXML": false, + "checkServerJSON": true, + "checkServerSQL": false, + "sizeSQL": "", + "sizeCSV": "", + "sizeTXT": "", + "sizePNG": "", + "sizeGIF": "", + "sizeZIP": "", + "sizeJSON": "", + "sizeXML": "", + "sizeServerJSON": "1", + "sizeServerSQL": "" + }, + "sourcePoint": { + "provider": "", + "region": "", + "credentialId": 0, + "path": "", + "bucket": "", + "endpoint": "", + "host": "", + "port": "", + "username": "", + "password": "", + "databaseName": "", + "databaseId": "", + "projectId": "" + }, + "targetPoint": { + "provider": "alibaba", + "region": "ap-northeast-1", + "credentialId": 1, + "path": "", + "bucket": "", + "endpoint": "https://oss-ap-northeast-1.aliyuncs.com", + "host": "dds-mj76dbe602390fa41311-pub.mongodb.ap-northeast-2.rds.aliyuncs.com", + "port": "3717", + "username": "root", + "password": "ㅂwe123!@#", + "databaseName": "database", + "databaseId": "", + "projectId": "" + } + }, + { + "meta": { + "serviceType": "nrdbms", + "taskType": "generate", + "taskId": "-task-0-20260417-155248" + }, + "dummy": { + "dummyPath": "/var/folders/8d/dykp7pg5693fhf5_72hr16540000gn/T/datamold-dummy3618700754", + "checkSQL": false, + "checkCSV": false, + "checkTXT": false, + "checkPNG": false, + "checkGIF": false, + "checkZIP": false, + "checkJSON": false, + "checkXML": false, + "checkServerJSON": true, + "checkServerSQL": false, + "sizeSQL": "", + "sizeCSV": "", + "sizeTXT": "", + "sizePNG": "", + "sizeGIF": "", + "sizeZIP": "", + "sizeJSON": "", + "sizeXML": "", + "sizeServerJSON": "1", + "sizeServerSQL": "" + }, + "sourcePoint": { + "provider": "", + "region": "", + "credentialId": 0, + "path": "", + "bucket": "", + "endpoint": "", + "host": "", + "port": "", + "username": "", + "password": "", + "databaseName": "", + "databaseId": "", + "projectId": "" + }, + "targetPoint": { + "provider": "alibaba", + "region": "ap-northeast-1", + "credentialId": 1, + "path": "", + "bucket": "", + "endpoint": "https://oss-ap-northeast-1.aliyuncs.com", + "host": "dds-mj76dbe602390fa41311-pub.mongodb.ap-northeast-2.rds.aliyuncs.com", + "port": "3717", + "username": "root", + "password": "qwe123!@#", + "databaseName": "database", + "databaseId": "", + "projectId": "" + } + }, + { + "meta": { + "serviceType": "nrdbms", + "taskType": "generate", + "taskId": "-task-0-20260417-161821" + }, + "dummy": { + "dummyPath": "/var/folders/8d/dykp7pg5693fhf5_72hr16540000gn/T/datamold-dummy2021042541", + "checkSQL": false, + "checkCSV": false, + "checkTXT": false, + "checkPNG": false, + "checkGIF": false, + "checkZIP": false, + "checkJSON": false, + "checkXML": false, + "checkServerJSON": true, + "checkServerSQL": false, + "sizeSQL": "", + "sizeCSV": "", + "sizeTXT": "", + "sizePNG": "", + "sizeGIF": "", + "sizeZIP": "", + "sizeJSON": "", + "sizeXML": "", + "sizeServerJSON": "1", + "sizeServerSQL": "" + }, + "sourcePoint": { + "provider": "", + "region": "", + "credentialId": 0, + "path": "", + "bucket": "", + "endpoint": "", + "host": "", + "port": "", + "username": "", + "password": "", + "databaseName": "", + "databaseId": "", + "projectId": "" + }, + "targetPoint": { + "provider": "aws", + "region": "ap-northeast-1", + "credentialId": 2, + "path": "", + "bucket": "", + "endpoint": "", + "host": "", + "port": "", + "username": "", + "password": "", + "databaseName": "", + "databaseId": "", + "projectId": "" + } + }, + { + "meta": { + "serviceType": "nrdbms", + "taskType": "generate", + "taskId": "-task-0-20260417-162010" + }, + "dummy": { + "dummyPath": "/var/folders/8d/dykp7pg5693fhf5_72hr16540000gn/T/datamold-dummy1949661756", + "checkSQL": false, + "checkCSV": false, + "checkTXT": false, + "checkPNG": false, + "checkGIF": false, + "checkZIP": false, + "checkJSON": false, + "checkXML": false, + "checkServerJSON": true, + "checkServerSQL": false, + "sizeSQL": "", + "sizeCSV": "", + "sizeTXT": "", + "sizePNG": "", + "sizeGIF": "", + "sizeZIP": "", + "sizeJSON": "", + "sizeXML": "", + "sizeServerJSON": "1", + "sizeServerSQL": "" + }, + "sourcePoint": { + "provider": "", + "region": "", + "credentialId": 0, + "path": "", + "bucket": "", + "endpoint": "", + "host": "", + "port": "", + "username": "", + "password": "", + "databaseName": "", + "databaseId": "", + "projectId": "" + }, + "targetPoint": { + "provider": "aws", + "region": "ap-northeast-2", + "credentialId": 2, + "path": "", + "bucket": "", + "endpoint": "", + "host": "", + "port": "", + "username": "", + "password": "", + "databaseName": "", + "databaseId": "", + "projectId": "" + } + }, + { + "meta": { + "serviceType": "nrdbms", + "taskType": "generate", + "taskId": "-task-0-20260417-173239" + }, + "dummy": { + "dummyPath": "/var/folders/8d/dykp7pg5693fhf5_72hr16540000gn/T/datamold-dummy285215639", + "checkSQL": false, + "checkCSV": false, + "checkTXT": false, + "checkPNG": false, + "checkGIF": false, + "checkZIP": false, + "checkJSON": false, + "checkXML": false, + "checkServerJSON": true, + "checkServerSQL": false, + "sizeSQL": "", + "sizeCSV": "", + "sizeTXT": "", + "sizePNG": "", + "sizeGIF": "", + "sizeZIP": "", + "sizeJSON": "", + "sizeXML": "", + "sizeServerJSON": "1", + "sizeServerSQL": "" + }, + "sourcePoint": { + "provider": "", + "region": "", + "credentialId": 0, + "path": "", + "bucket": "", + "endpoint": "", + "host": "", + "port": "", + "username": "", + "password": "", + "databaseName": "", + "databaseId": "", + "projectId": "" + }, + "targetPoint": { + "provider": "alibaba", + "region": "ap-northeast-1", + "credentialId": 1, + "path": "", + "bucket": "", + "endpoint": "https://oss-ap-northeast-1.aliyuncs.com", + "host": "dds-mj76dbe602390fa41311-pub.mongodb.ap-northeast-2.rds.aliyuncs.com", + "port": "3717", + "username": "root", + "password": "qwe123!@#", + "databaseName": "databases", + "databaseId": "", + "projectId": "" + } + }, + { + "meta": { + "serviceType": "nrdbms", + "taskType": "backup", + "taskId": "-task-0-20260417-180916" + }, + "dummy": { + "checkSQL": false, + "checkCSV": false, + "checkTXT": false, + "checkPNG": false, + "checkGIF": false, + "checkZIP": false, + "checkJSON": false, + "checkXML": false, + "checkServerJSON": false, + "checkServerSQL": false, + "sizeSQL": "", + "sizeCSV": "", + "sizeTXT": "", + "sizePNG": "", + "sizeGIF": "", + "sizeZIP": "", + "sizeJSON": "", + "sizeXML": "", + "sizeServerJSON": "", + "sizeServerSQL": "" + }, + "sourcePoint": { + "provider": "alibaba", + "region": "ap-northeast-1", + "credentialId": 1, + "path": "", + "bucket": "", + "endpoint": "https://oss-ap-northeast-1.aliyuncs.com", + "host": "dds-mj76dbe602390fa41311-pub.mongodb.ap-northeast-2.rds.aliyuncs.com", + "port": "3717", + "username": "root", + "password": "qwe123!@#", + "databaseName": "databases", + "databaseId": "", + "projectId": "" + }, + "targetPoint": { + "provider": "", + "region": "", + "credentialId": 0, + "path": "/Users/geontae/workspaces/namutech/mc-data-manager/data/backup", + "bucket": "", + "endpoint": "", + "host": "", + "port": "", + "username": "", + "password": "", + "databaseName": "", + "databaseId": "", + "projectId": "" + } + }, + { + "meta": { + "serviceType": "nrdbms", + "taskType": "restore", + "taskId": "-task-0-20260417-181053" + }, + "dummy": { + "checkSQL": false, + "checkCSV": false, + "checkTXT": false, + "checkPNG": false, + "checkGIF": false, + "checkZIP": false, + "checkJSON": false, + "checkXML": false, + "checkServerJSON": false, + "checkServerSQL": false, + "sizeSQL": "", + "sizeCSV": "", + "sizeTXT": "", + "sizePNG": "", + "sizeGIF": "", + "sizeZIP": "", + "sizeJSON": "", + "sizeXML": "", + "sizeServerJSON": "", + "sizeServerSQL": "" + }, + "sourcePoint": { + "provider": "", + "region": "", + "credentialId": 0, + "path": "/Users/geontae/workspaces/namutech/mc-data-manager/data/backup", + "bucket": "", + "endpoint": "", + "host": "", + "port": "", + "username": "", + "password": "", + "databaseName": "", + "databaseId": "", + "projectId": "" + }, + "targetPoint": { + "provider": "alibaba", + "region": "ap-northeast-1", + "credentialId": 1, + "path": "", + "bucket": "", + "endpoint": "https://oss-ap-northeast-1.aliyuncs.com", + "host": "dds-mj76dbe602390fa41311-pub.mongodb.ap-northeast-2.rds.aliyuncs.com", + "port": "3717", + "username": "root", + "password": "qwe123!@#", + "databaseName": "databases", + "databaseId": "", + "projectId": "" + } + }, + { + "meta": { + "serviceType": "nrdbms", + "taskType": "migrate", + "taskId": "-task-0-20260417-181209" + }, + "dummy": { + "checkSQL": false, + "checkCSV": false, + "checkTXT": false, + "checkPNG": false, + "checkGIF": false, + "checkZIP": false, + "checkJSON": false, + "checkXML": false, + "checkServerJSON": false, + "checkServerSQL": false, + "sizeSQL": "", + "sizeCSV": "", + "sizeTXT": "", + "sizePNG": "", + "sizeGIF": "", + "sizeZIP": "", + "sizeJSON": "", + "sizeXML": "", + "sizeServerJSON": "", + "sizeServerSQL": "" + }, + "sourcePoint": { + "provider": "alibaba", + "region": "", + "credentialId": 1, + "path": "", + "bucket": "", + "endpoint": "", + "host": "dds-mj76dbe602390fa41311-pub.mongodb.ap-northeast-2.rds.aliyuncs.com", + "port": "3717", + "username": "root", + "password": "qwe123!@#", + "databaseName": "databases", + "databaseId": "", + "projectId": "" + }, + "targetPoint": { + "provider": "alibaba", + "region": "", + "credentialId": 1, + "path": "", + "bucket": "", + "endpoint": "", + "host": "dds-mj76dbe602390fa41311-pub.mongodb.ap-northeast-2.rds.aliyuncs.com", + "port": "3717", + "username": "root", + "password": "qwe123!@#", + "databaseName": "databases2", + "databaseId": "", + "projectId": "" + } + }, + { + "meta": { + "serviceType": "nrdbms", + "taskType": "generate", + "taskId": "-task-0-20260424-141931" + }, + "dummy": { + "dummyPath": "/tmp/datamold-dummy893324966", + "checkSQL": false, + "checkCSV": false, + "checkTXT": false, + "checkPNG": false, + "checkGIF": false, + "checkZIP": false, + "checkJSON": false, + "checkXML": false, + "checkServerJSON": true, + "checkServerSQL": false, + "sizeSQL": "", + "sizeCSV": "", + "sizeTXT": "", + "sizePNG": "", + "sizeGIF": "", + "sizeZIP": "", + "sizeJSON": "", + "sizeXML": "", + "sizeServerJSON": "1", + "sizeServerSQL": "" + }, + "sourcePoint": { + "provider": "", + "region": "", + "credentialId": 0, + "path": "", + "bucket": "", + "endpoint": "", + "host": "", + "port": "", + "username": "", + "password": "", + "databaseName": "", + "databaseId": "", + "projectId": "" + }, + "targetPoint": { + "provider": "alibaba", + "region": "", + "credentialId": 1, + "path": "", + "bucket": "", + "endpoint": "", + "host": "10.10.3.62", + "port": "27017", + "username": "root", + "password": "qwe123!@#", + "databaseName": "databases", + "databaseId": "", + "projectId": "" + } + }, + { + "meta": { + "serviceType": "nrdbms", + "taskType": "backup", + "taskId": "-task-0-20260424-143041" + }, + "dummy": { + "checkSQL": false, + "checkCSV": false, + "checkTXT": false, + "checkPNG": false, + "checkGIF": false, + "checkZIP": false, + "checkJSON": false, + "checkXML": false, + "checkServerJSON": false, + "checkServerSQL": false, + "sizeSQL": "", + "sizeCSV": "", + "sizeTXT": "", + "sizePNG": "", + "sizeGIF": "", + "sizeZIP": "", + "sizeJSON": "", + "sizeXML": "", + "sizeServerJSON": "", + "sizeServerSQL": "" + }, + "sourcePoint": { + "provider": "alibaba", + "region": "", + "credentialId": 1, + "path": "", + "bucket": "", + "endpoint": "https://oss-ap-northeast-1.aliyuncs.com", + "host": "10.10.3.63", + "port": "27017", + "username": "root", + "password": "qwe123!@#", + "databaseName": "databases", + "databaseId": "", + "projectId": "" + }, + "targetPoint": { + "provider": "", + "region": "", + "credentialId": 0, + "path": "/Users/geontae/workspaces/namutech/mc-data-manager/data/backup", + "bucket": "", + "endpoint": "", + "host": "", + "port": "", + "username": "", + "password": "", + "databaseName": "", + "databaseId": "", + "projectId": "" + } + }, + { + "meta": { + "serviceType": "nrdbms", + "taskType": "generate", + "taskId": "-task-0-20260424-143215" + }, + "dummy": { + "dummyPath": "/var/folders/8d/dykp7pg5693fhf5_72hr16540000gn/T/datamold-dummy1848932528", + "checkSQL": false, + "checkCSV": false, + "checkTXT": false, + "checkPNG": false, + "checkGIF": false, + "checkZIP": false, + "checkJSON": false, + "checkXML": false, + "checkServerJSON": true, + "checkServerSQL": false, + "sizeSQL": "", + "sizeCSV": "", + "sizeTXT": "", + "sizePNG": "", + "sizeGIF": "", + "sizeZIP": "", + "sizeJSON": "", + "sizeXML": "", + "sizeServerJSON": "1", + "sizeServerSQL": "" + }, + "sourcePoint": { + "provider": "", + "region": "", + "credentialId": 0, + "path": "", + "bucket": "", + "endpoint": "", + "host": "", + "port": "", + "username": "", + "password": "", + "databaseName": "", + "databaseId": "", + "projectId": "" + }, + "targetPoint": { + "provider": "alibaba", + "region": "ap-northeast-1", + "credentialId": 1, + "path": "", + "bucket": "", + "endpoint": "https://oss-ap-northeast-1.aliyuncs.com", + "host": "10.10.3.62", + "port": "27017", + "username": "root", + "password": "qwe123!@#", + "databaseName": "databases", + "databaseId": "", + "projectId": "" + } + }, + { + "meta": { + "serviceType": "nrdbms", + "taskType": "backup", + "taskId": "-task-0-20260424-143250" + }, + "dummy": { + "checkSQL": false, + "checkCSV": false, + "checkTXT": false, + "checkPNG": false, + "checkGIF": false, + "checkZIP": false, + "checkJSON": false, + "checkXML": false, + "checkServerJSON": false, + "checkServerSQL": false, + "sizeSQL": "", + "sizeCSV": "", + "sizeTXT": "", + "sizePNG": "", + "sizeGIF": "", + "sizeZIP": "", + "sizeJSON": "", + "sizeXML": "", + "sizeServerJSON": "", + "sizeServerSQL": "" + }, + "sourcePoint": { + "provider": "alibaba", + "region": "", + "credentialId": 1, + "path": "", + "bucket": "", + "endpoint": "https://oss-ap-northeast-1.aliyuncs.com", + "host": "10.10.3.63", + "port": "27017", + "username": "root", + "password": "qwe123!@#", + "databaseName": "databases", + "databaseId": "", + "projectId": "" + }, + "targetPoint": { + "provider": "", + "region": "", + "credentialId": 0, + "path": "/Users/geontae/workspaces/namutech/mc-data-manager/data/backup", + "bucket": "", + "endpoint": "", + "host": "", + "port": "", + "username": "", + "password": "", + "databaseName": "", + "databaseId": "", + "projectId": "" } } ], diff --git a/docker-compose.yaml b/docker-compose.yaml index 61164e4..a06f05f 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -67,6 +67,24 @@ services: - MYSQL_ROOT_PASSWORD=mcmp + mc-data-manager-mongodb: + image: mongo:6 + container_name: mc-data-manager-mongodb + ports: + - "27017:27017" + environment: + - MONGO_INITDB_ROOT_USERNAME=root + - MONGO_INITDB_ROOT_PASSWORD=qwe123!@# + - MONGO_INITDB_DATABASE=databases + volumes: + - ./data/mongodb:/data/db + restart: always + healthcheck: + test: ["CMD", "mongosh", "--eval", "db.adminCommand('ping')"] + interval: 30s + timeout: 10s + retries: 3 + ################## ## OPTIONAL ## ################## diff --git a/internal/auth/base.go b/internal/auth/base.go index 7e92f66..8d1c15a 100644 --- a/internal/auth/base.go +++ b/internal/auth/base.go @@ -27,12 +27,16 @@ import ( "github.com/cloud-barista/mc-data-manager/config" "github.com/cloud-barista/mc-data-manager/models" + "github.com/cloud-barista/mc-data-manager/pkg/nrdbms/alibabamgdb" "github.com/cloud-barista/mc-data-manager/pkg/nrdbms/awsdnmdb" "github.com/cloud-barista/mc-data-manager/pkg/nrdbms/gcpfsdb" "github.com/cloud-barista/mc-data-manager/pkg/nrdbms/ncpmgdb" "github.com/cloud-barista/mc-data-manager/pkg/objectstorage/alibabafs" "github.com/cloud-barista/mc-data-manager/pkg/objectstorage/gcpfs" + "github.com/cloud-barista/mc-data-manager/pkg/objectstorage/ibmfs" + "github.com/cloud-barista/mc-data-manager/pkg/objectstorage/ktfs" "github.com/cloud-barista/mc-data-manager/pkg/objectstorage/s3fs" + "github.com/cloud-barista/mc-data-manager/pkg/objectstorage/tencentfs" "github.com/cloud-barista/mc-data-manager/pkg/rdbms/mysql" "github.com/cloud-barista/mc-data-manager/service/nrdbc" "github.com/cloud-barista/mc-data-manager/service/osc" @@ -113,20 +117,24 @@ func PreRun(task string, datamoldParams *models.CommandTask, use string) { func GetOS(params *models.ProviderConfig) (*osc.OSController, error) { var OSC *osc.OSController - // log.Info().Str("ProfileName", params.ProfileName).Msg("GetOS") log.Info().Int64("CredentialId", params.CredentialId).Msg("GetOS") log.Info().Str("Provider", params.Provider).Msg("GetOS") - log.Info().Msg("Get Credential") - credentailManager := config.NewAuthManager() - // creds, err := credentailManger.LoadCredentialsByProfile(params.ProfileName, params.Provider) - creds, err := credentailManager.LoadCredentialsById(uint64(params.CredentialId)) - if err != nil { - log.Error().Err(err).Msg("credential load failed") - return nil, err + + loadCreds := func() (interface{}, error) { + creds, err := config.NewAuthManager().LoadCredentialsById(uint64(params.CredentialId)) + if err != nil { + log.Error().Err(err).Msg("credential load failed") + } + return creds, err } + var err error switch params.Provider { case "aws": + creds, cerr := loadCreds() + if cerr != nil { + return nil, cerr + } awsc, ok := creds.(models.AWSCredentials) if !ok { return nil, errors.New("credential load failed") @@ -146,6 +154,10 @@ func GetOS(params *models.ProviderConfig) (*osc.OSController, error) { return nil, fmt.Errorf("osc error : %v", err) } case "gcp": + creds, cerr := loadCreds() + if cerr != nil { + return nil, cerr + } gcpc, ok := creds.(models.GCPCredentials) if !ok { return nil, errors.New("credential load failed") @@ -170,6 +182,10 @@ func GetOS(params *models.ProviderConfig) (*osc.OSController, error) { return nil, fmt.Errorf("osc error : %v", err) } case "ncp": + creds, cerr := loadCreds() + if cerr != nil { + return nil, cerr + } ncpc, ok := creds.(models.NCPCredentials) if !ok { return nil, errors.New("credential load failed") @@ -189,21 +205,45 @@ func GetOS(params *models.ProviderConfig) (*osc.OSController, error) { return nil, fmt.Errorf("osc error : %v", err) } case "alibaba": + creds, cerr := loadCreds() + if cerr != nil { + return nil, cerr + } alibabac, ok := creds.(models.AlibabaCredentials) if !ok { return nil, errors.New("credential load failed") } - log.Info().Str("Endpoint", params.Endpoint).Msg("Alibaba Credentials") log.Info().Str("Region", params.Region).Msg("Alibaba Region") log.Info().Str("AccessKey", alibabac.AccessKey).Msg("Alibaba Credentials") log.Info().Str("SecretKey", alibabac.SecretKey).Msg("Alibaba Credentials") log.Info().Str("BucketName", params.Bucket).Msg("Alibaba BucketName") - ossc, err := config.NewAlibabaClient(params.Endpoint, params.Region, alibabac.AccessKey, alibabac.SecretKey) + ossc, err := config.NewAlibabaClient(params.Region, alibabac.AccessKey, alibabac.SecretKey) if err != nil { return nil, fmt.Errorf("NewAlibabaClient error : %v", err) } - OSC, err = osc.New(alibabafs.New(models.ALIBABA, ossc, params.Endpoint, params.Bucket, params.Region)) + OSC, err = osc.New(alibabafs.New(models.ALIBABA, ossc, "https://oss-"+params.Region+".aliyuncs.com", params.Bucket, params.Region)) + if err != nil { + return nil, fmt.Errorf("osc error : %v", err) + } + case "ibm": + log.Info().Str("Region", params.Region).Msg("IBM Region") + log.Info().Str("BucketName", params.Bucket).Msg("IBM BucketName") + OSC, err = osc.New(ibmfs.New(models.IBM, params.Bucket, params.Region)) + if err != nil { + return nil, fmt.Errorf("osc error : %v", err) + } + case "kt": + log.Info().Str("Region", params.Region).Msg("KT Region") + log.Info().Str("BucketName", params.Bucket).Msg("KT BucketName") + OSC, err = osc.New(ktfs.New(models.KT, params.Bucket, params.Region)) + if err != nil { + return nil, fmt.Errorf("osc error : %v", err) + } + case "tencent": + log.Info().Str("Region", params.Region).Msg("Tencent Region") + log.Info().Str("BucketName", params.Bucket).Msg("Tencent BucketName") + OSC, err = osc.New(tencentfs.New(models.TENCENT, params.Bucket, params.Region)) if err != nil { return nil, fmt.Errorf("osc error : %v", err) } @@ -306,6 +346,24 @@ func GetNRDMS(params *models.ProviderConfig) (*nrdbc.NRDBController, error) { if err != nil { return nil, err } + case "alibaba": + log.Info().Str("Username", params.User).Msg("Alibaba Credentials") + log.Info().Str("Password", params.Password).Msg("Alibaba Credentials") + log.Info().Str("Host", params.Host).Msg("Alibaba Host") + log.Info().Str("Port", params.Port).Msg("Alibaba Port") + port, err := strconv.Atoi(params.Port) + if err != nil { + return nil, err + } + + alibabanrdb, err := config.NewAlibabaMongoDBClient(params.User, params.Password, params.Host, port) + if err != nil { + return nil, err + } + NRDBC, err = nrdbc.New(alibabamgdb.New(alibabanrdb, params.DatabaseName)) + if err != nil { + return nil, err + } } return NRDBC, nil } diff --git a/internal/auth/rdb.go b/internal/auth/rdb.go index 7b59794..c9fcebc 100644 --- a/internal/auth/rdb.go +++ b/internal/auth/rdb.go @@ -137,7 +137,7 @@ func MigrationRDMFunc(datamoldParams *models.CommandTask) error { } log.Info().Msgf("Launch RDBController Copy") - if err := srcRDBC.Copy(dstRDBC); err != nil { + if err := srcRDBC.Copy(dstRDBC, datamoldParams.SourcePoint.DatabaseName); err != nil { log.Error().Msgf("Copy error copying into rdbms : %v", err) return err } diff --git a/models/bagicResponse.go b/models/bagicResponse.go index bbcf76b..5ab65c2 100644 --- a/models/bagicResponse.go +++ b/models/bagicResponse.go @@ -31,6 +31,9 @@ type BasicPageResponse struct { GCPRegions []string `json:"GCPRegions"` NCPRegions []string `json:"NCPRegions"` ALIBABARegions []string `json:"ALIBABARegions"` + IBMRegions []string `json:"IBMRegions"` + KTRegions []string `json:"KTRegions"` + TencentRegions []string `json:"TencentRegions"` } type BasicResponse struct { diff --git a/models/basicProfile.go b/models/basicProfile.go index a492277..ec94582 100644 --- a/models/basicProfile.go +++ b/models/basicProfile.go @@ -39,6 +39,9 @@ type ProfileCredentials struct { NCP NCPCredentials `json:"ncp,omitempty"` GCP GCPCredentials `json:"gcp,omitempty"` ALIBABA AlibabaCredentials `json:"alibaba,omitempty"` + IBM IBMCredentials `json:"ibm,omitempty"` + KT KTCredentials `json:"kt,omitempty"` + TENCENT TencentCredentials `json:"tencent,omitempty"` } type AWSCredentials struct { @@ -70,6 +73,28 @@ type AlibabaCredentials struct { SecretKey string `json:"secretKey" form:"secretKey"` } +type IBMCredentials struct { + ApiKey string `json:"apiKey" form:"apiKey"` + S3AccessKey string `json:"s3AccessKey,omitempty" form:"s3AccessKey"` + S3SecretKey string `json:"s3SecretKey,omitempty" form:"s3SecretKey"` +} + +const KTIdentityEndpoint = "https://api.ucloudbiz.olleh.com/d1/identity/v3/" + +type KTCredentials struct { + Username string `json:"username" form:"username"` + Password string `json:"password" form:"password"` + DomainName string `json:"domainName" form:"domainName"` + ProjectID string `json:"projectID" form:"projectID"` + S3AccessKey string `json:"s3AccessKey,omitempty" form:"s3AccessKey"` + S3SecretKey string `json:"s3SecretKey,omitempty" form:"s3SecretKey"` +} + +type TencentCredentials struct { + SecretId string `json:"secretId" form:"secretId"` + SecretKey string `json:"secretKey" form:"secretKey"` +} + type GCPCredentalCreateParams struct { GCPCredentialJson string `form:"gcpCredentialJson" json:"gcpCredentialJson"` GCPCredential *multipart.FileHeader `form:"gcpCredential" json:"-" swaggerignore:"true"` diff --git a/models/credential.go b/models/credential.go index ae47b89..6a51dd1 100644 --- a/models/credential.go +++ b/models/credential.go @@ -88,6 +88,42 @@ func (cr *CredentialCreateRequest) GetCredential() (string, error) { b, _ := json.Marshal(alibaba) return string(b), nil + case "ibm": + var ibm IBMCredentials + if err := json.Unmarshal(cr.CredentialJson, &ibm); err != nil { + return "", fmt.Errorf("invalid ibm credential json: %w", err) + } + + b, _ := json.Marshal(ibm) + return string(b), nil + + case "kt": + var kt KTCredentials + if err := json.Unmarshal(cr.CredentialJson, &kt); err != nil { + return "", fmt.Errorf("invalid kt credential json: %w", err) + } + + out := map[string]string{ + "identityEndpoint": KTIdentityEndpoint, + "username": kt.Username, + "password": kt.Password, + "domainName": kt.DomainName, + "projectID": kt.ProjectID, + "s3AccessKey": kt.S3AccessKey, + "s3SecretKey": kt.S3SecretKey, + } + b, _ := json.Marshal(out) + return string(b), nil + + case "tencent": + var tencent TencentCredentials + if err := json.Unmarshal(cr.CredentialJson, &tencent); err != nil { + return "", fmt.Errorf("invalid tencent credential json: %w", err) + } + + b, _ := json.Marshal(tencent) + return string(b), nil + default: return "", fmt.Errorf("unsupported cspType: %q", cr.CspType) } diff --git a/models/enums.go b/models/enums.go index d3c3817..531222b 100644 --- a/models/enums.go +++ b/models/enums.go @@ -24,6 +24,9 @@ const ( NCP Provider = "ncp" OPM Provider = "on-premise" ALIBABA Provider = "alibaba" + IBM Provider = "ibm" + KT Provider = "kt" + TENCENT Provider = "tencent" ) // Service type diff --git a/models/objectStorage.go b/models/objectStorage.go index e77c8d4..1e3cd8b 100644 --- a/models/objectStorage.go +++ b/models/objectStorage.go @@ -23,21 +23,43 @@ type BucketListResponse struct { Buckets []Bucket `json:"buckets"` } -type ListAllMyBucketsResult struct { - Owner Owner `json:"Owner"` - Buckets struct { - Bucket []Bucket `json:"Bucket"` - } `json:"Buckets"` +// ObjectStorageListResponse is the response body for GET /ns/{nsId}/resources/objectStorage +type ObjectStorageListResponse struct { + ObjectStorage []ObjectStorage `json:"objectStorage"` } -// 변환 후 구조 -type SimpleBuckets struct { - Buckets []Bucket `json:"Buckets"` +// ObjectStorage represents a single object storage resource returned by CB-Tumblebug +type ObjectStorage struct { + ResourceType string `json:"resourceType"` + ID string `json:"id"` + UID string `json:"uid"` + CspResourceName string `json:"cspResourceName"` + CspResourceId string `json:"cspResourceId"` + ConnectionName string `json:"connectionName"` + ConnectionConfig ConnectionConfig `json:"connectionConfig"` + Description string `json:"description"` + Status string `json:"status"` + Conditions []Condition `json:"conditions"` + Name string `json:"name"` + MaxKeys int `json:"maxKeys"` + Contents []Content `json:"contents"` } -type Owner struct { - ID string `json:"ID"` - DisplayName string `json:"DisplayName"` +// Condition represents a status condition of an object storage resource +type Condition struct { + Type string `json:"type"` + Status string `json:"status"` + Reason string `json:"reason"` + LastTransitionTime string `json:"lastTransitionTime"` +} + +// Content represents a single object within an object storage bucket +type Content struct { + ETag string `json:"eTag"` + Key string `json:"key"` + LastModified time.Time `json:"lastModified"` + Size int64 `json:"size"` + StorageClass string `json:"storageClass"` } type Bucket struct { @@ -45,23 +67,6 @@ type Bucket struct { CreationDate string `json:"CreationDate"` } -type ListBucketResult struct { - Name string `json:"Name"` - Prefix string `json:"Prefix"` - Marker string `json:"Marker"` - MaxKeys int `json:"MaxKeys"` - IsTruncated bool `json:"IsTruncated"` - Contents []Contents `json:"Contents"` -} - -type Contents struct { - Key string `json:"Key"` - LastModified time.Time `json:"LastModified"` - ETag string `json:"ETag"` - Size int64 `json:"Size"` - StorageClass string `json:"StorageClass"` -} - type DeleteRequest struct { XMLName xml.Name `xml:"Delete"` XMLNS string `xml:"xmlns,attr"` diff --git a/models/region.go b/models/region.go index f9158dd..759fe5a 100644 --- a/models/region.go +++ b/models/region.go @@ -40,3 +40,8 @@ type Location struct { Latitude float64 `json:"latitude"` Longitude float64 `json:"longitude"` } + +// ProviderRegionList is the response from /tumblebug/provider/:providerName/region +type ProviderRegionList struct { + Regions []RegionDetail `json:"regions"` +} diff --git a/pkg/nrdbms/alibabamgdb/alibabamgdb.go b/pkg/nrdbms/alibabamgdb/alibabamgdb.go new file mode 100644 index 0000000..88d391e --- /dev/null +++ b/pkg/nrdbms/alibabamgdb/alibabamgdb.go @@ -0,0 +1,108 @@ +/* +Copyright 2023 The Cloud-Barista Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package alibabamgdb + +import ( + "context" + + "github.com/cloud-barista/mc-data-manager/models" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" + "go.mongodb.org/mongo-driver/mongo" +) + +type AlibabaMongoDBMS struct { + provider models.Provider + dbName string + + client *mongo.Client + db *mongo.Database + ctx context.Context +} + +type AlibabaMongoDBOption func(*AlibabaMongoDBMS) + +func New(client *mongo.Client, databaseName string, opts ...AlibabaMongoDBOption) *AlibabaMongoDBMS { + dms := &AlibabaMongoDBMS{ + provider: models.ALIBABA, + dbName: databaseName, + client: client, + ctx: context.TODO(), + db: client.Database(databaseName), + } + + for _, opt := range opts { + opt(dms) + } + + return dms +} + +// list table +func (a *AlibabaMongoDBMS) ListTables() ([]string, error) { + return a.db.ListCollectionNames(a.ctx, bson.D{}) +} + +// delete table +func (a *AlibabaMongoDBMS) DeleteTables(tableName string) error { + return a.client.Database(a.dbName).Collection(tableName).Drop(a.ctx) +} + +// create table +func (a *AlibabaMongoDBMS) CreateTable(tableName string) error { + _, err := a.db.Collection(tableName).InsertOne(a.ctx, map[string]interface{}{}) + if err != nil { + return err + } + + _, err = a.db.Collection(tableName).DeleteOne(a.ctx, map[string]interface{}{}) + return err +} + +// import table +func (a *AlibabaMongoDBMS) ImportTable(tableName string, srcData *[]map[string]interface{}) error { + for _, data := range *srcData { + _, err := a.db.Collection(tableName).InsertOne(a.ctx, data) + if err != nil { + return err + } + } + return nil +} + +// export table +func (a *AlibabaMongoDBMS) ExportTable(tableName string, dstData *[]map[string]interface{}) error { + cursor, err := a.db.Collection(tableName).Find(a.ctx, map[string]interface{}{}) + if err != nil { + return err + } + defer cursor.Close(a.ctx) + + for cursor.Next(a.ctx) { + var result map[string]interface{} + err := cursor.Decode(&result) + if err != nil { + return err + } + + if oid, ok := result["_id"].(primitive.ObjectID); ok { + result["_id"] = oid.Hex() + } + + *dstData = append(*dstData, result) + } + return nil +} diff --git a/pkg/objectstorage/alibabafs/alibabafs.go b/pkg/objectstorage/alibabafs/alibabafs.go index 6be0408..33cdfee 100644 --- a/pkg/objectstorage/alibabafs/alibabafs.go +++ b/pkg/objectstorage/alibabafs/alibabafs.go @@ -18,6 +18,7 @@ package alibabafs import ( "context" "encoding/json" + "encoding/xml" "errors" "fmt" "io" @@ -66,25 +67,23 @@ func (w *ossWriter) Close() error { // CreateBucket will provision a bucket if it is not already present. func (f *AlibabaFS) CreateBucket() error { - path := "/tumblebug/resources/objectStorage/" + f.bucketName - method := http.MethodHead + nsId := utils.GetNsId() connName := fmt.Sprintf("%s-%s", f.provider, f.region) - _, err := utils.RequestTumblebug(path, method, connName, nil) - if err != nil { - path = "/tumblebug/resources/objectStorage/" + f.bucketName - method = http.MethodPut - - _, err := utils.RequestTumblebug(path, method, connName, nil) - if err != nil { - fmt.Println("create error: ", err.Error()) - return err - } - + headPath := "/tumblebug/ns/" + nsId + "/resources/objectStorage/" + f.bucketName + _, err := utils.RequestTumblebug(headPath, http.MethodHead, connName, nil) + if err == nil { return nil } + + createBody := []byte(fmt.Sprintf(`{"bucketName":"%s","connectionName":"%s"}`, f.bucketName, connName)) + createPath := "/tumblebug/ns/" + nsId + "/resources/objectStorage" + _, err = utils.RequestTumblebug(createPath, http.MethodPut, connName, createBody) + if err != nil { + fmt.Println("create error: ", err.Error()) + return err + } return nil - // return err } // DeleteBucket removes all objects in a bucket and deletes the bucket itself. @@ -121,7 +120,8 @@ func (f *AlibabaFS) DeleteBucket() error { } // Delete the bucket - path := "/tumblebug/resources/objectStorage/" + f.bucketName + nsId := utils.GetNsId() + path := "/tumblebug/ns/" + nsId + "/resources/objectStorage/" + f.bucketName method := http.MethodDelete connName := fmt.Sprintf("%s-%s", f.provider, f.region) @@ -135,8 +135,30 @@ func (f *AlibabaFS) DeleteBucket() error { // deleteObjectBatch deletes objects in manageable chunks. func (f *AlibabaFS) deleteObjectBatch(keys []string) error { - // TODO: marshal keys into the request body and invoke the OpenAPI client. - return ErrNotImplemented + nsId := utils.GetNsId() + path := "/tumblebug/ns/" + nsId + "/resources/objectStorage/" + f.bucketName + "?delete=true" + method := http.MethodPost + connName := fmt.Sprintf("%s-%s", f.provider, f.region) + + deleteReq := models.DeleteRequest{ + XMLNS: "http://s3.amazonaws.com/doc/2006-03-01/", + } + for _, key := range keys { + deleteReq.Objects = append(deleteReq.Objects, models.S3Object{Key: key}) + } + // 보기 좋게 들여쓰기된 XML 생성 + output, err := xml.MarshalIndent(deleteReq, "", " ") + if err != nil { + return err + } + + // XML 헤더 추가 + _, rerr := utils.RequestTumblebug(path, method, connName, []byte(xml.Header+string(output))) + if rerr != nil { + return err + } + + return nil } // ObjectList yields the objects contained within the configured bucket. @@ -155,7 +177,8 @@ func (f *AlibabaFS) ObjectListWithFilter(flt *filtering.ObjectFilter) ([]*models // query = &storage.Query{Prefix: pre} // } - path := "/tumblebug/resources/objectStorage/" + f.bucketName + nsId := utils.GetNsId() + path := "/tumblebug/ns/" + nsId + "/resources/objectStorage/" + f.bucketName method := http.MethodGet connName := fmt.Sprintf("%s-%s", f.provider, f.region) @@ -164,7 +187,7 @@ func (f *AlibabaFS) ObjectListWithFilter(flt *filtering.ObjectFilter) ([]*models return nil, err } - var resp models.ListBucketResult + var resp models.ObjectStorage if err := json.Unmarshal(result, &resp); err != nil { fmt.Println("error: ", err.Error()) return []*models.Object{}, fmt.Errorf("failed to get objects: %w", err) @@ -201,7 +224,8 @@ func (f *AlibabaFS) ObjectListWithFilter(flt *filtering.ObjectFilter) ([]*models // BucketList returns all buckets that are available for the configured account. func (f *AlibabaFS) BucketList() ([]models.Bucket, error) { - path := "/tumblebug/resources/objectStorage" + nsId := utils.GetNsId() + path := "/tumblebug/ns/" + nsId + "/resources/objectStorage" method := http.MethodGet connName := fmt.Sprintf("%s-%s", f.provider, f.region) @@ -211,32 +235,41 @@ func (f *AlibabaFS) BucketList() ([]models.Bucket, error) { } // Parse the response to extract public key and token ID - var res models.ListAllMyBucketsResult + var res models.ObjectStorageListResponse if err := json.Unmarshal(body, &res); err != nil { fmt.Println("error: ", err.Error()) return []models.Bucket{}, fmt.Errorf("failed to get buckets: %w", err) } - // 버킷이 비어 있으면 빈 리스트 반환 - if res.Buckets.Bucket == nil { - return []models.Bucket{}, nil + buckets := make([]models.Bucket, 0, len(res.ObjectStorage)) + for _, os := range res.ObjectStorage { + buckets = append(buckets, models.Bucket{ + Name: os.Name, + }) } - - return res.Buckets.Bucket, nil + return buckets, nil } // Open streams a single object from Alibaba Cloud OSS. func (f *AlibabaFS) Open(name string) (io.ReadCloser, error) { - // TODO: wire Alibaba download stream into an io.Reader. - return nil, ErrNotImplemented + ctx := f.ctx + if ctx == nil { + ctx = context.Background() + } + + result, err := f.client.GetObject(ctx, &oss.GetObjectRequest{ + Bucket: oss.Ptr(f.bucketName), + Key: oss.Ptr(name), + }) + if err != nil { + return nil, err + } + + return result.Body, nil } // Create opens a writer that uploads an object to the configured bucket. func (f *AlibabaFS) Create(name string) (io.WriteCloser, error) { - if f.client == nil { - return nil, fmt.Errorf("alibaba oss client is not configured") - } - ctx := f.ctx if ctx == nil { ctx = context.Background() diff --git a/pkg/objectstorage/gcpfs/gcpfs.go b/pkg/objectstorage/gcpfs/gcpfs.go index 15b0aa5..92101ad 100644 --- a/pkg/objectstorage/gcpfs/gcpfs.go +++ b/pkg/objectstorage/gcpfs/gcpfs.go @@ -43,23 +43,22 @@ type GCPfs struct { // Creating a Bucket func (f *GCPfs) CreateBucket() error { - path := "/tumblebug/resources/objectStorage/" + f.bucketName - method := http.MethodHead + nsId := utils.GetNsId() connName := fmt.Sprintf("%s-%s", f.provider, f.region) - _, err := utils.RequestTumblebug(path, method, connName, nil) - if err != nil { - path = "/tumblebug/resources/objectStorage/" + f.bucketName - method = http.MethodPut - - _, err := utils.RequestTumblebug(path, method, connName, nil) - if err != nil { - fmt.Println("create error: ", err.Error()) - return err - } - + headPath := "/tumblebug/ns/" + nsId + "/resources/objectStorage/" + f.bucketName + _, err := utils.RequestTumblebug(headPath, http.MethodHead, connName, nil) + if err == nil { return nil } + + createBody := []byte(fmt.Sprintf(`{"bucketName":"%s","connectionName":"%s"}`, f.bucketName, connName)) + createPath := "/tumblebug/ns/" + nsId + "/resources/objectStorage" + _, err = utils.RequestTumblebug(createPath, http.MethodPut, connName, createBody) + if err != nil { + fmt.Println("create error: ", err.Error()) + return err + } return nil } @@ -99,7 +98,8 @@ func (f *GCPfs) DeleteBucket() error { } // Delete the bucket - path := "/tumblebug/resources/objectStorage/" + f.bucketName + nsId := utils.GetNsId() + path := "/tumblebug/ns/" + nsId + "/resources/objectStorage/" + f.bucketName method := http.MethodDelete connName := fmt.Sprintf("%s-%s", f.provider, f.region) @@ -113,7 +113,8 @@ func (f *GCPfs) DeleteBucket() error { // deleteObjectBatch deletes a batch of objects func (f *GCPfs) deleteObjectBatch(keys []string) error { - path := "/tumblebug/resources/objectStorage/" + f.bucketName + "?delete=true" + nsId := utils.GetNsId() + path := "/tumblebug/ns/" + nsId + "/resources/objectStorage/" + f.bucketName + "?delete=true" method := http.MethodPost connName := fmt.Sprintf("%s-%s", f.provider, f.region) @@ -167,7 +168,8 @@ func (f *GCPfs) ObjectListWithFilter(flt *filtering.ObjectFilter) ([]*models.Obj // query = &storage.Query{Prefix: pre} // } - path := "/tumblebug/resources/objectStorage/" + f.bucketName + nsId := utils.GetNsId() + path := "/tumblebug/ns/" + nsId + "/resources/objectStorage/" + f.bucketName method := http.MethodGet connName := fmt.Sprintf("%s-%s", f.provider, f.region) @@ -176,7 +178,7 @@ func (f *GCPfs) ObjectListWithFilter(flt *filtering.ObjectFilter) ([]*models.Obj return nil, err } - var resp models.ListBucketResult + var resp models.ObjectStorage if err := json.Unmarshal(result, &resp); err != nil { fmt.Println("error: ", err.Error()) return []*models.Object{}, fmt.Errorf("failed to get objects: %w", err) @@ -226,7 +228,8 @@ func New(client *storage.Client, projectID, bucketName string, region string) *G } func (f *GCPfs) BucketList() ([]models.Bucket, error) { - path := "/tumblebug/resources/objectStorage" + nsId := utils.GetNsId() + path := "/tumblebug/ns/" + nsId + "/resources/objectStorage" method := http.MethodGet connName := fmt.Sprintf("%s-%s", f.provider, f.region) @@ -236,16 +239,17 @@ func (f *GCPfs) BucketList() ([]models.Bucket, error) { } // Parse the response to extract public key and token ID - var res models.ListAllMyBucketsResult + var res models.ObjectStorageListResponse if err := json.Unmarshal(body, &res); err != nil { fmt.Println("error: ", err.Error()) return []models.Bucket{}, fmt.Errorf("failed to get buckets: %w", err) } - // 버킷이 비어 있으면 빈 리스트 반환 - if res.Buckets.Bucket == nil { - return []models.Bucket{}, nil + buckets := make([]models.Bucket, 0, len(res.ObjectStorage)) + for _, os := range res.ObjectStorage { + buckets = append(buckets, models.Bucket{ + Name: os.Name, + }) } - - return res.Buckets.Bucket, nil + return buckets, nil } diff --git a/pkg/objectstorage/ibmfs/ibmfs.go b/pkg/objectstorage/ibmfs/ibmfs.go new file mode 100644 index 0000000..a669375 --- /dev/null +++ b/pkg/objectstorage/ibmfs/ibmfs.go @@ -0,0 +1,437 @@ +package ibmfs + +import ( + "bytes" + "context" + "encoding/json" + "encoding/xml" + "fmt" + "io" + "net/http" + "net/url" + "strings" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/feature/s3/manager" + "github.com/cloud-barista/mc-data-manager/models" + "github.com/cloud-barista/mc-data-manager/pkg/objectstorage/filtering" + "github.com/cloud-barista/mc-data-manager/pkg/utils" + "github.com/rs/zerolog/log" +) + +type reader struct { + r *io.PipeReader + ch chan error + cancel context.CancelFunc + chkClose bool +} + +func (p *reader) Read(b []byte) (int, error) { + return p.r.Read(b) +} + +func (p *reader) Close() error { + if !p.chkClose { + p.chkClose = true + return p.r.Close() + } + return nil +} + +type writer struct { + w *io.PipeWriter + ch chan error + cancel context.CancelFunc + chkClose bool +} + +func (p *writer) Write(b []byte) (int, error) { + return p.w.Write(b) +} + +func (p *writer) Close() error { + if !p.chkClose { + p.chkClose = true + _ = p.w.Close() + return <-p.ch + } + return nil +} + +type fakeWriteAt struct { + W io.Writer +} + +func (w *fakeWriteAt) WriteAt(p []byte, off int64) (n int, err error) { + return w.W.Write(p) +} + +type IBMFS struct { + provider models.Provider + bucketName string + region string + + ctx context.Context + uploader manager.Uploader + downloader manager.Downloader +} + +func New(provider models.Provider, bucketName, region string) *IBMFS { + return &IBMFS{ + provider: provider, + bucketName: bucketName, + region: region, + ctx: context.Background(), + } +} + +// Creating a Bucket +// +// Aws imposes location constraints when creating buckets +func (f *IBMFS) CreateBucket() error { + nsId := utils.GetNsId() + connName := fmt.Sprintf("%s-%s", f.provider, f.region) + + headPath := "/tumblebug/ns/" + nsId + "/resources/objectStorage/" + f.bucketName + _, err := utils.RequestTumblebug(headPath, http.MethodHead, connName, nil) + if err == nil { + return nil + } + + createBody := []byte(fmt.Sprintf(`{"bucketName":"%s","connectionName":"%s"}`, f.bucketName, connName)) + createPath := "/tumblebug/ns/" + nsId + "/resources/objectStorage" + _, err = utils.RequestTumblebug(createPath, http.MethodPut, connName, createBody) + if err != nil { + fmt.Println("create error: ", err.Error()) + return err + } + return nil +} + +// Delete Bucket +// Check and delete all objects in the bucket and delete the bucket +func (f *IBMFS) DeleteBucket() error { + objList, err := f.ObjectList() + if err != nil { + return err + } + + if len(objList) != 0 { + // Divide objectIds into batches of 1000 + const batchSize = 1000 + var objectIds []string + + for _, object := range objList { + objectIds = append(objectIds, object.Key) + + // When we reach batch size, delete objects + if len(objectIds) == batchSize { + if err := f.deleteObjectBatch(objectIds); err != nil { + return err + } + // Reset objectIds for the next batch + objectIds = []string{} + } + } + + // Delete any remaining objects + if len(objectIds) > 0 { + if err := f.deleteObjectBatch(objectIds); err != nil { + return err + } + } + } + + // Delete the bucket + nsId := utils.GetNsId() + path := "/tumblebug/ns/" + nsId + "/resources/objectStorage/" + f.bucketName + method := http.MethodDelete + connName := fmt.Sprintf("%s-%s", f.provider, f.region) + + _, err = utils.RequestTumblebug(path, method, connName, nil) + if err != nil { + return err + } + log.Info().Msg("DeleteDone") + return nil +} + +// deleteObjectBatch deletes a batch of objects +func (f *IBMFS) deleteObjectBatch(keys []string) error { + nsId := utils.GetNsId() + path := "/tumblebug/ns/" + nsId + "/resources/objectStorage/" + f.bucketName + "?delete=true" + method := http.MethodPost + connName := fmt.Sprintf("%s-%s", f.provider, f.region) + + deleteReq := models.DeleteRequest{ + XMLNS: "http://s3.amazonaws.com/doc/2006-03-01/", + } + for _, key := range keys { + deleteReq.Objects = append(deleteReq.Objects, models.S3Object{Key: key}) + } + // 보기 좋게 들여쓰기된 XML 생성 + output, err := xml.MarshalIndent(deleteReq, "", " ") + if err != nil { + return err + } + + // XML 헤더 추가 + _, rerr := utils.RequestTumblebug(path, method, connName, []byte(xml.Header+string(output))) + if rerr != nil { + return err + } + + return nil +} + +// presignedURLResponse는 Tumblebug Presigned URL API의 응답 구조체입니다. +type presignedURLResponse struct { + PresignedURL string `json:"presignedURL"` + Expires int64 `json:"expires"` + Method string `json:"method"` +} + +// Tumblebug의 Presigned URL API를 통해 오브젝트를 다운로드합니다. +// +// 기존 Open()이 AWS SDK를 직접 사용하는 것과 달리, +// 이 함수는 Tumblebug에 Presigned URL 발급을 요청한 뒤 +// 해당 URL로 HTTP GET을 수행하여 스트림을 반환합니다. +// +// POST /ns/{nsId}/resources/objectStorage/{osId}/object/{objectKey}/presignedUrl?operation=download +func (f *IBMFS) Open(name string) (io.ReadCloser, error) { + nsId := utils.GetNsId() + connName := fmt.Sprintf("%s-%s", f.provider, f.region) + + // objectKey에 슬래시 등 특수문자가 포함될 수 있으므로 path 세그먼트 단위로 인코딩합니다. + // url.PathEscape는 '/'를 인코딩하지 않으므로, 키 전체를 하나의 세그먼트로 처리하기 위해 + // url.QueryEscape 후 '+'를 '%20'으로 변환하는 방식을 사용합니다. + encodedKey := strings.NewReplacer("+", "%20").Replace(url.QueryEscape(name)) + + path := fmt.Sprintf("/tumblebug/ns/%s/resources/objectStorage/%s/object/%s/presignedUrl?operation=download&expires=3600", + nsId, f.bucketName, encodedKey) + + body, err := utils.RequestTumblebug(path, http.MethodPost, connName, nil) + if err != nil { + return nil, fmt.Errorf("openWithTumblebug: failed to generate presigned URL for %q: %w", name, err) + } + + var resp presignedURLResponse + if err := json.Unmarshal(body, &resp); err != nil { + return nil, fmt.Errorf("openWithTumblebug: failed to parse presigned URL response: %w", err) + } + if resp.PresignedURL == "" { + return nil, fmt.Errorf("openWithTumblebug: empty presigned URL returned for %q", name) + } + + log.Debug().Str("key", name).Str("presignedURL", resp.PresignedURL). + Msg("[IBMFS] openWithTumblebug: downloading via presigned URL") + + httpResp, err := http.Get(resp.PresignedURL) //nolint:noctx + if err != nil { + return nil, fmt.Errorf("openWithTumblebug: HTTP GET failed: %w", err) + } + if httpResp.StatusCode != http.StatusOK { + _ = httpResp.Body.Close() + return nil, fmt.Errorf("openWithTumblebug: unexpected status %d for %q", httpResp.StatusCode, name) + } + + return httpResp.Body, nil +} + +// tumblebugWriter는 데이터를 메모리에 버퍼링한 뒤 Close() 시점에 +// Content-Length를 명시하여 Presigned URL로 한 번에 업로드합니다. +// +// AWS S3 Presigned URL은 Transfer-Encoding: chunked를 지원하지 않으므로 +// io.Pipe 스트리밍 방식 대신 버퍼링 후 전송 방식을 사용합니다. +type tumblebugWriter struct { + buf bytes.Buffer + presignedURL string + name string + ctx context.Context + chkClose bool +} + +func (w *tumblebugWriter) Write(b []byte) (int, error) { + return w.buf.Write(b) +} + +func (w *tumblebugWriter) Close() error { + if w.chkClose { + return nil + } + w.chkClose = true + + data := w.buf.Bytes() + + req, err := http.NewRequestWithContext(w.ctx, http.MethodPut, w.presignedURL, bytes.NewReader(data)) + if err != nil { + return fmt.Errorf("createWithTumblebug: failed to create PUT request: %w", err) + } + req.ContentLength = int64(len(data)) + + log.Debug(). + Str("key", w.name). + Str("method", http.MethodPut). + Int64("contentLength", req.ContentLength). + Msg("[IBMFS] createWithTumblebug: sending PUT request") + + httpClient := &http.Client{} + httpResp, err := httpClient.Do(req) + if err != nil { + return fmt.Errorf("createWithTumblebug: PUT request failed: %w", err) + } + defer httpResp.Body.Close() + + respBody, _ := io.ReadAll(httpResp.Body) + + log.Debug(). + Str("key", w.name). + Int("statusCode", httpResp.StatusCode). + Str("responseBody", string(respBody)). + Msg("[IBMFS] createWithTumblebug: PUT response") + + if httpResp.StatusCode != http.StatusOK && httpResp.StatusCode != http.StatusNoContent { + return fmt.Errorf("createWithTumblebug: unexpected status %d for %q, body: %s", + httpResp.StatusCode, w.name, string(respBody)) + } + + log.Info().Str("key", w.name).Int("statusCode", httpResp.StatusCode). + Msg("[IBMFS] createWithTumblebug: upload succeeded") + return nil +} + +// createWithTumblebug은 Tumblebug의 Presigned URL API를 통해 오브젝트를 업로드합니다. +// +// 기존 Create()가 AWS SDK uploader를 직접 사용하는 것과 달리, +// 이 함수는 Tumblebug에 Presigned URL 발급을 요청한 뒤 +// 데이터를 버퍼링하여 Close() 시점에 Content-Length와 함께 HTTP PUT으로 전송합니다. +// +// POST /ns/{nsId}/resources/objectStorage/{osId}/object/{objectKey}/presignedUrl?operation=upload +func (f *IBMFS) Create(name string) (io.WriteCloser, error) { + nsId := utils.GetNsId() + connName := fmt.Sprintf("%s-%s", f.provider, f.region) + + encodedKey := strings.NewReplacer("+", "%20").Replace(url.QueryEscape(name)) + path := fmt.Sprintf("/tumblebug/ns/%s/resources/objectStorage/%s/object/%s/presignedUrl?operation=upload&expires=3600", + nsId, f.bucketName, encodedKey) + + body, err := utils.RequestTumblebug(path, http.MethodPost, connName, nil) + if err != nil { + return nil, fmt.Errorf("createWithTumblebug: failed to generate presigned URL for %q: %w", name, err) + } + + var resp presignedURLResponse + if err := json.Unmarshal(body, &resp); err != nil { + return nil, fmt.Errorf("createWithTumblebug: failed to parse presigned URL response: %w", err) + } + if resp.PresignedURL == "" { + return nil, fmt.Errorf("createWithTumblebug: empty presigned URL returned for %q", name) + } + + log.Debug().Str("key", name).Str("presignedURL", resp.PresignedURL). + Msg("[IBMFS] createWithTumblebug: presigned URL acquired") + + return &tumblebugWriter{ + presignedURL: resp.PresignedURL, + name: name, + ctx: f.ctx, + }, nil +} + +func (f *IBMFS) ObjectListWithFilter(flt *filtering.ObjectFilter) ([]*models.Object, error) { + log.Debug().Msg("[IBMFS] filtering") + var out []*models.Object + // var token *string + + var prefix *string + if flt != nil && flt.Path != "" { + pre := strings.TrimPrefix(flt.Path, "/") + prefix = aws.String(pre) + } + + for { + nsId := utils.GetNsId() + path := "/tumblebug/ns/" + nsId + "/resources/objectStorage/" + f.bucketName + method := http.MethodGet + connName := fmt.Sprintf("%s-%s", f.provider, f.region) + + result, err := utils.RequestTumblebug(path, method, connName, nil) + if err != nil { + return nil, err + } + + var resp models.ObjectStorage + if err := json.Unmarshal(result, &resp); err != nil { + fmt.Println("error: ", err.Error()) + return []*models.Object{}, fmt.Errorf("failed to get objects: %w", err) + } + + for _, o := range resp.Contents { + c := filtering.Candidate{ + Key: o.Key, + Size: o.Size, + LastModified: o.LastModified, + } + + log.Debug().Str("key", c.Key).Int64("size", c.Size). + Msg("[IBMFS] candidate") + + matched := filtering.MatchCandidate(flt, c) + if !matched { + if flt != nil { + log.Debug(). + Str("key", c.Key). + Str("prefix", aws.ToString(prefix)). + Strs("exact", flt.Exact). + Str("modifiedDate", c.LastModified.String()). + Msg("[IBMFS] filtered out") + } + continue + } + + out = append(out, &models.Object{ + ETag: o.ETag, + // ETag: aws.ToString(o.ETag), + Key: c.Key, + LastModified: c.LastModified, + Size: c.Size, + StorageClass: o.StorageClass, + Provider: f.provider, + }) + } + + break + } + return out, nil +} + +func (f *IBMFS) ObjectList() ([]*models.Object, error) { + return f.ObjectListWithFilter(nil) +} + +func (f *IBMFS) BucketList() ([]models.Bucket, error) { + nsId := utils.GetNsId() + path := "/tumblebug/ns/" + nsId + "/resources/objectStorage" + method := http.MethodGet + connName := fmt.Sprintf("%s-%s", f.provider, f.region) + + body, err := utils.RequestTumblebug(path, method, connName, nil) + if err != nil { + return []models.Bucket{}, fmt.Errorf("failed to get buckets: %w", err) + } + + // Parse the response to extract public key and token ID + var res models.ObjectStorageListResponse + if err := json.Unmarshal(body, &res); err != nil { + fmt.Println("error: ", err.Error()) + return []models.Bucket{}, fmt.Errorf("failed to get buckets: %w", err) + } + + buckets := make([]models.Bucket, 0, len(res.ObjectStorage)) + for _, os := range res.ObjectStorage { + buckets = append(buckets, models.Bucket{ + Name: os.Name, + }) + } + return buckets, nil +} diff --git a/pkg/objectstorage/ktfs/ktfs.go b/pkg/objectstorage/ktfs/ktfs.go new file mode 100644 index 0000000..1a67176 --- /dev/null +++ b/pkg/objectstorage/ktfs/ktfs.go @@ -0,0 +1,437 @@ +package ktfs + +import ( + "bytes" + "context" + "encoding/json" + "encoding/xml" + "fmt" + "io" + "net/http" + "net/url" + "strings" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/feature/s3/manager" + "github.com/cloud-barista/mc-data-manager/models" + "github.com/cloud-barista/mc-data-manager/pkg/objectstorage/filtering" + "github.com/cloud-barista/mc-data-manager/pkg/utils" + "github.com/rs/zerolog/log" +) + +type reader struct { + r *io.PipeReader + ch chan error + cancel context.CancelFunc + chkClose bool +} + +func (p *reader) Read(b []byte) (int, error) { + return p.r.Read(b) +} + +func (p *reader) Close() error { + if !p.chkClose { + p.chkClose = true + return p.r.Close() + } + return nil +} + +type writer struct { + w *io.PipeWriter + ch chan error + cancel context.CancelFunc + chkClose bool +} + +func (p *writer) Write(b []byte) (int, error) { + return p.w.Write(b) +} + +func (p *writer) Close() error { + if !p.chkClose { + p.chkClose = true + _ = p.w.Close() + return <-p.ch + } + return nil +} + +type fakeWriteAt struct { + W io.Writer +} + +func (w *fakeWriteAt) WriteAt(p []byte, off int64) (n int, err error) { + return w.W.Write(p) +} + +type KTFS struct { + provider models.Provider + bucketName string + region string + + ctx context.Context + uploader manager.Uploader + downloader manager.Downloader +} + +func New(provider models.Provider, bucketName, region string) *KTFS { + return &KTFS{ + provider: provider, + bucketName: bucketName, + region: region, + ctx: context.Background(), + } +} + +// Creating a Bucket +// +// Aws imposes location constraints when creating buckets +func (f *KTFS) CreateBucket() error { + nsId := utils.GetNsId() + connName := fmt.Sprintf("%s-%s", f.provider, f.region) + + headPath := "/tumblebug/ns/" + nsId + "/resources/objectStorage/" + f.bucketName + _, err := utils.RequestTumblebug(headPath, http.MethodHead, connName, nil) + if err == nil { + return nil + } + + createBody := []byte(fmt.Sprintf(`{"bucketName":"%s","connectionName":"%s"}`, f.bucketName, connName)) + createPath := "/tumblebug/ns/" + nsId + "/resources/objectStorage" + _, err = utils.RequestTumblebug(createPath, http.MethodPut, connName, createBody) + if err != nil { + fmt.Println("create error: ", err.Error()) + return err + } + return nil +} + +// Delete Bucket +// Check and delete all objects in the bucket and delete the bucket +func (f *KTFS) DeleteBucket() error { + objList, err := f.ObjectList() + if err != nil { + return err + } + + if len(objList) != 0 { + // Divide objectIds into batches of 1000 + const batchSize = 1000 + var objectIds []string + + for _, object := range objList { + objectIds = append(objectIds, object.Key) + + // When we reach batch size, delete objects + if len(objectIds) == batchSize { + if err := f.deleteObjectBatch(objectIds); err != nil { + return err + } + // Reset objectIds for the next batch + objectIds = []string{} + } + } + + // Delete any remaining objects + if len(objectIds) > 0 { + if err := f.deleteObjectBatch(objectIds); err != nil { + return err + } + } + } + + // Delete the bucket + nsId := utils.GetNsId() + path := "/tumblebug/ns/" + nsId + "/resources/objectStorage/" + f.bucketName + method := http.MethodDelete + connName := fmt.Sprintf("%s-%s", f.provider, f.region) + + _, err = utils.RequestTumblebug(path, method, connName, nil) + if err != nil { + return err + } + log.Info().Msg("DeleteDone") + return nil +} + +// deleteObjectBatch deletes a batch of objects +func (f *KTFS) deleteObjectBatch(keys []string) error { + nsId := utils.GetNsId() + path := "/tumblebug/ns/" + nsId + "/resources/objectStorage/" + f.bucketName + "?delete=true" + method := http.MethodPost + connName := fmt.Sprintf("%s-%s", f.provider, f.region) + + deleteReq := models.DeleteRequest{ + XMLNS: "http://s3.amazonaws.com/doc/2006-03-01/", + } + for _, key := range keys { + deleteReq.Objects = append(deleteReq.Objects, models.S3Object{Key: key}) + } + // 보기 좋게 들여쓰기된 XML 생성 + output, err := xml.MarshalIndent(deleteReq, "", " ") + if err != nil { + return err + } + + // XML 헤더 추가 + _, rerr := utils.RequestTumblebug(path, method, connName, []byte(xml.Header+string(output))) + if rerr != nil { + return err + } + + return nil +} + +// presignedURLResponse는 Tumblebug Presigned URL API의 응답 구조체입니다. +type presignedURLResponse struct { + PresignedURL string `json:"presignedURL"` + Expires int64 `json:"expires"` + Method string `json:"method"` +} + +// Tumblebug의 Presigned URL API를 통해 오브젝트를 다운로드합니다. +// +// 기존 Open()이 AWS SDK를 직접 사용하는 것과 달리, +// 이 함수는 Tumblebug에 Presigned URL 발급을 요청한 뒤 +// 해당 URL로 HTTP GET을 수행하여 스트림을 반환합니다. +// +// POST /ns/{nsId}/resources/objectStorage/{osId}/object/{objectKey}/presignedUrl?operation=download +func (f *KTFS) Open(name string) (io.ReadCloser, error) { + nsId := utils.GetNsId() + connName := fmt.Sprintf("%s-%s", f.provider, f.region) + + // objectKey에 슬래시 등 특수문자가 포함될 수 있으므로 path 세그먼트 단위로 인코딩합니다. + // url.PathEscape는 '/'를 인코딩하지 않으므로, 키 전체를 하나의 세그먼트로 처리하기 위해 + // url.QueryEscape 후 '+'를 '%20'으로 변환하는 방식을 사용합니다. + encodedKey := strings.NewReplacer("+", "%20").Replace(url.QueryEscape(name)) + + path := fmt.Sprintf("/tumblebug/ns/%s/resources/objectStorage/%s/object/%s/presignedUrl?operation=download&expires=3600", + nsId, f.bucketName, encodedKey) + + body, err := utils.RequestTumblebug(path, http.MethodPost, connName, nil) + if err != nil { + return nil, fmt.Errorf("openWithTumblebug: failed to generate presigned URL for %q: %w", name, err) + } + + var resp presignedURLResponse + if err := json.Unmarshal(body, &resp); err != nil { + return nil, fmt.Errorf("openWithTumblebug: failed to parse presigned URL response: %w", err) + } + if resp.PresignedURL == "" { + return nil, fmt.Errorf("openWithTumblebug: empty presigned URL returned for %q", name) + } + + log.Debug().Str("key", name).Str("presignedURL", resp.PresignedURL). + Msg("[KTFS] openWithTumblebug: downloading via presigned URL") + + httpResp, err := http.Get(resp.PresignedURL) //nolint:noctx + if err != nil { + return nil, fmt.Errorf("openWithTumblebug: HTTP GET failed: %w", err) + } + if httpResp.StatusCode != http.StatusOK { + _ = httpResp.Body.Close() + return nil, fmt.Errorf("openWithTumblebug: unexpected status %d for %q", httpResp.StatusCode, name) + } + + return httpResp.Body, nil +} + +// tumblebugWriter는 데이터를 메모리에 버퍼링한 뒤 Close() 시점에 +// Content-Length를 명시하여 Presigned URL로 한 번에 업로드합니다. +// +// AWS S3 Presigned URL은 Transfer-Encoding: chunked를 지원하지 않으므로 +// io.Pipe 스트리밍 방식 대신 버퍼링 후 전송 방식을 사용합니다. +type tumblebugWriter struct { + buf bytes.Buffer + presignedURL string + name string + ctx context.Context + chkClose bool +} + +func (w *tumblebugWriter) Write(b []byte) (int, error) { + return w.buf.Write(b) +} + +func (w *tumblebugWriter) Close() error { + if w.chkClose { + return nil + } + w.chkClose = true + + data := w.buf.Bytes() + + req, err := http.NewRequestWithContext(w.ctx, http.MethodPut, w.presignedURL, bytes.NewReader(data)) + if err != nil { + return fmt.Errorf("createWithTumblebug: failed to create PUT request: %w", err) + } + req.ContentLength = int64(len(data)) + + log.Debug(). + Str("key", w.name). + Str("method", http.MethodPut). + Int64("contentLength", req.ContentLength). + Msg("[KTFS] createWithTumblebug: sending PUT request") + + httpClient := &http.Client{} + httpResp, err := httpClient.Do(req) + if err != nil { + return fmt.Errorf("createWithTumblebug: PUT request failed: %w", err) + } + defer httpResp.Body.Close() + + respBody, _ := io.ReadAll(httpResp.Body) + + log.Debug(). + Str("key", w.name). + Int("statusCode", httpResp.StatusCode). + Str("responseBody", string(respBody)). + Msg("[KTFS] createWithTumblebug: PUT response") + + if httpResp.StatusCode != http.StatusOK && httpResp.StatusCode != http.StatusNoContent { + return fmt.Errorf("createWithTumblebug: unexpected status %d for %q, body: %s", + httpResp.StatusCode, w.name, string(respBody)) + } + + log.Info().Str("key", w.name).Int("statusCode", httpResp.StatusCode). + Msg("[KTFS] createWithTumblebug: upload succeeded") + return nil +} + +// createWithTumblebug은 Tumblebug의 Presigned URL API를 통해 오브젝트를 업로드합니다. +// +// 기존 Create()가 AWS SDK uploader를 직접 사용하는 것과 달리, +// 이 함수는 Tumblebug에 Presigned URL 발급을 요청한 뒤 +// 데이터를 버퍼링하여 Close() 시점에 Content-Length와 함께 HTTP PUT으로 전송합니다. +// +// POST /ns/{nsId}/resources/objectStorage/{osId}/object/{objectKey}/presignedUrl?operation=upload +func (f *KTFS) Create(name string) (io.WriteCloser, error) { + nsId := utils.GetNsId() + connName := fmt.Sprintf("%s-%s", f.provider, f.region) + + encodedKey := strings.NewReplacer("+", "%20").Replace(url.QueryEscape(name)) + path := fmt.Sprintf("/tumblebug/ns/%s/resources/objectStorage/%s/object/%s/presignedUrl?operation=upload&expires=3600", + nsId, f.bucketName, encodedKey) + + body, err := utils.RequestTumblebug(path, http.MethodPost, connName, nil) + if err != nil { + return nil, fmt.Errorf("createWithTumblebug: failed to generate presigned URL for %q: %w", name, err) + } + + var resp presignedURLResponse + if err := json.Unmarshal(body, &resp); err != nil { + return nil, fmt.Errorf("createWithTumblebug: failed to parse presigned URL response: %w", err) + } + if resp.PresignedURL == "" { + return nil, fmt.Errorf("createWithTumblebug: empty presigned URL returned for %q", name) + } + + log.Debug().Str("key", name).Str("presignedURL", resp.PresignedURL). + Msg("[KTFS] createWithTumblebug: presigned URL acquired") + + return &tumblebugWriter{ + presignedURL: resp.PresignedURL, + name: name, + ctx: f.ctx, + }, nil +} + +func (f *KTFS) ObjectListWithFilter(flt *filtering.ObjectFilter) ([]*models.Object, error) { + log.Debug().Msg("[KTFS] filtering") + var out []*models.Object + // var token *string + + var prefix *string + if flt != nil && flt.Path != "" { + pre := strings.TrimPrefix(flt.Path, "/") + prefix = aws.String(pre) + } + + for { + nsId := utils.GetNsId() + path := "/tumblebug/ns/" + nsId + "/resources/objectStorage/" + f.bucketName + method := http.MethodGet + connName := fmt.Sprintf("%s-%s", f.provider, f.region) + + result, err := utils.RequestTumblebug(path, method, connName, nil) + if err != nil { + return nil, err + } + + var resp models.ObjectStorage + if err := json.Unmarshal(result, &resp); err != nil { + fmt.Println("error: ", err.Error()) + return []*models.Object{}, fmt.Errorf("failed to get objects: %w", err) + } + + for _, o := range resp.Contents { + c := filtering.Candidate{ + Key: o.Key, + Size: o.Size, + LastModified: o.LastModified, + } + + log.Debug().Str("key", c.Key).Int64("size", c.Size). + Msg("[KTFS] candidate") + + matched := filtering.MatchCandidate(flt, c) + if !matched { + if flt != nil { + log.Debug(). + Str("key", c.Key). + Str("prefix", aws.ToString(prefix)). + Strs("exact", flt.Exact). + Str("modifiedDate", c.LastModified.String()). + Msg("[KTFS] filtered out") + } + continue + } + + out = append(out, &models.Object{ + ETag: o.ETag, + // ETag: aws.ToString(o.ETag), + Key: c.Key, + LastModified: c.LastModified, + Size: c.Size, + StorageClass: o.StorageClass, + Provider: f.provider, + }) + } + + break + } + return out, nil +} + +func (f *KTFS) ObjectList() ([]*models.Object, error) { + return f.ObjectListWithFilter(nil) +} + +func (f *KTFS) BucketList() ([]models.Bucket, error) { + nsId := utils.GetNsId() + path := "/tumblebug/ns/" + nsId + "/resources/objectStorage" + method := http.MethodGet + connName := fmt.Sprintf("%s-%s", f.provider, f.region) + + body, err := utils.RequestTumblebug(path, method, connName, nil) + if err != nil { + return []models.Bucket{}, fmt.Errorf("failed to get buckets: %w", err) + } + + // Parse the response to extract public key and token ID + var res models.ObjectStorageListResponse + if err := json.Unmarshal(body, &res); err != nil { + fmt.Println("error: ", err.Error()) + return []models.Bucket{}, fmt.Errorf("failed to get buckets: %w", err) + } + + buckets := make([]models.Bucket, 0, len(res.ObjectStorage)) + for _, os := range res.ObjectStorage { + buckets = append(buckets, models.Bucket{ + Name: os.Name, + }) + } + return buckets, nil +} diff --git a/pkg/objectstorage/s3fs/s3fs.go b/pkg/objectstorage/s3fs/s3fs.go index 0a7d6c2..ff46501 100644 --- a/pkg/objectstorage/s3fs/s3fs.go +++ b/pkg/objectstorage/s3fs/s3fs.go @@ -16,12 +16,14 @@ limitations under the License. package s3fs import ( + "bytes" "context" "encoding/json" "encoding/xml" "fmt" "io" "net/http" + "net/url" "strings" "github.com/aws/aws-sdk-go-v2/aws" @@ -95,25 +97,23 @@ type S3FS struct { // // Aws imposes location constraints when creating buckets func (f *S3FS) CreateBucket() error { - path := "/tumblebug/resources/objectStorage/" + f.bucketName - method := http.MethodHead + nsId := utils.GetNsId() connName := fmt.Sprintf("%s-%s", f.provider, f.region) - _, err := utils.RequestTumblebug(path, method, connName, nil) - if err != nil { - path = "/tumblebug/resources/objectStorage/" + f.bucketName - method = http.MethodPut - - _, err := utils.RequestTumblebug(path, method, connName, nil) - if err != nil { - fmt.Println("create error: ", err.Error()) - return err - } - + headPath := "/tumblebug/ns/" + nsId + "/resources/objectStorage/" + f.bucketName + _, err := utils.RequestTumblebug(headPath, http.MethodHead, connName, nil) + if err == nil { return nil } + + createBody := []byte(fmt.Sprintf(`{"bucketName":"%s","connectionName":"%s"}`, f.bucketName, connName)) + createPath := "/tumblebug/ns/" + nsId + "/resources/objectStorage" + _, err = utils.RequestTumblebug(createPath, http.MethodPut, connName, createBody) + if err != nil { + fmt.Println("create error: ", err.Error()) + return err + } return nil - // return err } // Delete Bucket @@ -151,7 +151,8 @@ func (f *S3FS) DeleteBucket() error { } // Delete the bucket - path := "/tumblebug/resources/objectStorage/" + f.bucketName + nsId := utils.GetNsId() + path := "/tumblebug/ns/" + nsId + "/resources/objectStorage/" + f.bucketName method := http.MethodDelete connName := fmt.Sprintf("%s-%s", f.provider, f.region) @@ -165,7 +166,8 @@ func (f *S3FS) DeleteBucket() error { // deleteObjectBatch deletes a batch of objects func (f *S3FS) deleteObjectBatch(keys []string) error { - path := "/tumblebug/resources/objectStorage/" + f.bucketName + "?delete=true" + nsId := utils.GetNsId() + path := "/tumblebug/ns/" + nsId + "/resources/objectStorage/" + f.bucketName + "?delete=true" method := http.MethodPost connName := fmt.Sprintf("%s-%s", f.provider, f.region) @@ -190,8 +192,162 @@ func (f *S3FS) deleteObjectBatch(keys []string) error { return nil } -// Open function using pipeline +// presignedURLResponse는 Tumblebug Presigned URL API의 응답 구조체입니다. +type presignedURLResponse struct { + PresignedURL string `json:"presignedURL"` + Expires int64 `json:"expires"` + Method string `json:"method"` +} + +// openWithTumblebug은 Tumblebug의 Presigned URL API를 통해 오브젝트를 다운로드합니다. +// +// 기존 Open()이 AWS SDK를 직접 사용하는 것과 달리, +// 이 함수는 Tumblebug에 Presigned URL 발급을 요청한 뒤 +// 해당 URL로 HTTP GET을 수행하여 스트림을 반환합니다. +// +// POST /ns/{nsId}/resources/objectStorage/{osId}/object/{objectKey}/presignedUrl?operation=download func (f *S3FS) Open(name string) (io.ReadCloser, error) { + nsId := utils.GetNsId() + connName := fmt.Sprintf("%s-%s", f.provider, f.region) + + // objectKey에 슬래시 등 특수문자가 포함될 수 있으므로 path 세그먼트 단위로 인코딩합니다. + // url.PathEscape는 '/'를 인코딩하지 않으므로, 키 전체를 하나의 세그먼트로 처리하기 위해 + // url.QueryEscape 후 '+'를 '%20'으로 변환하는 방식을 사용합니다. + encodedKey := strings.NewReplacer("+", "%20").Replace(url.QueryEscape(name)) + + path := fmt.Sprintf("/tumblebug/ns/%s/resources/objectStorage/%s/object/%s/presignedUrl?operation=download&expires=3600", + nsId, f.bucketName, encodedKey) + + body, err := utils.RequestTumblebug(path, http.MethodPost, connName, nil) + if err != nil { + return nil, fmt.Errorf("openWithTumblebug: failed to generate presigned URL for %q: %w", name, err) + } + + var resp presignedURLResponse + if err := json.Unmarshal(body, &resp); err != nil { + return nil, fmt.Errorf("openWithTumblebug: failed to parse presigned URL response: %w", err) + } + if resp.PresignedURL == "" { + return nil, fmt.Errorf("openWithTumblebug: empty presigned URL returned for %q", name) + } + + log.Debug().Str("key", name).Str("presignedURL", resp.PresignedURL). + Msg("[S3FS] openWithTumblebug: downloading via presigned URL") + + httpResp, err := http.Get(resp.PresignedURL) //nolint:noctx + if err != nil { + return nil, fmt.Errorf("openWithTumblebug: HTTP GET failed: %w", err) + } + if httpResp.StatusCode != http.StatusOK { + _ = httpResp.Body.Close() + return nil, fmt.Errorf("openWithTumblebug: unexpected status %d for %q", httpResp.StatusCode, name) + } + + return httpResp.Body, nil +} + +// tumblebugWriter는 데이터를 메모리에 버퍼링한 뒤 Close() 시점에 +// Content-Length를 명시하여 Presigned URL로 한 번에 업로드합니다. +// +// AWS S3 Presigned URL은 Transfer-Encoding: chunked를 지원하지 않으므로 +// io.Pipe 스트리밍 방식 대신 버퍼링 후 전송 방식을 사용합니다. +type tumblebugWriter struct { + buf bytes.Buffer + presignedURL string + name string + ctx context.Context + chkClose bool +} + +func (w *tumblebugWriter) Write(b []byte) (int, error) { + return w.buf.Write(b) +} + +func (w *tumblebugWriter) Close() error { + if w.chkClose { + return nil + } + w.chkClose = true + + data := w.buf.Bytes() + + req, err := http.NewRequestWithContext(w.ctx, http.MethodPut, w.presignedURL, bytes.NewReader(data)) + if err != nil { + return fmt.Errorf("createWithTumblebug: failed to create PUT request: %w", err) + } + req.ContentLength = int64(len(data)) + + log.Debug(). + Str("key", w.name). + Str("method", http.MethodPut). + Int64("contentLength", req.ContentLength). + Msg("[S3FS] createWithTumblebug: sending PUT request") + + httpClient := &http.Client{} + httpResp, err := httpClient.Do(req) + if err != nil { + return fmt.Errorf("createWithTumblebug: PUT request failed: %w", err) + } + defer httpResp.Body.Close() + + respBody, _ := io.ReadAll(httpResp.Body) + + log.Debug(). + Str("key", w.name). + Int("statusCode", httpResp.StatusCode). + Str("responseBody", string(respBody)). + Msg("[S3FS] createWithTumblebug: PUT response") + + if httpResp.StatusCode != http.StatusOK && httpResp.StatusCode != http.StatusNoContent { + return fmt.Errorf("createWithTumblebug: unexpected status %d for %q, body: %s", + httpResp.StatusCode, w.name, string(respBody)) + } + + log.Info().Str("key", w.name).Int("statusCode", httpResp.StatusCode). + Msg("[S3FS] createWithTumblebug: upload succeeded") + return nil +} + +// createWithTumblebug은 Tumblebug의 Presigned URL API를 통해 오브젝트를 업로드합니다. +// +// 기존 Create()가 AWS SDK uploader를 직접 사용하는 것과 달리, +// 이 함수는 Tumblebug에 Presigned URL 발급을 요청한 뒤 +// 데이터를 버퍼링하여 Close() 시점에 Content-Length와 함께 HTTP PUT으로 전송합니다. +// +// POST /ns/{nsId}/resources/objectStorage/{osId}/object/{objectKey}/presignedUrl?operation=upload +func (f *S3FS) Create(name string) (io.WriteCloser, error) { + nsId := utils.GetNsId() + connName := fmt.Sprintf("%s-%s", f.provider, f.region) + + encodedKey := strings.NewReplacer("+", "%20").Replace(url.QueryEscape(name)) + path := fmt.Sprintf("/tumblebug/ns/%s/resources/objectStorage/%s/object/%s/presignedUrl?operation=upload&expires=3600", + nsId, f.bucketName, encodedKey) + + body, err := utils.RequestTumblebug(path, http.MethodPost, connName, nil) + if err != nil { + return nil, fmt.Errorf("createWithTumblebug: failed to generate presigned URL for %q: %w", name, err) + } + + var resp presignedURLResponse + if err := json.Unmarshal(body, &resp); err != nil { + return nil, fmt.Errorf("createWithTumblebug: failed to parse presigned URL response: %w", err) + } + if resp.PresignedURL == "" { + return nil, fmt.Errorf("createWithTumblebug: empty presigned URL returned for %q", name) + } + + log.Debug().Str("key", name).Str("presignedURL", resp.PresignedURL). + Msg("[S3FS] createWithTumblebug: presigned URL acquired") + + return &tumblebugWriter{ + presignedURL: resp.PresignedURL, + name: name, + ctx: f.ctx, + }, nil +} + +// Open function using pipeline +func (f *S3FS) OpenDeprecated(name string) (io.ReadCloser, error) { pr, pw := io.Pipe() ch := make(chan error) ctx, cancel := context.WithCancel(f.ctx) @@ -215,7 +371,7 @@ func (f *S3FS) Open(name string) (io.ReadCloser, error) { } // Create function using pipeline -func (f *S3FS) Create(name string) (io.WriteCloser, error) { +func (f *S3FS) CreateDeprecated(name string) (io.WriteCloser, error) { pr, pw := io.Pipe() ch := make(chan error) ctx, cancel := context.WithCancel(f.ctx) @@ -296,7 +452,8 @@ func (f *S3FS) ObjectListWithFilter(flt *filtering.ObjectFilter) ([]*models.Obje } for { - path := "/tumblebug/resources/objectStorage/" + f.bucketName + nsId := utils.GetNsId() + path := "/tumblebug/ns/" + nsId + "/resources/objectStorage/" + f.bucketName method := http.MethodGet connName := fmt.Sprintf("%s-%s", f.provider, f.region) @@ -305,7 +462,7 @@ func (f *S3FS) ObjectListWithFilter(flt *filtering.ObjectFilter) ([]*models.Obje return nil, err } - var resp models.ListBucketResult + var resp models.ObjectStorage if err := json.Unmarshal(result, &resp); err != nil { fmt.Println("error: ", err.Error()) return []*models.Object{}, fmt.Errorf("failed to get objects: %w", err) @@ -355,7 +512,8 @@ func (f *S3FS) ObjectList() ([]*models.Object, error) { } func (f *S3FS) BucketList() ([]models.Bucket, error) { - path := "/tumblebug/resources/objectStorage" + nsId := utils.GetNsId() + path := "/tumblebug/ns/" + nsId + "/resources/objectStorage" method := http.MethodGet connName := fmt.Sprintf("%s-%s", f.provider, f.region) @@ -365,16 +523,17 @@ func (f *S3FS) BucketList() ([]models.Bucket, error) { } // Parse the response to extract public key and token ID - var res models.ListAllMyBucketsResult + var res models.ObjectStorageListResponse if err := json.Unmarshal(body, &res); err != nil { fmt.Println("error: ", err.Error()) return []models.Bucket{}, fmt.Errorf("failed to get buckets: %w", err) } - // 버킷이 비어 있으면 빈 리스트 반환 - if res.Buckets.Bucket == nil { - return []models.Bucket{}, nil + buckets := make([]models.Bucket, 0, len(res.ObjectStorage)) + for _, os := range res.ObjectStorage { + buckets = append(buckets, models.Bucket{ + Name: os.Name, + }) } - - return res.Buckets.Bucket, nil + return buckets, nil } diff --git a/pkg/objectstorage/tencentfs/tencentfs.go b/pkg/objectstorage/tencentfs/tencentfs.go new file mode 100644 index 0000000..8e7cca9 --- /dev/null +++ b/pkg/objectstorage/tencentfs/tencentfs.go @@ -0,0 +1,437 @@ +package tencentfs + +import ( + "bytes" + "context" + "encoding/json" + "encoding/xml" + "fmt" + "io" + "net/http" + "net/url" + "strings" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/feature/s3/manager" + "github.com/cloud-barista/mc-data-manager/models" + "github.com/cloud-barista/mc-data-manager/pkg/objectstorage/filtering" + "github.com/cloud-barista/mc-data-manager/pkg/utils" + "github.com/rs/zerolog/log" +) + +type reader struct { + r *io.PipeReader + ch chan error + cancel context.CancelFunc + chkClose bool +} + +func (p *reader) Read(b []byte) (int, error) { + return p.r.Read(b) +} + +func (p *reader) Close() error { + if !p.chkClose { + p.chkClose = true + return p.r.Close() + } + return nil +} + +type writer struct { + w *io.PipeWriter + ch chan error + cancel context.CancelFunc + chkClose bool +} + +func (p *writer) Write(b []byte) (int, error) { + return p.w.Write(b) +} + +func (p *writer) Close() error { + if !p.chkClose { + p.chkClose = true + _ = p.w.Close() + return <-p.ch + } + return nil +} + +type fakeWriteAt struct { + W io.Writer +} + +func (w *fakeWriteAt) WriteAt(p []byte, off int64) (n int, err error) { + return w.W.Write(p) +} + +type TencentFS struct { + provider models.Provider + bucketName string + region string + + ctx context.Context + uploader manager.Uploader + downloader manager.Downloader +} + +func New(provider models.Provider, bucketName, region string) *TencentFS { + return &TencentFS{ + provider: provider, + bucketName: bucketName, + region: region, + ctx: context.Background(), + } +} + +// Creating a Bucket +// +// Aws imposes location constraints when creating buckets +func (f *TencentFS) CreateBucket() error { + nsId := utils.GetNsId() + connName := fmt.Sprintf("%s-%s", f.provider, f.region) + + headPath := "/tumblebug/ns/" + nsId + "/resources/objectStorage/" + f.bucketName + _, err := utils.RequestTumblebug(headPath, http.MethodHead, connName, nil) + if err == nil { + return nil + } + + createBody := []byte(fmt.Sprintf(`{"bucketName":"%s","connectionName":"%s"}`, f.bucketName, connName)) + createPath := "/tumblebug/ns/" + nsId + "/resources/objectStorage" + _, err = utils.RequestTumblebug(createPath, http.MethodPut, connName, createBody) + if err != nil { + fmt.Println("create error: ", err.Error()) + return err + } + return nil +} + +// Delete Bucket +// Check and delete all objects in the bucket and delete the bucket +func (f *TencentFS) DeleteBucket() error { + objList, err := f.ObjectList() + if err != nil { + return err + } + + if len(objList) != 0 { + // Divide objectIds into batches of 1000 + const batchSize = 1000 + var objectIds []string + + for _, object := range objList { + objectIds = append(objectIds, object.Key) + + // When we reach batch size, delete objects + if len(objectIds) == batchSize { + if err := f.deleteObjectBatch(objectIds); err != nil { + return err + } + // Reset objectIds for the next batch + objectIds = []string{} + } + } + + // Delete any remaining objects + if len(objectIds) > 0 { + if err := f.deleteObjectBatch(objectIds); err != nil { + return err + } + } + } + + // Delete the bucket + nsId := utils.GetNsId() + path := "/tumblebug/ns/" + nsId + "/resources/objectStorage/" + f.bucketName + method := http.MethodDelete + connName := fmt.Sprintf("%s-%s", f.provider, f.region) + + _, err = utils.RequestTumblebug(path, method, connName, nil) + if err != nil { + return err + } + log.Info().Msg("DeleteDone") + return nil +} + +// deleteObjectBatch deletes a batch of objects +func (f *TencentFS) deleteObjectBatch(keys []string) error { + nsId := utils.GetNsId() + path := "/tumblebug/ns/" + nsId + "/resources/objectStorage/" + f.bucketName + "?delete=true" + method := http.MethodPost + connName := fmt.Sprintf("%s-%s", f.provider, f.region) + + deleteReq := models.DeleteRequest{ + XMLNS: "http://s3.amazonaws.com/doc/2006-03-01/", + } + for _, key := range keys { + deleteReq.Objects = append(deleteReq.Objects, models.S3Object{Key: key}) + } + // 보기 좋게 들여쓰기된 XML 생성 + output, err := xml.MarshalIndent(deleteReq, "", " ") + if err != nil { + return err + } + + // XML 헤더 추가 + _, rerr := utils.RequestTumblebug(path, method, connName, []byte(xml.Header+string(output))) + if rerr != nil { + return err + } + + return nil +} + +// presignedURLResponse는 Tumblebug Presigned URL API의 응답 구조체입니다. +type presignedURLResponse struct { + PresignedURL string `json:"presignedURL"` + Expires int64 `json:"expires"` + Method string `json:"method"` +} + +// Tumblebug의 Presigned URL API를 통해 오브젝트를 다운로드합니다. +// +// 기존 Open()이 AWS SDK를 직접 사용하는 것과 달리, +// 이 함수는 Tumblebug에 Presigned URL 발급을 요청한 뒤 +// 해당 URL로 HTTP GET을 수행하여 스트림을 반환합니다. +// +// POST /ns/{nsId}/resources/objectStorage/{osId}/object/{objectKey}/presignedUrl?operation=download +func (f *TencentFS) Open(name string) (io.ReadCloser, error) { + nsId := utils.GetNsId() + connName := fmt.Sprintf("%s-%s", f.provider, f.region) + + // objectKey에 슬래시 등 특수문자가 포함될 수 있으므로 path 세그먼트 단위로 인코딩합니다. + // url.PathEscape는 '/'를 인코딩하지 않으므로, 키 전체를 하나의 세그먼트로 처리하기 위해 + // url.QueryEscape 후 '+'를 '%20'으로 변환하는 방식을 사용합니다. + encodedKey := strings.NewReplacer("+", "%20").Replace(url.QueryEscape(name)) + + path := fmt.Sprintf("/tumblebug/ns/%s/resources/objectStorage/%s/object/%s/presignedUrl?operation=download&expires=3600", + nsId, f.bucketName, encodedKey) + + body, err := utils.RequestTumblebug(path, http.MethodPost, connName, nil) + if err != nil { + return nil, fmt.Errorf("openWithTumblebug: failed to generate presigned URL for %q: %w", name, err) + } + + var resp presignedURLResponse + if err := json.Unmarshal(body, &resp); err != nil { + return nil, fmt.Errorf("openWithTumblebug: failed to parse presigned URL response: %w", err) + } + if resp.PresignedURL == "" { + return nil, fmt.Errorf("openWithTumblebug: empty presigned URL returned for %q", name) + } + + log.Debug().Str("key", name).Str("presignedURL", resp.PresignedURL). + Msg("[TencentFS] openWithTumblebug: downloading via presigned URL") + + httpResp, err := http.Get(resp.PresignedURL) //nolint:noctx + if err != nil { + return nil, fmt.Errorf("openWithTumblebug: HTTP GET failed: %w", err) + } + if httpResp.StatusCode != http.StatusOK { + _ = httpResp.Body.Close() + return nil, fmt.Errorf("openWithTumblebug: unexpected status %d for %q", httpResp.StatusCode, name) + } + + return httpResp.Body, nil +} + +// tumblebugWriter는 데이터를 메모리에 버퍼링한 뒤 Close() 시점에 +// Content-Length를 명시하여 Presigned URL로 한 번에 업로드합니다. +// +// AWS S3 Presigned URL은 Transfer-Encoding: chunked를 지원하지 않으므로 +// io.Pipe 스트리밍 방식 대신 버퍼링 후 전송 방식을 사용합니다. +type tumblebugWriter struct { + buf bytes.Buffer + presignedURL string + name string + ctx context.Context + chkClose bool +} + +func (w *tumblebugWriter) Write(b []byte) (int, error) { + return w.buf.Write(b) +} + +func (w *tumblebugWriter) Close() error { + if w.chkClose { + return nil + } + w.chkClose = true + + data := w.buf.Bytes() + + req, err := http.NewRequestWithContext(w.ctx, http.MethodPut, w.presignedURL, bytes.NewReader(data)) + if err != nil { + return fmt.Errorf("createWithTumblebug: failed to create PUT request: %w", err) + } + req.ContentLength = int64(len(data)) + + log.Debug(). + Str("key", w.name). + Str("method", http.MethodPut). + Int64("contentLength", req.ContentLength). + Msg("[TencentFS] createWithTumblebug: sending PUT request") + + httpClient := &http.Client{} + httpResp, err := httpClient.Do(req) + if err != nil { + return fmt.Errorf("createWithTumblebug: PUT request failed: %w", err) + } + defer httpResp.Body.Close() + + respBody, _ := io.ReadAll(httpResp.Body) + + log.Debug(). + Str("key", w.name). + Int("statusCode", httpResp.StatusCode). + Str("responseBody", string(respBody)). + Msg("[TencentFS] createWithTumblebug: PUT response") + + if httpResp.StatusCode != http.StatusOK && httpResp.StatusCode != http.StatusNoContent { + return fmt.Errorf("createWithTumblebug: unexpected status %d for %q, body: %s", + httpResp.StatusCode, w.name, string(respBody)) + } + + log.Info().Str("key", w.name).Int("statusCode", httpResp.StatusCode). + Msg("[TencentFS] createWithTumblebug: upload succeeded") + return nil +} + +// createWithTumblebug은 Tumblebug의 Presigned URL API를 통해 오브젝트를 업로드합니다. +// +// 기존 Create()가 AWS SDK uploader를 직접 사용하는 것과 달리, +// 이 함수는 Tumblebug에 Presigned URL 발급을 요청한 뒤 +// 데이터를 버퍼링하여 Close() 시점에 Content-Length와 함께 HTTP PUT으로 전송합니다. +// +// POST /ns/{nsId}/resources/objectStorage/{osId}/object/{objectKey}/presignedUrl?operation=upload +func (f *TencentFS) Create(name string) (io.WriteCloser, error) { + nsId := utils.GetNsId() + connName := fmt.Sprintf("%s-%s", f.provider, f.region) + + encodedKey := strings.NewReplacer("+", "%20").Replace(url.QueryEscape(name)) + path := fmt.Sprintf("/tumblebug/ns/%s/resources/objectStorage/%s/object/%s/presignedUrl?operation=upload&expires=3600", + nsId, f.bucketName, encodedKey) + + body, err := utils.RequestTumblebug(path, http.MethodPost, connName, nil) + if err != nil { + return nil, fmt.Errorf("createWithTumblebug: failed to generate presigned URL for %q: %w", name, err) + } + + var resp presignedURLResponse + if err := json.Unmarshal(body, &resp); err != nil { + return nil, fmt.Errorf("createWithTumblebug: failed to parse presigned URL response: %w", err) + } + if resp.PresignedURL == "" { + return nil, fmt.Errorf("createWithTumblebug: empty presigned URL returned for %q", name) + } + + log.Debug().Str("key", name).Str("presignedURL", resp.PresignedURL). + Msg("[TencentFS] createWithTumblebug: presigned URL acquired") + + return &tumblebugWriter{ + presignedURL: resp.PresignedURL, + name: name, + ctx: f.ctx, + }, nil +} + +func (f *TencentFS) ObjectListWithFilter(flt *filtering.ObjectFilter) ([]*models.Object, error) { + log.Debug().Msg("[TencentFS] filtering") + var out []*models.Object + // var token *string + + var prefix *string + if flt != nil && flt.Path != "" { + pre := strings.TrimPrefix(flt.Path, "/") + prefix = aws.String(pre) + } + + for { + nsId := utils.GetNsId() + path := "/tumblebug/ns/" + nsId + "/resources/objectStorage/" + f.bucketName + method := http.MethodGet + connName := fmt.Sprintf("%s-%s", f.provider, f.region) + + result, err := utils.RequestTumblebug(path, method, connName, nil) + if err != nil { + return nil, err + } + + var resp models.ObjectStorage + if err := json.Unmarshal(result, &resp); err != nil { + fmt.Println("error: ", err.Error()) + return []*models.Object{}, fmt.Errorf("failed to get objects: %w", err) + } + + for _, o := range resp.Contents { + c := filtering.Candidate{ + Key: o.Key, + Size: o.Size, + LastModified: o.LastModified, + } + + log.Debug().Str("key", c.Key).Int64("size", c.Size). + Msg("[TencentFS] candidate") + + matched := filtering.MatchCandidate(flt, c) + if !matched { + if flt != nil { + log.Debug(). + Str("key", c.Key). + Str("prefix", aws.ToString(prefix)). + Strs("exact", flt.Exact). + Str("modifiedDate", c.LastModified.String()). + Msg("[TencentFS] filtered out") + } + continue + } + + out = append(out, &models.Object{ + ETag: o.ETag, + // ETag: aws.ToString(o.ETag), + Key: c.Key, + LastModified: c.LastModified, + Size: c.Size, + StorageClass: o.StorageClass, + Provider: f.provider, + }) + } + + break + } + return out, nil +} + +func (f *TencentFS) ObjectList() ([]*models.Object, error) { + return f.ObjectListWithFilter(nil) +} + +func (f *TencentFS) BucketList() ([]models.Bucket, error) { + nsId := utils.GetNsId() + path := "/tumblebug/ns/" + nsId + "/resources/objectStorage" + method := http.MethodGet + connName := fmt.Sprintf("%s-%s", f.provider, f.region) + + body, err := utils.RequestTumblebug(path, method, connName, nil) + if err != nil { + return []models.Bucket{}, fmt.Errorf("failed to get buckets: %w", err) + } + + // Parse the response to extract public key and token ID + var res models.ObjectStorageListResponse + if err := json.Unmarshal(body, &res); err != nil { + fmt.Println("error: ", err.Error()) + return []models.Bucket{}, fmt.Errorf("failed to get buckets: %w", err) + } + + buckets := make([]models.Bucket, 0, len(res.ObjectStorage)) + for _, os := range res.ObjectStorage { + buckets = append(buckets, models.Bucket{ + Name: os.Name, + }) + } + return buckets, nil +} diff --git a/pkg/rdbms/mysql/mysql.go b/pkg/rdbms/mysql/mysql.go index 16570ab..c8f300b 100644 --- a/pkg/rdbms/mysql/mysql.go +++ b/pkg/rdbms/mysql/mysql.go @@ -130,7 +130,7 @@ func (d *MysqlDBMS) ListDB(dst *[]string) error { return err } - if dbName != "information_schema" && dbName != "mysql" && dbName != "performance_schema" && dbName != "sys" { + if dbName != "information_schema" && dbName != "mysql" && dbName != "performance_schema" && dbName != "sys" && dbName != "__recycle_bin__" { *dst = append(*dst, dbName) } } diff --git a/pkg/utils/api.go b/pkg/utils/api.go index 8f20c72..3602bd9 100644 --- a/pkg/utils/api.go +++ b/pkg/utils/api.go @@ -8,6 +8,14 @@ import ( "os" ) +func GetNsId() string { + nsId := os.Getenv("TUMBLEBUG_NS_ID") + if nsId == "" { + nsId = "default" + } + return nsId +} + func RequestTumblebug(path string, method string, connName string, jsonBody []byte) ([]byte, error) { baseUrl := os.Getenv("TUMBLEBUG_URL") url := fmt.Sprintf("%s%s", baseUrl, path) @@ -18,8 +26,14 @@ func RequestTumblebug(path string, method string, connName string, jsonBody []by } req.Header.Set("Content-Type", "application/json") - username := "default" - password := "default" + username := os.Getenv("TUMBLEBUG_USERNAME") + if username == "" { + username = "default" + } + password := os.Getenv("TUMBLEBUG_PASSWORD") + if password == "" { + password = "default" + } req.SetBasicAuth(username, password) req.Header.Add("Accept", "application/json") if connName != "" { diff --git a/service/credential/credentialService.go b/service/credential/credentialService.go index 7e19090..7729a8a 100644 --- a/service/credential/credentialService.go +++ b/service/credential/credentialService.go @@ -70,7 +70,7 @@ func (c *CredentialService) CreateCredential(req models.CredentialCreateRequest) return nil, err } - if slices.Contains([]string{"aws", "ncp", "gcp", "alibaba"}, req.CspType) { + if slices.Contains([]string{"aws", "ncp", "gcp", "alibaba", "ibm", "kt", "tencent"}, req.CspType) { terr := createTumblebugCredential(req) if terr != nil { return nil, terr @@ -183,8 +183,8 @@ func getCredentialKeyValues(req models.CredentialCreateRequest) (map[string]stri } return map[string]string{ - "ClientId": aws.AccessKey, - "ClientSecret": aws.SecretKey, + "aws_access_key_id": aws.AccessKey, + "aws_secret_access_key": aws.SecretKey, }, nil case "ncp": var ncp models.NCPCredentials @@ -193,8 +193,8 @@ func getCredentialKeyValues(req models.CredentialCreateRequest) (map[string]stri } return map[string]string{ - "ClientId": ncp.AccessKey, - "ClientSecret": ncp.SecretKey, + "ncloud_access_key": ncp.AccessKey, + "ncloud_secret_key": ncp.SecretKey, }, nil case "gcp": var gcp models.GCPCredentials @@ -203,13 +203,11 @@ func getCredentialKeyValues(req models.CredentialCreateRequest) (map[string]stri } return map[string]string{ - // "client_id": gcp.ClientID, - "ClientEmail": gcp.ClientEmail, - // "private_key_id": gcp.PrivateKeyID, - "PrivateKey": gcp.PrivateKey, - "ProjectID": gcp.ProjectID, - "S3AccessKey": req.S3AccessKey, - "S3SecretKey": req.S3SecretKey, + "client_email": gcp.ClientEmail, + "private_key": gcp.PrivateKey, + "project_id": gcp.ProjectID, + "S3AccessKey": req.S3AccessKey, + "S3SecretKey": req.S3SecretKey, }, nil case "alibaba": var alibaba models.AlibabaCredentials @@ -218,9 +216,49 @@ func getCredentialKeyValues(req models.CredentialCreateRequest) (map[string]stri } return map[string]string{ - "ClientId": alibaba.AccessKey, - "ClientSecret": alibaba.SecretKey, + "AccessKeyId": alibaba.AccessKey, + "AccessKeySecret": alibaba.SecretKey, }, nil + + case "ibm": + var ibm models.IBMCredentials + if err := json.Unmarshal(req.CredentialJson, &ibm); err != nil { + return nil, fmt.Errorf("invalid ibm credential json: %w", err) + } + + return map[string]string{ + "ApiKey": ibm.ApiKey, + "S3AccessKey": ibm.S3AccessKey, + "S3SecretKey": ibm.S3SecretKey, + }, nil + + case "kt": + var kt models.KTCredentials + if err := json.Unmarshal(req.CredentialJson, &kt); err != nil { + return nil, fmt.Errorf("invalid kt credential json: %w", err) + } + + return map[string]string{ + "IdentityEndpoint": models.KTIdentityEndpoint, + "Username": kt.Username, + "Password": kt.Password, + "DomainName": kt.DomainName, + "ProjectID": kt.ProjectID, + "S3AccessKey": kt.S3AccessKey, + "S3SecretKey": kt.S3SecretKey, + }, nil + + case "tencent": + var tencent models.TencentCredentials + if err := json.Unmarshal(req.CredentialJson, &tencent); err != nil { + return nil, fmt.Errorf("invalid tencent credential json: %w", err) + } + + return map[string]string{ + "SecretId": tencent.SecretId, + "SecretKey": tencent.SecretKey, + }, nil + default: return nil, fmt.Errorf("unsupported cspType: %q", req.CspType) } @@ -247,7 +285,6 @@ func getPublicKey() (string, string, error) { func encryptCredentialsWithPublicKey(publicKeyPem string, credentials map[string]string) ([]map[string]string, string, error) { // PEM → rsa.PublicKey 변환 block, _ := pem.Decode([]byte(strings.ReplaceAll(publicKeyPem, `\n`, "\n"))) - fmt.Println("block: ", block) if block == nil { return nil, "", fmt.Errorf("invalid public key PEM") } diff --git a/service/rdbc/rdbc.go b/service/rdbc/rdbc.go index 33213a9..eb9c97d 100644 --- a/service/rdbc/rdbc.go +++ b/service/rdbc/rdbc.go @@ -126,28 +126,20 @@ func (rdb *RDBController) PutDoc(sql string) error { return nil } -// Migration using put and get -func (rdb *RDBController) Copy(dst *RDBController) error { - var dbList []string +// Migration using put and get for a specific database +func (rdb *RDBController) Copy(dst *RDBController, srcDbName string) error { var sql string - if err := rdb.ListDB(&dbList); err != nil { - rdb.logWrite("Error", "ListDB error", err) + rdb.Client.SetTargetProvdier(dst.Client.GetProvdier()) + if err := rdb.Get(srcDbName, &sql); err != nil { + rdb.logWrite("Error", "Get error", err) return err } - for _, db := range dbList { - sql = "" - rdb.Client.SetTargetProvdier(dst.Client.GetProvdier()) - if err := rdb.Get(db, &sql); err != nil { - rdb.logWrite("Error", "Get error", err) - return err - } - if err := dst.Put(sql); err != nil { - rdb.logWrite("Error", "Get error", err) - return err - } - rdb.logWrite("Info", fmt.Sprintf("Migration success: src:/%s -> dst:/%s", db, db), nil) + if err := dst.Put(sql); err != nil { + rdb.logWrite("Error", "Put error", err) + return err } + rdb.logWrite("Info", fmt.Sprintf("Migration success: src:/%s -> dst:/%s", srcDbName, srcDbName), nil) return nil } diff --git a/service/region/regionService.go b/service/region/regionService.go index c7f7be8..d91f3c1 100644 --- a/service/region/regionService.go +++ b/service/region/regionService.go @@ -2,6 +2,7 @@ package service import ( "encoding/json" + "fmt" "net/http" "sort" "sync" @@ -35,27 +36,22 @@ func GetRegions(cspType string) []string { } } - // CSP별 endpoint 확인 - path := "/tumblebug/connConfig?filterRegionRepresentative=true" + path := fmt.Sprintf("/tumblebug/provider/%s/region", cspType) method := http.MethodGet - // API 호출 body, err := utils.RequestTumblebug(path, method, "", nil) if err != nil { return nil } - var conns models.ConnectionConfigList - if err := json.Unmarshal(body, &conns); err != nil { + var regionList models.ProviderRegionList + if err := json.Unmarshal(body, ®ionList); err != nil { return nil } var result []string - for _, connConfig := range conns.ConnectionConfig { - if connConfig.ProviderName != cspType { - continue - } - result = append(result, connConfig.RegionDetail.RegionName) + for _, r := range regionList.Regions { + result = append(result, r.RegionName) } sort.Strings(result) diff --git a/service/task/task.go b/service/task/task.go index 1c23abb..26340ac 100644 --- a/service/task/task.go +++ b/service/task/task.go @@ -9,6 +9,7 @@ import ( "os" "path/filepath" "regexp" + "slices" "strings" "sync" "time" @@ -988,7 +989,7 @@ func handleRDBMSMigrateTask(params models.BasicDataTask) models.Status { } log.Info().Msg("Launch RDBController Copy") - if err := srcRDBC.Copy(dstRDBC); err != nil { + if err := srcRDBC.Copy(dstRDBC, params.SourcePoint.DatabaseName); err != nil { log.Error().Err(err).Msg("Copy error copying into rdbms ") return models.StatusFailed } @@ -1020,6 +1021,13 @@ func handleRDBMSBackupTask(params models.BasicDataTask) models.Status { return models.StatusFailed } + sourceDB := params.SourcePoint.DatabaseName + if !slices.Contains(dbList, sourceDB) { + log.Error().Msgf("database %q not found in server", sourceDB) + return models.StatusFailed + } + dbList = []string{sourceDB} + var sqlData string for _, db := range dbList { sqlData = "" diff --git a/web/js/scripts.js b/web/js/scripts.js index da86e4a..9d6c594 100644 --- a/web/js/scripts.js +++ b/web/js/scripts.js @@ -139,36 +139,9 @@ function resolveAlibabaEndpoint(region) { return ""; } const normalized = region.trim().toLowerCase(); - return ALIBABA_ENDPOINTS[normalized] || `https://oss-${normalized}.aliyuncs.com`; + return `https://oss-${normalized}.aliyuncs.com`; } -function emitAlibabaRegionEvent(prefix, provider, region) { - const normalized = region && region !== "none" ? region : null; - const detail = { - prefix, - provider, - region: normalized, - }; - document.dispatchEvent(new CustomEvent("alibabaRegionChange", { detail })); -} - -document.addEventListener("alibabaRegionChange", (event) => { - const { prefix, provider, region } = event.detail; - const hint = document.getElementById(`${prefix}AlibabaRegionHint`); - const endpointInput = document.getElementById(`${prefix}Point[endpoint]`); - - if (!endpointInput) { - return; - } - - if (provider === "alibaba") { - endpointInput.value = region ? resolveAlibabaEndpoint(region) : ""; - } else { - endpointInput.value = ""; - } - endpointInput.dispatchEvent(new Event("change")); -}); - function generateFormSubmit() { const form = document.getElementById('genForm'); @@ -308,12 +281,98 @@ function setSelectBox() {