SPARK – CORE PROGRAMMING

Spark Core is the base of the whole project. It provides distributed task dispatching, scheduling, and basic I/O functionality. Spark uses a specialized fundamental data structure known as the RDD (Resilient Distributed Dataset), which is a logical collection of data partitioned across machines. RDDs can be created in two ways: by referencing datasets in external storage systems, or by applying transformations (e.g. map, filter, reduceByKey, join) to existing RDDs. The RDD abstraction is exposed through a language-integrated API, which reduces programming complexity because applications manipulate RDDs much as they would local collections of data.

Spark Shell

Spark provides an interactive shell: a powerful tool to analyze data interactively. It is available in either Scala or Python. Spark's primary abstraction is a distributed collection of items called a Resilient Distributed Dataset (RDD). RDDs can be created from Hadoop InputFormats (such as HDFS files) or by transforming other RDDs.

Open Spark Shell

The following command is used to open the Spark shell.
$ spark-shell
Create a Simple RDD

Let us create a simple RDD from a text file. Use the following command to create a simple RDD.
scala> val inputfile = sc.textFile("input.txt")
The output for the above command is
inputfile: org.apache.spark.rdd.RDD[String] = input.txt MappedRDD[1] at textFile at <console>:12
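As an aside, the second way of creating an RDD mentioned above, applying a transformation to an existing RDD, can be tried on inputfile right away. The following is a minimal sketch; the variable name nonEmptyLines is only illustrative and not part of the original example.

scala> val nonEmptyLines = inputfile.filter(line => line.nonEmpty)   // derives a new RDD; nothing is read or computed yet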
The Spark RDD API introduces a number of Transformations and Actions to manipulate RDDs.

RDD Transformations

An RDD transformation returns a pointer to a new RDD and allows you to create dependencies between RDDs. Each RDD in the dependency chain (string of dependencies) has a function for computing its data and a pointer (dependency) to its parent RDD. Spark is lazy, so nothing is executed unless you call an action that triggers job creation and execution; this is illustrated in the word-count example that follows. An RDD transformation is therefore not a set of data but a step in a program (possibly the only step) telling Spark how to get data and what to do with it. Given below is a list of RDD transformations.
The following table gives a list of Actions, which return values.
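To make the distinction concrete, here is a minimal shell sketch, assuming the same input.txt; the variable names are illustrative. The transformations only build the dependency chain, while the actions at the end return values and trigger the actual job.

scala> val lines = sc.textFile("input.txt")                // source RDD; the file is not read yet
scala> val words = lines.flatMap(line => line.split(" "))  // transformation: builds lineage only
scala> val distinctWords = words.distinct()                // transformation: still no job submitted
scala> distinctWords.count()                               // action: returns a Long and runs the job
scala> distinctWords.take(5)                               // action: returns an Array of up to five elements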
Let us see the implementation of a few RDD transformations and actions in RDD programming with the help of an example.

Example

Consider a word-count example: it counts each word appearing in a document. Consider the following text as the input, saved as input.txt in the home directory.

input.txt:
people are not as beautiful as they look,
as they walk or as they talk.
they are only as beautiful as they love,
as they care as they share.

Follow the procedure given below to execute the given example.

Open Spark Shell

The following command is used to open the Spark shell. Generally, Spark is built using Scala; therefore, a Spark program runs in a Scala environment.
$ spark-shell
If the Spark shell opens successfully, you will see the following output. The last line of the output, "Spark context available as sc", means that the shell has automatically created a SparkContext object with the name sc. The SparkContext object must exist before the first step of a program can run.
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.2.0
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)
Type in expressions to have them evaluated.
Spark context available as sc
scala>

Create an RDD

First, we have to read the input file using the Spark-Scala API and create an RDD. The following command is used for reading a file from a given location. Here, a new RDD is created with the name inputfile. The String given as an argument to the textFile("") method is the absolute path of the input file. However, if only the file name is given, the input file is assumed to be in the current location.
scala> val inputfile = sc.textFile("input.txt")
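For comparison, outside the interactive shell a SparkContext is not created automatically; a standalone application has to build one itself. The following is a minimal sketch against the Spark 1.x API; the object name, application name, and local[*] master URL are placeholders, not part of this tutorial's setup.

import org.apache.spark.{SparkConf, SparkContext}

object WordCountApp {
  def main(args: Array[String]): Unit = {
    // In spark-shell this object already exists as sc; in a standalone
    // application it must be created explicitly before any RDD is built.
    val conf = new SparkConf().setAppName("WordCountApp").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val inputfile = sc.textFile("input.txt")
    println(inputfile.count())

    sc.stop()
  }
}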
Execute Word Count Transformation

Our aim is to count the words in the file. Create a flat map to split each line into words (flatMap(line => line.split(" "))). Next, map each word to a key with the value 1 (<key, value> = <word, 1>) using the map function (map(word => (word, 1))). Finally, reduce those keys by adding the values of identical keys (reduceByKey(_+_)). The following command is used for executing the word-count logic. After executing it you will not see any output, because this is not an action but a transformation: it points to a new RDD and tells Spark what to do with the given data.
scala> val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_)
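If you prefer, the same pipeline can be written step by step, which makes each intermediate RDD visible in the shell; the variable names words and pairs below are illustrative additions, and counts is re-created with the same contents.

scala> val words = inputfile.flatMap(line => line.split(" "))   // one element per word
scala> val pairs = words.map(word => (word, 1))                 // (word, 1) key-value pairs
scala> val counts = pairs.reduceByKey(_ + _)                    // adds up the 1s for each distinct word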
Current RDD

While working with an RDD, if you want to know about the current RDD, use the following command. It shows the description of the current RDD and its dependencies, which is useful for debugging.
scala> counts.toDebugString
Caching the Transformations

You can mark an RDD to be persisted using the persist() or cache() methods on it. The first time it is computed in an action, it will be kept in memory on the nodes. Use the following command to store the intermediate transformations in memory.
scala> counts.cache()
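cache() is shorthand for persisting with the default storage level. If you need a different level, persist() accepts one explicitly; a minimal sketch, where MEMORY_ONLY is just one of the available levels:

scala> import org.apache.spark.storage.StorageLevel
scala> counts.persist(StorageLevel.MEMORY_ONLY)   // equivalent to counts.cache() for an RDD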
Applying the Action

Applying an action such as saveAsTextFile stores the result of all the transformations into a text file. The String argument of the saveAsTextFile("") method is the absolute path of the output folder. Try the following command to save the output in a text file. In the following example, the 'output' folder is in the current location.
scala> counts.saveAsTextFile("output")
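For a small result like this one, an alternative to writing files is to bring the data back to the driver with an action such as collect(), which is only advisable when the result fits in the driver's memory. A quick sketch:

scala> counts.collect().foreach(println)   // prints each (word, count) pair in the shell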
Checking the Output

Open another terminal and go to the home directory (where Spark is executed in the other terminal). Use the following commands to check the output directory.
[hadoop@localhost ~]$ cd output/
[hadoop@localhost output]$ ls -1
part-00000
part-00001
_SUCCESS

The following command is used to see the output from the part-00000 file.
[hadoop@localhost output]$ cat part-00000
Output
(people,1)
(are,2)
(not,1)
(as,8)
(beautiful,2)
(they,7)
(look,1)

The following command is used to see the output from the part-00001 file.
[hadoop@localhost output]$ cat part-00001
Output
(walk,1)
(or,1)
(talk,1)
(only,1)
(love,1)
(care,1)
(share,1)

Un-persist the Storage

Before un-persisting, if you want to see the storage space used by this application, use the following URL in your browser.
http://localhost:4040
This page shows the storage space used by the applications that are running on the Spark shell. If you want to un-persist the storage of a particular RDD, use the following command.
scala> counts.unpersist()
You will see the output as follows:
15/06/27 00:57:33 INFO ShuffledRDD: Removing RDD 9 from persistence list
15/06/27 00:57:33 INFO BlockManager: Removing RDD 9
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_1
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_1 of size 480 dropped from memory (free 280061810)
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_0
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_0 of size 296 dropped from memory (free 280062106)
res7: counts.type = ShuffledRDD[9] at reduceByKey at <console>:14

For verifying the storage space in the browser, use the following URL.
http://localhost:4040
It shows the storage space used by the applications that are running on the Spark shell.