
Running regular tasks on a cluster, or how to make Apache Spark and Oozie friends



The need to launch regular Spark tasks through Oozie had been in the air for a long time, but I never got around to it until now. In this article I want to describe the whole process; perhaps it will make your life easier.








Task



We have the following structure in hdfs:



 hdfs://hadoop/project-MD2/data
 hdfs://hadoop/project-MD2/jobs
 hdfs://hadoop/project-MD2/status


New data arrives in the data directory every day and is laid out in subdirectories by date. For example, the data for 12/31/2017 is written to hdfs://hadoop/project-MD2/data/2017/12/31/20171231.csv.gz.
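The date-based layout is easy to reproduce in code. The sketch below, in plain Scala with java.time, builds the path of a daily file from a date. The object DataPaths and its method dailyFile are hypothetical helpers introduced only for illustration, and the data root is passed in as a parameter rather than fixed.

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

// Hypothetical helper, not part of the original project.
object DataPaths {
  // Directory part of the layout: year/month/day
  private val dirFormat = DateTimeFormatter.ofPattern("yyyy/MM/dd")
  // File name part of the layout: yearmonthday
  private val fileFormat = DateTimeFormatter.ofPattern("yyyyMMdd")

  // Builds <dataRoot>/yyyy/MM/dd/yyyyMMdd.csv.gz for the given date.
  def dailyFile(dataRoot: String, date: LocalDate): String =
    s"$dataRoot/${date.format(dirFormat)}/${date.format(fileFormat)}.csv.gz"
}
```

Calling DataPaths.dailyFile with the data directory of the project and LocalDate.of(2017, 12, 31) gives a path ending in 2017/12/31/20171231.csv.gz.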



Input Format




The jobs directory contains tasks that are directly related to the project. Our task will also be placed in this directory.

The status directory should store, for each day, statistics on the number of empty fields (fields with a null value) in JSON format. For example, for the data for 12/31/2017 the file hdfs://hadoop/project-MD2/status/2017/12/31/part-*.json should appear.



Example of the resulting JSON file:


 {
   "device_id_count_empty" : 0,
   "lag_A0_count_empty" : 10,
   "lag_A1_count_empty" : 0,
   "flow_1_count_empty" : 37,
   "flow_2_count_empty" : 100
 }
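To make the meaning of these counters concrete, here is a minimal sketch in plain Scala, without Spark, that counts empty fields per column for a handful of rows. It assumes every row has exactly five fields in the order device_id, lag_A0, lag_A1, flow_1, flow_2, and that a missing value is modelled as None. The object EmptyCounts is a hypothetical name used only for this illustration.

```scala
// Hypothetical illustration of the statistic, not the Spark job itself.
object EmptyCounts {
  // Column order is an assumption of this sketch.
  val columns: Seq[String] = Seq("device_id", "lag_A0", "lag_A1", "flow_1", "flow_2")

  // Each row holds one Option per column; None stands for a null field.
  // Returns a map from "<column>_count_empty" to the number of empty fields.
  def count(rows: Seq[Seq[Option[String]]]): Map[String, Int] =
    columns.zipWithIndex.map { case (name, i) =>
      s"${name}_count_empty" -> rows.count(row => row(i).isEmpty)
    }.toMap
}
```

The keys of the returned map match the field names of the JSON file, so the result for one day of data corresponds to one such JSON object.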


Hardware and software installed



We have a cluster of 10 machines, each with an 8-core processor and 64 GB of RAM. The total disk capacity across all machines is 100 TB. The PROJECTS queue is allocated for running tasks on the cluster.



Installed software:




Writing Spark Tasks



Create the project structure. This is easy to do in any development environment that supports Scala, or from the console as shown below:



 mkdir -p daily-statistic/project
 echo "sbt.version = 1.0.2" > daily-statistic/project/build.properties
 echo "" > daily-statistic/project/plugins.sbt
 echo "" > daily-statistic/build.sbt
 mkdir -p daily-statistic/src/main/scala


Now let's add the assembly plugin. To do this, add the following line to the daily-statistic/project/plugins.sbt file:



 addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5") 


Add the project description, dependencies and assembly settings to the daily-statistic/build.sbt file:



 name := "daily-statistic"

 version := "0.1"

 scalaVersion := "2.11.11"

 libraryDependencies ++= Seq(
   "org.apache.spark" %% "spark-core" % "2.0.0" % "provided",
   "org.apache.spark" %% "spark-sql" % "2.0.0" % "provided"
 )

 assemblyJarName in assembly := s"${name.value}-${version.value}.jar"


Go to the daily-statistic directory and run the sbt update command to refresh the project and pull the dependencies from the repository.

Create Statistic.scala in the src/main/scala/ru/daily directory.



Task code:


 package ru.daily

 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.functions._

 object Statistic extends App {

   // Create the Spark session
   implicit lazy val spark: SparkSession = SparkSession.builder()
     .appName("daily-statistic")
     .getOrCreate()

   import spark.implicits._

   // Arguments: data directory, date partition (yyyy/MM/dd), output directory
   val workDir = args(0)
   val datePart = args(1)
   val saveDir = args(2)

   try {
     val data = read(s"$workDir/$datePart/*.csv.gz")
       .select(
         '_c0 as "device_id",
         '_c1 as "lag_A0",
         '_c2 as "lag_A1",
         '_c3 as "flow_1",
         '_c4 as "flow_2"
       )

     save(s"$saveDir/$datePart", agg(data))
   } finally spark.stop()

   // Read the gzip-compressed CSV files (no header, ";" as the separator)
   def read(path: String)(implicit spark: SparkSession): DataFrame = {
     val inputFormat = Map("header" -> "false", "sep" -> ";", "compression" -> "gzip")
     spark.read
       .options(inputFormat)
       .csv(path)
   }

   // Count the empty (null) values in each column
   def agg(data: DataFrame): DataFrame = data
     .withColumn("device_id_empty", when('device_id.isNull, lit(1)).otherwise(0))
     .withColumn("lag_A0_empty", when('lag_A0.isNull, lit(1)).otherwise(0))
     .withColumn("lag_A1_empty", when('lag_A1.isNull, lit(1)).otherwise(0))
     .withColumn("flow_1_empty", when('flow_1.isNull, lit(1)).otherwise(0))
     .withColumn("flow_2_empty", when('flow_2.isNull, lit(1)).otherwise(0))
     .agg(
       sum('device_id_empty) as "device_id_count_empty",
       sum('lag_A0_empty) as "lag_A0_count_empty",
       sum('lag_A1_empty) as "lag_A1_count_empty",
       sum('flow_1_empty) as "flow_1_count_empty",
       sum('flow_2_empty) as "flow_2_count_empty"
     )

   // Save the result as JSON
   def save(path: String, data: DataFrame): Unit =
     data.write.json(path)
 }


Build the project with the sbt assembly command from the daily-statistic directory. After a successful build, the package with the task, daily-statistic-0.1.jar, will appear in the daily-statistic/target/scala-2.11 directory.



Writing workflow.xml



To run a task through Oozie, you need to describe the launch configuration in the workflow.xml file. Below is an example for our task:



 <workflow-app name="project-md2-daily-statistic" xmlns="uri:oozie:workflow:0.5">
   <global>
     <configuration>
       <property>
         <name>oozie.launcher.mapred.job.queue.name</name>
         <value>${queue}</value>
       </property>
     </configuration>
   </global>

   <start to="project-md2-daily-statistic" />

   <action name="project-md2-daily-statistic">
     <spark xmlns="uri:oozie:spark-action:0.1">
       <job-tracker>${jobTracker}</job-tracker>
       <name-node>${nameNode}</name-node>
       <master>yarn-client</master>
       <name>project-md2-daily-statistic</name>
       <class>ru.daily.Statistic</class>
       <jar>${nameNode}${jobDir}/lib/daily-statistic-0.1.jar</jar>
       <spark-opts>
         --queue ${queue}
         --master yarn-client
         --num-executors 5
         --conf spark.executor.cores=8
         --conf spark.executor.memory=10g
         --conf spark.executor.extraJavaOptions=-XX:+UseG1GC
         --conf spark.yarn.jars=*.jar
         --conf spark.yarn.queue=${queue}
       </spark-opts>
       <arg>${nameNode}${dataDir}</arg>
       <arg>${datePartition}</arg>
       <arg>${nameNode}${saveDir}</arg>
     </spark>
     <ok to="end" />
     <error to="fail" />
   </action>

   <kill name="fail">
     <message>Statistics job failed [${wf:errorMessage(wf:lastErrorNode())}]</message>
   </kill>

   <end name="end" />
 </workflow-app>


In the global block, the queue is set for the MapReduce launcher job that finds our task and starts it.

The action block describes the action, in our case the launch of the Spark task, and what to do when it completes with the OK or ERROR status.

In the spark block, the environment is defined, the task is configured, and the arguments are passed. The launch configuration of the task is described in the spark-opts block. The available parameters are listed in the official documentation.

If the task completes with the ERROR status, execution goes to the kill block and a short error message is output.

Parameters in curly brackets, such as ${queue}, will be defined at startup.
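As a rough sanity check of the values in spark-opts, the sketch below compares what the executors request (5 executors with 8 cores and 10 GB each) with the raw capacity of the cluster described earlier (10 machines with 8 cores and 64 GB each). This is only back-of-the-envelope arithmetic under simplifying assumptions: it ignores the driver, YARN memory overhead, resources reserved for the operating system, and any limits of the PROJECTS queue. The names ResourceCheck, Cluster and Request are hypothetical.

```scala
// Hypothetical back-of-the-envelope check, not a YARN scheduler model.
object ResourceCheck {
  final case class Cluster(machines: Int, coresPerMachine: Int, ramGbPerMachine: Int) {
    def totalCores: Int = machines * coresPerMachine
    def totalRamGb: Int = machines * ramGbPerMachine
  }

  final case class Request(numExecutors: Int, executorCores: Int, executorMemoryGb: Int) {
    def cores: Int = numExecutors * executorCores
    def memoryGb: Int = numExecutors * executorMemoryGb
  }

  // True when a single executor fits on one machine and the whole
  // request fits into the raw capacity of the cluster.
  def fits(cluster: Cluster, request: Request): Boolean =
    request.executorCores <= cluster.coresPerMachine &&
      request.executorMemoryGb <= cluster.ramGbPerMachine &&
      request.cores <= cluster.totalCores &&
      request.memoryGb <= cluster.totalRamGb
}
```

With Cluster(10, 8, 64) and Request(5, 8, 10), the executors ask for 5 × 8 = 40 of the 80 cores and 5 × 10 = 50 GB of the 640 GB of RAM, so the request fits with room to spare under these assumptions.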



Writing coordinator.xml



To organize a regular launch, we also need coordinator.xml. Below is an example for our task:



 <coordinator-app name="project-md2-daily-statistic-coord" frequency="${frequency}" start="${startTime}" end="${endTime}" timezone="UTC" xmlns="uri:oozie:coordinator:0.1">
   <action>
     <workflow>
       <app-path>${workflowPath}</app-path>
       <configuration>
         <property>
           <name>datePartition</name>
           <value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), -1, 'DAY'), "yyyy/MM/dd")}</value>
         </property>
       </configuration>
     </workflow>
   </action>
 </coordinator-app>


The interesting parameters here are frequency, start and end, which define the execution frequency, the date and time the task starts, and the date and time it ends, respectively.

The workflow block specifies the path to the directory containing the workflow.xml file, which we will set later at launch.

In the configuration block, the datePartition property is set. In this case it equals the date of the run (its nominal time) minus 1 day, in the format yyyy/MM/dd.
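The same calculation can be written out in plain Scala to see which value the task receives. The sketch below only mirrors the expression from coordinator.xml under the assumption that the nominal time is interpreted in UTC, as set by the timezone attribute; it is not how Oozie evaluates it internally. The object DatePartition is a hypothetical name.

```scala
import java.time.{ZoneOffset, ZonedDateTime}
import java.time.format.DateTimeFormatter

// Hypothetical illustration of the datePartition expression.
object DatePartition {
  private val format = DateTimeFormatter.ofPattern("yyyy/MM/dd")

  // Takes the nominal time of a run, moves it to UTC,
  // subtracts one day and formats the result as yyyy/MM/dd.
  def forNominalTime(nominal: ZonedDateTime): String =
    nominal.withZoneSameInstant(ZoneOffset.UTC).minusDays(1).format(format)
}
```

For a run with the nominal time 2018-01-01T07:00Z, this gives 2017/12/31, so the task started on January 1 processes the data for December 31.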



Deploying the project to HDFS


As mentioned earlier, we will place our task in the hdfs://hadoop/project-MD2/jobs directory:



 hdfs://hadoop/project-MD2/jobs/daily-statistic/lib/daily-statistic-0.1.jar
 hdfs://hadoop/project-MD2/jobs/daily-statistic/workflow.xml
 hdfs://hadoop/project-MD2/jobs/daily-statistic/coordinator.xml
 hdfs://hadoop/project-MD2/jobs/daily-statistic/sharelib


Everything here is self-explanatory except for the sharelib directory. In this directory we put all the libraries that were used when building the task. In our case, these are all the Spark 2.0.0 libraries that we specified in the project dependencies. Why is this needed? In the project dependencies we specified "provided". This tells the build system not to include the dependencies in the package, because they will be provided by the launch environment. But the world does not stand still: the cluster administrators may update the version of Spark. Our task may be sensitive to such an update, so at launch it will use the set of libraries from the sharelib directory. How this is configured is shown below.



Starting the regular run



So, everything is ready for the exciting moment of launch. We will start the task from the console. At startup, you need to set the values of the properties that we used in the XML files. Let's put these properties in a separate file, coord.properties:



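 # environment
 nameNode=hdfs://hadoop
 jobTracker=hadoop.host.ru:8032

 # path to the directory with coordinator.xml
 oozie.coord.application.path=/project-MD2/jobs/daily-statistic

 # run frequency in minutes (every 24 hours)
 frequency=1440
 startTime=2017-09-01T07:00Z
 endTime=2099-09-01T07:00Z

 # path to the directory with workflow.xml
 workflowPath=/project-MD2/jobs/daily-statistic

 # user name, under which the task will run
 mapreduce.job.user.name=username
 user.name=username

 # directories with the data, for the result, and with the task
 dataDir=/project-MD2/data
 saveDir=/project-MD2/status
 jobDir=/project-MD2/jobs/daily-statistic

 # queue for running the task
 queue=PROJECTS

 # use the libraries from the given hdfs directory instead of the system ones
 oozie.libpath=/project-MD2/jobs/daily-statistic/sharelib
 oozie.use.system.libpath=false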
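To see what frequency=1440 together with startTime means in practice, the sketch below lists the first few run times of such a schedule. It assumes that runs are scheduled at startTime plus whole multiples of the frequency in minutes, and it ignores endTime, time zone rules and any catch-up behaviour of the coordinator. The object Schedule and its method are hypothetical names introduced for illustration.

```scala
import java.time.OffsetDateTime

// Hypothetical illustration of a fixed-interval schedule.
object Schedule {
  // Returns the first n run times: start, start + frequency, start + 2 * frequency, ...
  def nominalTimes(start: OffsetDateTime, frequencyMinutes: Long, n: Int): Seq[OffsetDateTime] =
    (0 until n).map(i => start.plusMinutes(i * frequencyMinutes))
}
```

With the start 2017-09-01T07:00Z and a frequency of 1440 minutes, the runs fall on 07:00 UTC of September 1, 2 and 3, that is, once every 24 hours.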


Great, now everything is ready. Start the regular execution with the command:



 oozie job -oozie http://hadoop.host.ru:11000/oozie -config coord.properties -run 


After launch, the job_id of the task is printed to the console. Using this job_id you can view the status of the task:



 oozie job -info {job_id} 


Stop the task:



 oozie job -kill {job_id} 


If you do not know the job_id of the task, you can find it by listing all the regular tasks for your user:



 oozie jobs -jobtype coordinator -filter user={user_name} 


Conclusion



It turned out a bit long, but in my opinion detailed instructions are better than a quest-like search across the Internet. I hope the experience described here will be useful to you. Thank you for your attention!




Source: https://habr.com/ru/post/338952/


