
Reactive work with Bluetooth in real conditions

A short preface, or where it hurts


Recently I have been working a lot on applications with Bluetooth modules that talk to custom devices over not-very-well-designed protocols, which periodically hands me interesting problems.


Since I am a sincere fan of reactive programming in applications, I had to solve these problems on my own, because I could not find any ready-made solutions on the web. None at all. Here I would like to describe the architecture for working with Bluetooth devices that I ended up with.


Dangers on the Jedi's path


The first important thing a developer should keep in mind when working with Bluetooth is that packets can get damaged along the way. They can also arrive surrounded by noise. This is not a one-in-a-million case: such things happen quite often and have to be handled. Bluetooth may also disconnect, or fail to connect, or pretend to have connected when in fact the connection means nothing...


As an example of solving these problems, we will design a micro-framework for processing events whose type is determined by a header (the first N bytes) and whose integrity is validated with a simple checksum. To keep the code uncluttered, we will assume that the protocol header has a fixed size. All packets fall into two kinds: fixed-length ones, and dynamic-length ones whose length is transmitted in a separate byte.
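The two packet layouts can be sketched as plain size arithmetic. The names below (`HEAD_SIZE`, `CHECKSUM_SIZE`, `fixedPacketSize`, `dynamicPacketSize`) are hypothetical and not part of the article's code; the sketch assumes a 2-byte head, a length byte that counts payload bytes only, and a single trailing checksum byte.

```kotlin
// Hypothetical helpers for illustration; the article's own code does not define them.
val HEAD_SIZE = 2      // fixed-size head, as assumed in the text
val CHECKSUM_SIZE = 1  // assumption: one checksum byte closes every packet

// Fixed-length packet: head + payload + checksum.
fun fixedPacketSize(payloadSize: Int): Int = HEAD_SIZE + payloadSize + CHECKSUM_SIZE

// Dynamic packet: head + length byte + payload + checksum.
// Returns null while the length byte has not been received yet.
fun dynamicPacketSize(received: ByteArray): Int? {
    if (received.size <= HEAD_SIZE) return null
    val payloadSize = received[HEAD_SIZE].toInt() and 0xFF // read the byte as unsigned
    return HEAD_SIZE + 1 + payloadSize + CHECKSUM_SIZE
}
```

With a 2-byte payload the fixed packet occupies 5 bytes; a dynamic packet whose length byte is 3 occupies 7 bytes, and the size is unknown until that length byte arrives.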


Design


Let's start by describing the possible events in the application. Given the restrictions above, the general abstraction will look something like this:


    sealed class Event {
        val headSize: Int = 2
        abstract val head: ByteArray
        abstract fun isCorrupted(): Boolean
        // To be continued
    }

Next, once we have defined the set of properties common to all packets, we need to formalize the conditions under which we:


  1. Decide that the packet belongs to a certain type.
  2. Must keep adding bytes to the buffer, because the packet is not assembled yet.
  3. Must drop the buffer, because none of the conditions for assembling it hold (this item is more of a safety net; while testing the application it is worth adding logs there to check that the other conditions are complete).
  4. Try to build a packet from the buffer and check its validity.

These four conditions lead us to the following interface:


    import java.nio.ByteBuffer

    interface EventMatcher {
        val headSize: Int
        fun matches(packet: ByteBuffer): Boolean
        fun create(packet: ByteBuffer): Event
        fun shouldBuffer(packet: ByteBuffer): Boolean
        fun shouldDrop(packet: ByteBuffer): Boolean
    }
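To make the link between the four conditions and the interface concrete, here is a minimal sketch of how a caller might consult a matcher after each received byte. `Matcher` is a simplified stand-in for `EventMatcher`; `Step`, `nextStep`, `DemoMatcher` and `bufferOf` are hypothetical names introduced only for this illustration.

```kotlin
import java.nio.ByteBuffer

// Simplified stand-in for EventMatcher, reduced to the three checks used here.
interface Matcher {
    fun matches(packet: ByteBuffer): Boolean
    fun shouldBuffer(packet: ByteBuffer): Boolean
    fun shouldDrop(packet: ByteBuffer): Boolean
}

// Hypothetical outcome of inspecting a buffer after one more byte was appended.
enum class Step { KEEP_BUFFERING, DROP, BUILD }

fun nextStep(matcher: Matcher, buffer: ByteBuffer): Step = when {
    !matcher.matches(buffer) -> Step.DROP                // condition 1 failed: not our type
    matcher.shouldDrop(buffer) -> Step.DROP              // condition 3: assembly cannot succeed
    matcher.shouldBuffer(buffer) -> Step.KEEP_BUFFERING  // condition 2: packet is incomplete
    else -> Step.BUILD                                   // condition 4: build and validate
}

// Hypothetical matcher: packets start with 0x7E and are exactly 3 bytes long.
object DemoMatcher : Matcher {
    private val head: Byte = 0x7E
    private val length = 3
    override fun matches(packet: ByteBuffer) = packet.position() == 0 || packet.get(0) == head
    override fun shouldBuffer(packet: ByteBuffer) = packet.position() < length
    override fun shouldDrop(packet: ByteBuffer) = packet.position() > length
}

// Builds a buffer whose position reflects the bytes written so far.
fun bufferOf(vararg bytes: Byte): ByteBuffer = ByteBuffer.allocate(16).put(bytes)
```

Note that the checks rely on `position()`, that is, on how many bytes have been written so far, so the buffer must not be flipped before it is handed to a matcher.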

Let's create a component that provides a proxy interface (I would call it convenient, but I will leave that to your judgment) to our matchers for all registered types. Nothing outstanding; the code is under the cut:


Proxy matcher
    import kotlin.reflect.KClass

    class EventMatchersAdapter {
        private val matchers = mutableMapOf<KClass<out Event>, EventMatcher>()

        fun register(event: KClass<out Event>, matcher: EventMatcher) = apply { matchers.put(event, matcher) }
        fun unregister(event: KClass<out Event>) = apply { matchers.remove(event) }
        fun knownEvents(): List<KClass<out Event>> = matchers.keys.toList()

        // Each call is forwarded to the matcher registered for the given event type.
        fun matches(packet: ByteBuffer, event: KClass<out Event>): Boolean =
            matchers[event]?.matches(packet) ?: false

        fun shouldBuffer(packet: ByteBuffer, event: KClass<out Event>): Boolean =
            matchers[event]?.shouldBuffer(packet) ?: false

        fun shouldDrop(packet: ByteBuffer, event: KClass<out Event>): Boolean =
            matchers[event]?.shouldDrop(packet) ?: false

        fun create(packet: ByteBuffer, event: KClass<out Event>): Event? =
            matchers[event]?.create(packet)
    }

The packets themselves describe how to determine whether a packet has been damaged. This is a rather convenient approach: it saves you a lot of suffering with a poorly designed protocol whose engineer decided to throw a hundred different correctness checks at you, several per packet.


An example of a fixed-length packet
    data class A(
        override val head: ByteArray,
        val payload: ByteArray,
        val checksum: Byte
    ) : Event() {

        companion object {
            // (2 bytes of head) + (2 bytes of payload) + (1 byte of checksum)
            @JvmStatic val length = 5.toByte()
            @JvmStatic val headValue = byteArrayOf(0x00, 0x00)
            @JvmStatic val matcherValue = object : EventMatcher {
                override val headSize: Int = 2

                // Only part of the head may have arrived, so compare what is there so far.
                override fun matches(packet: ByteBuffer): Boolean {
                    if (packet.position() == 0) return true
                    if (packet.position() == 1) return packet[0] == headValue[0]
                    return packet[0] == headValue[0] && packet[1] == headValue[1]
                }

                override fun create(packet: ByteBuffer): A {
                    packet.rewind()
                    return A(
                        ByteArray(2, { packet.get() }),
                        ByteArray(2, { packet.get() }),
                        packet.get()
                    )
                }

                override fun shouldBuffer(packet: ByteBuffer): Boolean = packet.position() < length
                override fun shouldDrop(packet: ByteBuffer): Boolean = packet.position() > length
            }
        }

        override fun isCorrupted(): Boolean = checksumOf(payload) != checksum

        // Arrays are compared by content, so equals and hashCode are written by hand.
        override fun equals(other: Any?): Boolean {
            if (other !is A) return false
            return Arrays.equals(head, other.head) &&
                Arrays.equals(payload, other.payload) &&
                checksum == other.checksum
        }

        override fun hashCode(): Int {
            var result = Arrays.hashCode(head)
            result = result * 31 + Arrays.hashCode(payload)
            result = result * 31 + checksum.hashCode()
            return result
        }
    }

An example of a dynamic-length packet
    data class C(
        override val head: ByteArray,
        val length: Byte,
        val payload: ByteArray,
        val checksum: Byte
    ) : Event() {

        companion object {
            @JvmStatic val headValue = byteArrayOf(0x01, 0x00)
            @JvmStatic val matcherValue = object : EventMatcher {
                override val headSize: Int = 2

                override fun matches(packet: ByteBuffer): Boolean {
                    if (packet.position() == 0) return true
                    if (packet.position() == 1) return packet[0] == headValue[0]
                    return packet[0] == headValue[0] && packet[1] == headValue[1]
                }

                override fun create(packet: ByteBuffer): C {
                    packet.rewind()
                    val msb = packet.get()
                    val lsb = packet.get()
                    val length = packet.get()
                    return C(
                        byteArrayOf(msb, lsb),
                        length,
                        packet.take(3, length.toPositiveInt()),
                        packet.get()
                    )
                }

                // Until the length byte (index 2) has arrived, the total size is unknown.
                override fun shouldBuffer(packet: ByteBuffer): Boolean = when (packet.position()) {
                    in 0..2 -> true
                    // increase by (2 bytes of head) + (1 byte of length) + (1 byte of checksum)
                    else -> packet.position() < (packet[2].toPositiveInt() + 4)
                }

                override fun shouldDrop(packet: ByteBuffer): Boolean = when (packet.position()) {
                    in 0..2 -> false
                    // increase by (2 bytes of head) + (1 byte of length) + (1 byte of checksum)
                    else -> packet.position() > (packet[2].toPositiveInt() + 4)
                }
            }
        }

        override fun isCorrupted(): Boolean = checksumOf(payload) != checksum

        override fun equals(other: Any?): Boolean {
            if (other !is C) return false
            return Arrays.equals(head, other.head) &&
                length == other.length &&
                Arrays.equals(payload, other.payload) &&
                checksum == other.checksum
        }

        override fun hashCode(): Int {
            var result = Arrays.hashCode(head)
            result = result * 31 + length.hashCode()
            result = result * 31 + Arrays.hashCode(payload)
            result = result * 31 + checksum.hashCode()
            return result
        }
    }
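The dynamic-length packet relies on two helpers, `toPositiveInt` and `take`, that are not shown in the article. The sketch below is only a guess at their behaviour, inferred from how `create` uses them: `toPositiveInt` is assumed to read a signed `Byte` as a value in 0..255, and `take` is assumed to copy `count` bytes starting at an absolute index and leave the buffer positioned right after them, so that the following `packet.get()` reads the checksum.

```kotlin
import java.nio.ByteBuffer

// Assumed behaviour: reinterpret a signed Kotlin Byte as an unsigned value in 0..255.
fun Byte.toPositiveInt(): Int = toInt() and 0xFF

// Assumed behaviour: copy `count` bytes starting at absolute index `offset`
// and leave the buffer positioned immediately after the copied range.
fun ByteBuffer.take(offset: Int, count: Int): ByteArray {
    val out = ByteArray(count)
    this.position(offset)
    this.get(out)
    return out
}
```

Without the unsigned conversion, any length byte above 0x7F would be read as a negative number and the size comparisons in `shouldBuffer` and `shouldDrop` would go wrong.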

Next, we need to describe the packet-reading algorithm itself, one that will:


  1. Support several different packet types.
  2. Discard damaged packets for us.
  3. Be friends with Flowable.

The implementation of the algorithm is hidden behind the Subscriber interface:


    class EventsBridge(
        private val adapter: EventMatchersAdapter,
        private val emitter: FlowableEmitter<Event>,
        private val bufferSize: Int = 128
    ) : DisposableSubscriber<Byte>() {

        // One assembly buffer per registered packet type.
        private val buffers: Map<KClass<out Event>, ByteBuffer> =
            mutableMapOf<KClass<out Event>, ByteBuffer>()
                .apply {
                    for (knownEvent in adapter.knownEvents()) {
                        put(knownEvent, ByteBuffer.allocateDirect(bufferSize))
                    }
                }
                .toMap()

        override fun onError(t: Throwable) { emitter.onError(t) }

        override fun onComplete() { emitter.onComplete() }

        override fun onNext(t: Byte) {
            for ((key, value) in buffers) {
                // A packet longer than bufferSize cannot be assembled;
                // reset the buffer instead of throwing BufferOverflowException.
                if (!value.hasRemaining()) value.clear()
                value.put(t)
                adapter.knownEvents()
                    .filter { it == key }
                    .forEach {
                        if (adapter.matches(value, it)) {
                            when {
                                adapter.shouldDrop(value, it) -> {
                                    value.clear()
                                }
                                !adapter.shouldBuffer(value, it) -> {
                                    // The packet is complete: build it and check its validity.
                                    val event = adapter.create(value, it)
                                    if (!emitter.isCancelled && event != null && !event.isCorrupted()) {
                                        release()
                                        emitter.onNext(event)
                                    } else {
                                        value.clear()
                                    }
                                }
                            }
                        } else {
                            // The head does not match: treat the bytes as noise.
                            value.clear()
                        }
                    }
            }
        }

        // A valid packet was emitted, so every partially filled buffer is stale.
        private fun release() {
            for (buffer in buffers) buffer.value.clear()
        }
    }
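To see the buffering rules in isolation, here is a minimal sketch of the same idea without RxJava and for a single fixed-length packet type. `FixedFrameReader` and `readFrames` are hypothetical names; the layout (head `0x00 0x00`, 2 payload bytes, 1 checksum byte) and the checksum rule (payload bytes plus checksum sum to zero modulo 256) are assumptions chosen to mirror the article's fixed-length example.

```kotlin
import java.nio.ByteBuffer

// Hypothetical single-type reader: head 0x00 0x00, 2 payload bytes, 1 checksum byte.
class FixedFrameReader(private val onFrame: (ByteArray) -> Unit) {
    private val head = byteArrayOf(0x00, 0x00)
    private val frameSize = 5
    private val buffer = ByteBuffer.allocate(frameSize)

    fun accept(byte: Byte) {
        buffer.put(byte)
        val filled = buffer.position()
        // Compare only the head bytes that have arrived so far.
        val headOk = (0 until minOf(filled, head.size)).all { buffer.get(it) == head[it] }
        if (!headOk) {
            buffer.clear() // noise: start over with the next byte
            return
        }
        if (filled < frameSize) return // keep buffering
        val frame = ByteArray(frameSize)
        buffer.rewind()
        buffer.get(frame)
        buffer.clear()
        if (isIntact(frame)) onFrame(frame) // corrupted frames are silently discarded
    }

    // Assumed checksum rule: payload bytes plus the checksum byte sum to 0 modulo 256.
    private fun isIntact(frame: ByteArray): Boolean =
        ((frame[2] + frame[3] + frame[4]) and 0xFF) == 0
}

// Feeds a byte sequence through the reader and collects the frames it emits.
fun readFrames(bytes: ByteArray): List<ByteArray> {
    val frames = mutableListOf<ByteArray>()
    val reader = FixedFrameReader { frames.add(it) }
    bytes.forEach(reader::accept)
    return frames
}
```

Like the bridge above, this sketch throws away the byte that breaks a partial head match. With heads whose bytes differ from each other, a real packet starting exactly at that byte would be missed, which is worth keeping in mind when the stream is very noisy.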

Usage


Let's look at usage through a few unit tests:


A simple test for one packet type
    @Test
    fun test_single_fixedLength() {
        val adapter = EventMatchersAdapter()
            .register(Event.A::class, Event.A.matcherValue)
        val packetA = generateCorrectPacketA()
        val testSubscriber = TestSubscriber<Event>()

        Flowable.create<Event>(
            { emitter ->
                val bridge = EventsBridge(adapter, emitter)
                // The inner Flowable plays the role of the raw byte stream.
                Flowable.create<Byte>({ byteEmitter ->
                    for (byte in packetA) { byteEmitter.onNext(byte) }
                }, BackpressureStrategy.BUFFER).subscribe(bridge)
            },
            BackpressureStrategy.BUFFER
        )
            .subscribe(testSubscriber)

        testSubscriber.assertNoErrors()
        testSubscriber.assertValue { event -> event is Event.A && !event.isCorrupted() }
    }

A test with lots of noise and several packet types
    @Test
    fun test_multiple_dynamicLength_mixed_withNoise() {
        val adapter = EventMatchersAdapter()
            .register(Event.C::class, Event.C.matcherValue)
            .register(Event.D::class, Event.D.matcherValue)
        val packetC1 = generateCorrectPacketC()
        val packetD1 = generateCorrectPacketD()
        val packetD2 = generateCorruptedPacketD()
        val packetC2 = generateCorruptedPacketC()
        val testSubscriber = TestSubscriber<Event>()
        val random = Random()

        Flowable.create<Event>(
            { emitter ->
                val bridge = EventsBridge(adapter, emitter)
                // Two correct and two corrupted packets, separated by random noise.
                Flowable.create<Byte>({ byteEmitter ->
                    for (b in 0..100) { byteEmitter.onNext(random.nextInt().toByte()) }
                    for (byte in packetC1) { byteEmitter.onNext(byte) }
                    for (b in 0..100) { byteEmitter.onNext(random.nextInt().toByte()) }
                    for (byte in packetD1) { byteEmitter.onNext(byte) }
                    for (b in 0..100) { byteEmitter.onNext(random.nextInt().toByte()) }
                    for (byte in packetD2) { byteEmitter.onNext(byte) }
                    for (b in 0..100) { byteEmitter.onNext(random.nextInt().toByte()) }
                    for (byte in packetC2) { byteEmitter.onNext(byte) }
                    for (b in 0..100) { byteEmitter.onNext(random.nextInt().toByte()) }
                }, BackpressureStrategy.BUFFER).subscribe(bridge)
            },
            BackpressureStrategy.BUFFER
        )
            .subscribe(testSubscriber)

        testSubscriber.assertNoErrors()
        testSubscriber.assertValueCount(2)
    }

Test packet generation
    private fun generateCorrectPacketB(): ByteArray {
        val rnd = Random()
        val payload = byteArrayOf(
            rnd.nextInt().toByte(),
            rnd.nextInt().toByte(),
            rnd.nextInt().toByte(),
            rnd.nextInt().toByte()
        )
        return byteArrayOf(
            Event.B.headValue[0], Event.B.headValue[1],
            payload[0], payload[1], payload[2], payload[3],
            checksumOf(payload)
        )
    }

    private fun generateCorrectPacketC(): ByteArray {
        val rnd = Random()
        val payload = List(rnd.nextInt(16), { index -> rnd.nextInt().toByte() }).toByteArray()
        // Layout: head (2 bytes), length (1 byte), payload, checksum (1 byte).
        return ByteArray(4 + payload.size, { index ->
            when (index) {
                0 -> Event.C.headValue[0]
                1 -> Event.C.headValue[1]
                2 -> payload.size.toByte()
                in 3..(4 + payload.size - 2) -> payload[index - 3]
                4 + payload.size - 1 -> checksumOf(payload)
                else -> 0.toByte()
            }
        })
    }

    private fun generateCorruptedPacketB(): ByteArray {
        val rnd = Random()
        val payload = byteArrayOf(
            rnd.nextInt().toByte(),
            rnd.nextInt().toByte(),
            rnd.nextInt().toByte(),
            rnd.nextInt().toByte()
        )
        return byteArrayOf(
            Event.B.headValue[0], Event.B.headValue[1],
            payload[0], payload[1], payload[2], payload[3],
            // An off-by-one checksum makes the packet invalid.
            (checksumOf(payload) + 1.toByte()).toByte()
        )
    }

    private fun generateCorruptedPacketC(): ByteArray {
        val rnd = Random()
        val payload = List(rnd.nextInt(16), { _ -> rnd.nextInt().toByte() }).toByteArray()
        return ByteArray(4 + payload.size, { index ->
            when (index) {
                0 -> Event.C.headValue[0]
                1 -> Event.C.headValue[1]
                2 -> payload.size.toByte()
                in 3..(4 + payload.size - 2) -> payload[index - 3]
                else -> (checksumOf(payload) + 1.toByte()).toByte()
            }
        })
    }

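The unpretentious checksum used for testing
    import kotlin.experimental.inv

    // Two's complement of the byte sum: payload bytes plus the checksum wrap around to zero.
    fun checksumOf(data: ByteArray): Byte {
        var result = 0x00.toByte()
        for (b in data) {
            result = (result + b).toByte()
        }
        return (result.inv() + 1.toByte()).toByte()
    }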
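A property of this checksum that is handy on the receiving side: since it is the two's complement of the byte sum, adding all payload bytes and the checksum together gives zero modulo 256. The sketch below restates the checksum in a shorter equivalent form and adds a hypothetical `isIntact` helper that is not part of the article's code.

```kotlin
// Restatement of the checksum for illustration: the negated byte sum, truncated to one byte.
fun checksumOf(data: ByteArray): Byte = (-data.sum()).toByte()

// Hypothetical helper: a packet is intact when payload plus checksum wraps to zero (mod 256).
fun isIntact(payload: ByteArray, checksum: Byte): Boolean =
    ((payload.sum() + checksum) and 0xFF) == 0
```

Keep in mind that a plain sum catches a single flipped byte but not reordered bytes or errors that cancel each other out, so it is only a lightweight safeguard.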

And why was all this necessary?


With this example I wanted to show how easy it is to keep things modular when processing almost arbitrary events, which, by the way, need not come from a Bluetooth source (there has been no Bluetooth-dependent code so far), while guarding against damaged packets and noise.


So, what is next?


Let's make a small wrapper around RxBluetooth that will let us work with various connections in a reactive style while listening to different sets of events.


All the code can be divided into three components: two services and one repository.
The services handle establishing a connection and working with the connection's data, respectively, while the repository provides an abstraction over specific connections and acts as an implicit flyweight for them.
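The flyweight role of the repository boils down to "one shared connection per service UUID". A minimal, non-reactive sketch of that idea follows; `ConnectionCache` is a hypothetical name, the type parameter `T` stands in for whatever represents an open connection, and the actual repository in this article works with reactive streams instead of returning values directly.

```kotlin
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap

// Hypothetical cache; T stands in for whatever represents an open connection.
class ConnectionCache<T : Any>(private val connect: (UUID) -> T) {
    private val connections = ConcurrentHashMap<UUID, T>()

    // Returns the cached connection for this service, creating it on first request.
    fun get(service: UUID): T = connections.computeIfAbsent(service) { connect(it) }

    // Forgets all cached connections; closing them is left to the caller in this sketch.
    fun clear() = connections.clear()
}
```

Using `computeIfAbsent` keeps the lookup and the creation in one atomic step, so two concurrent requests for the same UUID do not open two connections. It is a Java 8 method, so its availability on older Android API levels is something to check.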


The interfaces look roughly like this:


    interface ConnectivityService {
        fun sub(service: UUID): Observable<DataService>
    }

    interface DataService {
        fun sub(): Flowable<Event>
        fun write(data: ByteArray): Boolean
        fun dispose()
    }

    interface DataRepository {
        fun sub(serviceUUID: UUID): Flowable<Event>
        fun write(serviceUUID: UUID, data: ByteArray): Flowable<Boolean>
        fun dispose()
    }

And, accordingly, the implementations are under the cut:


ConnectivityServiceImpl
    class ConnectivityServiceImpl(
        private val bluetooth: RxBluetooth,
        private val events: EventMatchersAdapter,
        private val timeoutSeconds: Long = 15L
    ) : ConnectivityService {

        override fun sub(service: UUID): Observable<DataService> =
            when (bluetooth.isBluetoothEnabled && bluetooth.isBluetoothAvailable) {
                false -> Observable.empty()
                else -> {
                    ensureBluetoothNotDiscovering()
                    bluetooth.startDiscovery()
                    bluetooth.observeDevices()
                        // BluetoothDevice.getUuids() may return null, hence the safe call.
                        .filter { device -> device.uuids?.contains(ParcelUuid(service)) == true }
                        .timeout(timeoutSeconds, TimeUnit.SECONDS)
                        .take(1)
                        // Stop discovery whichever way the search ends.
                        .doOnNext { _ -> ensureBluetoothNotDiscovering() }
                        .doOnError { _ -> ensureBluetoothNotDiscovering() }
                        .doOnComplete { ensureBluetoothNotDiscovering() }
                        .flatMap { device -> bluetooth.observeConnectDevice(device, service) }
                        .map { connection -> DataServiceImpl(BluetoothConnection(connection), events) }
                }
            }

        private fun ensureBluetoothNotDiscovering() {
            if (bluetooth.isDiscovering) {
                bluetooth.cancelDiscovery()
            }
        }
    }

DataServiceImpl
    class DataServiceImpl constructor(
        private val connection: BluetoothConnection,
        private val adapter: EventMatchersAdapter
    ) : DataService {

        override fun sub(): Flowable<Event> = Flowable.create<Event>({ emitter ->
            val underlying = EventsBridge(adapter = adapter, emitter = emitter)
            // Dispose the bridge when the downstream subscriber goes away.
            emitter.setDisposable(object : MainThreadDisposable() {
                override fun onDispose() {
                    if (!underlying.isDisposed) {
                        underlying.dispose()
                    }
                }
            })
            connection.observeByteStream().subscribe(underlying)
        }, BackpressureStrategy.BUFFER)

        override fun write(data: ByteArray): Boolean = connection.send(data)

        override fun dispose() = connection.closeConnection()
    }

DataRepositoryImpl
    class DataRepositoryImpl(private val connectivity: ConnectivityService) : DataRepository {
        private val services = ConcurrentHashMap<UUID, DataService>()

        override fun sub(serviceUUID: UUID): Flowable<Event> =
            serviceOf(serviceUUID)
                .flatMap { service -> service.sub() }

        override fun write(serviceUUID: UUID, data: ByteArray): Flowable<Boolean> =
            serviceOf(serviceUUID)
                .map { service -> service.write(data) }

        override fun dispose() {
            for ((_, service) in services) {
                service.dispose()
            }
        }

        // Reuse an already opened service for this UUID, or connect and remember it.
        private fun serviceOf(serviceUUID: UUID): Flowable<DataService> =
            with(services[serviceUUID]) {
                when (this) {
                    null -> connectivity.sub(serviceUUID)
                        .doOnNext { service -> services.put(serviceUUID, service) }
                        .toFlowable(BackpressureStrategy.BUFFER)
                    else -> Flowable.just(this)
                }
            }
    }

And so, in a minimal number of lines, we can do what usually gets stretched into eerie call chains or callback hell, like this:


    repository.sub(UUID.randomUUID())
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe { event ->
            when (event) {
                is Event.A -> doSomeStuffA(event)
                is Event.B -> doSomeStuffB(event)
                is Event.C -> doSomeStuffC(event)
                is Event.D -> doSomeStuffD(event)
            }
        }

Eleven lines to listen to four events from an arbitrary device. Not bad, is it? :)


Instead of a conclusion


If any reader would like to look at the source code, it is here.


If anyone wants to see how other rules for assembling packets from raw bytes would fit in, write to me and I will try to add them.


UPD: this has been packaged as a micro-framework with optional bridges to ReactiveX and coroutines, as well as a pure Kotlin implementation.



Source: https://habr.com/ru/post/348190/

