In some respects, the Spark engine is similar to Hadoop: both run Map and Reduce jobs across multiple nodes. The central abstraction in Spark is the RDD (Resilient Distributed Dataset), which lets us operate on arrays, datasets, and text files. This example shows how to map/reduce over a log file stored in HDFS and determine which source IP address occurs most frequently:
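Before turning to HDFS, the map/reduce counting pattern itself can be sketched with plain Scala collections; this local sketch mirrors what `map` and `reduceByKey` do on an RDD, and it does not need a Spark cluster (the sample addresses here are illustrative):

```scala
// Count occurrences with the same map -> reduce-by-key pattern,
// using plain Scala collections instead of an RDD.
val ips = Seq("192.2.1.21", "192.2.1.30", "192.2.1.21")

// Map step: pair each element with a count of 1.
val pairs = ips.map(ip => (ip, 1))

// Reduce-by-key step: sum the counts for each distinct key.
val counts = pairs.groupBy(_._1).map { case (ip, ps) => (ip, ps.map(_._2).sum) }

println(counts) // Map(192.2.1.21 -> 2, 192.2.1.30 -> 1)
```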
Sys Log Format
Here is the format of the log file used for the analysis. We will filter out most of the information, look only at the Source Address field, and count how often each source address occurs.
Application Information:
Process ID: 1296
Application Name: \device\harddiskvolume2\windows\system32\dns.exe
Network Information:
Direction: Inbound
Source Address: 192.2.1.21
Source Port: 53
Destination Address: 192.2.1.21
Destination Port: 60561
Protocol: 17
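To make the extraction step concrete, a single log line can be parsed with a regular expression. This is a hypothetical helper for illustration only; the actual Spark job below uses `contains` and `split` instead:

```scala
// Extract the value of the "Source Address" field from one log line.
// Hypothetical helper, not part of the Spark job in this article.
val addrPattern = """Source Address:\s*(\d+\.\d+\.\d+\.\d+)""".r

def sourceAddress(line: String): Option[String] =
  addrPattern.findFirstMatchIn(line).map(_.group(1))

println(sourceAddress("Source Address: 192.2.1.21")) // Some(192.2.1.21)
println(sourceAddress("Source Port: 53"))            // None
```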
Process Flow
To better understand how the application processes the HDFS file, here is the process flow showing how each function is called:
Code Snippet and Result
Log into the Spark master node, go to the /bin folder, open spark-shell, then enter the code below.
//Load the file from HDFS. Note that the master's IP address should be defined in the hosts file. Because of Spark's lazy evaluation, no data is actually loaded at this point.
val logsfile = sc.textFile("hdfs://master:8010/kent/syslog.txt")
//Take 500k lines from the log file into a driver-side array, filter down to the lines containing the keyword "Source Address" to reduce the size, then split the lines and keep the IP address tokens
val ipaddr = logsfile.take(500000).filter(s => s.contains("Source Address")).flatMap(s => s.split("\\s+\\t+")).filter(s => s.contains("."))
//Create a distributed dataset from the driver-side array produced in the previous step
val dipaddr = sc.parallelize(ipaddr)
//Map each IP address to a tuple such as (192.2.1.1, 1)
val map1 = dipaddr.map(s => (s,1))
//Reduce by key with the function passed in; this step counts the occurrences of each IP address
val res1 = map1.reduceByKey((x,y) => x+y).collect()
The printed result of res1 will be as follows. Each element of the array is a two-tuple containing the IP address as the key and its occurrence count as the value.
Array[(String, Int)] = Array((192.2.2.120,1), (192.2.6.115,43), (192.2.2.140,18), (192.2.6.104,27),
(192.2.192.103,5), (192.2.8.102,38), (192.2.6.119,21), (192.2.21.121,39), (192.2.7.125,12),
(192.2.2.115,4), (192.2.21.53,20), (192.2.21.46,1), (192.2.14.121,2), (192.2.11.111,12), (192.2.21.93,3),
(192.2.1.147,1), (192.2.3.24,…
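The original question, which IP address occurs most frequently, takes one more step. Since collect() returns a plain Array[(String, Int)] on the driver, maxBy works directly. The sketch below uses a small stand-in array with values taken from the printed result above, since res1 itself only exists in the Spark shell:

```scala
// Stand-in for the collected result; in the shell this would be res1.
val res1 = Array(("192.2.6.115", 43), ("192.2.21.121", 39), ("192.2.8.102", 38))

// The tuple with the highest count is the most frequent source address.
val (topIp, topCount) = res1.maxBy(_._2)

println(s"$topIp occurred $topCount times") // 192.2.6.115 occurred 43 times
```

Alternatively, the computation can stay distributed: map1.reduceByKey(_ + _).sortBy(_._2, ascending = false).first() returns the top pair without collecting the whole result. Note also that take(500000) pulls the lines to the driver before parallelize sends them back out; applying filter/flatMap directly on logsfile would keep the data on the cluster throughout.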