Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion omniload/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import pytest

from .main_test import DESTINATIONS, SOURCES # type: ignore
from .main_test import DESTINATIONS, SOURCES


def pytest_configure(config):
Expand Down
66 changes: 33 additions & 33 deletions omniload/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,21 +95,21 @@ def ingest(
help="The URI of the [green]source[/green]",
envvar=["SOURCE_URI", "OMNILOAD_SOURCE_URI"],
),
], # type: ignore
],
dest_uri: Annotated[
str,
typer.Option(
help="The URI of the [cyan]destination[/cyan]",
envvar=["DESTINATION_URI", "OMNILOAD_DESTINATION_URI"],
),
], # type: ignore
],
source_table: Annotated[
str,
typer.Option(
help="The table name in the [green]source[/green] to fetch",
envvar=["SOURCE_TABLE", "OMNILOAD_SOURCE_TABLE"],
),
], # type: ignore
],
dest_table: Annotated[
str,
typer.Option(
Expand All @@ -123,170 +123,170 @@ def ingest(
help="The incremental key from the table to be used for incremental strategies",
envvar=["INCREMENTAL_KEY", "OMNILOAD_INCREMENTAL_KEY"],
),
] = None, # type: ignore
] = None,
incremental_strategy: Annotated[
IncrementalStrategy,
typer.Option(
help="The incremental strategy to use",
envvar=["INCREMENTAL_STRATEGY", "OMNILOAD_INCREMENTAL_STRATEGY"],
),
] = IncrementalStrategy.create_replace, # type: ignore
] = IncrementalStrategy.create_replace,
interval_start: Annotated[
Optional[datetime],
typer.Option(
help="The start of the interval the incremental key will cover",
formats=DATE_FORMATS,
envvar=["INTERVAL_START", "OMNILOAD_INTERVAL_START"],
),
] = None, # type: ignore
] = None,
interval_end: Annotated[
Optional[datetime],
typer.Option(
help="The end of the interval the incremental key will cover",
formats=DATE_FORMATS,
envvar=["INTERVAL_END", "OMNILOAD_INTERVAL_END"],
),
] = None, # type: ignore
] = None,
primary_key: Annotated[
Optional[list[str]],
typer.Option(
help="The key that will be used to deduplicate the resulting table",
envvar=["PRIMARY_KEY", "OMNILOAD_PRIMARY_KEY"],
),
] = None, # type: ignore
] = None,
partition_by: Annotated[
Optional[str],
typer.Option(
help="The partition key to be used for partitioning the destination table",
envvar=["PARTITION_BY", "OMNILOAD_PARTITION_BY"],
),
] = None, # type: ignore
] = None,
cluster_by: Annotated[
Optional[str],
typer.Option(
help="The clustering key to be used for clustering the destination table, not every destination supports clustering.",
envvar=["CLUSTER_BY", "OMNILOAD_CLUSTER_BY"],
),
] = None, # type: ignore
] = None,
dry_run: Annotated[
Optional[bool],
typer.Option(
help="Display data transfer plan but don't invoke it",
envvar=["DRY_RUN", "OMNILOAD_DRY_RUN"],
),
] = False, # type: ignore
] = False,
full_refresh: Annotated[
bool,
typer.Option(
help="Ignore the state and refresh the destination table completely",
envvar=["FULL_REFRESH", "OMNILOAD_FULL_REFRESH"],
),
] = False, # type: ignore
] = False,
progress: Annotated[
Progress,
typer.Option(
help="The progress display type, must be one of 'interactive', 'log'",
envvar=["PROGRESS", "OMNILOAD_PROGRESS"],
),
] = Progress.interactive, # type: ignore
] = Progress.interactive,
sql_backend: Annotated[
SqlBackend,
typer.Option(
help="The SQL backend to use",
envvar=["SQL_BACKEND", "OMNILOAD_SQL_BACKEND"],
),
] = SqlBackend.default, # type: ignore
] = SqlBackend.default,
loader_file_format: Annotated[
Optional[LoaderFileFormat],
typer.Option(
help="The file format to use when loading data",
envvar=["LOADER_FILE_FORMAT", "OMNILOAD_LOADER_FILE_FORMAT"],
),
] = None, # type: ignore
] = None,
page_size: Annotated[
Optional[int],
typer.Option(
help="The page size to be used when fetching data from SQL sources",
envvar=["PAGE_SIZE", "OMNILOAD_PAGE_SIZE"],
),
] = 50000, # type: ignore
] = 50000,
loader_file_size: Annotated[
Optional[int],
typer.Option(
help="The file size to be used by the loader to split the data into multiple files. This can be set independent of the page size, since page size is used for fetching the data from the sources whereas this is used for the processing/loading part.",
envvar=["LOADER_FILE_SIZE", "OMNILOAD_LOADER_FILE_SIZE"],
),
] = 100000, # type: ignore
] = 100000,
schema_naming: Annotated[
SchemaNaming,
typer.Option(
help="The naming convention to use when moving the tables from source to destination. The default behavior is explained here: https://dlthub.com/docs/general-usage/schema#naming-convention",
envvar=["SCHEMA_NAMING", "OMNILOAD_SCHEMA_NAMING"],
),
] = SchemaNaming.default, # type: ignore
] = SchemaNaming.default,
pipelines_dir: Annotated[
Optional[str],
typer.Option(
help="The path to store dlt-related pipeline metadata. By default, omniload will create a temporary directory and delete it after the execution is done in order to make retries stateless.",
envvar=["PIPELINES_DIR", "OMNILOAD_PIPELINES_DIR"],
),
] = None, # type: ignore
] = None,
extract_parallelism: Annotated[
Optional[int],
typer.Option(
help="The number of parallel jobs to run for extracting data from the source, only applicable for certain sources",
envvar=["EXTRACT_PARALLELISM", "OMNILOAD_EXTRACT_PARALLELISM"],
),
] = 5, # type: ignore
] = 5,
sql_reflection_level: Annotated[
SqlReflectionLevel,
typer.Option(
help="The reflection level to use when reflecting the table schema from the source",
envvar=["SQL_REFLECTION_LEVEL", "OMNILOAD_SQL_REFLECTION_LEVEL"],
),
] = SqlReflectionLevel.full, # type: ignore
] = SqlReflectionLevel.full,
sql_limit: Annotated[
Optional[int],
typer.Option(
help="The limit to use when fetching data from the source",
envvar=["SQL_LIMIT", "OMNILOAD_SQL_LIMIT"],
),
] = None, # type: ignore
] = None,
sql_exclude_columns: Annotated[
Optional[list[str]],
typer.Option(
help="The columns to exclude from the source table",
envvar=["SQL_EXCLUDE_COLUMNS", "OMNILOAD_SQL_EXCLUDE_COLUMNS"],
),
] = [], # type: ignore
] = [],
columns: Annotated[
Optional[list[str]],
typer.Option(
help="The column types to be used for the destination table in the format of 'column_name:column_type'",
envvar=["OMNILOAD_COLUMNS"],
),
] = None, # type: ignore
] = None,
yield_limit: Annotated[
Optional[int],
typer.Option(
help="Limit the number of pages yielded from the source",
envvar=["YIELD_LIMIT", "OMNILOAD_YIELD_LIMIT"],
),
] = None, # type: ignore
] = None,
staging_bucket: Annotated[
Optional[str],
typer.Option(
help="The staging bucket to be used for the ingestion, must be prefixed with 'gs://' or 's3://'",
envvar=["STAGING_BUCKET", "OMNILOAD_STAGING_BUCKET"],
),
] = None, # type: ignore
] = None,
mask: Annotated[
Optional[list[str]],
typer.Option(
help="Column masking configuration in format 'column:algorithm[:param]'. Can be specified multiple times.",
envvar=["MASK", "OMNILOAD_MASK"],
),
] = [], # type: ignore
] = [],
):
import hashlib
import tempfile
Expand Down Expand Up @@ -472,7 +472,7 @@ def parse_columns(columns: list[str]) -> dict:

