Using Athena Views As A Source In Glue
Whilst working with AWS Glue recently I noticed that I was unable to use a view created in Athena as a source for an ETL job in the same way that I could use a table that had been cataloged.
The error I received was this.
An error occurred while calling o73.getCatalogSource. No classification or connection in mydatabase.v_my_view
Rather than try and recreate the view using a new PySpark job I used the Athena JDBC drivers as a custom JAR in a glue job to be able to query the view I wanted to use.
This blog are my notes on how this works.
Drivers
Create or reuse an existing S3 bucket to store the Athena JDBC drivers JAR file. The JAR files are available to download from AWS. I used the latest version which at the time of writing was JDBC Driver with AWS SDK AthenaJDBC42_2.0.27.1000.jar (compatible with JDBC 4.2 and requires JDK 8.0 or later).
IAM
The Glue job will need not only Glue Service privileges but also IAM privileges to access the S3 Buckets and also the AWS Athena Service.
For Athena this would provide Glue will full permissions.
{
"Sid": "VisualEditor1",
"Effect": "Allow",
"Action": [
"logs:CreateLogStream",
"logs:AssociateKmsKey",
"athena:*",
"logs:CreateLogGroup",
"logs:PutLogEvents"
],
"Resource": [
"arn:aws:athena:*:youraccount:workgroup/*",
"arn:aws:athena:*:youracccont:datacatalog/*",
"arn:aws:logs:*:*:/aws-glue/*"
]
}
Create Glue ETL Job
My use case for the Glue job was to query the view I had and save the results into Parquet format to speed up future queries against the same data.
The following code allows you to query an Athena view as a source for a data frame.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
athena_view_dataframe = (
glueContext.read.format("jdbc")
.option("driver", "com.simba.athena.jdbc.Driver")
.option("AwsCredentialsProviderClass","com.simba.athena.amazonaws.auth.InstanceProfileCredentialsProvider")
.option("url", "jdbc:awsathena://athena.eu-west-1.amazonaws.com:443")
.option("dbtable", "AwsDataCatalog.yourathenadatabase.yourathenaview")
.option("S3OutputLocation","s3://yours3bucket/temp")
.load()
)
athena_view_dataframe.printSchema()
The key things in this code snippet to be aware of are.
.option("driver", "com.simba.athena.jdbc.Driver")
We are telling Glue which class within the JDBC driver to use.
.option("AwsCredentialsProviderClass","com.simba.athena.amazonaws.auth.InstanceProfileCredentialsProvider")
This uses the IAM role assigned to the Glue job to authenticate to Athena. You can use other authentication method like AWS_ACCESS_KEY or federated authentication but using IAM I think makes most sense for an ETL job that will most likely run on a schedule or event.
.option("url", "jdbc:awsathena://athena.eu-west-1.amazonaws.com:443")
I am using Athena in Ireland (EU-WEST-1) if you are using a different region update this accordingly.
.option("dbtable", "AwsDataCatalog.yourathenadatabase.yourathenaview")
The fully qualified name of view in your Athena catalog. It’s in the format of ‘AwsDataCatalog.Database.View’. For example this query run in Athena.
SELECT * FROM "AwsDataCatalog"."vehicles"."v_electric_cars";
You would set the dbtable option to this
The last option tells Glue which S3 location to use as temporary storage to store the data returned from Athena.
.option("dbtable", "AwsDataCatalog.vehicles.v_electric_cars")
At this point you can test it works. When running the job you need to tell Glue about the location for the Athena JDBC drivers JAR file that was uploaded to S3.
If you are working in the AWS Glue Console the parameter to set can be found under Job Details → Advanced → Dependent JARs path.
The parameter needs to be set to the full path and filename of the JAR file. For example s3://yours3bucket/jdbc-drivers/AthenaJDBC42_2.0.27.1000.jar
By setting this in the console it ensures that the correct argument is passed into the Glue job.
--extra-jars s3://yours3bucket/jdbc-drivers/AthenaJDBC42_2.0.27.1000.jar
The final code including the conversion to Parquet format looked like this.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
athena_view_dataframe = (
glueContext.read.format("jdbc")
.option("driver", "com.simba.athena.jdbc.Driver")
.option("AwsCredentialsProviderClass","com.simba.athena.amazonaws.auth.InstanceProfileCredentialsProvider")
.option("url", "jdbc:awsathena://athena.eu-west-1.amazonaws.com:443")
.option("dbtable", "AwsDataCatalog.vehicles.v_electric_cars")
.option("S3OutputLocation","s3://yours3bucket/temp")
.load()
)
athena_view_dataframe.printSchema()
athena_view_datasource = DynamicFrame.fromDF(athena_view_dataframe, glueContext, "athena_view_source")
pq_output = glueContext.write_dynamic_frame.from_options(
frame=athena_view_datasource,
connection_type="s3",
format="glueparquet",
connection_options={
"path": "s3://yourotherS3Bucket/",
"partitionKeys": [],
},
format_options={"compression": "snappy"},
transformation_ctx="ParquetConversion",
)
job.commit()
Originally published at https://dev.to on February 16, 2022.