
Parallel parsing of a large number of HTML pages using Apache Ignite (GridGain) in 200 lines of code

From time to time I need to process a large number of files. Usually this means converting from one format to another: XSLT transformation, parsing, converting images or video. For these tasks I adapted the GridGain In-Memory Data Fabric framework. It offers distributed computing, MapReduce, distributed caches and queues, an in-memory distributed file system, moving code to the data, job stealing, accelerators for Hadoop, and many other things that are fashionable today. All of this is easy to use and works on different operating systems. You can easily try it all out under Windows.

I will try to describe my experience using a simple task as an example.

Recently I had to extract product descriptions from a large site (2.5 million products).
The site was downloaded using the wget utility:
C:>start wget110 --recursive --level 10 -nc --no-clobber --html-extension --exclude-directories=it,fr --convert-links http://site.com 


You can run several instances of the utility to download in parallel. Even so, it takes a long time, about a week.
Initially, I tried parsing with the Xidel console program:
 REM          FOR /R ./ %%G IN (dbfamily*.html) DO xidel "%%G" --quiet --extract-file=mytemplate.txt 

The file mytemplate.txt contained a pattern:
 //td/h1 || " # " || //td[contains(text(),"")] || " # " || //td[contains(text(),"")] || " # " || //td[contains(text(),"")] || " # " || //td[contains(text(),"")] 

But sequential parsing took a lot of time, so I decided to do it in parallel on several computers. Surprisingly, in the end this came down to a single Java class.

Each HTML page is parsed with XPath expressions. The document is first cleaned with HtmlCleaner.

HTML Parsing Code:
    public static ArrayList parseWithXPathList(String path, ArrayList<String> XPathList) {
        count++;
        String str = "";
        ArrayList list = new ArrayList();
        try {
            String content = readFile(path, StandardCharsets.UTF_8);
            TagNode tagNode = new HtmlCleaner().clean(content);
            org.w3c.dom.Document doc = new DomSerializer(new CleanerProperties()).createDOM(tagNode);
            // And then use the standard JAXP interfaces to query it:
            XPath xpath = XPathFactory.newInstance().newXPath();
            Iterator<String> it = XPathList.iterator();
            while (it.hasNext()) {
                String XPath = it.next();
                String res = (String) xpath.evaluate(XPath, doc, XPathConstants.STRING);
                list.add(res);
            }
            // System.out.println(str);
        } catch (Exception e) {
            str = "" + e;
            list.add(str);
        }
        return list;
    }


Called as follows:
    ArrayList<String> Xpaths = new ArrayList<String>(Arrays.asList("//title", "//td/h1"));
    ArrayList ResultList = parseWithXPathList(param.toString(), Xpaths);
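A point that is easy to miss in the call above: with XPathConstants.STRING, the standard JAXP evaluator returns the text of the first matching node only, and an empty string when nothing matches. The following is a minimal sketch of that behavior using only the JDK. The class name XPathStringDemo, the method extract and the sample markup are made up for illustration, and the input is parsed here as well-formed XML rather than cleaned with HtmlCleaner.

```java
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.Document;
import org.xml.sax.InputSource;

// Hypothetical demo class, not part of the original project.
class XPathStringDemo {

    // Evaluates each expression against well-formed markup; one string per expression.
    static List<String> extract(String xml, List<String> expressions) {
        try {
            Document doc = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse(new InputSource(new StringReader(xml)));
            XPath xpath = XPathFactory.newInstance().newXPath();
            List<String> results = new ArrayList<>();
            for (String expression : expressions) {
                // STRING yields the text of the first matching node, or "" if none match.
                results.add((String) xpath.evaluate(expression, doc, XPathConstants.STRING));
            }
            return results;
        } catch (Exception e) {
            throw new IllegalStateException("Could not evaluate XPath", e);
        }
    }

    public static void main(String[] args) {
        String page = "<html><head><title>Product 42</title></head>"
                + "<body><table><tr><td><h1>Blue kettle</h1></td>"
                + "<td><h1>Second heading</h1></td></tr></table></body></html>";
        // prints [Product 42, Blue kettle, ]
        System.out.println(extract(page, Arrays.asList("//title", "//td/h1", "//td/h2")));
    }
}
```

The second td/h1 is ignored and the missing //td/h2 gives an empty string. If every match is needed, XPathConstants.NODESET can be requested instead.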

All that remains is to traverse the directory tree with the standard walker:
    Files.walkFileTree(startingDir, opts, Integer.MAX_VALUE, parseFiles);
and apply the parser to each file:
    if (file.toString().endsWith(".html")) {
        ArrayList<String> Xpaths = new ArrayList<String>(Arrays.asList("//title", "//td/h1"));
        ArrayList<String> ResultList = parseWithXPathList(file.toString(), Xpaths);
        System.out.format(" %d ) %s %n", count, "" + ResultList);
        // castOneAsync(ignite, "" + file);
    }
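For readers who have not used the NIO walker, here is a small self-contained sketch of the same traversal that only collects the matching paths instead of parsing them. HtmlFileCollector, collectHtml and createSampleTree are hypothetical names, and the sample tree is created in a temporary directory purely for the demonstration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of the original project.
class HtmlFileCollector {

    // Walks the tree under root and returns all regular files ending in ".html", sorted.
    static List<Path> collectHtml(Path root) {
        List<Path> found = new ArrayList<>();
        try {
            Files.walkFileTree(root, new SimpleFileVisitor<Path>() {
                @Override
                public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
                    if (attrs.isRegularFile() && file.toString().endsWith(".html")) {
                        found.add(file);
                    }
                    return FileVisitResult.CONTINUE;
                }

                @Override
                public FileVisitResult visitFileFailed(Path file, IOException exc) {
                    // Skip unreadable entries instead of aborting the whole walk.
                    return FileVisitResult.CONTINUE;
                }
            });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Collections.sort(found);
        return found;
    }

    // Builds a tiny tree with two HTML files and one text file.
    static Path createSampleTree() {
        try {
            Path root = Files.createTempDirectory("pages");
            Path catalog = Files.createDirectories(root.resolve("catalog"));
            byte[] body = "<html/>".getBytes(StandardCharsets.UTF_8);
            Files.write(root.resolve("index.html"), body);
            Files.write(catalog.resolve("item.html"), body);
            Files.write(catalog.resolve("notes.txt"), body);
            return root;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(collectHtml(createSampleTree()).size()); // prints 2
    }
}
```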


Parsing a million files on one machine consistently takes about 12 hours.
I tried to parallelize the code with the GridGain framework, or more precisely with its non-commercial version, Apache Ignite.
It works like this: you start the nodes (on one or more machines), the nodes find each other over the network, and each prints to its console how many processors and how much memory the cluster now has (these are your slaves). I launched 12 nodes on 3 machines (each with 4 cores and 16 GB of RAM).
After the site is downloaded (~500 GB), the folder with the HTML files is shared so that the grid can access it. The shared folder must be visible to all nodes (check the access rights!).

Next, you write a simple Java application that also starts the master node:
 Ignite ignite = Ignition.start("D:\\grid\\1.4\\apache-ignite-fabric-1.4.0-bin\\config\\default-config.xml"); 


After that, you can query it for the state of the cluster and dispatch jobs to arbitrary nodes.
The division into master and slave is nominal; all nodes are equal. I call "master" the node on which the initial code is executed. You can in fact segment a cluster by node type, but we do not need that here.

Code that dispatches a parsing job to a node:

    public static void castOneAsync(Ignite ignite, String param) {
        // Enable asynchronous mode.
        IgniteCluster cluster = ignite.cluster();
        IgniteCompute asyncCompute = ignite.compute(cluster.forRemotes()).withAsync();
        // Asynchronously execute a job.
        asyncCompute.call(() -> {
            System.out.println("processing: " + param);
            ArrayList<String> Xpaths = new ArrayList<String>(Arrays.asList("//title", "//td/h1"));
            ArrayList<String> ResultList = parseWithXPathList(param.toString(), Xpaths);
            System.out.format(" %d ) %s \n %n", count, "" + ResultList);
            return "" + param + " :" + ResultList;
        });
        // Get the future for the above invocation.
        IgniteFuture<String> fut = asyncCompute.future();
        // Asynchronously listen for completion and print out the result.
        fut.listen(f -> {
            String resultStr = f.get() + " \n";
            // System.out.println("Job result: " + resultStr);
            count++;
            try {
                Files.write(Paths.get("d:\\grid\\result.txt"), resultStr.getBytes(), StandardOpenOption.APPEND);
            } catch (IOException e) {
                System.out.println("" + e);
            }
            if (count % 100 == 0)
                System.out.println("processed: " + count);
        });
    }
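Two details of the listener above deserve care. I assume the completion callbacks may be invoked from more than one thread, in which case incrementing a plain static long can lose updates. In addition, StandardOpenOption.APPEND on its own fails with NoSuchFileException when the result file does not exist yet. The sketch below shows one possible way to handle both using only the JDK. ResultSink and its methods are hypothetical names, not part of Ignite or of the original project.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper: collects result lines from concurrent callbacks into one file.
class ResultSink {
    private final Path target;
    private final AtomicLong processed = new AtomicLong();

    ResultSink(Path target) {
        this.target = target;
    }

    // synchronized keeps lines written by concurrent callers from interleaving.
    // CREATE makes the file on first use, APPEND adds to it afterwards.
    synchronized long append(String line) {
        try {
            Files.write(target, (line + "\n").getBytes(StandardCharsets.UTF_8),
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return processed.incrementAndGet();
    }

    long processed() {
        return processed.get();
    }

    public static void main(String[] args) throws Exception {
        Path out = Files.createTempFile("result", ".txt");
        ResultSink sink = new ResultSink(out);
        Thread[] workers = new Thread[4];
        for (int i = 0; i < workers.length; i++) {
            int id = i;
            workers[i] = new Thread(() -> {
                for (int j = 0; j < 50; j++) {
                    sink.append("worker " + id + " line " + j);
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        // prints 200 200
        System.out.println(sink.processed() + " " + Files.readAllLines(out).size());
    }
}
```

With such a helper, the listener body in castOneAsync could shrink to a single call along the lines of sink.append(f.get()).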

Important points:

As a result, I managed to fit the entire project into a single Java class of about 200 lines, comments included. The class needs the HtmlCleaner and Apache Ignite JAR files.

Instead of HtmlCleaner you can use the external utility Xidel, which supports XQuery and XPath.
In that case it has to be added to the PATH system variable on every machine that runs nodes, and then called directly from Java. In return, you get to use XQuery.
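Calling Xidel from Java can be sketched with ProcessBuilder. The example assumes that an executable named xidel is on the PATH and that its output is UTF-8. It reuses only the --quiet and --extract-file options shown in the batch command earlier. XidelRunner, buildCommand and run are hypothetical names.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical wrapper around the external xidel executable.
class XidelRunner {

    // Builds the same command line as the batch loop: xidel <file> --quiet --extract-file=<template>
    static List<String> buildCommand(String htmlFile, String templateFile) {
        return Arrays.asList("xidel", htmlFile, "--quiet", "--extract-file=" + templateFile);
    }

    // Runs xidel and returns everything it printed; assumes UTF-8 output.
    static String run(String htmlFile, String templateFile) throws IOException, InterruptedException {
        Process process = new ProcessBuilder(buildCommand(htmlFile, templateFile))
                .redirectErrorStream(true) // merge stderr into stdout so one reader drains both
                .start();
        String output;
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
            output = reader.lines().collect(Collectors.joining("\n"));
        }
        int exitCode = process.waitFor();
        if (exitCode != 0) {
            throw new IOException("xidel exited with code " + exitCode + ": " + output);
        }
        return output;
    }

    public static void main(String[] args) {
        // prints [xidel, page.html, --quiet, --extract-file=mytemplate.txt]
        System.out.println(buildCommand("page.html", "mytemplate.txt"));
    }
}
```

Starting one process per page is likely to cost noticeably more than in-process parsing, so this trade-off is worth measuring on a sample before running it on millions of files.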

If this publication is of interest, I will write about distributed caches, queues, and other distributed features of this framework.

Project Source Code for Eclipse

    package gridE;

    import org.apache.ignite.Ignite;
    import org.apache.ignite.IgniteCluster;
    import org.apache.ignite.IgniteCompute;
    import org.apache.ignite.Ignition;
    import org.apache.ignite.lang.IgniteFuture;
    import org.htmlcleaner.CleanerProperties;
    import org.htmlcleaner.DomSerializer;
    import org.htmlcleaner.HtmlCleaner;
    import org.htmlcleaner.TagNode;

    import javax.xml.xpath.XPath;
    import javax.xml.xpath.XPathConstants;
    import javax.xml.xpath.XPathFactory;
    import java.io.IOException;
    import java.io.PrintWriter;
    import java.nio.charset.Charset;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.*;
    import java.nio.file.attribute.BasicFileAttributes;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Date;
    import java.util.EnumSet;
    import java.util.Iterator;

    import static java.nio.file.FileVisitResult.CONTINUE;

    /**
     * Created by Veaceslav Kunitki on 11/13/2015.
     * This class parses files on a cluster with the "Apache Ignite" framework.
     */
    public class ParseFilesOnCluster extends SimpleFileVisitor<Path> {
        Ignite ignite;
        public static long count = 0; // counter of parsed files

        // Standard Java file tree walker
        @Override
        public FileVisitResult visitFile(Path file, BasicFileAttributes attr) {
            if (attr.isSymbolicLink()) {
                System.out.format("Symbolic link: %s ", file);
            } else if (attr.isRegularFile()) {
                // System.out.format("Regular file: %s ", file);
                if (file.toString().endsWith(".html")) {
                    // Uncomment for serial processing:
                    // ArrayList<String> Xpaths = new ArrayList<String>(Arrays.asList("//title", "//td/h1"));
                    // ArrayList<String> ResultList = parseWithXPathList(file.toString(), Xpaths);
                    // System.out.format(" %d ) %s %n", count, "" + ResultList);
                    castOneAsync(ignite, "" + file); // parallel processing
                }
            } else {
                System.out.format("Other: %s ", file);
            }
            return CONTINUE;
        }

        // Print each directory visited.
        @Override
        public FileVisitResult postVisitDirectory(Path dir, IOException exc) {
            System.out.format("Directory: %s%n", dir);
            return CONTINUE;
        }

        @Override
        public FileVisitResult visitFileFailed(Path file, IOException exc) {
            System.err.println(exc);
            return CONTINUE;
        }

        static String readFile(String path, Charset encoding) throws IOException {
            byte[] encoded = Files.readAllBytes(Paths.get(path));
            return new String(encoded, encoding);
        }

        public static ArrayList parseWithXPathList(String path, ArrayList<String> XPathList) {
            count++;
            String str = "";
            ArrayList list = new ArrayList();
            try {
                String content = readFile(path, StandardCharsets.UTF_8);
                TagNode tagNode = new HtmlCleaner().clean(content);
                org.w3c.dom.Document doc = new DomSerializer(new CleanerProperties()).createDOM(tagNode);
                // And then use the standard JAXP interfaces to query it:
                XPath xpath = XPathFactory.newInstance().newXPath();
                Iterator<String> it = XPathList.iterator();
                while (it.hasNext()) {
                    String XPath = it.next();
                    String res = (String) xpath.evaluate(XPath, doc, XPathConstants.STRING);
                    list.add(res);
                }
                // System.out.println(str);
            } catch (Exception e) {
                str = "" + e;
                list.add(str);
            }
            return list;
        }

        /*
         * Asynchronously execute a job on an external PC
         */
        public static void castOneAsync(Ignite ignite, String param) {
            // Enable asynchronous mode.
            IgniteCluster cluster = ignite.cluster();
            // IgniteCompute compute1 = ignite.compute(cluster.forRemotes());
            IgniteCompute asyncCompute = ignite.compute(cluster.forRemotes()).withAsync();
            // Asynchronously execute a job.
            asyncCompute.call(() -> {
                // This runs on some remote cluster node.
                System.out.println("processing: " + param);
                ArrayList<String> Xpaths = new ArrayList<String>(Arrays.asList("//title", "//li/@data-zoom"));
                ArrayList<String> ResultList = parseWithXPathList(param.toString(), Xpaths);
                System.out.format(" %d ) %s \n %n", count, "" + ResultList);
                return "{ 'url':" + param + " ,'ResultList'=" + ResultList + " }";
            });
            // Get the future for the above invocation.
            IgniteFuture<String> fut = asyncCompute.future();
            // Asynchronously listen for completion and print out the result.
            fut.listen(f -> {
                String resultStr = f.get() + " \n";
                // System.out.println("Job result: " + resultStr);
                count++;
                try {
                    // Warning! The file must already exist; create it manually.
                    Files.write(Paths.get("d:\\grid\\result.txt"), resultStr.getBytes(), StandardOpenOption.APPEND);
                } catch (IOException e) {
                    System.out.println("" + e);
                }
                if (count % 100 == 0)
                    System.out.println("processed: " + count);
            });
        }

        public static void main(String[] args) throws Exception {
            System.out.println("# Distributed parser!");
            Ignite ignite = Ignition.start("D:\\grid\\1.4\\apache-ignite-fabric-1.4.0-bin\\config\\default-config.xml");
            IgniteCluster cluster = ignite.cluster();
            // Compute instance over remote nodes.
            IgniteCompute compute4remote = ignite.compute(cluster.forRemotes());
            // Print hello message on all remote nodes.
            compute4remote.broadcast(
                    () -> System.out.println("---===Distributed parser started===---: " + cluster.localNode().id()));
            System.out.println("Cluster ready!");
            if (true) { // start parsing job
                // final Path startingDir = Paths.get("d:/home/familytree.ru/");
                Path startingDir = Paths.get("\\\\SERGIU-PC\\temp"); // shared directory with HTML files
                EnumSet<FileVisitOption> opts = EnumSet.of(FileVisitOption.FOLLOW_LINKS);
                ParseFilesOnCluster parseFiles = new ParseFilesOnCluster();
                parseFiles.ignite = ignite;
                // log start time to file
                PrintWriter writer = new PrintWriter("d:\\grid\\start.txt", "UTF-8");
                String dateTime = "" + (new Date());
                writer.println(dateTime + "\n");
                System.out.println(dateTime + "\n");
                writer.close();
                System.out.println("# walking...!");
                Files.walkFileTree(startingDir, opts, Integer.MAX_VALUE, parseFiles);
                // log end time
                dateTime = "" + (new Date());
                Files.write(Paths.get("d:\\grid\\start.txt"), dateTime.getBytes(), StandardOpenOption.APPEND);
            }
        }
    }


POM-file with project dependencies

    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>gridE</groupId>
        <artifactId>gridE</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <repositories>
            <repository>
                <id>GridGain External Repository</id>
                <url>http://www.gridgainsystems.com/nexus/content/repositories/external</url>
            </repository>
        </repositories>
        <dependencies>
            <dependency>
                <groupId>org.apache.ignite</groupId>
                <artifactId>ignite-core</artifactId>
                <version>1.4.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.ignite</groupId>
                <artifactId>ignite-spring</artifactId>
                <version>1.1.4</version>
            </dependency>
            <dependency>
                <groupId>org.apache.ignite</groupId>
                <artifactId>ignite-indexing</artifactId>
                <version>1.4.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.ignite</groupId>
                <artifactId>ignite-examples</artifactId>
                <version>1.0.0-RC1</version>
            </dependency>
            <dependency>
                <groupId>net.sourceforge.htmlcleaner</groupId>
                <artifactId>htmlcleaner</artifactId>
                <version>2.15</version>
            </dependency>
        </dependencies>
    </project>


Configuration file for node

    <?xml version="1.0" encoding="UTF-8"?>

    <!--
        Copyright (C) GridGain Systems. All Rights Reserved.

        Licensed under the Apache License, Version 2.0 (the "License");
        you may not use this file except in compliance with the License.
        You may obtain a copy of the License at

            http://www.apache.org/licenses/LICENSE-2.0

        Unless required by applicable law or agreed to in writing, software
        distributed under the License is distributed on an "AS IS" BASIS,
        WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        See the License for the specific language governing permissions and
        limitations under the License.
    -->

    <!-- Ignite Spring configuration file. -->
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:util="http://www.springframework.org/schema/util"
           xsi:schemaLocation="
            http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
            http://www.springframework.org/schema/util
            http://www.springframework.org/schema/util/spring-util-2.0.xsd">
        <bean class="org.apache.ignite.configuration.IgniteConfiguration">
            <!-- Set to true to enable grid-aware class loading for examples, default is false. -->
            <property name="peerClassLoadingEnabled" value="true"/>

            <property name="marshaller">
                <bean class="org.apache.ignite.marshaller.optimized.OptimizedMarshaller">
                    <!-- Set to false to allow non-serializable objects in examples, default is true. -->
                    <property name="requireSerializable" value="false"/>
                </bean>
            </property>

            <!-- Enable events for examples. -->
            <property name="includeEventTypes">
                <util:constant static-field="org.apache.ignite.events.EventType.EVTS_ALL"/>
            </property>

            <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
            <property name="discoverySpi">
                <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                    <property name="ipFinder">
                        <!-- Uncomment multicast IP finder to enable multicast-based discovery of initial nodes. -->
                        <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">-->
                        <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
                            <property name="addresses">
                                <list>
                                    <!-- In distributed environment, replace with actual host IP address. -->
                                    <value>127.0.0.1:47500..47509</value>
                                    <value>192.168.4.110:47500..47509</value>
                                    <value>192.168.4.117:47500..47509</value>
                                </list>
                            </property>
                        </bean>
                    </property>
                </bean>
            </property>

            <property name="cacheConfiguration">
                <bean class="org.apache.ignite.configuration.CacheConfiguration">
                    <!-- Set a cache name. -->
                    <property name="name" value="cacheName"/>
                    <!-- Set cache mode. -->
                    <property name="cacheMode" value="PARTITIONED"/>
                </bean>
            </property>
        </bean>
    </beans>

Source: https://habr.com/ru/post/271475/