column_hints[key]["primary_key"] = True

pipeline = dlt.pipeline( # type: ignore
pipeline = dlt.pipeline(
pipeline_name=m.hexdigest(),
destination=dlt_dest,
progress=progressInstance,
Expand Down Expand Up @@ -657,13 +657,13 @@ def run_pipeline():
table=dest_table,
staging_bucket=staging_bucket,
),
write_disposition=write_disposition, # type: ignore
write_disposition=write_disposition,
primary_key=(
primary_key if primary_key and len(primary_key) > 0 else None
), # type: ignore
),
loader_file_format=(
loader_file_format.value if loader_file_format is not None else None # type: ignore
), # type: ignore
loader_file_format.value if loader_file_format is not None else None
),
)

# Databricks concurrency error patterns that are safe to retry
Expand Down Expand Up @@ -796,7 +796,7 @@ def example_uris():

@app.command()
def version():
from omniload import __version__ # type: ignore
from omniload import __version__

print(f"v{__version__}")

Expand Down
36 changes: 18 additions & 18 deletions omniload/main_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,28 +23,28 @@

import duckdb
import numpy as np
import pandas as pd # type: ignore
import pandas as pd
import pendulum
import pyarrow as pa # type: ignore
import pyarrow.csv # type: ignore
import pyarrow.ipc as ipc # type: ignore
import pyarrow.parquet as pya_parquet # type: ignore
import pyarrow as pa
import pyarrow.csv
import pyarrow.ipc as ipc
import pyarrow.parquet as pya_parquet
import pytest
import requests
import sqlalchemy
from confluent_kafka import Producer # type: ignore
from confluent_kafka import Producer
from dlt.sources.filesystem import glob_files
from elasticsearch import Elasticsearch
from fsspec.implementations.memory import MemoryFileSystem # type: ignore
from fsspec.implementations.memory import MemoryFileSystem
from sqlalchemy.pool import NullPool
from testcontainers.clickhouse import ClickHouseContainer # type: ignore
from testcontainers.core.container import DockerContainer # type: ignore
from testcontainers.core.waiting_utils import wait_for_logs # type: ignore
from testcontainers.kafka import KafkaContainer # type: ignore
from testcontainers.localstack import LocalStackContainer # type: ignore
from testcontainers.mongodb import MongoDbContainer # type: ignore
from testcontainers.mysql import MySqlContainer # type: ignore
from testcontainers.postgres import PostgresContainer # type: ignore
from testcontainers.clickhouse import ClickHouseContainer
from testcontainers.core.container import DockerContainer
from testcontainers.core.waiting_utils import wait_for_logs
from testcontainers.kafka import KafkaContainer
from testcontainers.localstack import LocalStackContainer
from testcontainers.mongodb import MongoDbContainer
from testcontainers.mysql import MySqlContainer
from testcontainers.postgres import PostgresContainer
from typer.testing import CliRunner

