
Freeing up our analysts' hands: the Livy API for automating routine banking tasks

Hi, Habr!

It is no secret that banks use data from various sources (credit bureaus, mobile operators, etc.) to assess customer solvency. The number of external partners can reach several dozen, while our analytics team consists of only a few people. This raises the problem of optimizing the work of a small team and handing routine tasks over to computing systems.

In this article we look at how the data gets to the bank and how the team of analysts monitors this process.


Let's take things in order.

For brevity, we call our distributed Hadoop-based system, together with all the processes built around it, SmartData. SmartData receives data from external agents via API. (For it, agents are both external partners and internal systems of the bank.) Of course, it would be useful to maintain an up-to-date "profile" for each client, which is what we do. Updated data from the sources lands in Opprofile. Opprofile implements the Customer 360 idea and is stored as HBase tables. This is convenient for further work with the client.

Customer 360
Customer 360 is an approach to building an operational store that holds all the customer data attributes used by the organization's customer-facing processes, with the data accessible by the customer's key.
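
To make "a profile accessible by the customer's key" more concrete, here is a minimal sketch of how such a lookup might look from Python. It is only an illustration: the happybase library, the opprofile table name, the column families and the row-key format are assumptions, not our actual schema.

import happybase

# connect through an HBase Thrift gateway (hypothetical host)
connection = happybase.Connection('hbase-thrift-host')
table = connection.table('opprofile')  # hypothetical table name

# one GET by the customer's key returns the latest attribute values,
# e.g. {b'credit:score': b'720', b'contacts:phone': b'+7...'}
profile = table.row(b'client:0001')
for column, value in profile.items():
    print(column.decode(), '=', value.decode())

connection.close()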

Work with the agents goes on continuously and needs to be controlled. To quickly check the quality of interaction and the hit rate, and to easily share this information with other teams, we use visualization, for example reports in Tableau.

The source data arrives in Kafka, is pre-processed and placed in an HDFS-based Data Lake. We had to come up with a solution for parsing the log files in HDFS, processing them and uploading them daily to the analytical and visualization systems, while also combining all this with the analysts' love of Python notebooks.
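
To give an idea of what such daily processing might look like, here is a rough PySpark sketch of a job that parses the logs and computes a per-agent hit rate, the same kind of code that can later be submitted through Livy. The HDFS path, the log fields agent and hit, and the output table name are assumptions for illustration only.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("agent_logs_daily").getOrCreate()

# read one daily slice of agent logs (hypothetical Data Lake layout)
logs = spark.read.json("hdfs:///datalake/agent_logs/2019-06-20/*")

# hit rate per agent: the share of requests for which the partner returned data
report = (logs.groupBy("agent")
              .agg(F.count("*").alias("requests"),
                   F.sum(F.col("hit").cast("int")).alias("hits"))
              .withColumn("hit_rate", F.col("hits") / F.col("requests")))

# land the daily slice where the visualization layer (e.g. Tableau) can pick it up
report.write.mode("overwrite").saveAsTable("reports.agent_hit_rate_daily")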

That's it for the internal kitchen; let's move on to practice.

Our solution was to use the Livy API. Livy lets you submit code to the cluster directly from Jupyter. An HTTP request containing code written in Python (or Scala) plus metadata is sent to Livy. Livy initiates the launch of a Spark session on the cluster, which is managed by the YARN resource manager. The requests module is suitable for sending the HTTP requests. Fans of parsing websites are probably already familiar with it (and if not, here's a chance to learn a bit about it).

We import the necessary modules and create a session. (We also immediately find out the address of our session; it will come in handy later.) In the parameters we pass the user's authorization data and the name of the language in which the cluster will execute the script.

import json, requests, schedule, time

host = 'http://***:8998'
data = {'kind': 'spark', 'proxyUser': 'user'}
headers = {'Content-Type': 'application/json'}
r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
session_id = r.json().get('id')
print("session_id: " + str(session_id))
session_url = host + r.headers['location']
r = requests.get(session_url, headers=headers)

We wait for the session status to change to idle. If the waiting time exceeds the specified timeout, we send an error message.

timeout = time.time() + wait_time
sess_state = ['starting', 'success', 'idle']
while True:
    time.sleep(7)
    req_st = requests.get(session_url, headers=headers).json().get('state')
    if req_st != 'idle' and time.time() > timeout:
        requests.delete(session_url, headers=headers)
        send_message("Scheduler_error", req_st)
        break
    if req_st == 'idle':
        break
    if req_st not in sess_state:
        send_message("Scheduler_error", req_st)
        break
print("Session_state: ", req_st)

Now you can send the code to Livy.

statements_url = session_url + '/statements'
data = {'code': '1 + 1'}
r = requests.post(statements_url, data=json.dumps(data), headers=headers)
statement_url = host + r.headers['location']
r = requests.get(statement_url, headers=headers)
while requests.get(statement_url, headers=headers).json()['progress'] != 1:
    time.sleep(15)
r = requests.get(statement_url, headers=headers).json()['output']
session_url = 'http://***:8998/sessions/' + str(session_id)

In a loop we wait for the code execution to finish, then retrieve the processing result:

 r.get('data').get('text/plain') 

The delete method removes the session:

 requests.delete(session_url, headers=headers) 

For the daily upload you can use several options; cron has already been covered on Habr, but the user-friendly schedule module has not. Just add it to the code, it needs no explanation. And, for convenience, let's gather all the calculations in one place.

Code
import json, requests, schedule, time

def send_message(subject, text):
    import smtplib
    from email.mime.multipart import MIMEMultipart
    from email.mime.text import MIMEText
    me = "my_email_adress"
    you = "email_adress"
    msg = MIMEMultipart('alternative')
    msg['Subject'] = subject
    msg['From'] = me
    msg['To'] = you
    part1 = MIMEText(text, 'plain')
    msg.attach(part1)
    s = smtplib.SMTP('domain.org')
    s.ehlo()
    s.starttls()
    s.login("user", "password")
    s.sendmail(me, you, msg.as_string())
    s.quit()

def job(wait_time):
    host = 'http://***:8998'
    data = {'kind': 'spark', 'proxyUser': 'user'}
    headers = {'Content-Type': 'application/json'}

    # create a session
    r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
    session_id = r.json().get('id')
    print("session_id: " + str(session_id))
    session_url = host + r.headers['location']
    r = requests.get(session_url, headers=headers)

    # wait until the session becomes idle or the timeout expires
    timeout = time.time() + wait_time
    sess_state = ['starting', 'success', 'idle']
    while True:
        time.sleep(7)
        req_st = requests.get(session_url, headers=headers).json().get('state')
        if req_st != 'idle' and time.time() > timeout:
            requests.delete(session_url, headers=headers)
            break
        if req_st == 'idle':
            break
        if req_st not in sess_state:
            send_message("Scheduler_error", req_st)
            break
    print("Session_state: ", req_st)

    # submit the code and poll until it has finished
    statements_url = session_url + '/statements'
    data = {'code': '1 + 1'}
    r = requests.post(statements_url, data=json.dumps(data), headers=headers)
    statement_url = host + r.headers['location']
    r = requests.get(statement_url, headers=headers)
    while requests.get(statement_url, headers=headers).json()['progress'] != 1:
        time.sleep(15)
    r = requests.get(statement_url, headers=headers).json()['output']
    session_url = 'http://***:8998/sessions/' + str(session_id)
    print(r.get('data').get('text/plain'))
    #requests.delete(session_url, headers=headers)

# the functions must be defined before they are scheduled
schedule.every().day.at("16:05").do(job, 300)

while True:
    schedule.run_pending()
    time.sleep(1)


Conclusion:


Perhaps this solution does not claim to be the best, but it is transparent for the team of analysts. The advantages I see in it:


Of course, when you launch a large number of tasks, you have to keep track of the resources being freed up and set up communication between the uploads. These issues are resolved individually and agreed with colleagues.
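
For example, a simple way to reclaim resources is to periodically ask Livy for its session list and remove the sessions that have already finished or died. Below is a minimal sketch using the standard GET /sessions and DELETE /sessions/{id} endpoints; the set of "finished" states is an assumption and may differ between Livy versions.

import requests

host = 'http://***:8998'
headers = {'Content-Type': 'application/json'}

# list all known sessions and drop the ones that no longer do useful work
sessions = requests.get(host + '/sessions', headers=headers).json().get('sessions', [])
for s in sessions:
    if s.get('state') in ('dead', 'error', 'killed', 'success'):
        requests.delete(host + '/sessions/' + str(s['id']), headers=headers)
        print('removed session', s['id'], 'state:', s['state'])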

It would be great if at least one team takes note of this solution.

Links


Livy Documentation

Source: https://habr.com/ru/post/457096/

