In [30]:
import ast
import re
import sys

import boto3
from jinja2 import Template
from retrying import retry

<b> We will be using the public Flights_parquet dataset to perform DDL/DML operations through the Athena API </b>
#### <b> Environment setup </b>

In [31]:
# DDL for the external table over the public flights Parquet dataset.
# IF NOT EXISTS keeps this cell re-runnable: without it the statement fails
# on every run after the first because the table is already in the catalog.
flight_paruqet_create_table = """
CREATE EXTERNAL TABLE IF NOT EXISTS `default`.`flights_parquet`(
  `yr` int, 
  `quarter` int, 
  `month` int, 
  `dayofmonth` int, 
  `dayofweek` int, 
  `flightdate` string, 
  `uniquecarrier` string, 
  `airlineid` int, 
  `carrier` string, 
  `tailnum` string, 
  `flightnum` string, 
  `originairportid` int, 
  `originairportseqid` int, 
  `origincitymarketid` int, 
  `origin` string, 
  `origincityname` string, 
  `originstate` string, 
  `originstatefips` string, 
  `originstatename` string, 
  `originwac` int, 
  `destairportid` int, 
  `destairportseqid` int, 
  `destcitymarketid` int, 
  `dest` string, 
  `destcityname` string, 
  `deststate` string, 
  `deststatefips` string, 
  `deststatename` string, 
  `destwac` int, 
  `crsdeptime` string, 
  `deptime` string, 
  `depdelay` int, 
  `depdelayminutes` int, 
  `depdel15` int, 
  `departuredelaygroups` int, 
  `deptimeblk` string, 
  `taxiout` int, 
  `wheelsoff` string, 
  `wheelson` string, 
  `taxiin` int, 
  `crsarrtime` int, 
  `arrtime` string, 
  `arrdelay` int, 
  `arrdelayminutes` int, 
  `arrdel15` int, 
  `arrivaldelaygroups` int, 
  `arrtimeblk` string, 
  `cancelled` int, 
  `cancellationcode` string, 
  `diverted` int, 
  `crselapsedtime` int, 
  `actualelapsedtime` int, 
  `airtime` int, 
  `flights` int, 
  `distance` int, 
  `distancegroup` int, 
  `carrierdelay` int, 
  `weatherdelay` int, 
  `nasdelay` int, 
  `securitydelay` int, 
  `lateaircraftdelay` int, 
  `firstdeptime` string, 
  `totaladdgtime` int, 
  `longestaddgtime` int, 
  `divairportlandings` int, 
  `divreacheddest` int, 
  `divactualelapsedtime` int, 
  `divarrdelay` int, 
  `divdistance` int, 
  `div1airport` string, 
  `div1airportid` int, 
  `div1airportseqid` int, 
  `div1wheelson` string, 
  `div1totalgtime` int, 
  `div1longestgtime` int, 
  `div1wheelsoff` string, 
  `div1tailnum` string, 
  `div2airport` string, 
  `div2airportid` int, 
  `div2airportseqid` int, 
  `div2wheelson` string, 
  `div2totalgtime` int, 
  `div2longestgtime` int, 
  `div2wheelsoff` string, 
  `div2tailnum` string, 
  `div3airport` string, 
  `div3airportid` int, 
  `div3airportseqid` int, 
  `div3wheelson` string, 
  `div3totalgtime` int, 
  `div3longestgtime` int, 
  `div3wheelsoff` string, 
  `div3tailnum` string, 
  `div4airport` string, 
  `div4airportid` int, 
  `div4airportseqid` int, 
  `div4wheelson` string, 
  `div4totalgtime` int, 
  `div4longestgtime` int, 
  `div4wheelsoff` string, 
  `div4tailnum` string, 
  `div5airport` string, 
  `div5airportid` int, 
  `div5airportseqid` int, 
  `div5wheelson` string, 
  `div5totalgtime` int, 
  `div5longestgtime` int, 
  `div5wheelsoff` string, 
  `div5tailnum` string)
PARTITIONED BY ( 
  `year` string)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://athena-examples-us-east-1/flight/parquet'
TBLPROPERTIES (
  'transient_lastDdlTime'='1574436962')
"""

