⬆️ ⬇️

How to index business application logs in Hadoop (SolrCloud)

Introduction



One of our clients was faced with the task of removing logs from most corporate applications and their databases “somewhere” - it’s painfully a lot of trouble: they grow like leaps and bounds, clean them periodically, and access to some must also be provided during many years, and yes even the analysis I want to carry out a systematic way. Of course, making logs is not the primary goal, and for the aggregate of requirements, we chose Hadoop, the version from Cloudera (CDH 5).



Requirements indicated that the solution, among other things, should provide the ability to search and view a list of events (from logs) according to specified criteria, preferably fast. Moreover, some applications must also be redone so that log viewing forms start using Hadoop instead of their databases.



One solution is to use the SolrCloud search module, which is included with Cloudera's Hadoop. Cloudera includes out-of-the-box tools for uploading data from application databases and indexing them in bundles (not line by line). However, this method turned out to be at least working, but more laborious and unpredictable in tuning than, say, if we used Impala to fetch data. Therefore, I decided to share how we did it, hoping to save time for those who will face a similar task.

')

This article describes the details of the settings, as well as features encountered in the process.





Scenario



  1. We unload data from Oracle to files on HDFS. The file format is avro. Tool: sqoop ( http://sqoop.apache.org/docs/1.4.2/SqoopUserGuide.htm ).
    The avro format has many advantages: it is binary, the data compresses well, with it you will not bathe with carriage translations and with commas in the text fields, as with CSV, and there is a data scheme in the file itself that supports Schema Evolution. In general, in Hadoop, avro is promoted as a unified format for storing and transferring data between different components; it is supported by many tools and components. And there is one more plus exactly for our task, about it below.
  2. Create a “collection” in SolrCloud. Tool: solrctl ( http://www.cloudera.com/content/cloudera-content/cloudera-docs/CDH5/latest/Search/Cloudera-Search-User-Guide/csug_solrctl_ref.html )
    The collection is a logical index in SolrCloud. It is associated with a set of configuration files and consists of one or more shards (shard), count folders with index files. If the number of shards is more than one, this is a distributed index.
  3. Run the MapReduce driver ( https://developer.yahoo.com/hadoop/tutorial/module4.html#driver ), which:

    • read all entries from avro-file
    • skip them through an ETL process written as a morphline script; the result of this process is a shard with new data (index files in Solr format, laid out in the specified HDFS directory)
    • merge the shard laid out to the active SolrCloud collection without transferring it offline, live (go-live), so to speak :)


    Tool: hadoop command that runs the org.apache.solr.hadoop.MapReduceIndexerTool driver ( http://www.cloudera.com/content/cloudera-content/cloudera-docs/CDH5/latest/Search/Cloudera-Search-User-Guide /csug_mapreduceindexertool.html ), performing this sequence.



We start everything from the main NameNode, although it doesn't matter.



So, on the steps ...



Uploading data from Oracle to avro files



sqoop import --connect jdbc:oracle:thin:@oraclehost:1521/SERVICENAME \ --username ausername --password apassword --table ASCHEMA.LOG_TABLE \ --as-avrodatafile --compression-codec snappy \ -m 16 --split-by NUM_BEG \ --map-column-java NUM_BEG=Integer,DTM_BEG=String,KEY_TYPE=String,OLD_VALUE=String,NEW_VALUE=String,NUM_PARENT=Integer,\ NUM_END=Integer,EVENT=String,TRACELEVEL=String,KEY_USER=String,COMPUTER_NAME=String,PRM=String,OPERATION=Integer,\ KEY_ENTITY=String,MODULE_NAME=String \ --target-dir /user/$USER/solrindir/tmlogavro 




A little about the parameters:





Create a collection



Here we use the solrctl utility to manage deployed SolrCloud.

First, on the local disk, we generate the file structure of the future collection, the so-called Collection Instance Directory. In it, we will do / change the settings of the collection on a local disk and then clone them into the zookeeper configuration service, from which SolrCloud reads the settings necessary for the job:

 solrctl instancedir --generate $HOME/solr_configs_for_tm_log 


Here, the parameter is the path to the local directory being created.





By default, the files created in the directory are already filled with a demo of the data scheme and search instructions, and we need to remove the excess.

Open the conf / schema.xml file in the created directory. This is the main collection file that describes the structure of the data being indexed. Delete the tag with its contents, the tag and all. Instead, insert the following:

 <fields> <field name="num_beg" type="int" indexed="true" stored="true" multiValued="false" /> <field name="dtm_beg" type="date" indexed="true" stored="true" multiValued="false" /> <field name="key_type" type="string" indexed="true" stored="true" multiValued="false" /> <field name="old_value" type="string" indexed="true" stored="true" multiValued="false" /> <field name="new_value" type="string" indexed="true" stored="true" multiValued="false" /> <field name="num_parent" type="string" indexed="true" stored="true" multiValued="false" /> <field name="num_end" type="string" indexed="true" stored="true" multiValued="false" /> <field name="event" type="text_general" indexed="true" stored="true" multiValued="false" /> <field name="tracelevel" type="string" indexed="true" stored="true" multiValued="false" /> <field name="key_user" type="string" indexed="true" stored="true" multiValued="false" /> <field name="computer_name" type="string" indexed="true" stored="true" multiValued="false" /> <field name="prm" type="string" indexed="true" stored="true" multiValued="false" /> <field name="operation" type="string" indexed="true" stored="true" multiValued="false" /> <field name="key_entity" type="string" indexed="true" stored="true" multiValued="false" /> <field name="module_name" type="string" indexed="true" stored="true" multiValued="false" /> <field name="_version_" type="long" indexed="true" stored="true" required="true" /> <!-- catchall field, containing all other searchable text fields (implemented via copyField further on in this schema --> <field name="text" type="text_general" indexed="true" stored="false" multiValued="true"/> </fields> <!-- Field to use to determine and enforce document uniqueness. Unless this field is marked with required="false", it will be a required field --> <uniqueKey>num_beg</uniqueKey> <copyField source="event" dest="text"/> 


Note that the _version_ field does not exist in the data source, it is necessary for Solr’s internal purposes, for example for optimistic locks, for the Partial Update mechanism. Simply specifying such a field in your schema.xml will be enough: Solr will manage its contents on its own.

There is also no text field. We specified it along with the copyField instruction for the performance of full-text search via HUE (user interface to Hadoop from Clouder). If you connect the created collection to HUE (via configuration UI-forms), then in the search interface for this collection the search string value occurs in the text field.



Now one squat. The fact is that in the generated example files, one search engine mechanism is enabled - Elevator. It allows you to put forward results on certain criteria, such as ads from above in the search results in Yandex. So in the example, it is set up that the key field in your schema is of type string (you can see examples of advertising phrases in conf \ elevate.xml). We have an int. Because of this, our entire indexing process collapsed with a type mismatch error. Considering that this mechanism is not interesting for our task, we cut it out, namely: open the conf/solrconfig.xml in the created directory, and delete (comment) the tags and their contents <searchComponent name="elevator" ...">, <requestHandler name="/elevate" ...> . And at the same time we delete the conf\elevate.xml from the created directory so that it does not hang under our feet.





Then we register (clone) the entire configuration of the future collection in SolrCloud, or rather, in the naming service ZooKeeper, from which all SolrCloud deployed servers read configurations (and receive their updates):

 solrctl instancedir --create tm_log_avro $HOME/solr_configs_for_tm_log 


Here, the parameter is the name of the future collection, and the path to the directory on the local disk containing the configuration files. We created it above.





Well, the last step at this stage is to create a collection with a specified number of shards:

 solrctl collection --create tm_log_avro -s 1 


This command creates a collection based on the configuration registered in ZooKeeper. The first parameter is the name of the collection, the second is the number of shards (let's take 1 for simplicity).



Starting the collection indexing process



First we configure the ETL indexing process. Cloudera respects the Kite SDK library, especially its part of Morphline. In essence, the Morphline component is an interpreter of a scripting language in which you describe (as a hierarchy of sequences of commands), what to do with the incoming data stream (as an array of “record” objects), how to transform, and what to give next. For example, there is a command to read the avro file. Of course, their teams are connected, in this and chip. So Clouder wrote the commands to create a Solr-index for all entries of the incoming stream, it will be the last in the script.



The essence of the process:





To set up such a process, create a file $HOME/solr_configs_for_tm_log_morphlines/morphlines.conf with the following content:

 # Specify server locations in a SOLR_LOCATOR variable; used later in # variable substitutions: SOLR_LOCATOR : { # Name of solr collection collection : tm_log_avro # ZooKeeper ensemble zkHost : "hadoop-n1.custis.ru:2181,hadoop-n2.custis.ru:2181,hadoop-n3.custis.ru:2181/solr" } # Specify an array of one or more morphlines, each of which defines an ETL # transformation chain. A morphline consists of one or more potentially # nested commands. A morphline is a way to consume records such as Flume events, # HDFS files or blocks, turn them into a stream of records, and pipe the stream # of records through a set of easily configurable transformations on its way to # Solr. morphlines : [ { # Name used to identify a morphline. For example, used if there are multiple # morphlines in a morphline config file. id : morphline1 # Import all morphline commands in these java packages and their subpackages. # Other commands that may be present on the classpath are not visible to this # morphline. importCommands : ["org.kitesdk.**", "org.apache.solr.**"] commands : [ { # Parse Avro container file and emit a record for each Avro object readAvroContainer { # Optionally, require the input to match one of these MIME types: # supportedMimeTypes : [avro/binary] # Optionally, use a custom Avro schema in JSON format inline: # readerSchemaString : """<json can go here>""" # Optionally, use a custom Avro schema file in JSON format: # readerSchemaFile : /path/to/syslog.avsc } } { # Consume the output record of the previous command and pipe another # record downstream. # # extractAvroPaths is a command that uses zero or more Avro path # excodeblockssions to extract values from an Avro object. Each excodeblockssion # consists of a record output field name, which appears to the left of the # colon ':' and zero or more path steps, which appear to the right. # Each path step is separated by a '/' slash. Avro arrays are # traversed with the '[]' notation. # # The result of a path excodeblockssion is a list of objects, each of which # is added to the given record output field. # # The path language supports all Avro concepts, including nested # structures, records, arrays, maps, unions, and others, as well as a flatten # option that collects the primitives in a subtree into a flat list. In the # paths specification, entries on the left of the colon are the target Solr # field and entries on the right specify the Avro source paths. Paths are read # from the source that is named to the right of the colon and written to the # field that is named on the left. extractAvroPaths { flatten : true paths : { computer_name :/COMPUTER_NAME dtm_beg :/DTM_BEG event :/EVENT key_entity :/KEY_ENTITY key_type :/KEY_TYPE key_user :/KEY_USER module_name :/MODULE_NAME new_value :/NEW_VALUE num_beg :/NUM_BEG num_end :/NUM_END num_parent :/NUM_PARENT old_value :/OLD_VALUE operation :/OPERATION prm :/PRM tracelevel :/TRACELEVEL } } } # Consume the output record of the previous command and pipe another # record downstream. # # convert timestamp field to native Solr timestamp format # such as 2012-09-06 07:14:34 to 2012-09-06T07:14:34.000Z in UTC { convertTimestamp { field : dtm_beg inputFormats : ["yyyy-MM-dd HH:mm:ss", "yyyy-MM-dd"] inputTimezone : Europe/Moscow outputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" outputTimezone : UTC } } # Consume the output record of the previous command and pipe another # record downstream. # # This command deletes record fields that are unknown to Solr # schema.xml. # # Recall that Solr throws an exception on any attempt to load a document # that contains a field that is not specified in schema.xml. { sanitizeUnknownSolrFields { # Location from which to fetch Solr schema solrLocator : ${SOLR_LOCATOR} } } # log the record at DEBUG level to SLF4J { logDebug { format : "output record: {}", args : ["@{}"] } } # load the record into a Solr server or MapReduce Reducer { loadSolr { solrLocator : ${SOLR_LOCATOR} } } ] } ] 


A little about the commands used:





Launch



Now everything is ready for launch. Run together two commands:



 sudo -u hdfs hadoop jar /usr/lib/solr/contrib/mr/search-mr-*-job.jar org.apache.solr.hadoop.HdfsFindTool -find \ hdfs://$NNHOST:8020/user/$USER/solrindir/tmlogavro -type f \ -name 'part-m-000*.avro' |\ sudo -u hdfs hadoop --config /etc/hadoop/conf.cloudera.yarn \ jar /usr/lib/solr/contrib/mr/search-mr-*-job.jar org.apache.solr.hadoop.MapReduceIndexerTool \ --libjars /usr/lib/solr/contrib/mr/search-mr-1.0.0-cdh5.0.0.jar \ --log4j $HOME/solr_configs_for_tm_log_morphlines/log4j.properties \ --morphline-file $USER/solr_configs_for_tm_log_morphlines/morphlines.conf \ --output-dir hdfs://$NNHOST:8020/user/$USER/solroutdir \ --verbose --go-live --zk-host $ZKHOST \ --collection tm_log_avro \ --input-list -; 


A little about the parameters of the second command:





This command will create and launch MapReduce tasks:





Little results



MapReduceIndexerTool, and Solr itself, turned out to be very capricious about the available RAM. With our structures, each Reduce task, which indexes a file from the list, required that the available memory (Java Heap Size) be approximately 1/2 the size of the uncompressed file, otherwise, OutOfMemoryError. Therefore, when uploading sqoop to files, control their size through, for example, the parameter m (the number of mappers creating files).

Also, despite the amount of available memory in the Map and Reduce tasks, the success of the last stage directly depends on the amount of available memory in Solr Server and on the size of the already indexed data in the collection. According to our structures, for example, for a merge of 30 GB for one shard, there was enough 6 GB of Java Heap Size allocated for one Solr instance.



There is one more feature - the used mechanism of the index merge does not identify duplicate records. If there are records in your indexed file that are already in the collection, they will be duplicated. Therefore, when re-indexing, make sure to get a unique set of records in the files each time. This can be quite easily arranged using the features of sqoop for incremental data upload (via sqoop job). Just do not forget to remove the old files from the folder before starting the unloading, and then they will be indexed again.

Source: https://habr.com/ru/post/234049/



All Articles