📜 ⬆️ ⬇️

Ship terabytes in barrels or SparkStreaming vs Spring + YARN + Java


As part of the project to integrate GridGain and storage based on Hadoop (HDFS + HBASE), we faced the challenge of obtaining and processing a substantial amount of data, up to about 80 TB per day. This is necessary for building storefronts and for recovering data deleted in GridGain after it is uploaded to our long-term storage. In general terms, we can say that we transfer data between two distributed data processing systems using a distributed data transmission system. Accordingly, we want to talk about the problems that our team encountered in the implementation of this task and how they were solved.

Since the integration tool is Kafka (described in detail in the article by Mikhail Golovanov), the use of SparkStreaming looks like a natural and easy solution. Easy, because you don’t need to worry too much about crashes, reconnects, commits, etc. Spark is known as a fast alternative to the classic MapReduce, thanks to numerous optimizations. You just need to tune in to the topic, process the batch and save it to a file, which was implemented. However, during the development and testing, the instability of the data receiving module was noticed. In order to eliminate the influence of potential errors in the code, the following experiment was performed. All the message handling functionality was cut out and only direct storage was left immediately in avro:

JavaRDD<AvroWrapper<GenericRecord>> map = rdd.map(messageTuple -> { SeekableByteArrayInput sin = new SeekableByteArrayInput(messageTuple.value()); DataFileReader dataFileReader = new DataFileReader<>(sin, new GenericDatumReader<>()); GenericRecord record = (GenericRecord) dataFileReader.next(); return new AvroWrapper<>(record); }); Timestamp ts = new Timestamp(System.currentTimeMillis()); map.mapToPair(recordAvroWrapper -> new Tuple2<AvroWrapper<GenericRecord>, NullWritable>(recordAvroWrapper, NullWritable.get())) .saveAsHadoopFile("/tmp/SSTest/" + ts.getTime(), AvroWrapper.class, NullWritable.class, AvroOutputFormat.class, jobConf); 

All tests took place on this stand:


')
As it turned out, everything works fine on a cluster free from other tasks, you can get pretty good speed. However, it turned out that when working simultaneously with other applications, there are very large delays. Moreover, problems arise even at ridiculous speeds, about 150 MB / s. Sometimes the spark comes out of depression and catches up with the loss, but sometimes it happens like this:



Here you can see that at a reception rate of about 1000 messages per second (input rate), after several drawdowns, the delay in starting the batch processing (scheduling delay) still returned to normal (the middle part of the graph). However, at some point, the processing time went out of permissible limits and the soul of Sparke could not stand the earthly tests and rushed into the sky.

It is clear that for the Indian guru this is the norm, but our PROM is not in the ashram, so this is not particularly acceptable. In order to make sure that the problem is not in the function of storing data, you can use the Dataset wrapper - it seems like it is well optimized. Therefore, we try this code:

 JavaRDD<Row> rows = rdd.map(messageTuple -> { try (SeekableByteArrayInput sin = new SeekableByteArrayInput(messageTuple.value()); DataFileReader dataFileReader = new DataFileReader<>(sin, new GenericDatumReader<>())) { GenericRecord record = (GenericRecord) dataFileReader.next(); Object[] values = new Object[]{ record.get("field_1"), … record.get("field_N")}; return new GenericRowWithSchema(values, getSparkSchema(ReflectData.get().getSchema(SnapshotContainer.class))); } }); StructType st = (StructType) SchemaConverters.toSqlType(schm).dataType(); Dataset<Row> batchDs = spark.createDataFrame(rows, st); Timestamp ts = new Timestamp(System.currentTimeMillis()); batchDs .write() .mode(SaveMode.Overwrite) .format("com.databricks.spark.avro") .save("/tmp/SSTestDF/" + ts.getTime()); 

And we get exactly the same problems. And if you run two versions at the same time, on different clusters, then problems arose only for the one that works on the more loaded one. This meant that the problem was not the decision and the specifics of the data storage function. Testing also showed that if we simultaneously read the same topic with which SS worked, using flume on the same cluster, the same data extraction slowdown was obtained:

Topic1, Cluster1, SparkSreaming - slowdowns
Topic2, Cluster1, Flume - slowdowns
Topic2, Cluster2, SparkSreaming - no slowdowns

In other words, the problem was precisely the background load on the cluster. Thus, the task was to write an application that would work reliably in a highly loaded environment, and all this was complicated by the fact that the tests above do not contain any data processing logic at all. Whereas the real process looks like this:



The main difficulty here is the task of collecting data simultaneously from two topics (from one small data stream, and from the second large) and their join on the fly. There was also a need to write data from one batch to different files at the same time. In a Spark, this was implemented using the class to be serialized and calling its methods from the message receiving map. Sometimes Spark fell, trying to read foul messages from the topic, and we started storing offsets in hbase. At some point we began to look at the resulting monster with some kind of longing and soulful torment.

Therefore, we decided to turn to the bright side of the force - a warm, tube-like java. The blessing we have is an agile, and it is not at all necessary to gnaw a cactus to jump into a waterfall, when for some reason you don't want to



