_PG_output_plugin_init
The plugin's entry point is the function _PG_output_plugin_init, which is called
with a structure whose fields need to be assigned their own functions: void _PG_output_plugin_init(OutputPluginCallbacks *cb) { cb->startup_cb = decoder_json_startup; cb->begin_cb = decoder_json_begin_txn; cb->change_cb = decoder_json_change; cb->commit_cb = decoder_json_commit_txn; cb->shutdown_cb = decoder_json_shutdown; }
decoder_json_startup
is called at the beginning of the decoding and is used to set the decoding options and create its own memory context: /* initialize this plugin */ static void decoder_json_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init) { ListCell *option; DecoderRawData *data; data = palloc(sizeof(DecoderRawData)); data->context = AllocSetContextCreate(ctx->context, "Raw decoder context", ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); data->include_transaction = false; data->sort_keys = false; ctx->output_plugin_private = data; /* Default output format */ opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT; foreach(option, ctx->output_plugin_options) { DefElem *elem = lfirst(option); Assert(elem->arg == NULL || IsA(elem->arg, String)); if (strcmp(elem->defname, "include_transaction") == 0) { /* if option does not provide a value, it means its value is true */ if (elem->arg == NULL) data->include_transaction = true; else if (!parse_bool(strVal(elem->arg), &data->include_transaction)) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("could not parse value \"%s\" for parameter \"%s\"", strVal(elem->arg), elem->defname))); } else if (strcmp(elem->defname, "sort_keys") == 0) { /* if option does not provide a value, it means its value is true */ if (elem->arg == NULL) data->sort_keys = true; else if (!parse_bool(strVal(elem->arg), &data->sort_keys)) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("could not parse value \"%s\" for parameter \"%s\"", strVal(elem->arg), elem->defname))); } else { ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("option \"%s\" = \"%s\" is unknown", elem->defname, elem->arg ? strVal(elem->arg) : "(null)"))); } } }
A dedicated memory context is created so that decoder_json_change
is able to properly clean the resources used. Important points: opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT
- this is how the output of the plugin is set to text. decoder_json_shutdown
is called at the end of decoding, and is used to clean up resources. /* cleanup this plugin's resources */ static void decoder_json_shutdown(LogicalDecodingContext *ctx) { DecoderRawData *data = ctx->output_plugin_private; /* cleanup our own resources via memory context reset */ MemoryContextDelete(data->context); }
decoder_json_begin_txn
, decoder_json_commit_txn
and decoder_json_change
are the functions that actually generate the rows returned by pg_logical_slot_peek_changes
and pg_logical_slot_get_changes
. The generated string must be added to the slot, this is done with the commands: OutputPluginPrepareWrite(ctx, true); appendStringInfoString(ctx->out, "some string"); OutputPluginWrite(ctx, true);
decoder_json_begin_txn
and decoder_json_commit_txn
write (or just skip, if there is such a condition) the commands of the beginning and end of the transaction in the slot - the lines 'begin' and 'commit', respectively. decoder_json_change
The function decoder_json_change
is called on a data change event. This function determines which event occurred (INSERT, UPDATE, DELETE) and creates its own structure for each of them. For UPDATE and DELETE, it is important to have a unique (not null) or primary key in the table; otherwise it is simply impossible to determine the row to be changed (deleted). This depends on the value of the REPLICA IDENTITY parameter for the table. The type of event is taken from change->action
. Then, according to the data in change
( change->data.tp.newtuple
and change->data.tp.oldtuple
), a JSON structure is created. JSON is generated using the libjansson library. The output has the form {"a": 0, "r": "public.some_table", "c": {"id": 1}, "d": {"a": 2}}
, where a is the type of action, r is the name of the table, c is the value to identify the row, d is the data itself. git clone https://github.com/ildus/decoder_json.git cd decoder_json # - sudo chmod a+rw `pg_config --pkglibdir` chmod a+rwx ./ # libjansson, JSON make deps # postgres sudo su postgres make make test
# postgres, sudo su postgres createdb test_db psql test_db # psql test_db=# create table test1 (id serial primary key, name varchar); test_db=# SELECT * FROM pg_create_logical_replication_slot('custom_slot', 'decoder_json'); slot_name | xlog_position -------------+--------------- custom_slot | 0/4D9F870 (1 row)
Changes are read from the slot with the functions pg_logical_slot_peek_changes
and pg_logical_slot_get_changes
. They differ in that the get function cleans the queue after receiving the data. test_db=# insert into test1 values (1, 'bb'); INSERT 0 1 test_db=# insert into test1 values (2, 'bb'); INSERT 0 1 test_db=# select * from pg_logical_slot_get_changes('custom_slot', NULL, NULL, 'include_transaction', 'on'); location | xid | data -----------+-------+----------------------------------------------------- 0/BAB0968 | 48328 | begin 0/BAB0968 | 48328 | {"a":0,"r":"public.test1","d":{"id":1,"name":"bb"}} 0/BAB09F0 | 48328 | commit 0/BAB09F0 | 48329 | begin 0/BAB09F0 | 48329 | {"a":0,"r":"public.test1","d":{"id":2,"name":"bb"}} 0/BAB0A78 | 48329 | commit (6 rows)
test_db=# update test1 set name = 'dd' where id=2; UPDATE 1 test_db=# select * from pg_logical_slot_get_changes('custom_slot', NULL, NULL, 'include_transaction', 'on'); location | xid | data -----------+-------+------------------------------------------------------------------ 0/BB4C700 | 48338 | begin 0/BB4C700 | 48338 | {"c":{"id":2},"a":1,"r":"public.test1","d":{"id":2,"name":"dd"}} 0/BB4C798 | 48338 | commit (3 rows)
test_db=# delete from test1 where id=2; DELETE 1 test_db=# select * from pg_logical_slot_get_changes('custom_slot', NULL, NULL, 'include_transaction', 'on'); location | xid | data -----------+-------+----------------------------------------- 0/BB4C8A8 | 48339 | begin 0/BB4C8A8 | 48339 | {"c":{"id":2},"a":2,"r":"public.test1"} 0/BB4C9C8 | 48339 | commit (3 rows)
Source: https://habr.com/ru/post/254263/
All Articles