from omniload.main import app
Expand Down Expand Up @@ -737,9 +737,9 @@ def insert_documents(self, documents: list):
"""Insert documents using Couchbase Python SDK from test machine."""
from datetime import timedelta

from couchbase.auth import PasswordAuthenticator # type: ignore
from couchbase.cluster import Cluster # type: ignore
from couchbase.options import ClusterOptions # type: ignore
from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.options import ClusterOptions

# Connect using SDK (from test machine to container)
auth = PasswordAuthenticator(self.username, self.password)
Expand Down
4 changes: 2 additions & 2 deletions omniload/src/arrow/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import Any, Optional

import dlt
import pyarrow as pa # type: ignore
import pyarrow as pa
from dlt.common.schema.typing import TColumnNames, TTableSchemaColumns
from dlt.extract.items import TTableHintTemplate

Expand All @@ -22,7 +22,7 @@ def memory_mapped_arrow(
def arrow_mmap(
incremental: Optional[dlt.sources.incremental[Any]] = incremental,
):
import pyarrow.ipc as ipc # type: ignore
import pyarrow.ipc as ipc

with pa.memory_map(path, "rb") as mmap:
reader: ipc.RecordBatchFileReader = ipc.open_file(mmap)
Expand Down
2 changes: 1 addition & 1 deletion omniload/src/blob_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import pytest

from omniload.src.blob import parse_uri # type: ignore
from omniload.src.blob import parse_uri


@dataclass
Expand Down
2 changes: 1 addition & 1 deletion omniload/src/chess/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def players_games(
def _get_archive(url: str) -> List[TDataItem]:
try:
games = get_url_with_retry(url).get("games", [])
return games # type: ignore
return games
except requests.HTTPError as http_err:
# sometimes archives are not available and the error seems to be permanent
if http_err.response.status_code == 404:
Expand Down
2 changes: 1 addition & 1 deletion omniload/src/chess/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

def get_url_with_retry(url: str) -> StrAny:
r = requests.get(url)
return r.json() # type: ignore
return r.json()


def get_path_with_retry(path: str) -> StrAny:
Expand Down
2 changes: 1 addition & 1 deletion omniload/src/collector/spinner.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def update(
name: str,
inc: int = 1,
total: Optional[int] = None,
message: Optional[str] = None, # type: ignore
message: Optional[str] = None,
label: str = "",
**kwargs,
) -> None:
Expand Down
4 changes: 2 additions & 2 deletions omniload/src/destinations.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ def open_connection(self):
):
return super().open_connection()

import pyodbc # type: ignore
import pyodbc

dsn = ";".join(
[f"{k}={v}" for k, v in cfg.items() if k not in self.SKIP_CREDENTIALS]
Expand Down Expand Up @@ -561,7 +561,7 @@ def dlt_dest(self, uri: str, **kwargs):
region_name = source_params.get("region_name", [None])[0]

if not access_key_id and not secret_access_key:
import botocore.session # type: ignore
import botocore.session

session = botocore.session.Session(profile=profile_name)
default = session.get_credentials()
Expand Down
Loading
Loading