replicator.pipelines = slave
replicator.pipeline.slave = d-binlog-to-q, q-to-kafka
replicator.pipeline.slave.stores = parallel-queue
replicator.pipeline.slave.services = datasource
replicator.pipeline.slave.syncTHLWithExtractor = false
replicator.stage.d-binlog-to-q = com.continuent.tungsten.replicator.pipeline.SingleThreadStageTask
replicator.stage.d-binlog-to-q.extractor = dbms
replicator.stage.d-binlog-to-q.applier = parallel-q-applier
replicator.stage.d-binlog-to-q.filters = replicate, colnames, schemachange
replicator.stage.d-binlog-to-q.blockCommitRowCount = ${replicator.global.buffer.size}
replicator.stage.q-to-kafka = com.continuent.tungsten.replicator.pipeline.SingleThreadStageTask
replicator.stage.q-to-kafka.extractor = parallel-q-extractor
replicator.stage.q-to-kafka.applier = asynckafka
replicator.stage.q-to-kafka.taskCount = ${replicator.global.apply.channels}
replicator.stage.q-to-kafka.blockCommitRowCount = ${replicator.global.buffer.size}
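The dbms, parallel-q-applier/parallel-q-extractor and asynckafka names used by the stages above are aliases that are bound to concrete classes elsewhere in the same properties file; asynckafka points at the custom Kafka applier whose methods are shown next. A hedged sketch of what that binding could look like, following Tungsten's usual replicator.applier.<alias> convention (the class name and parameters here are assumptions, not the post's actual values):

replicator.applier.asynckafka = com.example.replicator.applier.KafkaApplier
replicator.applier.asynckafka.brokerList = kafka1:9092,kafka2:9092
replicator.applier.asynckafka.zkConnectString = zk1:2181,zk2:2181,zk3:2181

The applier buffers events between commits; its commit() method below turns the buffered events into Kafka messages and sends them as a single batch.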
override def commit(): Unit = {
  try {
    import scala.collection.JavaConversions._
    // Collect the Kafka messages produced from every buffered event
    val msgs: java.util.concurrent.ConcurrentLinkedQueue[(String, String, String, Option[Callback])] =
      new java.util.concurrent.ConcurrentLinkedQueue[(String, String, String, Option[Callback])]()
    data.foreach(e => {
      msgs.addAll(ruleProcessor.get.processToMsg(e._1, e._2).map(e => (e._1, e._2, e._3, None)))
    })
    // Send the whole batch to Kafka; the commit fails if the producer fails
    kafkaSender.get.send(msgs.toSeq: _*)
  } catch {
    case kpe: KafkaProducerException =>
      logger.error(kpe.getMessage, kpe)
      throw new ReplicatorException(kpe)
  }
  // Persist the position of the last applied event and clear the buffer
  lastHeader.map(saveLastHeader(_))
  resetEventsToSend()
}
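commit() delegates the actual sending to kafkaSender, whose implementation is not shown in the post. Below is a minimal sketch of such a sender over the standard Kafka producer, assuming each tuple is (topic, key, message, optional callback); the class name KafkaSender and its constructor are invented here for illustration.

import java.util.Properties
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord}

class KafkaSender(brokers: String) {
  private val props = new Properties()
  props.put("bootstrap.servers", brokers)
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  private val producer = new KafkaProducer[String, String](props)

  // Send a batch of (topic, key, message, optional callback) tuples and block
  // until the producer has flushed them, so commit() returns only after the
  // whole batch has been handed to Kafka.
  def send(msgs: (String, String, String, Option[Callback])*): Unit = {
    msgs.foreach { case (topic, key, message, callback) =>
      val record = new ProducerRecord[String, String](topic, key, message)
      callback match {
        case Some(cb) => producer.send(record, cb)
        case None     => producer.send(record)
      }
    }
    producer.flush()
  }

  def close(): Unit = producer.close()
}

Flushing before returning keeps commit() synchronous, so the header saved right after it never runs ahead of what has actually reached Kafka.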
def saveLastHeader(header: ReplDBMSHeader): Unit = {
  zkCurator.map { zk =>
    try {
      val dhd = DbmsHeaderData(
        header.getSeqno,
        header.getFragno,
        header.getLastFrag,
        header.getSourceId,
        header.getEpochNumber,
        header.getEventId,
        header.getShardId,
        header.getExtractedTstamp.getTime,
        header.getAppliedLatency,
        if (null == header.getUpdateTstamp) 0 else header.getUpdateTstamp.getTime,
        if (null == header.getTaskId) 0 else header.getTaskId)
      logger.info("{}", writePretty(dhd))
      // Store the last applied header as JSON in the replicator's znode
      zk.setData().forPath(getZkDirectoryPath(context), writePretty(dhd).getBytes("utf8"))
    } catch {
      case t: Throwable => logger.error("error while saving last header to zk", t)
    }
  }
}
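The DbmsHeaderData DTO serialized above is not shown in the post, but its shape can be read off the constructor call in saveLastHeader. A sketch with approximate field types:

// DTO persisted to ZooKeeper; the field list mirrors the values passed to it
// in saveLastHeader() above (a sketch, the real declaration is not shown and
// the types are approximate).
case class DbmsHeaderData(
  seqno: Long,
  fragno: Short,
  lastFrag: Boolean,
  sourceId: String,
  epochNumber: Long,
  eventId: String,
  shardId: String,
  extractedTstamp: Long,   // epoch millis, taken from getExtractedTstamp.getTime
  appliedLatency: Long,
  updateTstamp: Long,      // 0 when the header carries no update timestamp
  taskId: Int)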
override def getLastEvent: ReplDBMSHeader = {
  lastHeader.getOrElse {
    // Fallback header used when nothing has been stored in ZooKeeper yet
    var result = new ReplDBMSHeaderData(0, 0, false, "", 0, "", "", new Timestamp(System.currentTimeMillis()), 0)
    zkCurator.map { zk =>
      try {
        val json = new String(zk.getData().forPath(getZkDirectoryPath(context)), "utf8")
        logger.info("found previous header {}", json)
        val headerDto = read[DbmsHeaderData](json)
        result = new ReplDBMSHeaderData(
          headerDto.seqno, headerDto.fragno, headerDto.lastFrag, headerDto.sourceId,
          headerDto.epochNumber, headerDto.eventId, headerDto.shardId,
          new Timestamp(headerDto.extractedTstamp), headerDto.appliedLatency,
          new Timestamp(headerDto.updateTstamp), headerDto.taskId)
      } catch {
        case t: Throwable => logger.error("error while reading last header from zk", t)
      }
    }
    result
  }
}
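zkCurator is an Option over an Apache Curator client, presumably created during the applier's prepare phase. A sketch of such initialisation; the connect string and znode path are placeholders, since the real code derives the path via getZkDirectoryPath(context):

import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.retry.ExponentialBackoffRetry

object ZkCuratorSetup {
  def connect(connectString: String, headerPath: String): Option[CuratorFramework] = {
    val client = CuratorFrameworkFactory.newClient(connectString, new ExponentialBackoffRetry(1000, 3))
    client.start()
    // Create the znode for the stored header up front so setData()/getData() do not fail
    if (client.checkExists().forPath(headerPath) == null) {
      client.create().creatingParentsIfNeeded().forPath(headerPath)
    }
    Some(client)
  }
}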
RabbitMQConsumerCallback callback = new RabbitMQConsumerCallback() {
    @Override
    public void apply(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
        // Callback invoked by RabbitMQ for each delivered message
        String routingKey = envelope.getRoutingKey();
        // Resolve the key expression, Kafka topic and source document by routing key
        Tuple3<String, String, String> routingExpr = routingExprMap.get(routingKey);
        if (routingExpr == null)
            throw new RuntimeException("No mapping for routing key " + routingKey);
        String expr = routingExpr._1(),
               topic = Objects.firstNonNull(routingExpr._2(), kafkaProducerMainTopic),
               sourceDoc = routingExpr._3();
        Object data = rabbitMQConsumerSerializer.deserialize(body);
        // Wrap the original delivery together with the deserialized payload
        RabbitMQMessageEnvelope msgEnvelope = new RabbitMQMessageEnvelope(envelope, properties, data, sourceDoc);
        // The Kafka message key is extracted from the payload by the configured expression
        byte[] key = getValueByExpression(data, expr).getBytes();
        byte[] msg = kafkaProducerSerializer.serialize(msgEnvelope);
        // Buffer the message for Kafka, keeping the delivery tag for a later ack
        kafkaProducer.addMsg(topic, key, msg, envelope.getDeliveryTag());
        try {
            checkForSendBatch();
        } catch (IOException e) {
            this.errBack(e);
        }
    }

    @Override
    public void errBack(Exception e) {
        logger.error("{}", e.fillInStackTrace());
        close();
    }
};
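checkForSendBatch() itself is not shown. Judging by the delivery tag stored alongside each buffered message, the bridge flushes messages to Kafka in batches and then acknowledges the corresponding RabbitMQ deliveries in one go, which gives at-least-once delivery into Kafka. A Scala sketch of that idea; BatchingProducer and its members are assumptions about the helper's API, only Channel.basicAck is the real RabbitMQ client call:

import com.rabbitmq.client.Channel

// Assumed shape of the producer wrapper used in the callback above
trait BatchingProducer {
  def bufferedCount: Int
  def lastDeliveryTag: Long
  def flush(): Unit
}

class BatchAcker(channel: Channel, producer: BatchingProducer, batchSize: Int) {
  // Once the buffered batch reaches the threshold, push it to Kafka and then
  // acknowledge every RabbitMQ delivery up to the highest tag in the batch
  // (multiple = true acks all earlier unacked deliveries as well).
  def checkForSendBatch(): Unit =
    if (producer.bufferedCount >= batchSize) {
      producer.flush()
      channel.basicAck(producer.lastDeliveryTag, true)
    }
}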
hdfs_kafka.dataframe.df1.uri="jdbc:hive2://[HiveUri]:10000/test;user=hdfs"        // JDBC URI of the Hive server
hdfs_kafka.dataframe.df1.sql=select * from test.log_arenas_p1_v1 where dt='%s'    // the '%s' placeholder is replaced with the processed date
hdfs_kafka.dataframe.df1.keyField=arena_id                                        // field the data frames are grouped by
hdfs_kafka.dataframe.df1.outKeyField=arena_id                                     // field used as the key of the outgoing message
hdfs_kafka.dataframe.df1.tableName=test.log_arenas_p1_v
hdfs_kafka.dataframe.df2.uri="jdbc:hive2://[HiveUri]:10000/test;user=hdfs"
hdfs_kafka.dataframe.df2.sql=select * from test.log_arenas_members where dt='%s'
hdfs_kafka.dataframe.df2.keyField=arena_id
hdfs_kafka.dataframe.df2.outKeyField=arena_id                                     // key of the message sent to Kafka
hdfs_kafka.dataframe.df2.tableName=test.log_arenas_members_p1_v
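On the Spark side these properties are read into plain configuration objects. The field names in the sketch below mirror the keys above and the accesses in the job code that follows; the class names themselves are invented for the sketch:

case class DataFrameConf(
  uri: String,         // JDBC URI of the Hive server
  sql: String,         // query with a '%s' placeholder for the date
  keyField: String,    // field the data frames are grouped by
  outKeyField: String, // field used as the Kafka message key
  tableName: String)

case class JobConfiguration(
  dateFormat: String,
  from: String,
  to: String,
  dataframes: Seq[DataFrameConf])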
// Build the list of dates to substitute into the SQL queries
val dates = Utils.getRange(configuration.dateFormat, configuration.from, configuration.to)
dates.map(date => {
  // Load every configured data frame over JDBC for the current date
  val dataFrames = configuration.dataframes.map(dfconf => {
    val df = executeJdbc(sqlContext, Utils.makeQuery(dfconf.sql, date), dfconf.uri)
    (dfconf, df)
  })
  // Turn each DataFrame into an RDD[(Key, Row)] keyed by keyField
  val keysExtracted = dataFrames.map(e => {
    dataFrameProcessor.extractKey(e._2.rdd, e._1.keyField, e._1.tableName)
  })
  // Union the keyed RDDs of all data frames and collect rows sharing a key into one Seq
  val grouped = keysExtracted
    .reduce(_.union(_))
    .map(e => (e._1, Seq(e._2)))
    .reduceByKey(_ ++ _)
  // Serialize each group of rows and send it to Kafka
  dataFrameProcessor.applySeq(grouped)
})
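Utils.getRange and Utils.makeQuery are not shown in the post; their contract is implied by the calls above: produce every date between from and to in the configured format, and substitute a date into the '%s' placeholder of the query. A possible implementation:

import java.time.LocalDate
import java.time.format.DateTimeFormatter

object Utils {
  // All dates from `from` to `to` inclusive, formatted with `dateFormat`
  def getRange(dateFormat: String, from: String, to: String): Seq[String] = {
    val fmt = DateTimeFormatter.ofPattern(dateFormat)
    val start = LocalDate.parse(from, fmt)
    val end = LocalDate.parse(to, fmt)
    Iterator.iterate(start)(_.plusDays(1)).takeWhile(!_.isAfter(end)).map(_.format(fmt)).toSeq
  }

  // Substitute the date into the '%s' placeholder of the configured SQL
  def makeQuery(sql: String, date: String): String = sql.format(date)
}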
Source: https://habr.com/ru/post/273607/