-
-
Notifications
You must be signed in to change notification settings - Fork 24.6k
feat: add TwelveLabs nodes (Pegasus video loader + Marengo embeddings) #6565
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,26 @@ | ||
| import { INodeParams, INodeCredential } from '../src/Interface' | ||
|
|
||
| class TwelveLabsApi implements INodeCredential { | ||
| label: string | ||
| name: string | ||
| version: number | ||
| description: string | ||
| inputs: INodeParams[] | ||
|
|
||
| constructor() { | ||
| this.label = 'TwelveLabs API' | ||
| this.name = 'twelveLabsApi' | ||
| this.version = 1.0 | ||
| this.description = | ||
| 'Get your API key from the <a target="_blank" href="https://playground.twelvelabs.io/dashboard/api-key">TwelveLabs Dashboard</a>. There is a generous free tier.' | ||
| this.inputs = [ | ||
| { | ||
| label: 'TwelveLabs Api Key', | ||
| name: 'twelveLabsApiKey', | ||
| type: 'password' | ||
| } | ||
| ] | ||
| } | ||
| } | ||
|
|
||
| module.exports = { credClass: TwelveLabsApi } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,73 @@ | ||
| const mockPost = jest.fn() | ||
| const mockGet = jest.fn() | ||
| jest.mock('axios', () => ({ post: (...args: any[]) => mockPost(...args), get: (...args: any[]) => mockGet(...args) })) | ||
|
|
||
| jest.mock('../../../src/utils', () => ({ | ||
| getCredentialData: jest.fn(), | ||
| getCredentialParam: jest.fn(), | ||
| handleEscapeCharacters: jest.fn((input) => input) | ||
| })) | ||
|
|
||
| import { getCredentialData, getCredentialParam } from '../../../src/utils' | ||
|
|
||
| const { nodeClass: TwelveLabsVideo } = require('./TwelveLabs') | ||
|
|
||
| describe('TwelveLabs Video Document Loader', () => { | ||
| beforeEach(() => { | ||
| jest.clearAllMocks() | ||
| ;(getCredentialData as jest.Mock).mockResolvedValue({ twelveLabsApiKey: 'tl-key' }) | ||
| ;(getCredentialParam as jest.Mock).mockImplementation((key, data) => data[key]) | ||
| }) | ||
|
|
||
| it('submits an analyze task and returns the generated text as a document', async () => { | ||
| mockPost.mockResolvedValue({ data: { task_id: 'task-123', status: 'pending' } }) | ||
| mockGet.mockResolvedValue({ data: { task_id: 'task-123', status: 'ready', result: { data: 'A cat plays piano.' } } }) | ||
|
|
||
| const node = new TwelveLabsVideo() | ||
| const docs = await node.init( | ||
| { | ||
| credential: 'cred-1', | ||
| inputs: { videoUrl: 'https://example.com/v.mp4', prompt: 'Describe', modelName: 'pegasus1.5' }, | ||
| outputs: { output: 'document' } | ||
| }, | ||
| '', | ||
| {} | ||
| ) | ||
|
|
||
| expect(docs).toHaveLength(1) | ||
| expect(docs[0].pageContent).toBe('A cat plays piano.') | ||
| expect(docs[0].metadata.source).toBe('https://example.com/v.mp4') | ||
| expect(mockPost).toHaveBeenCalledWith( | ||
| 'https://api.twelvelabs.io/v1.3/analyze/tasks', | ||
| expect.objectContaining({ | ||
| model_name: 'pegasus1.5', | ||
| video: { type: 'url', url: 'https://example.com/v.mp4' }, | ||
| prompt: 'Describe' | ||
| }), | ||
| expect.objectContaining({ headers: { 'x-api-key': 'tl-key' } }) | ||
| ) | ||
| }) | ||
|
|
||
| it('throws when the task fails', async () => { | ||
| mockPost.mockResolvedValue({ data: { task_id: 'task-123', status: 'pending' } }) | ||
| mockGet.mockResolvedValue({ data: { task_id: 'task-123', status: 'failed' } }) | ||
|
|
||
| const node = new TwelveLabsVideo() | ||
| await expect( | ||
| node.init( | ||
| { | ||
| credential: 'cred-1', | ||
| inputs: { videoUrl: 'https://example.com/v.mp4', prompt: 'Describe' }, | ||
| outputs: { output: 'document' } | ||
| }, | ||
| '', | ||
| {} | ||
| ) | ||
| ).rejects.toThrow('analysis task failed') | ||
| }) | ||
|
|
||
| it('requires a video url', async () => { | ||
| const node = new TwelveLabsVideo() | ||
| await expect(node.init({ credential: 'cred-1', inputs: {}, outputs: {} }, '', {})).rejects.toThrow('Video URL is required') | ||
| }) | ||
| }) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,210 @@ | ||
| import axios from 'axios' | ||
| import { omit } from 'lodash' | ||
| import { Document } from '@langchain/core/documents' | ||
| import { TextSplitter } from '@langchain/textsplitters' | ||
| import { ICommonObject, IDocument, INode, INodeData, INodeOutputsValue, INodeParams } from '../../../src/Interface' | ||
| import { getCredentialData, getCredentialParam, handleEscapeCharacters } from '../../../src/utils' | ||
|
|
||
| const TWELVELABS_API_BASE = 'https://api.twelvelabs.io/v1.3' | ||
|
|
||
| interface AnalyzeTask { | ||
| task_id?: string | ||
| status?: string | ||
| result?: { data?: string } | ||
| } | ||
|
|
||
| class TwelveLabs_DocumentLoaders implements INode { | ||
| label: string | ||
| name: string | ||
| version: number | ||
| description: string | ||
| type: string | ||
| icon: string | ||
| category: string | ||
| baseClasses: string[] | ||
| credential: INodeParams | ||
| inputs: INodeParams[] | ||
| outputs: INodeOutputsValue[] | ||
|
|
||
| constructor() { | ||
| this.label = 'TwelveLabs Video' | ||
| this.name = 'twelveLabsVideo' | ||
| this.version = 1.0 | ||
| this.type = 'Document' | ||
| this.icon = 'twelvelabs.svg' | ||
| this.category = 'Document Loaders' | ||
| this.description = 'Analyze a video with the TwelveLabs Pegasus model and load the generated text as a document' | ||
| this.baseClasses = [this.type] | ||
| this.credential = { | ||
| label: 'Connect Credential', | ||
| name: 'credential', | ||
| type: 'credential', | ||
| credentialNames: ['twelveLabsApi'] | ||
| } | ||
| this.inputs = [ | ||
| { | ||
| label: 'Video URL', | ||
| name: 'videoUrl', | ||
| type: 'string', | ||
| description: 'Publicly accessible URL of the video to analyze' | ||
| }, | ||
| { | ||
| label: 'Prompt', | ||
| name: 'prompt', | ||
| type: 'string', | ||
| rows: 4, | ||
| default: 'Provide a detailed description of this video.', | ||
| description: 'Prompt that guides what Pegasus generates from the video' | ||
| }, | ||
| { | ||
| label: 'Model Name', | ||
| name: 'modelName', | ||
| type: 'string', | ||
| default: 'pegasus1.5', | ||
| description: | ||
| 'Refer to <a target="_blank" href="https://docs.twelvelabs.io/v1.3/docs/concepts/models/pegasus">TwelveLabs documentation</a> for available models', | ||
| optional: true, | ||
| additionalParams: true | ||
| }, | ||
| { | ||
| label: 'Max Tokens', | ||
| name: 'maxTokens', | ||
| type: 'number', | ||
| default: 2048, | ||
| optional: true, | ||
| additionalParams: true | ||
| }, | ||
| { | ||
| label: 'Polling Timeout (s)', | ||
| name: 'timeout', | ||
| type: 'number', | ||
| default: 600, | ||
| description: 'Maximum time to wait for the analysis to complete', | ||
| optional: true, | ||
| additionalParams: true | ||
| }, | ||
| { | ||
| label: 'Text Splitter', | ||
| name: 'textSplitter', | ||
| type: 'TextSplitter', | ||
| optional: true | ||
| }, | ||
| { | ||
| label: 'Additional Metadata', | ||
| name: 'metadata', | ||
| type: 'json', | ||
| description: 'Additional metadata to be added to the extracted documents', | ||
| optional: true, | ||
| additionalParams: true | ||
| }, | ||
| { | ||
| label: 'Omit Metadata Keys', | ||
| name: 'omitMetadataKeys', | ||
| type: 'string', | ||
| rows: 4, | ||
| description: | ||
| 'Each document loader comes with a default set of metadata keys that are extracted from the document. You can use this field to omit some of the default metadata keys. The value should be a list of keys, seperated by comma. Use * to omit all metadata keys execept the ones you specify in the Additional Metadata field', | ||
| placeholder: 'key1, key2, key3.nestedKey1', | ||
| optional: true, | ||
| additionalParams: true | ||
| } | ||
| ] | ||
| this.outputs = [ | ||
| { | ||
| label: 'Document', | ||
| name: 'document', | ||
| description: 'Array of document objects containing metadata and pageContent', | ||
| baseClasses: [...this.baseClasses, 'json'] | ||
| }, | ||
| { | ||
| label: 'Text', | ||
| name: 'text', | ||
| description: 'Concatenated string from pageContent of documents', | ||
| baseClasses: ['string', 'json'] | ||
| } | ||
| ] | ||
| } | ||
|
|
||
| async init(nodeData: INodeData, _: string, options: ICommonObject): Promise<any> { | ||
| const videoUrl = nodeData.inputs?.videoUrl as string | ||
| const prompt = nodeData.inputs?.prompt as string | ||
| const modelName = (nodeData.inputs?.modelName as string) || 'pegasus1.5' | ||
| const maxTokens = nodeData.inputs?.maxTokens ? parseInt(nodeData.inputs?.maxTokens as string, 10) : undefined | ||
| const timeoutSec = nodeData.inputs?.timeout ? parseInt(nodeData.inputs?.timeout as string, 10) : 600 | ||
| const textSplitter = nodeData.inputs?.textSplitter as TextSplitter | ||
| const metadata = nodeData.inputs?.metadata | ||
| const output = nodeData.outputs?.output as string | ||
| const _omitMetadataKeys = nodeData.inputs?.omitMetadataKeys as string | ||
|
|
||
| if (!videoUrl) throw new Error('Video URL is required') | ||
|
|
||
| const credentialData = await getCredentialData(nodeData.credential ?? '', options) | ||
| const apiKey = getCredentialParam('twelveLabsApiKey', credentialData, nodeData) | ||
| if (!apiKey) throw new Error('TwelveLabs API key is required') | ||
|
|
||
| const analysis = await this.analyzeVideo(apiKey, { videoUrl, prompt, modelName, maxTokens, timeoutSec }) | ||
|
|
||
| let omitMetadataKeys: string[] = [] | ||
| if (_omitMetadataKeys) { | ||
| omitMetadataKeys = _omitMetadataKeys.split(',').map((key) => key.trim()) | ||
| } | ||
|
|
||
| let docs: IDocument[] = [] | ||
| const baseMetadata = { source: videoUrl, model: modelName } | ||
|
|
||
| if (textSplitter) { | ||
| const splitDocs = await textSplitter.createDocuments([analysis]) | ||
| docs.push(...splitDocs.map((doc) => ({ ...doc, metadata: { ...doc.metadata, ...baseMetadata } }))) | ||
| } else { | ||
| docs.push(new Document({ pageContent: analysis, metadata: baseMetadata })) | ||
| } | ||
|
|
||
| const parsedMetadata = metadata ? (typeof metadata === 'object' ? metadata : JSON.parse(metadata)) : {} | ||
| docs = docs.map((doc) => ({ | ||
| ...doc, | ||
| metadata: _omitMetadataKeys === '*' ? { ...parsedMetadata } : omit({ ...doc.metadata, ...parsedMetadata }, omitMetadataKeys) | ||
| })) | ||
|
|
||
| if (output === 'document') { | ||
| return docs | ||
| } else { | ||
| let finaltext = '' | ||
| for (const doc of docs) { | ||
| finaltext += `${doc.pageContent}\n` | ||
| } | ||
| return handleEscapeCharacters(finaltext, false) | ||
| } | ||
|
Comment on lines
+147
to
+176
Contributor
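There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Use the standard `handleDocumentLoaderMetadata` and `handleDocumentLoaderOutput` helpers instead of re-implementing the metadata merging and output formatting by hand:
let docs: Document[] = []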
const baseMetadata = { source: videoUrl, model: modelName }
if (textSplitter) {
const splitDocs = await textSplitter.createDocuments([analysis])
docs.push(...splitDocs.map((doc) => new Document({ pageContent: doc.pageContent, metadata: { ...doc.metadata, ...baseMetadata } })))
} else {
docs.push(new Document({ pageContent: analysis, metadata: baseMetadata }))
}
docs = handleDocumentLoaderMetadata(docs, _omitMetadataKeys, metadata)
return handleDocumentLoaderOutput(docs, output) |
||
| } | ||
|
|
||
| private async analyzeVideo( | ||
| apiKey: string, | ||
| params: { videoUrl: string; prompt: string; modelName: string; maxTokens?: number; timeoutSec: number } | ||
| ): Promise<string> { | ||
| const headers = { 'x-api-key': apiKey } | ||
| const body: ICommonObject = { | ||
| model_name: params.modelName, | ||
| video: { type: 'url', url: params.videoUrl }, | ||
| prompt: params.prompt | ||
| } | ||
| if (params.maxTokens) body.max_tokens = params.maxTokens | ||
|
|
||
| const { data: task } = await axios.post<AnalyzeTask>(`${TWELVELABS_API_BASE}/analyze/tasks`, body, { headers }) | ||
| const taskId = task?.task_id | ||
| if (!taskId) throw new Error('TwelveLabs did not return an analysis task id') | ||
|
|
||
| const deadline = Date.now() + params.timeoutSec * 1000 | ||
| while (Date.now() < deadline) { | ||
| const { data: status } = await axios.get<AnalyzeTask>(`${TWELVELABS_API_BASE}/analyze/tasks/${taskId}`, { headers }) | ||
| if (status.status === 'ready') { | ||
| return status.result?.data ?? '' | ||
| } | ||
| if (status.status === 'failed') { | ||
| throw new Error('TwelveLabs analysis task failed') | ||
| } | ||
| await new Promise((resolve) => setTimeout(resolve, 5000)) | ||
| } | ||
|
Comment on lines
+195
to
+205
Contributor
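There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
During long polling (which can take up to 10 minutes), a single transient network error or API glitch will crash the entire flow. Wrapping the status check in a `try...catch` block lets the loop log transient errors and keep polling until the deadline, while still rethrowing terminal failures (a failed task, or a ready task with missing data):
const deadline = Date.now() + params.timeoutSec * 1000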
while (Date.now() < deadline) {
try {
const { data: status } = await axios.get<AnalyzeTask>(`${TWELVELABS_API_BASE}/analyze/tasks/${taskId}`, { headers })
if (status.status === 'ready') {
if (!status.result?.data) {
throw new Error('TwelveLabs analysis task returned invalid or missing data')
}
return status.result.data
}
if (status.status === 'failed') {
throw new Error('TwelveLabs analysis task failed')
}
} catch (error: any) {
if (error.message === 'TwelveLabs analysis task failed' || error.message === 'TwelveLabs analysis task returned invalid or missing data') {
throw error
}
console.error(`Error polling TwelveLabs task status: ${error.message || error}`)
}
await new Promise((resolve) => setTimeout(resolve, 5000))
}
References
|
||
| throw new Error(`TwelveLabs analysis did not complete within ${params.timeoutSec}s`) | ||
| } | ||
| } | ||
|
|
||
| module.exports = { nodeClass: TwelveLabs_DocumentLoaders } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,59 @@ | ||
| const mockPost = jest.fn() | ||
| jest.mock('axios', () => ({ post: (...args: any[]) => mockPost(...args) })) | ||
|
|
||
| jest.mock('../../../src/utils', () => ({ | ||
| getBaseClasses: jest.fn().mockReturnValue(['Embeddings']), | ||
| getCredentialData: jest.fn(), | ||
| getCredentialParam: jest.fn() | ||
| })) | ||
|
|
||
| import { getCredentialData, getCredentialParam } from '../../../src/utils' | ||
|
|
||
| const { nodeClass: TwelveLabsEmbedding } = require('./TwelveLabsEmbedding') | ||
|
|
||
| describe('TwelveLabsEmbedding', () => { | ||
| beforeEach(() => { | ||
| jest.clearAllMocks() | ||
| }) | ||
|
|
||
| it('builds a Marengo embedder with the credential api key and default model', async () => { | ||
| ;(getCredentialData as jest.Mock).mockResolvedValue({ twelveLabsApiKey: 'tl-key' }) | ||
| ;(getCredentialParam as jest.Mock).mockImplementation((key, data) => data[key]) | ||
|
|
||
| const node = new TwelveLabsEmbedding() | ||
| const model = await node.init({ credential: 'cred-1', inputs: { modelName: 'marengo3.0' } }, '', {}) | ||
|
|
||
| expect(model.apiKey).toBe('tl-key') | ||
| expect(model.model).toBe('marengo3.0') | ||
| }) | ||
|
|
||
| it('returns the 512-dim float vector from the /embed response', async () => { | ||
| ;(getCredentialData as jest.Mock).mockResolvedValue({ twelveLabsApiKey: 'tl-key' }) | ||
| ;(getCredentialParam as jest.Mock).mockImplementation((key, data) => data[key]) | ||
|
|
||
| const vector = Array.from({ length: 512 }, (_, i) => i / 512) | ||
| mockPost.mockResolvedValue({ data: { text_embedding: { segments: [{ float: vector }] } } }) | ||
|
|
||
| const node = new TwelveLabsEmbedding() | ||
| const model = await node.init({ credential: 'cred-1', inputs: {} }, '', {}) | ||
| const result = await model.embedQuery('a man walking on the beach') | ||
|
|
||
| expect(result).toHaveLength(512) | ||
| expect(mockPost).toHaveBeenCalledWith( | ||
| 'https://api.twelvelabs.io/v1.3/embed', | ||
| expect.anything(), | ||
| expect.objectContaining({ headers: { 'x-api-key': 'tl-key' } }) | ||
| ) | ||
| }) | ||
|
|
||
| it('throws when the response has no embedding', async () => { | ||
| ;(getCredentialData as jest.Mock).mockResolvedValue({ twelveLabsApiKey: 'tl-key' }) | ||
| ;(getCredentialParam as jest.Mock).mockImplementation((key, data) => data[key]) | ||
| mockPost.mockResolvedValue({ data: { text_embedding: { segments: [] } } }) | ||
|
|
||
| const node = new TwelveLabsEmbedding() | ||
| const model = await node.init({ credential: 'cred-1', inputs: {} }, '', {}) | ||
|
|
||
| await expect(model.embedQuery('hi')).rejects.toThrow('did not return a text embedding') | ||
| }) | ||
| }) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Importing the standard `handleDocumentLoaderMetadata` and `handleDocumentLoaderOutput` helper functions from `src/utils` avoids duplicating code and ensures consistent behavior across all document loaders in Flowise.