Logo DWBI.org Login / Sign Up
Sign Up
Have Login?
This site is protected by reCAPTCHA and the Google Privacy Policy and Terms of Service apply.
Login
New Account?
Recovery
Go to Login
By continuing you indicate that you agree to Terms of Service and Privacy Policy of the site.
AWS Analytics

Process Data as Job Steps in EMR

Updated on Oct 12, 2021

We can submit jobs and interact directly with the data frameworks that is installed in the Amazon EMR cluster. Alternatively, we can submit one or more ordered steps to an Amazon EMR cluster. Each step is a unit of work that contains instructions to manipulate data for processing by the data framework installed on the cluster.

Let us now Process our Data as Job Steps in EMR. The following is an example process using four steps:

  1. Copy input dataset from S3 to HDFS using S3DistCp
  2. Process the input data as external & Hive managed tables by using a Hive program.
  3. Process a second input dataset by using a Pig program.
  4. Process the intermediate dataset by using Spark program.

Go to the EMR Steps Tab & Add a Step.

EMR Step1
EMR Step1
  • Name: Copy Data from S3 to HDFS using S3DistCp
  • JAR location: command-runner.jar
  • Arguments: s3-dist-cp --src=s3://aws-bda-demo/datasets/ --dest=hdfs:///datasets
  • Action on failure: Continue
EMR Step1 Success
EMR Step1 Success
EMR Step1 MapReduce Job
EMR Step1 MapReduce Job

Next let's add another EMR Step. Lets use Hive SQL script to create & load data to external tables.

EMR Step2- Hive
EMR Step2- Hive
  • Name: Create & Load Hive Dimension Tables; Create Hive Fact Tables
  • JAR location: command-runner.jar
  • Arguments: hive-script --run-hive-script --args -f s3://aws-bda-demo/scripts/tables.hql
  • Action on failure: Continue
--tables.hql

-- Databases
CREATE DATABASE sales_staging;
CREATE DATABASE sales_analytics;

-- Dates
CREATE EXTERNAL TABLE IF NOT EXISTS sales_staging.ext_dates (
  year_number INT,
  month_number INT,
  day_of_year_number INT,
  day_of_month_number INT,
  day_of_week_number INT,
  week_of_year_number INT,
  day_name VARCHAR(20),
  month_name VARCHAR (20),
  quarter_number INT,
  quarter_name VARCHAR(2),
  year_quarter_name VARCHAR(10),
  weekend_ind VARCHAR(1),
  days_in_month_qty INT,
  date_sk INT,
  day_desc VARCHAR(10),
  week_sk INT,
  day_date DATE,
  week_name VARCHAR(10),
  week_of_month_number INT,
  week_of_month_name VARCHAR(10),
  month_sk INT,
  quarter_sk INT,
  year_sk INT,
  year_sort_number VARCHAR(4),
  day_of_week_sort_name VARCHAR(10)
) 
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION '/datasets/dates/'
TBLPROPERTIES ("skip.header.line.count"="1");

CREATE TABLE IF NOT EXISTS sales_analytics.dim_date(
  year_number INT,
  month_number INT,
  day_of_year_number INT,
  day_of_month_number INT,
  day_of_week_number INT,
  week_of_year_number INT,
  day_name VARCHAR(20),
  month_name VARCHAR (20),
  quarter_number INT,
  quarter_name VARCHAR(2),
  year_quarter_name VARCHAR(10),
  weekend_ind VARCHAR(1),
  days_in_month_qty INT,
  date_sk INT,
  day_desc VARCHAR(10),
  week_sk INT,
  day_date DATE,
  week_name VARCHAR(10),
  week_of_month_number INT,
  week_of_month_name VARCHAR(10),
  month_sk INT,
  quarter_sk INT,
  year_sk INT,
  year_sort_number VARCHAR(4),
  day_of_week_sort_name VARCHAR(10)
)
STORED AS ORC
TBLPROPERTIES ("orc.compress"="SNAPPY");

INSERT OVERWRITE TABLE sales_analytics.dim_date SELECT * FROM sales_staging.ext_dates;
ANALYZE TABLE sales_analytics.dim_date COMPUTE STATISTICS FOR COLUMNS;


