
Data migration from MySQL to PostgreSQL

As you work with databases and get to know their pros and cons, there comes a moment when the decision is made to migrate from one DBMS to another. In this case, the task was to migrate services from MySQL to PostgreSQL. The move to PostgreSQL 9.2 promised a number of goodies (a more detailed list of features is linked in the original post).

As a rule, existing solutions work with a ready-made SQL dump, which is converted to the syntax of the target database. But in some cases (an actively used web application with a large amount of data) this approach costs time: the SQL dump has to be created from the source DBMS, converted, and loaded into the target DBMS. An online converter (straight from DBMS to DBMS) is therefore a better fit, since it can significantly reduce service downtime.

C++ was chosen as the implementation language (with some C++11 features), the native client libraries were used to connect to MySQL and PostgreSQL, and Qt Creator served as the IDE.

The migration algorithm is as follows. It is assumed that the table structure has already been created in the destination database and matches the structure in the source database. A list of tables to transfer is built and then distributed across a thread pool. Each thread has its own connection to the source database and to the destination database, so several tables are transferred in parallel. Profit!
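The idea of distributing the table list across threads can be sketched as follows. This is only an illustration under stated assumptions, not the converter's code: `migrateInParallel` is a hypothetical name, and the per-table work is replaced by recording the table name, where the real tool would read from MySQL and write to PostgreSQL.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Hypothetical helper: splits `tables` into contiguous slices, one per thread,
// and "processes" each table by recording its name. Returns the processed
// names in sorted order so the result does not depend on thread scheduling.
std::vector<std::string> migrateInParallel(const std::vector<std::string>& tables,
                                           std::size_t thread_count) {
    assert(thread_count > 0);
    std::vector<std::string> done;
    std::mutex done_mutex;
    std::vector<std::thread> workers;

    const std::size_t base = tables.size() / thread_count;   // minimum slice size
    const std::size_t extra = tables.size() % thread_count;  // first `extra` slices get one more
    std::size_t begin = 0;
    for (std::size_t w = 0; w < thread_count; ++w) {
        const std::size_t end = begin + base + (w < extra ? 1 : 0);
        workers.emplace_back([&tables, &done, &done_mutex, begin, end]() {
            for (std::size_t i = begin; i < end; ++i) {
                // Placeholder for the real per-table transfer.
                std::lock_guard<std::mutex> guard(done_mutex);
                done.push_back(tables[i]);
            }
        });
        begin = end;
    }
    for (std::thread& worker : workers) {
        worker.join();  // wait until every slice has been processed
    }
    std::sort(done.begin(), done.end());
    return done;
}
```

In a real converter each worker would also own its pair of database connections, as described above, so that no connection is shared between threads.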

Traditionally, any application has a framework: a set of system components that other components rely on, such as configuration file handling, a log, an error handler, a memory manager, and so on. In our case, only what is strictly necessary to solve the problem is used. First, some fundamental and composite types were redefined, purely for convenience (yes, I know alias templates could have been used, but this is how it turned out):
Simple types
    typedef bool t_bool;
    typedef char t_char;
    typedef unsigned char t_uchar;
    typedef signed char t_schar;
    typedef int t_int;
    typedef unsigned int t_uint;
    typedef float t_float;
    typedef double t_double;

map
    template<typename T, typename U>
    class CMap : public std::map<T, U> {
    public:
        CMap();
        virtual ~CMap();
    };

    template<typename T, typename U>
    CMap<T, U>::CMap() {
    }

    template<typename T, typename U>
    CMap<T, U>::~CMap() {
    }

vector
    template<typename T>
    class CVector : public std::vector<T> {
    public:
        CVector();
        virtual ~CVector();
    };

    template<typename T>
    CVector<T>::CVector() {
    }

    template<typename T>
    CVector<T>::~CVector() {
    }

fstream
    class CFileStream : public std::fstream {
    public:
        CFileStream();
        virtual ~CFileStream();
    };

Of the explicit design patterns, only the singleton was used:
Classic Meyers singleton
    template<typename T>
    class CSingleton {
    public:
        static T* instance();
        void free();
    protected:
        CSingleton();
        virtual ~CSingleton();
    };

    template<typename T>
    T* CSingleton<T>::instance() {
        // created on first use; the pointer itself is a function-local static
        static T *instance = new T();
        return instance;
    }

    template<typename T>
    void CSingleton<T>::free() {
        delete this;
    }

    template<typename T>
    CSingleton<T>::CSingleton() {
    }

    template<typename T>
    CSingleton<T>::~CSingleton() {
    }

The base classes for a task (executed in a separate thread) and for the system (which starts task execution):
task.h
    class CTask {
    public:
        CTask();
        virtual ~CTask();
        void execute();
        t_uint taskID();
        t_bool isExecuted();
    protected:
        // the actual work; implemented by concrete tasks
        virtual void executeEvent() = 0;
    private:
        t_uint m_task_id;
        t_bool m_executed;
    };

task.cpp
    CTask::CTask() : m_executed(false) {
        // each task gets the next sequential identifier
        static t_uint task_id = 0;
        m_task_id = task_id++;
    }

    CTask::~CTask() {
    }

    void CTask::execute() {
        executeEvent();
        m_executed = true;
    }

    t_uint CTask::taskID() {
        return m_task_id;
    }

    t_bool CTask::isExecuted() {
        return m_executed;
    }

system.h
    class CSystem {
    public:
        CSystem();
        virtual ~CSystem() = 0;
    protected:
        void executeTask(CTask *task);
    };

system.cpp
    CSystem::CSystem() {
    }

    CSystem::~CSystem() {
    }

    void CSystem::executeTask(CTask *task) {
        CTask& task_ref = *task;
        // run the task in its own thread; the task object must outlive it
        std::thread thread([&]() {
            task_ref.execute();
        });
        thread.detach();
    }

To conclude the overview of the basic types, the string class deserves a mention. It had to be written from scratch so that some operations (substring replacement and concatenation) could work with a caller-supplied buffer (more on that below) without additional memory allocations, and so that some things (converting strings to numbers and numbers to strings) could be class members. Only the class declaration is given:
string.h
    class CString {
    public:
        CString(const t_char *data = nullptr);
        CString(const CString& s);
        ~CString();

        const t_char* ptr() const;
        void setPtr(t_char *p);

        CString& operator= (const CString& s);
        CString operator+ (const t_char *p) const;
        CString operator+ (t_char c) const;
        CString operator+ (const CString& s) const;
        friend CString operator+ (const t_char *p, const CString& s);
        CString& operator+= (const t_char *p);
        CString& operator+= (t_char c);
        CString& operator+= (const CString& s);

        t_bool operator== (const CString& s) const;
        t_bool operator!= (const CString& s) const;
        t_bool operator< (const CString& s) const;
        t_bool operator> (const CString& s) const;
        t_bool operator<= (const CString& s) const;
        t_bool operator>= (const CString& s) const;

        t_char& at(t_uint index);
        t_char at(t_uint index) const;
        t_uint length() const;
        t_bool isEmpty() const;
        void clear();

        t_int search(const CString& s, t_uint from = 0) const;
        CString substr(t_uint from, t_int count = -1) const;
        CString replace(const CString& before, const CString& after) const;

        static CString fromNumber(t_uint value);
        static t_uint toUnsignedInt(const CString& s, t_bool *good = nullptr);

        CVector<CString> split(const CString& splitter) const;
        t_bool match(const CString& pattern) const;

        // variants that write into a caller-supplied buffer
        static t_uint replacePtr(const t_char *src, const t_char *before, const t_char *after, char *buffer);
        static t_uint lengthPtr(const t_char *src);
        static t_uint concatenatePtr(const t_char *src, char *buffer);

    private:
        t_char *m_data;

        t_uint length(const t_char *src) const;
        t_char* copy(const t_char *src) const;
        t_char* concatenate(const t_char *src0, t_char c) const;
        t_char* concatenate(const t_char *src0, const t_char *src1) const;
        t_int compare(const t_char *src0, const t_char *src1) const;
    };

    CString operator+ (const t_char *p, const CString& s);
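The article gives only the declaration of `replacePtr`, so the following is a guess at what a buffer-based replacement of that kind might look like, not the author's implementation. The name `replaceInto` is hypothetical, and the sketch assumes the caller supplies a buffer large enough for the result.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>

// Hypothetical sketch: copies `src` into `buffer`, replacing every occurrence
// of `before` with `after`. No memory is allocated; the caller must supply a
// buffer large enough for the result plus the terminating '\0'.
// Returns the length of the resulting string (without the terminator).
std::size_t replaceInto(const char* src, const char* before, const char* after, char* buffer) {
    assert(src && before && after && buffer);
    const std::size_t before_len = std::strlen(before);
    const std::size_t after_len = std::strlen(after);
    assert(before_len > 0);  // an empty pattern would match everywhere

    std::size_t out = 0;
    while (*src != '\0') {
        if (std::strncmp(src, before, before_len) == 0) {
            // pattern found: emit the replacement and skip the pattern
            std::memcpy(buffer + out, after, after_len);
            out += after_len;
            src += before_len;
        } else {
            buffer[out++] = *src++;
        }
    }
    buffer[out] = '\0';
    return out;
}
```

Because source and destination are separate buffers, chaining several replacements requires alternating between two scratch buffers, which is the pattern used in the COPY preparation code further down.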

Inevitably, any application a little bigger than "Hello, world" needs a log and a configuration file. The method that writes a message to the log uses a mutex, since every task reports to the log as it processes a table. Fine-grained locks and lock-free algorithms were not considered, because writing to the log is far from being a bottleneck in this application:
log.h
    class CLog : public CSingleton<CLog> {
    public:
        enum MessageType {
            Information,
            Warning,
            Error
        };

        CLog();
        virtual ~CLog();

        void information(const CString& message);
        void warning(const CString& message);
        void error(const CString& message);

    private:
        std::mutex m_mutex;
        CFileStream m_stream;

        void writeTimestamp();
        void writeHeader();
        void writeFooter();
        void writeMessage(MessageType type, const CString& message);
    };

log.cpp
    CLog::CLog() {
        m_stream.open("log.txt", std::ios_base::out);
        writeHeader();
    }

    CLog::~CLog() {
        writeFooter();
        m_stream.flush();
        m_stream.close();
    }

    void CLog::information(const CString& message) {
        writeMessage(Information, message);
    }

    void CLog::warning(const CString& message) {
        writeMessage(Warning, message);
    }

    void CLog::error(const CString& message) {
        writeMessage(Error, message);
    }

    void CLog::writeTimestamp() {
        time_t rawtime;
        tm *timeinfo;
        t_char buffer[32];
        time(&rawtime);
        timeinfo = localtime(&rawtime);
        strftime(buffer, 32, "%Y/%m/%d %H:%M:%S", timeinfo);
        m_stream << buffer << " ";
    }

    void CLog::writeHeader() {
        writeMessage(Information, "Log started");
    }

    void CLog::writeFooter() {
        writeMessage(Information, "Log ended");
    }

    void CLog::writeMessage(MessageType type, const CString& message) {
        // one message at a time: tasks log from several threads
        std::lock_guard<std::mutex> guard(m_mutex);
        writeTimestamp();
        switch (type) {
            case Information: {
                m_stream << "Information " << message.ptr();
                break;
            }
            case Warning: {
                m_stream << "Warning " << message.ptr();
                break;
            }
            case Error: {
                m_stream << "Error " << message.ptr();
                break;
            }
            default: {
                break;
            }
        }
        m_stream << "\n";
        m_stream.flush();
    }

config.h
    class CConfig : public CSingleton<CConfig> {
    public:
        CConfig();
        virtual ~CConfig();

        CString value(const CString& name, const CString& defvalue = "") const;

    private:
        CFileStream m_stream;
        CMap<CString, CString> m_values;
    };

config.cpp
    CConfig::CConfig() {
        m_stream.open("mysql2psql.conf", std::ios_base::in);
        if (m_stream.is_open()) {
            CString line;
            const t_uint buffer_size = 256;
            t_char buffer[buffer_size];
            while (m_stream.getline(buffer, buffer_size)) {
                line = buffer;
                // skip empty lines and '#' comments
                if (!line.isEmpty() && line.at(0) != '#') {
                    t_int pos = line.search("=");
                    CString name = line.substr(0, pos);
                    CString value = line.substr(pos + 1);
                    m_values.insert(std::pair<CString, CString>(name, value));
                }
            }
            m_stream.close();
            CLog::instance()->information("Config loaded");
        } else {
            CLog::instance()->warning("Can't load config");
        }
    }

    CConfig::~CConfig() {
    }

    CString CConfig::value(const CString& name, const CString& defvalue) const {
        CMap<CString, CString>::const_iterator iter = m_values.find(name);
        if (iter != m_values.end()) {
            return iter->second;
        }
        return defvalue;
    }
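For readers who want to try the same `name=value` format without the custom `CString` and `CMap` classes, here is a minimal sketch on top of the standard library. The function names are hypothetical, and skipping lines that have no `=` is an assumption of this sketch rather than the behaviour of the code above.

```cpp
#include <cassert>
#include <istream>
#include <map>
#include <sstream>
#include <string>

// Parses "name=value" lines. Blank lines, '#' comments, and lines without '='
// are skipped. If a name occurs twice, the first value wins (same as
// std::map::insert in the original code).
std::map<std::string, std::string> parseConfig(std::istream& input) {
    std::map<std::string, std::string> values;
    std::string line;
    while (std::getline(input, line)) {
        if (line.empty() || line[0] == '#') {
            continue;
        }
        const std::string::size_type pos = line.find('=');
        if (pos == std::string::npos) {
            continue;  // malformed line: no separator
        }
        assert(pos < line.size());
        values.emplace(line.substr(0, pos), line.substr(pos + 1));
    }
    return values;
}

// Convenience wrapper for parsing configuration text held in memory.
std::map<std::string, std::string> parseConfigText(const std::string& text) {
    std::istringstream input(text);
    return parseConfig(input);
}
```

Unlike the fixed 256-byte line buffer above, `std::getline` on a `std::string` has no line length limit, at the price of dynamic allocation.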

mysql2psql.conf
    # MySQL connection
    mysql_host=localhost
    mysql_port=3306
    mysql_database=mysqldb
    mysql_username=root
    mysql_password=rootpwd
    mysql_encoding=UTF8

    # PostgreSQL connection
    psql_host=localhost
    psql_port=5432
    psql_database=psqldb
    psql_username=postgres
    psql_password=postgrespwd
    psql_encoding=UTF8

    # Migration
    # (!) Note: source_schema == mysql_database
    source_schema=mysqldb
    destination_schema=public
    tables=*
    use_insert=0

    # Other settings
    threads=16

Now for adding data to PostgreSQL. There are two options: INSERT queries, which did not perform well on a large data set (a consequence of the transaction mechanism), or the COPY command, which lets you send chunks of data continuously and finish the transfer with a special end-of-data marker. Another nuance concerns determining the types of table fields in PostgreSQL. The documentation does not say (or perhaps I failed to read between the lines) how to get a human-readable type identifier back, so a mapping was made between the oid (an almost unique identifier of each object in the database) and the type:
    case 20:   // int8
    case 21:   // int2
    case 23:   // int4
    case 1005: // _int2 (array of int2)
    case 1007: // _int4 (array of int4)
    case 1016: // _int8 (array of int8)
    case 700:  // float4
    case 701:  // float8
    case 1021: // _float4 (array of float4)
    case 1022: // _float8 (array of float8)
    case 1700: // numeric
    case 18:   // char
    case 25:   // text
    case 1002: // _char (array of char)
    case 1009: // _text (array of text)
    case 1015: // _varchar (array of varchar)
    case 1082: // date
    case 1182: // _date (array of date)
    case 1083: // time
    case 1114: // timestamp
    case 1115: // _timestamp (array of timestamp)
    case 1183: // _time (array of time)
    case 1185: // _timestamptz (array of timestamptz)
    case 16:   // bool
    case 1000: // _bool (array of bool)
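To show how such a match can be used, here is a small sketch that turns a few of the scalar OIDs from the list into type names. The function name `typeNameForOid` is hypothetical, only a subset of the OIDs is covered, and the article does not show what the converter actually does inside each `case`.

```cpp
#include <cassert>
#include <string>

// Hypothetical helper: maps a few built-in PostgreSQL type OIDs to their
// names. Anything not listed is reported as "unknown".
std::string typeNameForOid(unsigned int oid) {
    assert(oid != 0);  // 0 is the invalid OID and never names a type
    switch (oid) {
        case 16:   return "bool";
        case 18:   return "char";
        case 20:   return "int8";
        case 21:   return "int2";
        case 23:   return "int4";
        case 25:   return "text";
        case 700:  return "float4";
        case 701:  return "float8";
        case 1082: return "date";
        case 1083: return "time";
        case 1114: return "timestamp";
        case 1700: return "numeric";
        default:   return "unknown";
    }
}
```

The same information is also stored on the server in the `pg_type` system catalog (columns `oid` and `typname`), so a lookup table like this could be built at startup with a query instead of being hard-coded.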


Preparation and execution of tasks is as follows:

Each task has three static buffers of 50 MB each, in which data is prepared for the COPY command (escaping special characters and concatenating field values):
Code snippet with task preparation
    // create connection pool
    t_uint threads = CString::toUnsignedInt(CConfig::instance()->value("threads", "1"));
    CLog::instance()->information("Count of working threads: " + CString::fromNumber(threads));
    if (!createConnectionPool(threads - 1)) {
        return false;
    }

    // create tasks: each one gets a contiguous range of tables
    CString destination_schema = CConfig::instance()->value("destination_schema");
    t_uint range_begin = 0;
    t_uint range_end = 0;
    t_uint range = m_tables.size() / threads;
    for (t_uint i = 0, j = 0; i < m_tables.size() - range; i += range + 1, ++j) {
        range_begin = i;
        range_end = i + range;
        std::unique_ptr<CTask> task = std::unique_ptr<CTask>(new CMigrationTask(m_source_pool.at(j), m_destination_pool.at(j), destination_schema, m_tables, range_begin, range_end));
        m_migration_tasks.push_back(std::move(task));
    }
    // the remaining tables are handled by the calling thread
    range_begin = range_end + 1;
    range_end = m_tables.size() - 1;
    std::unique_ptr<CTask> task = std::unique_ptr<CTask>(new CMigrationTask(std::move(m_source), std::move(m_destination), destination_schema, m_tables, range_begin, range_end));

    // executing tasks
    for (t_uint i = 0; i < m_migration_tasks.size(); ++i) {
        executeTask(m_migration_tasks.at(i).get());
    }
    task->execute();

    // wait for completion
    for (t_uint i = 0; i < m_migration_tasks.size(); ++i) {
        while (!m_migration_tasks.at(i)->isExecuted()) {
        }
    }
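The completion wait above polls a plain `bool` from another thread. As an alternative, and not what the converter does, the same "start everything, then wait" step can be sketched with joinable threads and an atomic counter. `runAndWait` is a hypothetical name, and the jobs are arbitrary callables standing in for migration tasks.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <functional>
#include <thread>
#include <vector>

// Hypothetical sketch: runs every job in its own thread and blocks until all
// of them have finished. Returns the number of completed jobs.
std::size_t runAndWait(const std::vector<std::function<void()>>& jobs) {
    std::atomic<std::size_t> finished(0);
    std::vector<std::thread> workers;
    workers.reserve(jobs.size());
    for (std::size_t i = 0; i < jobs.size(); ++i) {
        assert(jobs[i]);  // calling an empty std::function would throw
        workers.emplace_back([&jobs, &finished, i]() {
            jobs[i]();
            finished.fetch_add(1);  // atomic, so safe from several threads
        });
    }
    // join() blocks without spinning and makes each worker's writes visible
    for (std::thread& worker : workers) {
        worker.join();
    }
    return finished.load();
}
```

Joining avoids burning a CPU core in an empty loop while the tasks run, and the atomic counter removes the unsynchronized read of a flag that another thread writes.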


Snippet: preparing data for COPY inside a task
    t_uint count = 0;
    t_char *value;
    CString copy_query = "COPY " + m_destination_schema + "." + table + " ( ";
    m_buffer[0] = '\0';
    m_buffer_temp0[0] = '\0';
    m_buffer_temp1[0] = '\0';
    if (result->nextRecord()) {
        // first record: build the COPY column list and the first data row
        for (t_uint i = 0; i < result->columnCount(); ++i) {
            if (i != 0) {
                copy_query += ", ";
                CString::concatenatePtr("\t", m_buffer);
            }
            copy_query += result->columnName(i);
            if (!result->isColumnNull(i)) {
                // escape special characters, alternating between the two temp buffers
                value = result->columnValuePtr(i);
                CString::replacePtr(value, "\\", "\\\\", m_buffer_temp0);
                CString::replacePtr(m_buffer_temp0, "\b", "\\b", m_buffer_temp1);
                CString::replacePtr(m_buffer_temp1, "\f", "\\f", m_buffer_temp0);
                CString::replacePtr(m_buffer_temp0, "\n", "\\n", m_buffer_temp1);
                CString::replacePtr(m_buffer_temp1, "\r", "\\r", m_buffer_temp0);
                CString::replacePtr(m_buffer_temp0, "\t", "\\t", m_buffer_temp1);
                CString::replacePtr(m_buffer_temp1, "\v", "\\v", m_buffer_temp0);
                CString::concatenatePtr(m_buffer_temp0, m_buffer);
            } else {
                CString::concatenatePtr("\\N", m_buffer);
            }
        }
        copy_query += " ) FROM STDIN";
        if (!m_destination_connection->copyOpen(copy_query)) {
            CLog::instance()->error("Can't execute query '" + copy_query + "', error: " + m_destination_connection->lastError());
            return false;
        }
        CString::concatenatePtr("\n", m_buffer);
        if (!m_destination_connection->copyDataPtr(m_buffer)) {
            CLog::instance()->error("Can't copy data, error: " + m_destination_connection->lastError());
            return false;
        }
        ++count;
        // remaining records: one COPY data row per record
        while (result->nextRecord()) {
            m_buffer[0] = '\0';
            for (t_uint i = 0; i < result->columnCount(); ++i) {
                if (i != 0) {
                    CString::concatenatePtr("\t", m_buffer);
                }
                if (!result->isColumnNull(i)) {
                    value = result->columnValuePtr(i);
                    CString::replacePtr(value, "\\", "\\\\", m_buffer_temp0);
                    CString::replacePtr(m_buffer_temp0, "\b", "\\b", m_buffer_temp1);
                    CString::replacePtr(m_buffer_temp1, "\f", "\\f", m_buffer_temp0);
                    CString::replacePtr(m_buffer_temp0, "\n", "\\n", m_buffer_temp1);
                    CString::replacePtr(m_buffer_temp1, "\r", "\\r", m_buffer_temp0);
                    CString::replacePtr(m_buffer_temp0, "\t", "\\t", m_buffer_temp1);
                    CString::replacePtr(m_buffer_temp1, "\v", "\\v", m_buffer_temp0);
                    CString::concatenatePtr(m_buffer_temp0, m_buffer);
                } else {
                    CString::concatenatePtr("\\N", m_buffer);
                }
            }
            CString::concatenatePtr("\n", m_buffer);
            if (!m_destination_connection->copyDataPtr(m_buffer)) {
                CLog::instance()->error("Can't copy data, error: " + m_destination_connection->lastError());
                return false;
            }
            ++count;
            if (count % 250000 == 0) {
                CLog::instance()->information("Working task #" + CString::fromNumber(taskID()) + ":\t\ttable " + table + " processing, record count: " + CString::fromNumber(count));
            }
        }
    }
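The seven replacement passes per field can also be folded into a single pass over the value. The sketch below is an alternative under stated assumptions, not the converter's code: `escapeCopyField` and `buildCopyRow` are hypothetical names, and `std::string` is used for brevity, so unlike the static 50 MB buffers it allocates memory.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Escapes one field value for the COPY text format in a single pass.
// A null pointer stands for SQL NULL and becomes the \N marker.
std::string escapeCopyField(const char* value) {
    if (value == nullptr) {
        return "\\N";
    }
    std::string out;
    for (const char* p = value; *p != '\0'; ++p) {
        switch (*p) {
            case '\\': out += "\\\\"; break;
            case '\b': out += "\\b";  break;
            case '\f': out += "\\f";  break;
            case '\n': out += "\\n";  break;
            case '\r': out += "\\r";  break;
            case '\t': out += "\\t";  break;
            case '\v': out += "\\v";  break;
            default:   out += *p;     break;
        }
    }
    return out;
}

// Builds one COPY data row: escaped fields separated by tabs, ended by a newline.
std::string buildCopyRow(const std::vector<const char*>& fields) {
    assert(!fields.empty());
    std::string row;
    for (std::size_t i = 0; i < fields.size(); ++i) {
        if (i != 0) {
            row += '\t';
        }
        row += escapeCopyField(fields[i]);
    }
    row += '\n';
    return row;
}
```

Handling the backslash in the same `switch` as the other characters sidesteps the ordering concern of the multi-pass version, where the backslash has to be escaped first so that later passes do not double-escape their own output.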



Results

Transferring 2 GB of data to PostgreSQL with WAL archiving enabled took about 10 minutes (16 threads were created).

What is worth thinking about


Source

The source code is available on GitHub.

Source: https://habr.com/ru/post/175459/

