Course 6 (Data Analyst): Processing Data with Apache Spark


In this sixth course of the Data Analyst track, we discover the practical side of data analysis with Apache Spark.

With Spark, we encounter a first example (undoubtedly the most in vogue at the time of this writing) of an environment dedicated to large-scale distributed computing, offering much more powerful functionality than the original MapReduce, which is still available in the Hadoop ecosystem.

This functionality consists in particular of a set of second-order operators (see this notion in the chapter Distributed Computing: Hadoop and MapReduce) which considerably extend the simple pair formed by Map and Reduce. We got a glimpse of these operators with Pig, which however remains tied to a MapReduce execution context (a Pig program is compiled and executed as a sequence of MapReduce jobs).

Among other limitations, this does not cover an important class of algorithms: those which proceed by iterations on a result progressively refined at each step. This type of algorithm is very common in the general field of data mining: PageRank, k-means, computation of connected components in graphs, etc.

This chapter provides an introduction to the Spark system. We will simply enter commands through the command interpreter spark-shell. The easiest way to reproduce these commands is therefore to download the latest version of Spark from http://spark.apache.org. The installation includes a subdirectory bin which contains the commands that interest us. You can therefore add the path to spark/bin to your PATH variable, according to specifics that depend on your environment: at this stage of the course you should be familiar with this type of maneuver.

S1: Introduction to Apache Spark

With MapReduce, specifying the iteration remains the responsibility of the programmer: it is necessary to store the result of a first job in an intermediate collection and to run the job again taking the intermediate collection as source. This is laborious to implement, and above all very inefficient when the intermediate collection is large. The on-disk serialization / deserialization required by MapReduce's failure-recovery mechanism results in poor performance.

In Spark, the approach is very different. It consists of placing these datasets in RAM and avoiding the penalty of writing to disk. The challenge then is, of course, to offer effective automatic failure recovery.

System architecture

Spark is a framework that coordinates the execution of tasks on data by distributing them within a cluster of machines. It is intended to be extremely modular and flexible. Thus, the very management of the cluster of machines can be delegated either to Spark’s cluster manager, or to Yarn or Mesos (other managers for Hadoop).

The programmer submits Spark applications to the framework, for which Spark allocates resources (RAM, CPU) in the cluster for execution. A Spark application consists of a driver process and executors. The driver is essential to the application because it executes the main() function and is responsible for three things:

  • keep information about the application;
  • respond to user input or requests from external programs;
  • analyze, distribute and schedule tasks (see below).

An executor is responsible for only two things: executing the code assigned to it by the driver and reporting the progress of its tasks back to the driver.

The driver is accessible programmatically through an entry point called SparkSession, exposed in the shell as a variable named spark.
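
As a quick illustration (assuming a spark-shell session, which we present later in this chapter), the entry point is already created for you and bound to the variable spark:

// In spark-shell, the SparkSession is created automatically and bound to `spark`
spark               // org.apache.spark.sql.SparkSession
spark.version       // the Spark version, as a String
spark.sparkContext  // the lower-level entry point (also bound to `sc` in the shell)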

Figure Fig. 100 illustrates Spark's system architecture. In this example there is one driver and four executors. The notion of a cluster node is absent: users can configure how many executors run on each node.

 

Fig. 100 Spark’s system architecture 

Spark is a multi-language framework: Spark programs can be written in Scala, Java, Python, SQL and R. However, Spark itself is written in Scala, which is its default language and the one we will be working in. It is concise and exposes the full API. Note that the API is complete in Scala and Java, but not necessarily in the other languages.

Note

Spark can also run in local mode, where the driver and executors are simply processes on the local machine. The strength of Spark is to offer transparency (for programs) between a local execution and an execution on a cluster.
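
As a hedged sketch (for a hypothetical standalone application; the spark-shell used in this chapter creates the session for you), the same code can target local threads or a cluster simply by changing the master URL:

import org.apache.spark.sql.SparkSession

// Minimal sketch: only the master URL changes between local and cluster execution.
val session = SparkSession.builder()
  .appName("DemoLocal")
  .master("local[4]")              // local mode: 4 threads on this machine
  // .master("spark://host:7077")  // or a Spark standalone cluster manager
  .getOrCreate()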

Application architecture

The ecosystem of Spark APIs is hierarchical and essentially comprises 3 levels:

  • low-level APIs, with RDDs ( Resilient Distributed Dataset );
  • high level APIs, with Datasets , DataFrames and SQL;
  • other libraries ( Structured Streaming , Advanced Analytics , etc.).

We will leave the last level aside in this course: streaming will be seen with Flink in the next chapter and exploring machine learning libraries is part of course RCP216 .

Initially, RDDs were the focus of programming with Spark (which means that a lot of the resources you’ll find on Spark will be built on them). Today, we prefer higher-level APIs, which we will explore in detail, Datasets and DataFrames. These have the advantage of being close to known data structures (with a tabular view), thus facilitating the transition to Spark. In addition, they are optimized very efficiently by the framework , hence performance gains.

The innovation of RDDs

The main innovation brought by Spark is the concept of Resilient Distributed Dataset (RDD). An RDD is a collection (to stay with our vocabulary) computed from a data source (for example a Cassandra database, a data stream, another RDD) and placed in RAM. Spark keeps the history of the operations that produced an RDD, and failure recovery relies primarily on preserving this history in order to reconstitute the RDD in the event of failure. To put it briefly: Spark does not preserve the data by extension (the data itself) but by intension (the program that defines it). Preserving a program that fits in a few lines of specification (cf. Pig programs) is much easier and more efficient than preserving the data set produced by this chain. This is the main idea behind the resilience of RDDs.

In addition, RDDs are partitioned and distributed collections. Each RDD is therefore made up of what we have called fragments. A failure affecting an individual fragment can thus be repaired (by replaying the history) independently of the other fragments, avoiding having to recompute everything.

The DataFrames and Datasets that we will use later are based on RDDs, i.e. Spark transforms operations on DataFrames / Datasets into operations on RDDs. In practice, you will rarely need RDDs (unless you maintain old code, or your expertise leads you to go beyond Structured APIs ).

Actions and transformations: the Spark processing chain

A fundamental aspect of working with Spark is the immutability of collections: they cannot be changed after creation. It is a little unusual and it induces new ways of working.

Indeed, to go from the input data to the program's output, we have to think in terms of a chain of collections which constitute the processing steps. The first collection(s) contain the input data. Then each collection is the result of transformations applied to the previous ones, the equivalent of what we called operators in Pig. As in Pig, a transformation selects, enriches, restructures a collection, or combines two collections. We find in Spark, more or less, the same operators / transformations as in Pig, as shown in the table below (which is of course not exhaustive: refer to the documentation for additional information).

Operator       Description
map            Takes a document as input and produces a document as output
filter         Filters the documents of the collection
flatMap        Takes a document as input, produces one or more documents as output
groupByKey     Groups documents by a common key value
reduceByKey    Reduces a pair (k, [v]) by aggregating the array [v]
crossProduct   Cartesian product of two collections
join           Join of two collections
union          Union of two collections
cogroup        See the description of this operator in the section on Pig
sort           Sorts a collection

The collections obtained during the different stages of a processing chain are stored in RDDs, DataFrames, etc., depending on the API used. This is exactly the notion that we have already explored with Pig. The key difference is that in Spark, RDDs or DataFrames can be marked as persistent so that they can be reused in other chains. Spark does its best to keep persistent structures in RAM, for maximum efficiency.

Fig. 101 Persistent and Transient RDDs in Spark. 

The collections form a graph constructed by applying transformations from stored collections ( Fig. 101 ). If it is not marked as persistent, the RDD / DataFrame will be transient and will not be kept in memory after calculation (this is the case with RDD 1 and 3 in the figure). Otherwise, it is stored in RAM, and available as a data source for further transformations.

As opposed to transformations that produce other RDDs or DataFrames, actions produce values (for the user). The evaluation of operations in Spark is said to be “lazy”, that is to say that Spark waits as long as possible to execute the graph of the processing instructions. More precisely, an action triggers the execution of the transformations which precede it.

Lazy evaluation allows Spark to compile simple DataFrame transformations into a physical execution plan that is efficiently distributed across the cluster. An example of this efficiency is the concept of predicate pushdown: if a filter() at the end of a sequence means that only one row of the input data is needed, the other operations will take this into account, optimizing performance accordingly in time and space.
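
Here is a hedged sketch of this behavior, using a hypothetical file app.log: nothing is executed until an action is called, and explain() lets you inspect the physical plan that lazy evaluation has built.

// Transformations only: nothing is read or computed yet.
val lignes  = spark.read.textFile("app.log")                 // hypothetical file
val erreurs = lignes.filter(line => line.contains("ERROR"))

erreurs.explain()   // displays the physical execution plan built by Spark
erreurs.count()     // action: only now does Spark actually execute the chain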

RDDs, Dataset and DataFrame 

An RDD, from the low-level API, is a "box" intended to contain any document, without any assumption about the structure (or lack of structure) of the latter. This makes the system very general, but prevents fine-grained manipulation of document constituents, such as filtering on the value of a field. It is up to the application programmer to provide the function performing the filter.

As we have said, Spark implements a higher-level API with structures similar to relational tables: the Dataset and the DataFrame. They include a schema, with the definitions of the columns. Knowing this schema – and possibly the column types – allows Spark to offer more refined operations, and optimizations inspired by query-evaluation techniques in relational systems. In fact, we come back to a distributed implementation of the SQL language. Internally, an important advantage of knowing the schema is to avoid resorting to the serialization of Java objects (an operation performed in the case of RDDs to write to disk and exchange data over the network).

Note

Let us note in passing the gradual movement of these systems towards a re-adoption of relational principles (schema, data structuring, SQL querying, etc.), and the recognition of the advantages, internal and external, of data modeling. From NoSQL to BackToSQL!

A distinction is made between Datasets, for which the type of the columns is known, and DataFrames. A DataFrame is nothing other than a Dataset containing rows of type Row, whose precise schema is not known (DataFrame = Dataset[Row]). This typing of data structures is linked to the programming language: Python and R being dynamically typed, they only have access to DataFrames, while in Scala and Java we use Datasets, strongly typed JVM objects.

Is this all a bit abstract? Here is a simple example to illustrate the main advantages of Dataset / DataFrame. We want to apply an operator that filters the movies whose genre is "Drame". We will express the filter (simplifying it a bit) as follows:

films.filter(film.getGenre() == 'Drame');

If films is an RDD, Spark has no idea of the structure of the documents it contains. Spark will therefore instantiate a Java object (possibly by deserializing a string of bytes received over the network or read from disk) and call the method getGenre(). This can be slow, and above all requires creating an object for a simple test.

With a Dataset or DataFrame, the schema is known and Spark uses its own encoding / decoding system instead of Java serialization. In addition, in the case of Datasets, the value of the field genre can be tested directly, without even performing any decoding from the binary representation.
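
To make this more concrete, here is a hedged sketch (hypothetical case class Film and file films.json, spark-shell session assumed so that the implicit encoders are in scope) contrasting the typed and untyped versions of the same filter:

case class Film(titre: String, genre: String)

// Dataset[Film]: genre is accessed as a typed Scala field.
val filmsDS  = spark.read.json("films.json").as[Film]
val dramesDS = filmsDS.filter(f => f.genre == "Drame")

// DataFrame (= Dataset[Row]): the column is resolved by name at run time.
val filmsDF  = spark.read.json("films.json")
val dramesDF = filmsDF.filter("genre = 'Drame'")

// RDD: plain objects, Spark knows nothing about their structure.
val dramesRDD = filmsDS.rdd.filter(f => f.genre == "Drame")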

In short, it is absolutely preferable to use Datasets when dealing with structured data.

Example: parsing log files

Let’s take a concrete example: in an application server, we see that a module M produces incorrect results from time to time. We want to analyze the application log file which contains the messages produced by the suspect module, and by many other modules.

We therefore build a program which loads the log as a collection, keeps only the messages produced by module M, and then analyzes these messages. Several analyses are possible depending on the suspected causes: the first, for example, looks at M's log for a particular product, the second for a particular user, the third for a particular time slot, etc.

With Spark, we will create a persistent DataFrame logM containing the messages generated by M. We will then build, from logM, new derived DataFrames for the specific analyses (Fig. 102).

 

Fig. 102 Scenario of a log analysis with Spark 

We combine two transformations to build logM, as shown by the following program (which is not the exact syntax of Spark, which we will present later).

// Loading the collection
log = load ("app.log") as (...)
// Filtering the messages of module M
logM = filter log with log.message.contains ("M")
// We make logM persistent!
logM.persist();

We can then build an analysis by product directly from logM.

// Filtering by product
logProduit = filter logM with log.message.contains ("product P")
// .. analysis of the content of logProduit

And also use logM for another analysis, based on the user.

// Filtering by user
logUtilisateur = filter logM with log.message.contains ("utilisateur U")
// .. analysis of the content of logUtilisateur

Or by time slot.

// Filtering by time slot
logPeriode = filter logM with log.date.between d1 and d2
// .. analysis of the content of logPeriode

logM is a kind of "view" of the initial collection, whose persistence avoids having to redo the complete computation for each analysis.
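
For reference, here is a hedged sketch of what this scenario could look like in actual Spark / Scala syntax (presented in detail in the next section), keeping the hypothetical file app.log and the markers used in the pseudo-code above:

// Loading the log and keeping the messages of module M
val log  = spark.read.textFile("app.log")
val logM = log.filter(line => line.contains("M"))
logM.persist()   // logM is kept in RAM after its first computation

// Derived analyses, all reusing the cached logM
val logProduit     = logM.filter(line => line.contains("product P"))
val logUtilisateur = logM.filter(line => line.contains("utilisateur U"))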

Failover

To understand failure recovery, we must look at the second aspect of RDDs: distribution. An RDD is a partitioned collection (see the chapter NoSQL systems: partitioning), and so are DataFrames. Fig. 103 shows the foregoing processing from a distribution perspective. Each DataFrame, persistent or not, is made up of fragments distributed across the server cluster.

 

Fig. 103 Partitioning and Failover in Spark. 

If a failure affects a computation based on a fragment F of a persistent DataFrame (for example, the transformation noted T and marked with a red cross in the figure), it is sufficient to restart it from F. The time savings are considerable!

The most severe failure affects a fragment of a non-persistent DataFrame (for example, the one marked with a purple cross). In this case, Spark has memorized the processing chain that produced the DataFrame, and it suffices to re-apply this chain, going back to the fragments which precede it in the computation graph.

In our case, we have to scan the log file again to recreate the failed fragment of logM. If the stored collections at the origin of the computation are themselves partitioned (which is probably not the case for a log file), it will suffice to access the part of the collection at the origin of the computations leading to the failed DataFrame.

In summary, Spark exploits the ability to reconstruct fragments of RDD / DataFrame by application of the processing chain, and this while limiting itself if possible to only a part of the original data. The recovery may take time, but it avoids a complete recalculation. If all goes well (no failure) the presence of intermediate results in RAM memory ensures very good performance.

S2: Spark in practice

 

It’s time to take action. We will start by showing how to perform transformations on unstructured data with standard DataFrames.

Important

The following examples are in Scala language. It’s not for the pleasure of introducing a new language that you (probably) don’t know. Scala happens to be a functional language, with a powerful type inference system, which makes it particularly suitable for expressing chains of processes as a sequence of function calls. Scala is fully compatible with Java, but much, much less verbose, as the following examples will show. The comments should allow you to gradually familiarize yourself with the language. The official documentation is available in English only.

For everything that follows, you must first launch the command interpreter located in spark/bin, which should therefore be accessible via your PATH if you have carried out the few necessary post-installation operations.

spark-shell

Note

Like Pig, Spark's interpreter displays a lot of messages on the console, which is confusing. To get rid of them:

  • copy the file sparkdir/conf/log4j.properties.template to sparkdir/conf/log4j.properties;
  • edit log4j.properties and replace, in the first line, the level INFO by ERROR.

This is assuming that things haven’t changed between your version and mine. Otherwise, search the web.

Transformations and actions

Any text file will do; here we use a small file loups.txt (a few sentences about wolves and sheep). Copy and paste the commands below. Commands are preceded by scala>; they are sometimes followed by the result of their execution in the spark shell.

scala> val loupsEtMoutons = spark.read.textFile("loups.txt")
loupsEtMoutons: org.apache.spark.sql.Dataset[String] = [value: string]

We have created a first DataFrame. Spark offers actions directly applicable to a DataFrame and producing scalar results. (A DataFrame is interfaced like an object to which we can apply methods.)

scala> loupsEtMoutons.count() // Number of documents in this Dataset
  res0: Long = 4

scala> loupsEtMoutons.first() // First document of the Dataset
  res1: String = Le loup est dans la bergerie.

scala> loupsEtMoutons.collect() // Retrieving the complete content

Note

Quick tip: by entering the name of the object ( loupsEtMoutons.) followed by the TAB key, the Scala interpreter displays the list of available methods.

Let's move on to the transformations. They take one (or two) DataFrames as input and produce a DataFrame as output. You can select (filter) the documents (lines) that contain "bergerie" (sheepfold).

scala> val bergerie = loupsEtMoutons.filter({ line => line.contains("bergerie") })

The function filter() takes as parameter a Boolean function (which returns true or false for each row), and keeps in the resulting collection only the rows for which true was returned. Here, we use the function contains() (which takes a pattern as a parameter) and which returns true or false depending on whether the string (here, the line) contains the pattern (here, "bergerie"). Note also the syntax based on an anonymous function passed as a parameter to filter(): each line is temporarily named line, and it is mapped to the result of line.contains("bergerie") by the operator =>.

We have created a second DataFrame. We are in the process of defining a processing chain which starts here from a text file and applies successive transformations.

At this stage, nothing is calculated, we just declared the steps. As soon as we trigger an action , such as displaying the content of a DataFrame (with collect()), Spark will trigger the execution.

scala> bergerie.collect()
res3: Array[String] = Array(Le loup est dans la bergerie., Les moutons sont
  dans la bergerie., Un loup a mangé un mouton, les autres loups sont restés
  dans la bergerie.)

You can combine a transformation and an action. In fact, with Scala, you can chain operations and thus define the workflow very concisely .

scala> loupsEtMoutons.filter({ line => line.contains("loup") }).count()
res4: Long = 3

And to conclude this small introductory session, here is how to implement in Spark the term counter over a collection, first with DataFrames and then with RDDs.

Term counter, in DataFrames

We create a first DataFrame made up of all the terms:

scala> val termes = loupsEtMoutons.flatMap({ line => line.split(" ") })

The method split decomposes a character string (here, taking the space as separator). Note the operator flatMap, which produces multiple documents (here, terms) for one input document (here, a line).

scala> val termesGroupes = termes.groupByKey(_.toLowerCase)

Remember that at each step you can display the content of the DataFrame with collect() (be careful, however: here termesGroupes is of type KeyValueGroupedDataset and does not have this method). A somewhat convoluted way of visualizing the content of termesGroupes:

scala> termesGroupes.mapGroups{(k, v) => (k, v.toArray)}.collect

Now let's move on to the counting, with count():

scala> val sommes = termesGroupes.count()

Finally, we display the counts, that is to say the lines of the Dataset sommes.

scala> sommes.show()

And here it is! We could have expressed it all at once.

scala> val compteurTermes = loupsEtMoutons.flatMap({ line => line.split(" ") })
                              .groupByKey(_.toLowerCase)
                              .count()
scala> compteurTermes.show()

Trick

If you want to enter multi-line instructions in the Scala interpreter, use the command :paste, followed by your instructions, then Ctrl-D to finish.

The result may seem a little strange to you (the token pré, for example): it lacks the various text-simplification steps required by a search engine (see the chapter Searching with ranking for details). But the main thing is to understand the chain of operators.

Finally, if you want to keep the final DataFrame in memory to submit it to various processing, just call:

scala> compteurTermes.persist()

Term counter, in RDD

With RDDs, we have the functions map() and reduce(), less close to SQL and lower level, but efficient.

We start by creating the first RDD:

scala> val loupsEtMoutonsRDD = spark.read.textFile("loups.txt").rdd

We break the lines down into terms:

scala> val termes = loupsEtMoutonsRDD.flatMap({ line => line.split(" ") })

We introduce the notion of counting: each term counts for 1. The operator map produces one output document for each input document. We use it here to enrich each term with its initial counter.

scala> val termeUnit = termes.map({word => (word, 1)})

The next step gathers the terms and sums their counters: this is the operator reduceByKey.

scala> val compteurTermes = termeUnit.reduceByKey({(a, b) => a + b})

We pass to the operator a reduction function, here written literally in the Scala syntax. Such a function takes two parameters as input: an accumulator (here a) and the new value to be aggregated to the accumulator (here b). The aggregation here is simply the sum.
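
To see concretely what this function computes, here is a tiny illustrative sketch, outside Spark, applying the same reduction to the three counters of a term that appears three times:

// Plain Scala: the reduction function accumulates the counters pairwise.
val somme = (a: Int, b: Int) => a + b
List(1, 1, 1).reduce(somme)   // 3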

It remains to trigger the complete computation:

scala> compteurTermes.collect()

All at once:

scala> val compteurTermes = loupsEtMoutonsRDD.flatMap({ line => line.split(" ") })
                       .map({ word => (word, 1) })
                       .reduceByKey({ (a, b) => a + b })
scala> compteurTermes.collect

The Spark control interface

Spark has a web interface that allows you to take a look under the hood of the system and gain a better understanding of what is going on. It is accessible on port 4040, therefore at the URL http://localhost:4040 for a shell execution. To explore the information provided by this interface, we will run our workflow, assembled into a single chain of Scala instructions.

val compteurTermes =  sc.textFile("loups.txt")
       .flatMap(line => line.split(" "))
       .map({ word => (word, 1) })
       .reduceByKey({ (a, b) => a + b })

compteurTermes.collect()

Launch the shell and run this workflow .

Now you should be able to access the interface and get a display like the one in Fig. 104 . In particular, the job you just executed should appear, along with its execution time and some other information.

 

Fig. 104 The Spark web interface 

The jobs tab

Click on the job name for details on the computation steps (Fig. 105). Spark tells us that the execution was done in two stages. The first includes the transformations textFile, flatMap and map; the second, the transformation reduceByKey. The two stages are separated by a shuffle phase.

 

Fig. 105 Plan for the execution of a Spark job : the steps. 

What do these stages correspond to? In fact, if you have followed the course correctly so far, you have the elements to answer: a stage in Spark groups together a set of operations that can be performed locally, on a single machine, without any network exchange. It is a generalization of the Map phase in a MapReduce environment. The stages are logically separated by shuffle phases, which consist in redistributing the data in order to group them according to certain criteria. Reread the chapter Distributed Computing: Hadoop and MapReduce to review your basics of distributed computing if this is not clear.

When processing is performed on partitioned data, a stage is executed in parallel on the fragments, and Spark calls task the execution of a stage on a particular fragment, on a particular machine. Let's sum up (a small shell check follows the list below):

  • A job is the execution of a processing chain (workflow) in a distributed environment.
  • A job is divided into stages, each stage being a segment of the workflow that can be executed locally.
  • The execution of a stage is carried out by a set of tasks, one per machine hosting a fragment of the RDD serving as the entry point to the stage.
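
A quick, hedged way to relate these notions to our word count (RDD version) from the shell: toDebugString prints the lineage of an RDD, and the indentation changes where a shuffle separates two stages.

// Prints the RDD lineage; the stage boundary appears at the reduceByKey (shuffle).
println(compteurTermes.toDebugString)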

And there you go! If it is clear, move on; otherwise, reread.

The Stages tab

You can obtain additional information on each stage in the Stages tab. In particular, the interface shows numerous statistics on execution time, the volume of data exchanged, etc. All this is very valuable when you want to check that all is well for jobs that last hours or days.

The Storage tab

Now check out the Storage tab. It should be empty, and this is normal: nothing has been marked as persistent yet. Our input file is too small for the execution time to be meaningful. But enter the following command:

compteurTermes.persist()

Then run the collect() action again. This time an RDD should appear in the Storage tab, and moreover you should understand why!

Run the collect() action once more and look at the execution-time statistics. The last run should be significantly faster than the previous ones. Do you understand why? Look at the stages, and make it all clear in your mind.

This is only a 4-line input file. We can extrapolate to very large collections and realize the potential gain of this method (which is not magic: we trade memory space for computation time, as always).

Putting it into practice

Exercise MEP-Spark-1: it's your turn

You can imagine what to do at this point: replicate the above commands, and explore the Spark interface until everything is clear. You may spend a little time there, but putting it into practice will put you very concretely at the heart of a widely used system, which is based on a good part of the concepts seen in class.

Exercise MEP-Spark-2: let's move on to PageRank

Let's try to implement our PageRank with Spark. We will assume that our graph is stored in a text file graphe.txt with one line per edge:

url1 url2
url1 url3
url2 url3
url3 url2

Let’s start by creating the matrix (or more exactly the vectors representing the outgoing links for each URL).

val graphe = spark.read.textFile("graphe.txt").rdd   // we work with the RDD API here
val matrix = graphe.map{ s =>
                  val parts = s.split("\\s+")
                  (parts(0), parts(1))
              }.distinct().groupByKey()

Let's initialize the initial rank vector:

var ranks = matrix.mapValues(v => 1.0)

Let’s apply 20 iterations.

for (i <- 1 to 20) {
  val contribs =
      matrix.join(ranks)
            .values
            .flatMap{ case (urls, rank) =>
                        val size = urls.size
                        urls.map(url => (url, rank / size))
                    }
  ranks = contribs.reduceByKey(_ + _)
}

Finally, let's run the whole thing:

ranks.collect()

Once it works, you can make some improvements:

  1. Add persist() or cache() operators wherever you see fit.
  2. Refine PageRank by introducing a probability (10% for example) of making a "jump" to an arbitrary page instead of following outgoing links (a hint is sketched below).
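
For improvement 2, here is a hedged hint based on the classic formulation with a damping factor (0.85 below, i.e. a 15% probability of a random jump; adapt the constants to the 10% suggested above if you prefer). The update of the ranks inside the loop becomes something like:

// Hint for improvement 2: damping factor 0.85, random-jump probability 0.15.
ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)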

S3: Processing structured data with Cassandra and Spark

 

Let us now see the processing tools offered by Spark on structured data coming, for example, from a database or from collections of JSON documents. In this case, we obviously interact in a privileged way with DataFrames and Datasets. As we have said, the two structures are similar to relational tables, but the second is, moreover, strongly typed since we know the type of each column. This considerably simplifies processing, both from the point of view of the designer of the processes and from that of the system.

  • For the designer, the ability to reference fields and apply standard operations to them according to their type avoids having to write a specific function for every single operation, making the code much more readable and concise.
  • For the system, knowledge of the schema facilitates checks before execution ( compile-time checking , as opposed to run-time checking ), and allows very fast serialization, independent of Java serialization, thanks to a layer made up of encoders .

We will take this opportunity to set up the beginnings of a realistic architecture by associating Spark with Cassandra as a data source. In such an organization, storage and partitioning are provided by Cassandra, and distributed computing by Spark. Ideally, each Spark node processes one or more fragments of a partitioned Cassandra collection, and therefore communicates with one of the nodes in the Cassandra cluster. We then obtain a completely distributed and therefore scalable system.

Preliminaries

The Cassandra database that we take as support is that of New York restaurants. Refer to the Cassandra practical work chapter for the creation of this database. In what follows, it is assumed that the Cassandra server is listening on the machine 192.168.99.100, port 32769 (if you are using Cassandra with Docker, also refer to the operations seen in the practical work to find the right values of IP and port, which are probably different from these).

To associate Spark and Cassandra, you need to get the connector on the page https://spark-packages.org/package/datastax/spark-cassandra-connector . Take the most recent version, at least the one corresponding to your version of Spark.

You get a jar file. The easiest way to take it into account is to copy it to Spark's jars directory. Then launch the Spark shell. All that remains is to connect to the Cassandra server by adding the configuration (machine and port) to the Spark context. So first run the following commands (replacing the machine and port with your own values, of course).

import org.apache.spark.sql.cassandra._
import com.datastax.spark.connector.cql.CassandraConnectorConf
import com.datastax.spark.connector.rdd.ReadConf

// Connection parameters
spark.setCassandraConf("default",
                   CassandraConnectorConf.ConnectionHostParam.option("192.168.99.100")
                ++ CassandraConnectorConf.ConnectionPortParam.option(32769))

For CNAM machines

We can quickly set up the Cassandra database with the data, and a Spark shell connected to Cassandra, by following the steps below:

  1. We launch the Cassandra machine by typing:
docker run --name mon-cassandra -p3000:9042 -d cassandra:latest
  2. We download the restaurant data and decompress the file:
wget b3d.bdpedia.fr/files/restaurants.zip
unzip restaurants.zip
  3. We get the id of our Cassandra container:
docker ps
  4. We copy the files to the Cassandra “machine”:
docker cp ./restaurants.csv <CONTAINER-ID>:/
docker cp ./restaurants_inspections.csv <CONTAINER-ID>:/
  5. We open a cqlsh terminal:
docker exec -it mon-cassandra cqlsh
  6. We launch the database creation commands, then those of the tables, and finally the filling of the tables: see http://b3d.bdpedia.fr/cassandra_tp.html#creation-de-la-base-de-donnees
  7. We download the spark-cassandra connector in another terminal:
wget https://b3d.bdpedia.fr/files/spark-cassandra-connector_2.11-2.3.0.jar
  8. We launch spark with the jar obtained:
spark-shell --jars ./spark-cassandra-connector_2.11-2.3.0.jar
  9. The following connection options are used:
import org.apache.spark.sql.cassandra._
import com.datastax.spark.connector.cql.CassandraConnectorConf
import com.datastax.spark.connector.rdd.ReadConf

// Connection parameters
spark.setCassandraConf("default",
                   CassandraConnectorConf.ConnectionHostParam.option("127.0.0.1")
                ++ CassandraConnectorConf.ConnectionPortParam.option(3000))

You should be able to verify that the connection is working by querying the restaurant table.

val restaurants_df = spark.read.cassandraFormat("restaurant", "resto_ny").load()
restaurants_df.printSchema()
restaurants_df.show()

Note

It seems that the name of the Keyspace and the table must be put in lowercase.

Here we are in the presence of a Spark DataFrame , whose schema (column names) was obtained directly from Cassandra. On the other hand, the columns are not typed (one could hope that the type is recovered and transcribed from Cassandra’s schema, but this is unfortunately not the case).

To obtain a Dataset whose columns are typed, with all the resulting advantages, you have to define a class in the programming language (here, Scala) and request the conversion, as follows:

case class Restaurant(id: Integer, Name: String, borough: String,
                       BuildingNum: String, Street: String,
                       ZipCode: Integer, Phone: String, CuisineType: String)

val restaurants_ds = restaurants_df.as[Restaurant]

So now we have a DataFrame restaurants_df and a Dataset restaurants_ds. The first is a collection of objects of type Row, the second a collection of objects of type Restaurant. We can therefore express more precise operations on the second. Note that all this is a practical illustration of the trade-off that we have been studying since the beginning of this course around the notion of document: is it better to have data with a very constrained schema, but offering more safety, or data with a very flexible schema, but much more difficult to handle?
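
As a small, hedged illustration of these "more precise operations", here is a typed transformation that relies on the fields of the Restaurant class defined above (assuming the Name column contains no null values):

// A typed operation: the compiler knows that each element is a Restaurant.
val nomsEnMajuscules = restaurants_ds.map(r => r.Name.toUpperCase)
nomsEnMajuscules.show(5)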

We will also need inspection data for these restaurants.

case class Inspection (idRestaurant: Integer, InspectionDate: String, ViolationCode: String,
      ViolationDescription: String, CriticalFlag: String, Score: Integer, Grade: String)

val inspections_ds = spark.read.cassandraFormat("inspection", "resto_ny").load().as[Inspection]

Note

For those who want to directly experience the SQL interface of Spark, there is a third option, that of creating a “view” on Cassandra restaurants with the following command:

val createDDL = """CREATE TEMPORARY VIEW restaurants_sql
            USING org.apache.spark.sql.cassandra
            OPTIONS (
             table "restaurant",
             keyspace "resto_ny")"""
spark.sql(createDDL)

spark.sql("SELECT * FROM restaurants_sql").show

Processing based on Datasets 

We are going to illustrate the interface for handling Datasets (it also applies to DataFrames, except that we cannot use the precise typing given by the class of the objects contained in the collection). To fully understand the power of this interface, you are invited to think about what it would take to obtain an equivalent result if we were dealing with a simple RDD, without a schema, and therefore with the need to write a function at each step.

Let's start with projections (unfortunately denoted by the keyword select since the beginnings of SQL), which consist in keeping only certain columns. The following command keeps only three columns.

val restaus_simples = restaurants_ds.select("name", "phone", "cuisinetype")

restaus_simples.show()

Here is now how to make a selection (with the keyword filter, corresponding to where in SQL).

val manhattan = restaurants_df.filter("borough =  'MANHATTAN'")

manhattan.show()

In what follows, we omit the call to show(), which you can add if you want to see the result.

The Dataset interface offers a slightly different syntax that takes advantage of the fact that we are dealing with a collection of objects of type Restaurant. We can therefore pass as a parameter a Scala Boolean expression which takes a Restaurant object as input and returns a Boolean.

val r = restaurants_ds.filter(r => r.borough == "MANHATTAN")

This type of construct allows static (compile-time) type checking, which ensures that there will be no problem at run time.
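
A hedged illustration of the difference: a typo in the typed expression is rejected by the Scala compiler, whereas the equivalent string expression is only rejected when Spark analyzes the query at execution.

// restaurants_ds.filter(r => r.boroug == "MANHATTAN")  // does not compile: unknown field
// restaurants_df.filter("boroug = 'MANHATTAN'")        // compiles, fails only when executed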

Aggregates can be carried out, such as for example the grouping of restaurants by district ( borough ):

val comptage_par_borough = restaurants_ds.groupBy("borough").count()

All of this could just as well have been expressed in CQL (see exercises). But Spark definitely goes further in terms of processing capacity, and notably offers the famous join operation that we have missed so much so far.

val restaus_inspections = restaurants_ds
       .join(inspections_ds, restaurants_ds("id") === inspections_ds("idRestaurant"))

The following processing chain averages the inspection scores of the Tapas restaurants.

import org.apache.spark.sql.functions.avg   // for the avg() aggregate (a no-op if already imported)

val restaus_stats = restaurants_ds.filter("cuisinetype = 'Tapas'")
      .join(inspections_ds, restaurants_ds("id") === inspections_ds("idRestaurant"))
      .groupBy(restaurants_ds("name"))
      .agg(avg(inspections_ds("score")))

Putting it into practice

Exercise MEP-Spark-3: it's your turn

The practice of this session is more complex. If you choose to go for it, you will have an almost complete (at a very small scale) distributed computing and storage system.

Exercises

Exercise Ex-Spark-1 : Let’s think about iterative processing

The goal of this exercise is to model the computation of an iterative algorithm with Spark. We will take as an example the one we already know: PageRank. We take as a starting point a set of web pages containing links, stored in a system such as, for example, Elastic Search.

For the moment you are not asked to produce code, but to reflect and expose the principles, and in particular the management of RDD.

  • Starting from a distributed storage of Web pages, which processing chain makes it possible to produce the matrix representation of the PageRank graph? What operations are required and where to store the result?
  • Which processing chain is used to calculate, from the graph, the PageRank vector? You can set a number of iterations (100, 200) or determine a stop condition (much more difficult). Indicate the RDDs along the entire chain.
  • Finally indicate which RDDs should be marked persistent. There are two things you should consider: improved performance and reduced recovery time.

Exercise Ex-Spark-2: what happened to CQL?

You may have noticed that Spark goes well beyond what CQL can express. We could therefore consider doing without the latter, which nevertheless has a major drawback (which one?). The Spark / Cassandra connector allows Spark transformations that are compatible with CQL to be delegated to Cassandra, thanks to a pushdown parameter which is enabled by default.

  • Clearly state the downside of using Spark as a replacement for CQL.
  • Study the role and operation of the pushdown option in the connector documentation.
  • Which of the queries seen above can be passed to CQL?

And to go further

Exercise Ex-Spark-3 : execution plans

With Spark's interface you can consult the execution graph of each job. Since, with the DataFrame API, we have moved to a much more declarative level, it is worth looking, for each processing performed (and especially the join), at how Spark evaluates the result with distributed operators.

Exercise Ex-Spark-4 : Exploring the Dataset interface

The Datasets API is presented here:

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset

Study and experience the transformations and actions described.

Exercise Ex-Spark-5 : Cassandra and Spark, a complete distributed system

By combining Cassandra and Spark, we get a complete distributed environment, Cassandra for storage, Spark for compute. The question to be studied (which can be the subject of a project) is the good integration of these two systems, and in particular the correspondence between the partitioning of the Cassandra storage and the partitioning of the Spark calculations. Ideally, each fragment in a Cassandra collection should become an RDD fragment in Spark, and all fragments processed in parallel. To be deepened!
