
Creating a plugin for logical replication in PostgreSQL 9.4+

As many interested readers know, PostgreSQL 9.4 (finally) introduced logical decoding. Now, to build your own replication, you no longer have to deal with the binary WAL file format or write triggers (perhaps there were other ways) to convert the data into a format that is convenient for you. It is enough to write a plugin for PostgreSQL that does this. This article describes a plugin that converts the data to JSON.


The plugin code is on GitHub: github.com/ildus/decoder_json. Pull requests with improvements (especially better support for types), bug fixes, and simple cosmetic changes are welcome. JSON was chosen for simplicity. This is not the final version; after testing on real data it may turn out that a more efficient format is needed, and the plugin will have to be reworked. The article does not include all of the plugin's code, only the parts that I think are worth explaining.

Requirements for creating the plugin: knowledge of C, installed build tools (gcc, make), and the packages (on Debian-based systems) postgresql-9.4 and postgresql-server-dev-9.4, or their equivalents on other systems. After installing PostgreSQL, you must set max_replication_slots = 1 (or more) and wal_level = logical in postgresql.conf.
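The two settings mentioned above, as they would appear in postgresql.conf:

```ini
# postgresql.conf: minimum settings for logical decoding
wal_level = logical           # write enough information to the WAL for decoding
max_replication_slots = 1     # at least one slot for our plugin
```

Both settings require a server restart to take effect.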
The plugin itself is a shared library in C whose callback functions are invoked on PostgreSQL events. During initialization, the _PG_output_plugin_init function is called with a structure whose fields must be assigned the plugin's own functions:
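For reference, the registration pattern can be sketched as self-contained C. The struct layout follows OutputPluginCallbacks from PostgreSQL 9.4's replication/output_plugin.h, but the callback typedef here is a simplified stand-in, since the real ones take LogicalDecodingContext and related server types:

```c
#include <assert.h>
#include <stddef.h>

/* Stand-in for the callback typedefs from replication/output_plugin.h;
 * the real signatures differ per callback. */
typedef void (*cb_fn)(void *ctx);

typedef struct OutputPluginCallbacks
{
    cb_fn startup_cb;
    cb_fn begin_cb;
    cb_fn change_cb;
    cb_fn commit_cb;
    cb_fn shutdown_cb;
} OutputPluginCallbacks;

static void decoder_json_startup(void *ctx)    { (void) ctx; }
static void decoder_json_begin_txn(void *ctx)  { (void) ctx; }
static void decoder_json_change(void *ctx)     { (void) ctx; }
static void decoder_json_commit_txn(void *ctx) { (void) ctx; }
static void decoder_json_shutdown(void *ctx)   { (void) ctx; }

/* The server loads the shared library and looks this function up by name */
void _PG_output_plugin_init(OutputPluginCallbacks *cb)
{
    cb->startup_cb  = decoder_json_startup;
    cb->begin_cb    = decoder_json_begin_txn;
    cb->change_cb   = decoder_json_change;
    cb->commit_cb   = decoder_json_commit_txn;
    cb->shutdown_cb = decoder_json_shutdown;
}
```

In the real plugin, this is the only symbol PostgreSQL resolves directly; everything else is reached through the structure.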


The function that fills the structure:

 void _PG_output_plugin_init(OutputPluginCallbacks *cb)
 {
     cb->startup_cb = decoder_json_startup;
     cb->begin_cb = decoder_json_begin_txn;
     cb->change_cb = decoder_json_change;
     cb->commit_cb = decoder_json_commit_txn;
     cb->shutdown_cb = decoder_json_shutdown;
 }

It now remains to define these five functions. decoder_json_startup is called at the start of decoding and is used to set the decoding options and create the plugin's own memory context:

The decoder_json_startup function:

 /* initialize this plugin */
 static void
 decoder_json_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
                      bool is_init)
 {
     ListCell   *option;
     DecoderRawData *data;

     data = palloc(sizeof(DecoderRawData));
     data->context = AllocSetContextCreate(ctx->context,
                                           "Raw decoder context",
                                           ALLOCSET_DEFAULT_MINSIZE,
                                           ALLOCSET_DEFAULT_INITSIZE,
                                           ALLOCSET_DEFAULT_MAXSIZE);
     data->include_transaction = false;
     data->sort_keys = false;
     ctx->output_plugin_private = data;

     /* Default output format */
     opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;

     foreach(option, ctx->output_plugin_options)
     {
         DefElem *elem = lfirst(option);

         Assert(elem->arg == NULL || IsA(elem->arg, String));

         if (strcmp(elem->defname, "include_transaction") == 0)
         {
             /* if option does not provide a value, it means its value is true */
             if (elem->arg == NULL)
                 data->include_transaction = true;
             else if (!parse_bool(strVal(elem->arg), &data->include_transaction))
                 ereport(ERROR,
                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
                                 strVal(elem->arg), elem->defname)));
         }
         else if (strcmp(elem->defname, "sort_keys") == 0)
         {
             /* if option does not provide a value, it means its value is true */
             if (elem->arg == NULL)
                 data->sort_keys = true;
             else if (!parse_bool(strVal(elem->arg), &data->sort_keys))
                 ereport(ERROR,
                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
                                 strVal(elem->arg), elem->defname)));
         }
         else
         {
             ereport(ERROR,
                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                      errmsg("option \"%s\" = \"%s\" is unknown",
                             elem->defname,
                             elem->arg ? strVal(elem->arg) : "(null)")));
         }
     }
 }

Here the parameters passed to the plugin are parsed and saved into the structure. The memory context created here is used later in decoder_json_change to properly release the resources it uses. Important points:

- the plugin's state is stored in ctx->output_plugin_private and is available in all other callbacks;
- an option passed without a value is treated as true;
- an unknown option raises an ERROR with ERRCODE_INVALID_PARAMETER_VALUE.
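The value-or-default option handling above can be condensed into a small self-contained sketch. Here parse_bool_sketch is a simplified stand-in for PostgreSQL's parse_bool (the real one also accepts spellings such as "yes"/"no" and unique prefixes):

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Simplified stand-in for postgres' parse_bool(): accepts the most common
 * spellings and reports success/failure like the original. */
static bool
parse_bool_sketch(const char *value, bool *result)
{
    if (strcmp(value, "on") == 0 || strcmp(value, "true") == 0 ||
        strcmp(value, "1") == 0)
    {
        *result = true;
        return true;
    }
    if (strcmp(value, "off") == 0 || strcmp(value, "false") == 0 ||
        strcmp(value, "0") == 0)
    {
        *result = false;
        return true;
    }
    return false;   /* unparsable: the plugin reports ERRCODE_INVALID_PARAMETER_VALUE */
}

/* Mirrors the handling in decoder_json_startup: a NULL argument means "true",
 * otherwise the value must parse as a boolean. */
static bool
apply_option(const char *arg, bool *target)
{
    if (arg == NULL)
    {
        *target = true;
        return true;
    }
    return parse_bool_sketch(arg, target);
}
```

This is the same contract the plugin relies on when it is invoked as, for example, pg_logical_slot_get_changes(..., 'include_transaction', 'on').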


decoder_json_shutdown is called at the end of decoding, and is used to clean up resources.

The decoder_json_shutdown function:

 /* cleanup this plugin's resources */
 static void
 decoder_json_shutdown(LogicalDecodingContext *ctx)
 {
     DecoderRawData *data = ctx->output_plugin_private;

     /* cleanup our own resources via memory context reset */
     MemoryContextDelete(data->context);
 }


Next is the most interesting part. We need to define decoder_json_begin_txn, decoder_json_commit_txn, and decoder_json_change, which actually generate the lines returned by pg_logical_slot_peek_changes and pg_logical_slot_get_changes. The generated string must be written to the slot, which is done with:

 OutputPluginPrepareWrite(ctx, true);
 appendStringInfoString(ctx->out, "some string");
 OutputPluginWrite(ctx, true);

The decoder_json_begin_txn and decoder_json_commit_txn functions write (or simply skip, depending on the options) the transaction start and end markers into the slot: the strings 'begin' and 'commit', respectively.

The decoder_json_change function is called on a data change event. It determines which event occurred (INSERT, UPDATE, DELETE) and builds its own structure for each of them. For UPDATE and DELETE it is important that the table has a unique (not null) or primary key, otherwise it is simply impossible to identify the row being changed (or deleted). This depends on the table's REPLICA IDENTITY setting (changed with, for example, ALTER TABLE test1 REPLICA IDENTITY FULL).

This function takes 4 parameters:

- ctx - the decoding context (LogicalDecodingContext *);
- txn - the current transaction (ReorderBufferTXN *);
- relation - the table whose row changed (Relation);
- change - the change itself (ReorderBufferChange *), including the old and new tuples.


Briefly, the function works like this: the operation type is determined from change->action, and then a JSON structure is built from the data in change (change->data.tp.newtuple and change->data.tp.oldtuple). The JSON is generated using the libjansson library.

This is where the difficulties begin. If REPLICA IDENTITY for a table is NOTHING, or DEFAULT with no primary key, the rows being changed cannot be identified and only INSERT records will reach the log. On UPDATE or DELETE with DEFAULT, FULL, or INDEX and a unique key present, the key value is taken from newtuple, or from oldtuple if the query changed the key value. If there is no unique key and REPLICA IDENTITY is FULL, all values from oldtuple are used for identification.
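The identification rules just described can be restated as a small decision function. This is a condensed sketch of the logic, not code from decoder_json; the enum and function names are made up for illustration:

```c
#include <assert.h>
#include <stdbool.h>

/* The possible REPLICA IDENTITY settings, mirroring pg_class.relreplident */
typedef enum { RI_NOTHING, RI_DEFAULT, RI_INDEX, RI_FULL } ReplicaIdentity;

/* What the plugin can use to identify the old row on UPDATE/DELETE */
typedef enum { IDENT_NONE, IDENT_KEY, IDENT_ALL_COLUMNS } IdentSource;

static IdentSource
row_identity(ReplicaIdentity ri, bool has_unique_key)
{
    switch (ri)
    {
        case RI_NOTHING:
            return IDENT_NONE;      /* change cannot be matched to a row */
        case RI_DEFAULT:
        case RI_INDEX:
            /* usable only when a unique/primary key actually exists */
            return has_unique_key ? IDENT_KEY : IDENT_NONE;
        case RI_FULL:
            /* fall back to comparing every old column value */
            return has_unique_key ? IDENT_KEY : IDENT_ALL_COLUMNS;
    }
    return IDENT_NONE;
}
```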

As a result, a JSON structure of the form {"a": 0, "r": "public.some_table", "c": {"id": 1}, "d": {"a": 2}} is built, where a is the action type, r is the table name, c holds the values identifying the row, and d is the data itself.
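To make the format concrete, here is a self-contained sketch that produces the same shape of output. The real plugin builds these objects with libjansson; this version uses snprintf so it compiles without the library, and the action codes (0 for INSERT, 1 for UPDATE, 2 for DELETE) follow the sample output shown later in the article:

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Formats one change record in decoder_json's shape:
 * "a" action, "r" relation, "c" identifying key, "d" column values.
 * key_json/data_json are pre-rendered JSON objects (or NULL if absent). */
static int
format_change(char *buf, size_t len, int action, const char *rel,
              const char *key_json, const char *data_json)
{
    if (key_json != NULL && data_json != NULL)      /* UPDATE: key + new values */
        return snprintf(buf, len, "{\"a\":%d,\"r\":\"%s\",\"c\":%s,\"d\":%s}",
                        action, rel, key_json, data_json);
    if (data_json != NULL)                          /* INSERT: new values only */
        return snprintf(buf, len, "{\"a\":%d,\"r\":\"%s\",\"d\":%s}",
                        action, rel, data_json);
    if (key_json != NULL)                           /* DELETE: key only */
        return snprintf(buf, len, "{\"a\":%d,\"r\":\"%s\",\"c\":%s}",
                        action, rel, key_json);
    return snprintf(buf, len, "{\"a\":%d,\"r\":\"%s\"}", action, rel);
}
```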

Let's check that it works. Building the plugin and running the tests:

 git clone https://github.com/ildus/decoder_json.git
 cd decoder_json

 # the library is installed into the server's lib directory - make it writable for the build
 sudo chmod a+rw `pg_config --pkglibdir`
 chmod a+rwx ./

 # fetch and build libjansson, which is used for JSON generation
 make deps

 # build and test as the postgres user
 sudo su postgres
 make
 make test

Testing the plugin manually:

 # work as the postgres user, who has the rights to create databases and slots
 sudo su postgres
 createdb test_db
 psql test_db

 # in psql
 test_db=# create table test1 (id serial primary key, name varchar);
 test_db=# SELECT * FROM pg_create_logical_replication_slot('custom_slot', 'decoder_json');
   slot_name  | xlog_position
 -------------+---------------
  custom_slot | 0/4D9F870
 (1 row)

Here we specify the name of the slot and the plugin. In response we get the slot name and the xlog position from which data will actually be written to the slot. The fact that we have specified our plugin does not mean that it is already running; decoding begins only when we fetch the data. For that, the functions pg_logical_slot_peek_changes and pg_logical_slot_get_changes are used. They differ in that the get function clears the queue after the data is received.

Adding data:

 test_db=# insert into test1 values (1, 'bb');
 INSERT 0 1
 test_db=# insert into test1 values (2, 'bb');
 INSERT 0 1
 test_db=# select * from pg_logical_slot_get_changes('custom_slot', NULL, NULL, 'include_transaction', 'on');
  location  |  xid  |                        data
 -----------+-------+-----------------------------------------------------
  0/BAB0968 | 48328 | begin
  0/BAB0968 | 48328 | {"a":0,"r":"public.test1","d":{"id":1,"name":"bb"}}
  0/BAB09F0 | 48328 | commit
  0/BAB09F0 | 48329 | begin
  0/BAB09F0 | 48329 | {"a":0,"r":"public.test1","d":{"id":2,"name":"bb"}}
  0/BAB0A78 | 48329 | commit
 (6 rows)

Changing data:

 test_db=# update test1 set name = 'dd' where id=2;
 UPDATE 1
 test_db=# select * from pg_logical_slot_get_changes('custom_slot', NULL, NULL, 'include_transaction', 'on');
  location  |  xid  |                               data
 -----------+-------+------------------------------------------------------------------
  0/BB4C700 | 48338 | begin
  0/BB4C700 | 48338 | {"c":{"id":2},"a":1,"r":"public.test1","d":{"id":2,"name":"dd"}}
  0/BB4C798 | 48338 | commit
 (3 rows)

Deleting data:

 test_db=# delete from test1 where id=2;
 DELETE 1
 test_db=# select * from pg_logical_slot_get_changes('custom_slot', NULL, NULL, 'include_transaction', 'on');
  location  |  xid  |                  data
 -----------+-------+-----------------------------------------
  0/BB4C8A8 | 48339 | begin
  0/BB4C8A8 | 48339 | {"c":{"id":2},"a":2,"r":"public.test1"}
  0/BB4C9C8 | 48339 | commit
 (3 rows)


Source: https://habr.com/ru/post/254263/

