Skip to content

feat: add TwelveLabs nodes (Pegasus video loader + Marengo embeddings)#6565

Open
mohit-twelvelabs wants to merge 1 commit into
FlowiseAI:mainfrom
mohit-twelvelabs:feat/twelvelabs-integration
Open

feat: add TwelveLabs nodes (Pegasus video loader + Marengo embeddings)#6565
mohit-twelvelabs wants to merge 1 commit into
FlowiseAI:mainfrom
mohit-twelvelabs:feat/twelvelabs-integration

Conversation

@mohit-twelvelabs

Copy link
Copy Markdown

Hi! I'm Mohit, I work at TwelveLabs (@mohit-twelvelabs).

What this adds

Two new opt-in nodes backed by the TwelveLabs video understanding platform, plus a shared API credential:

  • TwelveLabs Video (Document Loader) — analyzes a publicly accessible video URL with the Pegasus model and loads the generated description as a Document. Supports a custom prompt, model name, max tokens, a TextSplitter, and the standard metadata / omit-metadata-keys options used by the other loaders.
  • TwelveLabs Embedding (Embeddings) — exposes Marengo's multimodal embeddings (a shared 512-dim space across video, image, audio and text) as a standard LangChain Embeddings implementation, so text queries can be embedded against video embeddings stored in any vector store.
  • TwelveLabs API credential — single API key field, typed password per the credential guidelines in CONTRIBUTING.

Why it helps Flowise

Flowise has rich text/image embedding and loader coverage but no video understanding provider. These nodes let users build flows that describe/Q&A over videos (Pegasus) and do cross-modal semantic search (Marengo) without leaving Flowise.

Opt-in / non-breaking