In [17]:
# S3 location handed to Athena as the query-result output location by the
# execute_athena_query calls in this notebook.
# NOTE(review): the bucket name is hard-coded — point it at a bucket you own before running.
s3_loc = 's3://athena-output-jeetesh/redshiftadmin/testexec1'

### We will revisit these functions once the environment has been set up

In [18]:
# Notebook-wide AWS handles: `athena` is the client poll_status reads query
# state from; `s3` is a resource handle for the S3 service.
athena = boto3.client('athena')
s3 = boto3.resource('s3')

@retry(stop_max_attempt_number = 10,
    wait_exponential_multiplier = 10,
    wait_exponential_max = 10 * 20 * 100)
def poll_status(_id):
    """Fetch the execution record of an Athena query once it has finished.

    The function asks Athena for the current state of the query. While the
    query has not reached a terminal state it raises, which makes the
    ``@retry`` decorator call it again with exponential back-off, for at
    most 10 attempts in total.

    NOTE(review): with these retry settings the total wait looks like only
    about ten seconds; queries that run longer will surface the
    RuntimeError below — confirm the limits suit your workload.

    Args:
        _id: The Athena ``QueryExecutionId`` to poll.

    Returns:
        The full ``get_query_execution`` response once the state is
        SUCCEEDED, FAILED or CANCELLED.

    Raises:
        RuntimeError: If the query is still queued/running on the final
            attempt.
    """
    result = athena.get_query_execution(QueryExecutionId=_id)
    state = result['QueryExecution']['Status']['State']

    # CANCELLED is terminal as well: retrying can never change it, so hand
    # it back to the caller instead of burning every retry attempt.
    if state in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
        return result

    # Any other state means "not finished yet"; raising triggers a retry.
    raise RuntimeError(
        "Athena query {} has not finished yet (state: {})".format(_id, state)
    )

def run_query(query, database, s3_output):
    """Start an Athena query, wait for it to finish and report the outcome.

    Args:
        query: SQL text to execute.
        database: Athena database used as the query execution context.
        s3_output: S3 location where Athena writes the query results.

    Returns:
        The ``start_query_execution`` response (contains ``QueryExecutionId``).

    Side effects:
        Prints the outcome and stores the execution ID in the module-level
        ``execution_id`` variable.
    """
    global execution_id

    client = boto3.client('athena')
    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': database
        },
        ResultConfiguration={
            'OutputLocation': s3_output,
        }
    )
    query_execution_id = response['QueryExecutionId']
    result = poll_status(query_execution_id)

    status = result['QueryExecution']['Status']
    if status['State'] == 'SUCCEEDED':
        print("Query SUCCEEDED: {}".format(query_execution_id))
    else:
        # Surface failures instead of silently printing only the ID.
        print("Query {}: {} ({})".format(
            status['State'],
            query_execution_id,
            status.get('StateChangeReason', 'no reason reported'),
        ))

    print('Execution ID: ' + query_execution_id)
    # Store the actual ID; the original stored the literal text 'QueryExecutionId'.
    execution_id = query_execution_id
    return response

def execute_athena_query(s3_output, query_stmt):
    """Run one SQL statement against the ``default`` Athena database.

    Args:
        s3_output: S3 location for the query results. When empty/None the
            notebook-level ``s3_loc`` is used instead.
        query_stmt: SQL text to execute.

    Returns:
        None. Progress is printed by this function and by ``run_query``.
    """
    database = 'default'
    # Honour the caller's output location; the original always replaced it
    # with the global s3_loc, which made the parameter meaningless.
    if not s3_output:
        s3_output = s3_loc

    print("Executing query: {}".format(query_stmt))
    run_query(query_stmt, database, s3_output)

In [19]:
# Submit the CREATE TABLE statement; progress and the execution ID are printed.
execute_athena_query(s3_output=s3_loc, query_stmt=flight_paruqet_create_table)

