穆孜 / emr-serverless-samples

Python Dependencies

You can create isolated Python virtual environments to package multiple Python libraries for a PySpark job. Here is an example of how you can package Great Expectations and profile a set of sample data.

Pre-requisites

Note: If you are using Docker on Apple Silicon, make sure you pass --platform linux/amd64.

Set the following variables according to your environment.

export S3_BUCKET=<YOUR_S3_BUCKET_NAME>
export APPLICATION_ID=<EMR_SERVERLESS_APPLICATION_ID>
export JOB_ROLE_ARN=<EMR_SERVERLESS_IAM_ROLE>
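Every later command depends on these three variables, so it can help to check them before going further. The following is a minimal sketch, not part of the sample repository: require_vars is a hypothetical helper name, and it treats a value that still looks like a <PLACEHOLDER> as missing.

```shell
# Hypothetical helper (not part of the samples): fail fast when a required
# variable is unset, empty, or still set to a <PLACEHOLDER> value.
require_vars() {
    local name value missing=0
    for name in "$@"; do
        # Look up the variable whose name is stored in $name.
        # Only pass trusted, literal variable names to this function.
        eval "value=\${$name:-}"
        case "$value" in
            ''|'<'*'>')
                echo "Missing or placeholder value for ${name}" >&2
                missing=1
                ;;
        esac
    done
    return "$missing"
}
```

For example, run require_vars S3_BUCKET APPLICATION_ID JOB_ROLE_ARN || exit 1 at the top of a wrapper script.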

Profile data with EMR Serverless and Great Expectations

The example below builds a virtual environment with the dependencies needed to profile a limited set of the New York City Taxi and Limousine Commission (TLC) trip record data with Great Expectations.

All the commands below should be executed in this (examples/pyspark/dependencies) directory.

  1. Build your virtualenv archive

This command builds the included Dockerfile and exports the resulting pyspark_ge.tar.gz file to your local filesystem.

Note: The included Dockerfile builds for x86. If you would like to build for Graviton, update the Dockerfile to use linux/arm64 as the platform, and see the EMR Serverless architecture options for more detail.

# Enable BuildKit backend
DOCKER_BUILDKIT=1 docker build --output . .
aws s3 cp pyspark_ge.tar.gz s3://${S3_BUCKET}/artifacts/pyspark/
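Because the job later points Spark at ./environment/bin/python, the archive needs a bin/python entry at its top level. As a rough sanity check before uploading, you could list the archive contents. This is a sketch only; archive_has_python is a hypothetical helper name.

```shell
# Hypothetical helper: succeed only if the archive lists a bin/python entry.
# Output goes to /dev/null rather than using grep -q, so tar is always read
# to the end of the listing.
archive_has_python() {
    tar -tzf "$1" | grep -E '(^|/)bin/python$' > /dev/null
}
```

For example: archive_has_python pyspark_ge.tar.gz || echo "bin/python not found in archive" >&2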
  2. Copy your code

There's a sample ge_profile.py script included here.

aws s3 cp ge_profile.py s3://${S3_BUCKET}/code/pyspark/
  3. Run your job
  • entryPoint should point to your script on S3
  • entryPointArguments defines the output location of the Great Expectations profiler
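  • The virtualenv archive is added via the --archives parameter (set below through its equivalent --conf spark.archives property)
  • The driver and executor Python paths are configured via the --conf spark.emr-serverless.driverEnv and --conf spark.emr-serverless.executorEnv parameters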
aws emr-serverless start-job-run \
    --application-id $APPLICATION_ID \
    --execution-role-arn $JOB_ROLE_ARN \
    --job-driver '{
        "sparkSubmit": {
            "entryPoint": "s3://'${S3_BUCKET}'/code/pyspark/ge_profile.py",
            "entryPointArguments": ["s3://'${S3_BUCKET}'/tmp/ge-profile"],
            "sparkSubmitParameters": "--conf spark.archives=s3://'${S3_BUCKET}'/artifacts/pyspark/pyspark_ge.tar.gz#environment --conf spark.emr-serverless.driverEnv.PYSPARK_DRIVER_PYTHON=./environment/bin/python --conf spark.emr-serverless.driverEnv.PYSPARK_PYTHON=./environment/bin/python --conf spark.emr-serverless.executorEnv.PYSPARK_PYTHON=./environment/bin/python"
        }
    }' \
    --configuration-overrides '{
        "monitoringConfiguration": {
            "s3MonitoringConfiguration": {
                "logUri": "s3://'${S3_BUCKET}'/logs/"
            }
        }
    }'
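The command above splices ${S3_BUCKET} into single-quoted JSON by closing and reopening the quotes, which is easy to get wrong when editing. One alternative is to build the --job-driver JSON in a small function. This is a sketch under assumptions: job_driver_json is a hypothetical name, and it assumes the bucket name contains no characters that need JSON escaping.

```shell
# Hypothetical helper: print the --job-driver JSON for the Great Expectations
# example, using the same paths and Spark settings as the command above.
job_driver_json() {
    local bucket="$1"
    local python="./environment/bin/python"
    local params="--conf spark.archives=s3://${bucket}/artifacts/pyspark/pyspark_ge.tar.gz#environment"
    params="${params} --conf spark.emr-serverless.driverEnv.PYSPARK_DRIVER_PYTHON=${python}"
    params="${params} --conf spark.emr-serverless.driverEnv.PYSPARK_PYTHON=${python}"
    params="${params} --conf spark.emr-serverless.executorEnv.PYSPARK_PYTHON=${python}"
    printf '{"sparkSubmit": {"entryPoint": "s3://%s/code/pyspark/ge_profile.py", "entryPointArguments": ["s3://%s/tmp/ge-profile"], "sparkSubmitParameters": "%s"}}\n' \
        "$bucket" "$bucket" "$params"
}
```

The result can then be passed as --job-driver "$(job_driver_json "$S3_BUCKET")".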

When the job finishes, it will write a part-00000 file out to s3://${S3_BUCKET}/tmp/ge-profile.
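To find out when the job has finished, you can poll its state with aws emr-serverless get-job-run. The loop below is a minimal sketch: wait_for_job_run is a hypothetical helper name, the job run ID is the jobRunId value returned by start-job-run, and the 30-second default interval is an arbitrary choice.

```shell
# Hypothetical helper: poll a job run until it reaches a terminal state.
# Returns 0 on SUCCESS, 1 on FAILED or CANCELLED, 2 if the CLI call fails.
wait_for_job_run() {
    local app_id="$1" run_id="$2" interval="${3:-30}" state
    while :; do
        state=$(aws emr-serverless get-job-run \
            --application-id "$app_id" \
            --job-run-id "$run_id" \
            --query 'jobRun.state' --output text) || return 2
        echo "Job run ${run_id}: ${state}" >&2
        case "$state" in
            SUCCESS) return 0 ;;
            FAILED|CANCELLED) return 1 ;;
        esac
        sleep "$interval"
    done
}
```

For example: wait_for_job_run "$APPLICATION_ID" "$JOB_RUN_ID" && echo "output is ready"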

  4. Copy and view the output
aws s3 cp s3://${S3_BUCKET}/tmp/ge-profile/part-00000 ./ge.html
open ./ge.html

PySpark jobs with Java dependencies

Sometimes you need to pull in Java dependencies like Kafka or PostgreSQL libraries. As of release label emr-6.7.0, you can use either spark.jars.packages or the --packages flag in your sparkSubmitParameters as shown below. Be sure to create your application within a VPC so that it can download the necessary dependencies.

# First create an application with release label emr-6.7.0 and a network configuration
aws emr-serverless create-application \
    --release-label emr-6.7.0 \
    --type SPARK \
    --name spark-packages \
    --network-configuration '{
        "subnetIds": ["subnet-abcdef01234567890", "subnet-abcdef01234567891"],
        "securityGroupIds": ["sg-abcdef01234567893"]
    }'

# Then submit a job (replacing the application id, arn, and your code/packages)
aws emr-serverless start-job-run \
    --name pg-query \
    --application-id $APPLICATION_ID \
    --execution-role-arn $JOB_ROLE_ARN \
    --job-driver '{
        "sparkSubmit": {
            "entryPoint": "s3://'${S3_BUCKET}'/code/pyspark/pg_query.py",
            "sparkSubmitParameters": "--packages org.postgresql:postgresql:42.4.0"
        }
    }' \
    --configuration-overrides '{
        "monitoringConfiguration": {
            "s3MonitoringConfiguration": {
                "logUri": "s3://'${S3_BUCKET}'/logs/"
            }
        }
    }'

Packaging dependencies into an uberjar

While --packages lets you easily specify additional dependencies for your job, these dependencies are not cached between job runs. In other words, each job run needs to re-fetch the dependencies, which can increase startup time. To mitigate this, and to create reproducible builds, you can create a dependency uberjar and upload that to S3.

This approach can also be used with EMR release label emr-6.6.0.

To do this, we'll create a pom.xml that specifies our dependencies and use a Maven Docker container to build the uberjar. In this example, we'll package org.postgresql:postgresql:42.4.0 and use the example script in ./pg_query.py to query a Postgres database.

Note: The code in pg_query.py is for demonstration purposes only - never store credentials directly in your code. 😁

  1. Build an uberjar with your dependencies
# Enable BuildKit backend
DOCKER_BUILDKIT=1 docker build -f Dockerfile.jars --output . .

This will create a uber-jars-1.0-SNAPSHOT.jar file locally that you will copy to S3 in the next step.

  2. Copy your code and jar
aws s3 cp pg_query.py s3://${S3_BUCKET}/code/pyspark/
aws s3 cp uber-jars-1.0-SNAPSHOT.jar s3://${S3_BUCKET}/code/pyspark/jars/
  3. Set the following variables according to your environment.
export S3_BUCKET=<YOUR_S3_BUCKET_NAME>
export APPLICATION_ID=<EMR_SERVERLESS_APPLICATION_ID>
export JOB_ROLE_ARN=<EMR_SERVERLESS_IAM_ROLE>
  4. Start your job with --jars
aws emr-serverless start-job-run \
    --name pg-query \
    --application-id $APPLICATION_ID \
    --execution-role-arn $JOB_ROLE_ARN \
    --job-driver '{
        "sparkSubmit": {
            "entryPoint": "s3://'${S3_BUCKET}'/code/pyspark/pg_query.py",
            "sparkSubmitParameters": "--jars s3://'${S3_BUCKET}'/code/pyspark/jars/uber-jars-1.0-SNAPSHOT.jar"
        }
    }' \
    --configuration-overrides '{
        "monitoringConfiguration": {
            "s3MonitoringConfiguration": {
                "logUri": "s3://'${S3_BUCKET}'/logs/"
            }
        }
    }'
  5. See the output of your job!

Once your job finishes, you can copy the output locally to view the stdout.

export JOB_RUN_ID=<YOUR_JOB_RUN_ID>

aws s3 cp s3://${S3_BUCKET}/logs/applications/${APPLICATION_ID}/jobs/${JOB_RUN_ID}/SPARK_DRIVER/stdout.gz - | gunzip 
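Instead of pasting the job run ID by hand, you could capture it when submitting the job, since start-job-run returns a jobRunId field. The function below is a sketch that repeats the --jars submission with the CLI's --query option; start_pg_query_job is a hypothetical name, and it assumes S3_BUCKET, APPLICATION_ID, and JOB_ROLE_ARN are already exported.

```shell
# Hypothetical wrapper: submit the pg-query job and print only its job run ID.
# Uses the same entry point, jar, and log location as the commands above.
start_pg_query_job() {
    local driver overrides
    driver=$(printf '{"sparkSubmit": {"entryPoint": "s3://%s/code/pyspark/pg_query.py", "sparkSubmitParameters": "--jars s3://%s/code/pyspark/jars/uber-jars-1.0-SNAPSHOT.jar"}}' \
        "$S3_BUCKET" "$S3_BUCKET")
    overrides=$(printf '{"monitoringConfiguration": {"s3MonitoringConfiguration": {"logUri": "s3://%s/logs/"}}}' \
        "$S3_BUCKET")
    aws emr-serverless start-job-run \
        --name pg-query \
        --application-id "$APPLICATION_ID" \
        --execution-role-arn "$JOB_ROLE_ARN" \
        --job-driver "$driver" \
        --configuration-overrides "$overrides" \
        --query 'jobRunId' --output text
}
```

For example: export JOB_RUN_ID=$(start_pg_query_job)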