This article describes the use of the AVRO serialization format in Python, gives a brief overview of AVRO schemas with explanations of the least obvious points, and provides concrete Python code examples. Schema evolution, RPC, and AVRO IDL are intentionally left out of scope.
All examples use the fastavro library, which the author had to substantially improve to bring it in line with the specification and make it compatible with the Java implementation.
Why AVRO and not JSON, BSON, msgpack, protobuf, ASN.1, Thrift, or YAML?
While decomposing a monolithic system, we needed to define how the resulting microservices would interact. Choosing between RabbitMQ and Kafka did not take long, and we settled on the latter. The next task, choosing a serialization system, took considerably more effort.
When choosing a serialization system, the following requirements were taken into account:
Multiple programming language support
The core of our codebase is Python 2.7. Moreover, we would like to eventually port performance-sensitive components to other languages.
Validation of data during serialization
In a dynamic interpreted language like Python it is too easy to accidentally send the wrong data, and with the pub-sub model of Kafka it was very important for us to guarantee that the data entering a topic is valid. We needed a system that effectively lets us type Kafka topics.
Type support
Our system makes heavy use of the Decimal, UUID, and datetime types. Alas, the well-known serialization formats, from ASN.1 to msgpack, mostly describe the serialization of low-level types (int, float, string, bytes) and offer no complete solution for the types we care about.
Based on these considerations, the choice fell on AVRO. But it suddenly turned out (in spring 2017) that although logical types are supported by the specification and the Java libraries, they were simply ignored by both the official AVRO implementation for Python and the competing fastavro. We had to add support ourselves.
The most adequate (and also the fastest) code turned out to be fastavro's, so it was decided to extend that library. This was my first experience contributing to open source.
AVRO is a data serialization system created within the Hadoop project. Data is serialized in a binary format using a previously defined JSON schema, and a schema (possibly a different one) is also required for deserialization.
AVRO also lets you pack a large number of records described by a single schema into one container file, which makes it possible to transfer large volumes of data efficiently and avoid the overhead of other formats.
I will not describe the rules for constructing schemas in detail, since they are laid out in the official documentation. I will dwell only on the basics and on the points that are not at all obvious.
An AVRO schema is a JSON document that describes the data structure being serialized or deserialized. Data types can be:
primitive - null, boolean, int, long, float, double, bytes, string;
complex - record, enum, array, map, union, fixed;
logical - decimal, date, time-millis, time-micros, timestamp-millis, timestamp-micros.
Although these logical types have long been standard in relational databases and modern programming languages, serialization libraries have passed them by, forcing developers to reduce them to primitive types. Fortunately, AVRO solves this problem.
Consider a simple schema:
{ "type": "record", "namespace": "notificator", "name": "out", "doc": "HTTP ", "fields": [ { "doc": "id ", "name": "id", "type": "long" }, { "name": "datetime", "doc": " ", "type": { "type": "long", "logicalType": "timestamp-millis" } }, { "doc": " ", "name": "source", "type": [ "null", "string" ] }, { "doc": "", "name": "method", "default": "POST", "type": { "type": "enum", "name": "methods", "symbols": [ "POST", "GET", ] } }, { "name": "url", "type": "string" }, { "name": "headers", "type": { "type": "map", "values": "string" } }, { "doc": "body", "name": "body", "type": "bytes" } ] }
The schema starts with a declaration of a record type with the given name and namespace. These fields are used by code-generation systems, which are not relevant for Python, since our schema will be processed dynamically. Then come the declarations of the record's fields.
Of particular interest is the declaration of the datetime field, since it uses a logical type. It is important to remember that a logical type must be specified nested inside the field's type description.
wrong:
{ "name": "datetime", "doc": " ", "type": "long", "logicalType": "timestamp-millis" },
right:
{ "name": "datetime", "doc": " ", "type": { "type": "long", "logicalType": "timestamp-millis" } },
Next comes the source field, declared as the union "type": ["null", "string"]. This entry means that the value of source can be of one of two types, null or string. In this way you can combine not only primitive types but also complex and logical ones. Examples of such combinations, as well as more complex schemas, can be found here.
Another non-obvious point concerns default values: in a union, the default value must correspond to the first type listed.
wrong:
{ "name": "f", "type": ["long", "string"], "default": "asd" },
right:
{ "name": "f", "type": ["string", "long"], "default": "asd" },
Of the logical types, Decimal (a fixed-point number) and UUID deserve a closer look.
Decimal requires additional parameters: the total number of digits (precision) and the number of digits after the decimal point (scale):
{ "name": "money", "doc": "16 , 4 ", "type": { "type": "bytes", "logicalType": "decimal", "precision": 16, "scale": 4, } }
UUID is interesting in that it is not in the specification, yet an implementation exists. Rather strangely, the UUID is encoded as a string.
{ "name": "uuid", "type": { "type": "string", "logicalType": "uuid" } }
I had to add such an implementation to fastavro.
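A sketch of using it, assuming a fastavro build with the uuid logical type support described above; the record name is illustrative.

import uuid
from io import BytesIO

import fastavro

# Illustrative schema with a uuid logical type on top of string.
schema = {
    'type': 'record',
    'name': 'entity',
    'fields': [
        {'name': 'uuid', 'type': {'type': 'string', 'logicalType': 'uuid'}},
    ],
}

buf = BytesIO()
# The uuid.UUID value is encoded as its canonical string form.
fastavro.schemaless_writer(buf, schema, {'uuid': uuid.uuid4()})

buf.seek(0)
print(fastavro.schemaless_reader(buf, schema)['uuid'])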
Reading an AVRO container file:

import fastavro

with open('some-file.avro', 'rb') as fo:
    # an optional reader_schema can be passed for schema resolution
    reader = fastavro.reader(fo, reader_schema=None)
    schema = reader.schema
    for record in reader:
        process_record(record)  # process_record is the application's handler
Writing an AVRO container file:

from fastavro import writer

schema = {
    'doc': 'A weather reading.',
    'name': 'Weather',
    'namespace': 'test',
    'type': 'record',
    'fields': [
        {'name': 'station', 'type': 'string'},
        {'name': 'time', 'type': 'long'},
        {'name': 'temp', 'type': 'int'},
    ],
}

records = [
    {'station': '011990-99999', 'temp': 0, 'time': 1433269388},
    {'station': '011990-99999', 'temp': 22, 'time': 1433270389},
    {'station': '011990-99999', 'temp': -11, 'time': 1433273379},
    {'station': '012650-99999', 'temp': 111, 'time': 1433275478},
]

with open('weather.avro', 'wb') as out:
    writer(out, schema, records)
Schemaless serialization and deserialization, without the container header, is used when transferring data as individual messages, for example through Kafka.
from io import BytesIO

import fastavro


def serialize(schema, data):
    bytes_writer = BytesIO()
    fastavro.schemaless_writer(bytes_writer, schema, data)
    return bytes_writer.getvalue()


def deserialize(schema, binary):
    bytes_writer = BytesIO()
    bytes_writer.write(binary)
    bytes_writer.seek(0)
    data = fastavro.schemaless_reader(bytes_writer, schema)
    return data
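A usage sketch for these helpers, reusing the serialize/deserialize functions above together with the Weather schema and one record from the container-file example:

message = serialize(schema, {'station': '011990-99999', 'temp': 22, 'time': 1433270389})
print(type(message))                  # bytes, ready to be published as a message
print(deserialize(schema, message))   # back to a Python dict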
Handling of logical types can be overridden if needed:

import fastavro

# Replace the reader for the timestamp-millis logical type with a pass-through,
# so the raw long value is returned instead of a datetime.
fastavro._reader.LOGICAL_READERS['long-timestamp-millis'] = lambda d, w, r: d
Now the timestamp-millis logical type will be read not as a Python datetime but as a long.
fastavro provides the function acquaint_schema, which reads a schema into its internal repository (there are also exceptions, but that is a separate story).
Having a schema
{ "name": "decimal_18_6", "namespace": "types", "type": "bytes", "logicalType": "decimal", "precision": 18, "scale": 6 }
and loading it with acquaint_schema, you can then refer to the type by a short name in field declarations:
"fields": [ { "name": "money1", "type": "types.decimal_18_6" }, { "name": "money2", "type": "types.decimal_18_6" }, ]
Note that the type is referenced by its full name, including the namespace: types.decimal_18_6.
acquaint_schema is also needed in some other non-trivial cases.
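A sketch of this registration step. The exact import path of acquaint_schema has varied across fastavro versions (recent releases manage named schemas through parse_schema instead), so treat the import below as an assumption matching the article-era API.

# Assumption: acquaint_schema is importable from fastavro, as in the
# fastavro versions this article was written against.
from fastavro import acquaint_schema

# The named type shown above.
decimal_18_6 = {
    'name': 'decimal_18_6',
    'namespace': 'types',
    'type': 'bytes',
    'logicalType': 'decimal',
    'precision': 18,
    'scale': 6,
}

# Register it in fastavro's internal schema repository.
acquaint_schema(decimal_18_6)

# Later schemas can now refer to the type by its short (namespaced) name
# and be used with fastavro's writer/reader functions as usual.
prices_schema = {
    'type': 'record',
    'name': 'prices',
    'fields': [
        {'name': 'money1', 'type': 'types.decimal_18_6'},
        {'name': 'money2', 'type': 'types.decimal_18_6'},
    ],
}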
Source: https://habr.com/ru/post/346698/