Executing query: 
CREATE EXTERNAL TABLE `flights_parquet`(
  `yr` int, 
  `quarter` int, 
  `month` int, 
  `dayofmonth` int, 
  `dayofweek` int, 
  `flightdate` string, 
  `uniquecarrier` string, 
  `airlineid` int, 
  `carrier` string, 
  `tailnum` string, 
  `flightnum` string, 
  `originairportid` int, 
  `originairportseqid` int, 
  `origincitymarketid` int, 
  `origin` string, 
  `origincityname` string, 
  `originstate` string, 
  `originstatefips` string, 
  `originstatename` string, 
  `originwac` int, 
  `destairportid` int, 
  `destairportseqid` int, 
  `destcitymarketid` int, 
  `dest` string, 
  `destcityname` string, 
  `deststate` string, 
  `deststatefips` string, 
  `deststatename` string, 
  `destwac` int, 
  `crsdeptime` string, 
  `deptime` string, 
  `depdelay` int, 
  `depdelayminutes` int, 
  `depdel15` int, 
  `departuredelaygroups` int, 
  `deptimeblk` string, 
  `taxiout` int, 
  `wheelsoff` string, 
  `wheelson` string, 
  `taxiin` int, 
  `crsarrtime` int, 


In [39]:
# MSCK REPAIR TABLE refreshes the catalog metadata about the table's
# partitions and the data associated with them.
repair_table = 'MSCK REPAIR TABLE flights_parquet;'
execute_athena_query(s3_output=s3_loc, query_stmt=repair_table)


Executing query: MSCK REPAIR TABLE flights_parquet;
Query SUCCEEDED: 340cdf0b-7727-40d8-8c8e-7face8c8adf1
Execution ID: 340cdf0b-7727-40d8-8c8e-7face8c8adf1


## Helper functions

In [20]:
def alias_column(eval_ctx, value=None, indent=None):
    """Render the first entry of a column dict as ``"<name> AS <alias>"``.

    ``eval_ctx`` is a mapping such as ``{'yr': 'yr'}``; ``value`` and
    ``indent`` are accepted but not used.
    """
    rendered = []
    for column, alias in eval_ctx.items():
        rendered.append(column + " AS " + alias)
    return rendered[0]

def name_column(eval_ctx, value=None, indent=None):
    """Return the first key (column name) of a column dict.

    ``eval_ctx`` is a mapping such as ``{'yr': 'yr'}``; ``value`` and
    ``indent`` are accepted but not used.
    """
    names = []
    for column, _alias in eval_ctx.items():
        names.append(column)
    return names[0]

def agg_alias_column(eval_ctx, value=None, indent=None):
    """Return ``" AS <alias>"`` for the first entry of an aggregation dict.

    ``eval_ctx`` is a mapping such as ``{'depdelay': 'depdelay'}``;
    ``value`` and ``indent`` are accepted but not used.
    """
    suffixes = []
    for _column, alias in eval_ctx.items():
        suffixes.append(" AS " + alias)
    return suffixes[0]

def agg_name_column(eval_ctx, value=None, indent=None):
    """Return the first key (column to aggregate) of an aggregation dict.

    ``eval_ctx`` is a mapping such as ``{'depdelay': 'depdelay'}``;
    ``value`` and ``indent`` are accepted but not used.
    """
    names = []
    for column, _alias in eval_ctx.items():
        names.append(column)
    return names[0]

def columns(col_list):
    """Parse the textual column list into Python objects.

    Args:
        col_list: String holding a Python literal, e.g.
            ``"[{'yr': 'yr'}, {'month': 'month'}]"``.

    Returns:
        The parsed literal (for the example above, a list of dicts).

    Raises:
        ValueError / SyntaxError: If the text is not a plain Python literal.

    ``ast.literal_eval`` is used instead of ``eval`` on purpose: the text is
    supplied by the caller, and ``eval`` would execute arbitrary code
    embedded in it.
    """
    return ast.literal_eval(col_list)

def agg_column(agg_col_list):
    """Parse the textual aggregation-column list into Python objects.

    Args:
        agg_col_list: String holding a Python literal, e.g.
            ``"[{'depdelay':'depdelay'}]"``.

    Returns:
        The parsed literal (for the example above, a list with one dict).

    Raises:
        ValueError / SyntaxError: If the text is not a plain Python literal.

    ``ast.literal_eval`` is used instead of ``eval`` on purpose: the text is
    supplied by the caller, and ``eval`` would execute arbitrary code
    embedded in it.
    """
    return ast.literal_eval(agg_col_list)

## SQL template

### Query Template construct
    Aggregation: "sum",
    Table1: "flights_parquet",
    Database1: "default",
    Table2: 'None',  --> if the Table2 name is 'None', the join clause is skipped
    Database2: 'None',
    Type of join: "LEFT OUTER JOIN",
    Join column table1: "col1",
    Join column table2: "col2",
    Filter column: 'yr',
    Lower limit: "2015",
    Upper limit: "2018",
    Group by all columns in selection clause: "'All'",
    Dictionary for column alias: "[{'yr': 'yr'}, {'month': 'month'}, {'dayofmonth': 'dayofmonth'}, {'carrier': 
                                   'carrier'}, {'originairportid': 'originairportid'}]",
    Aggregation column: "[{'depdelay':'depdelay'}]"

