Skip to content

Latest commit

 

History

History
706 lines (481 loc) · 13.7 KB

File metadata and controls

706 lines (481 loc) · 13.7 KB

🔍 API Reference

📖 Visão Geral

Documentação completa de todas as classes, funções e módulos do projeto.


📦 Módulos

pdf_conversion.main

Entry point do projeto.

Funções

async run_workflow() -> None

Executa workflow de conversão completo.

Uso:

import asyncio
from pdf_conversion.main import run_workflow

asyncio.run(run_workflow())

Fluxo:

  1. Instancia Workflow()
  2. Chama await workflow.run()

Exceções:

  • Exception - Qualquer erro propagado das tasks

def main() -> None

Entry point síncrono, cria event loop.

Uso:

uv run main

Fluxo:

  1. Chama asyncio.run(run_workflow())

pdf_conversion.config.config

Gerenciamento de configuração com Dynaconf.

Constantes

ROOT_DIR: Path

Diretório raiz do projeto.

from pdf_conversion.config import ROOT_DIR

print(ROOT_DIR)  # C:\Users\...\PDF-Conversion

settings: Dynaconf

Objeto de configuração global.

from pdf_conversion.config import settings

timeout = settings.playwright.timeout
max_size = settings.conversion.max_file_size_mb

Seções disponíveis:

  • project - Metadados do projeto
  • paths - Caminhos dos diretórios
  • conversion - Regras de conversão
  • urls - URLs de serviços
  • performance - Configurações de performance
  • playwright - Configurações do Playwright
  • logging - Configurações de log

Funções

def get_path(key: str) -> Path

Retorna caminho absoluto de diretório configurado.

Parâmetros:

  • key (str) - Chave do caminho: "input", "output", "logs", "temp", "backup"

Retorna:

  • Path - Caminho absoluto do diretório

Uso:

from pdf_conversion.config import get_path

input_dir = get_path("input")
# C:\Users\...\documents\documents_to_convert

output_dir = get_path("output")
# C:\Users\...\documents\converted_documents

Exceções:

  • KeyError - Se chave não existe em settings.paths

def ensure_directories() -> None

Cria estrutura de diretórios do projeto.

Diretórios criados:

  • logs/ e subpastas success/, failed/
  • documents/documents_to_convert/
  • documents/converted_documents/
  • temp/
  • backup/

Uso:

from pdf_conversion.config import ensure_directories

ensure_directories()  # Cria todos os diretórios

Nota: Chamado automaticamente ao importar o módulo.


pdf_conversion.config.playwright

Helpers para configuração do Playwright.

Funções

async create_browser_context(playwright: Playwright) -> tuple[Browser, BrowserContext]

Cria browser e context com configurações do settings.

Parâmetros:

  • playwright (Playwright) - Instância do Playwright

Retorna:

  • tuple[Browser, BrowserContext] - Browser e context configurados

Uso:

from pdf_conversion.config.playwright import create_browser_context
from playwright.async_api import async_playwright

async with async_playwright() as p:
    browser, context = await create_browser_context(p)
    page = await context.new_page()
    # ... usar page

Configurações aplicadas:

  • headless de settings.playwright.headless
  • timeout de settings.playwright.timeout
  • slow_mo de settings.playwright.slow_mo
  • viewport de settings.playwright.context_options.viewport
  • user_agent de settings.playwright.context_options.user_agent

pdf_conversion.workflow.workflow

Orquestrador de tasks.

Classes

class Workflow

Gerencia execução sequencial de tasks.

Atributos:

  • tasks (list[BaseTask]) - Lista de tasks a executar
  • shared_data (dict) - Dados compartilhados entre tasks

Métodos:

def __init__(self) -> None

Inicializa workflow e chama setup_tasks().

from pdf_conversion.workflow import Workflow

workflow = Workflow()
print(len(workflow.tasks))  # 2 (DocumentTask, NavigationTask)

def setup_tasks(self) -> None

Define pipeline de tasks. Sobrescreva para customizar.

def setup_tasks(self):
    self.add_task(DocumentTask("document_manager"))
    self.add_task(NavigationTask("ilovepdf_navigation"))

def add_task(self, task: BaseTask) -> None

Adiciona task ao pipeline.

Parâmetros:

  • task (BaseTask) - Instância da task
workflow.add_task(MyTask("my_task"))

async def run(self) -> None

Executa todas as tasks sequencialmente.

Fluxo:

  1. Loop sobre self.tasks
  2. Para cada task: await task.run_async(self.shared_data)
  3. Atualiza self.shared_data com resultado

