diff --git a/l10n/bundle.l10n.json b/l10n/bundle.l10n.json index 7921215f5..4215f6795 100644 --- a/l10n/bundle.l10n.json +++ b/l10n/bundle.l10n.json @@ -215,8 +215,8 @@ "I want to connect using a connection string.": "I want to connect using a connection string.", "Ignoring the following files that do not match the \"*.json\" file name pattern:": "Ignoring the following files that do not match the \"*.json\" file name pattern:", "Import": "Import", + "Import completed with errors.": "Import completed with errors.", "Import From JSON…": "Import From JSON…", - "Import has accomplished with errors.": "Import has accomplished with errors.", "Import successful.": "Import successful.", "IMPORTANT: Please be sure to remove any private information before submitting.": "IMPORTANT: Please be sure to remove any private information before submitting.", "Importing document {num} of {countDocuments}": "Importing document {num} of {countDocuments}", @@ -378,8 +378,6 @@ "The document with the _id \"{0}\" has been saved.": "The document with the _id \"{0}\" has been saved.", "The entered value does not match the original.": "The entered value does not match the original.", "The export operation was canceled.": "The export operation was canceled.", - "The insertion failed. The operation was not acknowledged by the database.": "The insertion failed. The operation was not acknowledged by the database.", - "The insertion of document {number} failed with error: {error}": "The insertion of document {number} failed with error: {error}", "The issue text was copied to the clipboard. Please paste it into this window.": "The issue text was copied to the clipboard. Please paste it into this window.", "The local instance is using a self-signed certificate. To connect, you must import the appropriate TLS/SSL certificate. See {link} for tips.": "The local instance is using a self-signed certificate. To connect, you must import the appropriate TLS/SSL certificate. See {link} for tips.", "The location where resources will be deployed.": "The location where resources will be deployed.", @@ -450,6 +448,7 @@ "Where to save the exported documents?": "Where to save the exported documents?", "with Popover": "with Popover", "Working…": "Working…", + "Write error: {0}": "Write error: {0}", "Yes": "Yes", "Yes, save my credentials": "Yes, save my credentials", "You are not signed in to an Azure account. Please sign in.": "You are not signed in to an Azure account. Please sign in.", diff --git a/src/commands/importDocuments/importDocuments.ts b/src/commands/importDocuments/importDocuments.ts index 6056b07ce..0013fd1af 100644 --- a/src/commands/importDocuments/importDocuments.ts +++ b/src/commands/importDocuments/importDocuments.ts @@ -3,14 +3,20 @@ * Licensed under the MIT License. See License.txt in the project root for license information. *--------------------------------------------------------------------------------------------*/ -import { parseError, type IActionContext } from '@microsoft/vscode-azext-utils'; +import { nonNullProp, parseError, type IActionContext } from '@microsoft/vscode-azext-utils'; import * as l10n from '@vscode/l10n'; import { EJSON, type Document } from 'bson'; import * as fse from 'fs-extra'; import * as vscode from 'vscode'; import { ClustersClient } from '../../documentdb/ClustersClient'; +import { + AzureDomains, + getHostsFromConnectionString, + hasDomainSuffix, +} from '../../documentdb/utils/connectionStringHelpers'; import { ext } from '../../extensionVariables'; import { CollectionItem } from '../../tree/documentdb/CollectionItem'; +import { BufferErrorCode, createMongoDbBuffer, type DocumentBuffer } from '../../utils/documentBuffer'; import { getRootPath } from '../../utils/workspacUtils'; export async function importDocuments( @@ -51,6 +57,10 @@ export async function importDocuments( return undefined; } + if (!(selectedItem instanceof CollectionItem)) { + throw new Error('Selected item must be a CollectionItem'); + } + context.telemetry.properties.experience = selectedItem.experience.api; await ext.state.runWithTemporaryDescription(selectedItem.id, l10n.t('Importing…'), async () => { @@ -70,18 +80,21 @@ export async function importDocumentsWithProgress(selectedItem: CollectionItem, progress.report({ increment: 0, message: l10n.t('Loading documents…') }); const countUri = uris.length; - const incrementUri = 50 / (countUri || 1); + const incrementUri = 25 / (countUri || 1); const documents: unknown[] = []; let hasErrors = false; - for (let i = 0, percent = 0; i < countUri; i++, percent += incrementUri) { + for (let i = 0; i < countUri; i++) { + const increment = (i + 1) * incrementUri; progress.report({ - increment: Math.floor(percent), + increment: Math.floor(increment), message: l10n.t('Loading document {num} of {countUri}', { num: i + 1, countUri }), }); const result = await parseAndValidateFile(selectedItem, uris[i]); + // Note to future maintainers: the validation can return 0 valid documents and still have errors. + if (result.errors && result.errors.length) { ext.outputChannel.appendLog( l10n.t('Errors found in document {path}. Please fix these.', { path: uris[i].path }), @@ -91,47 +104,62 @@ export async function importDocumentsWithProgress(selectedItem: CollectionItem, hasErrors = true; } - if (result.documents && result.documents.length) { + if (result.documents && result.documents.length > 0) { documents.push(...result.documents); } } const countDocuments = documents.length; - const incrementDocuments = 50 / (countDocuments || 1); + const incrementDocuments = 75 / (countDocuments || 1); let count = 0; + let buffer: DocumentBuffer | undefined; + if (selectedItem instanceof CollectionItem) { + const hosts = getHostsFromConnectionString(nonNullProp(selectedItem.cluster, 'connectionString')); + const isRuResource = hasDomainSuffix(AzureDomains.RU, ...hosts); + + if (isRuResource) { + // For Azure MongoDB RU, we use a buffer with maxDocumentCount = 1 + buffer = createMongoDbBuffer({ + maxDocumentCount: 1, + }); + } else { + buffer = createMongoDbBuffer(); + } + } - for (let i = 0, percent = 0; i < countDocuments; i++, percent += incrementDocuments) { + for (let i = 0; i < countDocuments; i++) { progress.report({ - increment: Math.floor(percent), + increment: incrementDocuments, message: l10n.t('Importing document {num} of {countDocuments}', { num: i + 1, countDocuments, }), }); - const result = await insertDocument(selectedItem, documents[i]); + const result = await insertDocument(selectedItem, documents[i], buffer); - if (result.error) { - ext.outputChannel.appendLog( - l10n.t('The insertion of document {number} failed with error: {error}', { - number: i + 1, - error: result.error, - }), - ); - ext.outputChannel.show(); - hasErrors = true; - } else { - count++; - } + // 'count' in result means that the result is from the buffer + count += result.count; + // check if error occurred as partial failure would happen in bulk insertion + hasErrors = hasErrors || result.errorOccurred; + } + + // Do insertion for the last batch for bulk insertion + if (buffer && buffer.getStats().documentCount > 0) { + const lastBatchFlushResult = await insertDocument(selectedItem, undefined, buffer); + + count += lastBatchFlushResult.count; + hasErrors = hasErrors || lastBatchFlushResult.errorOccurred; } - progress.report({ increment: 50, message: l10n.t('Finished importing') }); + // let's make sure we reach 100% progress, useful in case of errors etc. + progress.report({ increment: 100, message: l10n.t('Finished importing') }); - return hasErrors - ? l10n.t('Import has accomplished with errors.') - : l10n.t('Import successful.') + - ' ' + - l10n.t('Inserted {0} document(s). See output for more details.', count); + return ( + (hasErrors ? l10n.t('Import completed with errors.') : l10n.t('Import successful.')) + + ' ' + + l10n.t('Inserted {0} document(s). See output for more details.', count) + ); }, ); @@ -205,32 +233,62 @@ async function parseAndValidateFileForMongo(uri: vscode.Uri): Promise<{ document return { documents, errors }; } -async function insertDocument(node: CollectionItem, document: unknown): Promise<{ document: unknown; error: string }> { +async function insertDocument( + node: CollectionItem, + document: unknown, + buffer: DocumentBuffer | undefined, +): Promise<{ count: number; errorOccurred: boolean }> { try { + // Check for valid buffer + if (!buffer) { + return { count: 0, errorOccurred: true }; + } + + // Route to appropriate handler based on node type if (node instanceof CollectionItem) { - // await needs to catch the error here, otherwise it will be thrown to the caller - return await insertDocumentIntoCluster(node, document as Document); + return await insertDocumentWithBufferIntoCluster(node, buffer, document as Document); } - } catch (e) { - return { document, error: parseError(e).message }; - } - return { document, error: l10n.t('Unknown error') }; + // Should only reach here if node is neither CollectionItem nor CosmosDBContainerResourceItem + return { count: 0, errorOccurred: true }; + } catch { + return { count: 0, errorOccurred: true }; + } } -async function insertDocumentIntoCluster( +async function insertDocumentWithBufferIntoCluster( node: CollectionItem, - document: Document, -): Promise<{ document: Document; error: string }> { - const client = await ClustersClient.getClient(node.cluster.id); - const response = await client.insertDocuments(node.databaseInfo.name, node.collectionInfo.name, [document]); + buffer: DocumentBuffer, + document?: Document, + // If document is undefined, it means that we are flushing the buffer + // It is used for the last batch, and not recommended to be used for normal batches +): Promise<{ count: number; errorOccurred: boolean }> { + const databaseName = node.databaseInfo.name; + const collectionName = node.collectionInfo.name; + // Try to add document to buffer + const insertOrFlushToBufferResult = buffer.insertOrFlush(document); + // If successful, no immediate action needed + if (insertOrFlushToBufferResult.success) { + return { count: 0, errorOccurred: false }; + } - if (response?.acknowledged) { - return { document, error: '' }; - } else { - return { - document, - error: l10n.t('The insertion failed. The operation was not acknowledged by the database.'), - }; + let documentsToProcess = insertOrFlushToBufferResult.documentsToProcess; + if (insertOrFlushToBufferResult.errorCode === BufferErrorCode.BufferFull) { + // The buffer has been flushed by the insertOrFlush method. + // Reinserting the current document into the buffer ensures it is processed after the flush. + // This is safe because the document has already been validated (e.g., it is not too large and not undefined). + buffer.insert(document); + } else if (insertOrFlushToBufferResult.errorCode === BufferErrorCode.EmptyDocument) { + documentsToProcess = buffer.flush(); } + + // Documents to process could be the current document (if too large) + // or the contents of the buffer (if it was full) + const client = await ClustersClient.getClient(node.cluster.id); + const insertResult = await client.insertDocuments(databaseName, collectionName, documentsToProcess as Document[]); + + return { + count: insertResult.insertedCount, + errorOccurred: insertResult.insertedCount < (documentsToProcess?.length || 0), + }; } diff --git a/src/documentdb/ClustersClient.ts b/src/documentdb/ClustersClient.ts index 64d17f6cb..09efafbf8 100644 --- a/src/documentdb/ClustersClient.ts +++ b/src/documentdb/ClustersClient.ts @@ -13,6 +13,7 @@ import { appendExtensionUserAgent, callWithTelemetryAndErrorHandling, parseError import * as l10n from '@vscode/l10n'; import { EJSON } from 'bson'; import { + MongoBulkWriteError, MongoClient, ObjectId, type Collection, @@ -24,8 +25,10 @@ import { type MongoClientOptions, type WithId, type WithoutId, + type WriteError, } from 'mongodb'; import { Links } from '../constants'; +import { ext } from '../extensionVariables'; import { type EmulatorConfiguration } from '../utils/emulatorConfiguration'; import { CredentialCache } from './CredentialCache'; import { getHostsFromConnectionString, hasAzureDomain } from './utils/connectionStringHelpers'; @@ -54,9 +57,9 @@ export interface IndexItemModel { version?: number; } +// Currently we only return insertedCount, but we can add more fields in the future if needed +// Keep the type definition here for future extensibility export type InsertDocumentsResult = { - /** Indicates whether this write result was acknowledged. If not, then all other members of this result will be undefined */ - acknowledged: boolean; /** The number of inserted documents for this operations */ insertedCount: number; }; @@ -457,13 +460,48 @@ export class ClustersClient { collectionName: string, documents: Document[], ): Promise { + if (documents.length === 0) { + return { insertedCount: 0 }; + } const collection = this._mongoClient.db(databaseName).collection(collectionName); - const insertManyResults = await collection.insertMany(documents, { forceServerObjectId: true }); + try { + const insertManyResults = await collection.insertMany(documents, { + forceServerObjectId: true, + + // Setting `ordered` to be false allows MongoDB to continue inserting remaining documents even if previous fails. + // More details: https://www.mongodb.com/docs/manual/reference/method/db.collection.insertMany/#syntax + ordered: false, + }); + return { + insertedCount: insertManyResults.insertedCount, + }; + } catch (error) { + // print error messages to the console + if (error instanceof MongoBulkWriteError) { + const writeErrors: WriteError[] = Array.isArray(error.writeErrors) + ? (error.writeErrors as WriteError[]) + : [error.writeErrors as WriteError]; + + for (const writeError of writeErrors) { + const generalErrorMessage = parseError(writeError).message; + const descriptiveErrorMessage = writeError.err?.errmsg; - return { - acknowledged: insertManyResults.acknowledged, - insertedCount: insertManyResults.insertedCount, - }; + const fullErrorMessage = descriptiveErrorMessage + ? `${generalErrorMessage} - ${descriptiveErrorMessage}` + : generalErrorMessage; + + ext.outputChannel.appendLog(l10n.t('Write error: {0}', fullErrorMessage)); + } + ext.outputChannel.show(); + } else if (error instanceof Error) { + ext.outputChannel.appendLog(l10n.t('Error: {0}', error.message)); + ext.outputChannel.show(); + } + + return { + insertedCount: error instanceof MongoBulkWriteError ? error.insertedCount || 0 : 0, + }; + } } } diff --git a/src/utils/documentBuffer.ts b/src/utils/documentBuffer.ts new file mode 100644 index 000000000..be3dfebde --- /dev/null +++ b/src/utils/documentBuffer.ts @@ -0,0 +1,253 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See License.txt in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +import { EJSON } from 'bson'; + +/** + * Configuration options for the document buffer + */ +export interface DocumentBufferOptions { + /** + * Maximum size of the buffer in bytes before it should be flushed + */ + maxBufferSizeBytes: number; + + /** + * Maximum number of documents in the buffer before it should be flushed + */ + maxDocumentCount: number; + + /** + * Maximum size of a single document in bytes that can be added to the buffer + */ + maxSingleDocumentSizeBytes: number; + + /** + * Function to calculate the size of a document + */ + calculateDocumentSize: (document: unknown) => number; +} + +/** + * Error codes for document buffer operations + */ +export enum BufferErrorCode { + /** + * No error occurred + */ + None = 'none', + + /** + * Document is too large to fit in the buffer based on maxSingleDocumentSizeBytes + */ + DocumentTooLarge = 'document_too_large', + + /** + * Buffer has reached maximum number of documents (maxDocumentCount) + * or maximum size in bytes (maxBufferSizeBytes) + */ + BufferFull = 'buffer_full', + + /** + * Document is null or undefined + */ + EmptyDocument = 'empty_document', + + /** + * Other unexpected errors + */ + Other = 'other', +} + +/** + * Result of an insert operation into the document buffer + */ +export interface BufferInsertResult { + /** + * Whether the insert operation was successful + * If true, the documentsToProcess will be undefined + */ + success: boolean; + + /** + * Error code indicating the reason for failure + */ + errorCode: BufferErrorCode; +} + +export interface BufferInsertOrFlushResult extends BufferInsertResult { + /** + * Documents that need to be processed immediately if not buffered + * This could be the current document if it's too large, or + * the contents of the buffer if it's full and needs to be flushed + */ + documentsToProcess?: T[]; +} + +/** + * Document buffer for a specific database/collection pair. + * Used for batching document inserts to improve performance. + */ +export class DocumentBuffer { + private documents: T[] = []; + private currentSize: number = 0; + + /** + * Create a new document buffer + * + * @param options Configuration options for the buffer + */ + constructor(private readonly options: DocumentBufferOptions) {} + + /** + * Calculate the size of a document using the provided size calculation function + */ + public getDocumentSize(document?: T): number { + if (!document) { + return 0; + } + return this.options.calculateDocumentSize(document); + } + + /** + * Check if the buffer should be flushed + * + * @param documentSize Size of the document to be added (optional) + */ + public shouldFlush(documentSize: number = 0): boolean { + return ( + this.documents.length + 1 > this.options.maxDocumentCount || + this.currentSize + documentSize > this.options.maxBufferSizeBytes + ); + } + + /** + * Insert a document into the buffer + * If the document is too large or the buffer is full, it will return the documents that need to be processed + * Note: It is highly recommended to check if flush needed with `shouldFlush` before inserting + * + * @param document The document to insert + * @returns Result indicating success or documents that need immediate processing + */ + public insertOrFlush(document: T): BufferInsertOrFlushResult { + const insertResult = this.insert(document); + if (insertResult.success) { + // If the insert was successful, return success with no documents to process + return { ...insertResult, documentsToProcess: undefined }; + } + // If the insert failed, we need to determine what to do next + switch (insertResult.errorCode) { + case BufferErrorCode.DocumentTooLarge: + // If the document is too large, return it for immediate processing + return { + ...insertResult, + documentsToProcess: [document], + }; + + case BufferErrorCode.BufferFull: + // If the buffer is full, return the current buffer for processing + // Note that current document is not added to the buffer yet + return { + ...insertResult, + documentsToProcess: this.flush(), + }; + + case BufferErrorCode.EmptyDocument: + // If the document is empty, return an empty array for processing + return { + ...insertResult, + documentsToProcess: [], + }; + + case BufferErrorCode.None: + // This shouldn't happen since we already checked success, but handle it anyway + return { ...insertResult, documentsToProcess: undefined }; + + case BufferErrorCode.Other: + default: + // Handle any other error codes or future additions + return { + ...insertResult, + documentsToProcess: [], + }; + } + } + + public insert(document: T): BufferInsertResult { + // Check if the document is valid + if (!document) { + return { success: false, errorCode: BufferErrorCode.EmptyDocument }; + } + + const documentSize = this.getDocumentSize(document); + + // If the document is too large to fit in the buffer, return it for immediate processing + if (documentSize > this.options.maxSingleDocumentSizeBytes) { + return { + success: false, + errorCode: BufferErrorCode.DocumentTooLarge, + }; + } + + // Check if buffer is full + if (this.shouldFlush(documentSize)) { + return { + success: false, + errorCode: BufferErrorCode.BufferFull, + }; + } + + // Add the document to the buffer + this.documents.push(document); + this.currentSize += documentSize; + + return { success: true, errorCode: BufferErrorCode.None }; + } + + /** + * Flush all documents from the buffer + * + * @returns All documents currently in the buffer + */ + public flush(): T[] { + const documents = [...this.documents]; + this.documents = []; + this.currentSize = 0; + + return documents; + } + + /** + * Get statistics about the current buffer state + */ + public getStats(): { documentCount: number; currentSizeBytes: number } { + return { + documentCount: this.documents.length, + currentSizeBytes: this.currentSize, + }; + } +} + +// Default configuration for MongoDB buffers +const defaultMongoBufferConfig: DocumentBufferOptions = { + maxBufferSizeBytes: 32 * 1024 * 1024, // 32MB + maxDocumentCount: 50, + maxSingleDocumentSizeBytes: 16 * 1024 * 1024, // 16MB + calculateDocumentSize: (document: unknown) => { + // Use EJSON to calculate the size of MongoDB documents + // Adding 20% for BSON overhead compared to JSON + return document ? Buffer.byteLength(EJSON.stringify(document)) * 1.2 : 0; + }, +}; + +/** + * Create a document buffer configured for MongoDB + */ +export function createMongoDbBuffer(customConfig?: Partial): DocumentBuffer { + return new DocumentBuffer({ + ...defaultMongoBufferConfig, + ...customConfig, + }); +}