In [21]:
# Jinja2 template for a single aggregate SELECT statement.
#
# Variables read by the template:
#   column1                           list of {column: alias} dicts for the select list
#   agg_type / agg_column1            aggregate function name and the {column: alias} dict(s) it wraps
#   database_name_1 / table_name_1    main table, aliased as "a"
#   table_name_2, database_name_2,
#   join_type_1, column_name_1/2      optional join; skipped when table_name_2 is the string 'None'
#   filtercol1, filtercol1lb/ub       optional BETWEEN filter; skipped when filtercol1 is the string 'None'
#   groupby                           GROUP BY over the column1 names; skipped when it is the string 'None'
#
# The filters alias_column / name_column / agg_alias_column / agg_name_column
# must be registered on the template's environment before rendering.
# Because this is a normal (non-raw) Python string, the "\n" inside
# join(',\n') is already a real newline when Jinja parses the template.
# NOTE(review): values are placed into the SQL text as-is by the
# placeholders below — only pass trusted values.
sql_template = Template("""
SELECT
    {{ column1 |map('alias_column')| join(',\n') }}
,   {{agg_type}} ({{ agg_column1 |map('agg_name_column')| join(',\n') }}){{agg_column1 |map('agg_alias_column')| join(',\n')}}

FROM {{database_name_1}}.{{table_name_1}} a
 {% if table_name_2 != 'None' %}
 {{join_type_1}}
 {{database_name_2}}.{{table_name_2}} b
  on a.{{column_name_1}} = b.{{column_name_2}}
{% endif %}
{% if filtercol1 != 'None' %}
WHERE
     {{filtercol1}} BETWEEN {{filtercol1lb}} and {{filtercol1ub}}
{% endif %}

 {% if groupby != 'None' %}
GROUP BY
{{ column1 | map('name_column') | join(',\n') }}
{% endif %}
""")

## SQL template initialization

In [22]:
# Make the column helpers available to the template under the filter names
# its map(...) calls refer to.
sql_template.environment.filters.update({
    'alias_column': alias_column,
    'name_column': name_column,
    'agg_alias_column': agg_alias_column,
    'agg_name_column': agg_name_column,
})

In [23]:
# Generate output based on the input value received
def output_template(a, b, c, d, e, f, g, h, i, j, kk, l, m, n):
    """Render ``sql_template`` from positional inputs and tidy the result.

    Args:
        a: Aggregate function name (``agg_type``), e.g. "sum".
        b: Main table name (``table_name_1``).
        c: Main table database (``database_name_1``).
        d: Second table name (``table_name_2``); the string 'None' skips the join.
        e: Second table database (``database_name_2``).
        f: Join clause text (``join_type_1``), e.g. "LEFT OUTER JOIN".
        g: Join column on the main table (``column_name_1``).
        h: Join column on the second table (``column_name_2``).
        i: Filter column (``filtercol1``); the string 'None' skips the WHERE clause.
        j: Lower bound of the BETWEEN filter (``filtercol1lb``).
        kk: Upper bound of the BETWEEN filter (``filtercol1ub``).
        l: Group-by switch (``groupby``); the string 'None' skips GROUP BY.
        m: Select-list columns as text, parsed with ``columns``.
        n: Aggregation columns as text, parsed with ``agg_column``.

    Returns:
        The rendered SQL after it has been passed through ``sql_stmt``.
    """
    # The former module-level ``global a, b, ...`` statement was removed: at
    # module scope it has no effect and wrongly suggested shared state.
    rendered_sql = sql_template.render(
        agg_type=a,
        table_name_1=b,
        database_name_1=c,
        table_name_2=d,
        database_name_2=e,
        join_type_1=f,
        column_name_1=g,
        column_name_2=h,
        filtercol1=i,
        filtercol1lb=j,
        filtercol1ub=kk,
        groupby=l,
        column1=columns(m),
        agg_column1=agg_column(n),
    )
    return sql_stmt(rendered_sql)

