
Apache Ignite - database cache

In Apache Ignite, you can define a named data cache on top of a persistent store, load data into it, and then perform various operations on that data.

You can perform get / put operations as with a Map in Java; these are called read-through and write-through operations. That is, once the cache is loaded, a get takes the object from the cache rather than from the database, and a write changes the object in the cache and also writes it to the data store. If an object is not in the cache when you try to get it, it is first loaded into the cache from the database. Subsequent reads are then served from the cache's memory and are very fast. All of this applies to get / put operations. Other operations are covered further down in the article: searching for an object without knowing its key (Query), finding data in the cache quickly, and working with transactions.
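To make the read-through and write-through behaviour described above concrete, here is a minimal sketch in plain Java. It is not Ignite's implementation: the class name `ReadWriteThroughSketch`, the `databaseReads` counter, and the `Map` standing in for the database table are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only: a tiny read-through / write-through cache.
// This is NOT how Ignite is implemented; the "database" is just a Map.
class ReadWriteThroughSketch {
    final Map<Long, String> database = new HashMap<>(); // stands in for the DB table
    final Map<Long, String> cache = new HashMap<>();    // stands in for the cache
    int databaseReads = 0;                              // counts simulated SELECTs

    // Read-through: serve from the cache, fall back to the database on a miss.
    String get(Long key) {
        String cached = cache.get(key);
        if (cached != null)
            return cached;

        databaseReads++;
        String loaded = database.get(key);
        if (loaded != null)
            cache.put(key, loaded); // later reads of this key skip the database
        return loaded;
    }

    // Write-through: update the cache and the database together.
    void put(Long key, String value) {
        cache.put(key, value);
        database.put(key, value);
    }

    public static void main(String[] args) {
        ReadWriteThroughSketch s = new ReadWriteThroughSketch();
        s.database.put(1L, "Moscow");

        // The first get misses the cache and reads the database;
        // the second get is served from the cache.
        System.out.println(s.get(1L) + ", database reads: " + s.databaseReads);
        System.out.println(s.get(1L) + ", database reads: " + s.databaseReads);

        // A put lands in both the cache and the database.
        s.put(2L, "Kostroma");
        System.out.println(s.database.get(2L));
    }
}
```

In Ignite itself the database side of this is delegated to a cache store, which is what the rest of the article sets up.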


Data is stored in the cache by key, which can be, for example, the primary key of the database table. For my example I used an Oracle XE database. By default Ignite provides the H2 database, but in real life I expect to deal with other databases. So, we take a database entity (a table) and prepare a Java class for it. Any data set can serve as the source for the entity (a view, a function, etc.); this is fully under our control.
KLADR table as a cache entry
import java.io.Serializable;
import java.sql.Timestamp;

import org.apache.ignite.cache.query.annotations.QuerySqlField;

public class Kladr implements Serializable {
    // Indexed field, also used as the cache key.
    @QuerySqlField(index = true)
    public Long id;

    @QuerySqlField
    public String code;

    @QuerySqlField
    public String name;

    @QuerySqlField
    public Timestamp upd_date;

    public Kladr(Long id, String code, String name, Timestamp upd_date) {
        this.code = code;
        this.name = name;
        this.id = id;
        this.upd_date = upd_date;
    }

    public Kladr() {
        // No-op.
    }

    @Override public String toString() {
        return id + "/" + code + "/" + name + "/" + upd_date;
    }
}


The annotations mark the fields that participate in Query operations, as well as the indexed field.

Now we need to extend the CacheStoreAdapter class and override its main methods:

public class CacheKladrStore extends CacheStoreAdapter<Long, Kladr> {
    // Called whenever get(...) is called on the Ignite cache.
    @Override public Kladr load(Long key) {
        // ...
    }

    // Called whenever put(...) is called on the Ignite cache.
    @Override public void write(Cache.Entry<? extends Long, ? extends Kladr> entry) {
        // ...
    }

    // Called whenever remove(...) is called on the Ignite cache.
    @Override public void delete(Object key) {
        // ...
    }

    // Called whenever loadCache() or localLoadCache() is called on the Ignite cache.
    // Used for bulk-loading the cache.
    @Override public void loadCache(IgniteBiInClosure<Long, Kladr> clo, Object... args) {
        // ...
    }
}

As you can see, the key is the ID and the cache value is the Kladr class (<Long, Kladr>).

The full implementation looks like this:

CacheKladrStore
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Locale;

import javax.cache.Cache;
import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;

import org.apache.ignite.cache.store.CacheStoreAdapter;
import org.apache.ignite.lang.IgniteBiInClosure;

public class CacheKladrStore extends CacheStoreAdapter<Long, Kladr> {
    // Called whenever get(...) is called on the Ignite cache.
    @Override public Kladr load(Long key) {
        try (Connection conn = connection()) {
            try (PreparedStatement st = conn.prepareStatement(
                "select id, code, name, upd_date from KLADR where id=?")) {
                st.setLong(1, key);

                try (ResultSet rs = st.executeQuery()) {
                    return rs.next()
                        ? new Kladr(rs.getLong(1), rs.getString(2), rs.getString(3), rs.getTimestamp(4))
                        : null;
                }
            }
        }
        catch (SQLException e) {
            throw new CacheLoaderException("Failed to load: " + key, e);
        }
    }

    // Called whenever put(...) is called on the Ignite cache.
    @Override public void write(Cache.Entry<? extends Long, ? extends Kladr> entry) {
        Long key = entry.getKey();
        Kladr val = entry.getValue();

        try (Connection conn = connection()) {
            try (PreparedStatement stUpd = conn.prepareStatement(
                "update KLADR set upd_date = ? where id = ?")) {
                stUpd.setTimestamp(1, val.upd_date);
                stUpd.setLong(2, val.id);

                int updated = stUpd.executeUpdate();

                // No row was updated, so prepare an insert instead.
                if (updated == 0) {
                    try (PreparedStatement stIns = conn.prepareStatement(
                        "insert into KLADR (id, code, name, upd_date) values (?, ?, ?, ?)")) {
                        stIns.setLong(1, val.id);
                        stIns.setString(2, val.code);
                        stIns.setString(3, val.name);
                        //...
                        //stIns.executeUpdate();
                    }
                }
            }
        }
        catch (SQLException e) {
            throw new CacheWriterException("Failed to write [key=" + key + ", val=" + val + ']', e);
        }
    }

    // Called whenever remove(...) is called on the Ignite cache.
    @Override public void delete(Object key) {
        try (Connection conn = connection()) {
            try (PreparedStatement st = conn.prepareStatement("delete from KLADR where id=?")) {
                st.setLong(1, (Long)key);
                st.executeUpdate();
            }
        }
        catch (SQLException e) {
            throw new CacheWriterException("Failed to delete: " + key, e);
        }
    }

    // Called whenever loadCache() or localLoadCache() is called on the Ignite cache.
    // Used for bulk-loading the cache.
    @Override public void loadCache(IgniteBiInClosure<Long, Kladr> clo, Object... args) {
        if (args == null || args.length == 0 || args[0] == null)
            throw new CacheLoaderException("Expected entry count parameter is not provided.");

        final int entryCnt = (Integer)args[0];

        try (Connection conn = connection()) {
            try (PreparedStatement st = conn.prepareStatement(
                "select id, code, name, upd_date from KLADR " +
                "where id between 100000 and 150000 and rownum <= " + entryCnt)) {
                try (ResultSet rs = st.executeQuery()) {
                    int cnt = 0;

                    while (cnt < entryCnt && rs.next()) {
                        Kladr kladr = new Kladr(rs.getLong(1), rs.getString(2), rs.getString(3), rs.getTimestamp(4));

                        clo.apply(kladr.id, kladr);

                        cnt++;
                    }
                }
            }
        }
        catch (SQLException e) {
            throw new CacheLoaderException("Failed to load values from cache store.", e);
        }
    }

    // Opens a JDBC connection with autocommit enabled.
    private Connection connection() throws SQLException {
        return openConnection(true);
    }

    // Opens a JDBC connection.
    private Connection openConnection(boolean autocommit) throws SQLException {
        // Any RDBMS could be used here (Oracle, MySQL, Postgres, DB2, Microsoft SQL Server, etc.).
        // This example uses Oracle.
        Locale.setDefault(Locale.ENGLISH);

        Connection conn = DriverManager.getConnection("jdbc:oracle:thin:@localhost:1521:xe", "HR", "1");

        conn.setAutoCommit(autocommit);

        return conn;
    }
}


First tests

Preparing to start
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;

import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;

import javax.cache.configuration.FactoryBuilder;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.configuration.CacheConfiguration;

public class CacheKladrStoreExample {
    /** Cache name. */
    private static final String CACHE_NAME = CacheKladrStoreExample.class.getSimpleName();

    /** Number of entries to load. */
    private static final int ENTRY_COUNT = 50_000;

    public static void main(String[] args) throws IgniteException {
        // Start Ignite.
        try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
            System.out.println();
            System.out.println(">>> Cache store example started.");

            CacheConfiguration<Long, Kladr> cacheCfg = new CacheConfiguration<>(CACHE_NAME);

            // Set atomicity to transactional, since the example shows transactions.
            cacheCfg.setAtomicityMode(TRANSACTIONAL);

            // Configure the cache store.
            cacheCfg.setCacheStoreFactory(FactoryBuilder.factoryOf(CacheKladrStore.class));
            cacheCfg.setReadThrough(true);
            cacheCfg.setWriteThrough(true);

            // Describe the entity so that Query operations can work with the cache.
            QueryEntity qe = new QueryEntity(Long.class, Kladr.class);

            LinkedHashMap<String, String> fields = new LinkedHashMap<>();
            fields.put("code", "java.lang.String");
            fields.put("name", "java.lang.String");
            fields.put("id", "java.lang.Long");
            fields.put("upd_date", "java.sql.Timestamp");
            qe.setFields(fields);

            Collection<QueryEntity> collection = new ArrayList<>();
            collection.add(qe);
            cacheCfg.setQueryEntities(collection);

            // Auto-close the cache at the end of the example.
            try (IgniteCache<Long, Kladr> cache = ignite.getOrCreateCache(cacheCfg)) {
                // ...


Bulk-load the cache with 50,000 objects (see loadCache in CacheKladrStore above):

try (IgniteCache<Long, Kladr> cache = ignite.getOrCreateCache(cacheCfg)) {
    // Initial bulk load of the cache from the store.
    // This calls CacheStore.loadCache(...).
    loadCache(cache);

loadCache
private static void loadCache(IgniteCache<Long, Kladr> cache) {
    long start = System.currentTimeMillis();

    // Start loading the cache from the persistent store.
    cache.loadCache(null, ENTRY_COUNT);

    long end = System.currentTimeMillis();

    System.out.println(">>> Loaded size " + cache.size() + " in " + (end - start) + " ms.");
}


Loading 50,000 objects takes a few seconds. What we load and how much of it is entirely under our control, which is convenient.

Now we read data from the cache for the IDs that were loaded:

// Read from the cache.
getFromCache(cache, 100_000L, 120_000L);

getFromCache
private static void getFromCache(IgniteCache<Long, Kladr> cache, Long i1, Long i2) {
    long millis = System.currentTimeMillis();

    for (long i = i1; i < i2; i++) {
        Kladr kladr = cache.get(i);

        // get(...) returns null when the key is in neither the cache nor the database.
        if (kladr != null)
            kladr.upd_date = new Timestamp(new java.util.Date().getTime());
    }

    System.out.println("getFromCache total get values msec.: " + (System.currentTimeMillis() - millis));
}


We read 20,000 objects, and everything is fine here: everything now comes from the cache, and there are no calls to the database.

But if we now read objects that are not in the cache:

// Read keys that are not in the cache.
getFromCache(cache, 10_000L, 11_000L);

then for each get the following method is called (see CacheKladrStore):

// Called whenever get(...) is called on the Ignite cache.
@Override public Kladr load(Long key) {
    // ...
}

The object is read from the database and placed in the cache. For 1,000 objects this took several seconds. On re-reading, the objects are taken from the cache, as in the earlier test (read-through in action).

Transactions

executeTransaction
private static void executeTransaction(IgniteCache<Long, Kladr> cache) {
    final Long id1 = 100_001L;
    final Long id2 = 100_009L;

    try (Transaction tx = Ignition.ignite().transactions().txStart()) {
        // Read object 1.
        Kladr val = cache.get(id1);
        System.out.println("Read value first id1: " + val);

        Kladr newKladr = new Kladr(id1, val.code, val.name, new Timestamp(new java.util.Date().getTime()));

        // Put the modified object into the cache.
        cache.put(id1, newKladr);

        // Read object 1 again.
        val = cache.get(id1);
        System.out.println("Read value after id1: " + val);

        // Read object 2.
        val = cache.get(id2);
        System.out.println("Read value first id2: " + val);

        newKladr = new Kladr(id2, val.code, val.name, new Timestamp(new java.util.Date().getTime()));
        cache.put(id2, newKladr);

        // Read object 2 again.
        val = cache.get(id2);
        System.out.println("Read value after id2: " + val);

        // Commit the transaction.
        // This is when CacheKladrStore.write(Cache.Entry<? extends Long, ? extends Kladr> entry) is called.
        tx.commit();
    }

    System.out.println("Read value id1 after commit: " + cache.get(id1));
}


Everything works as it should (or almost): we open a transaction, modify several objects, and only on a successful commit() are they written to the database. For each modified object placed in the cache with put, the following method is called (see CacheKladrStore):

// Called whenever put(...) is called on the Ignite cache.
@Override public void write(Cache.Entry<? extends Long, ? extends Kladr> entry) {
    // ...
}

That is, write is called once commit has been called.

The console output showed that the objects were read from the cache (having earlier been loaded from the database), then modified and put back into the cache, and that after the transaction was committed the data was in both the database and the cache.

And what if an exception occurs after the modification in the cache but before the write to the database? For example, here:

    System.out.println("Read value after id2: " + val);

    try {
        // Simulate a failure before the commit. The "if (true)" keeps the
        // commit below reachable, otherwise the code would not compile.
        if (true)
            throw new RuntimeException("RuntimeException");

        tx.commit();
    }
    catch (Exception e) {
        e.printStackTrace();
    }
}

System.out.println("Read value id1 after commit: " + cache.get(id1));

The commit does not happen, and the data in the cache is rolled back, so everything is OK! After the exception we see the previous value, the one from before the modification.
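The commit and rollback behaviour can be pictured with a small plain-Java sketch. This is only an illustration under stated assumptions, not Ignite's transaction mechanism: the class `TxBufferSketch` and the idea of a simple map of pending writes are hypothetical. Writes stay in a buffer until commit, and closing the transaction without a commit drops them.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only, not Ignite's transaction implementation:
// writes made inside a transaction are buffered and become visible outside it
// only on commit; closing without commit discards them.
class TxBufferSketch implements AutoCloseable {
    private final Map<Long, String> committed;                  // shared cache/store state
    private final Map<Long, String> pending = new HashMap<>();  // uncommitted writes

    TxBufferSketch(Map<Long, String> committed) {
        this.committed = committed;
    }

    // Inside the transaction, reads see the transaction's own writes first.
    String get(Long key) {
        return pending.containsKey(key) ? pending.get(key) : committed.get(key);
    }

    void put(Long key, String value) {
        pending.put(key, value);
    }

    // Commit applies all buffered writes; in the article this is the point
    // at which the store's write(...) method runs.
    void commit() {
        committed.putAll(pending);
        pending.clear();
    }

    // Closing without commit acts as a rollback: pending writes are dropped.
    @Override public void close() {
        pending.clear();
    }

    public static void main(String[] args) {
        Map<Long, String> data = new HashMap<>();
        data.put(100_001L, "old");

        try (TxBufferSketch tx = new TxBufferSketch(data)) {
            tx.put(100_001L, "new");
            System.out.println("Inside tx: " + tx.get(100_001L));

            throw new RuntimeException("failure before commit");
        }
        catch (RuntimeException e) {
            // The transaction was closed before this handler ran.
            System.out.println("After rollback: " + data.get(100_001L));
        }
    }
}
```

The try-with-resources block mirrors the article's usage: the resource is closed when the block exits, whether normally or through an exception.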

These were get / put operations, but application logic is not limited to them: we also need searches by various criteria that return collections and single objects.

Query handles this in the cache. There is one caveat: queries work only with data that is already in the cache.

An example of working with a cache through a query:

SqlQuery<Long, Kladr> sql = new SqlQuery<>(Kladr.class, "id = ?");

long start = System.currentTimeMillis();
int t = 0;

for (int i = 100_000; i < 101_000; i++) {
    try (QueryCursor<Cache.Entry<Long, Kladr>> cursor = cache.query(sql.setArgs(i))) {
        for (Cache.Entry<Long, Kladr> e : cursor) {
            e.getValue().upd_date = new Timestamp(new java.util.Date().getTime());
            t++;
        }
    }
}

System.out.println("SqlQuery by id " + (System.currentTimeMillis() - start) + "msec, t=" + t);

Reading 1,000 objects took 300 ms. But that lookup went through the field that is annotated as indexed:

  @QuerySqlField(index = true) public Long id; 

And again, in real life we need to search by other fields too. Let's check the "code" field, which has no index. The result is sad: a full scan, as in a database (but in fact much worse). The 1,000 searches took 30 seconds.

Search in the 'code' field
String[] codes = new String[]{"4401300010999", "4401300011700"};

sql = new SqlQuery<>(Kladr.class, "code = ?");

start = System.currentTimeMillis();
t = 0;

for (int i = 100_000; i < 101_000; i++) {
    try (QueryCursor<Cache.Entry<Long, Kladr>> cursor = cache.query(sql.setArgs(codes[i % 2]))) {
        for (Cache.Entry<Long, Kladr> e : cursor) {
            e.getValue().upd_date = new Timestamp(new java.util.Date().getTime());
            t++;
        }
    }
}

System.out.println("SqlQuery by code " + (System.currentTimeMillis() - start) + "msec., t=" + t);


I had not planned to compare, but this case intrigued me: iterating over all values with the data in the cache (in memory) looks like rather favorable conditions, so how would the database (Oracle XE) handle the same search? Here is the result: the same search in the database took 6 seconds.

declare
  TYPE code_type IS TABLE OF VARCHAR2(30);
  v_codes code_type;
  v_code  varchar2(30);
  v_t     number := 0;
  v_ts    timestamp;
  v_id    number;
begin
  v_codes := code_type('4401300010999', '4401300011700');
  v_ts := systimestamp;
  for i in 1..1000 loop
    v_code := v_codes((i mod 2)+1);
    select id into v_id from kladr k where k.code = v_code;
    v_t := v_t + 1;
  end loop;
  dbms_output.put_line('query by code ' || to_char(systimestamp - v_ts) || ', t=' || v_t);
end;

Apparently the database handles caching, storage, search, and so on more intelligently. With an index added on the field, the search in the database takes 28 ms. In Ignite you can also add an index on one more field, and the search takes off!

  @QuerySqlField(index = true) public String code; 

With the index, the search took 160 ms.

True, with the index the database was still an order of magnitude faster. But that is not always what matters most: scaling the computing system (discussed earlier) is also very important.
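The gap between the indexed and unindexed searches can be reproduced in miniature with plain Java collections. The sketch below is only an illustration: the class `IndexVsScanSketch`, the `{id, code}` row layout, and the use of a `HashMap` as the index are assumptions, and Ignite's SQL indexes are implemented differently. It shows why a query on an unindexed field has to touch every entry, while an indexed one goes straight to the matching rows.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: why a lookup on an unindexed field is slow.
// Plain collections stand in for the cache; this is not Ignite's SQL engine.
class IndexVsScanSketch {
    // Each row is an {id, code} pair; this layout is an assumption.
    final List<String[]> rows = new ArrayList<>();
    final Map<String, List<String[]>> codeIndex = new HashMap<>();

    void add(String id, String code) {
        String[] row = {id, code};
        rows.add(row);
        // Maintaining the index costs a little on every write.
        codeIndex.computeIfAbsent(code, c -> new ArrayList<>()).add(row);
    }

    // Full scan: every row is examined on every query.
    List<String[]> findByScan(String code) {
        List<String[]> found = new ArrayList<>();
        for (String[] row : rows)
            if (row[1].equals(code))
                found.add(row);
        return found;
    }

    // Indexed lookup: jumps directly to the rows with the given code.
    List<String[]> findByIndex(String code) {
        return codeIndex.getOrDefault(code, new ArrayList<>());
    }

    public static void main(String[] args) {
        IndexVsScanSketch s = new IndexVsScanSketch();
        for (int i = 0; i < 50_000; i++)
            s.add(String.valueOf(100_000 + i), "code-" + i);

        long start = System.nanoTime();
        for (int i = 0; i < 1_000; i++)
            s.findByScan("code-" + (i % 2));
        long scanMs = (System.nanoTime() - start) / 1_000_000;

        start = System.nanoTime();
        for (int i = 0; i < 1_000; i++)
            s.findByIndex("code-" + (i % 2));
        long indexMs = (System.nanoTime() - start) / 1_000_000;

        System.out.println("scan: " + scanMs + " ms, index: " + indexMs + " ms");
    }
}
```

The absolute timings depend on the machine and say nothing about Ignite or Oracle; only the relative difference between the two lookups is the point.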

There are other types of cache queries, for example ScanQuery. Here is the same example using it:

ScanQuery
for (int i = 100_000; i < 101_000; i++) {
    int id = i;

    try (QueryCursor<Cache.Entry<Long, Kladr>> cursor =
             cache.query(new ScanQuery<Long, Kladr>((k, v) -> v.id == id))) {
        for (Cache.Entry<Long, Kladr> e : cursor)
            e.getValue().upd_date = new Timestamp(new java.util.Date().getTime());

        t++;
    }
}

System.out.println("ScanQuery by id " + (System.currentTimeMillis() - start) + "msec., t=" + t);


Its result: [image: console output of the ScanQuery run]

Material

Source: https://habr.com/ru/post/349358/

