Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
eb942a4
Initial plan for issue
Copilot Jun 3, 2025
6372953
Implement buffered document importing for improved performance
Copilot Jun 3, 2025
c8a1470
Refactor buffering logic into reusable DocumentBuffer utility
Copilot Jun 3, 2025
fe1b6cd
Implement buffered document importing following sample PR pattern
Copilot Jun 3, 2025
3baf5c3
Merge branch 'copilot/fix-130' of https://github.com/microsoft/vscode…
xingfan-git Jun 3, 2025
f833289
update increment for importing documents
xingfan-git Jun 3, 2025
de64a9d
remove error message for cosmosdb mis-introduced from origin PR
xingfan-git Jun 3, 2025
9a35e25
update format with prettier to fix pipeline
xingfan-git Jun 3, 2025
a89796e
fix typo
xingfan-git Jun 4, 2025
090cf93
Merge branch 'main' into copilot/fix-130
tnaum-ms Jun 6, 2025
2d1f524
revert formatting changes to config folders and files
Copilot Jun 6, 2025
24813ad
feat: improved error handling
tnaum-ms Jun 6, 2025
da11341
fix: applied l10n for user-facing messages
tnaum-ms Jun 6, 2025
22e5015
fix: improve error message and code comments as requested
Copilot Jun 6, 2025
717f2d6
Merge branch 'next' into copilot/fix-130
tnaum-ms Jun 9, 2025
23463dd
Merge branch 'next' into copilot/fix-130
tnaum-ms Jun 9, 2025
efc838c
disable bulk insert for mongo ru to avoid throttling issue
xingfan-git Jun 9, 2025
6a13653
run l10n to fix pipeline
xingfan-git Jun 9, 2025
3c84ad3
Simplified Mongo RU detection (removed server calls)
tnaum-ms Jun 10, 2025
a4d68b8
fix: updated eslint config to exclude the `api/dist` folder
tnaum-ms Jun 10, 2025
f9a6b9f
fix: l10n update
tnaum-ms Jun 10, 2025
1508dfd
Merge branch 'next' into copilot/fix-130
tnaum-ms Jun 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions l10n/bundle.l10n.json
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,8 @@
"I want to connect using a connection string.": "I want to connect using a connection string.",
"Ignoring the following files that do not match the \"*.json\" file name pattern:": "Ignoring the following files that do not match the \"*.json\" file name pattern:",
"Import": "Import",
"Import completed with errors.": "Import completed with errors.",
"Import From JSON…": "Import From JSON…",
"Import has accomplished with errors.": "Import has accomplished with errors.",
"Import successful.": "Import successful.",
"IMPORTANT: Please be sure to remove any private information before submitting.": "IMPORTANT: Please be sure to remove any private information before submitting.",
"Importing document {num} of {countDocuments}": "Importing document {num} of {countDocuments}",
Expand Down Expand Up @@ -378,8 +378,6 @@
"The document with the _id \"{0}\" has been saved.": "The document with the _id \"{0}\" has been saved.",
"The entered value does not match the original.": "The entered value does not match the original.",
"The export operation was canceled.": "The export operation was canceled.",
"The insertion failed. The operation was not acknowledged by the database.": "The insertion failed. The operation was not acknowledged by the database.",
"The insertion of document {number} failed with error: {error}": "The insertion of document {number} failed with error: {error}",
"The issue text was copied to the clipboard. Please paste it into this window.": "The issue text was copied to the clipboard. Please paste it into this window.",
"The local instance is using a self-signed certificate. To connect, you must import the appropriate TLS/SSL certificate. See {link} for tips.": "The local instance is using a self-signed certificate. To connect, you must import the appropriate TLS/SSL certificate. See {link} for tips.",
"The location where resources will be deployed.": "The location where resources will be deployed.",
Expand Down Expand Up @@ -450,6 +448,7 @@
"Where to save the exported documents?": "Where to save the exported documents?",
"with Popover": "with Popover",
"Working…": "Working…",
"Write error: {0}": "Write error: {0}",
"Yes": "Yes",
"Yes, save my credentials": "Yes, save my credentials",
"You are not signed in to an Azure account. Please sign in.": "You are not signed in to an Azure account. Please sign in.",
Expand Down
150 changes: 104 additions & 46 deletions src/commands/importDocuments/importDocuments.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,20 @@
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/