#print(output_template("sum" "flights_parquet" "default"'None' 'None' "LEFT OUTER JOIN" "col1" "col2" 'yr' "2015" "2018" "'All'""[{'yr': 'yr'}, {'month': 'month'}, {'dayofmonth': 'dayofmonth'}, {'carrier': 'carrier'}, {'originairportid': 'originairportid'}]""[{'depdelay':'depdelay'}]"
#))

In [24]:
def sql_stmt(string):
    """Collapse blank (empty or whitespace-only) lines in generated SQL.

    Args:
        string: SQL text, typically the raw template output.

    Returns:
        The text with every run of newline / whitespace / newline replaced
        by a single newline.
    """
    # The original passed re.MULTILINE as the 4th positional argument, which
    # re.sub interprets as ``count`` (value 8), so only the first eight
    # blank-line runs were removed. No flag is needed for this pattern.
    return re.sub(r'\n\s*\n', '\n', string)

### Print generated SQL statement 

In [25]:
# Render the sample aggregate query and show the generated SQL text.
print(
    output_template(
        "sum",
        "flights_parquet",
        "default",
        'None',
        'None',
        "LEFT OUTER JOIN",
        "col1",
        "col2",
        'yr',
        "2015",
        "2018",
        "'All'",
        "[{'yr': 'yr'}, {'month': 'month'}, {'dayofmonth': 'dayofmonth'}, {'carrier': 'carrier'}, {'originairportid': 'originairportid'}]",
        "[{'depdelay':'depdelay'}]",
    )
)


SELECT
    yr AS yr,
month AS month,
dayofmonth AS dayofmonth,
carrier AS carrier,
originairportid AS originairportid
,   sum (depdelay) AS depdelay
FROM default.flights_parquet a
WHERE
     yr BETWEEN 2015 and 2018
GROUP BY
yr,
month,
dayofmonth,
carrier,
originairportid



<b>The SQL statement looks good; now let's take the SQL generator and embed it into an API call.
The API passes parameters to generate the SQL statement, gets an execution ID from Athena, and
stores the results in an S3 bucket.</b>

<b> Note: APIs can also be used to perform DDL operations, such as creating or dropping objects </b>

### Athena API to take the generated SQL, execute it, and store the results in a predefined S3 bucket. Primarily DML operations


### Initialize 
    boto3 client 
    Configure retry parameters

In [None]:
# Notebook-wide AWS handles: `athena` is the client poll_status reads query
# state from; `s3` is a resource handle for the S3 service.
athena = boto3.client('athena')
s3 = boto3.resource('s3')

In [None]:
@retry(stop_max_attempt_number = 10,
    wait_exponential_multiplier = 10,
    wait_exponential_max = 10 * 20 * 100)
def poll_status(_id):
    """Fetch the execution record of an Athena query once it has finished.

    The function asks Athena for the current state of the query. While the
    query has not reached a terminal state it raises, which makes the
    ``@retry`` decorator call it again with exponential back-off, for at
    most 10 attempts in total.

    NOTE(review): with these retry settings the total wait looks like only
    about ten seconds; queries that run longer will surface the
    RuntimeError below — confirm the limits suit your workload.

    Args:
        _id: The Athena ``QueryExecutionId`` to poll.

    Returns:
        The full ``get_query_execution`` response once the state is
        SUCCEEDED, FAILED or CANCELLED.

    Raises:
        RuntimeError: If the query is still queued/running on the final
            attempt.
    """
    result = athena.get_query_execution(QueryExecutionId=_id)
    state = result['QueryExecution']['Status']['State']

    # CANCELLED is terminal as well: retrying can never change it, so hand
    # it back to the caller instead of burning every retry attempt.
    if state in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
        return result

    # Any other state means "not finished yet"; raising triggers a retry.
    raise RuntimeError(
        "Athena query {} has not finished yet (state: {})".format(_id, state)
    )