-- Showroom
CREATE EXTERNAL TABLE IF NOT EXISTS sales_staging.ext_showroom (
  id INT,
  code VARCHAR(40),
  name VARCHAR(50),
  operation_date DATE,
  staff_count INT,
  country VARCHAR(50),
  state VARCHAR(50),
  address VARCHAR(50),
  update_date TIMESTAMP,
  create_date TIMESTAMP
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION '/datasets/showroom/'
TBLPROPERTIES ("skip.header.line.count"="1");

CREATE TABLE IF NOT EXISTS sales_analytics.dim_showroom(
  id INT,
  code VARCHAR(40),
  name VARCHAR(50),
  operation_date DATE,
  staff_count INT,
  country VARCHAR(50),
  state VARCHAR(50),
  address VARCHAR(50),
  update_date TIMESTAMP,
  create_date TIMESTAMP
)
STORED AS ORC
TBLPROPERTIES ("orc.compress"="SNAPPY");

INSERT OVERWRITE TABLE sales_analytics.dim_showroom SELECT * FROM sales_staging.ext_showroom;
ANALYZE TABLE sales_analytics.dim_showroom COMPUTE STATISTICS FOR COLUMNS;


-- Customer
CREATE EXTERNAL TABLE IF NOT EXISTS sales_staging.ext_customer (
  id INT,
  first_name VARCHAR(50),
  last_name VARCHAR(50),
  gender VARCHAR(50),
  dob DATE,
  company VARCHAR(50),
  job VARCHAR(50),
  email VARCHAR(50),
  country VARCHAR(50),
  state VARCHAR(50),
  address VARCHAR(50),
  update_date TIMESTAMP,
  create_date TIMESTAMP
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION '/datasets/customer/'
TBLPROPERTIES ("skip.header.line.count"="1");

CREATE TABLE IF NOT EXISTS sales_analytics.dim_customer(
  id INT,
  first_name VARCHAR(50),
  last_name VARCHAR(50),
  gender VARCHAR(50),
  dob DATE,
  company VARCHAR(50),
  job VARCHAR(50),
  email VARCHAR(50),
  country VARCHAR(50),
  state VARCHAR(50),
  address VARCHAR(50),
  update_date TIMESTAMP,
  create_date TIMESTAMP
)
STORED AS ORC
TBLPROPERTIES ("orc.compress"="SNAPPY");

INSERT OVERWRITE TABLE sales_analytics.dim_customer SELECT * FROM sales_staging.ext_customer;
ANALYZE TABLE sales_analytics.dim_customer COMPUTE STATISTICS FOR COLUMNS;


-- Product
CREATE EXTERNAL TABLE IF NOT EXISTS sales_staging.ext_product (
  id INT,
  code VARCHAR(50),
  category VARCHAR(6),
  make VARCHAR(50),
  model VARCHAR(50),
  year VARCHAR(50),
  color VARCHAR(50),
  price INT,
  currency VARCHAR(50),
  update_date TIMESTAMP,
  create_date TIMESTAMP
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION '/datasets/product/'
TBLPROPERTIES ("skip.header.line.count"="1");

CREATE TABLE IF NOT EXISTS sales_analytics.dim_product(
  id INT,
  code VARCHAR(50),
  category VARCHAR(6),
  make VARCHAR(50),
  model VARCHAR(50),
  year VARCHAR(50),
  color VARCHAR(50),
  price INT,
  currency VARCHAR(50),
  update_date TIMESTAMP,
  create_date TIMESTAMP
)
STORED AS ORC
TBLPROPERTIES ("orc.compress"="SNAPPY");

INSERT OVERWRITE TABLE sales_analytics.dim_product SELECT * FROM sales_staging.ext_product;
ANALYZE TABLE sales_analytics.dim_product COMPUTE STATISTICS FOR COLUMNS;


-- Sales
CREATE EXTERNAL TABLE IF NOT EXISTS sales_staging.ext_sales (
  id INT,
  order_number VARCHAR(50),
  customer_id INT,
  showroom_id INT,
  product_id INT,
  quantity INT,
  discount INT,
  amount INT,
  delivered VARCHAR(50),
  card_type VARCHAR(50),
  card_number VARCHAR(50),
  txn_date DATE,
  update_date TIMESTAMP,
  create_date TIMESTAMP
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION '/datasets/sales/'
TBLPROPERTIES ("skip.header.line.count"="1");

CREATE TABLE IF NOT EXISTS sales_analytics.fact_sales (
  id INT,
  order_number VARCHAR(50),
  customer_id INT,
  showroom_id INT,
  product_id INT,
  quantity INT,
  discount INT,
  amount INT,
  net_amount INT,
  delivered VARCHAR(50),
  card_type VARCHAR(50),
  card_number VARCHAR(50),
  update_date TIMESTAMP,
  create_date TIMESTAMP
)
PARTITIONED BY (txn_date DATE)
STORED AS ORC 
TBLPROPERTIES ("orc.compress"="SNAPPY");


-- Stocks
CREATE EXTERNAL TABLE IF NOT EXISTS sales_staging.ext_stocks (
  id INT,
  showroom_id INT,
  product_id INT,
  quantity INT,
  stock_date DATE,
  update_date TIMESTAMP,
  create_date TIMESTAMP
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION '/datasets/stocks/'
TBLPROPERTIES ("skip.header.line.count"="1");

CREATE TABLE IF NOT EXISTS sales_analytics.fact_stocks (
  id INT,
  showroom_id INT,
  product_id INT,
  quantity INT,
  stock_amount INT,
  update_date TIMESTAMP,
  create_date TIMESTAMP
)
PARTITIONED BY (stock_date DATE)
STORED AS ORC 
TBLPROPERTIES ("orc.compress"="SNAPPY");
EMR Step2 Success
EMR Step2 Success
EMR Step2 Tez Hive Job
EMR Step2 Tez Hive Job
Showroom Dimension Data Preview
Showroom Dimension Data Preview

Next let's add another EMR Step. Lets use Pig script to perform some ETL.

EMR Step3- Pig
EMR Step3- Pig
  • Name: Pig ETL Hive Fact Tables
  • JAR location: command-runner.jar
  • Arguments: pig-script --run-pig-script --args -useHCatalog -f s3://aws-bda-demo/scripts/etl.pig
  • Action on failure: Continue
-- etl.pig

SET mapred.output.direct.NativeS3FileSystem false;
SET mapred.output.direct.EmrFileSystem false;

-- Product
ext_product = LOAD 'sales_staging.ext_product' USING org.apache.hive.hcatalog.pig.HCatLoader();

-- Sales
ext_sales = LOAD 'sales_staging.ext_sales' USING org.apache.hive.hcatalog.pig.HCatLoader();
sales_product = JOIN ext_sales BY (product_id), ext_product BY (id) PARALLEL 2;
fact_sales = FOREACH sales_product GENERATE ext_sales::id AS id, order_number AS order_number, customer_id AS customer_id, showroom_id AS showroom_id, product_id AS product_id, quantity AS quantity, discount AS discount, price*quantity AS amount, (price*quantity)- discount AS net_amount, delivered AS delivered, card_type AS card_type, card_number AS card_number, ext_sales::update_date AS update_date, ext_sales::create_date AS create_date, txn_date AS txn_date;
STORE fact_sales INTO 'sales_analytics.fact_sales' USING org.apache.hive.hcatalog.pig.HCatStorer();

-- Stocks
ext_stocks = LOAD 'sales_staging.ext_stocks' USING org.apache.hive.hcatalog.pig.HCatLoader();
stocks_product = JOIN ext_stocks BY (product_id), ext_product BY (id) PARALLEL 2;
fact_stocks = FOREACH stocks_product GENERATE ext_stocks::id AS id, showroom_id AS showroom_id, product_id AS product_id, quantity AS quantity, price*quantity AS stock_amount, ext_stocks::update_date AS update_date, ext_stocks::create_date AS create_date, stock_date AS stock_date;
STORE fact_stocks INTO 'sales_analytics.fact_stocks' USING org.apache.hive.hcatalog.pig.HCatStorer();
EMR Step3 Success
EMR Step3 Success
EMR Step3 Tez Pig Job
EMR Step3 Tez Pig Job
Sales Fact Data Preview
Sales Fact Data Preview

Next add our final EMR Step.

EMR Step4- Spark
EMR Step4- Spark
  • Name: PySpark Create Sales View
  • JAR location: command-runner.jar
  • Arguments: spark-submit s3://aws-bda-demo/scripts/etl.py
  • Action on failure: Continue
-- etl.py

from os.path import abspath
from pyspark.sql import SparkSession
from pyspark.sql import Row

warehouse_location = abspath('sales_analytics')

spark = SparkSession \
  .builder \
  .appName("sales_analytics") \
  .config("spark.sql.warehouse.dir", warehouse_location) \
  .enableHiveSupport() \
  .getOrCreate()

# spark.sql("SELECT count(1), sum(net_amount) FROM sales_analytics.fact_sales").show()

sales_df = spark.sql("SELECT \
  product.category, product.make, product.color, \
  showroom.name as showroom_name, showroom.state as showroom_state, \
  customer.gender, customer.state as customer_state, \
  sales.card_type, sales.quantity, sales.amount, sales.discount, sales.net_amount, sales.txn_date, dates.date_sk \
  FROM sales_analytics.fact_sales as sales \
  INNER JOIN sales_analytics.dim_product product \
  ON sales.product_id = product.id \
  INNER JOIN sales_analytics.dim_showroom showroom \
  ON sales.showroom_id = showroom.id \
  INNER JOIN sales_analytics.dim_customer customer \
  ON sales.customer_id = customer.id \
  INNER JOIN sales_analytics.dim_date dates \
  ON sales.txn_date = dates.day_date \
")

sales_df.createOrReplaceTempView("temp_vw_sales") 

spark.sql("CREATE TABLE sales_analytics.vw_sales as select * from temp_vw_sales");
EMR Step4 Success
EMR Step4 Success
EMR Step4 Tez Spark SQL Job
EMR Step4 Tez Spark SQL Job
Sales Analytics View Data Preview
Sales Analytics View Data Preview

The lifecycle state of an EMR Step can be one of PENDING, RUNNING, COMPLETED, FAILED or CANCELLED at any point in time.