mirror of
https://github.com/KDE/kdeconnect-android
synced 2025-08-29 05:07:40 +00:00
Revert "refactor: migrate DevicePacketQueue
to Kotlin Coroutine"
This reverts merge request !431
This commit is contained in:
parent
3e96d5ecd3
commit
ef760a3628
@ -480,8 +480,7 @@ public class Device implements BaseLink.PacketReceiver {
|
|||||||
if (packetQueue == null) {
|
if (packetQueue == null) {
|
||||||
callback.onFailure(new Exception("Device disconnected!"));
|
callback.onFailure(new Exception("Device disconnected!"));
|
||||||
} else {
|
} else {
|
||||||
// TODO: Migrate to coroutine version (addPacket)
|
packetQueue.addPacket(np, replaceID, callback);
|
||||||
packetQueue.addPacketSync(np, replaceID, callback);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -495,8 +494,7 @@ public class Device implements BaseLink.PacketReceiver {
|
|||||||
if (packetQueue == null) {
|
if (packetQueue == null) {
|
||||||
return null;
|
return null;
|
||||||
} else {
|
} else {
|
||||||
// TODO: Migrate to coroutine version (getAndRemoveUnsentPacket)
|
return packetQueue.getAndRemoveUnsentPacket(replaceID);
|
||||||
return packetQueue.getAndRemoveUnsentPacketSync(replaceID);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
142
src/org/kde/kdeconnect/DevicePacketQueue.java
Normal file
142
src/org/kde/kdeconnect/DevicePacketQueue.java
Normal file
@ -0,0 +1,142 @@
|
|||||||
|
/*
|
||||||
|
* SPDX-FileCopyrightText: 2019 Matthijs Tijink <matthijstijink@gmail.com>
|
||||||
|
*
|
||||||
|
* SPDX-License-Identifier: GPL-2.0-only OR GPL-3.0-only OR LicenseRef-KDE-Accepted-GPL
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.kde.kdeconnect;
|
||||||
|
|
||||||
|
import android.util.Log;
|
||||||
|
|
||||||
|
import org.kde.kdeconnect.Helpers.ThreadHelper;
|
||||||
|
|
||||||
|
import java.util.ArrayDeque;
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Keeps a queue of packets to send to a device, to prevent either blocking or using lots of threads
|
||||||
|
*/
|
||||||
|
class DevicePacketQueue {
|
||||||
|
/**
|
||||||
|
* Holds the packet and related stuff to keep in the queue
|
||||||
|
*/
|
||||||
|
private static final class Item {
|
||||||
|
NetworkPacket packet;
|
||||||
|
/**
|
||||||
|
* Replacement ID: if positive, it can be replaced by later packets with the same ID
|
||||||
|
*/
|
||||||
|
final int replaceID;
|
||||||
|
Device.SendPacketStatusCallback callback;
|
||||||
|
|
||||||
|
Item(NetworkPacket packet, int replaceID, Device.SendPacketStatusCallback callback) {
|
||||||
|
this.packet = packet;
|
||||||
|
this.callback = callback;
|
||||||
|
this.replaceID = replaceID;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private final ArrayDeque<Item> items = new ArrayDeque<>();
|
||||||
|
private final Device mDevice;
|
||||||
|
private final Object lock = new Object();
|
||||||
|
private boolean exit = false;
|
||||||
|
|
||||||
|
DevicePacketQueue(Device device) {
|
||||||
|
this(device, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
DevicePacketQueue(Device device, Boolean startThread) {
|
||||||
|
mDevice = device;
|
||||||
|
if (startThread) {
|
||||||
|
ThreadHelper.execute(new SendingRunnable());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Send a packet (at some point in the future)
|
||||||
|
* @param packet The packet
|
||||||
|
* @param replaceID If positive, it will replace all older packets still in the queue
|
||||||
|
* @param callback The callback after sending the packet
|
||||||
|
*/
|
||||||
|
void addPacket(NetworkPacket packet, int replaceID, Device.SendPacketStatusCallback callback) {
|
||||||
|
synchronized (lock) {
|
||||||
|
if (exit) {
|
||||||
|
callback.onFailure(new Exception("Device disconnected!"));
|
||||||
|
} else {
|
||||||
|
boolean replaced = false;
|
||||||
|
|
||||||
|
if (replaceID >= 0) {
|
||||||
|
for (Item item : items) {
|
||||||
|
if (item.replaceID == replaceID) {
|
||||||
|
item.packet = packet;
|
||||||
|
item.callback = callback;
|
||||||
|
replaced = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!replaced) {
|
||||||
|
items.addLast(new Item(packet, replaceID, callback));
|
||||||
|
lock.notify();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if we still have an unsent packet in the queue with the given ID.
|
||||||
|
* If so, remove it from the queue and return it
|
||||||
|
* @param replaceID The replace ID (must be positive)
|
||||||
|
* @return The found packet, or null
|
||||||
|
*/
|
||||||
|
NetworkPacket getAndRemoveUnsentPacket(int replaceID) {
|
||||||
|
synchronized (lock) {
|
||||||
|
final Optional<Item> itemOptional = items.stream()
|
||||||
|
.filter(item -> item.replaceID == replaceID).findFirst();
|
||||||
|
if (itemOptional.isPresent()) {
|
||||||
|
final Item item = itemOptional.get();
|
||||||
|
items.remove(item);
|
||||||
|
return item.packet;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
void disconnected() {
|
||||||
|
synchronized (lock) {
|
||||||
|
exit = true;
|
||||||
|
lock.notifyAll();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private final class SendingRunnable implements Runnable {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
while (true) {
|
||||||
|
Item item;
|
||||||
|
synchronized (lock) {
|
||||||
|
while (items.isEmpty() && !exit) {
|
||||||
|
try {
|
||||||
|
lock.wait();
|
||||||
|
} catch (InterruptedException ignored) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (exit) {
|
||||||
|
Log.i("DevicePacketQueue", "Terminating sending loop");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
item = items.removeFirst();
|
||||||
|
}
|
||||||
|
|
||||||
|
mDevice.sendPacketBlocking(item.packet, item.callback);
|
||||||
|
}
|
||||||
|
|
||||||
|
while (!items.isEmpty()) {
|
||||||
|
Item item = items.removeFirst();
|
||||||
|
item.callback.onFailure(new Exception("Device disconnected!"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -1,131 +0,0 @@
|
|||||||
/*
|
|
||||||
* SPDX-FileCopyrightText: 2019 Matthijs Tijink <matthijstijink@gmail.com>
|
|
||||||
*
|
|
||||||
* SPDX-License-Identifier: GPL-2.0-only OR GPL-3.0-only OR LicenseRef-KDE-Accepted-GPL
|
|
||||||
*/
|
|
||||||
package org.kde.kdeconnect
|
|
||||||
|
|
||||||
import kotlinx.coroutines.*
|
|
||||||
import kotlinx.coroutines.sync.Mutex
|
|
||||||
import kotlinx.coroutines.sync.withLock
|
|
||||||
import kotlin.collections.ArrayDeque
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Keeps a queue of packets to send to a device, to prevent either blocking or using lots of threads
|
|
||||||
*/
|
|
||||||
class DevicePacketQueue @JvmOverloads constructor(private val device: Device, startRunning: Boolean = true) {
|
|
||||||
/**
|
|
||||||
* Holds the packet and related stuff to keep in the queue
|
|
||||||
*/
|
|
||||||
private class Item(
|
|
||||||
var packet: NetworkPacket,
|
|
||||||
/**
|
|
||||||
* Replacement ID: if positive, it can be replaced by later packets with the same ID
|
|
||||||
*/
|
|
||||||
val replaceID: Int,
|
|
||||||
var callback: Device.SendPacketStatusCallback
|
|
||||||
)
|
|
||||||
|
|
||||||
private val scope = CoroutineScope(Dispatchers.IO)
|
|
||||||
private val sendingJob = SupervisorJob()
|
|
||||||
private val loopJob = Job()
|
|
||||||
|
|
||||||
private val items = ArrayDeque<Item>()
|
|
||||||
private val mutex = Mutex()
|
|
||||||
|
|
||||||
init {
|
|
||||||
if (startRunning) {
|
|
||||||
scope.launch(loopJob) {
|
|
||||||
sending()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fun addPacketSync(packet: NetworkPacket, replaceID: Int, callback: Device.SendPacketStatusCallback) {
|
|
||||||
runBlocking {
|
|
||||||
addPacket(packet, replaceID, callback)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Send a packet (at some point in the future)
|
|
||||||
* @param packet The packet
|
|
||||||
* @param replaceID If positive, it will replace all older packets still in the queue
|
|
||||||
* @param callback The callback after sending the packet
|
|
||||||
*/
|
|
||||||
suspend fun addPacket(packet: NetworkPacket, replaceID: Int, callback: Device.SendPacketStatusCallback) {
|
|
||||||
if (sendingJob.isCancelled) {
|
|
||||||
callback.onFailure(Exception("Device disconnected!"))
|
|
||||||
} else {
|
|
||||||
mutex.withLock {
|
|
||||||
var replaced = false
|
|
||||||
if (replaceID >= 0) {
|
|
||||||
items.forEach {
|
|
||||||
if (it.replaceID == replaceID) {
|
|
||||||
it.packet = packet
|
|
||||||
it.callback = callback
|
|
||||||
replaced = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!replaced) {
|
|
||||||
items.addLast(Item(packet, replaceID, callback))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fun getAndRemoveUnsentPacketSync(replaceID: Int): NetworkPacket? {
|
|
||||||
return runBlocking {
|
|
||||||
getAndRemoveUnsentPacket(replaceID)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
/**
|
|
||||||
* Check if we still have an unsent packet in the queue with the given ID.
|
|
||||||
* If so, remove it from the queue and return it
|
|
||||||
* @param replaceID The replace ID (must be positive)
|
|
||||||
* @return The found packet, or null
|
|
||||||
*/
|
|
||||||
suspend fun getAndRemoveUnsentPacket(replaceID: Int): NetworkPacket? {
|
|
||||||
mutex.withLock {
|
|
||||||
val itemOptional = items.stream()
|
|
||||||
.filter { item: Item -> item.replaceID == replaceID }.findFirst()
|
|
||||||
if (itemOptional.isPresent) {
|
|
||||||
val item = itemOptional.get()
|
|
||||||
items.remove(item)
|
|
||||||
return item.packet
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return null
|
|
||||||
}
|
|
||||||
|
|
||||||
fun disconnected() {
|
|
||||||
sendingJob.cancel()
|
|
||||||
}
|
|
||||||
|
|
||||||
private suspend fun sending() {
|
|
||||||
while (true) {
|
|
||||||
val item = mutex.withLock {
|
|
||||||
if (items.isEmpty()) {
|
|
||||||
null
|
|
||||||
} else {
|
|
||||||
items.removeFirst()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (item == null) {
|
|
||||||
yield()
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if (sendingJob.isCancelled) {
|
|
||||||
item.callback.onFailure(Exception("Device disconnected!"))
|
|
||||||
} else {
|
|
||||||
scope.launch(sendingJob) {
|
|
||||||
device.sendPacketBlocking(item.packet, item.callback)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -13,11 +13,11 @@ public class DevicePacketQueueTest {
|
|||||||
|
|
||||||
DevicePacketQueue queue = new DevicePacketQueue(device, false);
|
DevicePacketQueue queue = new DevicePacketQueue(device, false);
|
||||||
|
|
||||||
queue.addPacketSync(new NetworkPacket("Test"), 0, callback);
|
queue.addPacket(new NetworkPacket("Test"), 0, callback);
|
||||||
queue.addPacketSync(new NetworkPacket("Test1"), 1, callback);
|
queue.addPacket(new NetworkPacket("Test1"), 1, callback);
|
||||||
|
|
||||||
assertNotNull(queue.getAndRemoveUnsentPacketSync(0));
|
assertNotNull(queue.getAndRemoveUnsentPacket(0));
|
||||||
assertNotNull(queue.getAndRemoveUnsentPacketSync(1));
|
assertNotNull(queue.getAndRemoveUnsentPacket(1));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -27,11 +27,11 @@ public class DevicePacketQueueTest {
|
|||||||
|
|
||||||
DevicePacketQueue queue = new DevicePacketQueue(device, false);
|
DevicePacketQueue queue = new DevicePacketQueue(device, false);
|
||||||
|
|
||||||
queue.addPacketSync(new NetworkPacket("Test"), -1, callback);
|
queue.addPacket(new NetworkPacket("Test"), -1, callback);
|
||||||
queue.addPacketSync(new NetworkPacket("Test1"), -1, callback);
|
queue.addPacket(new NetworkPacket("Test1"), -1, callback);
|
||||||
|
|
||||||
assertNotNull(queue.getAndRemoveUnsentPacketSync(-1));
|
assertNotNull(queue.getAndRemoveUnsentPacket(-1));
|
||||||
assertNotNull(queue.getAndRemoveUnsentPacketSync(-1));
|
assertNotNull(queue.getAndRemoveUnsentPacket(-1));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -41,10 +41,10 @@ public class DevicePacketQueueTest {
|
|||||||
|
|
||||||
DevicePacketQueue queue = new DevicePacketQueue(device, false);
|
DevicePacketQueue queue = new DevicePacketQueue(device, false);
|
||||||
|
|
||||||
queue.addPacketSync(new NetworkPacket("Test"), 1, callback);
|
queue.addPacket(new NetworkPacket("Test"), 1, callback);
|
||||||
queue.addPacketSync(new NetworkPacket("Test1"), 1, callback);
|
queue.addPacket(new NetworkPacket("Test1"), 1, callback);
|
||||||
|
|
||||||
assertNotNull(queue.getAndRemoveUnsentPacketSync(1));
|
assertNotNull(queue.getAndRemoveUnsentPacket(1));
|
||||||
assertNull(queue.getAndRemoveUnsentPacketSync(1));
|
assertNull(queue.getAndRemoveUnsentPacket(1));
|
||||||
}
|
}
|
||||||
}
|
}
|
Loading…
x
Reference in New Issue
Block a user