In [None]:
def run_query(query, database, s3_output):
    """Start an Athena query, wait for it to finish and report the outcome.

    Args:
        query: SQL text to execute.
        database: Athena database used as the query execution context.
        s3_output: S3 location where Athena writes the query results.

    Returns:
        The ``start_query_execution`` response (contains ``QueryExecutionId``).

    Side effects:
        Prints the outcome and stores the execution ID in the module-level
        ``execution_id`` variable.
    """
    global execution_id

    client = boto3.client('athena')
    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': database
        },
        ResultConfiguration={
            'OutputLocation': s3_output,
        }
    )
    query_execution_id = response['QueryExecutionId']
    result = poll_status(query_execution_id)

    status = result['QueryExecution']['Status']
    if status['State'] == 'SUCCEEDED':
        print("Query SUCCEEDED: {}".format(query_execution_id))
    else:
        # Surface failures instead of silently printing only the ID.
        print("Query {}: {} ({})".format(
            status['State'],
            query_execution_id,
            status.get('StateChangeReason', 'no reason reported'),
        ))

    print('Execution ID: ' + query_execution_id)
    # Store the actual ID; the original stored the literal text 'QueryExecutionId'.
    execution_id = query_execution_id
    return response

<b> Returns the SQL statement from the output_template function </b>

In [None]:
def print_sql():
    """Build the sample aggregate query via ``output_template`` and return it.

    Despite the name, nothing is printed; the SQL text is returned.
    """
    return output_template(
        "sum",
        "flights_parquet",
        "default",
        'None',
        'None',
        "LEFT OUTER JOIN",
        "col1",
        "col2",
        'yr',
        "2015",
        "2018",
        "'All'",
        "[{'yr': 'yr'}, {'month': 'month'}, {'dayofmonth': 'dayofmonth'}, {'carrier': 'carrier'}, {'originairportid': 'originairportid'}]",
        "[{'depdelay':'depdelay'}]",
    )

### Execute query and output results in S3

In [None]:
def execute_athena_query(s3_output, query_stmt):
    """Run one SQL statement against the ``default`` Athena database.

    Args:
        s3_output: S3 location for the query results. When empty/None the
            notebook-level ``s3_loc`` is used instead.
        query_stmt: SQL text to execute.

    Returns:
        None. Progress is printed by this function and by ``run_query``.
    """
    database = 'default'
    # Honour the caller's output location; the original always replaced it
    # with the global s3_loc, which made the parameter meaningless.
    if not s3_output:
        s3_output = s3_loc

    print("Executing query: {}".format(query_stmt))
    run_query(query_stmt, database, s3_output)
    

In [None]:
# Generate the SQL and submit it; progress and the execution ID are printed.
execute_athena_query(s3_output=s3_loc, query_stmt=print_sql())

### Let's use the same SQL statement to create a view 

In [None]:
# DDL that wraps the generated aggregate query in a view.
# "or replace" keeps this cell re-runnable: a plain "create view" fails on
# every run after the first because the view already exists.
create_table_view = """create or replace view default.flights_parquet_analyzed as
(
SELECT
    yr AS yr,
month AS month,
dayofmonth AS dayofmonth,
carrier AS carrier,
originairportid AS originairportid
,   sum (depdelay) AS depdelay
FROM default.flights_parquet a
WHERE
     yr BETWEEN 2015 and 2018
GROUP BY
yr,
month,
dayofmonth,
carrier,
originairportid
)"""


In [None]:
# Submit the view DDL; progress and the execution ID are printed.
execute_athena_query(s3_output=s3_loc, query_stmt=create_table_view)

### Let's use this view to load data into QuickSight and develop widgets showing the data