Both are new nodes auto-discovered from packages/components/nodes. Nothing existing is changed, no defaults touched, and no new dependencies — they call the public TwelveLabs REST API (https://api.twelvelabs.io/v1.3) using the axios already in the package and @langchain/core's Embeddings base.

How it was tested

  • Co-located unit tests for both nodes (*.test.ts), all passing — they mock axios and assert request wiring, the 512-dim vector path, and error handling.
  • pnpm build (tsc + gulp) for flowise-components succeeds; both nodes and their icons land in dist.
  • Prettier + ESLint clean on the changed files.
  • Verified against the live API: a Marengo marengo3.0 text embed returns a 512-dim vector, and a Pegasus analysis of a public sample video returns generated text via the /analyze/tasks submit + poll flow the loader implements.

You can grab a free API key at https://twelvelabs.io — there's a generous free tier.

Adds two opt-in components backed by the TwelveLabs video AI platform:

- TwelveLabs Video document loader: analyzes a video URL with the Pegasus
  model and loads the generated text as a Document (with TextSplitter and
  metadata support, matching the other loaders).
- TwelveLabs Embedding: exposes Marengo's multimodal 512-dim text embeddings
  as a standard LangChain embedder for use with vector stores.

Both share a new TwelveLabs API credential and call the public REST API via
the existing axios dependency (no new deps). Co-located unit tests included.

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces TwelveLabs integration, adding a TwelveLabs API credential, a Video Document Loader utilizing the Pegasus model, and a Marengo-based text embedding node. The review feedback highlights several opportunities for improvement: ensuring compatibility and robustness of multipart requests by using the form-data package and passing explicit headers, parallelizing document embeddings using Promise.all to resolve a performance bottleneck, utilizing standard Flowise helper functions for metadata and output handling, and wrapping the long-polling loop in a try/catch block to handle transient network errors while failing fast on invalid API responses.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment on lines +1 to +2
import axios from 'axios'
import { Embeddings, EmbeddingsParams } from '@langchain/core/embeddings'

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Relying on the global FormData can cause runtime errors on older Node.js versions (Node < 18) or boundary issues with Axios. Importing the form-data package ensures compatibility and robust multipart requests across all environments.

Suggested change
import axios from 'axios'
import { Embeddings, EmbeddingsParams } from '@langchain/core/embeddings'
import axios from 'axios'
import FormData from 'form-data'
import { Embeddings, EmbeddingsParams } from '@langchain/core/embeddings'

Comment on lines +51 to +60
const form = new FormData()
form.append('model_name', this.model)
form.append('text', text.replace(/\n/g, ' '))

const response = await this.caller.call(async () => {
const res = await axios.post<EmbedResponse>(`${this.baseUrl}/embed`, form, {
headers: { 'x-api-key': this.apiKey as string }
})
return res.data
})

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

When using the form-data package, you should explicitly pass the form headers (including the boundary) to Axios to ensure the multipart request is correctly formatted.

        const form = new FormData()
        form.append('model_name', this.model)
        form.append('text', text.replace(/\n/g, ' '))

        const response = await this.caller.call(async () => {
            const res = await axios.post<EmbedResponse>(`${this.baseUrl}/embed`, form, {
                headers: {
                    ...form.getHeaders(),
                    'x-api-key': this.apiKey as string
                }
            })
            return res.data
        })

Comment on lines +73 to +79
async embedDocuments(documents: string[]): Promise<number[][]> {
const embeddings: number[][] = []
for (const document of documents) {
embeddings.push(await this.embedText(document))
}
return embeddings
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Embedding documents sequentially in a loop can be a major performance bottleneck. Since this.caller handles concurrency and rate limits automatically, we can use Promise.all to embed all documents in parallel.

    async embedDocuments(documents: string[]): Promise<number[][]> {
        return Promise.all(documents.map((document) => this.embedText(document)))
    }

Comment on lines +1 to +6
import axios from 'axios'
import { omit } from 'lodash'
import { Document } from '@langchain/core/documents'
import { TextSplitter } from '@langchain/textsplitters'
import { ICommonObject, IDocument, INode, INodeData, INodeOutputsValue, INodeParams } from '../../../src/Interface'
import { getCredentialData, getCredentialParam, handleEscapeCharacters } from '../../../src/utils'

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Importing the standard handleDocumentLoaderMetadata and handleDocumentLoaderOutput helper functions from src/utils avoids duplicating code and ensures consistent behavior across all document loaders in Flowise.

import axios from 'axios'
import { Document } from '@langchain/core/documents'
import { TextSplitter } from '@langchain/textsplitters'
import { ICommonObject, IDocument, INode, INodeData, INodeOutputsValue, INodeParams } from '../../../src/Interface'
import {
    getCredentialData,
    getCredentialParam,
    handleDocumentLoaderMetadata,
    handleDocumentLoaderOutput
} from '../../../src/utils'

Comment on lines +147 to +176
let omitMetadataKeys: string[] = []
if (_omitMetadataKeys) {
omitMetadataKeys = _omitMetadataKeys.split(',').map((key) => key.trim())
}

let docs: IDocument[] = []
const baseMetadata = { source: videoUrl, model: modelName }

if (textSplitter) {
const splitDocs = await textSplitter.createDocuments([analysis])
docs.push(...splitDocs.map((doc) => ({ ...doc, metadata: { ...doc.metadata, ...baseMetadata } })))
} else {
docs.push(new Document({ pageContent: analysis, metadata: baseMetadata }))
}

const parsedMetadata = metadata ? (typeof metadata === 'object' ? metadata : JSON.parse(metadata)) : {}
docs = docs.map((doc) => ({
...doc,
metadata: _omitMetadataKeys === '*' ? { ...parsedMetadata } : omit({ ...doc.metadata, ...parsedMetadata }, omitMetadataKeys)
}))

if (output === 'document') {
return docs
} else {
let finaltext = ''
for (const doc of docs) {
finaltext += `${doc.pageContent}\n`
}
return handleEscapeCharacters(finaltext, false)
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Use the standard handleDocumentLoaderMetadata and handleDocumentLoaderOutput helper functions to parse metadata, omit keys, and format the output consistently with other document loaders.

        let docs: Document[] = []
        const baseMetadata = { source: videoUrl, model: modelName }

        if (textSplitter) {
            const splitDocs = await textSplitter.createDocuments([analysis])
            docs.push(...splitDocs.map((doc) => new Document({ pageContent: doc.pageContent, metadata: { ...doc.metadata, ...baseMetadata } })))
        } else {
            docs.push(new Document({ pageContent: analysis, metadata: baseMetadata }))
        }

        docs = handleDocumentLoaderMetadata(docs, _omitMetadataKeys, metadata)

        return handleDocumentLoaderOutput(docs, output)

Comment on lines +195 to +205
const deadline = Date.now() + params.timeoutSec * 1000
while (Date.now() < deadline) {
const { data: status } = await axios.get<AnalyzeTask>(`${TWELVELABS_API_BASE}/analyze/tasks/${taskId}`, { headers })
if (status.status === 'ready') {
return status.result?.data ?? ''
}
if (status.status === 'failed') {
throw new Error('TwelveLabs analysis task failed')
}
await new Promise((resolve) => setTimeout(resolve, 5000))
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

During long polling (which can take up to 10 minutes), a single transient network error or API glitch will crash the entire flow. Wrapping the status check in a try/catch block makes the polling logic significantly more resilient. Additionally, if the API response is missing the expected analysis data, we should throw an error rather than silently returning an empty string, promoting fail-fast behavior.

        const deadline = Date.now() + params.timeoutSec * 1000
        while (Date.now() < deadline) {
            try {
                const { data: status } = await axios.get<AnalyzeTask>(`${TWELVELABS_API_BASE}/analyze/tasks/${taskId}`, { headers })
                if (status.status === 'ready') {
                    if (!status.result?.data) {
                        throw new Error('TwelveLabs analysis task returned invalid or missing data')
                    }
                    return status.result.data
                }
                if (status.status === 'failed') {
                    throw new Error('TwelveLabs analysis task failed')
                }
            } catch (error: any) {
                if (error.message === 'TwelveLabs analysis task failed' || error.message === 'TwelveLabs analysis task returned invalid or missing data') {
                    throw error
                }
                console.error(`Error polling TwelveLabs task status: ${error.message || error}`)
            }
            await new Promise((resolve) => setTimeout(resolve, 5000))
        }
References
  1. When handling potentially invalid data from external sources (like an API response), prefer throwing an error for invalid input types rather than silently returning a default or empty value. This promotes fail-fast behavior.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant