diff --git a/README.md b/README.md index 9ac17a73d..ef676737f 100644 --- a/README.md +++ b/README.md @@ -7,6 +7,7 @@ ![Python Dataproc Serverless Integration Test Status](https://dataproctemplatesci.com/buildStatus/icon?job=dataproc-templates-build%2Fintegration-tests-python&&subject=python-serverless-integration-tests) ![Python Dataproc Cluster Integration Tests Status](https://dataproctemplatesci.com/buildStatus/icon?job=dataproc-templates-build%2Fcluster-integration-tests-python&&subject=python-cluster-integration-tests) +![Notebooks Integration Test Status](https://dataproctemplatesci.com/buildStatus/icon?job=dataproc-templates-build%2Fintegration-tests-notebooks&&subject=integration-tests-notebooks) # Dataproc Templates Dataproc templates are designed to address various in-cloud data tasks, including data import/export/backup/restore and bulk API operations. These templates leverage the power of [Google Cloud's Dataproc](https://cloud.google.com/dataproc/), supporting both Dataproc Serverless and Dataproc clusters. diff --git a/notebooks/.ci/Jenkinsfile b/notebooks/.ci/Jenkinsfile new file mode 100644 index 000000000..5bfc5eb8f --- /dev/null +++ b/notebooks/.ci/Jenkinsfile @@ -0,0 +1,117 @@ +def stageRetryCount = 3 + +pipeline { + + agent any + + environment { + DATAPROC_TELEPORT_WEBHOOK_URL = credentials('dataproc-teleport-webhook-url') + + TEST_JDBC_URL = credentials('env-test-jdbc-url') + + GIT_BRANCH_LOCAL = sh ( + script: "echo $branchName | sed -e 's|origin/||g' | sed -e 's|^null\$|main|'", // Remove "origin/" and set the default branch to main + returnStdout: true + ).trim() + } + + stages { + stage('Checkout') { + steps{ + git branch: "${GIT_BRANCH_LOCAL}", changelog: false, poll: false, url: 'https://github.com/GoogleCloudPlatform/dataproc-templates/' + } + } + stage('Build'){ + steps { + catchError { + sh ''' + python3.8 -m pip install --user virtualenv + python3.8 -m venv env + source env/bin/activate + + export PACKAGE_EGG_FILE=dist/dataproc_templates_distribution.egg + + cd python + python setup.py bdist_egg --output=$PACKAGE_EGG_FILE + + cd ../notebooks + pip install --upgrade pip ipython ipykernel + ipython kernel install --name "python3" --user + pip install -r requirements.txt + + ''' + } + } + } + stage('Extract JDBC URL Parameters') { + steps { + script { + def host = TEST_JDBC_URL.substring(TEST_JDBC_URL.indexOf("//") + 2, TEST_JDBC_URL.indexOf(":", TEST_JDBC_URL.indexOf("//") + 2)) + def database = TEST_JDBC_URL.substring(TEST_JDBC_URL.indexOf("/", TEST_JDBC_URL.indexOf("//") + 2) + 1, TEST_JDBC_URL.indexOf("?")) + def user = TEST_JDBC_URL.substring(TEST_JDBC_URL.indexOf("user=") + 5, TEST_JDBC_URL.indexOf("&")) + def password = TEST_JDBC_URL.substring(TEST_JDBC_URL.indexOf("password=") + 9) + env.DB_HOST = host + env.DB_NAME = database + env.DB_USER = user + env.DB_PASSWORD = password + } + } + } + stage('Parallel Execution'){ + parallel{ + stage('MYSQL TO SPANNER') { + steps{ + retry(count: stageRetryCount) { + sh ''' + source env/bin/activate + + export GCS_STAGING_LOCATION=gs://dataproc-templates/integration-testing + export JARS="gs://datproc_template_nk/jars/mysql-connector-java-8.0.29.jar,gs://datproc_template_nk/jars/postgresql-42.2.6.jar,gs://datproc_template_nk/jars/mssql-jdbc-6.4.0.jre8.jar" + export SKIP_BUILD=true + + cd notebooks/mysql2spanner + pip install -r requirements.txt + cd .. + + wget https://dl.google.com/cloudsql/cloud_sql_proxy.linux.amd64 -O ./cloud_sql_proxy + chmod +x cloud_sql_proxy + nohup ./cloud_sql_proxy -instances=$ENV_TEST_MYSQL_INSTANCE_CONNECTION_NAME=tcp:3306 & + + python run_notebook.py --script=MYSQLTOSPANNER \ + --mysql.host="${DB_HOST}" \ + --mysql.port="3306" \ + --mysql.username="${DB_USER}" \ + --mysql.password="${DB_PASSWORD}" \ + --mysql.database="${DB_NAME}" \ + --mysql.table.list="employee" \ + --mysql.read.partition.columns="{}" \ + --use.cloud.sql.proxy="true" \ + --spanner.instance="dataproc-spark-test" \ + --spanner.database="spark-ci-db" \ + --spanner.table.primary.keys="{\\"employee\\":\\"empno\\"}" + + kill $(pgrep cloud_sql_proxy) + + ''' + } + } + } + } + } + } + post { + always{ + script { + if( env.GIT_BRANCH_LOCAL == 'main' ){ + googlechatnotification url: DATAPROC_TELEPORT_WEBHOOK_URL, + message: 'Jenkins: ${JOB_NAME}\nBuild status is ${BUILD_STATUS}\nSee ${BUILD_URL}\n', + notifyFailure: 'true', + notifyAborted: 'true', + notifyUnstable: 'true', + notifyNotBuilt: 'true', + notifyBackToNormal: 'true' + } + } + } + } +} \ No newline at end of file diff --git a/notebooks/mysql2spanner/MySqlToSpanner_notebook.ipynb b/notebooks/mysql2spanner/MySqlToSpanner_notebook.ipynb index 66d397801..e27fe93dc 100644 --- a/notebooks/mysql2spanner/MySqlToSpanner_notebook.ipynb +++ b/notebooks/mysql2spanner/MySqlToSpanner_notebook.ipynb @@ -100,10 +100,7 @@ }, "outputs": [], "source": [ - "!pip3 install pymysql SQLAlchemy\n", - "!pip3 install --upgrade google-cloud-pipeline-components kfp --user -q\n", - "!pip3 install google-cloud-spanner\n", - "!pip3 install --upgrade google-cloud-storage" + "!pip3 install --upgrade google-cloud-storage google-cloud-aiplatform kfp google-cloud-pipeline-components pandas google-cloud-spanner pymysql SQLAlchemy" ] }, { @@ -115,9 +112,9 @@ }, "outputs": [], "source": [ - "!sudo apt-get update -y\n", - "!sudo apt-get install default-jdk -y\n", - "!sudo apt-get install maven -y" + "#!sudo apt-get update -y\n", + "#!sudo apt-get install default-jdk -y\n", + "#!sudo apt-get install maven -y" ] }, { @@ -179,7 +176,10 @@ "execution_count": null, "id": "2703b502-1b41-44f1-bf21-41069255bc32", "metadata": { - "tags": [] + "tags": [], + "pycharm": { + "is_executing": true + } }, "outputs": [], "source": [ @@ -312,6 +312,7 @@ " MYSQL_PASSWORD = \"\"\n", " MYSQL_DATABASE = \"\"\n", " MYSQL_TABLE_LIST = [] # Leave list empty for migrating complete database else provide tables as ['table1','table2']\n", + " USE_CLOUD_SQL_PROXY = \"false\"\n", " MYSQL_READ_PARTITION_COLUMNS = {} # Leave empty for default read partition columns\n", " MYSQL_OUTPUT_SPANNER_MODE = \"overwrite\" # one of overwrite|append (Use append when schema already exists in Spanner)\n", "\n", @@ -399,7 +400,7 @@ " username=MYSQL_USERNAME,\n", " password=MYSQL_PASSWORD,\n", " database=MYSQL_DATABASE,\n", - " host=MYSQL_HOST,\n", + " host=MYSQL_HOST if USE_CLOUD_SQL_PROXY==\"false\" else \"127.0.0.1\",\n", " port=MYSQL_PORT\n", " )\n", ")\n", @@ -541,7 +542,7 @@ "metadata": {}, "outputs": [], "source": [ - "!wget https://downloads.mysql.com/archives/get/p/3/file/mysql-connector-java-8.0.29.tar.gz\n", + "!wget --backups=1 https://downloads.mysql.com/archives/get/p/3/file/mysql-connector-java-8.0.29.tar.gz\n", "!tar -xf mysql-connector-java-8.0.29.tar.gz\n", "!mvn clean spotless:apply install -DskipTests " ] @@ -658,9 +659,8 @@ " file_uris=FILE_URIS,\n", " subnetwork_uri=SUBNETWORK_URI,\n", " runtime_config_version=\"1.1\", # issue 665\n", - " service_account=DATAPROC_SERVICE_ACCOUNT,\n", " args=TEMPLATE_SPARK_ARGS\n", - " )\n", + " ) # add DATAPROC_SERVICE_ACCOUNT if needed\n", " time.sleep(1)\n", "\n", " compiler.Compiler().compile(pipeline_func=pipeline, package_path=\"pipeline.json\")\n", @@ -858,14 +858,6 @@ "- You may create relationships (FKs), constraints and indexes (as needed).\n", "- You may configure countinuous replication with [DataStream](https://cloud.google.com/datastream/docs/configure-your-source-mysql-database) or any other 3rd party tools." ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "89ec62c1-0b95-4536-9339-03a4a8de035e", - "metadata": {}, - "outputs": [], - "source": [] } ], "metadata": { diff --git a/notebooks/mysql2spanner/MySqlToSpanner_parameterize_script.py b/notebooks/mysql2spanner/MySqlToSpanner_parameterize_script.py index b09fe7d37..ad58312c0 100644 --- a/notebooks/mysql2spanner/MySqlToSpanner_parameterize_script.py +++ b/notebooks/mysql2spanner/MySqlToSpanner_parameterize_script.py @@ -71,7 +71,6 @@ def parse_args(args: Optional[Sequence[str]] = None) -> Dict[str, Any]: required=True, help="MySQL database name", ) - parser.add_argument( f"--{constants.MYSQL_TABLE_LIST_ARG}", dest=constants.MYSQL_TABLE_LIST, @@ -91,6 +90,14 @@ def parse_args(args: Optional[Sequence[str]] = None) -> Dict[str, Any]: choices=[constants.OUTPUT_MODE_OVERWRITE, constants.OUTPUT_MODE_APPEND], ) + parser.add_argument( + f"--{constants.USE_CLOUD_SQL_PROXY_ARG}", + dest=constants.USE_CLOUD_SQL_PROXY, + default="false", + required=False, + help="Flag to reach mysql instance using cloud sql proxy" + ) + parser.add_argument( f"--{constants.SPANNER_INSTANCE_ARG}", dest=constants.SPANNER_INSTANCE, @@ -112,6 +119,13 @@ def parse_args(args: Optional[Sequence[str]] = None) -> Dict[str, Any]: help='Provide table & PK column which do not have PK in MySQL table {"table_name":"primary_key"}', ) + parser.add_argument( + f"--{constants.MYSQL_READ_PARTITION_COLUMNS_ARG}", + dest=constants.MYSQL_READ_PARTITION_COLUMNS, + required=True, + help='Dictionary of custom read partition columns, e.g.: {"table2": "secondary_id"}', + ) + parser.add_argument( f"--{constants.MAX_PARALLELISM_ARG}", dest=constants.MAX_PARALLELISM, @@ -141,6 +155,9 @@ def run(self, args: Dict[str, Any]) -> None: args[constants.SPANNER_TABLE_PRIMARY_KEYS] = json.loads( args[constants.SPANNER_TABLE_PRIMARY_KEYS] ) + args[constants.MYSQL_READ_PARTITION_COLUMNS] = json.loads( + args[constants.MYSQL_READ_PARTITION_COLUMNS] + ) # Exclude arguments that are not needed to be passed to the notebook ignore_keys = {constants.LOG_LEVEL_ARG, constants.OUTPUT_NOTEBOOK_ARG} diff --git a/notebooks/mysql2spanner/requirements.txt b/notebooks/mysql2spanner/requirements.txt new file mode 100644 index 000000000..a028b5c81 --- /dev/null +++ b/notebooks/mysql2spanner/requirements.txt @@ -0,0 +1,8 @@ +google-cloud-storage +google-cloud-aiplatform +kfp +google-cloud-pipeline-components +pandas +google-cloud-spanner +pymysql +SQLAlchemy \ No newline at end of file diff --git a/notebooks/parameterize_script/util/notebook_constants.py b/notebooks/parameterize_script/util/notebook_constants.py index b3aa8010f..8090d3aba 100644 --- a/notebooks/parameterize_script/util/notebook_constants.py +++ b/notebooks/parameterize_script/util/notebook_constants.py @@ -51,7 +51,9 @@ MYSQL_PASSWORD_ARG = "mysql.password" MYSQL_DATABASE_ARG = "mysql.database" MYSQL_TABLE_LIST_ARG = "mysql.table.list" +MYSQL_READ_PARTITION_COLUMNS_ARG = "mysql.read.partition.columns" MYSQL_OUTPUT_SPANNER_MODE_ARG = "mysql.output.spanner.mode" +USE_CLOUD_SQL_PROXY_ARG = "use.cloud.sql.proxy" SPANNER_INSTANCE_ARG = "spanner.instance" SPANNER_DATABASE_ARG = "spanner.database" # provide table & pk column which do not have PK in MYSQL "{"table_name":"primary_key"}" @@ -64,7 +66,9 @@ MYSQL_PASSWORD = "MYSQL_PASSWORD" MYSQL_DATABASE = "MYSQL_DATABASE" MYSQL_TABLE_LIST = "MYSQL_TABLE_LIST" +MYSQL_READ_PARTITION_COLUMNS = "MYSQL_READ_PARTITION_COLUMNS" MYSQL_OUTPUT_SPANNER_MODE = "MYSQL_OUTPUT_SPANNER_MODE" +USE_CLOUD_SQL_PROXY = "USE_CLOUD_SQL_PROXY" SPANNER_INSTANCE = "SPANNER_INSTANCE" SPANNER_DATABASE = "SPANNER_DATABASE" SPANNER_TABLE_PRIMARY_KEYS = "SPANNER_TABLE_PRIMARY_KEYS" diff --git a/notebooks/requirements.txt b/notebooks/requirements.txt index 15034118c..beeda79dc 100644 --- a/notebooks/requirements.txt +++ b/notebooks/requirements.txt @@ -4,3 +4,4 @@ google-cloud-spanner>=3.35.1 google-cloud-storage>=2.9.0 pandas>=1.3.5 papermill>=2.4.0 +ipykernel>=6.29.5 \ No newline at end of file