However, this requires solving the problem of distributed message reception from several nodes at once. For this, the Spring for Apache Hadoop framework was selected, which allows you to run the required number of Yarn containers and execute your code inside each.

The general logic of his work is as follows. Run AppMaster, which is the coordinator of the containers YARN. That is, it launches them, passing the necessary parameters to them for input, and tracks the status of execution. In the event of a container crash (for example, due to OutOfMemory), it can restart it or shut down.

Directly in the container and implemented the logic of the work and the processing of data. Since YARN launches containers by distributing approximately equally at the cluster nodes, there are no bottlenecks for network traffic or disk access. Each container clings to the selected partition and works only with it, it helps to avoid rebalancing of the consumer.

Below is a greatly simplified description of the logic of the module, a more detailed description of what is happening under the hood of the spring, colleagues plan to do in a separate article. The original example can be downloaded here .

So, to launch the wizard, the client module is used:

 @EnableAutoConfiguration public class ClientApplication { public static void main(String[] args) { ConfigurableApplicationContext applicationContext = SpringApplication.run(ClientApplication.class, args); YarnClient yarnClient = applicationContext.getBean(YarnClient.class); yarnClient.submitApplication(); } } 

After the submit wizard is complete, the client quits. Next, the CustomAppMaster class is written in application.yml

 spring: hadoop: fsUri: hdfs://namespace:port/ resourceManagerHost: hostname resources: - "file:/path/hdfs-site.xml" yarn: appName: some-name applicationDir: /path/to/app/ appmaster: resource: memory: 10240 virtualCores: 1 appmaster-class: enter.appmaster.CustomAppMaster containerCount: 10 launchcontext: archiveFile: container.jar container: container-class: enter.appmaster.FailingContextContainer 

It has the most interesting preLaunch function. Here we manage containers and parameters passed to the input:

 @Override public ContainerLaunchContext preLaunch(Container container, ContainerLaunchContext context) { Integer attempt = 1; //    ContainerId containerId = container.getId(); ContainerId failedContainerId = failed.poll(); if (failedContainerId == null) { //      } else { //      (  ..) } Object assignedData = (failedContainerId != null ? getContainerAssign().getAssignedData(failedContainerId) : null); if (assignedData != null) { attempt = (Integer) assignedData; attempt += 1; } getContainerAssign().assign(containerId, attempt); Map<String, String> env = new HashMap<String, String>(context.getEnvironment()); env.put("some.param", "param1"); context.setEnvironment(env); return context; } 

And the crash handler:

 @Override protected boolean onContainerFailed(ContainerStatus status) { ContainerId containerId = status.getContainerId(); if (status.getExitStatus() > 0) { failed.add(containerId); getAllocator().allocateContainers(1); } return true; } 

In the ContainerApplication.java container class, the necessary bins are connected, for example:

 @Bean public WorkClass workClass() { return new WorkClass(); } 

In the working class, we use the @OnContainerStart annotation to specify the method that will be called automatically when the container is started:

 @OnContainerStart public void doWorkOnStart() throws Exception { //       containerId DefaultYarnContainer yarnContainer = (DefaultYarnContainer) containerConfiguration.yarnContainer(); Map<String, String> environment = yarnContainer.getEnvironment(); ContainerId containerId = getContainerId(environment); //     String param = environment.get("some.param"); SimpleConsumer<Serializable, Serializable> simpleConsumer = new SimpleConsumer<>(); //   simpleConsumer.kafkaConsumer(param); } 

In reality, the logic of implementation is, of course, much more complicated. In particular, there is an exchange of messages between the container and AppMaster through REST, allowing to coordinate the process of receiving data, etc.

As a result, we received an application that needs to be tested in a loaded cluster. To do this, during the day, during a high background load, we launched a trimmed version on SparkStreaming, which does nothing except save to a file, and at the same time the version of “full stuffing” on java. Resources they were allocated the same, each 30 containers of 2 cores.



Now it is interesting to conduct an experiment in pure conditions in order to understand the performance limit of a java solution. For this, the download of 1.2 TB of data was started, 65 containers of 1 core and it was completed in 10 minutes:



Those. speed was 2 GB / s. The higher values ​​in the picture above are explained by the fact that the data replication factor on HDFS is equal to 3. CPUs of the servers of the data receiving cluster E5-2680v4 @ 2.40GHz. The remaining parameters do not make much sense to give, because all the same the utilization of resources is significantly below 50%. The current solution allows you to scale easily further, but it does not make sense, because at the moment, the bottleneck is Kafka itself (or rather its network interfaces, there are only three brokers and at the same time triple replication for reliability).

In fact, it should not seem that we have something against Spark in principle. This is a very good tool in certain conditions and we also use it for further processing. However, a high level of abstraction, which allows you to quickly and easily work with any data, has its price. It always happens when something goes wrong. We had the experience of Hbase patching and picking in the Hive code, however, this is not the most encouraging thing, in fact. In the case of a Spark, of course, it is also possible to find some acceptable solution, but at the cost of quite a lot of effort. And in our application, we can always quickly find the cause of the problems and correct, as well as implement very complex logic and this will work quickly. In general, as the old Latin saying goes:

Source: https://habr.com/ru/post/358786/


All Articles