Assignment 03 Using Sqoop

SQOOP - reads data out of a SQL database and loads it into HDFS
  • Very reliable
  • The original tool for this job



Suggested reading:

  • Streaming Data
  • Confluent

Scenario

“80% of your sales come from 20% of your customers” - The Pareto Principle

Customer segmentation has been a marketing tactic in use for years to identify these “best customers,” and one such technique is RFM analysis.

RFM analysis is a simple statistical measure of the “value” of a customer.  It is based on 3 scores from a series of customer transaction data:

  • Recency: how recently a customer has purchased
  • Frequency: how often they purchase
  • Monetary Value: how much the customer spends


A local marketing company has been collecting transaction data from a number of small businesses in an enterprise data warehouse.  They would like to sell an RFM modeling service to their clients to supplement their data warehousing capabilities.
They would like to hire you to prototype this service.

For simplicity, for this prototype the results will be stored to a file instead of an online store (HBase/Cassandra/Redis, etc.) as they would typically be in production.  We will cover this once we talk about landing data.

Assignment

Using the provided demonstration transactional DB, extract the necessary data from the warehouse and calculate an RFM table in Spark for all households.  Additionally, the “pilot” customer would like a report containing scores only for homeowners (HOMEOWNER_DESC = “Homeowner”).

Input:

The data currently exists in a relational database.  Connection details are below (note that you can only connect to the database from your UW-provided VM, i.e. not from a personal computer):

Username: bigdata
Password: 5Zgv\6;8rM
Host: bigdata220w18.database.windows.net
Port: 1433
Database: week3
JDBC URL: jdbc:sqlserver://bigdata220w18.database.windows.net:1433;database=week3


The ER diagram is as follows:

[ER diagram image]

There are 3 tables of interest:

hh_demographic
This table contains demographic information for a portion of households. Due to the nature of the data, demographic information is not available for all households.

transaction_data
This table contains all products purchased by households within this study. Each line found in this table is essentially the same line that would be found on a store receipt.

product
This table contains information on each product sold, such as the type of product, national or private label, and a brand identifier.

Use Sqoop to de-normalize these 3 tables, joining on household_key and product_id, and stage the result in HDFS as a single data source in Parquet format. See this week's slides for info on Sqoop and the Sqoop documentation for details on the arguments used.


You will need to download the Azure SQL Server driver from:
https://bigdata220w18.blob.core.windows.net/blobs/sqljdbc42.jar

The driver class name is com.microsoft.sqlserver.jdbc.SQLServerDriver

Copy that JAR file (cp command) to the /usr/hdp/current/sqoop-client/lib directory in your HDP sandbox.  You will use Sqoop from inside the sandbox to import the data into HDFS in Parquet format.

You will need to add the “driver” flag to Sqoop and provide the driver class name in order to connect to the Azure SQL Server database, as discussed in class:

sqoop import --driver com.microsoft.sqlserver.jdbc.SQLServerDriver …..

The RFM metrics are defined as follows:


  • Recency: The most recent “day” from TRANSACTION_DATA (weeks and times can be disregarded)
  • Frequency: The number of unique “basket_id” from TRANSACTION_DATA
  • Monetary Value: The sum of all “sales_value” from TRANSACTION_DATA

RFM is also the order of importance, so the results should be sorted by recency, then frequency, then monetary value (descending, as in the example below).
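A minimal Spark (Scala) sketch of these aggregations. It assumes the staged Parquet data has already been read into a DataFrame named df (as in the solution notes at the end of this post) and that the column names match the example data further down:

import org.apache.spark.sql.functions._

// Recency   = most recent "day" per household
// Frequency = number of distinct basket_ids per household
// Monetary  = total sales_value per household
val rfm = df
  .groupBy("household_key")
  .agg(
    max("day").alias("recency"),
    countDistinct("basket_id").alias("frequency"),
    sum("sales_value").alias("monetary")
  )
  // order of importance: recency, then frequency, then monetary (descending)
  .orderBy(desc("recency"), desc("frequency"), desc("monetary"))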

Output:

Two output files should be generated: a CSV file containing RFM calculations for every household, and a CSV file containing scores only for homeowners.

Each output file record should have the following format:

household_key, recency, frequency, monetary
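
A hedged sketch of producing the two files, continuing from the rfm DataFrame above. The output paths and the homeowner_desc column name are assumptions; the actual column name depends on what your Sqoop query stages from hh_demographic:

import org.apache.spark.sql.functions._

// Full RFM table, one record per household (output path is illustrative)
rfm.select("household_key", "recency", "frequency", "monetary")
  .write.csv("hdfs:///user/root/rfm_all")

// Homeowner-only report: keep households whose demographic record is "Homeowner".
// Assumes homeowner_desc was staged alongside the transaction columns.
val homeownerKeys = df
  .filter(col("homeowner_desc") === "Homeowner")
  .select("household_key")
  .distinct()

rfm.join(homeownerKeys, Seq("household_key"))
  .orderBy(desc("recency"), desc("frequency"), desc("monetary"))
  .select("household_key", "recency", "frequency", "monetary")
  .write.csv("hdfs:///user/root/rfm_homeowners")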

Submission Guidelines:

Submit the Sqoop command used to pull over the data, and the Spark code (Scala, Python, or Java) that generates the requested output.

A notebook with all of this is preferred (Zeppelin or Jupyter) but individual files are acceptable.


Example

Given the following (truncated) data:

household_key |  basket_id  | day | product_id | quantity | sales_value | store_id |      blanks
--------------+-------------+-----+------------+----------+-------------+----------+------------
        2375 | 26984851472 | 711 |    1033142 |        1 |           0 |      364 |           0
        2375 | 26984851472 | 711 |    1082185 |        1 |           1 |      364 |           0
        2375 | 26984851516 | 555 |     826249 |        2 |           1 |      364 |           0
        2375 | 26984851516 | 555 |    1085983 |        1 |           2 |      364 |           0
        2375 | 26984851516 | 555 |    6423775 |        1 |           2 |      364 |           0
        1364 | 26984896261 | 686 |     842930 |        1 |           2 |    31742 |           0
        1364 | 26984896261 | 686 |     920955 |        1 |           3 |    31742 |           0
        1364 | 26984896261 | 686 |     981760 |        1 |           0 |    31742 |           0
        1130 | 26984905972 | 620 |     866950 |        2 |           0 |    31642 |           0
        1130 | 26984905972 | 620 |    1048462 |        1 |           1 |    31642 |           0
        1173 | 26984945254 | 123 |     824399 |        2 |           1 |      412 |           0
        1173 | 26984945604 | 500 |    1131351 |        1 |           4 |      412 |           0
          98 | 26984951769 | 401 |     965138 |        2 |           3 |      337 |           0
          98 | 26984951769 | 401 |    1082185 |        1 |           0 |      337 |           0
        1172 | 26985025264 | 203 |     877180 |        1 |           2 |      396 |           0
        1172 | 26985026664 | 683 |     930917 |        2 |           2 |      396 |           0
        1172 | 26985031269 | 700 |     981760 |        1 |           0 |      396 |           0

The corresponding RFM table would be:

household_key | recency | frequency | monetary
---------------+---------+-----------+---------
         2375 |     711 |         2 |       6
         1172 |     700 |         3 |       4
         1364 |     686 |         1 |       5
         1130 |     620 |         1 |       1
         1173 |     500 |         2 |       5
         98   |     401 |         1 |       3












Bonus Exercise #1

The company for which you are prototyping this service has some clients that refuse to consider alcohol sales in their marketing campaigns.  Also provide code to calculate the RFM metrics excluding sales of products in the “SPIRITS” department.
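
One possible way to handle this in Spark (Scala) is to filter out the SPIRITS lines before re-running the same aggregation. This assumes the staged data includes the product table's department column, and uses the same df and column names as the earlier sketch:

import org.apache.spark.sql.functions._

// Drop SPIRITS lines before computing the metrics
// (rows with a null department, if any, are also dropped by this filter)
val rfmNoSpirits = df
  .filter(col("department") =!= "SPIRITS")
  .groupBy("household_key")
  .agg(
    max("day").alias("recency"),
    countDistinct("basket_id").alias("frequency"),
    sum("sales_value").alias("monetary")
  )
  .orderBy(desc("recency"), desc("frequency"), desc("monetary"))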

Bonus Exercise #2

Once the RFM data is generated, to be most useful each customer is typically assigned a “score” based on the values of their individual metrics.  You would then choose categories for each value to fall into, typically 3 or 5.  This is usually done either with business rules or with statistical quantiles (more common).  It is then very easy to identify groups of customers based on these “scores”.  For example, with 3 quantiles, the highest-ranked customers would score 222.

Add additional logic to your Spark application to calculate the full RFM table with scores and answer the following question:

How many customers have a score of “222”?

Take the RFM table generated from the base exercise and add the quantile each customer falls into.  We will use 3 quantiles in a uniform split, so the ranges would be [0, 33, 66, 100].

So given:

household_key | recency | frequency | monetary
---------------+---------+-----------+---------
         2375 |     711 |         2 |       6
         1172 |     700 |         3 |       4
         1364 |     686 |         1 |       5
         1130 |     620 |         1 |       1
         1173 |     500 |         2 |       5
         98   |     401 |         1 |       3

Adding the scores might look like:

household_key | recency | frequency | monetary | r_score | f_score | m_score
---------------+---------+-----------+----------+---------+---------+--------
         2375 |     711 |         2 |       6  |        2|        1|      2
         1172 |     700 |         3 |       4  |        2|        2|      2
         1364 |     686 |         1 |       5  |        1|        0|      1
         1130 |     620 |         1 |       1  |        1|        0|      1
         1173 |     500 |         2 |       5  |        0|        1|      0
         98   |     401 |         1 |       3  |        0|        0|      0
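
A rough sketch of one way to add these scores in Spark (Scala), bucketing each metric into three quantile-based scores (0, 1, 2) with percent_rank. approxQuantile or a Bucketizer would also work, and this will not necessarily reproduce the illustrative numbers above exactly; rfm is the DataFrame from the base exercise sketch:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Score a metric 0, 1 or 2 according to which third of the
// percentile range [0, 33, 66, 100] its value falls into.
// The window has no partition, which is acceptable for a table
// with one row per household.
def score(metric: String) =
  least(floor(percent_rank().over(Window.orderBy(col(metric))) * 3), lit(2))

val scored = rfm
  .withColumn("r_score", score("recency"))
  .withColumn("f_score", score("frequency"))
  .withColumn("m_score", score("monetary"))

// Bonus question: how many customers score "222"?
scored
  .filter(col("r_score") === 2 && col("f_score") === 2 && col("m_score") === 2)
  .count()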



Solution

Ran this code to list the table names with Sqoop:

sqoop list-tables \
--driver "com.microsoft.sqlserver.jdbc.SQLServerDriver" \
--connect "jdbc:sqlserver://bigdata220w18.database.windows.net:1433;database=week3" \
--username "bigdata" \
--password "5Zgv\6;8rM"

Ran successfully.

De-normalize the tables and import into HDFS on the Hortonworks sandbox:

sqoop import \
--driver "com.microsoft.sqlserver.jdbc.SQLServerDriver" \
--connect "jdbc:sqlserver://bigdata220w18.database.windows.net:1433;database=week3" \
--username "bigdata" \
--password "5Zgv\6;8rM" \
-m 4 \
--target-dir sqoopFull \
--as-parquetfile \
--query 'SELECT d.*, p.manufacturer, p.department, p.brand, p.commodity_desc, p.sub_commodity_desc, p.curr_size_of_product, h.homeowner_desc FROM transaction_data d JOIN product p ON p.product_id = d.product_id LEFT JOIN hh_demographic h ON h.household_key = d.household_key WHERE $CONDITIONS' \
--split-by d.household_key

Read the staged Parquet data into a DataFrame:

val df = spark.read.parquet("hdfs:///user/root/sqoopFull")
df.printSchema()

Count DataFrame

df.count

