
Apache Ignite: in-memory distributed computing



Hi, Habr!

We continue to be interested in new solutions from Apache. We expect to release Holden Karau's book “High Performance Spark” in May (currently in layout), and “Kafka: The Definitive Guide” by Neha Narkhede in August (still in translation). Today we want to offer a brief introductory article on Apache Ignite and gauge the level of interest in the topic.
Enjoy reading!

Apache Ignite is a relatively new solution, but its popularity is growing rapidly. It is hard to assign it to any particular category of database engines, because it shares characteristics with several kinds of tools. Its main purpose is to store distributed data in RAM in a “key-value” format. Ignite also has some common RDBMS features, in particular support for SQL queries and ACID transactions. This does not mean that it is a typical transactional SQL database: foreign key constraints are not supported, and transactions are only available at the “key-value” level. Even so, Apache Ignite looks like a very interesting solution.

Apache Ignite is easy to run as a node embedded in a Spring Boot application. The easiest way to do this is with the Spring Data Ignite library. Apache Ignite implements the Spring Data CrudRepository interface, which supports basic CRUD operations and provides access to the Apache Ignite SQL Grid through the unified Spring Data interfaces. Although Ignite offers its own disk persistence with SQL support and ACID guarantees, we built a solution that stores the in-memory cache objects in a MySQL database. The architecture of the proposed solution is shown in the figure below; as you can see, it is very simple. The application puts data into the in-memory cache managed by Apache Ignite. Apache Ignite automatically synchronizes these changes with the database in an asynchronous background task. The way data is read should not surprise you either: if an entity is not in the cache, it is read from the database and placed in the cache for future use.
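To make the data flow concrete, here is a minimal sketch of a read-through, write-behind cache in plain Java. It is not how Ignite is implemented: FakeDatabase, WriteBehindCache and the flush() method are hypothetical names invented for this illustration, the "database" is just a map, and the background task is replaced by an explicit flush() call.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the MySQL table: a plain in-memory map.
class FakeDatabase {
    final Map<Long, String> rows = new ConcurrentHashMap<>();
}

// Illustrative read-through / write-behind cache (an assumption, not Ignite code).
class WriteBehindCache {
    private final Map<Long, String> cache = new ConcurrentHashMap<>();
    private final BlockingQueue<Long> dirtyKeys = new LinkedBlockingQueue<>();
    private final FakeDatabase db;

    WriteBehindCache(FakeDatabase db) {
        this.db = db;
    }

    // Write: update the cache immediately and remember the key for a later flush.
    void put(long key, String value) {
        cache.put(key, value);
        dirtyKeys.add(key);
    }

    // Read-through: on a cache miss, load the value from the database and keep it.
    Optional<String> get(long key) {
        String value = cache.computeIfAbsent(key, k -> db.rows.get(k));
        return Optional.ofNullable(value);
    }

    // Write-behind: a background task would call this periodically.
    // Returns the number of entries written to the database.
    int flush() {
        int written = 0;
        Long key;
        while ((key = dirtyKeys.poll()) != null) {
            db.rows.put(key, cache.get(key));
            written++;
        }
        return written;
    }

    boolean isCached(long key) {
        return cache.containsKey(key);
    }
}

class WriteBehindDemo {
    public static void main(String[] args) {
        FakeDatabase db = new FakeDatabase();
        db.rows.put(1L, "Alice");
        WriteBehindCache cache = new WriteBehindCache(db);

        System.out.println(cache.get(1L));            // cache miss, loaded from the database
        cache.put(2L, "Bob");
        System.out.println(db.rows.containsKey(2L));  // not yet written to the database
        cache.flush();
        System.out.println(db.rows.containsKey(2L));  // written after the flush
    }
}
```

The point of the design is that writers only pay for the in-memory update, while the database catches up later; the price is a window in which the database lags behind the cache.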



Here I will describe in detail how such an application is developed. The result is posted on GitHub. I found a few more examples on the Internet, but they cover only the basics. I will show how to configure Apache Ignite to write objects from the cache to the database, and how to build more complex join queries across multiple caches. Let's begin by starting the database.

1. Configure the MySQL database


To run MySQL locally, it is most convenient to use a Docker container. With Docker on Windows, the MySQL database is then available at 192.168.99.100:33306.

  docker run -d --name mysql -e MYSQL_DATABASE=ignite -e MYSQL_USER=ignite -e MYSQL_PASSWORD=ignite123 -e MYSQL_ALLOW_EMPTY_PASSWORD=yes -p 33306:3306 mysql 

Next, create the tables that store the application's entities: PERSON and CONTACT. The tables are related as 1...N: the CONTACT table contains a foreign key that points to the PERSON id.

    CREATE TABLE `person` (
      `id` int(11) NOT NULL,
      `first_name` varchar(45) DEFAULT NULL,
      `last_name` varchar(45) DEFAULT NULL,
      `gender` varchar(10) DEFAULT NULL,
      `country` varchar(10) DEFAULT NULL,
      `city` varchar(20) DEFAULT NULL,
      `address` varchar(45) DEFAULT NULL,
      `birth_date` date DEFAULT NULL,
      PRIMARY KEY (`id`)
    );

    CREATE TABLE `contact` (
      `id` int(11) NOT NULL,
      `location` varchar(45) DEFAULT NULL,
      `contact_type` varchar(10) DEFAULT NULL,
      `person_id` int(11) NOT NULL,
      PRIMARY KEY (`id`)
    );

    ALTER TABLE `ignite`.`contact`
      ADD INDEX `person_fk_idx` (`person_id` ASC);

    ALTER TABLE `ignite`.`contact`
      ADD CONSTRAINT `person_fk` FOREIGN KEY (`person_id`)
      REFERENCES `ignite`.`person` (`id`)
      ON DELETE CASCADE ON UPDATE CASCADE;

2. Configure Maven


The easiest way to get started with the Spring Data repository for Apache Ignite is to add the ignite-spring-data Maven dependency to the application's pom.xml. All other Ignite dependencies will be pulled in automatically. We also need the MySQL JDBC driver and the Spring JDBC dependency to configure the database connection. They are necessary because Apache Ignite is embedded in the application and has to connect to MySQL to synchronize the cache with the database tables.

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.ignite</groupId>
        <artifactId>ignite-spring-data</artifactId>
        <version>${ignite.version}</version>
    </dependency>

3. Configure the Ignite node


The IgniteConfiguration class lets you configure all available Ignite node settings. The most important part here is the cache configuration (1). You should register the primary key and entity classes as indexed types (2). Next, enable writing cache updates to the database (3) and reading from the database any data that is not in the cache (4). The interaction between the Ignite node and MySQL is configured with the CacheJdbcPojoStoreFactory class (5). It needs the DataSource @Bean (6), the dialect (7), and the mapping between object fields and table columns (8).

    @Bean
    public Ignite igniteInstance() {
        IgniteConfiguration cfg = new IgniteConfiguration();
        cfg.setIgniteInstanceName("ignite-1");
        cfg.setPeerClassLoadingEnabled(true);

        // Cache for Contact entities
        CacheConfiguration<Long, Contact> ccfg2 = new CacheConfiguration<>("ContactCache"); // (1)
        ccfg2.setIndexedTypes(Long.class, Contact.class); // (2)
        ccfg2.setWriteBehindEnabled(true);
        ccfg2.setWriteThrough(true); // (3)
        ccfg2.setReadThrough(true); // (4)
        CacheJdbcPojoStoreFactory<Long, Contact> f2 = new CacheJdbcPojoStoreFactory<>(); // (5)
        f2.setDataSource(datasource); // (6)
        f2.setDialect(new MySQLDialect()); // (7)
        JdbcType jdbcContactType = new JdbcType(); // (8)
        jdbcContactType.setCacheName("ContactCache");
        jdbcContactType.setKeyType(Long.class);
        jdbcContactType.setValueType(Contact.class);
        jdbcContactType.setDatabaseTable("contact");
        jdbcContactType.setDatabaseSchema("ignite");
        jdbcContactType.setKeyFields(new JdbcTypeField(Types.INTEGER, "id", Long.class, "id"));
        jdbcContactType.setValueFields(
            new JdbcTypeField(Types.VARCHAR, "contact_type", ContactType.class, "type"),
            new JdbcTypeField(Types.VARCHAR, "location", String.class, "location"),
            new JdbcTypeField(Types.INTEGER, "person_id", Long.class, "personId"));
        f2.setTypes(jdbcContactType);
        ccfg2.setCacheStoreFactory(f2);

        // Cache for Person entities
        CacheConfiguration<Long, Person> ccfg = new CacheConfiguration<>("PersonCache");
        ccfg.setIndexedTypes(Long.class, Person.class);
        ccfg.setWriteBehindEnabled(true);
        ccfg.setReadThrough(true);
        ccfg.setWriteThrough(true);
        CacheJdbcPojoStoreFactory<Long, Person> f = new CacheJdbcPojoStoreFactory<>();
        f.setDataSource(datasource);
        f.setDialect(new MySQLDialect());
        JdbcType jdbcType = new JdbcType();
        jdbcType.setCacheName("PersonCache");
        jdbcType.setKeyType(Long.class);
        jdbcType.setValueType(Person.class);
        jdbcType.setDatabaseTable("person");
        jdbcType.setDatabaseSchema("ignite");
        jdbcType.setKeyFields(new JdbcTypeField(Types.INTEGER, "id", Long.class, "id"));
        jdbcType.setValueFields(
            new JdbcTypeField(Types.VARCHAR, "first_name", String.class, "firstName"),
            new JdbcTypeField(Types.VARCHAR, "last_name", String.class, "lastName"),
            new JdbcTypeField(Types.VARCHAR, "gender", Gender.class, "gender"),
            new JdbcTypeField(Types.VARCHAR, "country", String.class, "country"),
            new JdbcTypeField(Types.VARCHAR, "city", String.class, "city"),
            new JdbcTypeField(Types.VARCHAR, "address", String.class, "address"),
            new JdbcTypeField(Types.DATE, "birth_date", Date.class, "birthDate"));
        f.setTypes(jdbcType);
        ccfg.setCacheStoreFactory(f);

        cfg.setCacheConfiguration(ccfg, ccfg2);
        return Ignition.start(cfg);
    }

Here is the Spring data source configuration for MySQL running in a Docker container.

    spring:
      datasource:
        name: mysqlds
        url: jdbc:mysql://192.168.99.100:33306/ignite?useSSL=false
        username: ignite
        password: ignite123


It should be noted that Apache Ignite is not without drawbacks. For example, it maps an Enum to an integer by taking its ordinal value, even though VARCHAR is configured as the JDBC type. When such a row is read back from the database, it is not mapped correctly to the Enum in the object, and the corresponding field in the response is null.

  new JdbcTypeField(Types.VARCHAR, "contact_type", ContactType.class, "type") 
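The following plain-Java sketch shows why an ordinal written into a text column cannot be turned back into an Enum by name, and what a name-based mapping would look like. It only illustrates the mismatch and is not Ignite's internal code. The constants of ContactType are not listed in the article, so the ones below are hypothetical, as are the EnumColumnMapping helper and its method names.

```java
// Hypothetical enum: the article does not show the real ContactType constants.
enum ContactType { EMAIL, MOBILE, PHONE }

class EnumColumnMapping {

    // Store the constant's name, which fits a VARCHAR column.
    static String toColumn(ContactType type) {
        return type == null ? null : type.name();
    }

    // Read a VARCHAR value back; returns null when the text is not a constant name.
    static ContactType fromColumn(String value) {
        if (value == null) {
            return null;
        }
        try {
            return ContactType.valueOf(value);
        } catch (IllegalArgumentException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        // An ordinal stored as text ("1") is not a valid constant name.
        String ordinalText = String.valueOf(ContactType.MOBILE.ordinal());
        System.out.println(fromColumn(ordinalText));

        // A name-based round trip restores the original constant.
        System.out.println(fromColumn(toColumn(ContactType.MOBILE)));
    }
}
```

One possible workaround, under these assumptions, is to keep the model field as a String holding the constant name and convert it to the Enum in the getter.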

4. Model objects


As mentioned above, there are two tables in our database schema. Accordingly, there are two model classes and two cache configurations, one per model class. Below is the implementation of one of the model classes, Person. One interesting detail is the ID generation using the AtomicLong class, which serves as a sequence generator. Note that the listing instantiates a plain AtomicLong (java.util.concurrent.atomic), whose counter is local to a single JVM; Ignite's own distributed counterparts are IgniteAtomicLong and IgniteAtomicSequence. We also see the Ignite-specific @QuerySqlField annotation: a field marked with it is visible to SQL queries and can be used in them, for example as a query parameter.

    @QueryGroupIndex.List(
        @QueryGroupIndex(name = "idx1")
    )
    public class Person implements Serializable {

        private static final long serialVersionUID = -1271194616130404625L;
        private static final AtomicLong ID_GEN = new AtomicLong();

        @QuerySqlField(index = true)
        private Long id;
        @QuerySqlField(index = true)
        @QuerySqlField.Group(name = "idx1", order = 0)
        private String firstName;
        @QuerySqlField(index = true)
        @QuerySqlField.Group(name = "idx1", order = 1)
        private String lastName;
        private Gender gender;
        private Date birthDate;
        private String country;
        private String city;
        private String address;
        private List<Contact> contacts = new ArrayList<>();

        // Assigns the next value of the sequence as this object's id
        public void init() { this.id = ID_GEN.incrementAndGet(); }

        public Long getId() { return id; }
        public void setId(Long id) { this.id = id; }
        public String getFirstName() { return firstName; }
        public void setFirstName(String firstName) { this.firstName = firstName; }
        public String getLastName() { return lastName; }
        public void setLastName(String lastName) { this.lastName = lastName; }
        public Gender getGender() { return gender; }
        public void setGender(Gender gender) { this.gender = gender; }
        public Date getBirthDate() { return birthDate; }
        public void setBirthDate(Date birthDate) { this.birthDate = birthDate; }
        public String getCountry() { return country; }
        public void setCountry(String country) { this.country = country; }
        public String getCity() { return city; }
        public void setCity(String city) { this.city = city; }
        public String getAddress() { return address; }
        public void setAddress(String address) { this.address = address; }
        public List<Contact> getContacts() { return contacts; }
        public void setContacts(List<Contact> contacts) { this.contacts = contacts; }
    }
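The id generation in init() can be tried in isolation. The sketch below uses only the JDK and shows that incrementAndGet() hands out unique, gap-free ids even when several threads call it at once. IdGenerator and generateConcurrently are hypothetical names made up for this example. Keep in mind that such a counter lives in one JVM; with several Ignite nodes each would count on its own, and a distributed sequence such as IgniteAtomicSequence may be the better fit.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper that mirrors the ID_GEN field of the Person class.
class IdGenerator {
    private final AtomicLong counter = new AtomicLong();

    // Atomically returns the next id; the first call returns 1.
    long nextId() {
        return counter.incrementAndGet();
    }

    // Requests ids from several threads and returns the set of distinct ids.
    static Set<Long> generateConcurrently(IdGenerator gen, int threads, int perThread) {
        Set<Long> ids = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < perThread; i++) {
                    ids.add(gen.nextId());
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while generating ids", e);
        }
        return ids;
    }

    public static void main(String[] args) {
        Set<Long> ids = generateConcurrently(new IdGenerator(), 4, 1000);
        System.out.println("distinct ids: " + ids.size());
    }
}
```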

5. Ignite repositories


I assume you know how repositories are created in Spring Data JPA. Repository support must be enabled in the main class or in a @Configuration class.

    @SpringBootApplication
    @EnableIgniteRepositories
    public class IgniteRestApplication {

        @Autowired
        DataSource datasource;

        public static void main(String[] args) {
            SpringApplication.run(IgniteRestApplication.class, args);
        }

        // ...
    }

Then we extend the base repository interface: PersonRepository extends IgniteRepository, which in turn extends Spring Data's CrudRepository. Only the inherited methods that take an id parameter are supported. In the PersonRepository snippet below I defined several search methods using the Spring Data naming conventions and Ignite queries. These examples show that a query can return either full objects or only selected fields, depending on what we need.

    @RepositoryConfig(cacheName = "PersonCache")
    public interface PersonRepository extends IgniteRepository<Person, Long> {

        // Derived from the method name by the Spring Data naming conventions
        List<Person> findByFirstNameAndLastName(String firstName, String lastName);

        // Join across two caches, returning full Contact objects
        @Query("SELECT c.* FROM Person p JOIN \"ContactCache\".Contact c ON p.id=c.personId WHERE p.firstName=? and p.lastName=?")
        List<Contact> selectContacts(String firstName, String lastName);

        // Join across two caches, returning only selected fields
        @Query("SELECT p.id, p.firstName, p.lastName, c.id, c.type, c.location FROM Person p JOIN \"ContactCache\".Contact c ON p.id=c.personId WHERE p.firstName=? and p.lastName=?")
        List<List<?>> selectContacts2(String firstName, String lastName);
    }

6. API and Testing


Now you can embed the repository components in the classes of REST controllers. The API will provide methods for adding new objects to the cache, updating or deleting existing objects, and also for searching by the primary key or by other, more complex indexes.

    @RestController
    @RequestMapping("/person")
    public class PersonController {

        private static final Logger LOGGER = LoggerFactory.getLogger(PersonController.class);

        @Autowired
        PersonRepository repository;

        @PostMapping
        public Person add(@RequestBody Person person) {
            person.init();
            return repository.save(person.getId(), person);
        }

        @PutMapping
        public Person update(@RequestBody Person person) {
            return repository.save(person.getId(), person);
        }

        @DeleteMapping("/{id}")
        public void delete(@PathVariable("id") Long id) {
            repository.delete(id);
        }

        @GetMapping("/{id}")
        public Person findById(@PathVariable("id") Long id) {
            return repository.findOne(id);
        }

        @GetMapping("/{firstName}/{lastName}")
        public List<Person> findByName(@PathVariable("firstName") String firstName, @PathVariable("lastName") String lastName) {
            return repository.findByFirstNameAndLastName(firstName, lastName);
        }

        @GetMapping("/contacts/{firstName}/{lastName}")
        public List<Person> findByNameWithContacts(@PathVariable("firstName") String firstName, @PathVariable("lastName") String lastName) {
            List<Person> persons = repository.findByFirstNameAndLastName(firstName, lastName);
            List<Contact> contacts = repository.selectContacts(firstName, lastName);
            // Attach to each person the contacts that reference its id
            persons.forEach(it -> it.setContacts(contacts.stream()
                .filter(c -> c.getPersonId().equals(it.getId()))
                .collect(Collectors.toList())));
            LOGGER.info("PersonController.findByNameWithContacts: {}", contacts);
            return persons;
        }

        @GetMapping("/contacts2/{firstName}/{lastName}")
        public List<Person> findByNameWithContacts2(@PathVariable("firstName") String firstName, @PathVariable("lastName") String lastName) {
            List<List<?>> result = repository.selectContacts2(firstName, lastName);
            List<Person> persons = new ArrayList<>();
            for (List<?> l : result) {
                persons.add(mapPerson(l));
            }
            LOGGER.info("PersonController.findByNameWithContacts2: {}", result);
            return persons;
        }

        // Row layout: p.id, p.firstName, p.lastName, c.id, c.type, c.location
        private Person mapPerson(List<?> l) {
            Person p = new Person();
            Contact c = new Contact();
            p.setId((Long) l.get(0));
            p.setFirstName((String) l.get(1));
            p.setLastName((String) l.get(2));
            c.setId((Long) l.get(3));
            c.setType((ContactType) l.get(4));
            c.setLocation((String) l.get(5)); // location is the sixth column
            p.getContacts().add(c);
            return p;
        }
    }
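Note that mapPerson creates one Person per result row, so a person with several contacts appears several times in the response of the contacts2 endpoint. If one Person per id is preferred, the rows can be grouped first. Below is a minimal sketch of that idea using only the JDK; PersonRow, ContactRow and JoinRowMapper are simplified, hypothetical stand-ins for the article's classes, and the contact type is kept as a String for brevity.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical stand-in for the Contact class.
class ContactRow {
    final long id;
    final String type;
    final String location;

    ContactRow(long id, String type, String location) {
        this.id = id;
        this.type = type;
        this.location = location;
    }
}

// Simplified, hypothetical stand-in for the Person class.
class PersonRow {
    final long id;
    final String firstName;
    final String lastName;
    final List<ContactRow> contacts = new ArrayList<>();

    PersonRow(long id, String firstName, String lastName) {
        this.id = id;
        this.firstName = firstName;
        this.lastName = lastName;
    }
}

class JoinRowMapper {

    // Each row: [personId, firstName, lastName, contactId, contactType, location].
    // Rows that share a personId are merged into one PersonRow.
    static List<PersonRow> group(List<List<?>> rows) {
        Map<Long, PersonRow> byId = new LinkedHashMap<>();
        for (List<?> r : rows) {
            Long personId = (Long) r.get(0);
            PersonRow p = byId.computeIfAbsent(personId,
                id -> new PersonRow(id, (String) r.get(1), (String) r.get(2)));
            p.contacts.add(new ContactRow((Long) r.get(3), (String) r.get(4), (String) r.get(5)));
        }
        return new ArrayList<>(byId.values());
    }
}
```

A LinkedHashMap keeps the persons in the order in which they first appear in the query result.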

Of course, it is important to check the performance of the resulting solution, especially since it involves both distributed in-memory storage and a database. To do this, I wrote several JUnit tests that put a large number of objects into the cache and then call the search methods with random input data; this is how query performance is checked. Here is a method that generates many Person and Contact objects and caches them through the API endpoints.

    @Test
    public void testAddPerson() throws InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();
        for (int j = 0; j < 10; j++) {
            es.execute(() -> {
                TestRestTemplate restTemplateLocal = new TestRestTemplate();
                Random r = new Random();
                for (int i = 0; i < 1000000; i++) {
                    Person p = restTemplateLocal.postForObject("http://localhost:8090/person", createTestPerson(), Person.class);
                    // Each person gets between 0 and 5 contacts
                    int x = r.nextInt(6);
                    for (int k = 0; k < x; k++) {
                        restTemplateLocal.postForObject("http://localhost:8090/contact", createTestContact(p.getId()), Contact.class);
                    }
                }
            });
        }
        es.shutdown();
        es.awaitTermination(60, TimeUnit.MINUTES);
    }

Spring Boot can expose key metrics that let you judge how quickly the API responds. To enable this, add the Spring Boot Actuator dependency. The metrics endpoint is available at localhost:8090/metrics. It shows not only how much time each API method takes, but also statistics such as the number of active threads or free memory.
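To read such metrics it helps to know what the two kinds of entries mean. As far as I understand the Actuator naming in Spring Boot 1.x, a gauge.response.* entry holds the duration of the most recent request to an endpoint in milliseconds, and a counter.status.200.* entry counts the responses with that status. The sketch below imitates this bookkeeping in plain Java; SimpleMetrics and its methods are hypothetical and have nothing to do with the real Actuator classes.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Hypothetical, simplified imitation of per-endpoint gauges and counters.
class SimpleMetrics {
    private final Map<String, AtomicLong> counters = new ConcurrentHashMap<>();
    private final Map<String, Long> gauges = new ConcurrentHashMap<>();

    // Runs the call, then records its duration in ms and increments its counter.
    <T> T timed(String endpoint, Supplier<T> call) {
        long start = System.nanoTime();
        T result = call.get();
        long millis = (System.nanoTime() - start) / 1_000_000;
        gauges.put("gauge.response." + endpoint, millis);
        counters.computeIfAbsent("counter.status.200." + endpoint, k -> new AtomicLong())
                .incrementAndGet();
        return result;
    }

    // Number of recorded calls for the metric name, or 0 if there were none.
    long counter(String name) {
        AtomicLong c = counters.get(name);
        return c == null ? 0 : c.get();
    }

    // Duration of the most recent call in ms, or -1 if nothing was recorded.
    long gauge(String name) {
        return gauges.getOrDefault(name, -1L);
    }

    public static void main(String[] args) {
        SimpleMetrics metrics = new SimpleMetrics();
        metrics.timed("person.id", () -> "some response");
        System.out.println(metrics.counter("counter.status.200.person.id"));
        System.out.println(metrics.gauge("gauge.response.person.id"));
    }
}
```

Because a gauge keeps only the latest value, it is a rough indicator rather than a true average.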

7. Run the application


Now let's run the resulting application with the Apache Ignite node embedded in it. I followed the performance tips in the Ignite documentation and arrived at the JVM configuration shown below.

  java -jar -Xms512m -Xmx1024m -XX:MaxDirectMemorySize=256m -XX:+DisableExplicitGC -XX:+UseG1GC target/ignite-rest-service-1.0-SNAPSHOT.jar 

You can now run the JUnit test class IgniteRestControllerTest. It puts a certain amount of data into the cache and then calls the search methods. The metrics below were collected with 1M Person objects and 2.5M Contact objects in the cache. Each search method takes about 1 ms on average.

    {
      "mem": 624886,
      "mem.free": 389701,
      "processors": 4,
      "instance.uptime": 2446038,
      "uptime": 2466661,
      "systemload.average": -1,
      "heap.committed": 524288,
      "heap.init": 524288,
      "heap.used": 133756,
      "heap": 1048576,
      "threads.peak": 107,
      "threads.daemon": 25,
      "threads.totalStarted": 565,
      "threads": 80,
      ...
      "gauge.response.person.contacts.firstName.lastName": 1,
      "gauge.response.contact": 1,
      "gauge.response.person.firstName.lastName": 1,
      "gauge.response.contact.location.location": 1,
      "gauge.response.person.id": 1,
      "gauge.response.person": 0,
      "counter.status.200.person.id": 1000,
      "counter.status.200.person.contacts.firstName.lastName": 1000,
      "counter.status.200.person.firstName.lastName": 1000,
      "counter.status.200.contact": 2500806,
      "counter.status.200.person": 1000000,
      "counter.status.200.contact.location.location": 1000
    }
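The contact counter is consistent with how the test generates data: every person receives r.nextInt(6) contacts, that is a uniformly random number from 0 to 5 with a mean of 2.5, so 1,000,000 persons should yield roughly 2,500,000 contacts, and the metrics report 2,500,806. This can be checked with a quick simulation; ContactCountEstimate is a hypothetical helper, and the seed is arbitrary.

```java
import java.util.Random;

// Hypothetical helper that replays only the random part of the load test.
class ContactCountEstimate {

    // Each person gets r.nextInt(6) contacts, i.e. a value from 0 to 5.
    static long simulate(int persons, long seed) {
        Random r = new Random(seed);
        long contacts = 0;
        for (int i = 0; i < persons; i++) {
            contacts += r.nextInt(6);
        }
        return contacts;
    }

    public static void main(String[] args) {
        int persons = 1_000_000;
        long contacts = simulate(persons, 42L);
        System.out.println(contacts + " contacts, "
            + (contacts / (double) persons) + " per person on average");
    }
}
```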

Source: https://habr.com/ru/post/352070/

