Simple way to query Amazon Athena in Python with boto3

ETL takes time, and it's a lot to maintain. Sometimes it breaks because you didn't expect a string to contain emojis. You might decide the transformation needs to change, which means refreshing all of your data. So what can you do to avoid this?


Data virtualization helps tremendously with that. It's not the answer to everything of course, but it makes certain things loads easier. When you don't have to worry too much about the structure in which to store your data, you can choose to keep it in its richest, untouched format. All of it. So when you need it in a specific format, it's all there. With data virtualization, you impose a schema on read, not on write, and you can change that schema as you wish.


Towards the end of 2016, Amazon launched Athena - and it's pretty awesome. We can, for example, store our raw JSON data in S3, define virtual databases with virtual tables on top of it, and query those tables with SQL. And on top of everything, it is quite simple to get started with.

To give it a go, just dump some raw data files (e.g. CSV, JSON or log files) into an S3 bucket, head over to Amazon Athena and run the wizard that walks you through creating a virtual table step by step. If you are familiar with Hive, you will find the Data Definition Language identical. Once a table is created, it's ready to be queried.

If you wish to run queries against Athena from e.g. Python, you have a few options, for example:
  1. PyAthenaJDBC
  2. JayDeBeApi
  3. boto3
From a user experience point of view, that would have been my preferred order too, as the first two let you query straight into a pandas DataFrame (see the sketch below). But I was too lazy to compile PyAthenaJDBC on my Windows machine (it would have required Visual C++ Build Tools, which I didn't have), and JayDeBeApi looked like a hassle to set up.
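
For the record, the DataFrame route would look roughly like this with PyAthenaJDBC. I haven't run this myself, so treat it as a sketch based on the library's README rather than tested code:

import pandas as pd
from pyathenajdbc import connect

# Athena still needs an S3 staging directory for its result files
conn = connect(s3_staging_dir='s3://your-bucket-name/temp/athena/output',
               region_name='eu-central-1')
df = pd.read_sql('SELECT * FROM tablename LIMIT 100', conn)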

Boto3 was something I was already familiar with. With boto3, you specify the S3 path where you want to store the results, wait for the query execution to finish and fetch the file once it is there. And clean up afterwards. Once all of this is wrapped in a function, it gets really manageable.

If you want to see the code, go ahead and copy-paste this gist: query Athena using boto3. I'll explain the code below.

First, let's start with our configuration. Fill these in with your own details, of course.

import boto3
import pandas as pd
import io
import re
import time

params = {
    'region': 'eu-central-1',
    'database': 'databasename',
    'bucket': 'your-bucket-name',
    'path': 'temp/athena/output',
    'query': 'SELECT * FROM tablename LIMIT 100'
}

session = boto3.Session()

The following function dispatches the query to Athena with our details and returns the response, which contains the query execution ID. Athena handles the query asynchronously in the background.
def athena_query(client, params):
    
    response = client.start_query_execution(
        QueryString=params["query"],
        QueryExecutionContext={
            'Database': params['database']
        },
        ResultConfiguration={
            'OutputLocation': 's3://' + params['bucket'] + '/' + params['path']
        }
    )
    return response
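
As a side note, start_query_execution accepts DDL statements too, so you could even create the table from Python instead of the wizard. A minimal sketch, assuming JSON data and Athena's OpenX JSON SerDe - the table and column names below are made up:

client = session.client('athena', region_name=params['region'])

ddl_params = dict(params)
ddl_params['query'] = '''
CREATE EXTERNAL TABLE IF NOT EXISTS databasename.tablename (
  id string,
  created_at string,
  message string
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://your-bucket-name/raw/'
'''
athena_query(client, ddl_params)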

To make things a little easier, the next function will:
  1. Dispatch the query to Athena
  2. Poll the query status until it finishes (or the retry limit is reached)
  3. Return the filename in S3 where the query results are stored
def athena_to_s3(session, params, max_execution=5):
    client = session.client('athena', region_name=params["region"])
    execution = athena_query(client, params)
    execution_id = execution['QueryExecutionId']
    state = 'RUNNING'

    # Poll once per second until the query finishes or we run out of tries
    while max_execution > 0 and state in ['RUNNING', 'QUEUED']:
        max_execution = max_execution - 1
        response = client.get_query_execution(QueryExecutionId=execution_id)

        if 'QueryExecution' in response and \
                'Status' in response['QueryExecution'] and \
                'State' in response['QueryExecution']['Status']:
            state = response['QueryExecution']['Status']['State']
            if state == 'FAILED':
                return False
            elif state == 'SUCCEEDED':
                s3_path = response['QueryExecution']['ResultConfiguration']['OutputLocation']
                filename = re.findall(r'.*\/(.*)', s3_path)[0]
                return filename
        time.sleep(1)
    
    return False
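
Since we imported pandas and io at the top, the natural next step is to read the result file into a DataFrame. A minimal sketch, assuming the results are in CSV format (which is what Athena writes by default):

def s3_to_pandas(session, params, s3_filename):
    s3client = session.client('s3')
    # Fetch the result object Athena wrote to our output path
    obj = s3client.get_object(Bucket=params['bucket'],
                              Key=params['path'] + '/' + s3_filename)
    # The body is a CSV stream, so feed it straight into pandas
    df = pd.read_csv(io.BytesIO(obj['Body'].read()))
    return df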

If you want to clean things up afterwards, this piece of code removes all objects in the path:

# Deletes all files in your path so use carefully!
def cleanup(session, params):
    s3 = session.resource('s3')
    my_bucket = s3.Bucket(params['bucket'])
    for item in my_bucket.objects.filter(Prefix=params['path']):
        item.delete()
Best be careful with that one ;)
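
If deleting everything under the prefix feels too blunt, you could instead remove just the two objects Athena wrote for a given query: the result CSV and its .metadata companion. Something like this untested sketch:

def cleanup_result(session, params, s3_filename):
    s3 = session.resource('s3')
    # Athena writes <execution-id>.csv plus a matching .csv.metadata file
    for key in [params['path'] + '/' + s3_filename,
                params['path'] + '/' + s3_filename + '.metadata']:
        s3.Object(params['bucket'], key).delete()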

Once you have all these defined, you can call:
# Query Athena and get the s3 filename as a result
s3_filename = athena_to_s3(session, params)
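
# Load the results into a pandas DataFrame (using the s3_to_pandas sketch above)
df = s3_to_pandas(session, params, s3_filename)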

# Removes all files from the s3 folder you specified, so be careful
cleanup(session, params)
If you enjoyed these instructions, would you mind commenting below 👇🏻?
Also, if there's something this article didn't help you with regarding querying Athena with boto3, I'd be happy to improve it.

Last update 25.3.2019

Comments

  1. This is very clear and concise. Have you tried such code in the context of a query for a web page backend? I am curious what the performance of something like this looks like and whether it is only functional for ETL.

    1. Heyy thanks for the question! First comment on the blog. Cheers! 🍺

      I have not, but off the bat I would recommend against it. Even with simple data sets and queries, total response times easily get up to 1-2 seconds. Unless that's OK for your use case?

      How real-time would it need to be? If possible, I would consider caching the result CSV in S3 and only going back to Athena when you need to refresh the file.

  2. Hi!
    Thanks for the guide!

    In the configurations section:

    session = boto3.Session(profile_name='profile-optional')

    What do you mean by "profile_name='profile-optional'"?

    1. Hi! Thanks for the question!

      If you have multiple profiles configured in your ~/.aws/config file, then this is the place to specify which profile you want to use.

      But I suppose for simplicity it's best to leave that out or at least clarify it a bit more. Thanks!

