
Creating a streaming processing pipeline. Part 1

Hello, friends! We are sharing with you the translation of an article prepared especially for students of the “Data Engineer” course. Let's go!



Apache Beam and Dataflow for real-time pipelines


Today's post is based on a task I recently worked on at my job. I was really glad to tackle it and to write the work up in blog-post format, because it gave me an opportunity to do some data engineering and to build something genuinely useful for my team. Not long ago, I discovered that our systems store a fairly large volume of user logs associated with one of our data products. It turned out that nobody was using this data, so I immediately became interested in what we could learn if we started analyzing it regularly. However, there were several problems along the way. The first was that the data was stored in many different text files and was not available for immediate analysis. The second was that it was stored in a closed system, so I could not use any of my favorite data analysis tools.
I had to decide how to make access easier for us and bring at least some value by integrating this data source into some of our user-interaction solutions. After some thought, I decided to build a pipeline to move this data into a cloud database so that my team and I could access it and start drawing some conclusions. Having finished the Data Engineering course on Coursera a while ago, I was eager to use some of its tools in this project.

Thus, putting the data into a cloud database seemed like a sensible way to solve my first problem, but what could I do about problem number two? Fortunately, there was a way to transfer this data into an environment where I could access tools like Python and the Google Cloud Platform (GCP). However, it was going to be a long process, so I needed something that would let me keep developing while I waited for the transfer to finish. The solution I settled on was generating fake data with the Faker library in Python. I had never used this library before, but I quickly realized how useful it is. This approach let me start writing code and testing the pipeline without the actual data.

Given the above, in this post I will describe how I built the pipeline described above using some of the technologies available on GCP. In particular, I will use Apache Beam (the Python SDK), Dataflow, Pub/Sub, and BigQuery to collect user logs, transform the data, and load it into a database for further analysis. In my case, I only needed Beam's batch functionality, since my data did not arrive in real time, so Pub/Sub was not required. However, I will focus on the streaming version, as this is what you are more likely to encounter in practice.
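To make this concrete before diving in, here is a minimal sketch, my own illustration rather than the article's code, of what such a streaming pipeline can look like with the Beam Python SDK. The project, topic, and table names are placeholders I chose, and the parsing step is deliberately simplified.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

# Placeholder resource names -- assumptions for illustration, not values from the article.
TOPIC = "projects/my-project/topics/user-logs"
TABLE = "my-project:logs.user_logs"

class ParseLogFn(beam.DoFn):
    """Turn a raw Pub/Sub message (bytes) into a dict matching the BigQuery schema."""
    def process(self, element):
        # Real parsing of the log format comes later; here we just pass the line through.
        yield {"raw_line": element.decode("utf-8")}

options = PipelineOptions()
options.view_as(StandardOptions).streaming = True

with beam.Pipeline(options=options) as p:
    (p
     | "Read from Pub/Sub" >> beam.io.ReadFromPubSub(topic=TOPIC)
     | "Parse" >> beam.ParDo(ParseLogFn())
     | "Write to BigQuery" >> beam.io.WriteToBigQuery(TABLE, schema="raw_line:STRING"))

Run locally this uses the direct runner; to run on Dataflow one would additionally pass the usual runner, project, and staging options.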

Introduction to GCP and Apache Beam


The Google Cloud Platform provides a set of really useful tools for processing big data. Here are the ones I will use:

  * Pub/Sub: a publish/subscribe messaging service that lets us ingest event data, such as our user logs, in real time.
  * Dataflow: a fully managed service for running Apache Beam pipelines, for both batch and streaming processing.
  * BigQuery: a serverless data warehouse where the processed logs will be stored and queried with SQL.
  * Apache Beam: an open-source programming model (here, its Python SDK) for defining batch and streaming data pipelines that can run on Dataflow.

A large number of tools are available on GCP, so it can be difficult to cover them all and what each is for, but the list above should serve as a quick reference.

Visualization of our pipeline


Let's visualize the components of our pipeline in Figure 1. At a high level, we want to collect user data in real time, process it, and load it into BigQuery. Logs are created when users interact with the product, sending requests to the server, which are then logged. This data can be especially useful for understanding how users interact with our product and whether it is working correctly. In general, the pipeline consists of the following steps:

  1. Our user logs are published to a Pub/Sub topic (a small publishing sketch follows this list).
  2. We connect to Pub/Sub and transform the data into the appropriate format using Python and Beam (steps 3 and 4 in Figure 1).
  3. After the data is transformed, Beam connects to BigQuery and appends it to our table (steps 4 and 5 in Figure 1).
  4. For analysis, we can connect to BigQuery using various tools, such as Tableau and Python.
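For step 1, publishing a log line to Pub/Sub from Python can look roughly like the sketch below. This is my own illustration, assuming the google-cloud-pubsub client library and placeholder project and topic names; it is not code from the article.

from google.cloud import pubsub_v1

# Placeholder names -- assumptions for illustration only.
PROJECT_ID = "my-project"
TOPIC_NAME = "user-logs"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC_NAME)

def publish_log_line(line: str) -> None:
    # Pub/Sub messages carry bytes, so encode the log line first.
    future = publisher.publish(topic_path, data=line.encode("utf-8"))
    future.result()  # block until the message is accepted by Pub/Sub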

Beam makes this process very simple, regardless of whether we have a streaming data source or a CSV file on which we want to perform batch processing. Later you will see that the code requires only minimal changes to switch between the two. This is one of the benefits of using Beam.
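As a hedged illustration of that switch (the topic and file path below are placeholders of my own), essentially only the read transform changes, plus the streaming flag in the pipeline options, while the rest of the pipeline stays the same:

# Streaming source: read raw messages (bytes) from a Pub/Sub topic.
lines = p | "Read" >> beam.io.ReadFromPubSub(topic="projects/my-project/topics/user-logs")

# Batch source (use instead of the line above): read the same log lines as strings from a file.
lines = p | "Read" >> beam.io.ReadFromText("gs://my-bucket/user_logs.txt")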


Figure 1: Main Data Pipeline

Creating pseudo data with Faker


As I mentioned earlier, due to limited access to the data I decided to create fake data in the same format as the real data. It was a really useful exercise, since I could write code and test the pipeline while waiting for the data. I suggest looking at the Faker documentation if you want to know what else this library has to offer. Our user data will generally look like the example below. Based on this format, we can generate data line by line to simulate real-time data. These logs give us information such as the date, the request type, the server response, the IP address, and so on.

192.52.197.161 - - [30/Apr/2019:21:11:42] "PUT /tag/category/tag HTTP/1.1" [401] 155 "https://harris-lopez.com/categories/about/" "Mozilla/5.0 (Macintosh; PPC Mac OS X 10_11_2) AppleWebKit/5312 (KHTML, like Gecko) Chrome/34.0.855.0 Safari/5312"

Based on the line above, we want to create our LINE variable using the variables in curly braces below. We will also use them as field names in our table schema a bit later.

LINE = """\
{remote_addr} - - [{time_local}] "{request_type} {request_path} HTTP/1.1" [{status}] {body_bytes_sent} "{http_referer}" "{http_user_agent}"\
"""


If we were doing batch processing, the code would be very similar, although we would need to create a set of samples spread over some time range. To use Faker, we simply create an object and call the methods we need. In particular, Faker was useful for generating IP addresses and websites. I used the following methods:

fake.ipv4()
fake.uri_path()
fake.uri()
fake.user_agent()


from faker import Faker
import time
import random
import os
import numpy as np
from datetime import datetime, timedelta

LINE = """\
{remote_addr} - - [{time_local}] "{request_type} {request_path} HTTP/1.1" [{status}] {body_bytes_sent} "{http_referer}" "{http_user_agent}"\
"""


def generate_log_line():
    """Generate a single fake log line in the format shown above."""
    fake = Faker()
    now = datetime.now()
    remote_addr = fake.ipv4()
    time_local = now.strftime('%d/%b/%Y:%H:%M:%S')
    request_type = random.choice(["GET", "POST", "PUT"])
    request_path = "/" + fake.uri_path()
    # Mostly successful requests, with occasional 401/404 responses.
    status = np.random.choice([200, 401, 404], p=[0.9, 0.05, 0.05])
    body_bytes_sent = random.choice(range(5, 1000, 1))
    http_referer = fake.uri()
    http_user_agent = fake.user_agent()

    log_line = LINE.format(
        remote_addr=remote_addr,
        time_local=time_local,
        request_type=request_type,
        request_path=request_path,
        status=status,
        body_bytes_sent=body_bytes_sent,
        http_referer=http_referer,
        http_user_agent=http_user_agent
    )

    return log_line
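To simulate a real-time stream from these fake logs, one approach, again a sketch of my own reusing the publish_log_line() helper sketched earlier in this post, is to generate and publish one line at a time with a short random pause between lines:

# Generate and publish fake log lines indefinitely, sleeping a random
# fraction of a second between lines to mimic real user traffic.
if __name__ == "__main__":
    while True:
        line = generate_log_line()
        publish_log_line(line)   # or simply print(line) while testing locally
        time.sleep(random.uniform(0.1, 1.0))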


The end of the first part.

In the coming days we will share the continuation of this article with you, and in the meantime, as always, we look forward to your comments ;-).

Source: https://habr.com/ru/post/454186/
