Commit 66aa6696 authored by Matthijs Tijink's avatar Matthijs Tijink

Use a queue of unsent packets instead of threads

This also allows "unsending" packets (if they're still in the queue).
This patch does that for mouse move packets, so they get accumulated
together (sending less stuff over a congested link).
parent 6da33902
......@@ -77,6 +77,7 @@ public class Device implements BaseLink.PacketReceiver {
private final Map<String, BasePairingHandler> pairingHandlers = new HashMap<>();
private final CopyOnWriteArrayList<BaseLink> links = new CopyOnWriteArrayList<>();
private DevicePacketQueue packetQueue;
private List<String> supportedPlugins = new ArrayList<>();
private final ConcurrentHashMap<String, Plugin> plugins = new ConcurrentHashMap<>();
......@@ -423,6 +424,9 @@ public class Device implements BaseLink.PacketReceiver {
}
public void addLink(NetworkPacket identityPacket, BaseLink link) {
if (links.isEmpty()) {
packetQueue = new DevicePacketQueue(this);
}
//FilesHelper.LogOpenFileCount();
links.add(link);
link.addPacketReceiver(this);
......@@ -528,6 +532,8 @@ public class Device implements BaseLink.PacketReceiver {
Log.i("KDE/Device", "removeLink: " + link.getLinkProvider().getName() + " -> " + getName() + " active links: " + links.size());
if (links.isEmpty()) {
reloadPluginsFromSettings();
packetQueue.disconnected();
packetQueue = null;
}
}
......@@ -616,16 +622,47 @@ public class Device implements BaseLink.PacketReceiver {
};
public void sendPacket(NetworkPacket np) {
sendPacket(np, defaultCallback);
sendPacket(np, -1, defaultCallback);
}
public void sendPacket(NetworkPacket np, int replaceID) {
sendPacket(np, replaceID, defaultCallback);
}
public boolean sendPacketBlocking(NetworkPacket np) {
return sendPacketBlocking(np, defaultCallback);
}
//Async
public void sendPacket(final NetworkPacket np, final SendPacketStatusCallback callback) {
new Thread(() -> sendPacketBlocking(np, callback)).start();
sendPacket(np, -1, callback);
}
/**
* Send a packet to the device asynchronously
* @param np The packet
* @param replaceID If positive, replaces all unsent packages with the same replaceID
* @param callback A callback for success/failure
*/
public void sendPacket(final NetworkPacket np, int replaceID, final SendPacketStatusCallback callback) {
if (packetQueue == null) {
callback.onFailure(new Exception("Device disconnected!"));
} else {
packetQueue.addPacket(np, replaceID, callback);
}
}
/**
* Check if we still have an unsent packet in the queue with the given ID.
* If so, remove it from the queue and return it
* @param replaceID The replace ID (must be positive)
* @return The found packet, or null
*/
public NetworkPacket getAndRemoveUnsentPacket(int replaceID) {
if (packetQueue == null) {
return null;
} else {
return packetQueue.getAndRemoveUnsentPacket(replaceID);
}
}
public boolean sendPacketBlocking(final NetworkPacket np, final SendPacketStatusCallback callback) {
......
package org.kde.kdeconnect;
import java.util.ArrayDeque;
import java.util.Iterator;
/**
* Keeps a queue of packets to send to a device, to prevent either blocking or using lots of threads
*/
class DevicePacketQueue {
/**
* Holds the packet and related stuff to keep in the queue
*/
private static final class Item {
NetworkPacket packet;
/**
* Replacement ID: if positive, it can be replaced by later packets with the same ID
*/
int replaceID;
Device.SendPacketStatusCallback callback;
Item(NetworkPacket packet, int replaceID, Device.SendPacketStatusCallback callback) {
this.packet = packet;
this.callback = callback;
this.replaceID = replaceID;
}
}
private final ArrayDeque<Item> items = new ArrayDeque<>();
private Device mDevice;
private final Object lock = new Object();
private boolean exit = false;
DevicePacketQueue(Device device) {
mDevice = device;
new Thread(new SendingThread()).start();
}
/**
* Send a packet (at some point in the future)
* @param packet The packet
* @param replaceID If positive, it will replace all older packets still in the queue
* @param callback The callback after sending the packet
*/
void addPacket(NetworkPacket packet, int replaceID, Device.SendPacketStatusCallback callback) {
synchronized (lock) {
if (exit) {
callback.onFailure(new Exception("Device disconnected!"));
} else {
if (replaceID >= 0) {
Iterator<Item> iter = items.iterator();
while (iter.hasNext()) {
Item item = iter.next();
if (item.replaceID == replaceID) {
//Replace contents with new contents
item.packet = packet;
item.callback = callback;
iter.remove();
//There can only be one in the queue, so we're done now
return;
}
}
}
items.addLast(new Item(packet, replaceID, callback));
lock.notify();
}
}
}
/**
* Check if we still have an unsent packet in the queue with the given ID.
* If so, remove it from the queue and return it
* @param replaceID The replace ID (must be positive)
* @return The found packet, or null
*/
NetworkPacket getAndRemoveUnsentPacket(int replaceID) {
synchronized (lock) {
Iterator<Item> iter = items.iterator();
while (iter.hasNext()) {
Item item = iter.next();
if (item.replaceID == replaceID) {
iter.remove();
return item.packet;
}
}
}
return null;
}
void disconnected() {
synchronized (lock) {
exit = true;
lock.notifyAll();
}
}
private final class SendingThread implements Runnable {
@Override
public void run() {
while (true) {
Item item;
synchronized (lock) {
while (items.isEmpty() && !exit) {
try {
lock.wait();
} catch (InterruptedException ignored) {
}
}
if (exit) break;
item = items.removeFirst();
}
mDevice.sendPacketBlocking(item.packet, item.callback);
}
while (!items.isEmpty()) {
Item item = items.removeFirst();
item.callback.onFailure(new Exception("Device disconnected!"));
}
}
}
}
......@@ -45,6 +45,8 @@ public class NetworkPacket {
public final static String PACKET_TYPE_IDENTITY = "kdeconnect.identity";
public final static String PACKET_TYPE_PAIR = "kdeconnect.pair";
public final static int PACKET_REPLACEID_MOUSEMOVE = 0;
public static Set<String> protocolPacketTypes = new HashSet<String>() {{
add(PACKET_TYPE_IDENTITY);
add(PACKET_TYPE_PAIR);
......
......@@ -24,6 +24,7 @@ import android.app.Activity;
import android.content.Intent;
import android.graphics.drawable.Drawable;
import org.kde.kdeconnect.Device;
import org.kde.kdeconnect.NetworkPacket;
import org.kde.kdeconnect.Plugins.Plugin;
import org.kde.kdeconnect.Plugins.PluginFactory;
......@@ -96,12 +97,18 @@ public class MousePadPlugin extends Plugin {
}
public void sendMouseDelta(float dx, float dy) {
NetworkPacket np = new NetworkPacket(PACKET_TYPE_MOUSEPAD_REQUEST);
NetworkPacket np = device.getAndRemoveUnsentPacket(NetworkPacket.PACKET_REPLACEID_MOUSEMOVE);
if (np == null) {
np = new NetworkPacket(PACKET_TYPE_MOUSEPAD_REQUEST);
} else {
dx += np.getInt("dx");
dy += np.getInt("dx");
}
np.set("dx", dx);
np.set("dy", dy);
device.sendPacket(np);
device.sendPacket(np, NetworkPacket.PACKET_REPLACEID_MOUSEMOVE);
}
public void sendSingleClick() {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment