
Direct disk access from Python


Today I will tell you how I tried to reach the hard disk interface from Python, and what came of it.

From time to time I need to test a large number of hard drives. The usual tool for this is the well-known Victoria, booted over the network. It tests disks one at a time, which is not very convenient. On top of that, motherboards without an IDE mode have recently started to appear, which complicates the task further. My first idea was to take an existing open-source Linux program and add parallel testing of several disks to it. A quick search revealed the depressing state of this area on Linux. The only program I found that reports sector access times and error types during a test was whdd. My attempt to make sense of the whdd code ended in complete failure. To me, not a programmer at all, the code seemed very tangled. Besides, most of it has nothing to do with the hardware.
Realizing that no simple solution was in sight, I decided to try writing a similar program myself. Knowing that I could not pull off such a project in C, I began to look into working with disks directly from Python, which I often use for simple tasks and love for its simplicity and clarity. There is very little information on this topic online, but I did find out that there is an fcntl module that, among other things, can send ioctl requests to a device. That gave me a way to send commands to the disk. However, Linux treats all disks as SCSI disks, and for testing you need to send ATA commands directly to the disk. It turned out that there is an ATA Command Pass-Through mechanism that wraps an ATA command in a SCSI request. I learned the basics of using it from the source code of the sg3_utils project. All that remained was to try to implement it in Python.

To build structures in Python that match C structures, so that they can be passed to ioctl, there is the ctypes module. The number of gray hairs I earned while debugging strange glitches with these structures deserves a separate mention. That is how I learned about structure alignment in C. In the end, two structures were born:

Structure for ATA Pass-Through:

    import ctypes

    class ataCmd(ctypes.Structure):
        # 16-byte ATA PASS-THROUGH (16) CDB; packed so that the 16-bit
        # fields can start at odd offsets without padding.
        _pack_ = 1
        _fields_ = [
            ('opcode', ctypes.c_ubyte),
            ('protocol', ctypes.c_ubyte),
            ('flags', ctypes.c_ubyte),
            ('features', ctypes.c_ushort),
            ('sector_count', ctypes.c_ushort),
            ('lba_h_low', ctypes.c_ubyte),
            ('lba_low', ctypes.c_ubyte),
            ('lba_h_mid', ctypes.c_ubyte),
            ('lba_mid', ctypes.c_ubyte),
            ('lba_h_high', ctypes.c_ubyte),
            ('lba_high', ctypes.c_ubyte),
            ('device', ctypes.c_ubyte),
            ('command', ctypes.c_ubyte),
            ('control', ctypes.c_ubyte)]
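The effect of `_pack_ = 1` is easy to see by comparing sizes. The sketch below is only an illustration: `PackedCdb` and `PaddedCdb` are hypothetical names, and the field list is copied from the structure above. Without packing, ctypes aligns the 16-bit `features` field to an even offset, and the command no longer fits the 16 bytes that ATA PASS-THROUGH (16) expects.

```python
import ctypes

# Same field list as ataCmd, shared by both illustrative classes.
CDB_FIELDS = [
    ('opcode', ctypes.c_ubyte),
    ('protocol', ctypes.c_ubyte),
    ('flags', ctypes.c_ubyte),
    ('features', ctypes.c_ushort),
    ('sector_count', ctypes.c_ushort),
    ('lba_h_low', ctypes.c_ubyte),
    ('lba_low', ctypes.c_ubyte),
    ('lba_h_mid', ctypes.c_ubyte),
    ('lba_mid', ctypes.c_ubyte),
    ('lba_h_high', ctypes.c_ubyte),
    ('lba_high', ctypes.c_ubyte),
    ('device', ctypes.c_ubyte),
    ('command', ctypes.c_ubyte),
    ('control', ctypes.c_ubyte),
]


class PackedCdb(ctypes.Structure):
    _pack_ = 1              # no padding between fields
    _fields_ = CDB_FIELDS


class PaddedCdb(ctypes.Structure):
    _fields_ = CDB_FIELDS   # default (natural) alignment


packed_size = ctypes.sizeof(PackedCdb)
padded_size = ctypes.sizeof(PaddedCdb)

# Size and offset of the first 16-bit field for each variant.
print(packed_size, PackedCdb.features.offset)
print(padded_size, PaddedCdb.features.offset)
```

The packed variant is 16 bytes with `features` at offset 3. The padded one grows to 18 bytes and moves `features` to offset 4, which shifts every later field in the command.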

And the structure for ioctl:

    class sgioHdr(ctypes.Structure):
        # No _pack_ here: the C struct sg_io_hdr uses natural alignment,
        # and on 64-bit systems packing would shift usr_ptr and every
        # field after it.
        _fields_ = [
            ('interface_id', ctypes.c_int),       # [i] 'S' for SCSI generic (required)
            ('dxfer_direction', ctypes.c_int),    # [i] data transfer direction
            ('cmd_len', ctypes.c_ubyte),          # [i] SCSI command length ( <= 16 bytes)
            ('mx_sb_len', ctypes.c_ubyte),        # [i] max length to write to sbp
            ('iovec_count', ctypes.c_ushort),     # [i] 0 implies no scatter gather
            ('dxfer_len', ctypes.c_uint),         # [i] byte count of data transfer
            ('dxferp', ctypes.c_void_p),          # [i], [*io] points to data transfer memory
            ('cmdp', ctypes.c_void_p),            # [i], [*i] points to command to perform
            ('sbp', ctypes.c_void_p),             # [i], [*o] points to sense_buffer memory
            ('timeout', ctypes.c_uint),           # [i] MAX_UINT->no timeout (unit: millisec)
            ('flags', ctypes.c_uint),             # [i] 0 -> default, see SG_FLAG...
            ('pack_id', ctypes.c_int),            # [i->o] unused internally (normally)
            ('usr_ptr', ctypes.c_void_p),         # [i->o] unused internally
            ('status', ctypes.c_ubyte),           # [o] scsi status
            ('masked_status', ctypes.c_ubyte),    # [o] shifted, masked scsi status
            ('msg_status', ctypes.c_ubyte),       # [o] messaging level data (optional)
            ('sb_len_wr', ctypes.c_ubyte),        # [o] byte count actually written to sbp
            ('host_status', ctypes.c_ushort),     # [o] errors from host adapter
            ('driver_status', ctypes.c_ushort),   # [o] errors from software driver
            ('resid', ctypes.c_int),              # [o] dxfer_len - actual_transferred
            ('duration', ctypes.c_uint),          # [o] time taken by cmd (unit: millisec)
            ('info', ctypes.c_uint)]              # [o] auxiliary information
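The layout of the ioctl header is worth checking against the C definition, because the kernel reads and writes it with the compiler's natural alignment. A minimal sketch, assuming the field list of `sg_io_hdr` from `<scsi/sg.h>`; `NaturalHdr` and `PackedHdr` are hypothetical names used only for the comparison:

```python
import ctypes

# Field list of sg_io_hdr, in declaration order.
SG_FIELDS = [
    ('interface_id', ctypes.c_int),
    ('dxfer_direction', ctypes.c_int),
    ('cmd_len', ctypes.c_ubyte),
    ('mx_sb_len', ctypes.c_ubyte),
    ('iovec_count', ctypes.c_ushort),
    ('dxfer_len', ctypes.c_uint),
    ('dxferp', ctypes.c_void_p),
    ('cmdp', ctypes.c_void_p),
    ('sbp', ctypes.c_void_p),
    ('timeout', ctypes.c_uint),
    ('flags', ctypes.c_uint),
    ('pack_id', ctypes.c_int),
    ('usr_ptr', ctypes.c_void_p),
    ('status', ctypes.c_ubyte),
    ('masked_status', ctypes.c_ubyte),
    ('msg_status', ctypes.c_ubyte),
    ('sb_len_wr', ctypes.c_ubyte),
    ('host_status', ctypes.c_ushort),
    ('driver_status', ctypes.c_ushort),
    ('resid', ctypes.c_int),
    ('duration', ctypes.c_uint),
    ('info', ctypes.c_uint),
]


class NaturalHdr(ctypes.Structure):
    _fields_ = SG_FIELDS    # natural alignment, as the C compiler lays it out


class PackedHdr(ctypes.Structure):
    _pack_ = 1              # all padding removed
    _fields_ = SG_FIELDS


natural_size = ctypes.sizeof(NaturalHdr)
packed_size = ctypes.sizeof(PackedHdr)

print('natural:', natural_size, 'usr_ptr at', NaturalHdr.usr_ptr.offset)
print('packed: ', packed_size, 'usr_ptr at', PackedHdr.usr_ptr.offset)
```

On a 32-bit system both variants are 64 bytes, so packing makes no difference there. On a 64-bit system the natural layout is 88 bytes, while the packed one shrinks to 80 because the padding before `usr_ptr` and at the end of the structure disappears. In that case the output fields such as `status` no longer sit where the kernel writes them.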

Filling in these structures is required before every disk operation and takes a lot of space, so it is moved into a separate function. Multi-byte values need their byte order swapped, because the command stores them high byte first, while ctypes on x86 writes the low byte first.
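The article does not show the byte-swapping helper, so here is one possible version. It is a sketch under assumptions: this `swap16` is my guess at what such a helper does, and it is only needed on a little-endian host. The second half shows how a 48-bit LBA is interleaved into the six LBA bytes of the command.

```python
def swap16(value):
    """Swap the two bytes of a 16-bit value (assumed helper)."""
    return ((value & 0xFF) << 8) | ((value >> 8) & 0xFF)


# A 48-bit LBA split into bytes, least significant byte first.
lba = 0x123456789ABC
b = lba.to_bytes(6, byteorder='little')

# Order used by the pass-through command: each "previous" (high-order)
# byte is followed by its "current" (low-order) counterpart.
cdb_lba = bytes([b[3], b[0],    # lba_h_low,  lba_low
                 b[4], b[1],    # lba_h_mid,  lba_mid
                 b[5], b[2]])   # lba_h_high, lba_high

print(hex(swap16(0x1234)))
print(cdb_lba.hex())
```

An alternative that avoids manual swapping is to derive the command structure from `ctypes.BigEndianStructure`, which stores multi-byte fields high byte first on any host.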

    def prepareSgio(cmd, feature, count, lba, direction, sense, buf):
        if direction == SG_DXFER_FROM_DEV:
            buf_len = ctypes.sizeof(buf)
            buf_p = ctypes.cast(buf, ctypes.c_void_p)
            prot = 4 << 1  # PIO Data-In
        elif direction == SG_DXFER_TO_DEV:
            buf_len = ctypes.sizeof(buf)
            buf_p = ctypes.cast(buf, ctypes.c_void_p)
            prot = 5 << 1  # PIO Data-Out
        else:
            buf_len = 0
            buf_p = None
            prot = 3 << 1  # Non-data
        if cmd != 0xb0:  # not SMART COMMAND
            prot = prot | 1  # + EXTEND
        sector_lba = lba.to_bytes(6, byteorder='little')
        ata_cmd = ataCmd(opcode=0x85,  # ATA PASS-THROUGH (16)
                         protocol=prot,
                         # flags field
                         # OFF_LINE = 0 (0 seconds offline)
                         # CK_COND = 1 (copy sense data in response)
                         # T_DIR = 1 (transfer from the ATA device)
                         # BYT_BLOK = 1 (length is in blocks, not bytes)
                         # T_LENGTH = 2 (transfer length in the SECTOR_COUNT field)
                         flags=0x2e,
                         features=swap16(feature),
                         sector_count=swap16(count),
                         lba_h_low=sector_lba[3], lba_low=sector_lba[0],
                         lba_h_mid=sector_lba[4], lba_mid=sector_lba[1],
                         lba_h_high=sector_lba[5], lba_high=sector_lba[2],
                         device=0,
                         command=cmd,
                         control=0)
        sgio = sgioHdr(interface_id=ASCII_S, dxfer_direction=direction,
                       cmd_len=ctypes.sizeof(ata_cmd),
                       mx_sb_len=ctypes.sizeof(sense), iovec_count=0,
                       dxfer_len=buf_len,
                       dxferp=buf_p,
                       cmdp=ctypes.addressof(ata_cmd),
                       sbp=ctypes.cast(sense, ctypes.c_void_p), timeout=1000,
                       flags=0, pack_id=0, usr_ptr=None, status=0, masked_status=0,
                       msg_status=0, sb_len_wr=0, host_status=0, driver_status=0,
                       resid=0, duration=0, info=0)
        # cmdp holds only a raw address, so keep a reference to ata_cmd on the
        # header; otherwise the command is freed when this function returns.
        sgio.ata_cmd = ata_cmd
        return sgio
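The magic numbers in the protocol and flags bytes are easier to read when assembled from their bit fields. A small sketch, assuming the bit positions listed in the comments above (the helper names `pt_protocol_byte` and `pt_flags_byte` are made up for this example):

```python
def pt_protocol_byte(protocol, extend):
    """Byte 1 of the command: protocol in bits 4..1, EXTEND in bit 0."""
    return (protocol << 1) | (1 if extend else 0)


def pt_flags_byte(off_line=0, ck_cond=0, t_dir=0, byt_blok=0, t_length=0):
    """Byte 2 of the command, built from its individual bit fields."""
    return ((off_line << 6) |   # bits 7..6: OFF_LINE
            (ck_cond << 5) |    # bit 5: CK_COND
            (t_dir << 3) |      # bit 3: T_DIR
            (byt_blok << 2) |   # bit 2: BYT_BLOK
            t_length)           # bits 1..0: T_LENGTH


PIO_DATA_IN = 4

flags = pt_flags_byte(ck_cond=1, t_dir=1, byt_blok=1, t_length=2)
protocol = pt_protocol_byte(PIO_DATA_IN, extend=True)

print(hex(flags), hex(protocol))
```

With the values from the comments this yields the same `0x2e` that the function hard-codes, and `0x9` for a PIO Data-In command with EXTEND set.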

This function takes an ATA command, its parameters, and buffers, and returns a ready-made structure for the ioctl request. The rest is simple. We create a buffer that receives the command execution status and the contents of the ATA status and error registers, and a buffer for the sector read from the disk. Then we fill in the structures and execute our first ATA command.

    sense = ctypes.create_string_buffer(64)      # receives sense data with the ATA registers
    identify = ctypes.create_string_buffer(512)  # receives one sector of data
    sgio = prepareSgio(0xec, 0, 0, 0, SG_DXFER_FROM_DEV, sense, identify)  # IDENTIFY
    with open(dev, 'r') as fd:
        if fcntl.ioctl(fd, SG_IO, ctypes.addressof(sgio)) != 0:
            return None  # fcntl failed!
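Because CK_COND is set, the ATA registers come back inside the sense buffer. The sketch below shows one way to pull them out. It assumes descriptor-format sense data (response code 0x72 or 0x73) containing an ATA Status Return descriptor (code 0x09), and `parse_ata_return` is a hypothetical helper, not part of the code above. The sample buffer is filled in by hand instead of coming from a real disk.

```python
def parse_ata_return(sense):
    """Return the ATA registers from descriptor-format sense data, or None."""
    data = bytes(sense)
    if len(data) < 8 or (data[0] & 0x7F) not in (0x72, 0x73):
        return None                      # not descriptor-format sense data
    end = min(len(data), 8 + data[7])    # byte 7: additional sense length
    pos = 8                              # descriptors start at byte 8
    while pos + 2 <= end:
        code, length = data[pos], data[pos + 1] + 2
        if code == 0x09 and pos + 14 <= len(data):
            desc = data[pos:pos + 14]    # ATA Status Return descriptor
            return {
                'error': desc[3],
                'sector_count': desc[5],
                'lba_low': desc[7],
                'lba_mid': desc[9],
                'lba_high': desc[11],
                'device': desc[12],
                'status': desc[13],
            }
        pos += length
    return None


# Hand-made sample: status 0x51 (ERR set), error 0x04 (ABRT).
sample = bytearray(64)
sample[0] = 0x72      # descriptor-format response code
sample[7] = 14        # one 14-byte descriptor follows
sample[8] = 0x09      # ATA Status Return descriptor
sample[9] = 0x0C      # descriptor payload length
sample[11] = 0x04     # error register
sample[21] = 0x51     # status register

regs = parse_ata_return(sample)
print(regs['status'], regs['error'])
```

The same function accepts the ctypes sense buffer directly, since `bytes()` copies its contents.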

In response, we get a sector with information about the disk:

    0000000: 5a04 ff3f 37c8 1000 0000 0000 3f00 0000  Z..?7.......?...
    0000010: 0000 0000 2020 2020 2020 4b4a 3131 3142  ....      KJ111B
    0000020: 3942 5647 4142 4659 0300 5fea 3800 4b4a  9BVGABFY.._.8.KJ
    0000030: 4f41 3341 4145 6948 6174 6863 2069 5548  OA3AAEiHathc iUH
    0000040: 3741 3232 3230 4130 414c 3333 2030 2020  7A2220A0AL33 0  
    0000050: 2020 2020 2020 2020 2020 2020 2020 1080                ..
    0000060: 0040 002f 0040 0002 0002 0700 ff3f 1000  .@./.@.......?..
    0000070: 3f00 10fc fb00 0001 ffff ff0f 0000 0700  ?...............
    0000080: 0300 7800 7800 7800 7800 0000 0000 0000  ..x.x.x.x.......
    0000090: 0000 0000 0000 1f00 0617 0000 5e00 4400  ............^.D.
    00000a0: fc01 2900 6b34 697d 7347 6934 41bc 6347  ..).k4i}sGi4A.cG
    00000b0: 7f40 0401 0000 0000 feff 0000 0000 0800  .@..............
    00000c0: ca00 f900 1027 0000 b088 e0e8 0000 0000  .....'..........
    00000d0: ca00 0000 0000 875a 0050 a2cc cb22 44fc  .......Z.P..."D.
    00000e0: 0000 0000 0000 0000 0000 0000 0000 1440  ...............@
    00000f0: 1440 0000 0000 0000 0000 0000 0000 0000  .@..............
    0000100: 0100 0b00 0000 0000 8020 f10d 20fa 0100  ......... .. ...
    0000110: 0040 0404 0403 0000 0000 0502 0604 0504  .@..............
    0000120: 0506 0803 0506 0504 0505 0603 0505 0000  ................
    0000130: 3741 3342 0000 0a78 0000 bd5d d3a1 0080  7A3B...x...]....
    0000140: 0000 0000 0000 0000 0000 0000 0000 0000  ................
    0000150: 0200 0000 0000 0000 0000 0000 0000 0000  ................
    0000160: 0000 0000 0000 0000 0000 0000 0000 0000  ................
    0000170: 0000 0000 0000 0000 0000 0000 0000 0000  ................
    0000180: 0000 0000 0000 0000 0000 0000 0000 0000  ................
    0000190: 0000 0000 0000 0000 0000 0000 3d00 0000  ............=...
    00001a0: 0000 0000 0000 0000 0000 0000 0000 0000  ................
    00001b0: 0000 201c 0000 0000 0000 0000 1f10 2100  .. ...........!.
    00001c0: 0000 0000 0000 0000 0000 0000 0000 0000  ................
    00001d0: 0000 0000 0100 e003 0000 0000 0000 0000  ................
    00001e0: 0000 0000 0000 0000 0000 0000 0000 0000  ................
    00001f0: 0000 0000 0000 0000 0000 0000 0000 a503  ................

It holds complete information about the disk; we extract the essentials.

    serial = swapString(identify[20:40])    # words 10-19: serial number
    firmware = swapString(identify[46:54])  # words 23-26: firmware revision
    model = swapString(identify[54:94])     # words 27-46: model number
    # words 100-103: number of addressable sectors (48-bit LBA)
    sectors = int.from_bytes(identify[200:208], byteorder='little')
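The string helper is not shown in the article, so here is a possible version as a sketch. `swap_string` is my assumption of what it does: IDENTIFY strings store the two characters of each 16-bit word in swapped order, so every byte pair has to be flipped back. The sample bytes are taken from the dump above.

```python
def swap_string(raw):
    """Swap each pair of bytes and strip the space padding (assumed helper)."""
    data = bytes(raw)
    data = data[:len(data) - len(data) % 2]   # ignore a trailing odd byte
    swapped = bytearray(len(data))
    swapped[0::2] = data[1::2]                # second byte of each pair first
    swapped[1::2] = data[0::2]
    return swapped.decode('ascii', errors='replace').strip()


# Start of the model field (offset 54) as it appears in the dump.
raw_model = bytes.fromhex(
    '6948 6174 6863 2069 5548 3741 3232 3230 4130 414c 3333 2030 2020')
# Sector count field (offset 200) as it appears in the dump.
raw_sectors = bytes.fromhex('b088 e0e8 0000 0000')

model = swap_string(raw_model)
sectors = int.from_bytes(raw_sectors, byteorder='little')

print(model)
print(sectors)
```

Numeric fields such as the sector count need no swapping, only a little-endian conversion.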

The result is:

Model: Hitachi HUA722020ALA330; firmware: JKAOA3; serial number: JK11A1YAJE2N5V; number of sectors: 3907029168.

Now we can send ATA commands to the disk and receive its answers. Little by little, my work took shape as a library implementing the main set of ATA commands, including reading SMART. Anyone interested can look at it here. Please do not scold the code quality: I am not a real programmer, I am only learning.

What remains is to write a testing utility on top of it. I have a feeling that many more discoveries await me.

As recommended by amarao, I rewrote the library using classes and exceptions. I also decided that the name sgio is misleading about the library's purpose. The library is now called atapt and is available on GitHub and via pip. A usage example is on GitHub.

Source: https://habr.com/ru/post/274195/
