diff --git a/.github/workflows/build-and-test.yml b/.github/workflows/build-and-test.yml index ba7b49e28..1b584f464 100644 --- a/.github/workflows/build-and-test.yml +++ b/.github/workflows/build-and-test.yml @@ -135,7 +135,7 @@ jobs: conda config --env --add pinned_packages python=$PYTHON_VERSION conda config --env --add pinned_packages pandas==$PANDAS_VERSION conda config --env --add pinned_packages pyarrow==$PYARROW_VERSION - conda install -c conda-forge --yes pandas==$PANDAS_VERSION pyarrow==$PYARROW_VERSION + conda install -c conda-forge --yes pandas==$PANDAS_VERSION pyarrow==$PYARROW_VERSION pip sed -i -e "/pandas/d" -e "/pyarrow/d" python/requirements-dev.txt conda install -c conda-forge --yes --file python/requirements-dev.txt conda list diff --git a/README.md b/README.md index 7836ace26..e324c23b8 100644 --- a/README.md +++ b/README.md @@ -436,6 +436,38 @@ Download the pre-built package `delta-sharing-server-x.y.z.zip` from [GitHub Rel - Make changes to your yaml file. You may also need to update some server configs for special requirements. - To add Shared Data, add reference to Delta Lake tables you would like to share from this server in this config file. +### Optional Share Egress Access Logging + +The server can emit share-attributed structured access log entries for each query and CDF request. + +Fields emitted in each log line: + +- `share` +- `schema` +- `table` +- `requestType` (`query` or `cdf_stream`) +- `egressBytes` +- `pricingTier` +- `timestampMs` +- `clientRegion` (if available) + +To enable this feature, configure the `accessLogging` block in the server yaml. + +Example yaml: + +```yaml +shares: +- name: "share1" + schemas: + - name: "schema1" + tables: + - name: "table1" + location: "gs://my-bucket/my-table" + +accessLogging: + enabled: true +``` + ## Config the server to access tables on cloud storage We support sharing Delta Lake tables on S3, Azure Blob Storage and Azure Data Lake Storage Gen2. diff --git a/docs/PER_SHARE_EGRESS_MONITORING.md b/docs/PER_SHARE_EGRESS_MONITORING.md new file mode 100644 index 000000000..e5dfa897d --- /dev/null +++ b/docs/PER_SHARE_EGRESS_MONITORING.md @@ -0,0 +1,209 @@ +# Per-Share Egress Monitoring + +## Overview + +Structured access logs for egress tracking with automatic GCP pricing tier classification. + +When enabled, each data access emits JSON logs with: +- Share/schema/table identification +- Total egress bytes +- Pricing tier (e.g., `internet_to_na_eu`, `interregion_na_to_eu`) +- Client region code + +--- + +## Pricing Tiers + +### 1. Free/Internal Traffic + +| Tier | Description | Approximate Cost | +|------|-------------|------------------| +| `same_zone` | Traffic within the same GCP zone | Free | +| `same_region` | Traffic within the same region or Kubernetes cluster | ~Free | + +### 2. Inter-Region GCP Traffic + +Traffic between GCP services (e.g., between regions via service mesh): + +| Tier | Route | Approximate Cost | +|------|-------|------------------| +| `interregion_na_to_na` | North America → North America | $0.02/GiB | +| `interregion_eu_to_eu` | Europe → Europe | $0.02/GiB | +| `interregion_na_to_eu` | North America → Europe | $0.05/GiB | +| `interregion_eu_to_na` | Europe → North America | $0.05/GiB | +| `interregion_to_apac` | Any region → Asia Pacific | $0.08/GiB | +| `interregion_to_oceania` | Any region → Australia/Oceania | $0.10/GiB | +| `interregion_to_latam` | Any region → Latin America | $0.14/GiB | + +### 3. Internet Egress (Premium Tier) + +Traffic leaving GCP to external clients: + +| Tier | Destination | Approximate Cost | +|------|-------------|------------------| +| `internet_to_na_eu` | North America or Europe | $0.12/GiB | +| `internet_to_apac` | Asia Pacific | $0.12/GiB | +| `internet_to_latam` | Latin America | $0.19/GiB | +| `internet_to_oceania` | Australia/Oceania | $0.15/GiB | + +### Special Cases + +| Tier | Description | +|------|-------------| +| `unknown` | Unable to determine pricing tier | + +--- + +## How Pricing Tiers Are Resolved + +### Step 1: Determine Egress Type + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ Egress Type Detection │ +├─────────────────────────────────────────────────────────────────┤ +│ 1. Client IP is private (10.x, 172.16-31.x, 192.168.x)? │ +│ → SAME_REGION (internal cluster traffic) │ +│ │ +│ 2. GCP IP Range Lookup (from cloud.json) finds client IP? │ +│ a. Client GCP region matches sourceRegion? │ +│ → SAME_REGION │ +│ b. Client GCP region differs from sourceRegion? │ +│ → INTER_REGION (with exact destination region) │ +│ │ +│ 3. Client IP looks like GCP (34.x/35.x) but not in ranges? │ +│ → INTER_REGION (conservative fallback) │ +│ │ +│ 4. Non-GCP IP with valid country code? │ +│ → INTERNET │ +│ │ +│ 5. No region header? │ +│ → UNKNOWN │ +└─────────────────────────────────────────────────────────────────┘ +``` + +The system fetches GCP's published IP ranges from `https://www.gstatic.com/ipranges/cloud.json` +and uses a CIDR trie for efficient IP-to-region lookup (refreshed every 24 hours). + +### Step 2: Determine Continents + +Based on the source region (server configuration) and destination (detected from headers): + +**GCP Regions → Continent Mapping:** +- `us-*`, `northamerica-*` → North America (NA) +- `europe-*` → Europe (EU) +- `asia-*` → Asia Pacific (APAC) +- `southamerica-*` → Latin America (LATAM) +- `australia-*` → Oceania (OCEANIA) + +**Country Codes → Continent Mapping:** +- US, CA, MX → NA +- GB, IE, DE, FR, NL, BE, CH, AT, ES, PT, IT, SE, NO, DK, FI, PL, CZ, HU, RO, etc. → EU +- JP, KR, CN, HK, TW, SG, MY, ID, TH, VN, PH, IN, PK, BD → APAC +- AU, NZ → OCEANIA +- BR, AR, CL, CO, PE, VE, EC, etc. → LATAM + +### Step 3: Calculate Pricing Tier + +Based on egress type and continent pair: + +| Egress Type | Calculation Method | +|-------------|-------------------| +| `SAME_ZONE` | Returns `same_zone` | +| `SAME_REGION` | Returns `same_region` | +| `INTER_REGION` | Based on source→destination continent pair | +| `INTERNET` | Based on destination continent only | + +--- + +## Configuration + +```yaml +accessLogging: + enabled: true + sourceRegion: "us-central1" # GCP region where server runs + detectGcpTraffic: true # Enable GCP IP range lookup + clientRegionHeader: "x-client-region" # Header with country code + clientIpHeader: "x-forwarded-for" # Header with client IP chain +``` + +### GCP Load Balancer Headers + +Configure custom request headers on your backend: +``` +X-Client-Region: {client_region} +X-Client-Region-Subdivision: {client_region_subdivision} +``` + +--- + +## Log Output + +**ACCESS_LOG** — Emitted for each request with non-zero egress: +```json +{ + "logType": "ACCESS_LOG", + "share": "myshare", + "schema": "myschema", + "table": "mytable", + "egressBytes": 1048576, + "pricingTier": "internet_to_na_eu", + "timestampMs": 1717502400000, + "requestType": "query", + "clientRegion": "US" +} +``` + +**PRICING_CONTEXT** — Debug entry with detection details: +```json +{ + "logType": "PRICING_CONTEXT", + "clientIp": "203.0.113.45", + "isGcpIp": false, + "egressType": "internet", + "sourceRegion": "us-central1", + "destinationContinent": "NA", + "pricingTier": "internet_to_na_eu" +} +``` + +**REQUEST_HEADERS** — All headers for debugging (keys lowercased). + +--- + +## Header Detection Priority + +**Region Headers:** +1. Configured `clientRegionHeader` (default: `x-client-region`) +2. `x-appengine-country` +3. `cf-ipcountry` +4. `cloudfront-viewer-country` + +**Client IP Headers:** +1. Configured `clientIpHeader` (default: `x-forwarded-for`) +2. `x-envoy-external-address` + +--- + +## Implementation + +| File | Purpose | +|------|---------| +| `GcpPricingTier.scala` | Continent mapping, egress type detection, pricing calculation | +| `GcpIpRangeLookup.scala` | GCP IP range fetching and CIDR trie lookup | +| `AccessLogEmitter.scala` | Log entry models and JSON emission | +| `DeltaSharingService.scala` | Integration for query/CDF endpoints | +| `ServerConfig.scala` | `AccessLoggingConfig` model | + +### Egress Bytes Calculation + +Sum of `size` from all file actions: `AddFile`, `AddFileForCDF`, `AddCDCFile`. + +--- + +## Notes + +- Logs emitted to `delta.sharing.access` logger +- Zero-byte requests not logged +- GCP IP ranges refreshed every 24 hours +- Pricing tiers match GCP documentation diff --git a/manifests/base/configmap.yaml b/manifests/base/configmap.yaml index 4f86d4b0a..e0112b525 100644 --- a/manifests/base/configmap.yaml +++ b/manifests/base/configmap.yaml @@ -26,3 +26,9 @@ data: queryTablePageSizeLimit: 10000 queryTablePageTokenTtlMs: 259200000 refreshTokenTtlMs: 3600000 + accessLogging: + enabled: true + sourceRegion: "us-central1" + detectGcpTraffic: true + clientRegionHeader: "x-client-region" + clientIpHeader: "x-forwarded-for" diff --git a/manifests/base/deployment.yaml b/manifests/base/deployment.yaml index ef0e83750..de0549c45 100644 --- a/manifests/base/deployment.yaml +++ b/manifests/base/deployment.yaml @@ -27,8 +27,8 @@ spec: - -c - | echo "Merging base config and shares config..." - # Replace bearer token placeholder in base config - sed "s/BEARER_TOKEN_PLACEHOLDER/${BEARER_TOKEN}/g" /base-config/base-config.yaml > /tmp/base.yaml + # Replace placeholders in base config + sed -e "s/BEARER_TOKEN_PLACEHOLDER/${BEARER_TOKEN}/g" -e "s/GCP_PROJECT_ID_PLACEHOLDER/${GCP_PROJECT_ID}/g" /base-config/base-config.yaml > /tmp/base.yaml # Merge base config with shares config yq eval-all 'select(fileIndex == 0) * select(fileIndex == 1)' /tmp/base.yaml /shares-config/shares-config.yaml > /shared-config/delta-sharing-server-config.yaml echo "Config merged successfully" @@ -39,6 +39,12 @@ spec: secretKeyRef: name: delta-sharing-auth key: bearer-token + - name: GCP_PROJECT_ID + valueFrom: + configMapKeyRef: + name: project-common + key: PROJECT_ID + optional: true volumeMounts: - name: base-config mountPath: /base-config @@ -54,6 +60,7 @@ spec: configMapKeyRef: name: project-common key: PROJECT_ID + optional: true - name: ZC_API_PROXY_DL_SHARING_BEARER valueFrom: secretKeyRef: diff --git a/manifests/zcloud-emea/configmap.yaml b/manifests/zcloud-emea/configmap.yaml new file mode 100644 index 000000000..ca48f1c4e --- /dev/null +++ b/manifests/zcloud-emea/configmap.yaml @@ -0,0 +1,29 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: delta-sharing-base-config + namespace: default +data: + base-config.yaml: | + version: 1 + authorization: + bearerToken: "BEARER_TOKEN_PLACEHOLDER" + host: "0.0.0.0" + port: 8080 + endpoint: "/delta-sharing" + preSignedUrlTimeoutSeconds: 3600 + deltaTableCacheSize: 100 + stalenessAcceptable: false + evaluatePredicateHints: false + evaluateJsonPredicateHints: true + evaluateJsonPredicateHintsV2: true + requestTimeoutSeconds: 180 + queryTablePageSizeLimit: 10000 + queryTablePageTokenTtlMs: 259200000 + refreshTokenTtlMs: 3600000 + accessLogging: + enabled: true + sourceRegion: "europe-west3" + detectGcpTraffic: true + clientRegionHeader: "x-client-region" + clientIpHeader: "x-forwarded-for" diff --git a/manifests/zcloud-emea/kustomization.yaml b/manifests/zcloud-emea/kustomization.yaml index 27bb5cb0b..cbec7e795 100644 --- a/manifests/zcloud-emea/kustomization.yaml +++ b/manifests/zcloud-emea/kustomization.yaml @@ -2,3 +2,5 @@ apiVersion: kustomize.config.k8s.io/v1beta1 kind: Kustomization resources: - ../base +patchesStrategicMerge: + - configmap.yaml diff --git a/manifests/zcloud-prod/configmap.yaml b/manifests/zcloud-prod/configmap.yaml index b67a515fe..155fd5738 100644 --- a/manifests/zcloud-prod/configmap.yaml +++ b/manifests/zcloud-prod/configmap.yaml @@ -21,3 +21,9 @@ data: queryTablePageSizeLimit: 10000 queryTablePageTokenTtlMs: 259200000 refreshTokenTtlMs: 3600000 + accessLogging: + enabled: true + sourceRegion: "us-central1" + detectGcpTraffic: true + clientRegionHeader: "x-client-region" + clientIpHeader: "x-forwarded-for" diff --git a/manifests/zcloud-prod2/configmap.yaml b/manifests/zcloud-prod2/configmap.yaml index b67a515fe..577806dd9 100644 --- a/manifests/zcloud-prod2/configmap.yaml +++ b/manifests/zcloud-prod2/configmap.yaml @@ -21,3 +21,9 @@ data: queryTablePageSizeLimit: 10000 queryTablePageTokenTtlMs: 259200000 refreshTokenTtlMs: 3600000 + accessLogging: + enabled: true + sourceRegion: "us-west4" + detectGcpTraffic: true + clientRegionHeader: "x-client-region" + clientIpHeader: "x-forwarded-for" diff --git a/manifests/zcloud-prod3/configmap.yaml b/manifests/zcloud-prod3/configmap.yaml new file mode 100644 index 000000000..62b112646 --- /dev/null +++ b/manifests/zcloud-prod3/configmap.yaml @@ -0,0 +1,29 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: delta-sharing-base-config + namespace: default +data: + base-config.yaml: | + version: 1 + authorization: + bearerToken: "BEARER_TOKEN_PLACEHOLDER" + host: "0.0.0.0" + port: 8080 + endpoint: "/delta-sharing" + preSignedUrlTimeoutSeconds: 3600 + deltaTableCacheSize: 100 + stalenessAcceptable: false + evaluatePredicateHints: false + evaluateJsonPredicateHints: true + evaluateJsonPredicateHintsV2: true + requestTimeoutSeconds: 180 + queryTablePageSizeLimit: 10000 + queryTablePageTokenTtlMs: 259200000 + refreshTokenTtlMs: 3600000 + accessLogging: + enabled: true + sourceRegion: "australia-southeast1" + detectGcpTraffic: true + clientRegionHeader: "x-client-region" + clientIpHeader: "x-forwarded-for" diff --git a/manifests/zcloud-prod3/kustomization.yaml b/manifests/zcloud-prod3/kustomization.yaml index 27bb5cb0b..cbec7e795 100644 --- a/manifests/zcloud-prod3/kustomization.yaml +++ b/manifests/zcloud-prod3/kustomization.yaml @@ -2,3 +2,5 @@ apiVersion: kustomize.config.k8s.io/v1beta1 kind: Kustomization resources: - ../base +patchesStrategicMerge: + - configmap.yaml diff --git a/manifests/zing-preview/configmap.yaml b/manifests/zing-preview/configmap.yaml index b67a515fe..155fd5738 100644 --- a/manifests/zing-preview/configmap.yaml +++ b/manifests/zing-preview/configmap.yaml @@ -21,3 +21,9 @@ data: queryTablePageSizeLimit: 10000 queryTablePageTokenTtlMs: 259200000 refreshTokenTtlMs: 3600000 + accessLogging: + enabled: true + sourceRegion: "us-central1" + detectGcpTraffic: true + clientRegionHeader: "x-client-region" + clientIpHeader: "x-forwarded-for" diff --git a/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala b/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala index 873e62386..01bb426f3 100644 --- a/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala +++ b/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala @@ -17,9 +17,11 @@ package io.delta.sharing.server import java.io.{ByteArrayOutputStream, File, FileNotFoundException} +import java.net.InetAddress import java.nio.charset.StandardCharsets.UTF_8 import java.nio.file.AccessDeniedException import java.security.MessageDigest +import java.util.Locale import java.util.concurrent.CompletableFuture import javax.annotation.Nullable @@ -33,6 +35,7 @@ import com.linecorp.armeria.server.{Server, ServiceRequestContext} import com.linecorp.armeria.server.annotation.{ConsumesJson, Default, ExceptionHandler, ExceptionHandlerFunction, Get, Head, Param, Post, ProducesJson} import com.linecorp.armeria.server.auth.AuthService import io.delta.kernel.exceptions.{KernelException, TableNotFoundException} +import io.delta.standalone.internal.{DeltaResponseSingleAction => StandaloneDeltaResponseSingleAction} import io.delta.standalone.internal.DeltaCDFErrors import io.delta.standalone.internal.DeltaCDFIllegalArgumentException import io.delta.standalone.internal.DeltaDataSource @@ -43,9 +46,12 @@ import org.slf4j.LoggerFactory import scalapb.json4s.Printer import io.delta.sharing.server.common.JsonUtils -import io.delta.sharing.server.config.ServerConfig -import io.delta.sharing.server.model.{QueryStatus, SingleAction} +import io.delta.sharing.server.config.{AccessLoggingConfig, ServerConfig} +import io.delta.sharing.server.model.{AddCDCFile, AddFile, AddFileForCDF, QueryStatus, RemoveFile, SingleAction} import io.delta.sharing.server.protocol._ +import io.delta.sharing.server.telemetry.{ + AccessLogEmitter, AccessLogEntry, GcpPricingTier, PricingContextLogEntry, RequestHeadersLogEntry +} object ErrorCode { val UNSUPPORTED_OPERATION = "UNSUPPORTED_OPERATION" @@ -189,6 +195,145 @@ class DeltaSharingService(serverConfig: ServerConfig) { private val deltaSharedTableLoader = new DeltaSharedTableLoader(serverConfig) private val logger = LoggerFactory.getLogger(classOf[DeltaSharingService]) + private val accessLogEmitter = AccessLogEmitter.create(serverConfig) + + private def extractRequestHeaders(req: HttpRequest): Map[String, String] = { + req.headers().names().asScala + .flatMap { name => + Option(req.headers().get(name)).map { value => + name.toString.toLowerCase(Locale.ROOT) -> value + } + } + .toMap + } + + private def emitQueryEgressMetric( + req: HttpRequest, + share: String, + schema: String, + table: String, + actions: Seq[Object]): Unit = { + if (!Option(serverConfig.getAccessLogging).exists(_.enabled)) { + return + } + + val bytes = DeltaSharingService.extractEgressBytes(actions) + if (bytes <= 0) { + return + } + + val nowMs = System.currentTimeMillis() + val headers = extractRequestHeaders(req) + val pricingCtx = DeltaSharingService.buildPricingContext( + headers, + serverConfig.getAccessLogging) + + // Emit access log with essential fields + val entry = AccessLogEntry( + share = share, + schema = schema, + table = table, + egressBytes = bytes, + timestampMs = nowMs, + pricingTier = pricingCtx.location.pricingTier, + clientRegion = pricingCtx.location.clientRegion, + requestType = AccessLogEmitter.QueryRequestType + ) + accessLogEmitter.record(entry) + + // Emit context log with full details for verification + val contextEntry = PricingContextLogEntry( + share = share, + table = table, + timestampMs = nowMs, + clientIp = pricingCtx.clientIp, + clientIpSource = pricingCtx.clientIpSource, + rawRegionHeader = pricingCtx.rawRegionHeader, + regionHeaderSource = pricingCtx.regionHeaderSource, + hasEnvoyMetadata = pricingCtx.hasEnvoyMetadata, + isGcpIp = pricingCtx.isGcpIp, + egressType = pricingCtx.egressType, + sourceRegion = pricingCtx.sourceRegion, + sourceContinent = pricingCtx.sourceContinent, + destinationRegion = pricingCtx.destinationRegion, + destinationContinent = pricingCtx.destinationContinent, + pricingTier = pricingCtx.location.pricingTier + ) + accessLogEmitter.recordContext(contextEntry) + + // Emit all request headers for debugging pricing tier detection + val headersEntry = RequestHeadersLogEntry( + share = share, + table = table, + timestampMs = nowMs, + headers = headers + ) + accessLogEmitter.recordHeaders(headersEntry) + } + + private def emitCdfEgressMetric( + req: HttpRequest, + share: String, + schema: String, + table: String, + actions: Seq[Object]): Unit = { + if (!Option(serverConfig.getAccessLogging).exists(_.enabled)) { + return + } + + val bytes = DeltaSharingService.extractEgressBytes(actions) + if (bytes <= 0) { + return + } + + val nowMs = System.currentTimeMillis() + val headers = extractRequestHeaders(req) + val pricingCtx = DeltaSharingService.buildPricingContext( + headers, + serverConfig.getAccessLogging) + + // Emit access log with essential fields + val entry = AccessLogEntry( + share = share, + schema = schema, + table = table, + egressBytes = bytes, + timestampMs = nowMs, + pricingTier = pricingCtx.location.pricingTier, + clientRegion = pricingCtx.location.clientRegion, + requestType = AccessLogEmitter.CdfStreamRequestType + ) + accessLogEmitter.record(entry) + + // Emit context log with full details for verification + val contextEntry = PricingContextLogEntry( + share = share, + table = table, + timestampMs = nowMs, + clientIp = pricingCtx.clientIp, + clientIpSource = pricingCtx.clientIpSource, + rawRegionHeader = pricingCtx.rawRegionHeader, + regionHeaderSource = pricingCtx.regionHeaderSource, + hasEnvoyMetadata = pricingCtx.hasEnvoyMetadata, + isGcpIp = pricingCtx.isGcpIp, + egressType = pricingCtx.egressType, + sourceRegion = pricingCtx.sourceRegion, + sourceContinent = pricingCtx.sourceContinent, + destinationRegion = pricingCtx.destinationRegion, + destinationContinent = pricingCtx.destinationContinent, + pricingTier = pricingCtx.location.pricingTier + ) + accessLogEmitter.recordContext(contextEntry) + + // Emit all request headers for debugging pricing tier detection + val headersEntry = RequestHeadersLogEntry( + share = share, + table = table, + timestampMs = nowMs, + headers = headers + ) + accessLogEmitter.recordHeaders(headersEntry) + } private def logCdfRequestComplete( wallStartNs: Long, @@ -475,6 +620,8 @@ class DeltaSharingService(serverConfig: ServerConfig) { ) } + emitQueryEgressMetric(req, share, schema, table, queryResult.actions) + streamingOutput(Some(queryResult.version), queryResult.responseFormat, queryResult.actions) } } @@ -625,6 +772,8 @@ class DeltaSharingService(serverConfig: ServerConfig) { s"You can only query table data since version ${tableConfig.startVersion}." ) } + + emitQueryEgressMetric(req, share, schema, table, queryResult.actions) logTableQueryComplete(start, share, schema, table, queryResult) streamingOutput( Some(queryResult.version), @@ -683,6 +832,7 @@ class DeltaSharingService(serverConfig: ServerConfig) { deltaLogUpdateNs = updateNs, requestTimeoutSecondsForLogging = Some(serverConfig.requestTimeoutSeconds) ) + emitCdfEgressMetric(req, share, schema, table, queryResult.actions) logCdfRequestComplete(start, share, schema, table, queryResult) streamingOutput( Some(queryResult.version), @@ -730,6 +880,9 @@ class DeltaSharingService(serverConfig: ServerConfig) { object DeltaSharingService { + // TEMPORARY: Logger for debugging headers - remove after testing X-Client-Region + private val logger = LoggerFactory.getLogger("delta.sharing.headers.debug") + val DELTA_TABLE_VERSION_HEADER = "Delta-Table-Version" val DELTA_TABLE_METADATA_CONTENT_TYPE = "application/x-ndjson; charset=utf-8" val DELTA_SHARING_CAPABILITIES_HEADER = "delta-sharing-capabilities" @@ -738,6 +891,309 @@ object DeltaSharingService { val DELTA_SHARING_INCLUDE_END_STREAM_ACTION = "includeendstreamaction" val DELTA_SHARING_READER_FEATURES = "readerfeatures" val DELTA_SHARING_CAPABILITIES_DELIMITER = ";" + private val DefaultRegionHeaders = Seq( + "x-client-region", + "x-appengine-country", + "cf-ipcountry", + "cloudfront-viewer-country") + private val DefaultIpHeaders = Seq( + "x-forwarded-for", + "x-envoy-external-address") + private val DefaultPricingGroupsByRegion = Map( + "US" -> "na_eu", + "CA" -> "na_eu", + "MX" -> "na_eu", + "GB" -> "na_eu", + "IE" -> "na_eu", + "DE" -> "na_eu", + "FR" -> "na_eu", + "NL" -> "na_eu", + "BE" -> "na_eu", + "CH" -> "na_eu", + "AT" -> "na_eu", + "ES" -> "na_eu", + "PT" -> "na_eu", + "IT" -> "na_eu", + "SE" -> "na_eu", + "NO" -> "na_eu", + "DK" -> "na_eu", + "FI" -> "na_eu", + "PL" -> "na_eu", + "CZ" -> "na_eu", + "HU" -> "na_eu", + "RO" -> "na_eu", + "JP" -> "apac", + "KR" -> "apac", + "IN" -> "apac", + "SG" -> "apac", + "HK" -> "apac", + "TW" -> "apac", + "ID" -> "apac", + "MY" -> "apac", + "PH" -> "apac", + "TH" -> "apac", + "VN" -> "apac", + "AU" -> "apac", + "NZ" -> "apac", + "BR" -> "latam", + "AR" -> "latam", + "CL" -> "latam", + "CO" -> "latam", + "PE" -> "latam") + + /** + * Simplified location context for egress pricing. + * @param clientRegion ISO country code (e.g., "US", "MT") from X-Client-Region header + * @param pricingTier GCP egress pricing tier (see GcpPricingTier for values) + */ + private[server] case class ClientLocationContext( + clientRegion: Option[String], + pricingTier: String) + + /** + * Full context for pricing tier calculation, used for debugging/verification. + */ + private[server] case class PricingContext( + location: ClientLocationContext, + clientIp: Option[String], + clientIpSource: Option[String], + rawRegionHeader: Option[String], + regionHeaderSource: Option[String], + hasEnvoyMetadata: Boolean, + isGcpIp: Boolean, + egressType: String, + sourceRegion: Option[String], + sourceContinent: Option[String], + destinationRegion: Option[String], + destinationContinent: Option[String]) + + private[server] def extractEgressBytes(actions: Seq[Object]): Long = { + actions.map(extractActionBytes).sum + } + + private[server] def buildClientLocationContext( + req: HttpRequest, + accessLoggingConfig: AccessLoggingConfig): ClientLocationContext = { + buildPricingContext(req, accessLoggingConfig).location + } + + private[server] def buildPricingContext( + req: HttpRequest, + accessLoggingConfig: AccessLoggingConfig): PricingContext = { + val headerMap: Map[String, String] = req.headers().names().asScala + .flatMap { name => + Option(req.headers().get(name)).map { value => + name.toString.toLowerCase(Locale.ROOT) -> value + } + } + .toMap + + buildPricingContext(headerMap, accessLoggingConfig) + } + + private[server] def buildClientLocationContext( + requestHeaders: Map[String, String], + accessLoggingConfig: AccessLoggingConfig): ClientLocationContext = { + buildPricingContext(requestHeaders, accessLoggingConfig).location + } + + private[server] def buildPricingContext( + requestHeaders: Map[String, String], + accessLoggingConfig: AccessLoggingConfig): PricingContext = { + val cfgOpt = Option(accessLoggingConfig) + val normalizedHeaders = requestHeaders.map { case (k, v) => + k.toLowerCase(Locale.ROOT) -> v + } + + // Extract client region from headers + val regionHeaders = getOrderedHeaders( + cfgOpt.flatMap(c => Option(c.getClientRegionHeader)), + DefaultRegionHeaders) + val ipHeaders = getOrderedHeaders( + cfgOpt.flatMap(c => Option(c.getClientIpHeader)), + DefaultIpHeaders) + + val regionWithSource = getFirstLocation(regionHeaders, normalizedHeaders) + val ipWithSource = getFirstClientIp(ipHeaders, normalizedHeaders) + + val region = regionWithSource.map(_._1) + val rawRegion = regionWithSource.map { case (normalized, header) => + normalizedHeaders.getOrElse(header, normalized) + } + val regionSource = regionWithSource.map(_._2) + val clientIp = ipWithSource.map(_._1) + val clientIpSource = ipWithSource.map(_._2) + + // Check if client IP is in GCP public ranges + val isGcpIp = clientIp.exists(GcpPricingTier.isGcpPublicIp) + + // GCP Pricing Tier calculation + val sourceRegion = cfgOpt + .flatMap(c => Option(c.getSourceRegion)) + .map(_.trim) + .filter(_.nonEmpty) + + val detectGcpTraffic = cfgOpt + .map(c => c.getDetectGcpTraffic) + .getOrElse(true) + + val envoyPeerMetadata = normalizedHeaders.get("x-envoy-peer-metadata") + + val (egressType, destGcpRegion) = GcpPricingTier.determineEgressType( + clientIp, + envoyPeerMetadata, + region, + detectGcpTraffic, + sourceRegion) + + // Calculate continents for context logging + val sourceContinent = sourceRegion.map(GcpPricingTier.continentFromGcpRegion) + val destContinent = destGcpRegion.map(GcpPricingTier.continentFromGcpRegion) + .orElse(region.map(GcpPricingTier.continentFromCountryCode)) + + // Calculate pricing tier: + // - For internet traffic: based on destination only (no sourceRegion needed) + // - For inter-region traffic: requires sourceRegion + // - For same_zone/same_region: tier is the egress type itself + val pricingTier = egressType match { + case GcpPricingTier.EgressType.INTERNET => + GcpPricingTier.calculatePricingTier( + sourceRegion, destGcpRegion, region, egressType) + case GcpPricingTier.EgressType.SAME_ZONE | + GcpPricingTier.EgressType.SAME_REGION => + egressType.toString + case GcpPricingTier.EgressType.INTER_REGION if sourceRegion.isDefined => + GcpPricingTier.calculatePricingTier( + sourceRegion, destGcpRegion, region, egressType) + case GcpPricingTier.EgressType.INTER_REGION => + // Inter-region detected but no source region configured - use placeholder + "interregion_unknown" + case _ => + "unknown" + } + + PricingContext( + location = ClientLocationContext( + clientRegion = region, + pricingTier = pricingTier), + clientIp = clientIp, + clientIpSource = clientIpSource, + rawRegionHeader = rawRegion, + regionHeaderSource = regionSource, + hasEnvoyMetadata = envoyPeerMetadata.isDefined, + isGcpIp = isGcpIp, + egressType = egressType.toString, + sourceRegion = sourceRegion, + sourceContinent = sourceContinent.map(_.toString), + destinationRegion = destGcpRegion, + destinationContinent = destContinent.map(_.toString)) + } + + private def getOrderedHeaders( + configuredHeader: Option[String], + defaultHeaders: Seq[String]): Seq[String] = { + (configuredHeader.toSeq ++ defaultHeaders) + .map(_.trim.toLowerCase(Locale.ROOT)) + .filter(_.nonEmpty) + .distinct + } + + private def getFirstLocation( + headers: Seq[String], + requestHeaders: Map[String, String]): Option[(String, String)] = { + headers.iterator + .flatMap { h => + requestHeaders.get(h) + .map(v => normalizeLocationCode(v) -> h) + } + .find(_._1.nonEmpty) + } + + private def getFirstClientIp( + headers: Seq[String], + requestHeaders: Map[String, String]): Option[(String, String)] = { + headers.iterator + .flatMap { h => + requestHeaders.get(h) + .flatMap(v => extractClientIp(v).map(_ -> h)) + } + .toSeq + .headOption + } + + private def extractClientIp(value: String): Option[String] = { + val candidates = value.split(",").map(_.trim).filter(_.nonEmpty) + val valid = candidates.filter(isValidIp) + valid.find(isPublicIp).orElse(valid.headOption) + } + + private def isValidIp(ip: String): Boolean = { + // Validate IP format without DNS resolution to avoid latency and security issues + val ipv4Pattern = """^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})$""".r + val ipv6Pattern = """^([0-9a-fA-F:]+)$""".r + + ip match { + case ipv4Pattern(a, b, c, d) => + Seq(a, b, c, d).forall { octet => + val n = octet.toInt + n >= 0 && n <= 255 + } + case ipv6Pattern(_) => + try { + // For IPv6, use InetAddress but only for format validation + // This won't trigger DNS since it matches the IPv6 pattern + InetAddress.getByName(ip) + true + } catch { + case _: Throwable => false + } + case _ => false + } + } + + private def isPublicIp(ip: String): Boolean = { + try { + val inet = InetAddress.getByName(ip) + !inet.isAnyLocalAddress && + !inet.isLoopbackAddress && + !inet.isLinkLocalAddress && + !inet.isSiteLocalAddress && + !inet.isMulticastAddress + } catch { + case _: Throwable => false + } + } + + private def normalizeLocationCode(value: String): String = { + value.trim.toUpperCase(Locale.ROOT).replaceAll("[^A-Z0-9*]", "") + } + + private def extractActionBytes(action: Object): Long = { + action match { + case singleAction: SingleAction => + val raw = singleAction.unwrap + raw match { + case addFile: AddFile => addFile.size + case addFileForCDF: AddFileForCDF => addFileForCDF.size + case addCDCFile: AddCDCFile => addCDCFile.size + case _ => 0L + } + case deltaResponse: StandaloneDeltaResponseSingleAction => + Option(deltaResponse.file) + .flatMap(f => Option(f.deltaSingleAction)) + .map { deltaAction => + if (deltaAction.add != null) { + deltaAction.add.size + } else if (deltaAction.cdc != null) { + deltaAction.cdc.size + } else { + 0L + } + } + .getOrElse(0L) + case _ => 0L + } + } private val parser = { val parser = ArgumentParsers diff --git a/server/src/main/scala/io/delta/sharing/server/config/ServerConfig.scala b/server/src/main/scala/io/delta/sharing/server/config/ServerConfig.scala index 76eb63526..3cfd15df0 100644 --- a/server/src/main/scala/io/delta/sharing/server/config/ServerConfig.scala +++ b/server/src/main/scala/io/delta/sharing/server/config/ServerConfig.scala @@ -67,7 +67,9 @@ case class ServerConfig( // The TTL of the refresh token generated in queryTable API (in milliseconds). @BeanProperty var refreshTokenTtlMs: Int, // Whether to emit performance/timing log lines for table queries and CDF requests. - @BeanProperty var perfLoggingEnabled: Boolean + @BeanProperty var perfLoggingEnabled: Boolean, + // Access logging configuration for tracking share data egress via structured logs. + @BeanProperty var accessLogging: AccessLoggingConfig ) extends ConfigItem { import ServerConfig._ @@ -91,7 +93,8 @@ case class ServerConfig( queryTablePageSizeLimit = 10000, queryTablePageTokenTtlMs = 259200000, // 3 days refreshTokenTtlMs = 3600000, // 1 hour - perfLoggingEnabled = true + perfLoggingEnabled = true, + accessLogging = null ) } @@ -122,6 +125,47 @@ case class ServerConfig( if (ssl != null) { ssl.checkConfig() } + if (accessLogging != null) { + accessLogging.checkConfig() + } + } +} + +/** + * Configuration for access logging to track share data egress. + * When enabled, structured JSON logs are emitted for each data access. + */ +case class AccessLoggingConfig( + @BeanProperty var enabled: Boolean, + // Header that contains the client region code (for example: US, DE). + @BeanProperty var clientRegionHeader: String, + // Header that contains client IP or forwarding chain (for example: X-Forwarded-For). + @BeanProperty var clientIpHeader: String, + // Optional mapping from region codes to pricing group labels. + // Keys should be uppercase location codes (for example: US, DE). + // A wildcard key "*" can be used as a catch-all default. + // NOTE: pricingGroups is reserved for future use and not currently applied. + @BeanProperty var pricingGroups: java.util.Map[String, String], + // The GCP region where this server runs (for example: us-central1). + // Used for pricing tier calculation based on source→destination pairs. + @BeanProperty var sourceRegion: String, + // Enable GCP traffic detection using GCP's published IP ranges (cloud.json). + // When true, client IPs belonging to other GCP regions can be classified as inter-region + // (cheaper) rather than internet egress. Set to false to disable this detection. + @BeanProperty var detectGcpTraffic: Boolean) extends ConfigItem { + + def this() = { + this( + enabled = false, + clientRegionHeader = "x-client-region", + clientIpHeader = "x-forwarded-for", + pricingGroups = Collections.emptyMap(), + sourceRegion = "", + detectGcpTraffic = true) + } + + override def checkConfig(): Unit = { + // No required fields to validate } } @@ -238,7 +282,7 @@ case class TableConfig( @BeanProperty var location: String, @BeanProperty var id: String = "", @BeanProperty var historyShared: Boolean = false, - @BeanProperty var startVersion: Long = 0) extends ConfigItem { + @BeanProperty var startVersion: Long = 0) extends ConfigItem { def this() { this(null, null, null) diff --git a/server/src/main/scala/io/delta/sharing/server/telemetry/AccessLogEmitter.scala b/server/src/main/scala/io/delta/sharing/server/telemetry/AccessLogEmitter.scala new file mode 100644 index 000000000..c9ee3ecb5 --- /dev/null +++ b/server/src/main/scala/io/delta/sharing/server/telemetry/AccessLogEmitter.scala @@ -0,0 +1,274 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.sharing.server.telemetry + +import java.util.Locale + +import org.slf4j.LoggerFactory + +import io.delta.sharing.server.common.JsonUtils +import io.delta.sharing.server.config.ServerConfig + +/** + * Represents a single access log entry for share data egress tracking. + * + * == Core Fields (always present) == + * @param share The name of the share being accessed + * @param schema The schema containing the table + * @param table The table being accessed + * @param egressBytes The number of bytes transferred (used for cost calculation) + * @param timestampMs The timestamp of the access in milliseconds since epoch + * + * == Pricing Fields == + * @param pricingTier GCP egress pricing tier. See GcpPricingTier for values: + * + * '''Internet Egress (Premium Tier):''' + * - `internet_to_na_eu` - To North America or Europe (~$0.12/GiB) + * - `internet_to_apac` - To Asia Pacific (~$0.12/GiB) + * - `internet_to_latam` - To Latin America (~$0.19/GiB) + * - `internet_to_oceania` - To Australia/Oceania (~$0.15/GiB) + * + * '''Inter-region GCP Traffic:''' + * - `interregion_na_to_na` - NA to NA (~$0.02/GiB) + * - `interregion_na_to_eu` - NA to EU (~$0.05/GiB) + * - `interregion_eu_to_na` - EU to NA (~$0.05/GiB) + * - `interregion_eu_to_eu` - EU to EU (~$0.02/GiB) + * - `interregion_to_apac` - Any to APAC (~$0.08/GiB) + * - `interregion_to_oceania` - Any to Oceania (~$0.10/GiB) + * - `interregion_to_latam` - Any to LATAM (~$0.14/GiB) + * + * '''Free/Low-cost:''' + * - `same_zone` - Within same GCP zone (free) + * - `same_region` - Within same region or K8s cluster (~free) + * + * - `unknown` - Could not determine pricing tier + * + * Pricing reference: https://cloud.google.com/vpc/network-pricing + * + * == Optional Context == + * @param clientRegion ISO 3166-1 alpha-2 country code of the client (e.g., "US", "MT") + * @param requestType Type of request: "query" (snapshot read) or "cdf_stream" (CDF streaming) + */ +case class AccessLogEntry( + share: String, + schema: String, + table: String, + egressBytes: Long, + timestampMs: Long, + pricingTier: String = "unknown", + clientRegion: Option[String] = None, + requestType: String = "query") + +/** + * Captures all context information used to calculate the pricing tier. + * Emitted alongside ACCESS_LOG for debugging and accuracy verification. + * + * @param share The share being accessed (for correlation) + * @param table The table being accessed (for correlation) + * @param timestampMs Timestamp for correlation with ACCESS_LOG + * + * == Raw Header Values == + * @param clientIp First non-private, non-GCP IP from X-Forwarded-For chain + * @param clientIpSource Header that provided the client IP (e.g., "x-forwarded-for") + * @param rawRegionHeader Raw value from region header (before normalization) + * @param regionHeaderSource Header that provided the region (e.g., "x-client-region") + * @param hasEnvoyMetadata Whether X-Envoy-Peer-Metadata header was present + * @param isGcpIp Whether the client IP is in a known GCP public IP range (34.x, 35.x) + * + * == Derived Values == + * @param egressType Detected egress type: internet, inter_region, same_region, same_zone + * @param sourceRegion GCP region where server runs (from config) + * @param sourceContinent Continent of source region + * @param destinationRegion GCP region of destination (if detected from Envoy metadata) + * @param destinationContinent Continent of destination + * @param pricingTier Final calculated pricing tier + */ +case class PricingContextLogEntry( + share: String, + table: String, + timestampMs: Long, + // Raw header values + clientIp: Option[String] = None, + clientIpSource: Option[String] = None, + rawRegionHeader: Option[String] = None, + regionHeaderSource: Option[String] = None, + hasEnvoyMetadata: Boolean = false, + isGcpIp: Boolean = false, + // Derived values + egressType: String = "unknown", + sourceRegion: Option[String] = None, + sourceContinent: Option[String] = None, + destinationRegion: Option[String] = None, + destinationContinent: Option[String] = None, + pricingTier: String = "unknown") + +/** + * Log entry containing all request headers for debugging pricing tier detection. + * Emitted alongside PRICING_CONTEXT when header debugging is needed. + * + * @param share Share name for correlation + * @param table Table name for correlation + * @param timestampMs Timestamp for correlation with ACCESS_LOG + * @param headers All request headers (keys lowercased) + */ +case class RequestHeadersLogEntry( + share: String, + table: String, + timestampMs: Long, + headers: Map[String, String]) + +/** + * Trait for emitting access logs for share data egress tracking. + * Implementations can write to different destinations (logging, external services, etc.) + */ +trait AccessLogEmitter { + def record(entry: AccessLogEntry): Unit + def recordContext(entry: PricingContextLogEntry): Unit + def recordHeaders(entry: RequestHeadersLogEntry): Unit +} + +object AccessLogEmitter { + val QueryRequestType = "query" + val CdfStreamRequestType = "cdf_stream" + + /** + * Creates an AccessLogEmitter based on server configuration. + * Returns a JsonAccessLogEmitter if access logging is enabled, otherwise a no-op emitter. + */ + def create(serverConfig: ServerConfig): AccessLogEmitter = { + val cfg = Option(serverConfig.getAccessLogging) + cfg match { + case Some(c) if c.enabled => new JsonAccessLogEmitter() + case _ => NoopAccessLogEmitter + } + } +} + +/** + * No-op implementation for when access logging is disabled. + */ +object NoopAccessLogEmitter extends AccessLogEmitter { + override def record(entry: AccessLogEntry): Unit = {} + override def recordContext(entry: PricingContextLogEntry): Unit = {} + override def recordHeaders(entry: RequestHeadersLogEntry): Unit = {} +} + +/** + * Emits access log entries as JSON-structured log lines. + * Uses a dedicated logger that can be filtered/routed separately in Cloud Logging. + */ +class JsonAccessLogEmitter extends AccessLogEmitter { + // Dedicated logger for access logs - can be filtered in Cloud Logging by logger name + private val logger = LoggerFactory.getLogger("delta.sharing.access") + + override def record(entry: AccessLogEntry): Unit = { + if (entry.egressBytes <= 0) { + return + } + + // Core payload with essential fields for cost tracking + val basePayload = Map( + "logType" -> "ACCESS_LOG", + "share" -> entry.share, + "schema" -> entry.schema, + "table" -> entry.table, + "egressBytes" -> entry.egressBytes, + "pricingTier" -> entry.pricingTier, + "timestampMs" -> entry.timestampMs, + "requestType" -> entry.requestType + ) + + // Optional context fields + val contextPayload = Seq( + entry.clientRegion.map("clientRegion" -> _) + ).flatten.toMap + + val logPayload = basePayload ++ contextPayload + + logger.info(JsonUtils.toJson(logPayload)) + } + + override def recordContext(entry: PricingContextLogEntry): Unit = { + // Core payload for correlation + val basePayload = Map( + "logType" -> "PRICING_CONTEXT", + "share" -> entry.share, + "table" -> entry.table, + "timestampMs" -> entry.timestampMs, + "egressType" -> entry.egressType, + "pricingTier" -> entry.pricingTier, + "hasEnvoyMetadata" -> entry.hasEnvoyMetadata, + "isGcpIp" -> entry.isGcpIp + ) + + // Optional context fields - only include if present + val optionalPayload = Seq( + entry.clientIp.map("clientIp" -> _), + entry.clientIpSource.map("clientIpSource" -> _), + entry.rawRegionHeader.map("rawRegionHeader" -> _), + entry.regionHeaderSource.map("regionHeaderSource" -> _), + entry.sourceRegion.map("sourceRegion" -> _), + entry.sourceContinent.map("sourceContinent" -> _), + entry.destinationRegion.map("destinationRegion" -> _), + entry.destinationContinent.map("destinationContinent" -> _) + ).flatten.toMap + + val logPayload = basePayload ++ optionalPayload + + // Use DEBUG level for context logs to reduce volume/costs - only enable intentionally + if (logger.isDebugEnabled) { + logger.debug(JsonUtils.toJson(logPayload)) + } + } + + // Headers that should be redacted to prevent leaking secrets/PII + private val sensitiveHeaders = Set( + "authorization", + "cookie", + "set-cookie", + "x-api-key", + "x-auth-token", + "proxy-authorization", + "www-authenticate", + "x-csrf-token", + "x-xsrf-token" + ) + + override def recordHeaders(entry: RequestHeadersLogEntry): Unit = { + // Redact sensitive headers to prevent leaking secrets/PII + val redactedHeaders = entry.headers.map { case (key, value) => + if (sensitiveHeaders.contains(key.toLowerCase(Locale.ROOT))) { + key -> "[REDACTED]" + } else { + key -> value + } + } + + val logPayload = Map( + "logType" -> "REQUEST_HEADERS", + "share" -> entry.share, + "table" -> entry.table, + "timestampMs" -> entry.timestampMs, + "headers" -> redactedHeaders + ) + + // Use DEBUG level for header logs - high-volume and can leak sensitive data + if (logger.isDebugEnabled) { + logger.debug(JsonUtils.toJson(logPayload)) + } + } +} diff --git a/server/src/main/scala/io/delta/sharing/server/telemetry/GcpIpRangeLookup.scala b/server/src/main/scala/io/delta/sharing/server/telemetry/GcpIpRangeLookup.scala new file mode 100644 index 000000000..780870d59 --- /dev/null +++ b/server/src/main/scala/io/delta/sharing/server/telemetry/GcpIpRangeLookup.scala @@ -0,0 +1,334 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.sharing.server.telemetry + +import java.net.{HttpURLConnection, InetAddress, URL} +import java.util.concurrent.{Executors, TimeUnit} +import java.util.concurrent.atomic.AtomicReference + +import scala.collection.mutable +import scala.util.{Failure, Success, Try} +import scala.util.control.NonFatal + +import org.slf4j.LoggerFactory + +import io.delta.sharing.server.common.JsonUtils + +/** + * GCP IP Range lookup utility. + * + * Fetches GCP's published IP ranges from https://www.gstatic.com/ipranges/cloud.json + * and provides efficient IP-to-region lookup using a CIDR trie. + * + * The IP ranges are refreshed periodically (default: every 24 hours) to pick up + * any new allocations from GCP. + */ +object GcpIpRangeLookup { + + private val logger = LoggerFactory.getLogger("delta.sharing.gcp.ip.lookup") + + private val GCP_IP_RANGES_URL = "https://www.gstatic.com/ipranges/cloud.json" + private val REFRESH_INTERVAL_HOURS = 24 + private val CONNECTION_TIMEOUT_MS = 10000 + private val READ_TIMEOUT_MS = 30000 + + /** + * Case class representing a single IP prefix entry from GCP's JSON. + */ + case class IpPrefix( + ipv4Prefix: Option[String] = None, + ipv6Prefix: Option[String] = None, + service: String = "", + scope: String = "" + ) + + /** + * Case class representing the full GCP IP ranges response. + */ + case class GcpIpRanges( + syncToken: String = "", + creationTime: String = "", + prefixes: Seq[IpPrefix] = Seq.empty + ) + + /** + * A node in the CIDR trie for efficient IP-to-region lookup. + */ + private class CidrTrieNode { + var region: Option[String] = None + var children: Array[CidrTrieNode] = null + + def getOrCreateChild(bit: Int): CidrTrieNode = { + if (children == null) { + children = new Array[CidrTrieNode](2) + } + if (children(bit) == null) { + children(bit) = new CidrTrieNode() + } + children(bit) + } + + def getChild(bit: Int): Option[CidrTrieNode] = { + if (children == null) None + else Option(children(bit)) + } + } + + /** + * CIDR Trie for efficient longest-prefix-match lookups. + */ + private class CidrTrie { + private val root = new CidrTrieNode() + + /** + * Insert a CIDR block with its associated region. + */ + def insert(cidr: String, region: String): Unit = { + parseCidr(cidr).foreach { case (ipBytes, prefixLen) => + var node = root + for (i <- 0 until prefixLen) { + val byteIndex = i / 8 + val bitIndex = 7 - (i % 8) + val bit = (ipBytes(byteIndex) >> bitIndex) & 1 + node = node.getOrCreateChild(bit) + } + node.region = Some(region) + } + } + + /** + * Look up the region for an IP address using longest-prefix-match. + */ + def lookup(ip: String): Option[String] = { + parseIp(ip).flatMap { ipBytes => + var node = root + var lastMatch: Option[String] = None + var i = 0 + val maxBits = ipBytes.length * 8 + + while (i < maxBits && node != null) { + // Record the region if this node has one (longest prefix match) + if (node.region.isDefined) { + lastMatch = node.region + } + + val byteIndex = i / 8 + val bitIndex = 7 - (i % 8) + val bit = (ipBytes(byteIndex) >> bitIndex) & 1 + + node.getChild(bit) match { + case Some(child) => + node = child + i += 1 + case None => + node = null + } + } + + // Check if final node has a region + if (node != null && node.region.isDefined) { + lastMatch = node.region + } + + lastMatch + } + } + + /** + * Parse a CIDR notation string into (ip bytes, prefix length). + * Validates that the prefix length is valid for the address type. + */ + private def parseCidr(cidr: String): Option[(Array[Byte], Int)] = { + Try { + val parts = cidr.split("/") + val ipBytes = InetAddress.getByName(parts(0)).getAddress + val prefixLen = parts(1).toInt + val maxPrefixLen = ipBytes.length * 8 // 32 for IPv4, 128 for IPv6 + if (prefixLen < 0 || prefixLen > maxPrefixLen) { + throw new IllegalArgumentException( + s"Invalid prefix length $prefixLen for ${ipBytes.length * 8}-bit address") + } + (ipBytes, prefixLen) + }.toOption + } + + /** + * Parse an IP address string into bytes. + */ + private def parseIp(ip: String): Option[Array[Byte]] = { + Try(InetAddress.getByName(ip).getAddress).toOption + } + } + + // The cached trie, atomically updated + private val cachedTrie = new AtomicReference[CidrTrie](new CidrTrie()) + private val lastRefreshTime = new AtomicReference[Long](0L) + private val isInitialized = new AtomicReference[Boolean](false) + + // Background refresh scheduler + private lazy val scheduler = { + val s = Executors.newSingleThreadScheduledExecutor((r: Runnable) => { + val t = new Thread(r, "gcp-ip-range-refresh") + t.setDaemon(true) + t + }) + s.scheduleAtFixedRate( + () => refresh(), + REFRESH_INTERVAL_HOURS, + REFRESH_INTERVAL_HOURS, + TimeUnit.HOURS + ) + s + } + + /** + * Look up the GCP region for a given IP address. + * + * @param ip The IP address to look up (IPv4 or IPv6) + * @return Optional GCP region (e.g., "us-central1", "europe-west1") + */ + def lookupRegion(ip: String): Option[String] = { + ensureInitialized() + cachedTrie.get().lookup(ip) + } + + /** + * Check if an IP is in any GCP range. + */ + def isGcpIp(ip: String): Boolean = { + lookupRegion(ip).isDefined + } + + /** + * Ensure the IP ranges are loaded at least once. + * Initialization is async to avoid blocking the request path. + */ + def ensureInitialized(): Unit = { + if (!isInitialized.get()) { + synchronized { + if (!isInitialized.get()) { + // Mark as initialized immediately with empty trie to avoid blocking + // The background thread will populate it async + isInitialized.set(true) + // Start async refresh in background thread + scheduler.execute(() => refresh()) + } + } + } + } + + /** + * Refresh the IP ranges from GCP's endpoint. + * This is called automatically on a schedule, but can also be called manually. + */ + def refresh(): Unit = { + try { + fetchAndParse() match { + case Success(ranges) => + val newTrie = buildTrie(ranges) + cachedTrie.set(newTrie) + lastRefreshTime.set(System.currentTimeMillis()) + isInitialized.set(true) + logger.info(s"Loaded ${ranges.prefixes.size} IP ranges (syncToken: ${ranges.syncToken})") + case Failure(e) => + logger.warn(s"Failed to refresh IP ranges: ${e.getMessage}") + // Keep using the old trie if we have one + if (!isInitialized.get()) { + // First-time failure - create empty trie + cachedTrie.set(new CidrTrie()) + isInitialized.set(true) + } + } + } catch { + case NonFatal(e) => + logger.error(s"Unexpected error during refresh: ${e.getMessage}") + } + } + + /** + * Fetch and parse the GCP IP ranges JSON. + */ + private def fetchAndParse(): Try[GcpIpRanges] = { + Try { + val url = new URL(GCP_IP_RANGES_URL) + val conn = url.openConnection().asInstanceOf[HttpURLConnection] + conn.setConnectTimeout(CONNECTION_TIMEOUT_MS) + conn.setReadTimeout(READ_TIMEOUT_MS) + conn.setRequestProperty("Accept", "application/json") + + try { + val responseCode = conn.getResponseCode + if (responseCode != 200) { + throw new RuntimeException(s"HTTP $responseCode from GCP IP ranges endpoint") + } + + val inputStream = conn.getInputStream + val content = scala.io.Source.fromInputStream(inputStream).mkString + inputStream.close() + + JsonUtils.fromJson[GcpIpRanges](content) + } finally { + conn.disconnect() + } + } + } + + /** + * Build a CIDR trie from the parsed IP ranges. + */ + private def buildTrie(ranges: GcpIpRanges): CidrTrie = { + val trie = new CidrTrie() + + for (prefix <- ranges.prefixes) { + // Only include "Google Cloud" service entries with a valid scope + // Skip "global" scope as it doesn't help with region detection + if (prefix.service == "Google Cloud" && prefix.scope.nonEmpty && + prefix.scope != "global") { + prefix.ipv4Prefix.foreach(cidr => trie.insert(cidr, prefix.scope)) + prefix.ipv6Prefix.foreach(cidr => trie.insert(cidr, prefix.scope)) + } + } + + trie + } + + /** + * Get the time of the last successful refresh, in milliseconds since epoch. + * Returns 0 if never refreshed. + */ + def getLastRefreshTime: Long = lastRefreshTime.get() + + /** + * For testing: load IP ranges from a pre-fetched JSON string. + */ + def loadFromJson(json: String): Unit = { + val ranges = JsonUtils.fromJson[GcpIpRanges](json) + val newTrie = buildTrie(ranges) + cachedTrie.set(newTrie) + lastRefreshTime.set(System.currentTimeMillis()) + isInitialized.set(true) + } + + /** + * For testing: reset to uninitialized state. + */ + def reset(): Unit = { + cachedTrie.set(new CidrTrie()) + lastRefreshTime.set(0L) + isInitialized.set(false) + } +} diff --git a/server/src/main/scala/io/delta/sharing/server/telemetry/GcpPricingTier.scala b/server/src/main/scala/io/delta/sharing/server/telemetry/GcpPricingTier.scala new file mode 100644 index 000000000..1fa1aada5 --- /dev/null +++ b/server/src/main/scala/io/delta/sharing/server/telemetry/GcpPricingTier.scala @@ -0,0 +1,417 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.sharing.server.telemetry + +import java.util.{Base64, Locale} + +import scala.util.Try + +/** + * GCP pricing tier calculation for egress traffic. + * + * Implements classification of network egress into GCP pricing tiers based on: + * - Source region (where the data originates, e.g., GCS bucket location) + * - Destination region (where the client is located) + * - Traffic type (same-zone, inter-region GCP, or internet) + * + * Pricing tiers are based on: + * https://cloud.google.com/vpc/pricing-announce + */ +object GcpPricingTier { + + /** + * Continent groupings for GCP regions and country codes. + */ + object Continent extends Enumeration { + type Continent = Value + val NA, EU, APAC, LATAM, OCEANIA = Value + val UNKNOWN_CONTINENT: Continent = Value("UNKNOWN") + } + + import Continent._ + + /** + * Traffic types for egress pricing. + */ + object EgressType extends Enumeration { + type EgressType = Value + val SAME_ZONE = Value("same_zone") + val SAME_REGION = Value("same_region") + val INTER_REGION = Value("inter_region") + val INTERNET = Value("internet") + val UNKNOWN_TYPE: EgressType = Value("unknown") + } + + import EgressType._ + + /** + * Map GCP region prefix to continent. + * GCP region format: {location}-{zone_letter}{zone_number} (e.g., us-central1-a) + * We extract the location prefix. + */ + private val GcpRegionToContinent: Map[String, Continent] = Map( + // North America + "us" -> NA, + "northamerica" -> NA, + // Europe + "europe" -> EU, + // Asia Pacific + "asia" -> APAC, + // South America + "southamerica" -> LATAM, + // Australia / Oceania + "australia" -> OCEANIA + ) + + /** + * Map ISO 3166-1 alpha-2 country codes to continents. + * This covers the major countries; for others, we default to UNKNOWN. + */ + private val CountryToContinent: Map[String, Continent] = Map( + // North America + "US" -> NA, "CA" -> NA, "MX" -> NA, + // Europe + "GB" -> EU, "IE" -> EU, "DE" -> EU, "FR" -> EU, "NL" -> EU, "BE" -> EU, + "CH" -> EU, "AT" -> EU, "ES" -> EU, "PT" -> EU, "IT" -> EU, "SE" -> EU, + "NO" -> EU, "DK" -> EU, "FI" -> EU, "PL" -> EU, "CZ" -> EU, "HU" -> EU, + "RO" -> EU, "BG" -> EU, "GR" -> EU, "HR" -> EU, "SK" -> EU, "SI" -> EU, + "EE" -> EU, "LV" -> EU, "LT" -> EU, "LU" -> EU, "MT" -> EU, "CY" -> EU, + "IS" -> EU, "UA" -> EU, "BY" -> EU, "RU" -> EU, + // Asia Pacific + "JP" -> APAC, "KR" -> APAC, "CN" -> APAC, "HK" -> APAC, "TW" -> APAC, + "SG" -> APAC, "MY" -> APAC, "ID" -> APAC, "TH" -> APAC, "VN" -> APAC, + "PH" -> APAC, "IN" -> APAC, "PK" -> APAC, "BD" -> APAC, + // Oceania + "AU" -> OCEANIA, "NZ" -> OCEANIA, + // Latin America + "BR" -> LATAM, "AR" -> LATAM, "CL" -> LATAM, "CO" -> LATAM, "PE" -> LATAM, + "VE" -> LATAM, "EC" -> LATAM, "BO" -> LATAM, "PY" -> LATAM, "UY" -> LATAM, + "CR" -> LATAM, "PA" -> LATAM, "GT" -> LATAM, "HN" -> LATAM, "SV" -> LATAM, + "NI" -> LATAM, "DO" -> LATAM, "CU" -> LATAM, "PR" -> LATAM + ) + + /** + * Pricing tiers matching GCP network egress pricing structure. + * + * Inter-region format: interregion_{source}_{dest} or interregion_to_{dest} + * Internet format: internet_to_{dest} (pricing based on destination only) + * Special cases: same_zone, same_region (both essentially free) + */ + val TIER_SAME_ZONE = "same_zone" + val TIER_SAME_REGION = "same_region" + val TIER_INTERREGION_NA_NA = "interregion_na_to_na" + val TIER_INTERREGION_NA_EU = "interregion_na_to_eu" + val TIER_INTERREGION_EU_NA = "interregion_eu_to_na" + val TIER_INTERREGION_EU_EU = "interregion_eu_to_eu" + val TIER_INTERREGION_APAC = "interregion_to_apac" + val TIER_INTERREGION_TO_OCEANIA = "interregion_to_oceania" + val TIER_INTERREGION_TO_LATAM = "interregion_to_latam" + val TIER_INTERNET_NA_EU = "internet_to_na_eu" + val TIER_INTERNET_APAC = "internet_to_apac" + val TIER_INTERNET_LATAM = "internet_to_latam" + val TIER_INTERNET_OCEANIA = "internet_to_oceania" + val TIER_UNKNOWN = "unknown" + + /** + * Extract continent from a GCP region string (e.g., "us-central1", "europe-west1-b"). + */ + def continentFromGcpRegion(region: String): Continent = { + if (region == null || region.isEmpty) return UNKNOWN_CONTINENT + + val normalized = region.toLowerCase(Locale.ROOT) + // GCP region format: {continent_prefix}-{location}{number} + // e.g., us-central1, europe-west1, asia-east1, australia-southeast1 + val prefix = normalized.split("-").headOption.getOrElse("") + + GcpRegionToContinent.getOrElse(prefix, UNKNOWN_CONTINENT) + } + + /** + * Extract continent from an ISO 3166-1 alpha-2 country code. + */ + def continentFromCountryCode(countryCode: String): Continent = { + if (countryCode == null || countryCode.isEmpty) return UNKNOWN_CONTINENT + + val normalized = countryCode.toUpperCase(Locale.ROOT).trim + // Handle special "ZZ" code (unknown location from GCP load balancer) + if (normalized == "ZZ") return UNKNOWN_CONTINENT + + CountryToContinent.getOrElse(normalized, UNKNOWN_CONTINENT) + } + + /** + * Calculate the pricing tier based on source region, destination, and traffic type. + * + * @param sourceRegion The GCP region where data originates (e.g., "us-central1") + * @param destinationRegion Optional GCP region for inter-region GCP traffic + * @param destinationCountry Optional ISO country code for internet traffic + * @param egressType The type of egress (same_zone, inter_region, internet) + * @return The pricing tier string + */ + def calculatePricingTier( + sourceRegion: Option[String], + destinationRegion: Option[String], + destinationCountry: Option[String], + egressType: EgressType): String = { + + egressType match { + case SAME_ZONE => TIER_SAME_ZONE + case SAME_REGION => TIER_SAME_REGION + + case INTER_REGION => + val srcContinent = sourceRegion + .map(continentFromGcpRegion).getOrElse(UNKNOWN_CONTINENT) + // For inter-region, try GCP region first, fall back to country code + val dstContinent = destinationRegion + .map(continentFromGcpRegion) + .orElse(destinationCountry.map(continentFromCountryCode)) + .getOrElse(UNKNOWN_CONTINENT) + calculateInterRegionTier(srcContinent, dstContinent) + + case INTERNET => + // Internet egress pricing depends only on destination, not source + val dstContinent = destinationCountry + .map(continentFromCountryCode).getOrElse(UNKNOWN_CONTINENT) + calculateInternetTier(dstContinent) + + case UNKNOWN_TYPE => TIER_UNKNOWN + } + } + + /** + * Calculate inter-region GCP pricing tier based on source and destination continents. + * + * GCP inter-region pricing (approximate): + * - NA to NA: $0.02/GiB + * - NA to EU: $0.05/GiB + * - EU to EU: $0.02/GiB + * - To Asia: $0.08/GiB + * - To Australia: $0.10/GiB + * - To LATAM: $0.14/GiB + */ + private def calculateInterRegionTier(src: Continent, dst: Continent): String = { + (src, dst) match { + case (NA, NA) => TIER_INTERREGION_NA_NA + case (NA, EU) => TIER_INTERREGION_NA_EU + case (EU, NA) => TIER_INTERREGION_EU_NA + case (EU, EU) => TIER_INTERREGION_EU_EU + case (APAC, APAC) => TIER_INTERREGION_APAC + case (_, OCEANIA) => TIER_INTERREGION_TO_OCEANIA + case (_, LATAM) => TIER_INTERREGION_TO_LATAM + case (_, APAC) => TIER_INTERREGION_APAC + case _ => TIER_UNKNOWN + } + } + + /** + * Calculate internet egress pricing tier based on destination continent. + * Note: GCP internet egress pricing depends only on destination, not source. + * + * GCP Premium Tier Internet Egress (approximate): + * - To NA/EU: $0.12/GiB + * - To Asia: $0.12/GiB + * - To LATAM: $0.19/GiB + * - To Australia: $0.15/GiB + */ + def calculateInternetTier(dst: Continent): String = { + dst match { + case NA | EU => TIER_INTERNET_NA_EU + case APAC => TIER_INTERNET_APAC + case LATAM => TIER_INTERNET_LATAM + case OCEANIA => TIER_INTERNET_OCEANIA + case UNKNOWN_CONTINENT => TIER_UNKNOWN + } + } + + /** + * Parse X-Envoy-Peer-Metadata header to extract GCP region. + * + * The header is base64-encoded and contains structured metadata including + * a "gcp_location" field with the region (e.g., "us-central1-f"). + * + * The metadata can be in various formats: + * - JSON: {"gcp_location":"us-central1-f"} + * - Protobuf text: gcp_location:us-central1-f + * - Protobuf binary: gcp_location[binary bytes]us-central1-f + * + * @param headerValue The base64-encoded X-Envoy-Peer-Metadata header value + * @return Optional GCP region extracted from the metadata + */ + def extractGcpRegionFromEnvoyMetadata(headerValue: String): Option[String] = { + if (headerValue == null || headerValue.isEmpty) return None + + Try { + val decoded = new String(Base64.getDecoder.decode(headerValue), "UTF-8") + // The metadata can be protobuf binary or text format. + // In protobuf binary, field names and values are separated by control characters. + // We look for "gcp_location" followed by any characters (including binary), + // then capture a GCP region pattern. + // The region format is: {continent}-{location}{number}[-{zone}] + // e.g., us-central1, europe-west1-b, asia-east1-a + val gcpLocationPattern = + """(?:gcp_location|GCP_LOCATION)[\x00-\x1f\s":]*([a-z]+-[a-z]+\d+(?:-[a-z])?)""".r + gcpLocationPattern.findFirstMatchIn(decoded).map { m => + // Extract region without zone suffix (e.g., "us-central1" from "us-central1-f") + val fullLocation = m.group(1) + // Remove zone letter if present (e.g., -a, -b, -f) + fullLocation.replaceAll("-[a-z]$", "") + } + }.toOption.flatten + } + + /** + * Check if an IP address appears to be from the same Kubernetes cluster. + * + * This is a heuristic check based on common pod CIDR ranges. + * In practice, same-cluster traffic often has no X-Forwarded-For header + * or has a private IP in the forwarding chain. + * + * @param clientIp The client IP address + * @return true if the IP appears to be from the same cluster + */ + def isLikelySameClusterTraffic(clientIp: Option[String]): Boolean = { + clientIp match { + case None => true // No forwarding header often means same-cluster + case Some(ip) => isPrivateIp(ip) + } + } + + /** + * Check if an IP address is in a private/internal range. + */ + def isPrivateIp(ip: String): Boolean = { + ip.startsWith("10.") || + ip.startsWith("172.16.") || ip.startsWith("172.17.") || ip.startsWith("172.18.") || + ip.startsWith("172.19.") || ip.startsWith("172.20.") || ip.startsWith("172.21.") || + ip.startsWith("172.22.") || ip.startsWith("172.23.") || ip.startsWith("172.24.") || + ip.startsWith("172.25.") || ip.startsWith("172.26.") || ip.startsWith("172.27.") || + ip.startsWith("172.28.") || ip.startsWith("172.29.") || ip.startsWith("172.30.") || + ip.startsWith("172.31.") || + ip.startsWith("192.168.") || + ip == "127.0.0.1" || ip == "::1" + } + + /** + * Check if an IP address is in GCP's public IP ranges. + * GCP commonly uses these ranges for Cloud NAT, GKE nodes, and other services. + * + * Note: This is a heuristic. For precise detection, GCP publishes their IP ranges at: + * https://www.gstatic.com/ipranges/cloud.json + * + * @param ip The IP address to check + * @return true if the IP appears to be a GCP public IP + */ + def isGcpPublicIp(ip: String): Boolean = { + // GCP commonly uses 34.x.x.x and 35.x.x.x ranges + // These are the most common ranges for GKE, Cloud Run, Compute Engine, etc. + ip.startsWith("34.") || ip.startsWith("35.") + } + + /** + * Determine the egress type based on available information. + * + * Detection priority: + * 1. Private IP (10.x, 172.16-31.x, 192.168.x) => SAME_REGION (internal cluster) + * 2. GCP IP detected via IP range lookup => Use exact region from lookup + * a. If client region matches source region => SAME_REGION + * b. Otherwise => INTER_REGION (with exact client region for pricing) + * 3. Non-GCP IP with valid country code => INTERNET + * 4. Otherwise => UNKNOWN + * + * NOTE: The x-envoy-peer-metadata header is NOT used for region detection + * because it contains the INGRESS GATEWAY's location, not the actual + * client's location. We use GCP's published IP ranges instead. + * + * @param clientIp The client IP address (from X-Forwarded-For) + * @param envoyPeerMetadata Unused, kept for API compatibility + * @param clientRegion The client region from X-Client-Region header + * @param detectGcpTraffic Whether GCP traffic detection is enabled + * @param sourceRegion The GCP region where this server runs + * @return Tuple of (EgressType, Optional destination GCP region) + */ + def determineEgressType( + clientIp: Option[String], + envoyPeerMetadata: Option[String], + clientRegion: Option[String], + detectGcpTraffic: Boolean, + sourceRegion: Option[String] = None): (EgressType, Option[String]) = { + + // Check for same-cluster traffic first (private IPs) + if (isLikelySameClusterTraffic(clientIp)) { + return (SAME_REGION, None) + } + + // If GCP traffic detection is enabled and the IP looks like GCP, use IP range lookup for + // accurate region detection. This avoids unnecessary initialization/refresh overhead for + // non-GCP internet clients. + if (detectGcpTraffic && clientIp.exists(isGcpPublicIp)) { + clientIp.flatMap(GcpIpRangeLookup.lookupRegion) match { + case Some(clientGcpRegion) => + // We have exact GCP region from IP lookup + val isSameRegion = sourceRegion match { + case Some(src) => normalizeRegion(src) == normalizeRegion(clientGcpRegion) + case None => false + } + if (isSameRegion) { + return (SAME_REGION, Some(clientGcpRegion)) + } + return (INTER_REGION, Some(clientGcpRegion)) + + case None => + // IP not found in GCP ranges - check if it might still be GCP + // (fallback for any gaps in the published ranges) + if (clientIp.exists(isGcpPublicIp)) { + // IP looks like GCP but not in ranges - classify as inter-region conservatively + clientRegion match { + // scalastyle:off caselocale + case Some(region) if region.toUpperCase == "ZZ" => + // scalastyle:on caselocale + return (INTER_REGION, None) + case _ => + return (INTER_REGION, None) + } + } + // Not a GCP IP - fall through to internet traffic detection + } + } + + // Non-GCP client IP - this is internet traffic + clientRegion match { + // scalastyle:off caselocale + case Some(region) if region.toUpperCase == "ZZ" => + // scalastyle:on caselocale + // ZZ typically means GCP couldn't determine the location + (INTERNET, None) + case Some(_) => + // Has a valid country code - this is internet traffic + (INTERNET, None) + case None => + // No region header - could be internal or misconfigured + (UNKNOWN_TYPE, None) + } + } + + /** + * Normalize a GCP region string for comparison. + * Removes zone suffix (e.g., "us-central1-f" -> "us-central1") and lowercases. + */ + private def normalizeRegion(region: String): String = { + val lower = region.toLowerCase(Locale.ROOT).trim + // Remove zone letter suffix if present (e.g., -a, -b, -f) + lower.replaceAll("-[a-z]$", "") + } +} diff --git a/server/src/test/scala/io/delta/sharing/server/DeltaSharingServiceSuite.scala b/server/src/test/scala/io/delta/sharing/server/DeltaSharingServiceSuite.scala index e034a62b0..900e4aafa 100644 --- a/server/src/test/scala/io/delta/sharing/server/DeltaSharingServiceSuite.scala +++ b/server/src/test/scala/io/delta/sharing/server/DeltaSharingServiceSuite.scala @@ -23,6 +23,7 @@ import java.security.cert.X509Certificate import java.sql.Timestamp import javax.net.ssl._ +import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import com.linecorp.armeria.server.Server @@ -34,13 +35,27 @@ import scalapb.json4s.JsonFormat import io.delta.sharing.server.DeltaSharingService.DELTA_SHARING_INCLUDE_END_STREAM_ACTION import io.delta.sharing.server.common.JsonUtils import io.delta.sharing.server.common.actions.{ColumnMappingTableFeature, DeletionVectorDescriptor, DeletionVectorsTableFeature, DeltaAddFile, DeltaFormat, DeltaProtocol, DeltaSingleAction} -import io.delta.sharing.server.config.ServerConfig +import io.delta.sharing.server.config.{AccessLoggingConfig, ServerConfig} import io.delta.sharing.server.model._ import io.delta.sharing.server.protocol._ +import io.delta.sharing.server.telemetry.GcpIpRangeLookup // scalastyle:off maxLineLength class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll { + // Test IP ranges for deterministic GCP IP lookup tests + // Prevents live HTTP fetch to gstatic.com during tests + private val testGcpIpRangesJson = + """ + { + "syncToken": "test", + "creationTime": "2026-06-01T00:00:00.000000", + "prefixes": [ + {"ipv4Prefix": "34.100.0.0/16", "service": "Google Cloud", "scope": "us-central1"} + ] + } + """ + def shouldRunIntegrationTest: Boolean = { sys.env.get("AWS_ACCESS_KEY_ID").exists(_.length > 0) && sys.env.get("AZURE_TEST_ACCOUNT_KEY").exists(_.length > 0) && @@ -232,6 +247,201 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll { }.getMessage.contains("endingVersion is not a valid number") } + test("extractEgressBytes") { + val actions: Seq[Object] = Seq( + AddFile( + url = "u1", + id = "1", + partitionValues = Map.empty, + size = 10L + ).wrap, + AddFileForCDF( + url = "u2", + id = "2", + partitionValues = Map.empty, + size = 15L, + version = 1L, + timestamp = 1L + ).wrap, + AddCDCFile( + url = "u3", + id = "3", + partitionValues = Map.empty, + size = 20L, + version = 1L, + timestamp = 1L + ).wrap, + RemoveFile( + url = "u4", + id = "4", + partitionValues = Map.empty, + size = 25L, + version = 1L, + timestamp = 1L + ).wrap, + QueryStatus(queryId = "q1").wrap + ) + + assert(DeltaSharingService.extractEgressBytes(actions) == 45L) + } + + test("buildClientLocationContext uses configured geo headers and pricing map") { + val cfg = new AccessLoggingConfig() + cfg.setEnabled(true) + + val ctx = DeltaSharingService.buildClientLocationContext( + Map( + "X-Client-Region" -> "us", + "X-Forwarded-For" -> "1.2.3.4" // Public IP -> internet traffic + ), + cfg) + + assert(ctx.clientRegion.contains("US")) + assert(ctx.pricingTier == "internet_to_na_eu") + } + + test("buildClientLocationContext falls back to default headers for region") { + val cfg = new AccessLoggingConfig() + cfg.setEnabled(true) + + val ctx = DeltaSharingService.buildClientLocationContext( + Map( + "CF-IPCountry" -> "br", + "X-Forwarded-For" -> "1.2.3.4" // Public IP -> internet traffic + ), + cfg) + + assert(ctx.clientRegion.contains("BR")) + assert(ctx.pricingTier == "internet_to_latam") + } + + test("buildClientLocationContext returns same_region for local traffic") { + val cfg = new AccessLoggingConfig() + cfg.setEnabled(true) + + // No region header, no X-Forwarded-For -> same_region + val ctx = DeltaSharingService.buildClientLocationContext(Map.empty[String, String], cfg) + + assert(ctx.clientRegion.isEmpty) + assert(ctx.pricingTier == "same_region") + } + + test("buildClientLocationContext uses IP from X-Forwarded-For for pricing tier") { + val cfg = new AccessLoggingConfig() + cfg.setEnabled(true) + + val ctx = DeltaSharingService.buildClientLocationContext( + Map("X-Forwarded-For" -> "10.0.0.1, 217.9.4.27, 35.191.144.206"), + cfg) + + assert(ctx.clientRegion.isEmpty) + // Public IP in chain -> internet traffic, but no region -> unknown destination + assert(ctx.pricingTier == "unknown") + } + + test("buildClientLocationContext calculates internet tier for EU destination") { + val cfg = new AccessLoggingConfig() + cfg.setEnabled(true) + + val ctx = DeltaSharingService.buildClientLocationContext( + Map( + "X-Appengine-Country" -> "de", + "X-Forwarded-For" -> "1.2.3.4" // Public IP -> internet traffic + ), + cfg) + + assert(ctx.clientRegion.contains("DE")) + assert(ctx.pricingTier == "internet_to_na_eu") + } + + test("buildClientLocationContext calculates internet tier for DE destination") { + val cfg = new AccessLoggingConfig() + cfg.setEnabled(true) + cfg.setSourceRegion("us-central1") + cfg.setDetectGcpTraffic(true) + + val ctx = DeltaSharingService.buildClientLocationContext( + Map( + "X-Client-Region" -> "DE", + "X-Forwarded-For" -> "1.2.3.4" + ), + cfg) + + assert(ctx.clientRegion.contains("DE")) + assert(ctx.pricingTier == "internet_to_na_eu") + } + + test("buildClientLocationContext detects same_region for no external IP") { + val cfg = new AccessLoggingConfig() + cfg.setEnabled(true) + cfg.setSourceRegion("us-central1") + cfg.setDetectGcpTraffic(true) + + val ctx = DeltaSharingService.buildClientLocationContext( + Map.empty[String, String], + cfg) + + assert(ctx.pricingTier == "same_region") + } + + test("buildClientLocationContext calculates inter-region tier for ZZ region") { + // Load deterministic IP ranges to avoid live HTTP fetch + GcpIpRangeLookup.loadFromJson(testGcpIpRangesJson) + + val cfg = new AccessLoggingConfig() + cfg.setEnabled(true) + cfg.setSourceRegion("us-central1") + cfg.setDetectGcpTraffic(true) + + val ctx = DeltaSharingService.buildClientLocationContext( + Map( + "X-Client-Region" -> "ZZ", + "X-Forwarded-For" -> "34.100.0.1" + ), + cfg) + + assert(ctx.clientRegion.contains("ZZ")) + // With deterministic IP ranges, 34.100.0.1 maps to us-central1 (same as sourceRegion) + assert(ctx.pricingTier == "same_region") + } + + test("buildClientLocationContext calculates internet tier without source region") { + val cfg = new AccessLoggingConfig() + cfg.setEnabled(true) + cfg.setDetectGcpTraffic(true) + // Note: sourceRegion is NOT set + + val ctx = DeltaSharingService.buildClientLocationContext( + Map( + "X-Client-Region" -> "DE", + "X-Forwarded-For" -> "1.2.3.4" + ), + cfg) + + // Internet pricing doesn't require sourceRegion + assert(ctx.pricingTier == "internet_to_na_eu") + } + + test("buildClientLocationContext returns interregion_unknown for GCP IP without source") { + // Load deterministic IP ranges to avoid live HTTP fetch + GcpIpRangeLookup.loadFromJson(testGcpIpRangesJson) + + val cfg = new AccessLoggingConfig() + cfg.setEnabled(true) + cfg.setDetectGcpTraffic(true) + // Note: sourceRegion is NOT set + + val ctx = DeltaSharingService.buildClientLocationContext( + Map( + "X-Client-Region" -> "ZZ", // ZZ indicates internal GCP traffic + "X-Forwarded-For" -> "34.100.0.1" // GCP public IP + ), + cfg) + + // Inter-region detected (GCP IP + ZZ region), but no sourceRegion for exact tier + assert(ctx.pricingTier == "interregion_unknown") + } + integrationTest("401 Unauthorized Error: incorrect token") { val url = requestPath("/shares") val connection = new URL(url).openConnection().asInstanceOf[HttpsURLConnection] diff --git a/server/src/test/scala/io/delta/sharing/server/config/ServerConfigSuite.scala b/server/src/test/scala/io/delta/sharing/server/config/ServerConfigSuite.scala index 5eae5cacb..0be6be3c9 100644 --- a/server/src/test/scala/io/delta/sharing/server/config/ServerConfigSuite.scala +++ b/server/src/test/scala/io/delta/sharing/server/config/ServerConfigSuite.scala @@ -20,6 +20,7 @@ import java.io.File import java.nio.charset.StandardCharsets.UTF_8 import java.nio.file.Files import java.util.Arrays +import java.util.Collections import org.apache.commons.io.FileUtils import org.scalatest.FunSuite @@ -98,12 +99,54 @@ class ServerConfigSuite extends FunSuite { serverConfig.setVersion(1) serverConfig.setShares(sharesInTemplate) serverConfig.setPort(8080) + val accessLogging = new AccessLoggingConfig() + accessLogging.setEnabled(false) + serverConfig.setAccessLogging(accessLogging) assert(loaded == serverConfig) } finally { tempFile.delete() } } + test("access logging config") { + val serverConfig = new ServerConfig() + serverConfig.setVersion(1) + serverConfig.setShares(Arrays.asList( + ShareConfig("share1", Arrays.asList( + SchemaConfig("schema1", Arrays.asList( + TableConfig( + name = "table1", + location = "s3a://bucket/path", + id = null + ) + )) + )) + )) + val accessLogging = new AccessLoggingConfig() + accessLogging.setEnabled(true) + accessLogging.setClientRegionHeader("x-client-region") + accessLogging.setClientIpHeader("x-forwarded-for") + accessLogging.setPricingGroups(Collections.singletonMap("US", "na-tier-1")) + serverConfig.setAccessLogging(accessLogging) + testConfig( + """version: 1 + |shares: + |- name: share1 + | schemas: + | - name: schema1 + | tables: + | - name: table1 + | location: s3a://bucket/path + |accessLogging: + | enabled: true + | clientRegionHeader: x-client-region + | clientIpHeader: x-forwarded-for + | pricingGroups: + | US: na-tier-1 + |""".stripMargin, + serverConfig) + } + test("accept unknown fields") { val serverConfig = new ServerConfig() serverConfig.setVersion(1) @@ -158,6 +201,12 @@ class ServerConfigSuite extends FunSuite { } } + test("AccessLoggingConfig") { + val accessLogging = new AccessLoggingConfig() + accessLogging.setEnabled(true) + accessLogging.checkConfig() + } + test("SSLConfig") { assertInvalidConfig("'certificateFile' in a SSL config must be provided") { val s = new SSLConfig() diff --git a/server/src/test/scala/io/delta/sharing/server/telemetry/AccessLogEmitterSuite.scala b/server/src/test/scala/io/delta/sharing/server/telemetry/AccessLogEmitterSuite.scala new file mode 100644 index 000000000..da2762746 --- /dev/null +++ b/server/src/test/scala/io/delta/sharing/server/telemetry/AccessLogEmitterSuite.scala @@ -0,0 +1,113 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.sharing.server.telemetry + +import org.scalatest.FunSuite + +import io.delta.sharing.server.config.{AccessLoggingConfig, ServerConfig} + +class AccessLogEmitterSuite extends FunSuite { + + test("AccessLogEntry captures all required fields") { + val entry = AccessLogEntry( + share = "test-share", + schema = "test-schema", + table = "test-table", + egressBytes = 1024L, + timestampMs = System.currentTimeMillis(), + pricingTier = "internet_to_na_eu", + clientRegion = Some("US"), + requestType = AccessLogEmitter.QueryRequestType + ) + + assert(entry.share == "test-share") + assert(entry.schema == "test-schema") + assert(entry.table == "test-table") + assert(entry.egressBytes == 1024L) + assert(entry.requestType == "query") + assert(entry.clientRegion.contains("US")) + assert(entry.pricingTier == "internet_to_na_eu") + } + + test("NoopAccessLogEmitter ignores all records") { + // Should not throw + NoopAccessLogEmitter.record(AccessLogEntry( + share = "s", + schema = "sc", + table = "t", + egressBytes = 100L, + timestampMs = 0L + )) + } + + test("JsonAccessLogEmitter skips zero-byte entries") { + val emitter = new JsonAccessLogEmitter() + // Should not throw, and should skip logging + emitter.record(AccessLogEntry( + share = "s", + schema = "sc", + table = "t", + egressBytes = 0L, + timestampMs = 0L + )) + } + + test("JsonAccessLogEmitter logs non-zero entries") { + val emitter = new JsonAccessLogEmitter() + // Should not throw + emitter.record(AccessLogEntry( + share = "customer-share", + schema = "analytics", + table = "events", + egressBytes = 50000L, + timestampMs = 1716864000000L, + requestType = AccessLogEmitter.CdfStreamRequestType + )) + } + + test("AccessLogEmitter.create returns NoopAccessLogEmitter when disabled") { + val config = new ServerConfig() + // accessLogging is null by default + val emitter = AccessLogEmitter.create(config) + assert(emitter == NoopAccessLogEmitter) + } + + test("AccessLogEmitter.create returns NoopAccessLogEmitter when explicitly disabled") { + val config = new ServerConfig() + val accessConfig = new AccessLoggingConfig() + accessConfig.setEnabled(false) + config.setAccessLogging(accessConfig) + + val emitter = AccessLogEmitter.create(config) + assert(emitter == NoopAccessLogEmitter) + } + + test("AccessLogEmitter.create returns JsonAccessLogEmitter when enabled") { + val config = new ServerConfig() + val accessConfig = new AccessLoggingConfig() + accessConfig.setEnabled(true) + config.setAccessLogging(accessConfig) + + val emitter = AccessLogEmitter.create(config) + assert(emitter.isInstanceOf[JsonAccessLogEmitter]) + } + + test("request type constants are defined") { + assert(AccessLogEmitter.QueryRequestType == "query") + assert(AccessLogEmitter.CdfStreamRequestType == "cdf_stream") + } +} diff --git a/server/src/test/scala/io/delta/sharing/server/telemetry/GcpIpRangeLookupSuite.scala b/server/src/test/scala/io/delta/sharing/server/telemetry/GcpIpRangeLookupSuite.scala new file mode 100644 index 000000000..08ac384b3 --- /dev/null +++ b/server/src/test/scala/io/delta/sharing/server/telemetry/GcpIpRangeLookupSuite.scala @@ -0,0 +1,246 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.sharing.server.telemetry + +import org.scalatest.{BeforeAndAfterEach, FunSuite} + +class GcpIpRangeLookupSuite extends FunSuite with BeforeAndAfterEach { + + // Sample GCP IP ranges JSON for testing + private val testRangesJson = + """ + { + "syncToken": "1234567890", + "creationTime": "2026-06-01T00:00:00.000000", + "prefixes": [ + { + "ipv4Prefix": "34.44.0.0/15", + "service": "Google Cloud", + "scope": "us-central1" + }, + { + "ipv4Prefix": "34.72.0.0/16", + "service": "Google Cloud", + "scope": "us-central1" + }, + { + "ipv4Prefix": "34.73.0.0/16", + "service": "Google Cloud", + "scope": "us-east1" + }, + { + "ipv4Prefix": "35.187.0.0/17", + "service": "Google Cloud", + "scope": "europe-west1" + }, + { + "ipv4Prefix": "34.87.0.0/17", + "service": "Google Cloud", + "scope": "asia-southeast1" + }, + { + "ipv4Prefix": "34.151.64.0/18", + "service": "Google Cloud", + "scope": "australia-southeast1" + }, + { + "ipv4Prefix": "34.95.128.0/17", + "service": "Google Cloud", + "scope": "southamerica-east1" + }, + { + "ipv4Prefix": "34.8.0.0/16", + "service": "Google Cloud", + "scope": "global" + }, + { + "ipv6Prefix": "2600:1900:4000::/44", + "service": "Google Cloud", + "scope": "us-central1" + } + ] + } + """ + + override def beforeEach(): Unit = { + GcpIpRangeLookup.reset() + } + + test("lookupRegion returns None for empty/null IP") { + GcpIpRangeLookup.loadFromJson(testRangesJson) + assert(GcpIpRangeLookup.lookupRegion(null).isEmpty) + assert(GcpIpRangeLookup.lookupRegion("").isEmpty) + } + + test("lookupRegion returns None for non-GCP IPs") { + GcpIpRangeLookup.loadFromJson(testRangesJson) + assert(GcpIpRangeLookup.lookupRegion("8.8.8.8").isEmpty) + assert(GcpIpRangeLookup.lookupRegion("1.2.3.4").isEmpty) + assert(GcpIpRangeLookup.lookupRegion("217.9.6.27").isEmpty) // Malta IP + } + + test("lookupRegion returns correct region for us-central1 IP") { + GcpIpRangeLookup.loadFromJson(testRangesJson) + + // 34.44.0.0/15 covers 34.44.0.0 - 34.45.255.255 + assert(GcpIpRangeLookup.lookupRegion("34.44.0.1") == Some("us-central1")) + assert(GcpIpRangeLookup.lookupRegion("34.45.22.184") == Some("us-central1")) + assert(GcpIpRangeLookup.lookupRegion("34.45.255.255") == Some("us-central1")) + + // 34.72.0.0/16 + assert(GcpIpRangeLookup.lookupRegion("34.72.100.50") == Some("us-central1")) + } + + test("lookupRegion returns correct region for us-east1 IP") { + GcpIpRangeLookup.loadFromJson(testRangesJson) + + // 34.73.0.0/16 + assert(GcpIpRangeLookup.lookupRegion("34.73.0.1") == Some("us-east1")) + assert(GcpIpRangeLookup.lookupRegion("34.73.128.100") == Some("us-east1")) + } + + test("lookupRegion returns correct region for europe-west1 IP") { + GcpIpRangeLookup.loadFromJson(testRangesJson) + + // 35.187.0.0/17 covers 35.187.0.0 - 35.187.127.255 + assert(GcpIpRangeLookup.lookupRegion("35.187.0.1") == Some("europe-west1")) + assert(GcpIpRangeLookup.lookupRegion("35.187.64.100") == Some("europe-west1")) + assert(GcpIpRangeLookup.lookupRegion("35.187.127.255") == Some("europe-west1")) + + // 35.187.128.0 is outside the /17 + assert(GcpIpRangeLookup.lookupRegion("35.187.128.0").isEmpty) + } + + test("lookupRegion returns correct region for asia-southeast1 IP") { + GcpIpRangeLookup.loadFromJson(testRangesJson) + + // 34.87.0.0/17 + assert(GcpIpRangeLookup.lookupRegion("34.87.0.1") == Some("asia-southeast1")) + assert(GcpIpRangeLookup.lookupRegion("34.87.100.50") == Some("asia-southeast1")) + } + + test("lookupRegion returns correct region for australia-southeast1 IP") { + GcpIpRangeLookup.loadFromJson(testRangesJson) + + // 34.151.64.0/18 covers 34.151.64.0 - 34.151.127.255 + assert(GcpIpRangeLookup.lookupRegion("34.151.64.1") == Some("australia-southeast1")) + assert(GcpIpRangeLookup.lookupRegion("34.151.100.50") == Some("australia-southeast1")) + } + + test("lookupRegion returns correct region for southamerica-east1 IP") { + GcpIpRangeLookup.loadFromJson(testRangesJson) + + // 34.95.128.0/17 + assert(GcpIpRangeLookup.lookupRegion("34.95.128.1") == Some("southamerica-east1")) + assert(GcpIpRangeLookup.lookupRegion("34.95.200.50") == Some("southamerica-east1")) + } + + test("lookupRegion skips global scope ranges") { + GcpIpRangeLookup.loadFromJson(testRangesJson) + + // 34.8.0.0/16 is in the JSON but with scope "global" - should be skipped + assert(GcpIpRangeLookup.lookupRegion("34.8.1.1").isEmpty) + } + + test("isGcpIp returns true for GCP IPs") { + GcpIpRangeLookup.loadFromJson(testRangesJson) + + assert(GcpIpRangeLookup.isGcpIp("34.45.22.184")) + assert(GcpIpRangeLookup.isGcpIp("34.73.100.50")) + assert(GcpIpRangeLookup.isGcpIp("35.187.64.100")) + } + + test("isGcpIp returns false for non-GCP IPs") { + GcpIpRangeLookup.loadFromJson(testRangesJson) + + assert(!GcpIpRangeLookup.isGcpIp("8.8.8.8")) + assert(!GcpIpRangeLookup.isGcpIp("1.2.3.4")) + assert(!GcpIpRangeLookup.isGcpIp("192.168.1.1")) + } + + test("handles invalid IP addresses gracefully") { + GcpIpRangeLookup.loadFromJson(testRangesJson) + + assert(GcpIpRangeLookup.lookupRegion("not-an-ip").isEmpty) + assert(GcpIpRangeLookup.lookupRegion("256.256.256.256").isEmpty) + assert(GcpIpRangeLookup.lookupRegion("34.45").isEmpty) + } + + test("handles IPv6 addresses") { + GcpIpRangeLookup.loadFromJson(testRangesJson) + + // 2600:1900:4000::/44 is us-central1 + assert(GcpIpRangeLookup.lookupRegion("2600:1900:4000::1") == Some("us-central1")) + assert(GcpIpRangeLookup.lookupRegion("2600:1900:4001:1234::5678") == Some("us-central1")) + + // Outside the range + assert(GcpIpRangeLookup.lookupRegion("2600:1901::1").isEmpty) + } + + test("longest prefix match selects most specific range") { + // Create test data with overlapping ranges + val overlappingRangesJson = + """ + { + "syncToken": "123", + "creationTime": "2026-06-01T00:00:00.000000", + "prefixes": [ + { + "ipv4Prefix": "34.0.0.0/8", + "service": "Google Cloud", + "scope": "us-west1" + }, + { + "ipv4Prefix": "34.45.0.0/16", + "service": "Google Cloud", + "scope": "us-central1" + }, + { + "ipv4Prefix": "34.45.22.0/24", + "service": "Google Cloud", + "scope": "us-east1" + } + ] + } + """ + GcpIpRangeLookup.loadFromJson(overlappingRangesJson) + + // Most specific match wins + assert(GcpIpRangeLookup.lookupRegion("34.45.22.184") == Some("us-east1")) + assert(GcpIpRangeLookup.lookupRegion("34.45.100.1") == Some("us-central1")) + assert(GcpIpRangeLookup.lookupRegion("34.100.1.1") == Some("us-west1")) + } + + test("returns None for IPs not in loaded ranges") { + // After loading test ranges, IPs not in those ranges should return None + GcpIpRangeLookup.loadFromJson(testRangesJson) + // This IP is not in our test ranges + assert(GcpIpRangeLookup.lookupRegion("34.200.200.200").isEmpty) + } + + test("getLastRefreshTime updates after loadFromJson") { + val timeBefore = GcpIpRangeLookup.getLastRefreshTime + Thread.sleep(10) // Small delay to ensure time difference + GcpIpRangeLookup.loadFromJson(testRangesJson) + assert(GcpIpRangeLookup.getLastRefreshTime > timeBefore) + } + + test("getLastRefreshTime returns non-zero after load") { + GcpIpRangeLookup.loadFromJson(testRangesJson) + assert(GcpIpRangeLookup.getLastRefreshTime > 0) + } +} diff --git a/server/src/test/scala/io/delta/sharing/server/telemetry/GcpPricingTierSuite.scala b/server/src/test/scala/io/delta/sharing/server/telemetry/GcpPricingTierSuite.scala new file mode 100644 index 000000000..3317951d7 --- /dev/null +++ b/server/src/test/scala/io/delta/sharing/server/telemetry/GcpPricingTierSuite.scala @@ -0,0 +1,441 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.sharing.server.telemetry + +import java.util.Base64 + +import org.scalatest.{BeforeAndAfterEach, FunSuite} + +class GcpPricingTierSuite extends FunSuite with BeforeAndAfterEach { + import GcpPricingTier._ + import GcpPricingTier.Continent._ + import GcpPricingTier.EgressType._ + + // Sample GCP IP ranges for testing + private val testRangesJson = + """ + { + "syncToken": "1234567890", + "creationTime": "2026-06-01T00:00:00.000000", + "prefixes": [ + { + "ipv4Prefix": "34.44.0.0/15", + "service": "Google Cloud", + "scope": "us-central1" + }, + { + "ipv4Prefix": "34.72.0.0/16", + "service": "Google Cloud", + "scope": "us-central1" + }, + { + "ipv4Prefix": "34.73.0.0/16", + "service": "Google Cloud", + "scope": "us-east1" + }, + { + "ipv4Prefix": "35.187.0.0/17", + "service": "Google Cloud", + "scope": "europe-west1" + }, + { + "ipv4Prefix": "34.87.0.0/17", + "service": "Google Cloud", + "scope": "asia-southeast1" + }, + { + "ipv4Prefix": "34.151.64.0/18", + "service": "Google Cloud", + "scope": "australia-southeast1" + }, + { + "ipv4Prefix": "34.95.128.0/17", + "service": "Google Cloud", + "scope": "southamerica-east1" + }, + { + "ipv4Prefix": "34.100.0.0/16", + "service": "Google Cloud", + "scope": "us-central1" + } + ] + } + """ + + override def beforeEach(): Unit = { + GcpIpRangeLookup.reset() + GcpIpRangeLookup.loadFromJson(testRangesJson) + } + + override def afterEach(): Unit = { + GcpIpRangeLookup.reset() + } + + // ===== Continent Mapping Tests ===== + + test("continentFromGcpRegion maps US regions to NA") { + assert(continentFromGcpRegion("us-central1") == NA) + assert(continentFromGcpRegion("us-east1") == NA) + assert(continentFromGcpRegion("us-west1-a") == NA) + } + + test("continentFromGcpRegion maps Europe regions to EU") { + assert(continentFromGcpRegion("europe-west1") == EU) + assert(continentFromGcpRegion("europe-north1-b") == EU) + } + + test("continentFromGcpRegion maps Asia regions to APAC") { + assert(continentFromGcpRegion("asia-east1") == APAC) + assert(continentFromGcpRegion("asia-southeast1") == APAC) + } + + test("continentFromGcpRegion maps Australia regions to OCEANIA") { + assert(continentFromGcpRegion("australia-southeast1") == OCEANIA) + } + + test("continentFromGcpRegion maps South America regions to LATAM") { + assert(continentFromGcpRegion("southamerica-east1") == LATAM) + } + + test("continentFromGcpRegion handles null and empty") { + assert(continentFromGcpRegion(null) == UNKNOWN_CONTINENT) + assert(continentFromGcpRegion("") == UNKNOWN_CONTINENT) + assert(continentFromGcpRegion("unknown-region") == UNKNOWN_CONTINENT) + } + + test("continentFromCountryCode maps NA countries correctly") { + assert(continentFromCountryCode("US") == NA) + assert(continentFromCountryCode("CA") == NA) + assert(continentFromCountryCode("MX") == NA) + } + + test("continentFromCountryCode maps EU countries correctly") { + assert(continentFromCountryCode("DE") == EU) + assert(continentFromCountryCode("FR") == EU) + assert(continentFromCountryCode("GB") == EU) + assert(continentFromCountryCode("MT") == EU) + } + + test("continentFromCountryCode maps APAC countries correctly") { + assert(continentFromCountryCode("JP") == APAC) + assert(continentFromCountryCode("SG") == APAC) + assert(continentFromCountryCode("IN") == APAC) + } + + test("continentFromCountryCode maps Oceania countries correctly") { + assert(continentFromCountryCode("AU") == OCEANIA) + assert(continentFromCountryCode("NZ") == OCEANIA) + } + + test("continentFromCountryCode maps LATAM countries correctly") { + assert(continentFromCountryCode("BR") == LATAM) + assert(continentFromCountryCode("AR") == LATAM) + } + + test("continentFromCountryCode handles ZZ (unknown) code") { + assert(continentFromCountryCode("ZZ") == UNKNOWN_CONTINENT) + } + + test("continentFromCountryCode is case-insensitive") { + assert(continentFromCountryCode("us") == NA) + assert(continentFromCountryCode("Us") == NA) + } + + // ===== Pricing Tier Calculation Tests ===== + + test("calculatePricingTier returns same_zone for SAME_ZONE egress type") { + val tier = calculatePricingTier( + sourceRegion = Some("us-central1"), + destinationRegion = None, + destinationCountry = None, + egressType = SAME_ZONE) + assert(tier == TIER_SAME_ZONE) + } + + test("calculatePricingTier returns same_region for SAME_REGION egress type") { + val tier = calculatePricingTier( + sourceRegion = Some("us-central1"), + destinationRegion = None, + destinationCountry = None, + egressType = SAME_REGION) + assert(tier == TIER_SAME_REGION) + } + + test("calculatePricingTier returns interregion_na_na for NA to NA inter-region") { + val tier = calculatePricingTier( + sourceRegion = Some("us-central1"), + destinationRegion = Some("us-east1"), + destinationCountry = None, + egressType = INTER_REGION) + assert(tier == TIER_INTERREGION_NA_NA) + } + + test("calculatePricingTier returns interregion_na_eu for NA to EU inter-region") { + val tier = calculatePricingTier( + sourceRegion = Some("us-central1"), + destinationRegion = Some("europe-west1"), + destinationCountry = None, + egressType = INTER_REGION) + assert(tier == TIER_INTERREGION_NA_EU) + } + + test("calculatePricingTier returns interregion_eu_na for EU to NA inter-region") { + val tier = calculatePricingTier( + sourceRegion = Some("europe-west1"), + destinationRegion = Some("us-east1"), + destinationCountry = None, + egressType = INTER_REGION) + assert(tier == TIER_INTERREGION_EU_NA) + } + + test("calculatePricingTier returns interregion_apac for Asia to Asia inter-region") { + val tier = calculatePricingTier( + sourceRegion = Some("asia-east1"), + destinationRegion = Some("asia-southeast1"), + destinationCountry = None, + egressType = INTER_REGION) + assert(tier == TIER_INTERREGION_APAC) + } + + test("calculatePricingTier returns interregion_to_oceania for any to Oceania inter-region") { + val tier = calculatePricingTier( + sourceRegion = Some("us-central1"), + destinationRegion = Some("australia-southeast1"), + destinationCountry = None, + egressType = INTER_REGION) + assert(tier == TIER_INTERREGION_TO_OCEANIA) + } + + test("calculatePricingTier returns internet_to_na_eu for internet egress to NA/EU") { + val tierNA = calculatePricingTier( + sourceRegion = Some("europe-west1"), + destinationRegion = None, + destinationCountry = Some("US"), + egressType = INTERNET) + assert(tierNA == TIER_INTERNET_NA_EU) + + val tierEU = calculatePricingTier( + sourceRegion = Some("us-central1"), + destinationRegion = None, + destinationCountry = Some("DE"), + egressType = INTERNET) + assert(tierEU == TIER_INTERNET_NA_EU) + } + + test("calculatePricingTier returns internet_apac for internet egress to Asia") { + val tier = calculatePricingTier( + sourceRegion = Some("us-central1"), + destinationRegion = None, + destinationCountry = Some("JP"), + egressType = INTERNET) + assert(tier == TIER_INTERNET_APAC) + } + + test("calculatePricingTier returns internet_latam for internet egress to LATAM") { + val tier = calculatePricingTier( + sourceRegion = Some("us-central1"), + destinationRegion = None, + destinationCountry = Some("BR"), + egressType = INTERNET) + assert(tier == TIER_INTERNET_LATAM) + } + + test("calculatePricingTier returns internet_oceania for internet egress to Oceania") { + val tier = calculatePricingTier( + sourceRegion = Some("us-central1"), + destinationRegion = None, + destinationCountry = Some("AU"), + egressType = INTERNET) + assert(tier == TIER_INTERNET_OCEANIA) + } + + // ===== Egress Type Detection Tests ===== + + test("isLikelySameClusterTraffic returns true for missing client IP") { + assert(isLikelySameClusterTraffic(None)) + } + + test("isLikelySameClusterTraffic returns true for private IPs") { + assert(isLikelySameClusterTraffic(Some("10.0.0.1"))) + assert(isLikelySameClusterTraffic(Some("172.16.0.1"))) + assert(isLikelySameClusterTraffic(Some("192.168.1.1"))) + assert(isLikelySameClusterTraffic(Some("127.0.0.1"))) + } + + test("isLikelySameClusterTraffic returns false for public IPs") { + assert(!isLikelySameClusterTraffic(Some("8.8.8.8"))) + assert(!isLikelySameClusterTraffic(Some("1.2.3.4"))) + } + + test("determineEgressType returns SAME_REGION for same-cluster traffic") { + val (egressType, destRegion) = determineEgressType( + clientIp = None, + envoyPeerMetadata = None, + clientRegion = None, + detectGcpTraffic = true) + assert(egressType == SAME_REGION) + assert(destRegion.isEmpty) + } + + test("determineEgressType returns INTER_REGION for GCP IP not in ranges with ZZ region") { + // Use a GCP-looking IP that's not in our test ranges + val (egressType, destRegion) = determineEgressType( + clientIp = Some("34.200.0.1"), // GCP-looking IP not in test ranges + envoyPeerMetadata = None, + clientRegion = Some("ZZ"), + detectGcpTraffic = true) + assert(egressType == INTER_REGION) + assert(destRegion.isEmpty) + } + + test("determineEgressType returns INTERNET when valid country code present") { + val (egressType, destRegion) = determineEgressType( + clientIp = Some("1.2.3.4"), + envoyPeerMetadata = None, + clientRegion = Some("US"), + detectGcpTraffic = true) + assert(egressType == INTERNET) + assert(destRegion.isEmpty) + } + + test("determineEgressType returns INTERNET for non-GCP IP even with Envoy metadata") { + // This is the critical test: non-GCP IP (like from Malta) should be INTERNET + // even if Envoy metadata contains gcp_location (which is the ingress gateway's location) + val payload = """{"gcp_location":"us-central1-f"}""" + val encoded = Base64.getEncoder.encodeToString(payload.getBytes("UTF-8")) + + val (egressType, destRegion) = determineEgressType( + clientIp = Some("217.9.6.27"), // Non-GCP IP from Malta (not in any GCP range) + envoyPeerMetadata = Some(encoded), // Ingress gateway's metadata (should be ignored) + clientRegion = Some("MT"), + detectGcpTraffic = true, + sourceRegion = Some("us-central1")) + assert(egressType == INTERNET, "Non-GCP IP should be classified as INTERNET") + assert(destRegion.isEmpty, "No GCP region for non-GCP IPs") + } + + test("determineEgressType returns SAME_REGION when client GCP region matches source region") { + // Use IP from us-central1 range (34.100.0.0/16 is in our test data) + val (egressType, destRegion) = determineEgressType( + clientIp = Some("34.100.0.1"), // GCP IP in us-central1 range + envoyPeerMetadata = None, // Not used anymore + clientRegion = Some("US"), + detectGcpTraffic = true, + sourceRegion = Some("us-central1")) + assert(egressType == SAME_REGION) + assert(destRegion.contains("us-central1")) + } + + // scalastyle:off + test("determineEgressType returns INTER_REGION when client GCP region differs from source region") { + // scalastyle:on + // Use IP from europe-west1 range (35.187.0.0/17 is in our test data) + val (egressType, destRegion) = determineEgressType( + clientIp = Some("35.187.64.100"), // GCP IP in europe-west1 range + envoyPeerMetadata = None, // Not used anymore + clientRegion = Some("DE"), + detectGcpTraffic = true, + sourceRegion = Some("us-central1")) + assert(egressType == INTER_REGION) + assert(destRegion.contains("europe-west1")) + } + + test("determineEgressType returns SAME_REGION with zone suffix variations") { + // Test that "us-central1" source matches client IP in us-central1 range + val (egressType, destRegion) = determineEgressType( + clientIp = Some("34.100.0.1"), // GCP IP in us-central1 range + envoyPeerMetadata = None, // Not used anymore + clientRegion = Some("US"), + detectGcpTraffic = true, + sourceRegion = Some("us-central1-f")) // With zone suffix + assert(egressType == SAME_REGION) + assert(destRegion.contains("us-central1")) + } + + test("determineEgressType uses IP lookup instead of Envoy metadata") { + // Even with Envoy metadata pointing to us-central1, if the IP is in europe-west1, + // the lookup should return europe-west1 + val payload = """{"gcp_location":"us-central1-f"}""" + val encoded = Base64.getEncoder.encodeToString(payload.getBytes("UTF-8")) + + val (egressType, destRegion) = determineEgressType( + clientIp = Some("35.187.64.100"), // GCP IP in europe-west1 range + envoyPeerMetadata = Some(encoded), // Should be ignored + clientRegion = Some("DE"), + detectGcpTraffic = true, + sourceRegion = Some("us-central1")) + assert(egressType == INTER_REGION) + // The region should come from IP lookup, not Envoy metadata + assert(destRegion.contains("europe-west1")) + } + + // ===== Envoy Metadata Parsing Tests ===== + + test("extractGcpRegionFromEnvoyMetadata returns None for null/empty") { + assert(extractGcpRegionFromEnvoyMetadata(null).isEmpty) + assert(extractGcpRegionFromEnvoyMetadata("").isEmpty) + } + + test("extractGcpRegionFromEnvoyMetadata extracts region from base64 metadata") { + // Create a simple test payload with gcp_location + val payload = """{"gcp_location":"us-central1-f"}""" + val encoded = Base64.getEncoder.encodeToString(payload.getBytes("UTF-8")) + val result = extractGcpRegionFromEnvoyMetadata(encoded) + assert(result.contains("us-central1")) + } + + test("extractGcpRegionFromEnvoyMetadata strips zone suffix") { + val payload = """gcp_location:europe-west1-b""" + val encoded = Base64.getEncoder.encodeToString(payload.getBytes("UTF-8")) + val result = extractGcpRegionFromEnvoyMetadata(encoded) + assert(result.contains("europe-west1")) + } + + test("extractGcpRegionFromEnvoyMetadata handles protobuf binary format") { + // Simulate protobuf binary where field name and value are separated by control chars + // gcp_location + [0x12 0x0f 0x1a 0x0d] + us-central1-f + val binaryPayload = "gcp_location\u0012\u000f\u001a\rus-central1-f" + val encoded = Base64.getEncoder.encodeToString(binaryPayload.getBytes("UTF-8")) + val result = extractGcpRegionFromEnvoyMetadata(encoded) + assert(result.contains("us-central1")) + } + + test("extractGcpRegionFromEnvoyMetadata parses real Istio/ASM metadata") { + // Real X-Envoy-Peer-Metadata from Istio ingress gateway (GKE with ASM) + // Contains PLATFORM_METADATA with gcp_location:us-central1-f + val realMetadata = "ChQKDkFQUF9DT05UQUlORVJTEgIaAAo9CgpDTFVTVEVSX0lEEi8aLWNuLXppbmct" + + "ZGV2LTE5NzUyMi11cy1jZW50cmFsMS1mLXppbmctY2x1c3RlcgoeCgxJTlNUQU5DRV9JUFMSDhoM" + + "MTAuMjUyLjguMTI3CiAKDUlTVElPX1ZFUlNJT04SDxoNMS4yMC44LWFzbS43MwqzAgoGTEFCRUxT" + + "EqgCKqUCCh0KA2FwcBIWGhRpc3Rpby1pbmdyZXNzZ2F0ZXdheQoZCgVpc3RpbxIQGg5pbmdyZXNz" + + "Z2F0ZXdheQodCgxpc3Rpby5pby9yZXYSDRoLYXNtLW1hbmFnZWQKOQofc2VydmljZS5pc3Rpby5p" + + "by9jYW5vbmljYWwtbmFtZRIWGhRpc3Rpby1pbmdyZXNzZ2F0ZXdheQovCiNzZXJ2aWNlLmlzdGlv" + + "LmlvL2Nhbm9uaWNhbC1yZXZpc2lvbhIIGgZsYXRlc3QKLgoddG9wb2xvZ3kua3ViZXJuZXRlcy5p" + + "by9yZWdpb24SDRoLdXMtY2VudHJhbDEKLgobdG9wb2xvZ3kua3ViZXJuZXRlcy5pby96b25lEg8a" + + "DXVzLWNlbnRyYWwxLWMKHgoHTUVTSF9JRBITGhFwcm9qLTMwMzkzMzg2ODgxMAovCgROQU1FEica" + + "JWlzdGlvLWluZ3Jlc3NnYXRld2F5LTY2NjRjZGRkN2YtamM5bTIKGwoJTkFNRVNQQUNFEg4aDGlz" + + "dGlvLXN5c3RlbQpdCgVPV05FUhJUGlJrdWJlcm5ldGVzOi8vYXBpcy9hcHBzL3YxL25hbWVzcGFj" + + "ZXMvaXN0aW8tc3lzdGVtL2RlcGxveW1lbnRzL2lzdGlvLWluZ3Jlc3NnYXRld2F5CrACChFQTEFU" + + "Rk9STV9NRVRBREFUQRKaAiqXAgomChRnY3BfZ2tlX2NsdXN0ZXJfbmFtZRIOGgx6aW5nLWNsdXN0" + + "ZXIKgwEKE2djcF9na2VfY2x1c3Rlcl91cmwSbBpqaHR0cHM6Ly9jb250YWluZXIuZ29vZ2xlYXBp" + + "cy5jb20vdjEvcHJvamVjdHMvemluZy1kZXYtMTk3NTIyL2xvY2F0aW9ucy91cy1jZW50cmFsMS1m" + + "L2NsdXN0ZXJzL3ppbmctY2x1c3RlcgofCgxnY3BfbG9jYXRpb24SDxoNdXMtY2VudHJhbDEtZgog" + + "CgtnY3BfcHJvamVjdBIRGg96aW5nLWRldi0xOTc1MjIKJAoSZ2NwX3Byb2plY3RfbnVtYmVyEg4a" + + "DDMwMzkzMzg2ODgxMAonCg1XT1JLTE9BRF9OQU1FEhYaFGlzdGlvLWluZ3Jlc3NnYXRld2F5" + val result = extractGcpRegionFromEnvoyMetadata(realMetadata) + assert(result.contains("us-central1"), s"Expected us-central1 but got $result") + } +} diff --git a/server/src/universal/conf/delta-sharing-server.yaml.template b/server/src/universal/conf/delta-sharing-server.yaml.template index 4fd95a9b4..fffd0a76a 100644 --- a/server/src/universal/conf/delta-sharing-server.yaml.template +++ b/server/src/universal/conf/delta-sharing-server.yaml.template @@ -65,3 +65,26 @@ queryTablePageSizeLimit: 10000 queryTablePageTokenTtlMs: 259200000 # The TTL of the refresh token generated in queryTable API (in milliseconds). refreshTokenTtlMs: 3600000 +# Whether to emit performance/timing log lines for table queries and CDF requests. +perfLoggingEnabled: true +# Optional share-attributed egress access logs emitted as structured JSON log lines. +accessLogging: + enabled: false + # Name of the incoming header that carries client region code (for example: US, DE). + # In GCP, configure your load balancer backend custom request headers to inject this. + # Example: X-Client-Region:{client_region} + clientRegionHeader: "x-client-region" + # Name of the incoming header that carries client IP information. + # This is typically a forwarding chain header and does not require LB changes when already present. + clientIpHeader: "x-forwarded-for" + # Optional mapping from region codes to billing price groups. + # Use uppercase keys. A wildcard key "*" can be used as a catch-all. + # NOTE: pricingGroups is reserved for future use and not currently applied. + pricingGroups: {} + # GCP region where this server runs (for example: us-central1). + # Used to determine source→destination pairs for egress cost attribution. + sourceRegion: "" + # Enable GCP traffic detection using GCP's published IP ranges (cloud.json). + # When true, client IPs belonging to other GCP regions can be classified as inter-region + # (cheaper) rather than internet egress. Set to false to disable this detection. + detectGcpTraffic: true