Subscribing to Kafka over HTTP, or how to simplify your own webhooks

There are many ways to handle messages from Pub-Sub systems: a separate service, an isolated process, orchestration of a process or thread pool, complex IPC, Poll-over-HTTP, and many others. Today I want to talk about using Pub-Sub over HTTP and about the service I wrote specifically for this.

In some cases, using an existing HTTP backend is an ideal solution for processing message queues:

  1. Balancing out of the box. The backend usually already sits behind a load balancer and has infrastructure ready for load, which greatly simplifies working with messages.
  2. A regular REST controller (any HTTP resource) is enough. Consuming messages over HTTP minimizes the cost of implementing consumers in different languages when the backend is heterogeneous.
  3. Simpler use of other services' webhooks. Almost every service (Jira, GitLab, Mattermost, Slack ...) now supports webhooks in some form for interacting with the outside world. Life gets easier if the queue is taught to act as an HTTP dispatcher.

This approach has disadvantages:
  1. You can forget about a lightweight solution. HTTP is a heavy protocol, and using frameworks on the consumer side will immediately increase latency and load.
  2. We lose the strengths of the Poll approach and gain the weaknesses of Push.
  3. Processing messages on the same service instances that serve clients can affect responsiveness. This is a minor concern, since it is solved by balancing and isolation.

I implemented the idea as the Queue-Over-Http service, discussed below. The project is written in Kotlin using Spring Boot 2.1. Currently, only Apache Kafka is supported as a broker.

The rest of the article assumes that the reader is familiar with Kafka: knows about message commits and offsets, the principles of groups and consumers, and understands how a partition differs from a topic. If there are gaps, I advise reading this section of the Kafka documentation before continuing.


Overview


Queue-Over-Http is a service that acts as an intermediary between the message broker and the final HTTP consumer (the service makes it easy to add support for delivering messages to consumers in other ways, for example, various *RPCs). At the moment, only subscribing, unsubscribing, and viewing the list of consumers are available. Sending messages to the broker (produce) via HTTP has not been implemented yet, because the order of messages cannot be guaranteed without special support from the producer.

The key entity of the service is the consumer, which can subscribe to specific partitions or simply to topics (topic patterns are supported). In the first case, automatic partition balancing is turned off. After subscribing, the specified HTTP resource starts receiving messages from the assigned Kafka partitions. Architecturally, each subscriber is backed by a native Kafka Java client.

An entertaining story about KafkaConsumer
Kafka has a great Java client that can do a lot. I use it in the queue adapter to receive messages from the broker and then pass them to the service's local queues. It is worth mentioning that the client works exclusively in the context of a single thread.

The idea of the adapter is simple. We run in a single thread and write the simplest possible scheduler of native clients, with an emphasis on reducing latency. That is, we write something like this:

    while (!Thread.interrupted()) {
        var hasWork = false
        for (consumer in kafkaConsumers) {
            val queueGroup = consumers[consumer] ?: continue
            invalidateSubscription(consumer, queueGroup)
            val records = consumer.poll(Duration.ZERO)
            /* hand the records over to the local queues */
            if (!records.isEmpty) {
                hasWork = true
            }
        }
        val committed = doCommit()
        if (!hasWork && committed == 0) {
            // nothing was read or committed, so sleep briefly
            Thread.sleep(1)
        }
    }

It would seem that everything is fine: latency is minimal even with dozens of consumers. In practice, it turned out that KafkaConsumer in this mode of operation has an allocation rate of about 1.5 MB/s even when idle. With 100 consumers, the allocation rate reaches 150 MB/s and makes the GC visit the application more often. Of course, all this garbage lives in the young generation and the GC copes with it, but the solution is still not perfect.

Obviously, the typical KafkaConsumer approach is needed, so each subscriber now runs in its own thread. This adds memory and scheduling overhead, but there is no other way.

I rewrite the code above, removing the inner loop and changing Duration.ZERO to Duration.ofMillis(100). It works well: the allocation rate drops to an acceptable 80-150 KB/s per consumer. However, a poll with a 100 ms timeout delays the entire commit queue by the same 100 ms, which is unacceptably long.

While looking for a solution, I remember KafkaConsumer::wakeup, which makes the blocked call throw a WakeupException and thereby interrupts any blocking operation on the consumer. With this method, the path to low latency is simple: when a new commit request arrives, we put it in the queue and call wakeup on the native consumer. In the worker loop, we catch the WakeupException and go commit what has accumulated. Using exceptions for control flow deserves a slap on the wrist, but since there is no other way ...
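The wakeup idea can be modeled without a Kafka dependency. The sketch below is only an illustration: BlockingSource and WakeupSignal are hypothetical stand-ins for KafkaConsumer and WakeupException, and CommitWorker is an invented name, not a class from the service. Like the real client, the stand-in throws on the next blocking call if wakeup was requested while nobody was blocked.

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock

/** Hypothetical stand-in for Kafka's WakeupException. */
class WakeupSignal : RuntimeException()

/** Hypothetical stand-in for a consumer: poll() blocks up to timeoutMs unless wakeup() is called. */
class BlockingSource {
    private val lock = ReentrantLock()
    private val woken = lock.newCondition()
    private var wakeupRequested = false

    fun poll(timeoutMs: Long): List<String> = lock.withLock {
        if (!wakeupRequested) {
            woken.await(timeoutMs, TimeUnit.MILLISECONDS)
        }
        if (wakeupRequested) {
            wakeupRequested = false
            throw WakeupSignal()
        }
        emptyList<String>() // the model never produces records
    }

    fun wakeup() = lock.withLock {
        wakeupRequested = true
        woken.signalAll()
    }
}

class CommitWorker(private val source: BlockingSource) {
    private val pendingCommits = ConcurrentLinkedQueue<Long>()
    val committed = mutableListOf<Long>()

    /** Called from other threads: enqueue the commit and interrupt the blocking poll. */
    fun requestCommit(offset: Long) {
        pendingCommits.add(offset)
        source.wakeup()
    }

    /** One iteration of the worker loop: poll, then commit whatever has accumulated. */
    fun pollThenCommit(timeoutMs: Long) {
        try {
            source.poll(timeoutMs)
        } catch (e: WakeupSignal) {
            // woken up early because a commit request arrived
        }
        var offset = pendingCommits.poll()
        while (offset != null) {
            committed.add(offset)
            offset = pendingCommits.poll()
        }
    }
}
```

In this model a commit requested before or during the poll is handled without waiting for the full timeout, which is the property the paragraph above is after.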

It turns out that this option is far from perfect either, since any operation on the native consumer can now throw a WakeupException, including the commit itself. Handling this case clutters the code with logic that decides when wakeup is allowed.

I come to the conclusion that it would be good to modify the KafkaConsumer::poll method so that it can be interrupted in a regular way, using an additional flag. As a result, a Frankenstein was born out of reflection: it copies the original poll method exactly, adding an exit from the loop on the flag. The flag is set by a separate interruptPoll method, which also calls wakeup on the client's selector to unblock the thread waiting on an I/O operation.

With the client implemented this way, the reaction time from the arrival of a commit request to its processing is up to 100 microseconds, and the latency of fetching messages from the broker is excellent.

Each partition is represented by a separate local queue, into which the adapter writes messages from the broker. A worker takes messages from it and dispatches them, that is, sends them via HTTP.

The service supports batch processing of messages to increase throughput. When subscribing, you can specify a concurrencyFactor for each topic (it applies to each assigned partition independently). For example, concurrencyFactor=1000 means that up to 1000 messages can be sent to the consumer simultaneously as HTTP requests. As soon as all messages in a batch have been successfully processed by the consumer, the service commits the offset of the last message in the batch to Kafka. Hence the second meaning of concurrencyFactor: it is the maximum number of messages the consumer may have to re-process if Kafka or Queue-Over-Http crashes.

To reduce delays, the queue has loadFactor = concurrencyFactor * 2, which allows reading twice as many messages from the broker as can be sent at once. Since autocommit is disabled on the native client, this scheme does not violate At-Least-Once guarantees.
A high concurrencyFactor increases the throughput of the queue by reducing the number of commits, each of which takes up to 10 ms in the worst case. At the same time, the load on the consumer increases.

The order in which messages within a batch are sent is not guaranteed, but it can be enforced by setting concurrencyFactor=1.
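The batch behavior described above can be sketched roughly as follows. This is a simplified illustration, not the service's code: Message, processBatch and the send callback are hypothetical names, a fresh thread pool per batch is used only for brevity, and the returned value follows the Kafka convention that the committed offset is the position of the next message to read.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors

/** Hypothetical message model: a partition offset plus a payload. */
data class Message(val offset: Long, val body: String)

/**
 * Sends every message of the batch concurrently and waits for all of them.
 * Returns the offset to commit once the whole batch has been processed.
 */
fun processBatch(batch: List<Message>, concurrencyFactor: Int, send: (Message) -> Unit): Long {
    require(batch.isNotEmpty() && batch.size <= concurrencyFactor)
    val pool = Executors.newFixedThreadPool(batch.size)
    try {
        val tasks = batch.map { message -> Callable { send(message) } }
        // get() rethrows a failed send, so the offset is never returned for a broken batch
        pool.invokeAll(tasks).forEach { it.get() }
    } finally {
        pool.shutdown()
    }
    return batch.last().offset + 1
}
```

With concurrencyFactor=1 a batch holds a single message, so messages are sent strictly one after another.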

Commits


Commits are an important part of the service. When the next batch is fully processed, the offset of the last message in the batch is immediately committed to Kafka, and only after a successful commit does the next batch become available for processing. Often this is not enough and autocommit is required. For this there is the autoCommitPeriodMs parameter, which has little in common with the classic autocommit period of native clients, which commit the last message read from the partition.

Imagine that concurrencyFactor=10. The service has sent all 10 messages and waits for each of them to complete. Message 3 is processed first, then message 1, then message 10. At this point the autocommit timer fires. It is important not to break the At-Least-Once semantics, so only the first message can be committed, that is, offset 2 (Kafka commits the position of the next message to read), because it is the only message that has been processed with no unprocessed messages before it. Then, before the next autocommit, messages 2, 5, 6, 4, and 8 are processed. Now offset 7 is committed, since messages 1 through 6 are done and message 7 is not, and so on. Autocommit has almost no effect on throughput.
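The autocommit rule from the example boils down to finding the first unprocessed message in the batch. A minimal sketch, assuming the batch occupies consecutive offsets; the function name committableOffset and its parameters are illustrative and not taken from the service:

```kotlin
/**
 * Returns the offset that is safe to commit: the first offset in the batch
 * that has not been processed yet. Every message before it is done.
 */
fun committableOffset(batchStart: Long, batchSize: Int, processed: Set<Long>): Long {
    var next = batchStart
    val end = batchStart + batchSize
    while (next < end && next in processed) {
        next++
    }
    return next
}
```

For a batch of messages 1 to 10, committableOffset(1, 10, setOf(3L, 1L, 10L)) gives 2, and after messages 2, 5, 6, 4, and 8 are added to the processed set it gives 7, matching the walk-through above.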

Error handling


In normal operation, the service sends a message to the consumer once. If the request results in a 4xx or 5xx error, the service re-sends the message until it is processed successfully. The delay between attempts is configurable as a separate parameter.

It is also possible to set a number of attempts after which the message is marked as processed, which stops re-sending regardless of the response status. I do not advise using this for sensitive data: consumer failures should always be fixed manually. Stuck messages can be tracked through the service logs and by monitoring the consumer's response statuses.
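A rough sketch of this retry policy is shown below. The parameter names mirror the delayOnErrorMs and retryBeforeCommit fields of the subscription request shown later in the article; the Delivery enum, the deliverWithRetry function, and the exact way attempts are counted are assumptions made for illustration.

```kotlin
/** Outcome of delivering one message (hypothetical type). */
enum class Delivery { PROCESSED, GIVEN_UP }

/**
 * Calls send() until it returns a non-error status.
 * send performs the HTTP call and returns the response status code.
 * retryBeforeCommit = null means "retry until success".
 */
fun deliverWithRetry(delayOnErrorMs: Long, retryBeforeCommit: Int?, send: () -> Int): Delivery {
    var failures = 0
    var status = send()
    while (status >= 400) {
        failures++
        if (retryBeforeCommit != null && failures >= retryBeforeCommit) {
            // attempts exhausted: the message is treated as processed anyway
            return Delivery.GIVEN_UP
        }
        Thread.sleep(delayOnErrorMs)
        status = send()
    }
    return Delivery.PROCESSED
}
```

In this sketch a GIVEN_UP result still allows the offset to be committed, which is exactly why the limit is risky for sensitive data.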

About sticking
Usually, an HTTP server that responds with a 4xx or 5xx status also sends a Connection: close header. A TCP connection closed this way stays in the TIME_WAIT state until the operating system cleans it up some time later. The problem is that such a connection occupies a whole port, which cannot be reused until it is released. As a result, the machine can run out of free ports for new TCP connections, and the service will flood the logs with exceptions on every send. In practice, on Windows 10 the ports run out after 10-20 thousand failed sends within 1-2 minutes. In normal operation this is not a problem.

Messages


Each message retrieved from the broker is sent via HTTP to the resource that the subscriber specified when subscribing. By default, the message is sent in the body of a POST request. This can be changed by specifying any other method. If the method does not support sending data in the body, you can specify the name of the query string parameter in which the message will be sent. In addition, when subscribing, you can specify additional headers to be added to each message, which is convenient for basic token-based authorization. The service also adds headers to each message with the consumer's identifier, the topic and partition the message was read from, the message number, the partition key (if any), and the name of the broker.
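To make the two delivery modes concrete, here is a small sketch built on the JDK 11 java.net.http API. It is not the service's implementation: the buildRequest function and its parameters are hypothetical, and the names of the headers the service adds are not given in this article, so only caller-supplied headers are shown.

```kotlin
import java.net.URI
import java.net.URLEncoder
import java.net.http.HttpRequest
import java.nio.charset.StandardCharsets

/**
 * Builds the outgoing request for one message.
 * With queryParam == null the message goes into the request body;
 * otherwise it is URL-encoded into the named query string parameter.
 */
fun buildRequest(
    uri: String,
    message: String,
    method: String = "POST",
    queryParam: String? = null,
    headers: Map<String, String> = emptyMap()
): HttpRequest {
    val builder = if (queryParam == null) {
        HttpRequest.newBuilder(URI.create(uri))
            .method(method, HttpRequest.BodyPublishers.ofString(message))
    } else {
        val encoded = URLEncoder.encode(message, StandardCharsets.UTF_8)
        HttpRequest.newBuilder(URI.create("$uri?$queryParam=$encoded"))
            .method(method, HttpRequest.BodyPublishers.noBody())
    }
    // additional headers supplied at subscription time, e.g. an authorization token
    headers.forEach { (name, value) -> builder.header(name, value) }
    return builder.build()
}
```

For example, buildRequest(uri, message, method = "GET", queryParam = "msg") would carry the message in a msg query parameter instead of the body.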

Performance


To evaluate performance, I used a PC (Windows 10, OpenJDK 11 (G1 without tuning), i7-6700K, 16 GB) running the service, and a laptop (Windows 10, i5-8250U, 8 GB) running the message producer, the consumer's HTTP resource, and Kafka with default settings. The PC is connected to the router by a 1 Gb/s wired connection, the laptop over 802.11ac. The producer writes 110-byte messages every 100 ms for 1000 seconds into the designated topics, to which consumers (concurrencyFactor=500, autocommit off) from different groups are subscribed. The test bench is far from ideal, but it gives a rough picture.

The key measured parameter is the effect of the service on latency.

Let:
- t_q be the timestamp at which the service receives the message from the native client
- dt0 be the time between t_q and the moment the message is handed from the local queue to the executor pool
- dt be the time between t_q and the moment the HTTP request is sent. dt is the service's contribution to message latency.

During the measurements, the following results were obtained (C - Consumer, T - Topics, M - Messages):



In normal operation, the service itself has almost no effect on latency, and memory consumption is minimal. The maximum values of dt (about 60 ms) are deliberately not listed, since they depend on the GC rather than on the service itself. Special GC tuning or replacing G1 with Shenandoah can help smooth out the spread of maximum values.

Everything changes dramatically when the consumer cannot keep up with the flow of messages from the queue and the service switches to throttling mode. In this mode, memory consumption grows, because request response times increase sharply, which prevents timely release of resources. The effect on latency stays at the level of the previous results, and the high dt values are caused by preloading messages into the local queue.

Unfortunately, testing at a higher load is not possible, since the laptop already gives out at 1300 RPS. If someone can help organize measurements under high load, I will gladly provide a build for the tests.

Demonstration


We now turn to the demonstration. For this we need:


First of all, you need to start the Queue-Over-Http service itself. To do this, create an application.yml file in an empty directory with the following content:

    spring:
      profiles: default

    logging:
      level:
        com:
          viirrtus:
            queueOverHttp: DEBUG

    app:
      persistence:
        file:
          storageDirectory: "persist"
      brokers:
        - name: "Kafka"
          origin: "kafka"
          config:
            bootstrap.servers: "192.168.99.100:9092"

Here we tell the service the connection parameters of a specific broker, as well as where to store subscribers so that they are not lost between launches. In `app.brokers[].config` you can specify any connection parameters supported by the native Kafka client; the full list can be found in the Kafka documentation.

Since the configuration file is processed by Spring, you can put many interesting things there, including logging configuration.

Now we start the service itself. We use the easiest way, a docker-compose.yml:

    version: "2"
    services:
      app:
        image: viirrtus/queue-over-http:0.1.3
        restart: unless-stopped
        command: --debug
        ports:
          - "8080:8080"
        volumes:
          - ./application.yml:/application.yml
          - ./persist:/persist

If this option does not suit you, you can build the service from source. Build instructions are in the project's README, a link to which is given at the end of the article.

The next step is to register the first subscriber. To do this, send an HTTP request to the service with a description of the consumer:

    POST localhost:8080/broker/subscription
    Content-Type: application/json

    {
      "id": "my-first-consumer",
      "group": {
        "id": "consumers"
      },
      "broker": "Kafka",
      "topics": [
        {
          "name": "slack.test",
          "config": {
            "concurrencyFactor": 10,
            "autoCommitPeriodMs": 100
          }
        }
      ],
      "subscriptionMethod": {
        "type": "http",
        "delayOnErrorMs": 1000,
        "retryBeforeCommit": 10,
        "uri": "<slack-wh-uri>",
        "additionalHeaders": {
          "Content-Type": "application/json"
        }
      }
    }

If everything went well, the response will contain almost the same content that was sent.

Let's go over each parameter:


The HTTP method is omitted in this request because the default, POST, suits Slack.

From this point on, the service keeps track of the assigned partitions of the slack.test topic for new messages.

To write messages to the topic, I will use the utilities bundled with Kafka, which are located in /opt/bitnami/kafka/bin of the running Kafka image (the location of the utilities in other Kafka installations may differ):

    kafka-console-producer.sh --broker-list localhost:9092 --topic slack.test
    > {"text": "Hello!"}

At the same time, Slack will inform about the new message:



To unsubscribe a consumer, it is enough to make a POST request to `broker/unsubscribe` with the same content that was used for the subscription.

Conclusion


Currently, only basic functionality is implemented. Next, the plan is to improve batching, try to implement Exactly-Once semantics, add the ability to send messages to the broker via HTTP and, most importantly, add support for other popular Pub-Sub systems.

The Queue-Over-Http service is currently under active development. Version 0.1.3 is stable enough for testing in dev and stage environments. It has been tested on Windows 10, Debian 9, and Ubuntu 18.04. Use it in production at your own risk. If you want to help with development or give feedback on the service, welcome to the project on GitHub.

Source: https://habr.com/ru/post/435346/

