AWS Glue – All You Need to Simplify the ETL Process

#########################################

### IMPORT LIBRARIES AND SET VARIABLES

#########################################

#Import python modules

from datetime import datetime

#Import pyspark modules

from pyspark.context import SparkContext

import pyspark.sql.functions as f

#Import glue modules

from awsglue.utils import getResolvedOptions

from awsglue.context import GlueContext

from awsglue.dynamicframe import DynamicFrame

from awsglue.job import Job

#Initialize contexts and session

# getOrCreate() reuses the SparkContext that the Glue job runtime may have
# already created, instead of constructing a second one.
spark_context = SparkContext.getOrCreate()

glue_context = GlueContext(spark_context)

session = glue_context.spark_session

#Parameters

# Glue Data Catalog database that holds the source table.
glue_db = "glue-demo-edureka-db"

# Name of the source table inside that database.
glue_tbl = "read"

# S3 prefix where the aggregated output is written.
s3_write_path = "s3://glue-demo-bucket-edureka/write"

#########################################

### EXTRACT (READ DATA)

#########################################

# Record the wall-clock start time so the run duration can be eyeballed
# from the job logs.
dt_start = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print("Start time:", dt_start)

# Read the source table (registered in the Glue Data Catalog) into a
# Glue DynamicFrame.
dynamic_frame_read = glue_context.create_dynamic_frame.from_catalog(
    database=glue_db,
    table_name=glue_tbl,
)

# Convert to a plain Spark DataFrame so the standard pyspark.sql
# functions can be used for the transformations below.
data_frame = dynamic_frame_read.toDF()

#########################################

### TRANSFORM (MODIFY DATA)

#########################################

# Derive a "decade" column from the release year (e.g. 1994 -> 1990).
data_frame = data_frame.withColumn(
    "decade", f.floor(data_frame["year"] / 10) * 10
)

# Per decade: count the titles and take the mean rating, then order the
# result by the decades with the most movies first.
data_frame_aggregated = (
    data_frame.groupby("decade")
    .agg(
        f.count(f.col("movie_title")).alias('movie_count'),
        f.mean(f.col("rating")).alias('rating_mean'),
    )
    .orderBy(f.desc("movie_count"))
)

# show() is an action: it forces execution of the lazy plan built above.
# With big data this step would be the slow one unless results are cached.
data_frame_aggregated.show(10)

#########################################

### LOAD (WRITE DATA)

#########################################

# The aggregated result is tiny, so collapse it to a single partition to
# get one output file instead of many small ones.
data_frame_aggregated = data_frame_aggregated.repartition(1)

# Convert back to a Glue DynamicFrame for the Glue sink API.
dynamic_frame_write = DynamicFrame.fromDF(
    data_frame_aggregated, glue_context, "dynamic_frame_write"
)

# Persist the result to S3 as CSV.
glue_context.write_dynamic_frame.from_options(
    frame=dynamic_frame_write,
    connection_type="s3",
    connection_options={
        "path": s3_write_path,
        # Optionally split the output into S3 prefixes keyed by column
        # values, e.g.:
        # "partitionKeys": ["decade"]
    },
    format="csv",
)

#Log end time
# BUG FIX: the original printed the label "Start time:" here, which made
# the job logs show two identical labels; this is the END timestamp.
dt_end = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print("End time:", dt_end)
