diff --git a/omniload/conftest.py b/omniload/conftest.py index fcb8022c2..b04e2d816 100644 --- a/omniload/conftest.py +++ b/omniload/conftest.py @@ -4,7 +4,7 @@ import pytest -from .main_test import DESTINATIONS, SOURCES # type: ignore +from .main_test import DESTINATIONS, SOURCES def pytest_configure(config): diff --git a/omniload/main.py b/omniload/main.py index 02551cf5d..bb004d13b 100644 --- a/omniload/main.py +++ b/omniload/main.py @@ -95,21 +95,21 @@ def ingest( help="The URI of the [green]source[/green]", envvar=["SOURCE_URI", "OMNILOAD_SOURCE_URI"], ), - ], # type: ignore + ], dest_uri: Annotated[ str, typer.Option( help="The URI of the [cyan]destination[/cyan]", envvar=["DESTINATION_URI", "OMNILOAD_DESTINATION_URI"], ), - ], # type: ignore + ], source_table: Annotated[ str, typer.Option( help="The table name in the [green]source[/green] to fetch", envvar=["SOURCE_TABLE", "OMNILOAD_SOURCE_TABLE"], ), - ], # type: ignore + ], dest_table: Annotated[ str, typer.Option( @@ -123,14 +123,14 @@ def ingest( help="The incremental key from the table to be used for incremental strategies", envvar=["INCREMENTAL_KEY", "OMNILOAD_INCREMENTAL_KEY"], ), - ] = None, # type: ignore + ] = None, incremental_strategy: Annotated[ IncrementalStrategy, typer.Option( help="The incremental strategy to use", envvar=["INCREMENTAL_STRATEGY", "OMNILOAD_INCREMENTAL_STRATEGY"], ), - ] = IncrementalStrategy.create_replace, # type: ignore + ] = IncrementalStrategy.create_replace, interval_start: Annotated[ Optional[datetime], typer.Option( @@ -138,7 +138,7 @@ def ingest( formats=DATE_FORMATS, envvar=["INTERVAL_START", "OMNILOAD_INTERVAL_START"], ), - ] = None, # type: ignore + ] = None, interval_end: Annotated[ Optional[datetime], typer.Option( @@ -146,147 +146,147 @@ def ingest( formats=DATE_FORMATS, envvar=["INTERVAL_END", "OMNILOAD_INTERVAL_END"], ), - ] = None, # type: ignore + ] = None, primary_key: Annotated[ Optional[list[str]], typer.Option( help="The key that will be used to deduplicate the resulting table", envvar=["PRIMARY_KEY", "OMNILOAD_PRIMARY_KEY"], ), - ] = None, # type: ignore + ] = None, partition_by: Annotated[ Optional[str], typer.Option( help="The partition key to be used for partitioning the destination table", envvar=["PARTITION_BY", "OMNILOAD_PARTITION_BY"], ), - ] = None, # type: ignore + ] = None, cluster_by: Annotated[ Optional[str], typer.Option( help="The clustering key to be used for clustering the destination table, not every destination supports clustering.", envvar=["CLUSTER_BY", "OMNILOAD_CLUSTER_BY"], ), - ] = None, # type: ignore + ] = None, dry_run: Annotated[ Optional[bool], typer.Option( help="Display data transfer plan but don't invoke it", envvar=["DRY_RUN", "OMNILOAD_DRY_RUN"], ), - ] = False, # type: ignore + ] = False, full_refresh: Annotated[ bool, typer.Option( help="Ignore the state and refresh the destination table completely", envvar=["FULL_REFRESH", "OMNILOAD_FULL_REFRESH"], ), - ] = False, # type: ignore + ] = False, progress: Annotated[ Progress, typer.Option( help="The progress display type, must be one of 'interactive', 'log'", envvar=["PROGRESS", "OMNILOAD_PROGRESS"], ), - ] = Progress.interactive, # type: ignore + ] = Progress.interactive, sql_backend: Annotated[ SqlBackend, typer.Option( help="The SQL backend to use", envvar=["SQL_BACKEND", "OMNILOAD_SQL_BACKEND"], ), - ] = SqlBackend.default, # type: ignore + ] = SqlBackend.default, loader_file_format: Annotated[ Optional[LoaderFileFormat], typer.Option( help="The file format to use when loading data", envvar=["LOADER_FILE_FORMAT", "OMNILOAD_LOADER_FILE_FORMAT"], ), - ] = None, # type: ignore + ] = None, page_size: Annotated[ Optional[int], typer.Option( help="The page size to be used when fetching data from SQL sources", envvar=["PAGE_SIZE", "OMNILOAD_PAGE_SIZE"], ), - ] = 50000, # type: ignore + ] = 50000, loader_file_size: Annotated[ Optional[int], typer.Option( help="The file size to be used by the loader to split the data into multiple files. This can be set independent of the page size, since page size is used for fetching the data from the sources whereas this is used for the processing/loading part.", envvar=["LOADER_FILE_SIZE", "OMNILOAD_LOADER_FILE_SIZE"], ), - ] = 100000, # type: ignore + ] = 100000, schema_naming: Annotated[ SchemaNaming, typer.Option( help="The naming convention to use when moving the tables from source to destination. The default behavior is explained here: https://dlthub.com/docs/general-usage/schema#naming-convention", envvar=["SCHEMA_NAMING", "OMNILOAD_SCHEMA_NAMING"], ), - ] = SchemaNaming.default, # type: ignore + ] = SchemaNaming.default, pipelines_dir: Annotated[ Optional[str], typer.Option( help="The path to store dlt-related pipeline metadata. By default, omniload will create a temporary directory and delete it after the execution is done in order to make retries stateless.", envvar=["PIPELINES_DIR", "OMNILOAD_PIPELINES_DIR"], ), - ] = None, # type: ignore + ] = None, extract_parallelism: Annotated[ Optional[int], typer.Option( help="The number of parallel jobs to run for extracting data from the source, only applicable for certain sources", envvar=["EXTRACT_PARALLELISM", "OMNILOAD_EXTRACT_PARALLELISM"], ), - ] = 5, # type: ignore + ] = 5, sql_reflection_level: Annotated[ SqlReflectionLevel, typer.Option( help="The reflection level to use when reflecting the table schema from the source", envvar=["SQL_REFLECTION_LEVEL", "OMNILOAD_SQL_REFLECTION_LEVEL"], ), - ] = SqlReflectionLevel.full, # type: ignore + ] = SqlReflectionLevel.full, sql_limit: Annotated[ Optional[int], typer.Option( help="The limit to use when fetching data from the source", envvar=["SQL_LIMIT", "OMNILOAD_SQL_LIMIT"], ), - ] = None, # type: ignore + ] = None, sql_exclude_columns: Annotated[ Optional[list[str]], typer.Option( help="The columns to exclude from the source table", envvar=["SQL_EXCLUDE_COLUMNS", "OMNILOAD_SQL_EXCLUDE_COLUMNS"], ), - ] = [], # type: ignore + ] = [], columns: Annotated[ Optional[list[str]], typer.Option( help="The column types to be used for the destination table in the format of 'column_name:column_type'", envvar=["OMNILOAD_COLUMNS"], ), - ] = None, # type: ignore + ] = None, yield_limit: Annotated[ Optional[int], typer.Option( help="Limit the number of pages yielded from the source", envvar=["YIELD_LIMIT", "OMNILOAD_YIELD_LIMIT"], ), - ] = None, # type: ignore + ] = None, staging_bucket: Annotated[ Optional[str], typer.Option( help="The staging bucket to be used for the ingestion, must be prefixed with 'gs://' or 's3://'", envvar=["STAGING_BUCKET", "OMNILOAD_STAGING_BUCKET"], ), - ] = None, # type: ignore + ] = None, mask: Annotated[ Optional[list[str]], typer.Option( help="Column masking configuration in format 'column:algorithm[:param]'. Can be specified multiple times.", envvar=["MASK", "OMNILOAD_MASK"], ), - ] = [], # type: ignore + ] = [], ): import hashlib import tempfile @@ -472,7 +472,7 @@ def parse_columns(columns: list[str]) -> dict: column_hints[key]["primary_key"] = True - pipeline = dlt.pipeline( # type: ignore + pipeline = dlt.pipeline( pipeline_name=m.hexdigest(), destination=dlt_dest, progress=progressInstance, @@ -657,13 +657,13 @@ def run_pipeline(): table=dest_table, staging_bucket=staging_bucket, ), - write_disposition=write_disposition, # type: ignore + write_disposition=write_disposition, primary_key=( primary_key if primary_key and len(primary_key) > 0 else None - ), # type: ignore + ), loader_file_format=( - loader_file_format.value if loader_file_format is not None else None # type: ignore - ), # type: ignore + loader_file_format.value if loader_file_format is not None else None + ), ) # Databricks concurrency error patterns that are safe to retry @@ -796,7 +796,7 @@ def example_uris(): @app.command() def version(): - from omniload import __version__ # type: ignore + from omniload import __version__ print(f"v{__version__}") diff --git a/omniload/main_test.py b/omniload/main_test.py index 7322762de..da8f939c2 100644 --- a/omniload/main_test.py +++ b/omniload/main_test.py @@ -23,28 +23,28 @@ import duckdb import numpy as np -import pandas as pd # type: ignore +import pandas as pd import pendulum -import pyarrow as pa # type: ignore -import pyarrow.csv # type: ignore -import pyarrow.ipc as ipc # type: ignore -import pyarrow.parquet as pya_parquet # type: ignore +import pyarrow as pa +import pyarrow.csv +import pyarrow.ipc as ipc +import pyarrow.parquet as pya_parquet import pytest import requests import sqlalchemy -from confluent_kafka import Producer # type: ignore +from confluent_kafka import Producer from dlt.sources.filesystem import glob_files from elasticsearch import Elasticsearch -from fsspec.implementations.memory import MemoryFileSystem # type: ignore +from fsspec.implementations.memory import MemoryFileSystem from sqlalchemy.pool import NullPool -from testcontainers.clickhouse import ClickHouseContainer # type: ignore -from testcontainers.core.container import DockerContainer # type: ignore -from testcontainers.core.waiting_utils import wait_for_logs # type: ignore -from testcontainers.kafka import KafkaContainer # type: ignore -from testcontainers.localstack import LocalStackContainer # type: ignore -from testcontainers.mongodb import MongoDbContainer # type: ignore -from testcontainers.mysql import MySqlContainer # type: ignore -from testcontainers.postgres import PostgresContainer # type: ignore +from testcontainers.clickhouse import ClickHouseContainer +from testcontainers.core.container import DockerContainer +from testcontainers.core.waiting_utils import wait_for_logs +from testcontainers.kafka import KafkaContainer +from testcontainers.localstack import LocalStackContainer +from testcontainers.mongodb import MongoDbContainer +from testcontainers.mysql import MySqlContainer +from testcontainers.postgres import PostgresContainer from typer.testing import CliRunner from omniload.main import app @@ -737,9 +737,9 @@ def insert_documents(self, documents: list): """Insert documents using Couchbase Python SDK from test machine.""" from datetime import timedelta - from couchbase.auth import PasswordAuthenticator # type: ignore - from couchbase.cluster import Cluster # type: ignore - from couchbase.options import ClusterOptions # type: ignore + from couchbase.auth import PasswordAuthenticator + from couchbase.cluster import Cluster + from couchbase.options import ClusterOptions # Connect using SDK (from test machine to container) auth = PasswordAuthenticator(self.username, self.password) diff --git a/omniload/src/arrow/__init__.py b/omniload/src/arrow/__init__.py index cfc306d3e..909c2d3f9 100644 --- a/omniload/src/arrow/__init__.py +++ b/omniload/src/arrow/__init__.py @@ -1,7 +1,7 @@ from typing import Any, Optional import dlt -import pyarrow as pa # type: ignore +import pyarrow as pa from dlt.common.schema.typing import TColumnNames, TTableSchemaColumns from dlt.extract.items import TTableHintTemplate @@ -22,7 +22,7 @@ def memory_mapped_arrow( def arrow_mmap( incremental: Optional[dlt.sources.incremental[Any]] = incremental, ): - import pyarrow.ipc as ipc # type: ignore + import pyarrow.ipc as ipc with pa.memory_map(path, "rb") as mmap: reader: ipc.RecordBatchFileReader = ipc.open_file(mmap) diff --git a/omniload/src/blob_test.py b/omniload/src/blob_test.py index 2b3832ea2..812a8cbf3 100644 --- a/omniload/src/blob_test.py +++ b/omniload/src/blob_test.py @@ -3,7 +3,7 @@ import pytest -from omniload.src.blob import parse_uri # type: ignore +from omniload.src.blob import parse_uri @dataclass diff --git a/omniload/src/chess/__init__.py b/omniload/src/chess/__init__.py index 00c610477..8dd6c9d29 100644 --- a/omniload/src/chess/__init__.py +++ b/omniload/src/chess/__init__.py @@ -117,7 +117,7 @@ def players_games( def _get_archive(url: str) -> List[TDataItem]: try: games = get_url_with_retry(url).get("games", []) - return games # type: ignore + return games except requests.HTTPError as http_err: # sometimes archives are not available and the error seems to be permanent if http_err.response.status_code == 404: diff --git a/omniload/src/chess/helpers.py b/omniload/src/chess/helpers.py index 7f79abb9a..a264867a4 100644 --- a/omniload/src/chess/helpers.py +++ b/omniload/src/chess/helpers.py @@ -22,7 +22,7 @@ def get_url_with_retry(url: str) -> StrAny: r = requests.get(url) - return r.json() # type: ignore + return r.json() def get_path_with_retry(path: str) -> StrAny: diff --git a/omniload/src/collector/spinner.py b/omniload/src/collector/spinner.py index dbf327b32..8e97e30d2 100644 --- a/omniload/src/collector/spinner.py +++ b/omniload/src/collector/spinner.py @@ -18,7 +18,7 @@ def update( name: str, inc: int = 1, total: Optional[int] = None, - message: Optional[str] = None, # type: ignore + message: Optional[str] = None, label: str = "", **kwargs, ) -> None: diff --git a/omniload/src/destinations.py b/omniload/src/destinations.py index aab5b6f35..950bb6b3c 100644 --- a/omniload/src/destinations.py +++ b/omniload/src/destinations.py @@ -222,7 +222,7 @@ def open_connection(self): ): return super().open_connection() - import pyodbc # type: ignore + import pyodbc dsn = ";".join( [f"{k}={v}" for k, v in cfg.items() if k not in self.SKIP_CREDENTIALS] @@ -561,7 +561,7 @@ def dlt_dest(self, uri: str, **kwargs): region_name = source_params.get("region_name", [None])[0] if not access_key_id and not secret_access_key: - import botocore.session # type: ignore + import botocore.session session = botocore.session.Session(profile=profile_name) default = session.get_credentials() diff --git a/omniload/src/filesystem/helpers.py b/omniload/src/filesystem/helpers.py index b5f3d0ae1..243f0e06d 100644 --- a/omniload/src/filesystem/helpers.py +++ b/omniload/src/filesystem/helpers.py @@ -27,7 +27,7 @@ FileSystemCredentials, ) from dlt.sources.filesystem import fsspec_filesystem -from fsspec import AbstractFileSystem # type: ignore +from fsspec import AbstractFileSystem @configspec @@ -82,7 +82,7 @@ def add_columns(columns: List[str], rows: List[List[Any]]) -> List[Dict[str, Any return result -def fetch_arrow(file_data, chunk_size: int) -> Iterable[TDataItem]: # type: ignore +def fetch_arrow(file_data, chunk_size: int) -> Iterable[TDataItem]: """Fetches data from the given CSV file. Args: diff --git a/omniload/src/filesystem/readers.py b/omniload/src/filesystem/readers.py index b305ccf3a..a93525ad2 100644 --- a/omniload/src/filesystem/readers.py +++ b/omniload/src/filesystem/readers.py @@ -158,7 +158,7 @@ def _read_csv_duckdb( for item in items: with item.open() as f: - file_data = duckdb.from_csv_auto(f, **duckdb_kwargs) # type: ignore + file_data = duckdb.from_csv_auto(f, **duckdb_kwargs) yield from helper(file_data, chunk_size) diff --git a/omniload/src/frankfurter/__init__.py b/omniload/src/frankfurter/__init__.py index 1f9bee763..58a4b0c89 100644 --- a/omniload/src/frankfurter/__init__.py +++ b/omniload/src/frankfurter/__init__.py @@ -114,7 +114,7 @@ def exchange_rates( end_date = pendulum.now() # Ensure start_date.last_value is a pendulum.DateTime object - start_date_obj = ensure_pendulum_datetime(start_date) # type: ignore + start_date_obj = ensure_pendulum_datetime(start_date) start_date_str = start_date_obj.format("YYYY-MM-DD") # Ensure end_date is a pendulum.DateTime object diff --git a/omniload/src/frankfurter/helpers.py b/omniload/src/frankfurter/helpers.py index 29d14089a..c06bdf118 100644 --- a/omniload/src/frankfurter/helpers.py +++ b/omniload/src/frankfurter/helpers.py @@ -9,7 +9,7 @@ def get_url_with_retry(url: str) -> StrAny: r = requests.get(url, timeout=5) - return r.json() # type: ignore + return r.json() def get_path_with_retry(path: str) -> StrAny: diff --git a/omniload/src/github/helpers.py b/omniload/src/github/helpers.py index cc38405e2..86babd9fa 100644 --- a/omniload/src/github/helpers.py +++ b/omniload/src/github/helpers.py @@ -121,7 +121,7 @@ def _extract_top_connection(data: StrAny, node_type: str) -> StrAny: f"The data with list of {node_type} must be a dictionary and contain only one element" ) data = next(iter(data.values())) - return data[node_type] # type: ignore + return data[node_type] def _extract_nested_nodes(item: DictStrAny) -> DictStrAny: diff --git a/omniload/src/google_ads/__init__.py b/omniload/src/google_ads/__init__.py index 0a3e61aac..0deb6e10b 100644 --- a/omniload/src/google_ads/__init__.py +++ b/omniload/src/google_ads/__init__.py @@ -17,12 +17,12 @@ from typing import Any, Iterator, Optional import dlt -import proto # type: ignore +import proto from dlt.common.exceptions import MissingDependencyException from dlt.common.typing import TDataItem from dlt.sources import DltResource -from flatten_json import flatten # type: ignore -from googleapiclient.discovery import Resource # type: ignore +from flatten_json import flatten +from googleapiclient.discovery import Resource from . import field from .metrics import dlt_metrics_schema @@ -30,7 +30,7 @@ from .reports import BUILTIN_REPORTS, Report try: - from google.ads.googleads.client import GoogleAdsClient # type: ignore + from google.ads.googleads.client import GoogleAdsClient except ImportError: raise MissingDependencyException("Requests-OAuthlib", ["google-ads"]) @@ -47,7 +47,7 @@ def google_ads( date_range = dlt.sources.incremental( "segments_date", initial_value=start_date.date(), # type: ignore - end_value=end_date.date() if end_date is not None else None, # type: ignore + end_value=end_date.date() if end_date is not None else None, range_start="closed", range_end="closed", ) diff --git a/omniload/src/google_sheets/helpers/api_calls.py b/omniload/src/google_sheets/helpers/api_calls.py index d8dd0a3a7..b292a27de 100644 --- a/omniload/src/google_sheets/helpers/api_calls.py +++ b/omniload/src/google_sheets/helpers/api_calls.py @@ -32,7 +32,7 @@ def is_retry_status_code(exception: BaseException) -> bool: """Retry condition on HttpError""" - from googleapiclient.errors import HttpError # type: ignore + from googleapiclient.errors import HttpError # print(f"RETRY ON {str(HttpError)} = {isinstance(exception, HttpError) and exception.resp.status in DEFAULT_RETRY_STATUS}") # if isinstance(exception, HttpError): diff --git a/omniload/src/http/readers.py b/omniload/src/http/readers.py index 1b8a8506a..01d2f68c2 100644 --- a/omniload/src/http/readers.py +++ b/omniload/src/http/readers.py @@ -67,7 +67,7 @@ def _read_csv( self, content: bytes, chunksize: int = 10000, **pandas_kwargs: Any ) -> Iterator[TDataItems]: """Read CSV file with Pandas chunk by chunk""" - import pandas as pd # type: ignore + import pandas as pd kwargs = {**{"header": "infer", "chunksize": chunksize}, **pandas_kwargs} @@ -79,7 +79,7 @@ def _read_csv_headless( self, content: bytes, chunksize: int = 10000, **pandas_kwargs: Any ) -> Iterator[TDataItems]: """Read CSV file without headers, using provided column names or generating them""" - import pandas as pd # type: ignore + import pandas as pd # Determine column names if self.column_names: @@ -137,7 +137,7 @@ def _read_parquet( self, content: bytes, chunksize: int = 10000, **kwargs: Any ) -> Iterator[TDataItems]: """Read Parquet file""" - from pyarrow import parquet as pq # type: ignore + from pyarrow import parquet as pq file_obj = io.BytesIO(content) parquet_file = pq.ParquetFile(file_obj) diff --git a/omniload/src/influxdb/client.py b/omniload/src/influxdb/client.py index cc4944c5e..3a9600d64 100644 --- a/omniload/src/influxdb/client.py +++ b/omniload/src/influxdb/client.py @@ -1,6 +1,6 @@ from typing import Any, Dict, Iterable -from influxdb_client import InfluxDBClient # type: ignore +from influxdb_client import InfluxDBClient class InfluxClient: diff --git a/omniload/src/jira_source/helpers.py b/omniload/src/jira_source/helpers.py index 96e46f9aa..c0c5a4823 100644 --- a/omniload/src/jira_source/helpers.py +++ b/omniload/src/jira_source/helpers.py @@ -140,7 +140,7 @@ def _make_request( logger.warning( f"Rate limit exceeded. Waiting {retry_after} seconds before retry." ) - time.sleep(retry_after) # type: ignore + time.sleep(retry_after) continue else: raise JiraRateLimitError( @@ -155,7 +155,7 @@ def _make_request( logger.warning( f"Server error {response.status_code}. Retrying in {wait_time} seconds." ) - time.sleep(wait_time) # type: ignore + time.sleep(wait_time) continue else: raise JiraAPIError( @@ -183,7 +183,7 @@ def _make_request( logger.warning( f"Request failed: {str(e)}. Retrying in {wait_time} seconds." ) - time.sleep(wait_time) # type: ignore + time.sleep(wait_time) continue else: raise JiraAPIError( diff --git a/omniload/src/kafka/__init__.py b/omniload/src/kafka/__init__.py index fca2626d7..b65158bed 100644 --- a/omniload/src/kafka/__init__.py +++ b/omniload/src/kafka/__init__.py @@ -23,7 +23,7 @@ from typing import Any, Callable, Dict, Iterable, List, Optional, Union import dlt -from confluent_kafka import Consumer, Message # type: ignore +from confluent_kafka import Consumer, Message from dlt.common import logger from dlt.common.time import ensure_pendulum_datetime from dlt.common.typing import TAnyDateTime, TDataItem diff --git a/omniload/src/kafka/helpers.py b/omniload/src/kafka/helpers.py index c5b69970c..51d8815f8 100644 --- a/omniload/src/kafka/helpers.py +++ b/omniload/src/kafka/helpers.py @@ -14,8 +14,8 @@ from typing import Any, Dict, List, Optional -from confluent_kafka import Consumer, Message, TopicPartition # type: ignore -from confluent_kafka.admin import TopicMetadata # type: ignore +from confluent_kafka import Consumer, Message, TopicPartition +from confluent_kafka.admin import TopicMetadata from dlt import config from dlt.common import pendulum from dlt.common.configuration import configspec @@ -64,7 +64,7 @@ def default_msg_processor(msg: Message) -> Dict[str, Any]: } -class OffsetTracker(dict): # type: ignore +class OffsetTracker(dict): """Object to control offsets of the given topics. Tracks all the partitions of the given topics with two params: diff --git a/omniload/src/kinesis/__init__.py b/omniload/src/kinesis/__init__.py index 7ec97f784..f4974ef5b 100644 --- a/omniload/src/kinesis/__init__.py +++ b/omniload/src/kinesis/__init__.py @@ -94,7 +94,7 @@ def kinesis_stream( stream_name, shard_id, last_msg, # type: ignore - initial_at_datetime, # type: ignore + initial_at_datetime, ) while shard_iterator: @@ -147,7 +147,7 @@ def kinesis_stream( records_ms_behind_latest = records_response.get("MillisBehindLatest", 0) if records_ms_behind_latest < milliseconds_behind_latest: # stop taking messages from shard - shard_iterator = None # type: ignore + shard_iterator = None else: # continue taking messages shard_iterator = records_response["NextShardIterator"] diff --git a/omniload/src/loader.py b/omniload/src/loader.py index b5b8c7fd2..036755daa 100644 --- a/omniload/src/loader.py +++ b/omniload/src/loader.py @@ -5,7 +5,7 @@ from contextlib import contextmanager from typing import Generator -from pyarrow.parquet import ParquetFile # type: ignore +from pyarrow.parquet import ParquetFile PARQUET_BATCH_SIZE = 64 diff --git a/omniload/src/loader_test.py b/omniload/src/loader_test.py index 771d9132d..c41cbe082 100644 --- a/omniload/src/loader_test.py +++ b/omniload/src/loader_test.py @@ -6,7 +6,7 @@ import tempfile from typing import List -import pyarrow.parquet # type: ignore +import pyarrow.parquet import pytest from omniload.src.loader import load_dlt_file diff --git a/omniload/src/masking.py b/omniload/src/masking.py index a2d26a7b9..1fce364ce 100644 --- a/omniload/src/masking.py +++ b/omniload/src/masking.py @@ -256,7 +256,7 @@ def _date_shift(self, value: Any, max_days: int) -> Any: # Try to parse string dates try: - from dateutil import parser # type: ignore + from dateutil import parser dt = parser.parse(str(value)) shift_days = random.randint(-max_days, max_days) @@ -312,7 +312,7 @@ def create_masking_mapper(mask_configs: list[str]) -> Callable: def apply_masks(data: Any) -> Any: # Handle PyArrow tables try: - import pyarrow as pa # type: ignore + import pyarrow as pa if isinstance(data, pa.Table): # Convert to pandas for easier manipulation diff --git a/omniload/src/mongodb/helpers.py b/omniload/src/mongodb/helpers.py index 2a7727799..87f4d0d7d 100644 --- a/omniload/src/mongodb/helpers.py +++ b/omniload/src/mongodb/helpers.py @@ -170,7 +170,7 @@ def _projection_op( return projection_dict - def _limit(self, cursor: Cursor, limit: Optional[int] = None) -> TCursor: # type: ignore + def _limit(self, cursor: Cursor, limit: Optional[int] = None) -> TCursor: """Apply a limit to the cursor, if needed. Args: @@ -368,7 +368,7 @@ def load_documents( if self._sort_op: cursor = cursor.sort(self._sort_op) # type: ignore - cursor = self._limit(cursor, limit) # type: ignore + cursor = self._limit(cursor, limit) context = PyMongoArrowContext.from_schema( schema=pymongoarrow_schema, codec_options=self.collection.codec_options @@ -715,23 +715,23 @@ def collection_documents( if data_item_format == "arrow": LoaderClass = CollectionAggregationArrowLoaderParallel else: - LoaderClass = CollectionAggregationLoaderParallel # type: ignore + LoaderClass = CollectionAggregationLoaderParallel else: if data_item_format == "arrow": - LoaderClass = CollectionAggregationArrowLoader # type: ignore + LoaderClass = CollectionAggregationArrowLoader else: - LoaderClass = CollectionAggregationLoader # type: ignore + LoaderClass = CollectionAggregationLoader else: if parallel: if data_item_format == "arrow": LoaderClass = CollectionArrowLoaderParallel else: - LoaderClass = CollectionLoaderParallel # type: ignore + LoaderClass = CollectionLoaderParallel else: if data_item_format == "arrow": - LoaderClass = CollectionArrowLoader # type: ignore + LoaderClass = CollectionArrowLoader else: - LoaderClass = CollectionLoader # type: ignore + LoaderClass = CollectionLoader loader = LoaderClass( client, collection, incremental=incremental, chunk_size=chunk_size diff --git a/omniload/src/quickbooks/__init__.py b/omniload/src/quickbooks/__init__.py index 80f125295..26b8cc20a 100644 --- a/omniload/src/quickbooks/__init__.py +++ b/omniload/src/quickbooks/__init__.py @@ -7,9 +7,9 @@ from dlt.common.time import ensure_pendulum_datetime from dlt.common.typing import TDataItem from dlt.sources import DltResource -from intuitlib.client import AuthClient # type: ignore +from intuitlib.client import AuthClient -from quickbooks import QuickBooks # type: ignore +from quickbooks import QuickBooks @dlt.source(name="quickbooks", max_table_nesting=0) @@ -66,8 +66,8 @@ def fetch_object( obj_name: str, updated_at: dlt.sources.incremental[str] = dlt.sources.incremental( "lastupdatedtime", - initial_value=start_date, # type: ignore - end_value=end_date, # type: ignore + initial_value=start_date, + end_value=end_date, range_start="closed", range_end="closed", allow_external_schedulers=True, diff --git a/omniload/src/slack/helpers.py b/omniload/src/slack/helpers.py index f3169c96a..7c6725871 100644 --- a/omniload/src/slack/helpers.py +++ b/omniload/src/slack/helpers.py @@ -21,7 +21,7 @@ from dlt.common.time import ensure_pendulum_datetime from dlt.common.typing import Dict, TAnyDateTime, TDataItem from dlt.sources.helpers import requests -from jsonpath_ng.ext import parse # type: ignore +from jsonpath_ng.ext import parse from .settings import MAX_PAGE_SIZE, SLACK_API_URL diff --git a/omniload/src/smartsheets/__init__.py b/omniload/src/smartsheets/__init__.py index a6253087f..3edc28ef5 100644 --- a/omniload/src/smartsheets/__init__.py +++ b/omniload/src/smartsheets/__init__.py @@ -1,10 +1,10 @@ from typing import Iterable import dlt -import smartsheet # type: ignore +import smartsheet from dlt.extract import DltResource -from smartsheet.models.enums import ColumnType # type: ignore -from smartsheet.models.sheet import Sheet # type: ignore +from smartsheet.models.enums import ColumnType +from smartsheet.models.sheet import Sheet TYPE_MAPPING = { ColumnType.TEXT_NUMBER: "text", diff --git a/omniload/src/smartsheets/test_smartsheets.py b/omniload/src/smartsheets/test_smartsheets.py index f984fa7cc..5c98e7ee8 100644 --- a/omniload/src/smartsheets/test_smartsheets.py +++ b/omniload/src/smartsheets/test_smartsheets.py @@ -2,8 +2,8 @@ import unittest from unittest.mock import patch -import smartsheet # type: ignore -from smartsheet.models import Cell, Column, Row, Sheet # type: ignore +import smartsheet +from smartsheet.models import Cell, Column, Row, Sheet from omniload.src.smartsheets import _get_sheet_data, smartsheet_source diff --git a/omniload/src/sources.py b/omniload/src/sources.py index a49459125..5770bea27 100644 --- a/omniload/src/sources.py +++ b/omniload/src/sources.py @@ -19,7 +19,7 @@ ) from urllib.parse import ParseResult, parse_qs, urlencode, urlparse -import fsspec # type: ignore +import fsspec import pendulum from dlt.common.time import ensure_pendulum_datetime from dlt.extract import Incremental @@ -315,10 +315,10 @@ def table_rows( query_adapter_callback: Optional[TQueryAdapter] = None, resolve_foreign_keys: bool = False, ) -> Iterator[TDataItem]: - hints = { # type: ignore + hints = { "columns": [], } - cols = [] # type: ignore + cols = [] if incremental: switchDict = { @@ -333,11 +333,11 @@ def table_rows( cols.append( Column( incremental.cursor_path, - switchDict[type(incremental.last_value)], # type: ignore + switchDict[type(incremental.last_value)], ) ) else: - cols.append(Column(incremental.cursor_path, sa.TIMESTAMP)) # type: ignore + cols.append(Column(incremental.cursor_path, sa.TIMESTAMP)) table = Table( "query_result", @@ -373,7 +373,7 @@ def table_rows( params = parse_qs(parsed_uri.query) params = {k.lower(): v for k, v in params.items()} if params.get("authentication") == ["ActiveDirectoryAccessToken"]: - import pyodbc # type: ignore + import pyodbc from sqlalchemy import create_engine from omniload.src.destinations import ( @@ -746,7 +746,7 @@ def csv_file( return resource( csv_file, - merge_key=kwargs.get("merge_key"), # type: ignore + merge_key=kwargs.get("merge_key"), )( incremental=dlt_incremental( kwargs.get("incremental_key", ""), @@ -1609,12 +1609,12 @@ def dlt_source(self, uri: str, table: str, **kwargs): group_id=group_id[0], security_protocol=( security_protocol[0] if len(security_protocol) > 0 else None - ), # type: ignore + ), sasl_mechanisms=( sasl_mechanisms[0] if len(sasl_mechanisms) > 0 else None - ), # type: ignore - sasl_username=sasl_username[0] if len(sasl_username) > 0 else None, # type: ignore - sasl_password=sasl_password[0] if len(sasl_password) > 0 else None, # type: ignore + ), + sasl_username=sasl_username[0] if len(sasl_username) > 0 else None, + sasl_password=sasl_password[0] if len(sasl_password) > 0 else None, ), start_from=start_date, batch_size=int(batch_size[0]), @@ -1860,7 +1860,7 @@ def dlt_source(self, uri: str, table: str, **kwargs): bucket_url = f"s3://{bucket_name}/" - import s3fs # type: ignore + import s3fs endpoint_url = source_fields.get("endpoint_url") fs_kwargs: dict = { @@ -2112,7 +2112,7 @@ def dlt_source(self, uri: str, table: str, **kwargs): "skip_archived": False, } if ":" in table: - table, rest = table.split(":", 1) # type: ignore + table, rest = table.split(":", 1) for k in rest.split(":"): flags[k] = True @@ -2529,7 +2529,7 @@ def dlt_source(self, uri: str, table: str, **kwargs): # (The RECOMMENDED way of passing service account credentials) # directly with gcsfs. As a workaround, we construct the GCSFileSystem # and pass it directly to filesystem.readers. - import gcsfs # type: ignore + import gcsfs fs = gcsfs.GCSFileSystem( token=credentials, @@ -2587,7 +2587,7 @@ def handles_incrementality(self) -> bool: return False def init_client(self, params: Dict[str, List[str]]): - from google.ads.googleads.client import GoogleAdsClient # type: ignore + from google.ads.googleads.client import GoogleAdsClient dev_token = params.get("dev_token") if dev_token is None or len(dev_token) == 0: @@ -2653,7 +2653,7 @@ def init_client(self, params: Dict[str, List[str]]): client = GoogleAdsClient.load_from_dict(conf) finally: if fd is not None: - os.remove(path) # type: ignore + os.remove(path) return client