import { parseError, type IActionContext } from '@microsoft/vscode-azext-utils';
import { nonNullProp, parseError, type IActionContext } from '@microsoft/vscode-azext-utils';
import * as l10n from '@vscode/l10n';
import { EJSON, type Document } from 'bson';
import * as fse from 'fs-extra';
import * as vscode from 'vscode';
import { ClustersClient } from '../../documentdb/ClustersClient';
import {
AzureDomains,
getHostsFromConnectionString,
hasDomainSuffix,
} from '../../documentdb/utils/connectionStringHelpers';
import { ext } from '../../extensionVariables';
import { CollectionItem } from '../../tree/documentdb/CollectionItem';
import { BufferErrorCode, createMongoDbBuffer, type DocumentBuffer } from '../../utils/documentBuffer';
import { getRootPath } from '../../utils/workspacUtils';

export async function importDocuments(
Expand Down Expand Up @@ -51,6 +57,10 @@ export async function importDocuments(
return undefined;
}

if (!(selectedItem instanceof CollectionItem)) {
throw new Error('Selected item must be a CollectionItem');
Comment thread
tnaum-ms marked this conversation as resolved.
Comment thread
tnaum-ms marked this conversation as resolved.
}

context.telemetry.properties.experience = selectedItem.experience.api;

await ext.state.runWithTemporaryDescription(selectedItem.id, l10n.t('Importing…'), async () => {
Expand All @@ -70,18 +80,21 @@ export async function importDocumentsWithProgress(selectedItem: CollectionItem,
progress.report({ increment: 0, message: l10n.t('Loading documents…') });

const countUri = uris.length;
const incrementUri = 50 / (countUri || 1);
const incrementUri = 25 / (countUri || 1);
const documents: unknown[] = [];
let hasErrors = false;

for (let i = 0, percent = 0; i < countUri; i++, percent += incrementUri) {
for (let i = 0; i < countUri; i++) {
const increment = (i + 1) * incrementUri;
progress.report({
increment: Math.floor(percent),
increment: Math.floor(increment),
message: l10n.t('Loading document {num} of {countUri}', { num: i + 1, countUri }),
});

const result = await parseAndValidateFile(selectedItem, uris[i]);

// Note to future maintainers: the validation can return 0 valid documents and still have errors.

if (result.errors && result.errors.length) {
ext.outputChannel.appendLog(
l10n.t('Errors found in document {path}. Please fix these.', { path: uris[i].path }),
Expand All @@ -91,47 +104,62 @@ export async function importDocumentsWithProgress(selectedItem: CollectionItem,
hasErrors = true;
}

if (result.documents && result.documents.length) {
if (result.documents && result.documents.length > 0) {
documents.push(...result.documents);
}
}

const countDocuments = documents.length;
const incrementDocuments = 50 / (countDocuments || 1);
const incrementDocuments = 75 / (countDocuments || 1);
let count = 0;
let buffer: DocumentBuffer<unknown> | undefined;
if (selectedItem instanceof CollectionItem) {
const hosts = getHostsFromConnectionString(nonNullProp(selectedItem.cluster, 'connectionString'));
const isRuResource = hasDomainSuffix(AzureDomains.RU, ...hosts);

if (isRuResource) {
// For Azure MongoDB RU, we use a buffer with maxDocumentCount = 1 (i.e. no bulk insert) to avoid throttling issues
buffer = createMongoDbBuffer<unknown>({
maxDocumentCount: 1,
});
} else {
buffer = createMongoDbBuffer<unknown>();
}
}

for (let i = 0, percent = 0; i < countDocuments; i++, percent += incrementDocuments) {
for (let i = 0; i < countDocuments; i++) {
progress.report({
increment: Math.floor(percent),
increment: incrementDocuments,
Comment thread
xingfan-git marked this conversation as resolved.
message: l10n.t('Importing document {num} of {countDocuments}', {
num: i + 1,
countDocuments,
}),
});

const result = await insertDocument(selectedItem, documents[i]);
const result = await insertDocument(selectedItem, documents[i], buffer);

if (result.error) {
ext.outputChannel.appendLog(
l10n.t('The insertion of document {number} failed with error: {error}', {
number: i + 1,
error: result.error,
}),
);
ext.outputChannel.show();
hasErrors = true;
} else {
count++;
}
// result.count is the number of documents written to the database by this call; it is 0 while the document is only buffered
count += result.count;
// check if error occurred as partial failure would happen in bulk insertion
hasErrors = hasErrors || result.errorOccurred;
}

// Do insertion for the last batch for bulk insertion
if (buffer && buffer.getStats().documentCount > 0) {
const lastBatchFlushResult = await insertDocument(selectedItem, undefined, buffer);

count += lastBatchFlushResult.count;
hasErrors = hasErrors || lastBatchFlushResult.errorOccurred;
}

progress.report({ increment: 50, message: l10n.t('Finished importing') });
// let's make sure we reach 100% progress, useful in case of errors etc.
progress.report({ increment: 100, message: l10n.t('Finished importing') });
Comment thread
tnaum-ms marked this conversation as resolved.

return hasErrors
? l10n.t('Import has accomplished with errors.')
: l10n.t('Import successful.') +
' ' +
l10n.t('Inserted {0} document(s). See output for more details.', count);
return (
(hasErrors ? l10n.t('Import completed with errors.') : l10n.t('Import successful.')) +
' ' +
l10n.t('Inserted {0} document(s). See output for more details.', count)
);
},
);

Expand Down Expand Up @@ -205,32 +233,62 @@ async function parseAndValidateFileForMongo(uri: vscode.Uri): Promise<{ document
return { documents, errors };
}

async function insertDocument(node: CollectionItem, document: unknown): Promise<{ document: unknown; error: string }> {
async function insertDocument(
node: CollectionItem,
document: unknown,
buffer: DocumentBuffer<unknown> | undefined,
): Promise<{ count: number; errorOccurred: boolean }> {
try {
// Check for valid buffer
if (!buffer) {
return { count: 0, errorOccurred: true };
}

// Route to appropriate handler based on node type
if (node instanceof CollectionItem) {
// await needs to catch the error here, otherwise it will be thrown to the caller
return await insertDocumentIntoCluster(node, document as Document);
return await insertDocumentWithBufferIntoCluster(node, buffer, document as Document);
}
} catch (e) {
return { document, error: parseError(e).message };
}

return { document, error: l10n.t('Unknown error') };
// Should only reach here if node is not a CollectionItem
return { count: 0, errorOccurred: true };
} catch {
return { count: 0, errorOccurred: true };
}
}

async function insertDocumentIntoCluster(
async function insertDocumentWithBufferIntoCluster(
node: CollectionItem,
document: Document,
): Promise<{ document: Document; error: string }> {
const client = await ClustersClient.getClient(node.cluster.id);
const response = await client.insertDocuments(node.databaseInfo.name, node.collectionInfo.name, [document]);
buffer: DocumentBuffer<unknown>,
document?: Document,
// If document is undefined, the buffer is flushed instead of receiving a new document.
// This is intended for the last batch only and is not recommended for normal batches.
): Promise<{ count: number; errorOccurred: boolean }> {
const databaseName = node.databaseInfo.name;
const collectionName = node.collectionInfo.name;
// Try to add document to buffer
const insertOrFlushToBufferResult = buffer.insertOrFlush(document);
// If successful, no immediate action needed
if (insertOrFlushToBufferResult.success) {
return { count: 0, errorOccurred: false };
}

if (response?.acknowledged) {
return { document, error: '' };
} else {
return {
document,
error: l10n.t('The insertion failed. The operation was not acknowledged by the database.'),
};
let documentsToProcess = insertOrFlushToBufferResult.documentsToProcess;
if (insertOrFlushToBufferResult.errorCode === BufferErrorCode.BufferFull) {
// The buffer has been flushed by the insertOrFlush method.
// Reinserting the current document into the buffer ensures it is processed after the flush.
// This is safe because the document has already been validated (e.g., it is not too large and not undefined).
buffer.insert(document);
} else if (insertOrFlushToBufferResult.errorCode === BufferErrorCode.EmptyDocument) {
documentsToProcess = buffer.flush();
}

// Documents to process could be the current document (if too large),
// the contents of the buffer (if it was full), or the remaining buffered documents (final flush)
const client = await ClustersClient.getClient(node.cluster.id);
const insertResult = await client.insertDocuments(databaseName, collectionName, documentsToProcess as Document[]);

return {
count: insertResult.insertedCount,
errorOccurred: insertResult.insertedCount < (documentsToProcess?.length || 0),
};
}
52 changes: 45 additions & 7 deletions src/documentdb/ClustersClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { appendExtensionUserAgent, callWithTelemetryAndErrorHandling, parseError
import * as l10n from '@vscode/l10n';
import { EJSON } from 'bson';
import {
MongoBulkWriteError,
MongoClient,
ObjectId,
type Collection,
Expand All @@ -24,8 +25,10 @@ import {
type MongoClientOptions,
type WithId,
type WithoutId,
type WriteError,
} from 'mongodb';
import { Links } from '../constants';
import { ext } from '../extensionVariables';
import { type EmulatorConfiguration } from '../utils/emulatorConfiguration';
import { CredentialCache } from './CredentialCache';
import { getHostsFromConnectionString, hasAzureDomain } from './utils/connectionStringHelpers';
Expand Down Expand Up @@ -54,9 +57,9 @@ export interface IndexItemModel {
version?: number;
}

// Currently we only return insertedCount, but we can add more fields in the future if needed
// Keep the type definition here for future extensibility
export type InsertDocumentsResult = {
/** Indicates whether this write result was acknowledged. If not, then all other members of this result will be undefined */
acknowledged: boolean;
/** The number of inserted documents for this operations */
insertedCount: number;
};
Expand Down Expand Up @@ -457,13 +460,48 @@ export class ClustersClient {
collectionName: string,
documents: Document[],
): Promise<InsertDocumentsResult> {
if (documents.length === 0) {
return { insertedCount: 0 };
}
const collection = this._mongoClient.db(databaseName).collection(collectionName);

const insertManyResults = await collection.insertMany(documents, { forceServerObjectId: true });
try {
const insertManyResults = await collection.insertMany(documents, {
forceServerObjectId: true,

// Setting `ordered` to false allows MongoDB to continue inserting the remaining documents even if an earlier insert fails.
// More details: https://www.mongodb.com/docs/manual/reference/method/db.collection.insertMany/#syntax
ordered: false,
});
return {
insertedCount: insertManyResults.insertedCount,
};
} catch (error) {
// log error messages to the extension's output channel
if (error instanceof MongoBulkWriteError) {
const writeErrors: WriteError[] = Array.isArray(error.writeErrors)
? (error.writeErrors as WriteError[])
: [error.writeErrors as WriteError];

for (const writeError of writeErrors) {
const generalErrorMessage = parseError(writeError).message;
const descriptiveErrorMessage = writeError.err?.errmsg;

return {
acknowledged: insertManyResults.acknowledged,
insertedCount: insertManyResults.insertedCount,
};
const fullErrorMessage = descriptiveErrorMessage
? `${generalErrorMessage} - ${descriptiveErrorMessage}`
: generalErrorMessage;

ext.outputChannel.appendLog(l10n.t('Write error: {0}', fullErrorMessage));
}
ext.outputChannel.show();
} else if (error instanceof Error) {
ext.outputChannel.appendLog(l10n.t('Error: {0}', error.message));
ext.outputChannel.show();
}

return {
insertedCount: error instanceof MongoBulkWriteError ? error.insertedCount || 0 : 0,
};
}
}
}
Loading
Loading