
Creating a Dataflow template for streaming data from Pub/Sub to BigQuery on GCP using the Apache Beam SDK and Python


I am currently working on a task that involves streaming and transforming data. In some circles
this process is known as ETL, i.e. extracting, transforming and loading information.


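The whole process involves the following Google Cloud Platform services: Pub/Sub (receiving the
message stream), Dataflow (transforming the data) and BigQuery (storing the data in tables).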



0. Current status

At the moment there is a working version of the streaming setup built on the services above, but
it uses one of the standard templates.


The problem is that this template provides a one-to-one data transfer: a JSON string arrives
from Pub/Sub, and the output is a single BigQuery table whose fields correspond to the
top-level keys of the input JSON object.


1. Statement of the problem

Create a Dataflow template that produces a table or several tables at the output
according to given conditions. For example, we want a separate table for each
value of a specific key in the input JSON. We also have to take into account that some
input JSON objects can contain nested JSON as a value, so the template must
be able to work with BigQuery tables that have RECORD fields for storing nested
data.
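To make the nested-data requirement more concrete, here is a minimal sketch of what such a schema might look like. The field names (`type`, `user`, `id`, `email`) and the helper `matches_schema` are invented for this illustration and do not come from the template; only the layout (a list of field definitions, where a `RECORD` field carries its own `fields` list) follows the BigQuery JSON schema format.

```python
import json

# Hypothetical schema: the field names are invented for this illustration.
SCHEMA_FIELDS = [
    {"name": "type", "type": "STRING", "mode": "NULLABLE"},
    {
        "name": "user",
        "type": "RECORD",  # a nested object is stored as a RECORD
        "mode": "NULLABLE",
        "fields": [
            {"name": "id", "type": "INTEGER", "mode": "NULLABLE"},
            {"name": "email", "type": "STRING", "mode": "NULLABLE"},
        ],
    },
]


def matches_schema(row, fields):
    """Return True if every key of `row` is declared in `fields`.

    Nested dicts are checked recursively against RECORD fields.
    Only key names are checked, not value types.
    """
    declared = {field["name"]: field for field in fields}
    for key, value in row.items():
        field = declared.get(key)
        if field is None:
            return False
        if isinstance(value, dict):
            if field["type"] != "RECORD":
                return False
            if not matches_schema(value, field.get("fields", [])):
                return False
    return True


message = '{"type": "type1", "user": {"id": 42, "email": "a@example.com"}}'
row = json.loads(message)
print(matches_schema(row, SCHEMA_FIELDS))
```

A check like this is only a debugging aid: it shows quickly whether a parsed message has keys that the target table does not declare.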


2. Preparing for the solution

To create a Dataflow template you use the Apache Beam SDK, which in turn
supports Java and Python as programming languages. It should be said that, at the time of
writing, only Python 2.7.x is supported, which surprised me a bit. Moreover, Java support
is somewhat broader: for Python, for example, some functionality is unavailable and the
list of built-in connectors is more modest. By the way, you can write your own connectors.


However, due to the fact that I am not familiar with Java, I used Python.


Before you start creating a template, you need to have the following:


  1. the input JSON format, which must not change over time
  2. the schema (or schemas) of the BigQuery tables that the data will be streamed to
  3. the number of tables that the output data stream will be streamed to

Note that once the template has been created and a Dataflow job has been started from it, these
parameters can only be changed by creating a new template.


A few words about these restrictions. They all stem from the fact that it is not possible
to create a dynamic template that could accept any string as input, parse it
according to internal logic and then fill dynamically created tables with a dynamically
created schema. It is quite likely that this is possible, but I was not able to implement
such a scheme with these tools. As far as I understand, the whole
pipeline is built before it is executed at runtime, which is why it cannot be changed on the
fly. Perhaps someone will share their solution.


3. Solution

For a fuller understanding of the process, it is worth showing the diagram of the so-called
pipeline from the Apache Beam documentation.


[Image: pipeline diagram from the Apache Beam documentation]


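In our case (we will split the data into several tables): the input is the message stream from
Pub/Sub; Transform #1 converts each message from a string to a Python dict and yields
PCollection #1; Transform #2 tags the data in order to split it into separate streams;
Transform #3 writes each stream to its own BigQuery table according to its schema.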



While writing my own template, I drew heavily on these examples.


Template code with comments (I also kept the comments of the previous authors):
    # coding=utf-8
    from __future__ import absolute_import

    import logging
    import json
    import os

    import apache_beam as beam
    from apache_beam.pvalue import TaggedOutput
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.options.pipeline_options import SetupOptions
    from apache_beam.options.pipeline_options import StandardOptions
    from apache_beam.io.gcp.bigquery import parse_table_schema_from_json

    # GCP project name
    gcp_project = ''
    # Pub/Sub topic name
    topic_name = ''
    # Full Pub/Sub topic path in the form 'projects/_GCP_/topics/_'
    input_topic = 'projects/%s/topics/%s' % (gcp_project, topic_name)
    # BigQuery dataset name
    bq_dataset = 'segment_eu_test'
    # Directory with the table schema files
    schema_dir = './'


    class TransformToBigQuery(beam.DoFn):
        # Converts an incoming message into the format expected by
        # BigQuery IO, i.e. a python dict
        def process(self, element, *args, **kwargs):
            body = json.loads(element)
            # Any transformation needed to make the python dict match
            # the schema of the target table goes here
            yield body


    class TagDataWithReqType(beam.DoFn):
        # Tags each element by the value of a key; elements that match
        # no tag go to the main output, which is named 'default' in
        # with_outputs
        def process(self, element, *args, **kwargs):
            req_type = element.get('_')
            types = (
                'type1',
                'type2',
                'type3',
            )
            if req_type in types:
                yield TaggedOutput(req_type, element)
            else:
                yield element


    def run():
        # Read the _.json schema files from schema_dir; the file name
        # without the extension is used as the tag (and table) name
        schema_dct = {}
        for schema_file in os.listdir(schema_dir):
            filename_list = schema_file.split('.')
            if filename_list[-1] == 'json':
                with open('%s/%s' % (schema_dir, schema_file)) as f:
                    schema_json = f.read()
                schema_dct[filename_list[0]] = json.dumps(
                    {'fields': json.loads(schema_json)})

        # We use the save_main_session option because one or more DoFn's in this
        # workflow rely on global context (eg, a module imported at module level).
        pipeline_options = PipelineOptions()
        p = beam.Pipeline(options=pipeline_options)
        pipeline_options.view_as(SetupOptions).save_main_session = True
        pipeline_options.view_as(StandardOptions).streaming = True

        # Read from PubSub into a PCollection.
        input_stream = p | beam.io.ReadFromPubSub(input_topic)

        # Transform stream to BigQuery IO format
        stream_bq = input_stream | 'transform to BigQuery' >> beam.ParDo(
            TransformToBigQuery())

        # Tag stream by schema name
        tagged_stream = (
            stream_bq
            | 'tag data by type' >> beam.ParDo(
                TagDataWithReqType()).with_outputs(
                    *schema_dct.keys(), main='default'))

        # Stream unidentified data to default table
        tagged_stream.default | 'push to default table' >> beam.io.WriteToBigQuery(
            '%s:%s.default' % (gcp_project, bq_dataset),
            schema=parse_table_schema_from_json(schema_dct.get('default')),
        )

        # Stream data to BigQuery tables by number of schema names
        for name, schema in schema_dct.items():
            if name == 'default':
                # The default stream is already written above; writing it
                # again here would duplicate its rows
                continue
            tagged_stream[name] | 'push to table %s' % name >> beam.io.WriteToBigQuery(
                '%s:%s.%s' % (gcp_project, bq_dataset, name),
                schema=parse_table_schema_from_json(schema),
            )

        result = p.run()
        result.wait_until_finish()


    if __name__ == '__main__':
        logging.getLogger().setLevel(logging.INFO)
        logger = logging.getLogger(__name__)
        run()

Now let's go through the code and explain it. First, though, it should be said that the main
difficulty in writing this template is to think in terms of a "data flow" rather than
a specific message. It is also important to understand that Pub/Sub operates on messages, and
it is from them that we get the information for tagging the stream.


    pipeline_options = PipelineOptions()
    p = beam.Pipeline(options=pipeline_options)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    pipeline_options.view_as(StandardOptions).streaming = True

Since the Apache Beam Pub/Sub IO connector works only in streaming mode, it is necessary
to add PipelineOptions() with streaming enabled (although no custom options are defined
here); otherwise template creation fails with an exception. A word about the template's launch
options: they can be static or so-called "runtime" options, and the Dataflow documentation on
templates covers this topic. Runtime options let you create a template without fixing the
parameters in advance and pass them when the Dataflow job is started from the template. I never
managed to make this work, probably because this connector does not support
RuntimeValueProvider.


    # Read from PubSub into a PCollection.
    input_stream = p | beam.io.ReadFromPubSub(input_topic)

The comment says it all: we read the stream from the topic. It is worth adding that the stream
can be taken either from a topic or from a subscription. If a topic is given as input,
a temporary subscription to that topic is created automatically. The syntax is also fairly
clear: the source beam.io.ReadFromPubSub(input_topic) is applied to our
pipeline p and produces the input data stream.
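The two kinds of input differ only in the resource path that is passed to the connector. The sketch below builds both paths with plain string formatting; the helper names and the project, topic and subscription names are placeholders invented for the example. As far as I know, `ReadFromPubSub` accepts the path either as `topic=...` or as `subscription=...`, and only one of the two should be given.

```python
def topic_path(project, topic):
    """Full path of a Pub/Sub topic."""
    return "projects/%s/topics/%s" % (project, topic)


def subscription_path(project, subscription):
    """Full path of a Pub/Sub subscription."""
    return "projects/%s/subscriptions/%s" % (project, subscription)


# Placeholder names, not taken from the article.
print(topic_path("my-project", "my-topic"))
print(subscription_path("my-project", "my-subscription"))
```

Reading from an existing subscription rather than from a topic means the pipeline does not rely on the automatically created temporary subscription.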


    # Transform stream to BigQuery IO format
    stream_bq = input_stream | 'transform to BigQuery' >> beam.ParDo(
        TransformToBigQuery())

Transform #1 happens here: our input is converted from a Python string to
a Python dict, and at the output we get PCollection #1. The >> operator appears in the syntax.
The text in quotes is the name of the step (it must be unique); it is also the caption
of the corresponding block on the graph in the GCP Dataflow web interface. Let's look at the
overridden class TransformToBigQuery in more detail.


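    class TransformToBigQuery(beam.DoFn):
        # Converts an incoming message into the format expected by
        # BigQuery IO, i.e. a python dict
        def process(self, element, *args, **kwargs):
            body = json.loads(element)
            # Any transformation needed to make the python dict match
            # the schema of the target table goes here
            yield body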

The element variable holds one message from the Pub/Sub subscription. As the code shows,
in our case it must be valid JSON. The class must override
the process method, which performs whatever conversions are needed
to turn the input string into output data that fits the schema
of the table it will be loaded into. The process method has to produce an iterable of output
elements, so the result is emitted with yield rather than returned as a single value with
return. Since our stream is continuous (unbounded, in Apache Beam terms), windowing and
triggers would have to be configured additionally if the data were grouped or aggregated;
this template does neither, so they are not needed here.
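The behaviour of a generator-style process method can be tried without Beam at all. The sketch below is a plain function, not the template's DoFn: the name `parse_message` and the decision to skip invalid messages (instead of failing) are my assumptions. It also decodes bytes first, because in recent Beam versions on Python 3 the Pub/Sub payload arrives as bytes.

```python
import json
import logging


def parse_message(element):
    """Yield one dict for a valid JSON object, and nothing otherwise."""
    if isinstance(element, bytes):
        element = element.decode("utf-8")
    try:
        body = json.loads(element)
    except ValueError:
        # Assumption: invalid messages are logged and dropped.
        logging.warning("Skipping invalid JSON: %r", element)
        return
    if isinstance(body, dict):
        yield body


messages = [b'{"a": 1}', 'not json', '[1, 2]', '{"b": {"c": 2}}']
rows = [row for message in messages for row in parse_message(message)]
print(rows)
```

Because the function is a generator, it can emit zero, one or several outputs per input, which is exactly what makes yield convenient here.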


    # Tag stream by schema name
    tagged_stream = (
        stream_bq
        | 'tag data by type' >> beam.ParDo(
            TagDataWithReqType()).with_outputs(
                *schema_dct.keys(), main='default'))

This code sends PCollection #1 to Transform #2, where the data stream is tagged
(split). The schema_dct variable here is a dictionary whose key is the name of the schema
file without the extension (this becomes the tag) and whose value is the valid JSON schema of
the BigQuery table for that tag. Note that the schema must be passed exactly in
the form {'fields': SCHEMA}, where SCHEMA is the BigQuery table schema as JSON (it can be
exported from the web interface).
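The way schema_dct is built can be checked locally. The following sketch mirrors the loading logic as a standalone function; `load_schemas`, the temporary directory and the sample field are my own illustration. Unlike the template, it uses os.path.splitext, so a file called `a.b.json` would produce the tag `a.b` rather than `a`.

```python
import json
import os
import tempfile


def load_schemas(schema_dir):
    """Map schema file name (without .json) to a '{"fields": [...]}' string."""
    schemas = {}
    for file_name in sorted(os.listdir(schema_dir)):
        name, ext = os.path.splitext(file_name)
        if ext != ".json":
            continue  # ignore anything that is not a schema file
        with open(os.path.join(schema_dir, file_name)) as f:
            fields = json.load(f)
        schemas[name] = json.dumps({"fields": fields})
    return schemas


# Demo with throwaway files; the field definition is invented.
demo_dir = tempfile.mkdtemp()
for name in ("default", "type1"):
    with open(os.path.join(demo_dir, name + ".json"), "w") as f:
        json.dump([{"name": "id", "type": "STRING", "mode": "NULLABLE"}], f)
with open(os.path.join(demo_dir, "README.txt"), "w") as f:
    f.write("not a schema")

schemas = load_schemas(demo_dir)
print(sorted(schemas))
```

Each value is a JSON string wrapped in a 'fields' object, which is the form described above for passing to parse_table_schema_from_json.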


main='default' is the tag name of the stream that receives
all messages that do not meet the tagging conditions. Let's look at the class
TagDataWithReqType.


    class TagDataWithReqType(beam.DoFn):
        # Tags each element by the value of a key; elements that match
        # no tag go to the main output, which is named 'default' in
        # with_outputs
        def process(self, element, *args, **kwargs):
            req_type = element.get('_')
            types = (
                'type1',
                'type2',
                'type3',
            )
            if req_type in types:
                yield TaggedOutput(req_type, element)
            else:
                yield element

As you can see, the process method is overridden here as well. types contains the tag
names, and they must match the keys of the schema_dct dictionary in both number and
name. Although the process method can take additional arguments, I have not
managed to pass them yet and have not figured out why.


At the output we get a tuple of streams, one per tag: our
predefined tags plus the default stream for the elements that could not be tagged.
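The routing rule itself is easy to reproduce in plain Python, which helps when checking which table a given message will end up in. In this sketch the key name `kind`, the sample elements and both function names are hypothetical (the key used in the template is not shown in the listing). The tag key is passed as an ordinary argument; in Beam, as far as I know, extra arguments given to beam.ParDo after the DoFn instance are forwarded to process in a similar way.

```python
from collections import defaultdict


def tag_element(element, key, known_tags, default_tag="default"):
    """Return (tag, element); unknown or missing values get the default tag."""
    tag = element.get(key)
    if tag in known_tags:
        return tag, element
    return default_tag, element


def split_by_tag(elements, key, known_tags):
    """Group elements into one list per tag, like a tuple of tagged streams."""
    streams = defaultdict(list)
    for element in elements:
        tag, tagged = tag_element(element, key, known_tags)
        streams[tag].append(tagged)
    return dict(streams)


# Hypothetical key name and sample data.
elements = [
    {"kind": "type1", "v": 1},
    {"kind": "other", "v": 2},
    {"v": 3},
    {"kind": "type2", "v": 4},
]
streams = split_by_tag(elements, "kind", ("type1", "type2", "type3"))
print(sorted(streams))
```

Note that a tag with no matching elements (type3 here) simply produces no entry, whereas in the pipeline the corresponding stream exists but stays empty.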


    # Stream unidentified data to default table
    tagged_stream.default | 'push to default table' >> beam.io.WriteToBigQuery(
        '%s:%s.default' % (gcp_project, bq_dataset),
        schema=parse_table_schema_from_json(schema_dct.get('default')),
    )

Transform #... (strictly speaking it is not on the diagram, it is a "branch"): we write the
default stream to the default table.


tagged_stream.default takes the stream with the default tag; the alternative syntax is tagged_stream['default'].


schema=parse_table_schema_from_json(schema_dct.get('default')) defines the schema of the
table. Note that the default.json file with a valid BigQuery table schema
must be located in the current directory, schema_dir = './'.


The stream is written to a table named default.


If a table with that name does not exist in this dataset of this project, it
is created automatically from the schema thanks to the default setting
create_disposition=BigQueryDisposition.CREATE_IF_NEEDED.


    # Stream data to BigQuery tables by number of schema names
    for name, schema in schema_dct.items():
        if name == 'default':
            # The default stream is already written above; writing it
            # again here would duplicate its rows
            continue
        tagged_stream[name] | 'push to table %s' % name >> beam.io.WriteToBigQuery(
            '%s:%s.%s' % (gcp_project, bq_dataset, name),
            schema=parse_table_schema_from_json(schema),
        )

Transform #3: everything should be clear to those who have read the article from the beginning
and know Python syntax. We go through the tuple of streams in a loop and write each stream to
its own table with its own schema. Remember that the step name must be unique, which is why the
tag name is included in it: 'push to table %s' % name. The string
'%s:%s.%s' % (gcp_project, bq_dataset, name) is the full name of the target table.
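The loop relies on Python operator precedence: % binds tighter than >>, so the label is formatted first and only then attached to the transform. The sketch below shows this with a toy class; `Step` is a stand-in written for this example and is not Beam's transform class, and the project name is a placeholder.

```python
class Step(object):
    """Toy stand-in for a transform; NOT Beam's PTransform."""

    def __init__(self, table):
        self.table = table
        self.label = None

    def __rrshift__(self, label):
        # Called for the expression: "some label" >> Step(...)
        self.label = label
        return self


def table_spec(project, dataset, table):
    """Full table name in the 'project:dataset.table' form."""
    return "%s:%s.%s" % (project, dataset, table)


names = ["type1", "type2", "type3"]
steps = []
for name in names:
    # Evaluated as ("push to table %s" % name) >> Step(...)
    step = "push to table %s" % name >> Step(
        table_spec("my-project", "segment_eu_test", name))
    steps.append(step)

labels = [step.label for step in steps]
print(labels)
print(steps[0].table)
```

Since every label contains the tag name, the labels stay unique as long as the tag names are unique.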


Now it should be clear how this works, and you can create the template. To do so,
run the following in the console (do not forget to activate the venv, if you use one) or from
the IDE:


    python _.py \
      --runner DataflowRunner \
      --project dreamdata-test \
      --staging_location gs://STORAGE_NAME/STAGING_DIR \
      --temp_location gs://STORAGE_NAME/TEMP_DIR \
      --template_location gs://STORAGE_NAME/TEMPLATES_DIR/TEMPLATE_NAME

Access to the Google account must also be set up, for example by exporting
the GOOGLE_APPLICATION_CREDENTIALS environment variable or in some other way.


A few words about --runner. In this case DataflowRunner means that this code
will be run as a template for a Dataflow job. You can also specify
DirectRunner; it is used by default when the --runner option is omitted, and the code
then works like a Dataflow job, but locally, which is very convenient for debugging.


If there are no errors, the template is created at
gs://STORAGE_NAME/TEMPLATES_DIR/TEMPLATE_NAME. It is worth mentioning that service files
are also written to gs://STORAGE_NAME/STAGING_DIR. They are needed for the Dataflow job created
from the template to work properly, so they should not be deleted.


Next, you need to create a Dataflow job from this template, either manually or in some
other way (from CI, for example).


4. Conclusions

In this way I managed to set up streaming from Pub/Sub to BigQuery with
the data transformations required for further storage, conversion and
use of the data.



This article may contain inaccuracies and even mistakes; I will be grateful for constructive
criticism. Finally, I want to add that far from all the features of the
Apache Beam SDK are used here, but that was not the goal.



Source: https://habr.com/ru/post/441892/

