Prerequisites
A GCP account with a running Dataproc cluster
Open the Cloud Console.
Open Menu > Dataproc > Clusters.

Click the cluster name.

Click on VM Instances.

Click SSH next to the master node.

Check whether the components are already installed.
$ pyspark #Opens the PySpark shell
To exit, press Ctrl + D

$ hive #To check whether Hive is available
To exit, press Ctrl + D

$ python -V #To check the Python version

$ spark-shell #Opens the Scala Spark shell
To exit, press Ctrl + D

$ pwd #To display the current path
$ mkdir ratingscounter #Make a directory named ratingscounter
$ cd ratingscounter #Change into the ratingscounter directory

$ wget https://s3.amazonaws.com/sankethadoop/u.data #Download the ratings data
$ ls #List the contents of the directory

$ nano u.data #Open the u.data file

It will display the contents of u.data: each line is one rating, with four tab-separated fields (user ID, movie ID, rating, timestamp).
To exit, press Ctrl + X

$ nano ratingscounter.py #Create and open the file ratingscounter.py

Paste the code below into the ratingscounter.py file:
from pyspark import SparkConf, SparkContext
import collections

conf = SparkConf().setMaster("local").setAppName("Ratings")
sc = SparkContext(conf = conf)

lines = sc.textFile("sparkdata/u.data")      # read the ratings file from HDFS
ratings = lines.map(lambda x: x.split()[2])  # field 2 of each line is the rating
result = ratings.countByValue()              # count how many times each rating occurs

sortedResults = collections.OrderedDict(sorted(result.items()))
for key, value in sortedResults.items():
    print("%s %i" % (key, value))
This code counts how many films received each rating.
NB: if you change the name of a directory or file, you must also change the corresponding path inside the script.
To save and exit, press Ctrl + X, then Y to confirm, then Enter.
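
The parsing logic can be dry-run in plain Python before submitting to the cluster. This is a minimal sketch, assuming the MovieLens u.data layout of tab-separated user ID, movie ID, rating, and timestamp; the sample records are illustrative, not taken from the tutorial's dataset:

import collections

# Illustrative records in the u.data layout: userID, movieID, rating, timestamp
sample_lines = [
    "196\t242\t3\t881250949",
    "186\t302\t3\t891717742",
    "22\t377\t1\t878887116",
]
ratings = [line.split()[2] for line in sample_lines]  # field 2 is the rating
result = collections.Counter(ratings)                 # plain-Python stand-in for countByValue()
for key, value in sorted(result.items()):
    print("%s %i" % (key, value))                     # prints: 1 1, then 3 2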

Create the directory structure in HDFS
$ hadoop fs -mkdir /user/<userid>/sparkdata #Create an HDFS directory named sparkdata
$ hadoop fs -put u.data sparkdata #Copy u.data into sparkdata
$ hadoop fs -ls sparkdata #Check that the file was saved
The script reads sparkdata/u.data, a path relative to your HDFS home directory /user/<userid>, which is why the directory is created there.

$ spark-submit ratingscounter.py #Execute the ratingscounter.py file
It will display the result: one line per rating value, showing how many films received that rating.

$ cd #Return to the home directory
$ mkdir totalspendbycustomer #Make a directory named totalspendbycustomer
$ cd totalspendbycustomer #Change into the totalspendbycustomer directory

$ wget https://s3.amazonaws.com/sankethadoop/customer-orders.csv #Download the orders data

$ ls #List the contents of the directory
$ nano customer-orders.csv #Open the file customer-orders.csv

It will display the contents: each line is one order, with three comma-separated fields (customer ID, item ID, amount spent).
To exit, press Ctrl + X

$ pwd #Displays the path

$ nano totalspendbycustomer.py #Create and open the file totalspendbycustomer.py

Paste the code below:
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("SpendByCustomer")
sc = SparkContext(conf = conf)

def extractCustomerPricePairs(line):
    fields = line.split(',')
    return (int(fields[0]), float(fields[2]))  # (customer ID, amount spent)

input = sc.textFile("sparkdata/customer-orders.csv")
mappedInput = input.map(extractCustomerPricePairs)
totalByCustomer = mappedInput.reduceByKey(lambda x, y: x + y)  # sum the amounts per customer

results = totalByCustomer.collect()
for result in results:
    print(result)
This code computes the total amount each customer spent on movies.
NB: if you change the name of a directory or file, you must also change the corresponding path inside the script.
To save and exit, press Ctrl + X, then Y to confirm, then Enter.
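
The parse-and-sum logic can also be checked in plain Python first. This is a minimal sketch with made-up order lines (not from the tutorial's dataset), where a dictionary plays the role of reduceByKey:

# Made-up lines in the customer-orders.csv layout: customerID,itemID,amountSpent
sample = [
    "44,8602,37.19",
    "35,5368,65.89",
    "44,3391,40.64",
]

def extractCustomerPricePairs(line):
    fields = line.split(',')
    return (int(fields[0]), float(fields[2]))

totals = {}
for customerID, price in map(extractCustomerPricePairs, sample):
    totals[customerID] = totals.get(customerID, 0.0) + price  # same effect as reduceByKey(lambda x, y: x + y)
print(totals)  # {44: 77.83, 35: 65.89}, up to float rounding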

$ hadoop fs -put customer-orders.csv sparkdata #Copy customer-orders.csv into sparkdata
$ hadoop fs -ls sparkdata #Check that the file was saved
$ spark-submit totalspendbycustomer.py #Execute the file totalspendbycustomer.py

It will display (customer ID, total amount spent) pairs, one per customer.

To find popular movies
$ cd #Return to the home directory
$ mkdir popularmovies #Make a directory named popularmovies
$ cd popularmovies #Change into the popularmovies directory

$ wget https://s3.amazonaws.com/sankethadoop/u.data #Download the ratings data

$ nano popularmovies.py #Create and open the file popularmovies.py

Paste the code below into the Python file:
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("PopularMovies")
sc = SparkContext(conf = conf)

lines = sc.textFile("sparkdata/u.data")
movies = lines.map(lambda x: (int(x.split()[1]), 1))  # (movie ID, 1) for each rating
movieCounts = movies.reduceByKey(lambda x, y: x + y)  # count the ratings per movie
flipped = movieCounts.map(lambda xy: (xy[1], xy[0]))  # swap to (count, movie ID) so the count is the sort key
sortedMovies = flipped.sortByKey()                    # ascending, so the most-rated movie comes last

results = sortedMovies.collect()
for result in results:
    print(result)
To save and exit, press Ctrl + X, then Y, then Enter.
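
The flip from (movie ID, count) to (count, movie ID) is what lets sortByKey() order the movies by popularity, since sorting happens on the first element of each pair. A minimal plain-Python sketch of the same trick, with made-up counts:

# Made-up movie-ID -> rating-count pairs
movieCounts = {50: 583, 258: 509, 100: 508}
flipped = [(count, movieID) for movieID, count in movieCounts.items()]  # count becomes the sort key
for count, movieID in sorted(flipped):  # ascending, so the most-rated movie prints last
    print((count, movieID))
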
$ hadoop fs -put u.data sparkdata #Copy u.data into sparkdata (skip this if it is already there from the earlier step)
$ spark-submit popularmovies.py # To execute popularmovies.py

It will print (vote count, movie ID) pairs in ascending order, so the most popular movie appears last.

To find the 10 most popular movies
$ cd #Return to the home directory
$ mkdir 10popularmovies #Make a directory named 10popularmovies
$ cd 10popularmovies #Change into the 10popularmovies directory

$ wget https://s3.amazonaws.com/sankethadoop/u.item #Download the movie metadata
$ wget https://s3.amazonaws.com/sankethadoop/u.data #Download the ratings data
These commands copy the two files to disk.

$ nano u.item #View the file contents

It will display the contents: each line describes one movie, with pipe-separated fields starting with the movie ID and title.
To exit, press Ctrl + X

$ nano 10popular.py #Create and open the file 10popular.py
Paste the code below:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions

def loadMovieNames():
    movieNames = {}
    # u.item is pipe-separated; field 0 is the movie ID and field 1 is the title
    with open("/home/<userid>/10popularmovies/u.item", encoding="ISO-8859-1") as f:
        for line in f:
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
    return movieNames

spark = SparkSession.builder.appName("PopularMovies").getOrCreate()

nameDict = loadMovieNames()

lines = spark.sparkContext.textFile("sparkdata/u.data")
movies = lines.map(lambda x: Row(movieID = int(x.split()[1])))
movieDataset = spark.createDataFrame(movies)

topMovieIDs = movieDataset.groupBy("movieID").count().orderBy("count", ascending = False).cache()
topMovieIDs.show()

top10 = topMovieIDs.take(10)
print("\n")
for result in top10:
    print("%s: %d" % (nameDict[result[0]], result[1]))

spark.stop()
Change the path inside loadMovieNames() (/home/<userid>/10popularmovies/u.item) to match your own user ID and directory.
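
loadMovieNames() relies only on the first two pipe-separated fields of u.item. This is a minimal sketch of that parsing and of the final lookup, using a made-up u.item line and a made-up (movieID, count) result:

# Made-up line in the u.item layout: movieID|title|releaseDate|...
sample_item = "50|Star Wars (1977)|01-Jan-1977|..."
fields = sample_item.split('|')
movieNames = {int(fields[0]): fields[1]}

result = (50, 583)  # stands in for one row from topMovieIDs.take(10)
print("%s: %d" % (movieNames[result[0]], result[1]))  # Star Wars (1977): 583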

$ hadoop fs -put u.data sparkdata #Copy u.data into sparkdata (skip this if it is already there)
$ hadoop fs -ls sparkdata #Check that the file was saved
$ spark-submit 10popular.py #To execute the 10popular.py file

It will display the titles and rating counts of the 10 most popular movies.
