kafka-topics.bat --create --zookeeper localhost: 2181 --replication-factor 1 --partitions 1 --topic out-topichere we specify the zookeeper server, replication-factor is the number of replicas of the message log, partitions - the number of sections in the topic (more on this below) and the topic itself is “out-topic”.
kafka-topics.bat --zookeeper localhost: 2181 --alter --topic out-topic --partitions 2I launched my publisher and two subscribers in the same group on the same topic (examples of java programs will be below). Configuring the group names and client IDs is not necessary, Kafka takes it upon himself.
my_kafka_run.cmd com.home.SimpleProducer out-topic (publisher)Starting to enter a key in the publisher of a key: the value can be observed who gets them. For example, according to the key distribution strategy of the key hash, the message m: 1 got into the client01 client
my_kafka_run.cmd com.home.SimpleConsumer out-topic testGroup01 client01 (first subscriber)
my_kafka_run.cmd com.home.SimpleConsumer out-topic testGroup01 client02 (second subscriber)
kafka-consumer-groups.bat --bootstrap-server localhost: 9092 --describe --group testGroup01
kafka-topics.bat --describe --zookeeper localhost: 2181 --topic out-topic
public class SimpleProducer { public static void main(String[] args) throws Exception { // Check arguments length value if (args.length == 0) { System.out.println("Enter topic name"); return; } //Assign topicName to string variable String topicName = args[0].toString(); System.out.println("Producer topic=" + topicName); // create instance for properties to access producer configs Properties props = new Properties(); //Assign localhost id props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); //Set acknowledgements for producer requests. props.put("acks", "all"); //If the request fails, the producer can automatically retry, props.put("retries", 0); //Specify buffer size in config props.put("batch.size", 16384); //Reduce the no of requests less than 0 props.put("linger.ms", 1); //The buffer.memory controls the total amount of memory available to the producer for buffering. props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer(props); BufferedReader br = null; br = new BufferedReader(new InputStreamReader(System.in)); System.out.println("Enter key:value, q - Exit"); while (true) { String input = br.readLine(); String[] split = input.split(":"); if ("q".equals(input)) { producer.close(); System.out.println("Exit!"); System.exit(0); } else { switch (split.length) { case 1: // strategy by round producer.send(new ProducerRecord(topicName, split[0])); break; case 2: // strategy by hash producer.send(new ProducerRecord(topicName, split[0], split[1])); break; case 3: // strategy by partition producer.send(new ProducerRecord(topicName, Integer.valueOf(split[2]), split[0], split[1])); break; default: System.out.println("Enter key:value, q - Exit"); } } } } }
public class SimpleConsumer { public static void main(String[] args) throws Exception { if (args.length != 3) { System.out.println("Enter topic name, groupId, clientId"); return; } //Kafka consumer configuration settings final String topicName = args[0].toString(); final String groupId = args[1].toString(); final String clientId = args[2].toString(); Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", groupId); props.put("client.id", clientId); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); //props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); //props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); //Kafka Consumer subscribes list of topics here. consumer.subscribe(Arrays.asList(topicName)); //print the topic name System.out.println("Subscribed to topic=" + topicName + ", group=" + groupId + ", clientId=" + clientId); SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss"); // looping until ctrl-c while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) // print the offset,key and value for the consumer records. System.out.printf("offset = %d, key = %s, value = %s, time = %s \n", record.offset(), record.key(), record.value(), sdf.format(new Date())); } } }
@echo off set CLASSPATH="C:\Project\myKafka\target\classes"; for %%i in (C:\kafka_2.11-1.1.0\libs\*) do ( call :concat "%%i" ) set COMMAND=java -classpath %CLASSPATH% %* %COMMAND% :concat IF not defined CLASSPATH ( set CLASSPATH="%~1" ) ELSE ( set CLASSPATH=%CLASSPATH%;"%~1" )
my_kafka_run.cmd com.home.SimpleConsumer out-topic testGroup02 client01
config.put (StreamsConfig.STATE_DIR_CONFIG, "C: /kafka_2.11-1.1.0/state");States are stored by application IDs independently ( StreamsConfig.APPLICATION_ID_CONFIG ). Example
my_kafka_run.cmd com.home.SimpleProducer in-topicThe Stream application will read this topic count count of identical words, it is not explicit for us to save the state and redirect out-topic to another topic. Here I want to clarify the relationship of the log and state (state store). And so ZooKeeper and Kafka server are running. I launch Stream with App-ID = app_01
my_kafka_run.cmd com.home.KafkaCountStream in-topic app_01publisher and subscriber accordingly
my_kafka_run.cmd com.home.SimpleProducer in-topicHere they are:
my_kafka_run.cmd com.home.SimpleConsumer out-topic testGroup01 client01
public class KafkaCountStream { public static void main(final String[] args) throws Exception { // Check arguments length value if (args.length != 2) { System.out.println("Enter topic name, appId"); return; } String topicName = args[0]; String appId = args[1]; System.out.println("Count stream topic=" + topicName +", app=" + appId); Properties config = new Properties(); config.put(StreamsConfig.APPLICATION_ID_CONFIG, appId); config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 2000); config.put(StreamsConfig.STATE_DIR_CONFIG, "C:/kafka_2.11-1.1.0/state"); StreamsBuilder builder = new StreamsBuilder(); KStream<String, String> textLines = builder.stream(topicName); // State store KTable<String, Long> wordCounts = textLines .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+"))) .groupBy((key, word) -> word) .count(); // out to another topic KStream<String, String> stringKStream = wordCounts.toStream() .map((k, v) -> new KeyValue<>(appId + "." + k, v.toString())); stringKStream.to("out-topic", Produced.with(Serdes.String(), Serdes.String())); KafkaStreams streams = new KafkaStreams(builder.build(), config); // additional to complete the work final CountDownLatch latch = new CountDownLatch(1); // attach shutdown handler to catch control-c Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") { @Override public void run() { System.out.println("Kafka Stream close"); streams.close(); latch.countDown(); } }); try { System.out.println("Kafka Stream start"); streams.start(); latch.await(); } catch (Throwable e) { System.exit(1); } System.out.println("Kafka Stream exit"); System.exit(0); } }
Source: https://habr.com/ru/post/354486/
All Articles