diff --git a/l10n/bundle.l10n.json b/l10n/bundle.l10n.json index a45583b3d..3c2957a81 100644 --- a/l10n/bundle.l10n.json +++ b/l10n/bundle.l10n.json @@ -17,10 +17,15 @@ "⏩ Run All": "⏩ Run All", "⏳ Running All…": "⏳ Running All…", "⏳ Running Command…": "⏳ Running Command…", + "⏹️ Task '{taskName}' was stopped. {message}": "⏹️ Task '{taskName}' was stopped. {message}", "▶️ Run Command": "▶️ Run Command", + "▶️ Task '{taskName}' starting...": "▶️ Task '{taskName}' starting...", "⚠️ **Security:** TLS/SSL Disabled": "⚠️ **Security:** TLS/SSL Disabled", + "⚠️ Task '{taskName}' failed. {message}": "⚠️ Task '{taskName}' failed. {message}", "⚠ TLS/SSL Disabled": "⚠ TLS/SSL Disabled", "✅ **Security:** TLS/SSL Enabled": "✅ **Security:** TLS/SSL Enabled", + "✅ Task '{taskName}' completed successfully. {message}": "✅ Task '{taskName}' completed successfully. {message}", + "🟡 Task '{taskName}' initializing...": "🟡 Task '{taskName}' initializing...", "$(add) Create...": "$(add) Create...", "$(check) Success": "$(check) Success", "$(error) Failure": "$(error) Failure", @@ -46,7 +51,9 @@ "Always upload": "Always upload", "An element with the following id already exists: {id}": "An element with the following id already exists: {id}", "An error has occurred. Check output window for more details.": "An error has occurred. Check output window for more details.", + "An error occurred while writing documents: {0}": "An error occurred while writing documents: {0}", "An item with id \"{0}\" already exists for workspace \"{1}\".": "An item with id \"{0}\" already exists for workspace \"{1}\".", + "An unknown error occurred while inserting documents.": "An unknown error occurred while inserting documents.", "API version \"{0}\" for extension id \"{1}\" is no longer supported. Minimum version is \"{2}\".": "API version \"{0}\" for extension id \"{1}\" is no longer supported. Minimum version is \"{2}\".", "API: Registered new migration provider: \"{providerId}\" - \"{providerLabel}\"": "API: Registered new migration provider: \"{providerId}\" - \"{providerLabel}\"", "Are you sure?": "Are you sure?", @@ -81,6 +88,7 @@ "Click here to retry": "Click here to retry", "Click here to update credentials": "Click here to update credentials", "Click to view resource": "Click to view resource", + "Collection \"{0}\" from database \"{1}\" has been marked for copy.": "Collection \"{0}\" from database \"{1}\" has been marked for copy.", "Collection name cannot begin with the system. prefix (Reserved for internal use).": "Collection name cannot begin with the system. prefix (Reserved for internal use).", "Collection name cannot contain .system.": "Collection name cannot contain .system.", "Collection name cannot contain the $.": "Collection name cannot contain the $.", @@ -98,10 +106,16 @@ "Connection string is not set": "Connection string is not set", "Connection string not found.": "Connection string not found.", "Connection updated successfully.": "Connection updated successfully.", + "Copied {0} of {1} documents": "Copied {0} of {1} documents", + "Copy": "Copy", + "Copy \"{0}\"\nto \"{1}\"?\nThis will add all documents from the source collection to the target collection.": "Copy \"{0}\"\nto \"{1}\"?\nThis will add all documents from the source collection to the target collection.", + "Copy \"{sourceCollection}\" from \"{sourceDatabase}\" to \"{targetDatabase}/{targetCollection}\"": "Copy \"{sourceCollection}\" from \"{sourceDatabase}\" to \"{targetDatabase}/{targetCollection}\"", + "Copy operation completed successfully": "Copy operation completed successfully", "CosmosDB Accounts": "CosmosDB Accounts", "Could not find {0}": "Could not find {0}", "Could not find the Azure Resource Groups extension": "Could not find the Azure Resource Groups extension", "Could not find unique name for new file.": "Could not find unique name for new file.", + "Counting documents in source collection...": "Counting documents in source collection...", "Create an Azure Account...": "Create an Azure Account...", "Create an Azure for Students Account...": "Create an Azure for Students Account...", "Create collection": "Create collection", @@ -148,6 +162,7 @@ "Element with id of {rootId} not found.": "Element with id of {rootId} not found.", "Enable TLS/SSL (Default)": "Enable TLS/SSL (Default)", "Enforce TLS/SSL checks for a secure connection.": "Enforce TLS/SSL checks for a secure connection.", + "Ensuring target collection exists...": "Ensuring target collection exists...", "Enter a collection name.": "Enter a collection name.", "Enter a database name.": "Enter a database name.", "Enter the Azure VM tag key used for discovering DocumentDB instances.": "Enter the Azure VM tag key used for discovering DocumentDB instances.", @@ -189,26 +204,37 @@ "Exporting documents": "Exporting documents", "Exporting…": "Exporting…", "Extension dependency with id \"{0}\" must be updated.": "Extension dependency with id \"{0}\" must be updated.", + "Failed to abort transaction: {0}": "Failed to abort transaction: {0}", + "Failed to commit transaction: {0}": "Failed to commit transaction: {0}", "Failed to connect to \"{cluster}\"": "Failed to connect to \"{cluster}\"", "Failed to connect to VM \"{vmName}\"": "Failed to connect to VM \"{vmName}\"", + "Failed to copy collection: {0}": "Failed to copy collection: {0}", + "Failed to count documents in source collection: {0}": "Failed to count documents in source collection: {0}", "Failed to create Azure management clients: {0}": "Failed to create Azure management clients: {0}", "Failed to create role assignment \"{0}\" for the {2} resource \"{1}\".": "Failed to create role assignment \"{0}\" for the {2} resource \"{1}\".", "Failed to create role assignment(s).": "Failed to create role assignment(s).", "Failed to delete documents. Unknown error.": "Failed to delete documents. Unknown error.", "Failed to delete item \"{0}\".": "Failed to delete item \"{0}\".", "Failed to delete secrets for item \"{0}\".": "Failed to delete secrets for item \"{0}\".", + "Failed to end session: {0}": "Failed to end session: {0}", + "Failed to ensure target collection exists: {0}": "Failed to ensure target collection exists: {0}", "Failed to export documents. Please see the output for details.": "Failed to export documents. Please see the output for details.", "Failed to extract the connection string from the selected account.": "Failed to extract the connection string from the selected account.", "Failed to extract the connection string from the selected node.": "Failed to extract the connection string from the selected node.", "Failed to find commandId on generic tree item.": "Failed to find commandId on generic tree item.", + "Failed to get collection {0} in database {1}: {2}": "Failed to get collection {0} in database {1}: {2}", "Failed to get public IP": "Failed to get public IP", "Failed to initialize Azure management clients": "Failed to initialize Azure management clients", "Failed to initialize task": "Failed to initialize task", + "Failed to overwrite documents: {0}": "Failed to overwrite documents: {0}", "Failed to parse secrets for key {0}:": "Failed to parse secrets for key {0}:", "Failed to process URI: {0}": "Failed to process URI: {0}", "Failed to rename the connection.": "Failed to rename the connection.", "Failed to save credentials for \"{cluster}\".": "Failed to save credentials for \"{cluster}\".", "Failed to save credentials.": "Failed to save credentials.", + "Failed to start a session: {0}": "Failed to start a session: {0}", + "Failed to start a transaction with the provided session: {0}": "Failed to start a transaction with the provided session: {0}", + "Failed to start a transaction: {0}": "Failed to start a transaction: {0}", "Failed to store secrets for key {0}:": "Failed to store secrets for key {0}:", "Failed to update the connection.": "Failed to update the connection.", "Failed with code \"{0}\".": "Failed with code \"{0}\".", @@ -253,6 +279,7 @@ "Invalid connection type selected.": "Invalid connection type selected.", "Invalid document ID: {0}": "Invalid document ID: {0}", "Invalid semver \"{0}\".": "Invalid semver \"{0}\".", + "Invalid source or target node type.": "Invalid source or target node type.", "Invalid workspace resource ID: {0}": "Invalid workspace resource ID: {0}", "JSON View": "JSON View", "Learn more": "Learn more", @@ -292,6 +319,7 @@ "No": "No", "No Azure subscription found for this tree item.": "No Azure subscription found for this tree item.", "No Azure VMs found with tag \"{tagName}\" in subscription \"{subscriptionName}\".": "No Azure VMs found with tag \"{tagName}\" in subscription \"{subscriptionName}\".", + "No collection has been marked for copy. Please use Copy Collection first.": "No collection has been marked for copy. Please use Copy Collection first.", "No collection selected.": "No collection selected.", "No commands found in this document.": "No commands found in this document.", "No Connectivity": "No Connectivity", @@ -304,6 +332,7 @@ "No scope was provided for the role assignment.": "No scope was provided for the role assignment.", "No session found for id {sessionId}": "No session found for id {sessionId}", "No subscriptions found": "No subscriptions found", + "No target node selected.": "No target node selected.", "Not connected to any MongoDB database.": "Not connected to any MongoDB database.", "Note: This confirmation type can be configured in the extension settings.": "Note: This confirmation type can be configured in the extension settings.", "Note: You can disable these URL handling confirmations in the extension settings.": "Note: You can disable these URL handling confirmations in the extension settings.", @@ -369,8 +398,11 @@ "Signing out programmatically is not supported. You must sign out by selecting the account in the Accounts menu and choosing Sign Out.": "Signing out programmatically is not supported. You must sign out by selecting the account in the Accounts menu and choosing Sign Out.", "Simulated failure at step {0} for testing purposes": "Simulated failure at step {0} for testing purposes", "Skip for now": "Skip for now", + "Skipped document with _id: {0} due to error: {1}": "Skipped document with _id: {0} due to error: {1}", "Small breadcrumb example with buttons": "Small breadcrumb example with buttons", "Some items could not be displayed": "Some items could not be displayed", + "Source collection is empty.": "Source collection is empty.", + "Source: Collection \"{0}\" from database \"{1}\", connectionId: {2}": "Source: Collection \"{0}\" from database \"{1}\", connectionId: {2}", "Specified character lengths should be 1 character or greater.": "Specified character lengths should be 1 character or greater.", "Started executable: \"{command}\". Connecting to host…": "Started executable: \"{command}\". Connecting to host…", "Starting executable: \"{command}\"": "Starting executable: \"{command}\"", @@ -386,6 +418,8 @@ "Tag can only contain alphanumeric characters, underscores, periods, and hyphens.": "Tag can only contain alphanumeric characters, underscores, periods, and hyphens.", "Tag cannot be empty.": "Tag cannot be empty.", "Tag cannot be longer than 256 characters.": "Tag cannot be longer than 256 characters.", + "Target: Collection \"{0}\" from database \"{1}\", connectionId: {2}": "Target: Collection \"{0}\" from database \"{1}\", connectionId: {2}", + "Task aborted due to an error: {0}. {1} document(s) were inserted in total.": "Task aborted due to an error: {0}. {1} document(s) were inserted in total.", "Task completed successfully": "Task completed successfully", "Task created and ready to start": "Task created and ready to start", "Task failed": "Task failed", @@ -479,7 +513,6 @@ "with Popover": "with Popover", "Working…": "Working…", "Would you like to open the Collection View?": "Would you like to open the Collection View?", - "Write error: {0}": "Write error: {0}", "Yes": "Yes", "Yes, continue": "Yes, continue", "Yes, open Collection View": "Yes, open Collection View", diff --git a/package.json b/package.json index 41f3923f3..df80c68f9 100644 --- a/package.json +++ b/package.json @@ -444,6 +444,18 @@ "category": "DocumentDB", "command": "vscode-documentdb.command.containerView.open", "title": "Open Collection" + }, + { + "//": "Copy Collection", + "category": "DocumentDB", + "command": "vscode-documentdb.command.copyCollection", + "title": "Copy Collection…" + }, + { + "//": "Paste Collection", + "category": "DocumentDB", + "command": "vscode-documentdb.command.pasteCollection", + "title": "Paste Collection…" } ], "submenus": [ @@ -682,6 +694,18 @@ "command": "vscode-documentdb.command.refresh", "when": "view =~ /discoveryView/ && viewItem =~ /\\benableRefreshCommand\\b/i", "group": "zheLastGroup@1" + }, + { + "//": "[Collection] Copy Collection", + "command": "vscode-documentdb.command.copyCollection", + "when": "view =~ /connectionsView|discoveryView/ && viewItem =~ /treeitem[.]collection(?![a-z.\\/])/i && viewItem =~ /experience[.](mongocluster|mongodb)/i", + "group": "A@2" + }, + { + "//": "[Collection] Paste Collection", + "command": "vscode-documentdb.command.pasteCollection", + "when": "view =~ /connectionsView|discoveryView/ && viewItem =~ /treeitem[.]collection(?![a-z.\\/])/i && viewItem =~ /experience[.](mongocluster|mongodb)/i", + "group": "A@2" } ], "explorer/context": [], diff --git a/src/commands/copyCollection/copyCollection.ts b/src/commands/copyCollection/copyCollection.ts new file mode 100644 index 000000000..b9f1f72b8 --- /dev/null +++ b/src/commands/copyCollection/copyCollection.ts @@ -0,0 +1,25 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See License.txt in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +import { type IActionContext } from '@microsoft/vscode-azext-utils'; +import * as vscode from 'vscode'; +import { ext } from '../../extensionVariables'; +import { type CollectionItem } from '../../tree/documentdb/CollectionItem'; + +export async function copyCollection(_context: IActionContext, node: CollectionItem): Promise { + if (!node) { + throw new Error(vscode.l10n.t('No node selected.')); + } + // Store the node in extension variables + ext.copiedCollectionNode = node; + + // Show confirmation message + const collectionName = node.collectionInfo.name; + const databaseName = node.databaseInfo.name; + + void vscode.window.showInformationMessage( + vscode.l10n.t('Collection "{0}" from database "{1}" has been marked for copy.', collectionName, databaseName), + ); +} diff --git a/src/commands/importDocuments/importDocuments.ts b/src/commands/importDocuments/importDocuments.ts index 79acd0468..7742e98be 100644 --- a/src/commands/importDocuments/importDocuments.ts +++ b/src/commands/importDocuments/importDocuments.ts @@ -8,7 +8,7 @@ import * as l10n from '@vscode/l10n'; import { EJSON, type Document } from 'bson'; import * as fs from 'node:fs/promises'; import * as vscode from 'vscode'; -import { ClustersClient } from '../../documentdb/ClustersClient'; +import { ClustersClient, isBulkWriteError } from '../../documentdb/ClustersClient'; import { AzureDomains, getHostsFromConnectionString, @@ -285,10 +285,31 @@ async function insertDocumentWithBufferIntoCluster( // Documents to process could be the current document (if too large) // or the contents of the buffer (if it was full) const client = await ClustersClient.getClient(node.cluster.id); - const insertResult = await client.insertDocuments(databaseName, collectionName, documentsToProcess as Document[]); - - return { - count: insertResult.insertedCount, - errorOccurred: insertResult.insertedCount < (documentsToProcess?.length || 0), - }; + try { + const insertResult = await client.insertDocuments( + databaseName, + collectionName, + documentsToProcess as Document[], + false, + ); + return { + count: insertResult.insertedCount, + errorOccurred: false, + }; + } catch (error) { + if (isBulkWriteError(error)) { + // Handle MongoDB bulk write errors + // It could be a partial failure, so we need to check the result + return { + count: error.result.insertedCount, + errorOccurred: true, + }; + } else { + // Handle other errors + return { + count: 0, + errorOccurred: true, + }; + } + } } diff --git a/src/commands/pasteCollection/pasteCollection.ts b/src/commands/pasteCollection/pasteCollection.ts new file mode 100644 index 000000000..d836aa4e2 --- /dev/null +++ b/src/commands/pasteCollection/pasteCollection.ts @@ -0,0 +1,102 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See License.txt in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +import { type IActionContext } from '@microsoft/vscode-azext-utils'; +import * as l10n from '@vscode/l10n'; +import * as vscode from 'vscode'; +import { ext } from '../../extensionVariables'; +import { CopyPasteCollectionTask } from '../../services/tasks/copy-and-paste/CopyPasteCollectionTask'; +import { ConflictResolutionStrategy, type CopyPasteConfig } from '../../services/tasks/copy-and-paste/copyPasteConfig'; +import { DocumentDbDocumentReader } from '../../services/tasks/copy-and-paste/documentdb/documentDbDocumentReader'; +import { DocumentDbDocumentWriter } from '../../services/tasks/copy-and-paste/documentdb/documentDbDocumentWriter'; +import { TaskService } from '../../services/taskService'; +import { CollectionItem } from '../../tree/documentdb/CollectionItem'; + +export async function pasteCollection(_context: IActionContext, targetNode: CollectionItem): Promise { + const sourceNode = ext.copiedCollectionNode; + if (!sourceNode) { + void vscode.window.showWarningMessage( + l10n.t('No collection has been marked for copy. Please use Copy Collection first.'), + ); + return; + } + + if (!targetNode) { + throw new Error(vscode.l10n.t('No target node selected.')); + } + + // Check type of sourceNode or targetNodeAdd commentMore actions + // Currently we only support CollectionItem types + // Later we need to check if they are supported types that with document reader and writer implementations + if (!(sourceNode instanceof CollectionItem) || !(targetNode instanceof CollectionItem)) { + void vscode.window.showWarningMessage(l10n.t('Invalid source or target node type.')); + return; + } + + const sourceInfo = l10n.t( + 'Source: Collection "{0}" from database "{1}", connectionId: {2}', + sourceNode.collectionInfo.name, + sourceNode.databaseInfo.name, + sourceNode.cluster.id, + ); + const targetInfo = l10n.t( + 'Target: Collection "{0}" from database "{1}", connectionId: {2}', + targetNode.collectionInfo.name, + targetNode.databaseInfo.name, + targetNode.cluster.id, + ); + + // void vscode.window.showInformationMessage(`${sourceInfo}\n${targetInfo}`); + // Confirm the copy operation with the userAdd commentMore actions + const confirmMessage = l10n.t( + 'Copy "{0}"\nto "{1}"?\nThis will add all documents from the source collection to the target collection.', + sourceInfo, + targetInfo, + ); + + const confirmation = await vscode.window.showWarningMessage(confirmMessage, { modal: true }, l10n.t('Copy')); + + if (confirmation !== l10n.t('Copy')) { + return; + } + + try { + // Create copy-paste configuration + const config: CopyPasteConfig = { + source: { + connectionId: sourceNode.cluster.id, + databaseName: sourceNode.databaseInfo.name, + collectionName: sourceNode.collectionInfo.name, + }, + target: { + connectionId: targetNode.cluster.id, + databaseName: targetNode.databaseInfo.name, + collectionName: targetNode.collectionInfo.name, + }, + // Currently we only support aborting and skipping on conflict + // onConflict: ConflictResolutionStrategy.Abort, + // onConflict: ConflictResolutionStrategy.Skip, + onConflict: ConflictResolutionStrategy.Overwrite, + }; + + // Create task with documentDB document providers + // Need to check reader and writer implementations before creating the task + // For now, we only support DocumentDB collections + const reader = new DocumentDbDocumentReader(); + const writer = new DocumentDbDocumentWriter(); + const task = new CopyPasteCollectionTask(config, reader, writer); + + // Register task with the task service + TaskService.registerTask(task); + + // Start the copy-paste task + await task.start(); + } catch (error) { + const errorMessage = error instanceof Error ? error.message : String(error); + void vscode.window.showErrorMessage(l10n.t('Failed to copy collection: {0}', errorMessage)); + + throw error; + } +} diff --git a/src/documentdb/ClustersClient.ts b/src/documentdb/ClustersClient.ts index f2941c0e8..fa15e3998 100644 --- a/src/documentdb/ClustersClient.ts +++ b/src/documentdb/ClustersClient.ts @@ -16,19 +16,19 @@ import { MongoBulkWriteError, MongoClient, ObjectId, + type ClientSession, type Collection, type DeleteResult, type Document, type Filter, type FindOptions, + type InsertManyResult, type ListDatabasesResult, type MongoClientOptions, type WithId, type WithoutId, - type WriteError, } from 'mongodb'; import { Links } from '../constants'; -import { ext } from '../extensionVariables'; import { type EmulatorConfiguration } from '../utils/emulatorConfiguration'; import { CredentialCache } from './CredentialCache'; import { getHostsFromConnectionString, hasAzureDomain } from './utils/connectionStringHelpers'; @@ -57,12 +57,9 @@ export interface IndexItemModel { version?: number; } -// Currently we only return insertedCount, but we can add more fields in the future if needed -// Keep the type definition here for future extensibility -export type InsertDocumentsResult = { - /** The number of inserted documents for this operations */ - insertedCount: number; -}; +export function isBulkWriteError(error: unknown): error is MongoBulkWriteError { + return error instanceof MongoBulkWriteError; +} export class ClustersClient { // cache of active/existing clients @@ -197,6 +194,62 @@ export class ClustersClient { } } + startTransaction(): ClientSession { + try { + const session = this._mongoClient.startSession(); + session.startTransaction(); + return session; + } catch (error) { + throw new Error(l10n.t('Failed to start a transaction: {0}', parseError(error).message)); + } + } + + startTransactionWithSession(session: ClientSession): void { + try { + session.startTransaction(); + } catch (error) { + throw new Error( + l10n.t('Failed to start a transaction with the provided session: {0}', parseError(error).message), + ); + } + } + + async commitTransaction(session: ClientSession): Promise { + try { + await session.commitTransaction(); + } catch (error) { + throw new Error(l10n.t('Failed to commit transaction: {0}', parseError(error).message)); + } finally { + this.endSession(session); + } + } + + async abortTransaction(session: ClientSession): Promise { + try { + await session.abortTransaction(); + } catch (error) { + throw new Error(l10n.t('Failed to abort transaction: {0}', parseError(error).message)); + } finally { + this.endSession(session); + } + } + + startSession(): ClientSession { + try { + return this._mongoClient.startSession(); + } catch (error) { + throw new Error(l10n.t('Failed to start a session: {0}', parseError(error).message)); + } + } + + endSession(session: ClientSession): void { + try { + void session.endSession(); + } catch (error) { + throw new Error(l10n.t('Failed to end session: {0}', parseError(error).message)); + } + } + getUserName() { return CredentialCache.getCredentials(this.credentialId)?.connectionUser; } @@ -208,6 +261,21 @@ export class ClustersClient { return CredentialCache.getConnectionStringWithPassword(this.credentialId); } + getCollection(databaseName: string, collectionName: string): Collection { + try { + return this._mongoClient.db(databaseName).collection(collectionName); + } catch (error) { + throw new Error( + l10n.t( + 'Failed to get collection {0} in database {1}: {2}', + collectionName, + databaseName, + parseError(error).message, + ), + ); + } + } + async listDatabases(): Promise { const rawDatabases: ListDatabasesResult = await this._mongoClient.db().admin().listDatabases(); const databases: DatabaseItemModel[] = rawDatabases.databases.filter( @@ -285,6 +353,21 @@ export class ClustersClient { return documents; } + async countDocuments(databaseName: string, collectionName: string, findQuery: string = '{}'): Promise { + if (findQuery === undefined || findQuery.trim().length === 0) { + findQuery = '{}'; + } + const findQueryObj: Filter = toFilterQueryObj(findQuery); + const collection = this._mongoClient.db(databaseName).collection(collectionName); + + const count = await collection.countDocuments(findQueryObj, { + // Use a read preference of 'primary' to ensure we get the most up-to-date + // count, especially important for sharded clusters. + readPreference: 'primary', + }); + return count; + } + async *streamDocuments( databaseName: string, collectionName: string, @@ -456,9 +539,10 @@ export class ClustersClient { databaseName: string, collectionName: string, documents: Document[], - ): Promise { + ordered: boolean = true, + ): Promise { if (documents.length === 0) { - return { insertedCount: 0 }; + return { acknowledged: false, insertedIds: {}, insertedCount: 0 }; } const collection = this._mongoClient.db(databaseName).collection(collectionName); @@ -468,37 +552,18 @@ export class ClustersClient { // Setting `ordered` to be false allows MongoDB to continue inserting remaining documents even if previous fails. // More details: https://www.mongodb.com/docs/manual/reference/method/db.collection.insertMany/#syntax - ordered: false, + ordered: ordered, }); - return { - insertedCount: insertManyResults.insertedCount, - }; + return insertManyResults; } catch (error) { - // print error messages to the console + // Log error messages to the console if (error instanceof MongoBulkWriteError) { - const writeErrors: WriteError[] = Array.isArray(error.writeErrors) - ? (error.writeErrors as WriteError[]) - : [error.writeErrors as WriteError]; - - for (const writeError of writeErrors) { - const generalErrorMessage = parseError(writeError).message; - const descriptiveErrorMessage = writeError.err?.errmsg; - - const fullErrorMessage = descriptiveErrorMessage - ? `${generalErrorMessage} - ${descriptiveErrorMessage}` - : generalErrorMessage; - - ext.outputChannel.appendLog(l10n.t('Write error: {0}', fullErrorMessage)); - } - ext.outputChannel.show(); + throw error; } else if (error instanceof Error) { - ext.outputChannel.appendLog(l10n.t('Error: {0}', error.message)); - ext.outputChannel.show(); + throw error; } - return { - insertedCount: error instanceof MongoBulkWriteError ? error.insertedCount || 0 : 0, - }; + throw new Error(l10n.t('An unknown error occurred while inserting documents.')); } } } diff --git a/src/documentdb/ClustersExtension.ts b/src/documentdb/ClustersExtension.ts index 4875138b3..f46f5eb95 100644 --- a/src/documentdb/ClustersExtension.ts +++ b/src/documentdb/ClustersExtension.ts @@ -20,6 +20,7 @@ import * as vscode from 'vscode'; import { addConnectionFromRegistry } from '../commands/addConnectionFromRegistry/addConnectionFromRegistry'; import { addDiscoveryRegistry } from '../commands/addDiscoveryRegistry/addDiscoveryRegistry'; import { chooseDataMigrationExtension } from '../commands/chooseDataMigrationExtension/chooseDataMigrationExtension'; +import { copyCollection } from '../commands/copyCollection/copyCollection'; import { copyAzureConnectionString } from '../commands/copyConnectionString/copyConnectionString'; import { createCollection } from '../commands/createCollection/createCollection'; import { createAzureDatabase } from '../commands/createDatabase/createDatabase'; @@ -35,6 +36,7 @@ import { newConnection } from '../commands/newConnection/newConnection'; import { newLocalConnection } from '../commands/newLocalConnection/newLocalConnection'; import { openCollectionView, openCollectionViewInternal } from '../commands/openCollectionView/openCollectionView'; import { openDocumentView } from '../commands/openDocument/openDocument'; +import { pasteCollection } from '../commands/pasteCollection/pasteCollection'; import { refreshTreeElement } from '../commands/refreshTreeElement/refreshTreeElement'; import { refreshView } from '../commands/refreshView/refreshView'; import { removeConnection } from '../commands/removeConnection/removeConnection'; @@ -216,6 +218,9 @@ export class ClustersExtension implements vscode.Disposable { renameConnection, ); + registerCommandWithTreeNodeUnwrapping('vscode-documentdb.command.copyCollection', copyCollection); + registerCommandWithTreeNodeUnwrapping('vscode-documentdb.command.pasteCollection', pasteCollection); + // using registerCommand instead of vscode.commands.registerCommand for better telemetry: // https://github.com/microsoft/vscode-azuretools/tree/main/utils#telemetry-and-error-handling diff --git a/src/extensionVariables.ts b/src/extensionVariables.ts index 0ad211868..bb7763ddb 100644 --- a/src/extensionVariables.ts +++ b/src/extensionVariables.ts @@ -11,6 +11,7 @@ import { type MongoDBLanguageClient } from './documentdb/scrapbook/languageClien import { type MongoVCoreBranchDataProvider } from './tree/azure-resources-view/documentdb/mongo-vcore/MongoVCoreBranchDataProvider'; import { type ConnectionsBranchDataProvider } from './tree/connections-view/ConnectionsBranchDataProvider'; import { type DiscoveryBranchDataProvider } from './tree/discovery-view/DiscoveryBranchDataProvider'; +import { type CollectionItem } from './tree/documentdb/CollectionItem'; import { type TreeElement } from './tree/TreeElement'; import { type AccountsItem } from './tree/workspace-view/documentdb/AccountsItem'; import { type ClustersWorkspaceBranchDataProvider } from './tree/workspace-view/documentdb/ClustersWorkbenchBranchDataProvider'; @@ -27,6 +28,9 @@ export namespace ext { export let fileSystem: DatabasesFileSystem; export let mongoLanguageClient: MongoDBLanguageClient; + // TODO: TN improve this: This is a temporary solution to get going. + export let copiedCollectionNode: CollectionItem | undefined; + // Since the Azure Resources extension did not update API interface, but added a new interface with activity // we have to use the new interface AzureResourcesExtensionApiWithActivity instead of AzureResourcesExtensionApi export let rgApiV2: AzureResourcesExtensionApiWithActivity; diff --git a/src/services/taskService.test.ts b/src/services/taskService.test.ts index 112319972..0e49beab9 100644 --- a/src/services/taskService.test.ts +++ b/src/services/taskService.test.ts @@ -5,6 +5,15 @@ import { Task, TaskService, TaskState, type TaskStatus } from './taskService'; +// Mock extensionVariables (ext) module +jest.mock('../extensionVariables', () => ({ + ext: { + outputChannel: { + appendLine: jest.fn(), // Mock appendLine as a no-op function + }, + }, +})); + // Mock vscode module jest.mock('vscode', () => ({ l10n: { diff --git a/src/services/taskService.ts b/src/services/taskService.ts index 4b2f7444e..7efd36ec3 100644 --- a/src/services/taskService.ts +++ b/src/services/taskService.ts @@ -4,6 +4,7 @@ *--------------------------------------------------------------------------------------------*/ import * as vscode from 'vscode'; +import { ext } from '../extensionVariables'; /** * Enumeration of possible states a task can be in. @@ -166,6 +167,36 @@ export abstract class Task { newState: state, taskId: this.id, }); + + // Centralized logging for final state transitions + if (state === TaskState.Completed) { + const msg = this._status.message ?? ''; + ext.outputChannel.appendLine( + vscode.l10n.t("✅ Task '{taskName}' completed successfully. {message}", { + taskName: this.name, + message: msg, + }), + ); + } else if (state === TaskState.Stopped) { + const msg = this._status.message ?? ''; + ext.outputChannel.appendLine( + vscode.l10n.t("⏹️ Task '{taskName}' was stopped. {message}", { + taskName: this.name, + message: msg, + }), + ); + } else if (state === TaskState.Failed) { + const msg = this._status.message ?? ''; + const err = this._status.error instanceof Error ? this._status.error.message : ''; + // Include error details if available + const detail = err ? ` ${vscode.l10n.t('Error: {0}', err)}` : ''; + ext.outputChannel.appendLine( + vscode.l10n.t("⚠️ Task '{taskName}' failed. {message}", { + taskName: this.name, + message: `${msg}${detail}`.trim(), + }), + ); + } } } @@ -197,6 +228,9 @@ export abstract class Task { if (this._status.state !== TaskState.Pending) { throw new Error(vscode.l10n.t('Cannot start task in state: {0}', this._status.state)); } + + ext.outputChannel.appendLine(vscode.l10n.t("🟡 Task '{taskName}' initializing...", { taskName: this.name })); + this.updateStatus(TaskState.Initializing, vscode.l10n.t('Initializing task...'), 0); try { @@ -214,6 +248,7 @@ export abstract class Task { } this.updateStatus(TaskState.Running, vscode.l10n.t('Task is running'), 0); + ext.outputChannel.appendLine(vscode.l10n.t("▶️ Task '{taskName}' starting...", { taskName: this.name })); // Start the actual work asynchronously void this.runWork().catch((error) => { diff --git a/src/services/tasks/copy-and-paste/CopyPasteCollectionTask.ts b/src/services/tasks/copy-and-paste/CopyPasteCollectionTask.ts new file mode 100644 index 000000000..cc004fd93 --- /dev/null +++ b/src/services/tasks/copy-and-paste/CopyPasteCollectionTask.ts @@ -0,0 +1,266 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See License.txt in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +import * as vscode from 'vscode'; +import { ext } from '../../../extensionVariables'; +import { Task } from '../../taskService'; +import { ConflictResolutionStrategy, type CopyPasteConfig } from './copyPasteConfig'; +import { type DocumentDetails, type DocumentReader, type DocumentWriter } from './documentInterfaces'; + +/** + * Task for copying documents from a source to a target collection. + * + * This task uses a database-agnostic approach with `DocumentReader` and `DocumentWriter` + * interfaces. It streams documents from the source and writes them in batches to the + * target, managing memory usage with a configurable buffer. + */ +export class CopyPasteCollectionTask extends Task { + public readonly type: string = 'copy-paste-collection'; + public readonly name: string; + + private readonly config: CopyPasteConfig; + private readonly documentReader: DocumentReader; + private readonly documentWriter: DocumentWriter; + private totalDocuments: number = 0; + private processedDocuments: number = 0; + + // Buffer configuration for memory management + private readonly bufferSize: number = 100; // Number of documents to buffer + private readonly maxBufferMemoryMB: number = 32; // Rough memory limit for buffer + + /** + * Creates a new CopyPasteCollectionTask instance. + * + * @param config Configuration for the copy-paste operation + * @param documentReader Reader implementation for the source database + * @param documentWriter Writer implementation for the target database + */ + constructor(config: CopyPasteConfig, documentReader: DocumentReader, documentWriter: DocumentWriter) { + super(); + this.config = config; + this.documentReader = documentReader; + this.documentWriter = documentWriter; + + // Generate a descriptive name for the task + this.name = vscode.l10n.t( + 'Copy "{sourceCollection}" from "{sourceDatabase}" to "{targetDatabase}/{targetCollection}"', + { + sourceCollection: config.source.collectionName, + sourceDatabase: config.source.databaseName, + targetDatabase: config.target.databaseName, + targetCollection: config.target.collectionName, + }, + ); + } + + /** + * Initializes the task by counting documents and ensuring target collection exists. + * + * @param signal AbortSignal to check for cancellation + */ + protected async onInitialize(signal: AbortSignal): Promise { + // Count total documents for progress calculation + this.updateStatus(this.getStatus().state, vscode.l10n.t('Counting documents in source collection...')); + + try { + this.totalDocuments = await this.documentReader.countDocuments( + this.config.source.connectionId, + this.config.source.databaseName, + this.config.source.collectionName, + ); + } catch (error) { + throw new Error( + vscode.l10n.t( + 'Failed to count documents in source collection: {0}', + error instanceof Error ? error.message : String(error), + ), + ); + } + + if (signal.aborted) { + return; + } + + // Ensure target collection exists + this.updateStatus(this.getStatus().state, vscode.l10n.t('Ensuring target collection exists...')); + + try { + await this.documentWriter.ensureCollectionExists( + this.config.target.connectionId, + this.config.target.databaseName, + this.config.target.collectionName, + ); + } catch (error) { + throw new Error( + vscode.l10n.t( + 'Failed to ensure target collection exists: {0}', + error instanceof Error ? error.message : String(error), + ), + ); + } + } + + /** + * Performs the main copy-paste operation using buffer-based streaming. + * + * @param signal AbortSignal to check for cancellation + */ + protected async doWork(signal: AbortSignal): Promise { + // Handle the case where there are no documents to copy + if (this.totalDocuments === 0) { + this.updateProgress(100, vscode.l10n.t('Source collection is empty.')); + return; + } + + const documentStream = this.documentReader.streamDocuments( + this.config.source.connectionId, + this.config.source.databaseName, + this.config.source.collectionName, + ); + + const buffer: DocumentDetails[] = []; + let bufferMemoryEstimate = 0; + + for await (const document of documentStream) { + if (signal.aborted) { + // Buffer is a local variable, no need to clear, just exit. + return; + } + + // Add document to buffer + buffer.push(document); + bufferMemoryEstimate += this.estimateDocumentMemory(document); + + // Check if we need to flush the buffer + if (this.shouldFlushBuffer(buffer.length, bufferMemoryEstimate)) { + await this.flushBuffer(buffer, signal); + buffer.length = 0; // Clear buffer + bufferMemoryEstimate = 0; + } + } + + if (signal.aborted) { + return; + } + + // Flush any remaining documents in the buffer + if (buffer.length > 0) { + await this.flushBuffer(buffer, signal); + } + + // Ensure we report 100% completion + this.updateProgress(100, vscode.l10n.t('Copy operation completed successfully')); + } + + /** + * Flushes the document buffer by writing all documents to the target collection. + * + * @param buffer Array of documents to write. + * @param signal AbortSignal to check for cancellation. + */ + private async flushBuffer(buffer: DocumentDetails[], signal: AbortSignal): Promise { + if (buffer.length === 0 || signal.aborted) { + return; + } + + const result = await this.documentWriter.writeDocuments( + this.config.target.connectionId, + this.config.target.databaseName, + this.config.target.collectionName, + this.config, + buffer, + { batchSize: buffer.length }, + ); + + // Update processed count + this.processedDocuments += result.insertedCount; + + // Check for errors in the write result + if (result.errors && result.errors.length > 0) { + // Handle errors based on the configured conflict resolution strategy. + if (this.config.onConflict === ConflictResolutionStrategy.Abort) { + // Abort strategy: fail the entire task on the first error. + const firstError = result.errors[0] as { error: Error }; + throw new Error( + vscode.l10n.t( + 'Task aborted due to an error: {0}. {1} document(s) were inserted in total.', + firstError.error?.message ?? 'Unknown error', + this.processedDocuments.toString(), + ), + ); + } else if (this.config.onConflict === ConflictResolutionStrategy.Skip) { + // Skip strategy: log each error and continue. + for (const error of result.errors) { + ext.outputChannel.appendLog( + vscode.l10n.t( + 'Skipped document with _id: {0} due to error: {1}', + String(error.documentId ?? 'unknown'), + error.error?.message ?? 'Unknown error', + ), + ); + } + ext.outputChannel.show(); + } else { + // Overwrite or other strategies: treat errors as fatal for now. + // This can be expanded if other strategies need more nuanced error handling. + const firstError = result.errors[0] as { error: Error }; + throw new Error( + vscode.l10n.t( + 'An error occurred while writing documents: {0}', + firstError.error?.message ?? 'Unknown error', + ), + ); + } + } + + // Update progress + const progress = Math.min(100, (this.processedDocuments / this.totalDocuments) * 100); + this.updateProgress( + progress, + vscode.l10n.t('Copied {0} of {1} documents', this.processedDocuments, this.totalDocuments), + ); + } + + /** + * Determines whether the buffer should be flushed based on size and memory constraints. + * + * @param bufferCount Number of documents in the buffer + * @param memoryEstimate Estimated memory usage in bytes + * @returns True if the buffer should be flushed + */ + private shouldFlushBuffer(bufferCount: number, memoryEstimate: number): boolean { + // Flush if we've reached the document count limit + if (bufferCount >= this.bufferSize) { + return true; + } + + // Flush if we've exceeded the memory limit (converted to bytes) + const memoryLimitBytes = this.maxBufferMemoryMB * 1024 * 1024; + if (memoryEstimate >= memoryLimitBytes) { + return true; + } + + return false; + } + + /** + * Estimates the memory usage of a document in bytes. + * This is a rough estimate based on JSON serialization. + * + * @param document Document to estimate + * @returns Estimated memory usage in bytes + */ + private estimateDocumentMemory(document: DocumentDetails): number { + try { + // A rough estimate based on the length of the JSON string representation. + // V8 strings are typically 2 bytes per character (UTF-16). + const jsonString = JSON.stringify(document.documentContent); + return jsonString.length * 2; + } catch { + // If serialization fails, return a conservative default. + return 1024; // 1KB + } + } +} diff --git a/src/services/tasks/copy-and-paste/copyPasteConfig.ts b/src/services/tasks/copy-and-paste/copyPasteConfig.ts new file mode 100644 index 000000000..a787c49e8 --- /dev/null +++ b/src/services/tasks/copy-and-paste/copyPasteConfig.ts @@ -0,0 +1,60 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See License.txt in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +/** + * Enumeration of conflict resolution strategies for copy-paste operations + */ +export enum ConflictResolutionStrategy { + /** + * Abort the operation if any conflict or error occurs + */ + Abort = 'abort', + + /** + * Skip the conflicting document and continue with the operation + */ + Skip = 'skip', + + /** + * Overwrite the existing document in the target collection with the source document + */ + Overwrite = 'overwrite', +} + +/** + * Configuration for copy-paste operations + */ +export interface CopyPasteConfig { + /** + * Source collection information + */ + source: { + connectionId: string; + databaseName: string; + collectionName: string; + }; + + /** + * Target collection information + */ + target: { + connectionId: string; + databaseName: string; + collectionName: string; + }; + + /** + * Conflict resolution strategy + */ + onConflict: ConflictResolutionStrategy; + + /** + * Optional reference to a connection manager or client object. + * For now, this is typed as `unknown` to allow flexibility. + * Specific task implementations (e.g., for DocumentDB) will cast this to their + * required client/connection type. + */ + connectionManager?: unknown; // e.g. could be cast to a DocumentDB client instance +} diff --git a/src/services/tasks/copy-and-paste/documentInterfaces.ts b/src/services/tasks/copy-and-paste/documentInterfaces.ts new file mode 100644 index 000000000..82c90a8fa --- /dev/null +++ b/src/services/tasks/copy-and-paste/documentInterfaces.ts @@ -0,0 +1,108 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See License.txt in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +import { type CopyPasteConfig } from './copyPasteConfig'; + +/** + * Represents a single document in the copy-paste operation. + */ +export interface DocumentDetails { + /** + * The document's unique identifier (e.g., _id in DocumentDB) + */ + id: unknown; + + /** + * The document content treated as opaque data by the core task logic. + * Specific readers/writers will know how to interpret/serialize this. + * For DocumentDB, this would typically be a BSON document. + */ + documentContent: unknown; +} + +/** + * Interface for reading documents from a source collection + */ +export interface DocumentReader { + /** + * Streams documents from the source collection. + * + * @param connectionId Connection identifier for the source + * @param databaseName Name of the source database + * @param collectionName Name of the source collection + * @returns AsyncIterable of documents + */ + streamDocuments(connectionId: string, databaseName: string, collectionName: string): AsyncIterable; + + /** + * Counts documents in the source collection for progress calculation. + * + * @param connectionId Connection identifier for the source + * @param databaseName Name of the source database + * @param collectionName Name of the source collection + * @returns Promise resolving to the number of documents + */ + countDocuments(connectionId: string, databaseName: string, collectionName: string): Promise; +} + +/** + * Options for document writing operations. + */ +export interface DocumentWriterOptions { + /** + * Batch size for bulk write operations. + */ + batchSize?: number; +} + +/** + * Result of a bulk write operation. + */ +export interface BulkWriteResult { + /** + * Number of documents successfully inserted. + */ + insertedCount: number; + + /** + * Array of errors that occurred during the write operation. + */ + errors: Array<{ documentId?: string; error: Error }> | null; // Should be typed more specifically based on the implementation +} + +/** + * Interface for writing documents to a target collection. + */ +export interface DocumentWriter { + /** + * Writes documents in bulk to the target collection. + * + * @param connectionId Connection identifier for the target + * @param databaseName Name of the target database + * @param collectionName Name of the target collection + * @param documents Array of documents to write + * @param options Optional write options + * @returns Promise resolving to the write result + */ + writeDocuments( + connectionId: string, + databaseName: string, + collectionName: string, + config: CopyPasteConfig, + documents: DocumentDetails[], + options?: DocumentWriterOptions, + ): Promise; + + /** + * Ensures the target collection exists before writing. + * May need methods for pre-flight checks or setup. + * + * @param connectionId Connection identifier for the target + * @param databaseName Name of the target database + * @param collectionName Name of the target collection + * @returns Promise that resolves when the collection is ready + */ + ensureCollectionExists(connectionId: string, databaseName: string, collectionName: string): Promise; +} diff --git a/src/services/tasks/copy-and-paste/documentdb/documentDbDocumentReader.ts b/src/services/tasks/copy-and-paste/documentdb/documentDbDocumentReader.ts new file mode 100644 index 000000000..523228094 --- /dev/null +++ b/src/services/tasks/copy-and-paste/documentdb/documentDbDocumentReader.ts @@ -0,0 +1,56 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See License.txt in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +import { type Document, type WithId } from 'mongodb'; +import { ClustersClient } from '../../../../documentdb/ClustersClient'; +import { type DocumentDetails, type DocumentReader } from '../documentInterfaces'; + +/** + * DocumentDB-specific implementation of DocumentReader. + */ +export class DocumentDbDocumentReader implements DocumentReader { + /** + * Streams documents from a DocumentDB collection. + * + * @param connectionId Connection identifier to get the DocumentDB client + * @param databaseName Name of the database + * @param collectionName Name of the collection + * @returns AsyncIterable of document details + */ + async *streamDocuments( + connectionId: string, + databaseName: string, + collectionName: string, + ): AsyncIterable { + const client = await ClustersClient.getClient(connectionId); + + const documentStream = client.streamDocuments(databaseName, collectionName, new AbortController().signal); + for await (const document of documentStream) { + yield { + id: (document as WithId)._id, + documentContent: document, + }; + } + } + + /** + * Counts the total number of documents in the DocumentDB collection. + * + * @param connectionId Connection identifier to get the DocumentDB client + * @param databaseName Name of the database + * @param collectionName Name of the collection, + * @param filter Optional filter to apply to the count operation (default is '{}') + * @returns Promise resolving to the document count + */ + async countDocuments( + connectionId: string, + databaseName: string, + collectionName: string, + filter: string = '{}', + ): Promise { + const client = await ClustersClient.getClient(connectionId); + return await client.countDocuments(databaseName, collectionName, filter); + } +} diff --git a/src/services/tasks/copy-and-paste/documentdb/documentDbDocumentWriter.ts b/src/services/tasks/copy-and-paste/documentdb/documentDbDocumentWriter.ts new file mode 100644 index 000000000..eb08a3184 --- /dev/null +++ b/src/services/tasks/copy-and-paste/documentdb/documentDbDocumentWriter.ts @@ -0,0 +1,142 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See License.txt in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +import { parseError } from '@microsoft/vscode-azext-utils'; +import { type Document, type ObjectId, type WithId, type WriteError } from 'mongodb'; +import { l10n } from 'vscode'; +import { ClustersClient, isBulkWriteError } from '../../../../documentdb/ClustersClient'; +import { ConflictResolutionStrategy, type CopyPasteConfig } from '../copyPasteConfig'; +import { + type BulkWriteResult, + type DocumentDetails, + type DocumentWriter, + type DocumentWriterOptions, +} from '../documentInterfaces'; + +/** + * DocumentDB-specific implementation of DocumentWriter. + */ +export class DocumentDbDocumentWriter implements DocumentWriter { + /** + * Writes documents to a DocumentDB collection using bulk operations. + * + * @param connectionId Connection identifier to get the DocumentDB client + * @param databaseName Name of the target database + * @param collectionName Name of the target collection + * @param documents Array of documents to write + * @param options Optional write options + * @returns Promise resolving to the bulk write result + */ + async writeDocuments( + connectionId: string, + databaseName: string, + collectionName: string, + config: CopyPasteConfig, + documents: DocumentDetails[], + _options?: DocumentWriterOptions, + ): Promise { + if (documents.length === 0) { + return { + insertedCount: 0, + errors: [], + }; + } + + const client = await ClustersClient.getClient(connectionId); + + // Convert DocumentDetails to DocumentDB documents + const rawDocuments = documents.map((doc) => doc.documentContent as WithId); + + try { + const insertResult = await client.insertDocuments( + databaseName, + collectionName, + rawDocuments, + // For abort on conflict, we set ordered to true to make it throw on the first error + // For skip on conflict, we set ordered to false + // For overwrite on conflict, we use ordered as a filter to find documents that should be overwritten + config.onConflict === ConflictResolutionStrategy.Abort, + ); + + return { + insertedCount: insertResult.insertedCount, + errors: null, // DocumentDB bulk write errors will be handled in the catch block + }; + } catch (error: unknown) { + if (isBulkWriteError(error)) { + const writeErrorsArray = ( + Array.isArray(error.writeErrors) ? error.writeErrors : [error.writeErrors] + ) as Array; + + if (config.onConflict === ConflictResolutionStrategy.Overwrite) { + // For overwrite strategy, we need to delete the conflicting documents and then re-insert + const session = client.startTransaction(); + const collection = client.getCollection(databaseName, collectionName); + try { + // eslint-disable-next-line @typescript-eslint/no-unsafe-return + const idsToOverwrite = writeErrorsArray.map((we) => we.getOperation()._id) as Array; + const documentsToOverwrite = rawDocuments.filter((doc) => + idsToOverwrite.includes((doc as WithId)._id as ObjectId), + ); + await collection.deleteMany({ _id: { $in: idsToOverwrite } }, { session }); + const insertResult = await collection.insertMany(documentsToOverwrite, { session }); + await client.commitTransaction(session); + return { + insertedCount: insertResult.insertedCount, + errors: null, + }; + } catch (error) { + await client.abortTransaction(session); + throw new Error(l10n.t('Failed to overwrite documents: {0}', parseError(error).message)); + } + } + + return { + insertedCount: error.result.insertedCount, + errors: writeErrorsArray.map((writeError) => ({ + documentId: (writeError.getOperation()._id as string) || undefined, + error: new Error(writeError.errmsg || 'Unknown write error'), + })), + }; + } else if (error instanceof Error) { + return { + insertedCount: 0, + errors: [{ documentId: undefined, error }], + }; + } else { + // Handle unknown error types + return { + insertedCount: 0, + errors: [{ documentId: undefined, error: new Error(String(error)) }], + }; + } + } + } + + /** + * Ensures the target collection exists. + * + * @param connectionId Connection identifier to get the DocumentDB client + * @param databaseName Name of the target database + * @param collectionName Name of the target collection + * @returns Promise that resolves when the collection is ready + */ + async ensureCollectionExists(connectionId: string, databaseName: string, collectionName: string): Promise { + const client = await ClustersClient.getClient(connectionId); + + // Check if collection exists by trying to list collections + const collections = await client.listCollections(databaseName); + const collectionExists = collections.some((col) => col.name === collectionName); + + // we could have just run 'createCollection' without this check. This will work just fine + // for basic scenarios. However, an exiting collection with the same name but a different + // configuration could lead to unexpected behavior. + + if (!collectionExists) { + // Create the collection by running createCollection + await client.createCollection(databaseName, collectionName); + } + } +}