Uso:

workflow = Workflow()
await workflow.run()

Exceções:

  • Propaga qualquer exceção das tasks

pdf_conversion.workflow.tasks.base_task

Classe abstrata base para todas as tasks.

Classes

class BaseTask(ABC)

Fornece infraestrutura comum para tasks.

Atributos:

  • name (str) - Nome da task
  • logger (logging.Logger) - Logger configurado
  • retry_attempts (int) - Tentativas de retry
  • sleep_time (float) - Tempo de sleep entre operações

Métodos Abstratos:

@abstractmethod async def execute(self, shared_data: dict) -> dict

Implementado por subclasses. Contém lógica específica da task.

Parâmetros:

  • shared_data (dict) - Dados de tasks anteriores

Retorna:

  • dict - Resultado para próximas tasks

Métodos Concretos:

async def run_async(self, shared_data: dict) -> dict

Template method com retry automático.

Parâmetros:

  • shared_data (dict) - Dados de tasks anteriores

Retorna:

  • dict - Resultado de execute()

Fluxo:

  1. Loop de retry (até retry_attempts)
  2. Chama execute(shared_data)
  3. Em sucesso: log sucesso, retorna resultado
  4. Em erro: log erro, retry com backoff exponencial

Uso:

task = MyTask("my_task")
result = await task.run_async({"input": "data"})

async def sleep(self, multiplier: float = 1.0) -> None

Sleep configurável entre operações.

Parâmetros:

  • multiplier (float) - Multiplicador do tempo padrão

Uso:

await self.sleep()       # settings.performance.task_sleep_time
await self.sleep(2.0)    # 2x o tempo padrão

def wait_for_input(self, message: str = "Pressione Enter...") -> None

Pausa para debug manual (apenas em development).

Parâmetros:

  • message (str) - Mensagem exibida

Uso:

self.wait_for_input("Verificar página carregou")
# Aguarda Enter no terminal

Nota: Desabilitado se settings.performance.enable_wait_for_input=False


def _log_success(self, result: dict) -> None

Log de sucesso em arquivo.

Parâmetros:

  • result (dict) - Resultado da task

Arquivo gerado:

  • logs/success/{timestamp}.log

def _log_failure(self, error: Exception) -> None

Log de erro com traceback.

Parâmetros:

  • error (Exception) - Exceção capturada

Arquivo gerado:

  • logs/failed/{timestamp}.log

def _format_result_with_files(self, result: dict) -> str

Formata resultado para log legível.

Parâmetros:

  • result (dict) - Resultado da task

Retorna:

  • str - Resultado formatado

pdf_conversion.workflow.tasks.document_task

Task de descoberta e validação de arquivos.

Classes

class DocumentTask(BaseTask)

Busca e valida arquivos Word para conversão.

Métodos:

async def execute(self, shared_data: dict) -> dict

Busca e valida arquivos.

Parâmetros:

  • shared_data (dict) - Dados compartilhados (não usado)

Retorna:

{
    "valid_files": [Path("doc1.docx"), Path("doc2.docx")],
    "total_files": 2,
    "conversion_pending": True
}

Fluxo:

  1. Chama _find_word_files() → Lista de arquivos
  2. Chama _validate_files() → Valida cada arquivo
  3. Retorna estrutura com arquivos válidos

def _find_word_files(self) -> list[Path]

Busca arquivos .docx e .doc.

Retorna:

  • list[Path] - Lista de arquivos encontrados

Uso:

files = task._find_word_files()
# [Path("doc1.docx"), Path("doc2.docx")]

def _validate_files(self, files: list[Path]) -> list[Path]

Valida existência e tamanho dos arquivos.

Parâmetros:

  • files (list[Path]) - Arquivos a validar

Retorna:

  • list[Path] - Arquivos válidos

Validações:

  1. Arquivo existe?
  2. Tamanho <= settings.conversion.max_file_size_mb?
  3. Tamanho > 0?

pdf_conversion.workflow.tasks.navigation_task

Task de automação web para conversão.

Classes

class NavigationTask(BaseTask)

Automatiza conversão Word → PDF com Playwright.

Métodos:

async def execute(self, shared_data: dict) -> dict

Converte arquivos usando iLovePDF.

Parâmetros:

  • shared_data (dict) - Deve conter "valid_files"

Retorna:

{
    "converted_files": 2,
    "conversion_pending": False
}

