Tasks são unidades de trabalho isoladas que executam ações específicas no workflow. Cada task:
- Herda de
BaseTask(classe abstrata) - Implementa
execute()com sua lógica - Recebe e retorna dados via
shared_data - É executada assincronamente
BaseTask é a classe abstrata que fornece infraestrutura para todas as tasks:
class BaseTask(ABC):
def __init__(self, name: str):
self.name = name
self.logger = logging.getLogger(name)
self.retry_attempts = settings.performance.retry_attempts
self.sleep_time = settings.performance.task_sleep_time
@abstractmethod
async def execute(self, shared_data: dict) -> dict:
"""Implementado pelas subclasses"""
pass
async def run_async(self, shared_data: dict) -> dict:
"""Template method com retry logic"""
# ...async def run_async(self, shared_data: dict) -> dict:
for attempt in range(1, self.retry_attempts + 1):
try:
result = await self.execute(shared_data)
return result
except Exception as e:
if attempt == self.retry_attempts:
raise # Última tentativa, propaga erro
await asyncio.sleep(2 ** attempt) # Backoff exponencialConfigurável em settings.toml:
[default.performance]
retry_attempts = 3 # Padrão: 3 tentativasdef _log_success(self, result: dict) -> None:
"""Log de sucesso com timestamp"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
log_file = get_path("logs") / "success" / f"{timestamp}.log"
with open(log_file, "w", encoding="utf-8") as f:
f.write(f"Task: {self.name}\n")
f.write(f"Status: SUCCESS\n")
f.write(self._format_result_with_files(result))
def _log_failure(self, error: Exception) -> None:
"""Log de erro com traceback"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
log_file = get_path("logs") / "failed" / f"{timestamp}.log"
with open(log_file, "w", encoding="utf-8") as f:
f.write(f"Task: {self.name}\n")
f.write(f"Status: FAILED\n")
f.write(f"Error: {str(error)}\n")
f.write(traceback.format_exc())Arquivos gerados:
logs/success/20240115_143052.loglogs/failed/20240115_143052.log
async def sleep(self, multiplier: float = 1.0) -> None:
"""Sleep configurável"""
sleep_time = self.sleep_time * multiplier
await asyncio.sleep(sleep_time)
# Uso na task
await self.sleep() # Padrão: settings.performance.task_sleep_time
await self.sleep(2.0) # 2x o tempo padrãoConfigurável em settings.toml:
[development.performance]
task_sleep_time = 1.0 # 1 segundo em dev
[production.performance]
task_sleep_time = 0.2 # 200ms em produçãodef wait_for_input(self, message: str = "Pressione Enter para continuar...") -> None:
"""Pausa para debug manual (apenas em development)"""
if not settings.performance.get("enable_wait_for_input", False):
return # Desabilitado em production
print(f"\n⏸️ {message}")
input()
# Uso na task
self.wait_for_input("Verificar se upload funcionou")
# Aguarda pressionar Enter no terminaldef _format_result_with_files(self, result: dict) -> str:
"""Formata resultado para log legível"""
output = []
for key, value in result.items():
if isinstance(value, list) and value and isinstance(value[0], Path):
# Formata lista de arquivos
output.append(f"{key}:")
for file in value:
output.append(f" - {file.name}")
else:
output.append(f"{key}: {value}")
return "\n".join(output)async def run_async(self, shared_data: dict) -> dict:
# ETAPA 1: Log início
self.logger.info(f"Executando task: {self.name}")
# ETAPA 2: Retry loop
for attempt in range(1, self.retry_attempts + 1):
try:
# ETAPA 3: Executa lógica (HOOK)
result = await self.execute(shared_data)
# ETAPA 4: Log sucesso
self._log_success(result)
# ETAPA 5: Retorna
return result
except Exception as e:
# ETAPA 6: Log erro
self._log_failure(e)
# ETAPA 7: Retry ou propaga
if attempt == self.retry_attempts:
raise
await asyncio.sleep(2 ** attempt)Buscar, validar e retornar lista de arquivos Word para conversão.
async def execute(self, shared_data: dict) -> dict:
# ETAPA 1: Buscar arquivos
files = self._find_word_files()
# ETAPA 2: Validar
valid_files = self._validate_files(files)
# ETAPA 3: Retornar estrutura
return {
"valid_files": valid_files,
"total_files": len(valid_files),
"conversion_pending": True
}def _find_word_files(self) -> list[Path]:
"""Busca arquivos .docx e .doc"""
input_dir = get_path("input")
docx_files = list(input_dir.glob("*.docx"))
doc_files = list(input_dir.glob("*.doc"))
return docx_files + doc_filesO que faz:
- Usa
globpara buscar padrões - Suporta
.docxe.doc - Retorna lista de
Pathobjects
def _validate_files(self, files: list[Path]) -> list[Path]:
"""Valida existência e tamanho"""
valid_files = []
max_size_bytes = settings.conversion.max_file_size_mb * 1024 * 1024
for file in files:
# Validação 1: Existe?
if not file.exists():
self.logger.warning(f"Arquivo não encontrado: {file}")
continue
# Validação 2: Tamanho OK?
file_size = file.stat().st_size
if file_size > max_size_bytes:
self.logger.warning(f"Arquivo muito grande: {file.name} ({file_size} bytes)")
continue
# Validação 3: Tamanho mínimo (>0)?
if file_size == 0:
self.logger.warning(f"Arquivo vazio: {file.name}")
continue
valid_files.append(file)
return valid_filesValidações realizadas:
- ✅ Arquivo existe?
- ✅ Tamanho <=
max_file_size_mb? - ✅ Tamanho > 0 bytes?
{
"valid_files": [
Path("documents/documents_to_convert/doc1.docx"),
Path("documents/documents_to_convert/doc2.docx"),
],
"total_files": 2,
"conversion_pending": True
}Chaves retornadas:
valid_files→ Lista dePathobjectstotal_files→ Contador (int)conversion_pending→ Flag para próxima task (bool)
# settings.toml
[default.conversion]
max_file_size_mb = 50
allowed_extensions = [".docx", ".doc"]
[default.paths]
input = "documents/documents_to_convert"# workflow.py
def setup_tasks(self):
self.add_task(DocumentTask("document_manager"))
# Output de DocumentTask vai para shared_data["valid_files"]Usar Playwright para automatizar conversão Word → PDF no iLovePDF.
async def execute(self, shared_data: dict) -> dict:
# ETAPA 1: Pegar arquivos de shared_data
files = shared_data.get("valid_files", [])
# ETAPA 2: Setup Playwright
async with async_playwright() as p:
browser, context = await create_browser_context(p)
page = await context.new_page()
# ETAPA 3: Loop nos arquivos
for file in files:
await self._process_file(page, file)
# ETAPA 4: Retornar estatísticas
return {
"converted_files": len(files),
"conversion_pending": False
}async def _process_file(self, page: Page, file: Path) -> None:
"""Processa um arquivo: upload → convert → download"""
# ETAPA 1: Navegar
await page.goto(settings.urls.ilovepdf)
await self.sleep()
# ETAPA 2: Upload
await self._upload_file(page, file)
await self.sleep(2.0) # Aguarda processamento
# ETAPA 3: Download
await self._download_pdf(page, file)async def _upload_file(self, page: Page, file: Path) -> None:
"""Upload com múltiplas estratégias de seletor"""
# Estratégia 1: input[type="file"]
try:
await page.set_input_files('input[type="file"]', str(file))
return
except Exception:
pass
# Estratégia 2: #pickfiles
try:
await page.set_input_files("#pickfiles", str(file))
return
except Exception:
pass
# Estratégia 3: Texto do botão
try:
upload_button = page.locator('text="Selecionar arquivos"')
await upload_button.set_input_files(str(file))
return
except Exception:
raise Exception("Nenhum seletor de upload funcionou")Por que múltiplas estratégias?
- Sites podem mudar seletores
- Fallback aumenta robustez
- Playwright testa cada estratégia automaticamente
async def _download_pdf(self, page: Page, file: Path) -> None:
"""Download com captura async"""
# ETAPA 1: Aguardar botão de download
download_button = page.locator('text="Baixar PDF"')
await download_button.wait_for(state="visible", timeout=30000)
# ETAPA 2: Capturar download
async with page.expect_download() as download_info:
await download_button.click()
download = await download_info.value
# ETAPA 3: Salvar com nome correto
output_dir = get_path("output")
pdf_name = file.stem + ".pdf"
output_path = output_dir / pdf_name
await download.save_as(str(output_path))
self.logger.info(f"PDF salvo: {pdf_name}")async def _process_file(self, page: Page, file: Path) -> None:
try:
# ... processo normal
except TimeoutError:
self.logger.error(f"Timeout ao processar {file.name}")
# BaseTask fará retry automático
raise
except Exception as e:
self.logger.error(f"Erro ao processar {file.name}: {str(e)}")
# Continua para próximo arquivoEstratégias:
TimeoutError→ Propaga para retry- Outros erros → Log e continua
- Última tentativa → Log em
logs/failed/
# settings.toml
[default.urls]
ilovepdf = "https://www.ilovepdf.com/word_to_pdf"
[default.playwright]
headless = false
timeout = 30000
browser = "chromium"
[default.paths]
output = "documents/converted_documents"# workflow.py
def setup_tasks(self):
self.add_task(DocumentTask("document_manager"))
self.add_task(NavigationTask("ilovepdf_navigation"))
# NavigationTask lê shared_data["valid_files"]# workflow/tasks/new_task.py
from .base_task import BaseTask
class NewTask(BaseTask):
"""Descrição da nova task"""
async def execute(self, shared_data: dict) -> dict:
# ETAPA 1: Ler dados compartilhados
input_data = shared_data.get("some_key", [])
# ETAPA 2: Processar
result = await self._process_data(input_data)
# ETAPA 3: Retornar estrutura
return {
"new_key": result,
"processed_count": len(result)
}
async def _process_data(self, data):
"""Lógica privada"""
# Sua implementação aqui
await self.sleep() # Use recursos de BaseTask
return data# workflow/workflow.py
from .tasks.new_task import NewTask
class Workflow:
def setup_tasks(self):
self.add_task(DocumentTask("document_manager"))
self.add_task(NewTask("my_new_task")) # ✅ Adicione aqui
self.add_task(NavigationTask("ilovepdf_navigation"))# settings.toml
[default.new_task]
enabled = true
max_items = 100
timeout = 30# No código da task
max_items = settings.new_task.max_itemsasync def execute(self, shared_data: dict) -> dict:
result = await async_operation() # Não bloqueia
return resultasync def execute(self, shared_data: dict) -> dict:
result = sync_operation() # Bloqueia!
return resultasync def execute(self, shared_data: dict) -> dict:
result = await risky_operation() # Se falhar, retry automático
return resultasync def execute(self, shared_data: dict) -> dict:
try:
result = await risky_operation()
except Exception:
return {} # Perde informação do erro!async def execute(self, shared_data: dict) -> dict:
files = shared_data.get("valid_files")
if not files:
raise ValueError("Nenhum arquivo para processar")
# ... processarasync def execute(self, shared_data: dict) -> dict:
files = shared_data["valid_files"] # KeyError se não existir!
# ... processarreturn {
"processed_files": [Path("file1.pdf")],
"success_count": 1,
"failed_count": 0,
"next_step_ready": True
}return {} # Próxima task não sabe o que aconteceu# DocumentTask retorna
return {
"valid_files": [Path("doc1.docx")],
"total_files": 1
}
# NavigationTask lê
files = shared_data.get("valid_files", [])DocumentTask
↓ valid_files
NavigationTask
↓ converted_files
NextTask (futuro)
# test_document_task.py
import asyncio
from pdf_conversion.workflow.tasks.document_task import DocumentTask
async def test_document_task():
task = DocumentTask("test")
result = await task.execute({})
print(f"Arquivos encontrados: {result['total_files']}")
print(f"Arquivos válidos: {len(result['valid_files'])}")
asyncio.run(test_document_task())- Workflow Completo → workflow.md
- Playwright → playwright.md
- Logs → logs.md
- API Reference → api-reference.md
Quer criar uma task customizada? Veja contributing.md para guidelines.