
A Spark Example: Map/Reduce over a System Log File

In some respects, the Spark engine is similar to Hadoop: both perform Map & Reduce over multiple nodes. The key concept in Spark is the RDD (Resilient Distributed Dataset), which lets us operate on arrays, datasets, and text files. This example shows how to map/reduce a log file stored in HDFS and determine which IP address is repeated most frequently.
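As a taste of the pattern, here is a minimal map/reduce over an in-memory collection (a sketch to run in spark-shell, where sc is the SparkContext the shell provides):

//Distribute a small in-memory collection as an RDD
val words = sc.parallelize(Seq("spark", "hadoop", "spark"))

//Map each word to a (word, 1) tuple, then sum the counts per key
val counts = words.map(w => (w, 1)).reduceByKey((x, y) => x + y)

counts.collect()  //e.g. Array((spark,2), (hadoop,1)); element order may vary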

Sys Log Format

Here is the format of the log file we analyze to count how often each Source Address occurs. We will filter out most of the information and look only at the Source Address field (a regex-based alternative is sketched just after the sample).

Application Information:
    Process ID:             1296
    Application Name:       \device\harddiskvolume2\windows\system32\dns.exe

Network Information:
    Direction:              Inbound
    Source Address:         192.2.1.21
    Source Port:            53
    Destination Address:    192.2.1.21
    Destination Port:       60561
    Protocol:               17
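As an aside, since each relevant line pairs the label "Source Address" with the IP, the address could also be pulled out with a regular expression instead of split/contains. A hedged sketch (not the code this article uses, and the whitespace assumption may need adjusting for your log):

//Hypothetical regex-based extraction of the Source Address field
val ipPattern = """Source Address:\s+(\d+\.\d+\.\d+\.\d+)""".r
val line = "                Source Address:                    192.2.1.21"
ipPattern.findFirstMatchIn(line).map(_.group(1))  //Some(192.2.1.21)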

Process Flow

To better understand how the HDFS file is processed by the application, here is the process flow showing how each function is called:

[Figure: process flow of the function calls]
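In code terms, the flow corresponds to this chain of calls (a condensed sketch of the steps detailed below; unlike the step-by-step version, it keeps the whole pipeline distributed instead of pulling lines to the driver with take()):

//Condensed sketch: load -> filter -> extract -> map -> reduce
val counts = sc.textFile("hdfs://master:8010/kent/syslog.txt")
  .filter(_.contains("Source Address"))
  .flatMap(_.split("\\s+\\t+"))
  .filter(_.contains("."))
  .map(ip => (ip, 1))
  .reduceByKey((x, y) => x + y)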

Code Snippet and Result

Log into the Spark master node, go to the Spark bin folder, launch spark-shell, then enter the code below.

//Load the file from HDFS; note that the master's IP address should be defined in the hosts file. Nothing is actually loaded at this point, due to Spark's lazy evaluation

val logsfile = sc.textFile("hdfs://master:8010/kent/syslog.txt")

//Take 500,000 lines from the log file into a driver-side array, filter it down to lines containing the keyword "Source Address", then split each line and keep the IP address token

val ipaddr = logsfile.take(500000).filter(s => s.contains("Source Address")).flatMap(s => s.split("\\s+\\t+")).filter(s => s.contains("."))

//Create a distributed dataset from the driver-side array built in the previous step

val dipaddr = sc.parallelize(ipaddr)

//Map each IP address to a tuple such as (192.2.1.1, 1)

val map1 = dipaddr.map(s => (s,1))

//Reduce by key with the passed-in function; this step counts the occurrences of each IP address

val res1 = map1.reduceByKey((x,y) => x+y).collect()

The printed result of res1 will be as follows. Each element of the Array is a two-tuple containing an IP address as the key and its occurrence count as the value.

Array[(String, Int)] = Array((192.2.2.120,1), (192.2.6.115,43), (192.2.2.140,18), (192.2.6.104,27), (192.2.192.103,5), (192.2.8.102,38), (192.2.6.119,21), (192.2.21.121,39), (192.2.7.125,12), (192.2.2.115,4), (192.2.21.53,20), (192.2.21.46,1), (192.2.14.121,2), (192.2.11.111,12), (192.2.21.93,3), (192.2.1.147,1), (192.2.3.24,…
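To answer the original question (which IP address is repeated most frequently), one more step is needed. A minimal sketch over the res1 array collected above:

//Pick the entry with the highest count from the collected array
val topIp = res1.maxBy(_._2)

//Alternatively, stay distributed and sort by count before collecting:
//map1.reduceByKey((x, y) => x + y).sortBy(_._2, ascending = false).first()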


Kent Jiang

I currently work at Perficient China GDC in Hangzhou as a Lead Technical Consultant. I have 8 years of experience in the IT industry across Java, CRM, and BI technologies. My areas of interest include business analytics, project planning, MDM, quality assurance, etc.
