{"id":11525,"date":"2015-06-02T17:46:42","date_gmt":"2015-06-02T17:46:42","guid":{"rendered":"https:\/\/www.happiestminds.com\/blogs\/?p=2539"},"modified":"2024-04-11T06:04:39","modified_gmt":"2024-04-11T06:04:39","slug":"spark-in-action-processing-data","status":"publish","type":"post","link":"https:\/\/www.happiestminds.com\/blogs\/spark-in-action-processing-data\/","title":{"rendered":"Spark In Action : Processing Data"},"content":{"rendered":"<div id=\"bsf_rt_marker\"><\/div><div style=\"padding: 10px;\">\n<p>This blog is intended for data engineers and data scientists who are planning to use Spark &amp; Scala, and for anyone who is interested in learning Spark through practice.<\/p>\n<h2>Understanding How to Use Spark for Data Exploration<\/h2>\n<p>The goal of this blog is to provide a quick, hands-on start to Apache Spark through the following topics:<\/p>\n<ol>\n<li>RDD<\/li>\n<li>Transformations<\/li>\n<li>Actions<\/li>\n<li>Performing Joins<\/li>\n<li>Spark SQL<\/li>\n<li>Spark SQL&#8217;s Automatic JSON Schema Inference<\/li>\n<li>Caching<\/li>\n<li>Lazy Evaluation<\/li>\n<\/ol>\n<h2>Quick Intro to Spark<\/h2>\n<p>Apache Spark is often referred to as a &#8216;Swiss Army knife&#8217; because it can handle many different tasks with ease. These include data cleansing, data exploration, feature extraction, SQL queries, machine learning, complex graph algorithms, streaming applications and more.<\/p>\n<h2>Getting Started with Spark<\/h2>\n<p>If you do not have Spark set up on your machine, do not worry: it takes less than two minutes to get started. Download Spark from the official\u00a0&#8220;Apache Spark&#8221; website. 
Extract it and start the Scala REPL by running $SPARK_HOME\/bin\/spark-shell.<\/p>\n<p><strong>Note:\u00a0$SPARK_HOME\u00a0<\/strong>is the directory where you extracted Spark.<\/p>\n<h2>Loading the Data Set<\/h2>\n<p>Download the data set from the\u00a0<a href=\"https:\/\/github.com\/svishnu88\/DataProcessingSpark\/blob\/master\/usagov_bitly_data2012-03-16-1331923249.txt\">GitHub<\/a>\u00a0repository and run the following steps in spark-shell, replacing <em>path<\/em> with the directory where you saved the file.<\/p>\n<p><strong>val<\/strong> rawData = sc.textFile(<strong>\"path\/usagov_bitly_data2012-03-16-1331923249.txt\"<\/strong>)<\/p>\n<p>rawData: org.apache.spark.rdd.RDD[String] = usagov_bitly_data2012-03-16-1331923249.txt MappedRDD[1] at textFile at &lt;console&gt;:12<\/p>\n<p>We just created an RDD called rawData.<\/p>\n<p><strong>What is an RDD:\u00a0<\/strong>In the simplest terms, an RDD (Resilient Distributed Dataset) is a collection of elements partitioned across the machines of a cluster so that it can be processed in parallel. The RDD is the core abstraction of Spark, and all operations revolve around it.<\/p>\n<p>Spark provides two kinds of operations on RDDs: transformations and actions.<\/p>\n<h2>Transformations and Actions<\/h2>\n<p><strong>What is a transformation:\u00a0<\/strong>A transformation creates a new RDD from an existing one. Examples are map and filter.<\/p>\n<p><strong>What is an action:\u00a0<\/strong>An action, on the other hand, returns a result to the driver program or writes data to storage. Examples are count, first and saveAsTextFile.<\/p>\n<p>In our case, textFile created an RDD of type String, in which each element is one line of the file.<\/p>\n<p><strong>What is sc:\u00a0<\/strong>In the Spark shell, Spark creates a SparkContext for you and names it\u00a0sc. It is the main entry point for Spark functionality. 
It lets you create RDDs, accumulators and broadcast variables.<\/p>\n<p><strong>What is textFile:<\/strong>\u00a0The textFile method reads a text file from the local file system, HDFS or any Hadoop-supported file system URI, and returns it as an RDD of lines.<\/p>\n<p>Before proceeding further, let&#8217;s try to understand the data set.<\/p>\n<p>In 2011, the URL shortening service bit.ly partnered with the United States government website usa.gov to provide a feed of anonymous data gathered from users who shorten links ending with .gov or .mil. Each line in each file contains a common form of web data known as JSON.<\/p>\n<p>The key point is that we are dealing with JSON data, and each line in the file is a separate JSON record.<\/p>\n<p>Let&#8217;s see what our RDD contains. Spark provides actions such as\u00a0collect,\u00a0take(n)\u00a0and\u00a0first\u00a0to access the elements of an RDD.<\/p>\n<p><strong>collect():<\/strong>\u00a0Brings the entire data set to the driver program, which in our case is the Spark shell. Do not use collect when the data set is too large to fit into the memory of a single machine. Use it only when the data set is small.<\/p>\n<p><strong>take(n):<\/strong>\u00a0Works around this limitation of collect by returning only the first\u00a0n\u00a0elements of the RDD.<\/p>\n<p><strong>first:<\/strong>\u00a0Returns the first element of the RDD. It is useful when you want to quickly check whether your operations worked correctly.<\/p>\n<p>Let&#8217;s call first on our rawData: 
<\/p>\n<p>rawData.first<\/p>\n<p>res0: String = { \"a\": \"Mozilla\\\/5.0 (Windows NT 6.1; WOW64) AppleWebKit\\\/535.11 (KHTML, like Gecko) Chrome\\\/17.0.963.78 Safari\\\/535.11\", \"c\": \"US\", \"nk\": 1, \"tz\": \"America\\\/New_York\", \"gr\": \"MA\", \"g\": \"A6qOVH\", \"h\": \"wfLQtf\", \"l\": \"orofrog\", \"al\": \"en-US,en;q=0.8\", \"hh\": \"1.usa.gov\", \"r\": \"http:\\\/\\\/www.facebook.com\\\/l\\\/7AQEFzjSi\\\/1.usa.gov\\\/wfLQtf\", \"u\": \"http:\\\/\\\/www.ncbi.nlm.nih.gov\\\/pubmed\\\/22415991\", \"t\": 1331923247, \"hc\": 1331822918, \"cy\": \"Danvers\", \"ll\": [ 42.576698, -70.954903 ] }<\/p>\n<h2>Spark SQL JSON Inference<\/h2>\n<p>To understand the data set better, we should bring the JSON data into a structured form, such as a table. For that, we need to extract the individual fields. Several JSON libraries can help with parsing, but we will use the ability of Spark SQL to infer a JSON schema to structure our data.<\/p>\n<p><strong>import <\/strong>org.apache.spark.sql.SQLContext<\/p>\n<p><strong>val<\/strong> sqlContext = <strong>new <\/strong>SQLContext(sc)<\/p>\n<p><strong>val<\/strong> recordsJson = sqlContext.jsonRDD(rawData)<\/p>\n<p>recordsJson.registerTempTable(<strong>\"records\"<\/strong>)<\/p>\n<p><strong>Note:<\/strong>\u00a0jsonRDD and registerTempTable belong to the Spark 1.x API used in this post. From Spark 1.4, jsonRDD is deprecated in favour of sqlContext.read.json(rawData). From Spark 2.0, registerTempTable is deprecated in favour of createOrReplaceTempView(\"records\").<\/p>\n<p>Spark SQL is the Apache Spark module for working with structured data. Its strength is that SQL queries mix seamlessly with regular Spark programs. It can read data from Hive, JSON and Parquet, and it provides a Thrift server through which external programs can connect. It supports many more features besides.<\/p>\n<p>Let&#8217;s check the schema of our data. 
Spark SQL provides a printSchema method for this.<\/p>\n<p>recordsJson.printSchema<\/p>\n<p>root<\/p>\n<p>|-- _heartbeat_: integer (nullable = true)<\/p>\n<p>|-- a: string (nullable = true)<\/p>\n<p>|-- al: string (nullable = true)<\/p>\n<p>|-- c: string (nullable = true)<\/p>\n<p>|-- cy: string (nullable = true)<\/p>\n<p>|-- g: string (nullable = true)<\/p>\n<p>|-- gr: string (nullable = true)<\/p>\n<p>|-- h: string (nullable = true)<\/p>\n<p>|-- hc: integer (nullable = true)<\/p>\n<p>|-- hh: string (nullable = true)<\/p>\n<p>|-- kw: string (nullable = true)<\/p>\n<p>|-- l: string (nullable = true)<\/p>\n<p>|-- ll: array (nullable = true)<\/p>\n<p>|\u00a0\u00a0 |-- element: double (containsNull = false)<\/p>\n<p>|-- nk: integer (nullable = true)<\/p>\n<p>|-- r: string (nullable = true)<\/p>\n<p>|-- t: integer (nullable = true)<\/p>\n<p>|-- tz: string (nullable = true)<\/p>\n<p>|-- u: string (nullable = true)<\/p>\n<p>One advantage of Spark SQL is that we can still apply regular Spark operations to the result of a query. Now let&#8217;s continue exploring the data set. Let&#8217;s take one particular field,\u00a0tz, which stands for time zone, and quickly write a SQL query to extract it.<\/p>\n<p><strong>val<\/strong> x = sqlContext.sql(<strong>\"select tz from records\"<\/strong>)<\/p>\n<p>Now, x contains only the time zone column.<\/p>\n<h2>Let&#8217;s find how many times each time zone occurs in the data set<\/h2>\n<p>How can we solve this? Let&#8217;s extract each value and pair it with 1 to form a key-value pair.<\/p>\n<p>x.map(row =&gt; row(0)).map(temp =&gt; (temp,1))<\/p>\n<p>Here,\u00a0x\u00a0is a SchemaRDD (renamed DataFrame in Spark 1.3), so we can apply RDD transformations such as map to it.<\/p>\n<p><strong>map(f):\u00a0<\/strong>\u00a0map is a transformation applied to an RDD. 
It accepts a function as input and applies it to each element of the RDD in parallel, producing a new RDD.<br \/>\n<img fetchpriority=\"high\" decoding=\"async\" class=\"aligncenter size-full wp-image-2541\" src=\"https:\/\/www.happiestminds.com\/blogs\/wp-content\/uploads\/2015\/06\/json.png\" alt=\"json\" width=\"644\" height=\"268\" \/><\/p>\n<p>Let&#8217;s see how the elements have been transformed by calling\u00a0take(5)\u00a0on the above RDD.<\/p>\n<p>(America\/New_York,1)<\/p>\n<p>(America\/Denver,1)<\/p>\n<p>(America\/New_York,1)<\/p>\n<p>(America\/Sao_Paulo,1)<\/p>\n<p>(America\/New_York,1)<\/p>\n<p>You may be asking yourself why we pair each value with 1. Doing so turns each record into a key-value tuple, where the key is the time zone and the value is a count of one. Adding up the values for each key then gives the number of times that time zone occurs.<\/p>\n<p>Spark provides useful operations on key-value pair RDDs. These include the transformations reduceByKey and groupByKey, the action countByKey, and many more.<\/p>\n<p>Let&#8217;s get back to our question of how many times each time zone occurs. As you may have guessed, we will use reduceByKey, which merges the values of each key with the function we pass to it (here, addition).<\/p>\n<p>x.map(row =&gt; row(0)).map(temp =&gt; (temp,1)).reduceByKey((a, b) =&gt; a + b).take(5).foreach(println)<\/p>\n<p>(null,120)<\/p>\n<p>(Europe\/Brussels,4)<\/p>\n<p>(Europe\/Vienna,6)<\/p>\n<p>(Europe\/Athens,6)<\/p>\n<p>(America\/Puerto_Rico,10)<\/p>\n<p><strong>Note:<\/strong>\u00a0Remember that transformations in Spark are lazy. They are not evaluated until you perform an action such as count, first, collect or saveAsTextFile.<\/p>\n<p>Looking at the result, we can see that there are\u00a0null\u00a0values. Let&#8217;s remove the records that contain\u00a0null\u00a0values. 
Spark comes with a handy\u00a0filter\u00a0method for this:<\/p>\n<p>x.map(row =&gt; row(0)).filter(tz =&gt; tz != null).map(temp =&gt; (temp,1)).reduceByKey((a, b) =&gt; a + b).take(5).foreach(println)<\/p>\n<p>(Europe\/Brussels,4)<\/p>\n<p>(Asia\/Bangkok,6)<\/p>\n<p>(Pacific\/Honolulu,36)<\/p>\n<p>(America\/Santo_Domingo,1)<\/p>\n<p>(Europe\/Bucharest,4)<\/p>\n<p>Next, let&#8217;s sort the counts in descending order using\u00a0sortBy.<\/p>\n<p>x.map(row =&gt; row(0)).filter(tz =&gt; tz != null).map(temp =&gt; (temp,1)).reduceByKey((a, b) =&gt; a + b).sortBy(_._2, ascending = false).take(5).foreach(println)<\/p>\n<p>(America\/New_York,1251)<\/p>\n<p>(,521)<\/p>\n<p>(America\/Chicago,400)<\/p>\n<p>(America\/Los_Angeles,382)<\/p>\n<p>(America\/Denver,191)<\/p>\n<p>Looking at the data, we can see that it needs more cleansing: the second most frequent value is an empty string. I will leave that to you. Use filter() on the RDD to remove the\u00a0empty values as well.<\/p>\n<p>We can simplify the entire task by writing a\u00a0<strong>SQL\u00a0<\/strong>query on the\u00a0<strong>records\u00a0<\/strong>table.<\/p>\n<p>sqlContext.sql(\"select tz, count(tz) as total from records where tz != '' and tz is not null group by tz order by total desc\").take(5).foreach(println)<\/p>\n<p>[America\/New_York,1251]<\/p>\n<p>[America\/Chicago,400]<\/p>\n<p>[America\/Los_Angeles,382]<\/p>\n<p>[America\/Denver,191]<\/p>\n<p>[Europe\/London,74]<\/p>\n<p>In the next blog, we will look at how joins, caching and lazy evaluation work.<\/p>\n<\/div>","protected":false},"excerpt":{"rendered":"<p>This blog is intended for data engineers, data scientists who are planning to use Spark &amp; Scala, and for anyone who is interested in learning Spark through practice. Understanding How to use Spark for Data Exploration The goal of the blog is to provide a quick start to Apache Spark and how to go about [&hellip;]<\/p>\n","protected":false},"author":25,"featured_media":1313,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[225,312,218,123,661],"tags":[313,201,927,940,941,1506,1592,1593],"class_list":["post-11525","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-analytics","category-big-data","category-digital-transformation","category-technology","category-tools","tag-analytics","tag-big-data","tag-data-analytics","tag-data-science","tag-data-scientists","tag-scala","tag-spark","tag-spark-data-processing"],"aioseo_notices":[],"_links":{"self":[{"href":"https:\/\/www.happiestminds.com\/blogs\/wp-json\/wp\/v2\/posts\/11525","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/www.happiestminds.com\/blogs\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/www.happiestminds.com\/blogs\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/www.happiestminds.com\/blogs\/wp-json\/wp\/v2\/users\/25"}],"replies":[{"embeddable":true,"href":"https:\/\/www.happiestminds.com\/blogs\/wp-json\/wp\/v2\/comments?post=11525"}],"version-history":[{"count":1,"href":"https:\/\/www.happiestminds.com\/blogs\/wp-json\/wp\/v2\/posts\/11525\/revisions"}],"predecessor-version":[{"id":12282,"href":"https:\/\/www.happiestminds.com\/blogs\/wp-json\/wp\/v2\/posts\/11525\/revisions\/12282"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/www.happiestminds.com\/blogs\/wp-json\/wp\/v2\/media\/1313"}],"wp:attachment":[{"href":"https:\/\/www.happiestmi
nds.com\/blogs\/wp-json\/wp\/v2\/media?parent=11525"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/www.happiestminds.com\/blogs\/wp-json\/wp\/v2\/categories?post=11525"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/www.happiestminds.com\/blogs\/wp-json\/wp\/v2\/tags?post=11525"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}