Multithreaded applications no longer surprise anyone, but I think you will still find a few interesting ideas in this article. This project is where my study of Java began, so in some places I may be quite wrong or reinventing the wheel, but I hope the experience of a Java beginner will be interesting to someone. A few features of the application are listed below.
Below the cut I will cover the application architecture, as well as the main problems I encountered and how I solved them.
Interaction with the application happens through a Web UI; a REST API can be added later if needed.
The application can create, restore and delete database backups.
Currently supported storage: the local file system and Dropbox.
Currently supported databases: PostgreSQL.
Among the particular features of the application I can note streaming processing of backups (a backup of any size is never loaded into memory in full) and the ability to run several instances of the application at the same time.
Below are screenshots of the Web UI that clearly illustrate the capabilities of the application.
The main work takes place in three services: DatabaseBackup, Processor and Storage. We will tie them together using the concept of tasks; all of this is covered below.
This service is responsible for creating and restoring plain-text backups.
Service Interface:
public interface DatabaseBackup {
    InputStream createBackup(DatabaseSettings databaseSettings, Integer id);
    void restoreBackup(InputStream in, DatabaseSettings databaseSettings, Integer id);
}
Both interface methods operate on InputStream instances, because we need to make sure the backup is never loaded into memory in its entirety, which means it must be read and written in streaming mode. The DatabaseSettings entity is created beforehand from the Web UI and stores the various settings needed to access the database. What the id parameter is will be explained a little further on.
The service requirements are as follows: the restoreBackup() method must restore the backup within a single transaction, so that an error does not leave the database in an inconsistent state.

Specifically, the implementation of the service for PostgreSQL works as follows:
createBackup(): a pg_dump process is created that writes the backup to its standard output. The standard output stream of that process is returned from the method (see https://docs.oracle.com/javase/8/docs/api/java/lang/Process.html#getInputStream--). Inter-process I/O streams are backed by a buffer of a fixed size, and when the process writes to its output stream it actually writes to that in-memory buffer. The most important point is that the process will not write into a full buffer until the other side has read from it, which means its thread blocks and the backup is never loaded into memory in full. You may have run into a situation where a Java program working with processes hit a deadlock because you did not read the process's stdout or stderr. It is extremely important to watch out for this: a process cannot make progress if it is blocked on an I/O call writing to a full buffer that nobody reads. A rough sketch of this approach is given below, after both items.

restoreBackup(): a psql process is created, the backup is read from the InputStream passed to the method and simultaneously written to psql's standard input (see https://docs.oracle.com/javase/8/docs/api/java/lang/Process.html#getOutputStream--). This works because a plain-text PostgreSQL backup is just a set of DDL and DML commands that psql understands. There is quite a lot of code here, so I will not include it, but you can find it on GitHub via the link at the end of the article.
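To make this concrete, here is a minimal sketch of how createBackup() might be implemented for PostgreSQL. The DatabaseSettings accessors (getHost(), getPort(), getLogin(), getPassword(), getName()) and the logger are my assumptions for illustration, not the project's actual code.

// A sketch, not the project's actual implementation: start pg_dump and hand
// its stdout back to the caller as an InputStream, so the backup is streamed.
public InputStream createBackup(DatabaseSettings databaseSettings, Integer id) {
    List<String> command = Arrays.asList(
            "pg_dump",
            "--host", databaseSettings.getHost(),                   // assumed accessor
            "--port", String.valueOf(databaseSettings.getPort()),   // assumed accessor
            "--username", databaseSettings.getLogin(),              // assumed accessor
            databaseSettings.getName());
    try {
        ProcessBuilder processBuilder = new ProcessBuilder(command);
        processBuilder.environment().put("PGPASSWORD", databaseSettings.getPassword()); // assumed accessor
        Process process = processBuilder.start();

        // stderr must be drained as well, otherwise pg_dump may block on a full error buffer
        new Thread(() -> {
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getErrorStream()))) {
                reader.lines().forEach(line -> logger.error("pg_dump error [task {}]: {}", id, line));
            } catch (IOException ignored) {
            }
        }).start();

        // the caller reads this stream; pg_dump blocks whenever the pipe buffer is full
        return process.getInputStream();
    } catch (IOException ex) {
        throw new RuntimeException("Error starting pg_dump process", ex);
    }
}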
This service is responsible for applying processors to a backup and for the reverse operation, deprocessing. Processors are applied before the backup is uploaded to storage, and deprocessors after it is downloaded from storage. Example processors: a compressor, an encryptor.
Service Interface:
public interface Processor {
    InputStream process(InputStream in);
    InputStream deprocess(InputStream in);
    ProcessorType getType(); // ProcessorType is an enum of processor kinds
    int getPrecedence();     // processor priority
}
Each processor has a priority: if several processors are specified, they are applied in descending order of priority. Applying the inverse function (deprocess) in the reverse order to the one in which the processors were applied, we get back the original backup. A sketch of this is shown below.
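A minimal sketch of how the processor chain might be applied; the method signatures here are my assumption and take Processor instances rather than the ProcessorType enum used in the project.

// Sketch (assumed names): apply processors by descending precedence,
// deprocess in the reverse order to recover the original backup.
public InputStream process(InputStream in, List<Processor> processors) {
    List<Processor> ordered = new ArrayList<>(processors);
    ordered.sort(Comparator.comparingInt(Processor::getPrecedence).reversed());
    InputStream current = in;
    for (Processor processor : ordered) {
        current = processor.process(current);
    }
    return current;
}

public InputStream deprocess(InputStream in, List<Processor> processors) {
    List<Processor> ordered = new ArrayList<>(processors);
    ordered.sort(Comparator.comparingInt(Processor::getPrecedence)); // reverse of the process order
    InputStream current = in;
    for (Processor processor : ordered) {
        current = processor.deprocess(current);
    }
    return current;
}

Sorting by precedence in one direction for process() and in the other for deprocess() is what guarantees that deprocessing undoes the processors in the reverse order.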
This service is responsible for uploading and downloading backups, as well as deleting them from storage. Example storages: Dropbox, the local file system.
Service Interface:
public interface Storage {
    void uploadBackup(InputStream in, StorageSettings storageSettings, String backupName, Integer id);
    InputStream downloadBackup(StorageSettings storageSettings, String backupName, Integer id);
    void deleteBackup(StorageSettings storageSettings, String backupName, Integer id);
}
Each created backup is associated with a unique name, which lets us find it in any of the storages it was uploaded to. How exactly the backup is laid out inside the storage is purely a matter of the service implementation, but whenever we pass the backup name to one of the methods we must be able to expect correct behavior. The StorageSettings entity is created beforehand from the Web UI and stores the settings needed to access the storage.
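As an illustration, here is a rough sketch of what a local file system implementation of uploadBackup() could look like. The getLocalFileSystemSettings().getBackupPath() accessor is an assumption, not the project's actual API.

// Sketch (assumed names): stream the backup into a file named after the backup,
// so it can later be found by the same unique name.
public void uploadBackup(InputStream in, StorageSettings storageSettings, String backupName, Integer id) {
    Path target = Paths.get(storageSettings.getLocalFileSystemSettings().getBackupPath(), backupName);
    try {
        // Files.copy reads the stream incrementally, so the backup is never held in memory in full
        Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING);
    } catch (IOException ex) {
        throw new RuntimeException("Error uploading backup to local file system. Task ID: " + id, ex);
    }
}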
We would like to be able to track the status of our tasks, handle possible errors depending on the progress of the task, as well as cancel tasks. Therefore, we will continue to operate only with tasks. Each task will be represented in the database as a record in the table, and programmatically as an instance of the Future (see Java Future ). Each entry in the table is associated with its own Future (and, if several servers are running, Future instances can be stored in the memory of different servers).
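In code, this association between a task record and its Future can be kept in a simple in-memory map keyed by the task ID; a minimal sketch (the real project code may differ in details):

// In-memory registry tying task IDs to Future instances started on this server.
private final Map<Integer, Future> futures = new ConcurrentHashMap<>();

public Optional<Future> getFuture(Integer taskId) {
    // empty if the task was started on another server or has already finished
    return Optional.ofNullable(futures.get(taskId));
}

This is also why, with several servers running, a Future can only be cancelled on the server that actually started the task; we will return to this when discussing task cancellation.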
Let's take it in order. First of all, we need a service for starting tasks: creating, restoring and deleting backups.
Creating a backup:
public Task startBackupTask(@NotNull Task.RunType runType, @NotNull List<String> storageSettingsNameList,
                            @Nullable List<ProcessorType> processors, @NotNull DatabaseSettings databaseSettings) {
    Objects.requireNonNull(runType);
    Objects.requireNonNull(storageSettingsNameList);
    Objects.requireNonNull(processors);
    Objects.requireNonNull(databaseSettings);

    BackupProperties backupProperties =
            backupPropertiesManager.initNewBackupProperties(storageSettingsNameList, processors, databaseSettings.getName());

    Task task = tasksManager.initNewTask(Task.Type.CREATE_BACKUP, runType, backupProperties.getId());
    Integer taskId = task.getId();

    Future future = tasksStarterExecutorService.submit(() -> {
        tasksManager.updateTaskState(taskId, Task.State.CREATING);
        logger.info("Creating backup...");

        try (InputStream backupStream = databaseBackupManager.createBackup(databaseSettings, taskId)) {
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }

            tasksManager.updateTaskState(taskId, Task.State.APPLYING_PROCESSORS);
            logger.info("Applying processors on created backup. Processors: {}", processors);

            try (InputStream processedBackupStream = backupProcessorManager.process(backupStream, processors)) {
                if (Thread.interrupted()) {
                    throw new InterruptedException();
                }

                tasksManager.updateTaskState(taskId, Task.State.UPLOADING);
                logger.info("Uploading backup...");

                backupLoadManager.uploadBackup(processedBackupStream, backupProperties, taskId);
                if (Thread.interrupted()) {
                    throw new InterruptedException();
                }

                tasksManager.updateTaskState(taskId, Task.State.COMPLETED);
                logger.info("Creating backup completed. Backup properties: {}", backupProperties);
            }
        } catch (IOException ex) {
            logger.error("Error occurred while closing input stream of created backup", ex);
        } catch (RuntimeException ex) {
            logger.error("Error occurred while creating backup. Backup properties: {}", backupProperties, ex);
            errorTasksManager.addErrorTask(taskId);
        } catch (InterruptedException ex) {
            tasksManager.setInterrupted(taskId);
            logger.error("Backup creating task was interrupted. Task ID: {}", taskId);
        } finally {
            futures.remove(taskId);
        }
    });

    futures.put(taskId, future);

    return task;
}
Creating a backup goes through three main steps, in the following order: creating the backup -> applying processors -> uploading to storage. In practically every service method we forward the current task ID so that the service can report an error from a thread working in the background. Error handling, what the InterruptedException is for, and what happens after a RuntimeException is caught will be described later.
And here is how we will run the backup creation task:
tasksStarterService.startBackupTask(Task.RunType.USER, storageSettingsNameList, processors, databaseSettings);
The first parameter is the initiator of the task: the user or an internal server task (an example of an internal task is a periodic backup). Knowing the initiator lets us show in the Web UI only the tasks that were started by the user. The remaining parameters are needed for creating the backup itself: the list of storages, the processors to apply, and the database whose dump must be created.
When a backup is created, a BackupProperties entry is also created in the database. This entity stores such backup properties as the name, the applied processors, and the list of storages the backup has been uploaded to. Later, to restore or delete the backup, we operate on this entity.
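Based on that description, the BackupProperties entity might look roughly like this; the field names and mappings are my assumptions, not the project's actual code.

// A sketch of the BackupProperties entity (assumed field names and mappings).
@Entity
@Table(name = "backup_properties")
public class BackupProperties {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Integer id;

    // unique backup name used to locate the backup in any storage
    private String backupName;

    // processors that were applied when the backup was created
    @ElementCollection
    @Enumerated(EnumType.STRING)
    private List<ProcessorType> processors;

    // names of the storage settings the backup was uploaded to
    @ElementCollection
    private List<String> storageSettingsNames;

    // getters & setters...
}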
The task in the database is stored as follows:
@Entity
@Table(name = "backup_tasks")
public class Task {
    /**
     * Identifier of each backup task. The identifier is generated by the PostgreSQL database after the entity is saved.
     */
    @Id
    @Column(insertable = false, updatable = false)
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Integer id;

    /**
     * Backup task type.
     * <p>
     * The type is set at the very start of a task and can't be changed.
     *
     * @see Type
     */
    @Enumerated(EnumType.STRING)
    @Column(updatable = false)
    private Type type;

    /**
     * Who initiated the task: the user or the server.
     * <p>
     * We need to know this to show on the front end only those tasks that were started by the user.
     *
     * @see RunType
     */
    @Enumerated(EnumType.STRING)
    @Column(updatable = false)
    private RunType runType;

    /**
     * Backup task state.
     * <p>
     * The state is updated with every new step of the task being executed.
     *
     * @see Task.State
     */
    @Enumerated(EnumType.STRING)
    private State state;

    /**
     * Whether the task has been interrupted or not.
     * <p>
     * Default is {@literal false}.
     */
    @Column(insertable = false)
    private boolean interrupted;

    /**
     * Identifier of the related {@link BackupProperties}.
     * <p>
     * We need to know the backup ID to be able to handle errors that occur.
     */
    @Column(updatable = false)
    private Integer backupPropertiesId;

    /**
     * Start time of the task.
     */
    @Column(updatable = false)
    private LocalDateTime date;

    public enum RunType {
        USER,
        INTERNAL
    }

    public enum State {
        PLANNED,
        CREATING,
        RESTORING,
        DELETING,
        APPLYING_PROCESSORS,
        APPLYING_DEPROCESSORS,
        DOWNLOADING,
        UPLOADING,
        COMPLETED,
    }

    public enum Type {
        CREATE_BACKUP {
            @Override
            public String toString() {
                return "CREATE BACKUP";
            }
        },
        RESTORE_BACKUP {
            @Override
            public String toString() {
                return "RESTORE BACKUP";
            }
        },
        DELETE_BACKUP {
            @Override
            public String toString() {
                return "DELETE BACKUP";
            }
        }
    }

    // getters & setters...
}
Thus, the backup creation process can be described by the following diagram:
The remaining task types are started analogously. In order not to clutter the article with a huge amount of code, the code for starting the restore and delete tasks is given under a spoiler for the curious.
public Task startRestoreTask(@NotNull Task.RunType runType, @NotNull BackupProperties backupProperties,
                             @NotNull String storageSettingsName, @NotNull DatabaseSettings databaseSettings) {
    Objects.requireNonNull(runType);
    Objects.requireNonNull(backupProperties);
    Objects.requireNonNull(storageSettingsName);
    Objects.requireNonNull(databaseSettings);

    Task task = tasksManager.initNewTask(Task.Type.RESTORE_BACKUP, runType, backupProperties.getId());
    Integer taskId = task.getId();

    Future future = tasksStarterExecutorService.submit(() -> {
        tasksManager.updateTaskState(taskId, Task.State.DOWNLOADING);
        logger.info("Downloading backup...");

        try (InputStream downloadedBackup = backupLoadManager.downloadBackup(backupProperties.getBackupName(),
                storageSettingsName, taskId)) {
            if (Thread.interrupted() || downloadedBackup == null) {
                throw new InterruptedException();
            }

            tasksManager.updateTaskState(taskId, Task.State.APPLYING_DEPROCESSORS);
            logger.info("Deprocessing backup...");

            try (InputStream deprocessedBackup = backupProcessorManager.deprocess(downloadedBackup, backupProperties.getProcessors())) {
                if (Thread.interrupted()) {
                    throw new InterruptedException();
                }

                tasksManager.updateTaskState(taskId, Task.State.RESTORING);
                logger.info("Restoring backup...");

                databaseBackupManager.restoreBackup(deprocessedBackup, databaseSettings, taskId);
                if (Thread.interrupted()) {
                    throw new InterruptedException();
                }

                tasksManager.updateTaskState(taskId, Task.State.COMPLETED);
                logger.info("Restoring backup completed. Backup properties: {}", backupProperties);
            }
        } catch (IOException ex) {
            logger.error("Error occurred while closing input stream of downloaded backup", ex);
        } catch (RuntimeException ex) {
            logger.info("Error occurred while restoring backup. Backup properties: {}", backupProperties, ex);
            errorTasksManager.addErrorTask(taskId);
        } catch (InterruptedException ex) {
            tasksManager.setInterrupted(taskId);
            logger.error("Task was interrupted. Task ID: {}", taskId);
        } finally {
            futures.remove(taskId);
        }
    });

    futures.put(taskId, future);

    return task;
}
Restoring a backup goes through three main steps, in the following order: downloading the backup from storage -> applying deprocessors to get the original plain-text backup back -> restoring the backup.
The restore task is started as follows:
tasksStarterService.startRestoreTask(Task.RunType.USER, backupProperties, storageSettingsName, databaseSettings);
The backup restoration process as a diagram:
public Task startDeleteTask(@NotNull Task.RunType runType, @NotNull BackupProperties backupProperties) {
    Objects.requireNonNull(runType);
    Objects.requireNonNull(backupProperties);

    Task task = tasksManager.initNewTask(Task.Type.DELETE_BACKUP, runType, backupProperties.getId());
    Integer taskId = task.getId();

    Future future = tasksStarterExecutorService.submit(() -> {
        try {
            logger.info("Deleting backup started. Backup properties: {}", backupProperties);
            tasksManager.updateTaskState(taskId, Task.State.DELETING);

            backupLoadManager.deleteBackup(backupProperties, taskId);
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }

            tasksManager.updateTaskState(taskId, Task.State.COMPLETED);
            logger.info("Deleting backup completed. Backup properties: {}", backupProperties);
        } catch (RuntimeException ex) {
            logger.error("Error occurred while deleting backup. Backup properties: {}", backupProperties, ex);
            errorTasksManager.addErrorTask(taskId);
        } catch (InterruptedException ex) {
            tasksManager.setInterrupted(taskId);
            logger.error("Task was interrupted. Task ID: {}", taskId);
        } finally {
            futures.remove(taskId);
        }
    });

    futures.put(taskId, future);

    return task;
}
The backup deletion process is quite simple: the backup is deleted from all the storages it was uploaded to.
The delete task is started as follows:
tasksStarterService.startDeleteTask(Task.RunType.USER, backupProperties);
The backup deletion process as a diagram:
What is task cancellation? It is nothing more than interrupting a thread. You may have noticed that all the main code running inside a Future is wrapped in the following try-catch construct:
try {
    ...
} catch (InterruptedException ex) {
    ...
    tasksManager.setInterrupted(taskId);
}
And after each important method call, the execution of which we control, the following check is placed:
if (Thread.interrupted()) {
    throw new InterruptedException();
}
Before going further, a bit of theory about interruption and thread states in the JVM is in order.
Threads in the JVM can be in the following states: New, Runnable, Blocked, Waiting, Timed waiting and Terminated.
We are only interested in the Waiting and Timed waiting states. A thread is put into the Waiting state by methods such as Object.wait() and Thread.join(). A thread is put into the Timed waiting state (that is, a wait lasting a certain period of time) by Object.wait(timeout), Thread.join(timeout), Thread.sleep(millis) and others.
The most important thing here is that if you interrupt a thread before it enters the Waiting or Timed waiting state, or while it is in one of these states, the thread wakes up by throwing an InterruptedException.
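A tiny self-contained example of this behavior:

// A sleeping thread is in Timed waiting; interrupting it makes sleep() throw InterruptedException.
public class InterruptDemo {
    public static void main(String[] args) throws Exception {
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                System.out.println("Woken up by interrupt");
            }
        });
        worker.start();
        Thread.sleep(100); // let the worker reach sleep()
        worker.interrupt();
        worker.join();
    }
}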
But that is not all. It is not at all guaranteed that a thread will ever enter these states while creating, restoring or deleting a backup. How, then, do we tell the thread that it has been interrupted?
The first way is for the thread to check its own interrupt flag using the Thread.interrupted() or Thread.currentThread().isInterrupted() methods. The difference between them is that the first calls the private native method isInterrupted(boolean ClearInterrupted) passing true, meaning the interrupt flag will be cleared, while the second passes false, leaving the interrupt flag intact. The choice between these two methods depends entirely on the situation. It is also worth remembering that when an InterruptedException is thrown, the interrupt flag is cleared as well.
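In code the difference looks like this:

// Thread.interrupted() reports the flag and clears it:
if (Thread.interrupted()) {
    // the flag is now cleared; either re-set it or propagate InterruptedException
    throw new InterruptedException();
}

// Thread.currentThread().isInterrupted() reports the flag and leaves it set:
if (Thread.currentThread().isInterrupted()) {
    // the flag is still set, so callers further up the stack can also see the interruption
}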
But there should be an easier way, and there is. The application does a huge amount of work with I/O streams, and therefore with I/O methods. Our goal is that when read() or write(int b) is called on an I/O stream after an interrupt, an exception is thrown indicating that the blocking I/O call was interrupted. Fortunately, Java has such an exception: InterruptedIOException. However, not all read/write methods of the standard streams react to thread interruption; essentially only PipedInputStream keeps track of it. Therefore, in the places where that stream is not involved, we have to extend the read/write methods so that an InterruptedIOException is thrown when an interrupt occurs (a sketch of such a wrapper is shown below). In fact, in the application I only had to extend the read() method in a single place: the InputStream returned by the backup download method. This way we can learn about the interruption without scattering flag checks everywhere. It is important, however, to catch this exception separately from a plain IOException and handle it separately. Of course, in some places the flag-checking pattern is still unavoidable, but things have already become better.
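Here is a sketch of such an extension as a FilterInputStream wrapper; this is my illustration of the idea, not necessarily the exact code used in the project.

// Sketch: wrap an InputStream so that read() notices the interrupt flag
// and turns it into an InterruptedIOException.
public class InterruptibleInputStream extends FilterInputStream {

    public InterruptibleInputStream(InputStream in) {
        super(in);
    }

    @Override
    public int read() throws IOException {
        if (Thread.currentThread().isInterrupted()) {
            throw new InterruptedIOException("Blocking read interrupted");
        }
        return super.read();
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        if (Thread.currentThread().isInterrupted()) {
            throw new InterruptedIOException("Blocking read interrupted");
        }
        return super.read(b, off, len);
    }
}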
It is also important to note that if the flag was cleared when processing an interrupt, it is always necessary to set the interrupt flag again so that after returning from the method we can learn about the interruption that occurred.
Let me explain with an example why this is important. Suppose we are uploading the backup to storage in the uploadBackup() method and an interrupt occurs. The interrupt is handled, the operation stops, and the method returns. The interruption does not happen spontaneously: it means that either an error occurred somewhere or the user cancelled the task. Either way, we must stop all work in this Future. But if we do not set the interrupt flag again before returning from the upload method, we will never learn about the interruption in the main Future block.
The same example in code:

backupLoadManager.uploadBackup(processedBackupStream, backupProperties, taskId); // <- the interrupt occurs and is handled somewhere in here
if (Thread.interrupted()) { // if the flag was not re-set inside uploadBackup(), we will never get here
    throw new InterruptedException();
}
Thus, it is a good practice to handle InterruptedException or InterruptedIOException as follows:
try {
    ...
} catch (InterruptedException e) { // or InterruptedIOException
    ...
    // re-interrupt the thread
    Thread.currentThread().interrupt();
}
Well, we can handle an interrupt, but who will actually interrupt threads?
To do this, we will create another entity called CancelTask, which stores the ID of the task to cancel, and we will also write a watcher that will try to interrupt the task. Why only try? Because the Future instance of the task may live in the memory of another server, and if that server has been shut down, the Future is lost and the task can no longer be cancelled.
I will briefly describe the cancellation algorithm implemented in the watcher:
The watcher reads all the entries from the cancel_tasks table (no lock is taken here), goes through each one and tries to get the corresponding Future from its own memory. If the Future is found, the corresponding thread is interrupted, the task is reverted and the request is deleted from the table. If the cancellation request's timeout has been exceeded (which means the server holding the Future was shut down and the Future instances were lost), the request is simply deleted from the table. If several servers notice the timeout and delete the record from the table, nothing terrible happens, because deletion in PostgreSQL is idempotent.
CancelTasksWatcher Code:
/**
 * This class scans for tasks to cancel and tries to cancel them.
 */
@Component
class CancelTasksWatcher {
    private static final Logger logger = LoggerFactory.getLogger(CancelTasksWatcher.class);

    private static final Duration cancelTimeout = Duration.ofMinutes(10);

    private CancelTasksManager cancelTasksManager;

    private TasksStarterService tasksStarterService;

    private TasksManager tasksManager;

    // spring setters...

    /**
     * This watcher wakes up every time 10 seconds passed from the last completion, checks if there are any tasks to cancel and tries to
     * cancel each task.
     * <p>
     * Since there can be more than one instance of the program running, the {@literal Future} instance of a task can belong to different
     * servers. We can't get access to a {@literal Future} if it's not in the memory of the server where the task cancellation request was
     * accepted. So the purpose of this watcher is to be able to cancel tasks that work in another instance of the program. Each server has
     * this watcher checking for available cancellation requests and, if any, the watcher tries to cancel the corresponding
     * {@literal Future}. If cancellation is successful the task will also be reverted.
     * <p>
     * If the task cancellation request timeout is exceeded, it means the server that had the required {@literal Future} instances has been
     * shut down, so all {@literal Future} instances are lost and the task can't be canceled. In such a case the task cancellation request
     * will be ignored.
     *
     * @see TasksStarterService#getFuture(Integer)
     * @see TasksManager#revertTask(Task)
     */
    @Scheduled(fixedDelay = 10 * 1000)
    @Transactional(isolation = Isolation.READ_COMMITTED, propagation = Propagation.REQUIRES_NEW)
    public void watchTasksToCancel() {
        Iterable<CancelTask> cancelTasks = cancelTasksManager.findAll();
        Iterable<Task> tasks = tasksManager.findAllById(StreamSupport.stream(cancelTasks.spliterator(), false)
                .map(CancelTask::getTaskId).collect(Collectors.toList()));
        Map<Integer, Task> tasksAsMap = StreamSupport.stream(tasks.spliterator(), false)
                .collect(Collectors.toMap(Task::getId, Function.identity()));

        List<Integer> taskIdsForDeleting = new ArrayList<>();

        for (CancelTask cancelTask : cancelTasks) {
            Integer taskId = cancelTask.getTaskId();

            Task task = tasksAsMap.get(taskId);
            if (task == null) {
                logger.error("Can't cancel task: no such entity with ID {}", taskId);
                taskIdsForDeleting.add(taskId);
                continue;
            }

            // timeout exceeded, that is the server shut down and lost all Future instances, so the task can't be canceled
            if (LocalDateTime.now(ZoneOffset.UTC).isAfter(cancelTask.getPutTime().plus(cancelTimeout))) {
                logger.error("Can't cancel task: timeout exceeded. Task ID: {}", taskId);
                taskIdsForDeleting.add(taskId);
                continue;
            }

            tasksStarterService.getFuture(taskId).ifPresent(future -> {
                logger.info("Canceling task with ID {}", taskId);

                boolean canceled = future.cancel(true);
                if (canceled) {
                    try {
                        // give time to properly handle the interrupt
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        // should not happen
                    }
                    tasksManager.revertTask(task);
                }
                taskIdsForDeleting.add(taskId);

                logger.info("Task canceled: {}. Task ID: {}", canceled, taskId);
            });
        }

        cancelTasksManager.deleteByTaskIdIn(taskIdsForDeleting);
    }
}
Now about error handling. As you may have noticed, all the main code running inside a Future is also wrapped in the following try-catch construct:
try {
    ...
} catch (RuntimeException e) {
    ...
    errorTasksManager.addErrorTask(taskId);
}
When a RuntimeException is caught, no further work is done in the Future and the task is considered erroneous. The addErrorTask(taskId) call saves a record with the ID of the failed task to a dedicated table of erroneous tasks. Why a separate table? Because the erroneous task still has to be handled, that is, reverted, and with several instances of the application running this handling can be picked up by any of the servers, not only the one where the error occurred.
The error handling algorithm is as follows: the ErrorTasksWatcher periodically scans the table of erroneous tasks and takes a pessimistic lock on the rows it picks up. In PostgreSQL this is done with select for update combined with skip locked, so rows already locked by another server are skipped. For each unhandled erroneous task the watcher calls revertTask(), after which the task is marked as handled.
The ErrorTasksWatcher code:
/**
 * This class scans for erroneous tasks and handles them depending on their state.
 */
@Component
class ErrorTasksWatcher {
    private static final Logger logger = LoggerFactory.getLogger(ErrorTasksWatcher.class);

    private static final Integer nRows = 10;

    private TasksManager tasksManager;

    private ErrorTasksManager errorTasksManager;

    // spring setters...

    /**
     * This watcher wakes up every time 1 minute passed from the last completion, checks backup states periodically and handles erroneous
     * tasks if any.
     * <p>
     * The watcher handles at most N tasks as described by the {@link #nRows} constant and skips already locked tasks.
     * When retrieving error tasks from the database a pessimistic lock is set. It allows safely running more than one copy of the program,
     * as no other watcher can pick up error tasks that are already being handled.
     * <p>
     * If the server shuts down while rows are locked, the transaction will be rolled back and the lock released, so these entities can be
     * picked up by another running server.
     */
    @Scheduled(fixedDelay = 60 * 1000)
    @Transactional(isolation = Isolation.READ_COMMITTED, propagation = Propagation.REQUIRES_NEW)
    public void watchErrorTasks() {
        for (ErrorTask errorTask : errorTasksManager.findFirstNAndLock(nRows)) {
            if (!errorTask.isErrorHandled()) {
                Integer backupTaskId = errorTask.getTaskId();

                Optional<Task> optionalTask = tasksManager.findById(backupTaskId);
                if (!optionalTask.isPresent()) {
                    logger.info("Can't handle erroneous task: no corresponding backup task entity. Backup task ID: {}", backupTaskId);
                    continue;
                }

                tasksManager.revertTask(optionalTask.get());
                errorTask.setErrorHandled(true);
            }
        }
    }
}
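The findFirstNAndLock(nRows) call can be backed by a native query that uses PostgreSQL's FOR UPDATE SKIP LOCKED. The repository below is only a sketch of how it might be implemented; the table and column names are assumptions, not the project's actual schema.

// Sketch: pick up at most :rowsLimit unhandled error tasks, locking the rows
// and skipping rows already locked by another server.
public interface ErrorTasksRepository extends CrudRepository<ErrorTask, Integer> {

    @Query(value = "SELECT * FROM error_tasks WHERE error_handled = FALSE " +
            "LIMIT :rowsLimit FOR UPDATE SKIP LOCKED", nativeQuery = true)
    List<ErrorTask> findFirstNAndLock(@Param("rowsLimit") Integer rowsLimit);
}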
The revertTask(Task) method reverts a task depending on the state it was in when the error occurred:
/**
 * This function reverts an erroneous task given its entity.
 * <p>
 * Use this function only after canceling the related {@literal Future}.
 * <p>
 * If the task was of type {@link Task.Type#CREATE_BACKUP} then the related {@link BackupProperties} will be deleted.
 *
 * @param task the entity
 */
public void revertTask(@NotNull Task task) {
    Objects.requireNonNull(task);

    Task.State state = task.getState();

    switch (state) {
        case DOWNLOADING:
        case APPLYING_DEPROCESSORS:
        case RESTORING:
        case DELETING: {
            logger.info("Handling broken operation. Operation: {}. No extra actions required", state.toString());
            break;
        }
        case CREATING:
        case APPLYING_PROCESSORS: {
            logger.info("Handling broken operation. Operation: {}. Delete backup properties...", state.toString());

            Integer backupPropertiesID = task.getBackupPropertiesId();
            if (!backupPropertiesManager.existsById(backupPropertiesID)) {
                logger.error("Can't revert task: no related backup properties. Task info: {}", task);
                return;
            }

            backupPropertiesManager.deleteById(backupPropertiesID);
            break;
        }
        case UPLOADING: {
            logger.info("Handling broken operation. Operation: {}. Deleting backup from storage...", state);

            Integer backupPropertiesId = task.getBackupPropertiesId();
            Optional<BackupProperties> optionalBackupProperties = backupPropertiesManager.findById(backupPropertiesId);
            if (!optionalBackupProperties.isPresent()) {
                logger.error("Can't revert task: no related backup properties. Task info: {}", task);
                return;
            }

            tasksStarterService.startDeleteTask(Task.RunType.INTERNAL, optionalBackupProperties.get());
            backupPropertiesManager.deleteById(backupPropertiesId);
            break;
        }
        default: {
            logger.error("Can't revert task: unknown state. Task info: {}", task);
        }
    }
}
There is one more subtlety. The services work with I/O streams, and part of the work happens in background threads. What happens if an error occurs not in the thread executing the Future (thread 1) but in a background thread that, say, feeds the InputStream (thread 2)? How will thread 1 learn about an error in thread 2?

This is exactly why the current task ID is forwarded into the service methods: a background thread that catches an error reports it by the task ID. The Future (thread 1) is then cancelled by the following method:
public void onError(@NotNull Throwable t, @NotNull Integer taskId) {
    logger.error("Exception caught. Task ID: {}", taskId, t);

    Optional<Future> optionalFuture = tasksStarterService.getFuture(taskId);
    if (!optionalFuture.isPresent()) {
        logger.error("Can't cancel the Future of task with ID {}: no such Future instance", taskId);
    } else {
        boolean canceled = optionalFuture.get().cancel(true);
        if (!canceled) {
            logger.error("Error canceling the Future of task with ID {}", taskId);
        } else {
            logger.info("Task canceled. Task ID: {}", taskId);
            errorTasksManager.setError(taskId);
        }
    }
}
So when an error occurs in a background thread, it reports the error by the task ID: the corresponding Future, if it is found on this server, is cancelled, and the task is marked as erroneous by the same ID, so that the ErrorTasksWatcher can later revert it.
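For example, a background thread that pumps data into a pipe could report a failure like this; the copyData() helper and the errorCallbackService name are hypothetical and only illustrate the pattern.

// Sketch: a background thread feeding a PipedOutputStream reports errors by task ID.
Thread backgroundThread = new Thread(() -> {
    try {
        copyData(source, pipedOutputStream); // hypothetical copy routine
    } catch (RuntimeException | IOException ex) {
        errorCallbackService.onError(ex, taskId); // cancels the Future and marks the task as erroneous
    }
});
backgroundThread.start();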
Thus, no matter where the error occurs, whether in the thread executing the Future or in one of the background threads, the task ends up in the table of erroneous tasks and will eventually be reverted.
In conclusion, a few notes on the current design:

Future instances live only in the memory of the server that started the task, so when a server shuts down its running tasks are lost and can only be cleaned up later by the watchers described above.

All work with backups is built on streaming I/O, so a backup of any size can be created, processed and restored without ever being loaded into memory in full.

The current task ID has to be passed into almost every service method so that background threads can report errors; this makes the signatures a little heavier, but keeps error handling uniform across the whole application.
Thank you for your attention! The full source code of the project is available on GitHub.
Source: https://habr.com/ru/post/459478/