Fluxo:

  1. shared_data["valid_files"]
  2. Setup Playwright (browser, context, page)
  3. Loop: Para cada arquivo 3.1. _process_file() → Upload, convert, download
  4. Retorna estatísticas

Exceções:

  • KeyError - Se "valid_files" não existe em shared_data

async def _process_file(self, page: Page, file: Path) -> None

Processa um arquivo: upload → convert → download.

Parâmetros:

  • page (Page) - Página do Playwright
  • file (Path) - Arquivo a processar

Fluxo:

  1. Navega para iLovePDF
  2. _upload_file() → Upload
  3. Sleep 2s (processamento)
  4. _download_pdf() → Download

async def _upload_file(self, page: Page, file: Path) -> None

Upload com múltiplas estratégias de seletor.

Parâmetros:

  • page (Page) - Página do Playwright
  • file (Path) - Arquivo a fazer upload

Estratégias (fallback):

  1. input[type="file"]
  2. #pickfiles
  3. text="Selecionar arquivos"

Exceções:

  • Exception - Se nenhum seletor funcionar

async def _download_pdf(self, page: Page, file: Path) -> None

Download e salva PDF convertido.

Parâmetros:

  • page (Page) - Página do Playwright
  • file (Path) - Arquivo original (para nome do PDF)

Fluxo:

  1. Aguarda botão de download
  2. Captura download com page.expect_download()
  3. Salva em documents/converted_documents/{filename}.pdf

🔧 Tipos de Dados

Configuração (Dynaconf)

# settings.toml estrutura
{
    "project": {
        "name": str,
        "version": str
    },
    "paths": {
        "input": str,
        "output": str,
        "logs": str,
        "temp": str,
        "backup": str
    },
    "conversion": {
        "max_file_size_mb": int,
        "allowed_extensions": list[str],
        "batch_size": int
    },
    "urls": {
        "ilovepdf": str
    },
    "performance": {
        "retry_attempts": int,
        "task_sleep_time": float,
        "enable_wait_for_input": bool
    },
    "playwright": {
        "headless": bool,
        "timeout": int,
        "slow_mo": int,
        "browser": str,
        "launch_options": dict,
        "context_options": dict
    },
    "logging": {
        "level": str,
        "format": str,
        "console_output": bool,
        "file_output": bool
    }
}

shared_data (Workflow)

# Após DocumentTask
{
    "valid_files": list[Path],
    "total_files": int,
    "conversion_pending": bool
}

# Após NavigationTask
{
    "valid_files": list[Path],
    "total_files": int,
    "conversion_pending": bool,  # False agora
    "converted_files": int
}

🎯 Exemplos de Uso

Exemplo 1: Executar Conversão

import asyncio
from pdf_conversion.main import run_workflow

async def main():
    await run_workflow()

asyncio.run(main())

Exemplo 2: Usar Configuração

from pdf_conversion.config import settings, get_path

# Ler configuração
max_size = settings.conversion.max_file_size_mb  # 50
timeout = settings.playwright.timeout  # 30000

# Obter caminhos
input_dir = get_path("input")
output_dir = get_path("output")

# Listar arquivos
files = list(input_dir.glob("*.docx"))

Exemplo 3: Criar Task Customizada

from pdf_conversion.workflow.tasks.base_task import BaseTask
from pathlib import Path

class BackupTask(BaseTask):
    async def execute(self, shared_data: dict) -> dict:
        files = shared_data.get("valid_files", [])
        backup_dir = Path("backup")
        
        for file in files:
            # Copia arquivo
            import shutil
            shutil.copy2(file, backup_dir / file.name)
        
        return {
            "backup_created": True,
            "backup_count": len(files)
        }

# Usar no workflow
from pdf_conversion.workflow import Workflow

class CustomWorkflow(Workflow):
    def setup_tasks(self):
        self.add_task(DocumentTask("document_manager"))
        self.add_task(BackupTask("backup"))
        self.add_task(NavigationTask("ilovepdf_navigation"))

# Executar
workflow = CustomWorkflow()
await workflow.run()

Exemplo 4: Acessar Logs

from pathlib import Path
from pdf_conversion.config import get_path

# Ler último log de sucesso
success_dir = get_path("logs") / "success"
logs = sorted(success_dir.glob("*.log"), reverse=True)

if logs:
    latest = logs[0]
    print(latest.read_text())

# Ler logs de erro
failed_dir = get_path("logs") / "failed"
errors = list(failed_dir.glob("*.log"))
print(f"Total de erros: {len(errors)}")

📚 Ver Também


Documentação gerada para versão 0.1.0