Access Amazon S3 Tables from Local using Apache Spark Docker Container | Demo


Recently, Amazon S3 introduced S3 Tables, fully managed Apache Iceberg tables for storing tabular data at scale. S3 Tables are specifically optimized for analytics workloads, delivering up to 3x faster query throughput and up to 10x higher transactions per second compared to self-managed tables (Iceberg tables stored in general purpose S3 buckets).

You can easily create the new type of S3 bucket, called a table bucket, using the AWS Management Console. However, creating S3 tables and populating them with data is not as straightforward: to create, modify, and delete tables, AWS suggests using Amazon EMR (Elastic MapReduce).

If you only want to get some hands-on experience with S3 Tables, creating an EMR cluster may be overkill and an unnecessary cost. In this post, I'll share all the details and code so that you can quickly launch an Apache Spark Docker container locally and interact with your table bucket and S3 tables.

Prerequisites

Docker installed on your local machine, an AWS account with an access key, secret key, and permissions for Amazon S3 Tables, and a table bucket created from the AWS Management Console. A build tool such as Gradle is handy for pulling the dependency jars.

Steps

Download all required direct and transitive dependencies (jars) locally for the Amazon S3 Tables Catalog for Apache Iceberg and the Iceberg Spark Runtime.

What is the Amazon S3 Tables Catalog for Apache Iceberg?

"The Amazon S3 Tables Catalog for Apache Iceberg is an open-source library that bridges S3 Tables operations to engines like Apache Spark, when used with the Apache Iceberg Open Table Format." - https://github.com/awslabs/s3-tables-catalog

From GitHub, we can see the following Gradle dependencies for the Amazon S3 Tables Catalog for Apache Iceberg.

dependencies {
    implementation 'software.amazon.awssdk:s3tables:2.29.26'
    implementation 'software.amazon.s3tables:s3-tables-catalog-for-iceberg:0.1.4'
}

I have created a Gradle project to pull all these dependencies.

As the Iceberg Spark Runtime is not pulled in as a compile-time or runtime dependency of the above, I have manually downloaded it from the Maven repository.

Finally, I copied all the jars into a separate folder (say, ~/extlib) and created a comma-separated list of all the jars, which we'll assign to an environment variable later.

"/extlib/s3-tables-catalog-for-iceberg-0.1.4.jar,/extlib/s3tables-2.29.26.jar,/extlib/dynamodb-2.29.26.jar,/extlib/glue-2.29.26.jar,/extlib/kms-2.29.26.jar,/extlib/aws-json-protocol-2.29.26.jar,/extlib/s3-2.29.26.jar,/extlib/sts-2.29.26.jar,/extlib/aws-xml-protocol-2.29.26.jar,/extlib/aws-query-protocol-2.29.26.jar,/extlib/protocol-core-2.29.26.jar,/extlib/aws-core-2.29.26.jar,/extlib/auth-2.29.26.jar,/extlib/regions-2.29.26.jar,/extlib/sdk-core-2.29.26.jar,/extlib/http-auth-aws-2.29.26.jar,/extlib/http-auth-2.29.26.jar,/extlib/http-auth-spi-2.29.26.jar,/extlib/identity-spi-2.29.26.jar,/extlib/apache-client-2.29.26.jar,/extlib/netty-nio-client-2.29.26.jar,/extlib/url-connection-client-2.29.26.jar,/extlib/http-client-spi-2.29.26.jar,/extlib/metrics-spi-2.29.26.jar,/extlib/json-utils-2.29.26.jar,/extlib/retries-2.29.26.jar,/extlib/retries-spi-2.29.26.jar,/extlib/checksums-2.29.26.jar,/extlib/profiles-2.29.26.jar,/extlib/arns-2.29.26.jar,/extlib/crt-core-2.29.26.jar,/extlib/utils-2.29.26.jar,/extlib/endpoints-spi-2.29.26.jar,/extlib/checksums-spi-2.29.26.jar,/extlib/http-auth-aws-eventstream-2.29.26.jar,/extlib/annotations-2.29.26.jar,/extlib/iceberg-aws-1.6.1.jar,/extlib/iceberg-core-1.6.1.jar,/extlib/caffeine-2.9.3.jar,/extlib/commons-configuration2-2.11.0.jar,/extlib/iceberg-api-1.6.1.jar,/extlib/iceberg-common-1.6.1.jar,/extlib/iceberg-bundled-guava-1.6.1.jar,/extlib/third-party-jackson-core-2.29.26.jar,/extlib/avro-1.11.3.jar,/extlib/httpclient5-5.3.1.jar,/extlib/slf4j-api-1.7.36.jar,/extlib/reactive-streams-1.0.4.jar,/extlib/eventstream-1.0.1.jar,/extlib/httpclient-4.5.13.jar,/extlib/httpcore-4.4.16.jar,/extlib/commons-codec-1.17.1.jar,/extlib/netty-codec-http2-4.1.115.Final.jar,/extlib/netty-codec-http-4.1.115.Final.jar,/extlib/netty-handler-4.1.115.Final.jar,/extlib/netty-codec-4.1.115.Final.jar,/extlib/netty-transport-classes-epoll-4.1.115.Final.jar,/extlib/netty-transport-native-unix-common-4.1.115.Final.jar,/extlib/netty-transport-4.1.115.Final.jar,/extlib/netty-buffer-4.1.115.Final.jar,/extlib/netty-resolver-4.1.115.Final.jar,/extlib/netty-common-4.1.115.Final.jar,/extlib/checker-qual-3.19.0.jar,/extlib/error_prone_annotations-2.10.0.jar,/extlib/commons-text-1.12.0.jar,/extlib/commons-lang3-3.14.0.jar,/extlib/commons-logging-1.3.2.jar,/extlib/jackson-databind-2.14.2.jar,/extlib/jackson-core-2.14.2.jar,/extlib/jackson-annotations-2.14.2.jar,/extlib/aircompressor-0.27.jar,/extlib/RoaringBitmap-1.2.0.jar,/extlib/commons-compress-1.22.jar,/extlib/httpcore5-h2-5.2.4.jar,/extlib/httpcore5-5.2.4.jar,/extlib/iceberg-spark-runtime-3.5_2.12-1.7.1.jar"
💡
In the comma-separated jar list, the paths start with /extlib/ even though I copied the jars into ~/extlib/. That's because I'll mount ~/extlib/ as the volume /extlib/ when launching the Apache Spark Docker container in interactive mode, so the container sees all the jars under /extlib/.

Create a file (say, awscred) and add your AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_REGION.

AWS_ACCESS_KEY_ID=XXXXXXXXXXXXXXXXXXXXXXXX
AWS_SECRET_ACCESS_KEY=XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
AWS_REGION=XXXXXXXXXXXXXXXXXX

Set the comma-separated jar list as an environment variable using the export command.

export dependencies="/extlib/s3-tables-catalog-for-iceberg-0.1.4.jar,/extlib/s3tables-2.29.26.jar,/extlib/dynamodb-2.29.26.jar,/extlib/glue-2.29.26.jar,/extlib/kms-2.29.26.jar,/extlib/aws-json-protocol-2.29.26.jar,/extlib/s3-2.29.26.jar,/extlib/sts-2.29.26.jar,/extlib/aws-xml-protocol-2.29.26.jar,/extlib/aws-query-protocol-2.29.26.jar,/extlib/protocol-core-2.29.26.jar,/extlib/aws-core-2.29.26.jar,/extlib/auth-2.29.26.jar,/extlib/regions-2.29.26.jar,/extlib/sdk-core-2.29.26.jar,/extlib/http-auth-aws-2.29.26.jar,/extlib/http-auth-2.29.26.jar,/extlib/http-auth-spi-2.29.26.jar,/extlib/identity-spi-2.29.26.jar,/extlib/apache-client-2.29.26.jar,/extlib/netty-nio-client-2.29.26.jar,/extlib/url-connection-client-2.29.26.jar,/extlib/http-client-spi-2.29.26.jar,/extlib/metrics-spi-2.29.26.jar,/extlib/json-utils-2.29.26.jar,/extlib/retries-2.29.26.jar,/extlib/retries-spi-2.29.26.jar,/extlib/checksums-2.29.26.jar,/extlib/profiles-2.29.26.jar,/extlib/arns-2.29.26.jar,/extlib/crt-core-2.29.26.jar,/extlib/utils-2.29.26.jar,/extlib/endpoints-spi-2.29.26.jar,/extlib/checksums-spi-2.29.26.jar,/extlib/http-auth-aws-eventstream-2.29.26.jar,/extlib/annotations-2.29.26.jar,/extlib/iceberg-aws-1.6.1.jar,/extlib/iceberg-core-1.6.1.jar,/extlib/caffeine-2.9.3.jar,/extlib/commons-configuration2-2.11.0.jar,/extlib/iceberg-api-1.6.1.jar,/extlib/iceberg-common-1.6.1.jar,/extlib/iceberg-bundled-guava-1.6.1.jar,/extlib/third-party-jackson-core-2.29.26.jar,/extlib/avro-1.11.3.jar,/extlib/httpclient5-5.3.1.jar,/extlib/slf4j-api-1.7.36.jar,/extlib/reactive-streams-1.0.4.jar,/extlib/eventstream-1.0.1.jar,/extlib/httpclient-4.5.13.jar,/extlib/httpcore-4.4.16.jar,/extlib/commons-codec-1.17.1.jar,/extlib/netty-codec-http2-4.1.115.Final.jar,/extlib/netty-codec-http-4.1.115.Final.jar,/extlib/netty-handler-4.1.115.Final.jar,/extlib/netty-codec-4.1.115.Final.jar,/extlib/netty-transport-classes-epoll-4.1.115.Final.jar,/extlib/netty-transport-native-unix-common-4.1.115.Final.jar,/extlib/netty-transport-4.1.115.Final.jar,/extlib/netty-buffer-4.1.115.Final.jar,/extlib/netty-resolver-4.1.115.Final.jar,/extlib/netty-common-4.1.115.Final.jar,/extlib/checker-qual-3.19.0.jar,/extlib/error_prone_annotations-2.10.0.jar,/extlib/commons-text-1.12.0.jar,/extlib/commons-lang3-3.14.0.jar,/extlib/commons-logging-1.3.2.jar,/extlib/jackson-databind-2.14.2.jar,/extlib/jackson-core-2.14.2.jar,/extlib/jackson-annotations-2.14.2.jar,/extlib/aircompressor-0.27.jar,/extlib/RoaringBitmap-1.2.0.jar,/extlib/commons-compress-1.22.jar,/extlib/httpcore5-h2-5.2.4.jar,/extlib/httpcore5-5.2.4.jar,/extlib/iceberg-spark-runtime-3.5_2.12-1.7.1.jar"

Start the Apache Spark Docker container in interactive mode (docker run -it).

docker run -it \
   -v ~/extlib:/extlib \
   --env-file awscred \
   spark /opt/spark/bin/spark-shell \
   --jars $dependencies \
  --conf spark.sql.catalog.awsjunkie=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.awsjunkie.catalog-impl=software.amazon.s3tables.iceberg.S3TablesCatalog \
  --conf spark.sql.catalog.awsjunkie.warehouse=arn:aws:s3tables:us-east-2:3072222942166:bucket/demo-table-bucket \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions

Explanation:

-v ~/extlib:/extlib: Mounts the local folder (~/extlib/) containing all the downloaded dependencies as the volume /extlib inside the container.

--env-file awscred: Passes AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_REGION to the Docker container as environment variables through the env file.

spark /opt/spark/bin/spark-shell: Pulls the official Apache Spark Docker image from Docker Hub (if it isn't already present), runs it as a container, and launches the Spark shell. For PySpark, use /opt/spark/bin/pyspark instead of /opt/spark/bin/spark-shell.

--jars $dependencies: Passes all the jars as a comma-separated list through an environment variable, just to keep the command readable.

--conf: Provides all the configurations mentioned in the "To initialize a Spark session for working with S3 tables" section of the AWS user guide (Querying Amazon S3 tables with Apache Spark). awsjunkie is my Iceberg Spark catalog name; if you change it, don't forget to update it in all the configurations. demo-table-bucket is the table bucket that I created using the AWS Management Console; its ARN is used here as the warehouse config.

$ docker run -it \
   -v ~/extlib:/extlib \
   --env-file awscred \
   spark /opt/spark/bin/spark-shell \
   --jars $dependencies \
  --conf spark.sql.catalog.awsjunkie=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.awsjunkie.catalog-impl=software.amazon.s3tables.iceberg.S3TablesCatalog \
  --conf spark.sql.catalog.awsjunkie.warehouse=arn:aws:s3tables:us-east-2:3072222166:bucket/demo-table-bucket \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
25/01/22 18:23:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://a15abc7a21b8:4040
Spark context available as 'sc' (master = local[*], app id = local-1737570235935).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.5.4
      /_/
         
Using Scala version 2.12.18 (OpenJDK 64-Bit Server VM, Java 11.0.25)
Type in expressions to have them evaluated.
Type :help for more information.

scala> spark
res0: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@790bcc24
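
If you later move from the interactive shell to a standalone Spark application, the same catalog settings can be applied programmatically while building the SparkSession. Here is a minimal Scala sketch assuming the same awsjunkie catalog name; the app name and the account ID 111122223333 in the bucket ARN are placeholders to replace with your own values.

import org.apache.spark.sql.SparkSession

// Same S3 Tables catalog configuration as the spark-shell flags above,
// expressed as SparkSession configs for a standalone application.
val spark = SparkSession.builder()
  .appName("s3-tables-demo")
  .config("spark.sql.catalog.awsjunkie", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.awsjunkie.catalog-impl", "software.amazon.s3tables.iceberg.S3TablesCatalog")
  .config("spark.sql.catalog.awsjunkie.warehouse", "arn:aws:s3tables:us-east-2:111122223333:bucket/demo-table-bucket")
  .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .getOrCreate()

The dependency jars still need to be on the classpath, for example via --jars on spark-submit.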

Hands-on:

To test, I have followed the AWS User guide Querying Amazon S3 tables with Apache Spark.

Create a namespace (logical grouping of S3 Tables).

scala> spark.sql(" CREATE NAMESPACE IF NOT EXISTS awsjunkie.testns")
res0: org.apache.spark.sql.DataFrame = []
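
To double-check, you can list the namespaces in the catalog; testns should appear in the output (a small optional verification I added, not part of the AWS guide).

scala> spark.sql(" SHOW NAMESPACES IN awsjunkie ").show()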

Create an Iceberg table (say, demo_table) in the namespace (testns).

scala> spark.sql("""CREATE TABLE IF NOT EXISTS awsjunkie.testns.demo_table
 (id INT,
  name STRING,
  value INT)
  USING iceberg
  """)     
res1: org.apache.spark.sql.DataFrame = []
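
Optionally, confirm that the table was registered and inspect its schema (again, just a verification step I added; output omitted).

scala> spark.sql(" SHOW TABLES IN awsjunkie.testns ").show()
scala> spark.sql(" DESCRIBE TABLE awsjunkie.testns.demo_table ").show()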

Query the table to show all the records. It is empty now.

scala> spark.sql(" SELECT * FROM awsjunkie.testns.demo_table ").show()
+---+----+-----+
| id|name|value|
+---+----+-----+
+---+----+-----+

Insert some dummy records into demo_table.

scala> spark.sql(
"""
    INSERT INTO awsjunkie.testns.demo_table 
    VALUES 
        (1, 'ABC', 100), 
        (2, 'XYZ', 200)
""")  
res3: org.apache.spark.sql.DataFrame = []

Verify that the records were inserted.

scala> spark.sql(" SELECT * FROM awsjunkie.testns.demo_table ").show()
+---+----+-----+                                                                
| id|name|value|
+---+----+-----+
|  1| ABC|  100|
|  2| XYZ|  200|
+---+----+-----+
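
Once you're done experimenting, you can clean up from the same shell. This is a rough sketch: DROP NAMESPACE only succeeds once the namespace is empty, and you can alternatively delete the table and the table bucket from the AWS Management Console or CLI.

scala> spark.sql(" DROP TABLE IF EXISTS awsjunkie.testns.demo_table ")
scala> spark.sql(" DROP NAMESPACE IF EXISTS awsjunkie.testns ")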

References

Amazon S3 Tables Catalog for Apache Iceberg: https://github.com/awslabs/s3-tables-catalog
AWS user guide: Querying Amazon S3 tables with Apache Spark