Hey everyone, I've been experimenting with CompletableFuture (available since Java 8) and wrote a service that plugs into SpiderMonkey and completes a CompletableFuture when the response to a message arrives. This should help with writing functional reactive code, and it is theoretically thread-safe.
Two classes: CompletableFutureService and AbstractSignedMessage
AbstractSignedMessage.java
import com.jme3.network.Message;
import com.jme3.network.serializing.Serializable;
@Serializable
public abstract class AbstractSignedMessage implements Message{
private transient boolean reliable = true;
/**
* The ID of our message
*/
private String ID;
protected AbstractSignedMessage()
{
}
protected AbstractSignedMessage( boolean reliable)
{
this.reliable = reliable;
}
/**
* Sets this message to 'reliable' or not and returns this
* message.
*/
public Message setReliable(boolean f)
{
this.reliable = f;
return this;
}
/**
* Indicates which way an outgoing message should be sent
* or which way an incoming message was sent.
*/
public boolean isReliable()
{
return reliable;
}
/**
* Getting and setting the ID
*/
public String getID(){
return ID;
}
public Message setID(String id){
this.ID=id;
return this;
}
}
CompletableFutureService.java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import com.jme3.network.HostedConnection;
import com.jme3.network.Message;
import com.jme3.network.MessageListener;
import com.jme3.network.service.AbstractHostedService;
import com.jme3.network.service.HostedServiceManager;
public class CompletableFutureService extends AbstractHostedService implements MessageListener<HostedConnection>{
/**
* Stores CompletableFutures for completion based upon their ID's
*/
private ConcurrentHashMap<String, CompletableFuture<Boolean>> futures;
public CompletableFutureService(){
futures=new ConcurrentHashMap<String, CompletableFuture<Boolean>>();
}
/**
* Adds itself as a message listener on the server
*/
@Override
protected void onInitialize(HostedServiceManager serviceManager) {
getServer().addMessageListener(this);
}
/**
* When a message is received that has an ID signature, complete
* and remove the future assigned with it
*/
@Override
public void messageReceived(HostedConnection source, Message m) {
if(m instanceof AbstractSignedMessage){
if(futures.containsKey(((AbstractSignedMessage) m).getID())){
futures.get(((AbstractSignedMessage) m).getID()).complete(true);
futures.remove(((AbstractSignedMessage) m).getID());
}
}
}
/**
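* Sends the signed message and completes the future when the signature is returned.
* Example (MySignedMessage stands for any concrete subclass of AbstractSignedMessage):
* sendMessage(new MySignedMessage(), hc).thenRun(() -> System.out.println("Got the return"));
*
* @param k message you want to send
* @param hc client you want to send it to
* @return a future that completes with true once a message with the same ID comes back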
*/
public CompletableFuture<Boolean> sendMessage(AbstractSignedMessage k, HostedConnection hc){
CompletableFuture<Boolean> j = new CompletableFuture<Boolean>();
k.setID(generateNewID());
futures.put(k.getID(), j);
hc.send(k);
return j;
}
/**
* Generates a new ID
*/
public String generateNewID(){
return UUID.randomUUID().toString();
}
}
All you have to do is attach it to the server with server.getServices().addService(new CompletableFutureService()); and then call sendMessage(AbstractSignedMessage k, HostedConnection hc) for any message you want a CompletableFuture response from.
If you run into any problems, please post your solutions <3
I just have a suggestion regarding your generateNewID method.
UUID.randomUUID() uses SecureRandom, which might be a bit slow and might hang for a while if it doesn’t get enough entropy, and the resulting ID is quite long. I’m not sure whether your implementation really needs universally unique identifiers (you would have to generate about 1 billion UUIDs per second for roughly 85 years before a collision becomes likely), or whether you could switch to something more lightweight, such as an AtomicInteger used as an incrementing counter plus a prefix for each connection, or something similar?
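To make the counter idea concrete, here is a minimal sketch. The class name PrefixedIdGenerator and the choice of prefix (for example something derived from the connection) are assumptions made for illustration; nothing like this ships with SpiderMonkey.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper: builds short message IDs from a prefix plus a counter. */
final class PrefixedIdGenerator {
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger();

    PrefixedIdGenerator(String prefix) {
        this.prefix = prefix;
    }

    /** Returns the next ID, e.g. "c7-1", then "c7-2". Safe to call from several threads. */
    String next() {
        // incrementAndGet() is atomic; on overflow it wraps around to Integer.MIN_VALUE
        return prefix + "-" + counter.incrementAndGet();
    }
}
```

With one generator per connection, IDs only need to be unique within that connection, so a plain int counter should be plenty as long as old futures are removed before the counter wraps.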
That’s a fair point, thanks for the suggestion! See the changes:
CompletableFutureService.java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import com.jme3.network.HostedConnection;
import com.jme3.network.Message;
import com.jme3.network.MessageListener;
import com.jme3.network.Server;
import com.jme3.network.service.AbstractHostedService;
import com.jme3.network.service.HostedServiceManager;
public class CompletableFutureService extends AbstractHostedService implements MessageListener<HostedConnection>{
/**
* Stores CompletableFutures for completion based upon their ID's
*/
private ConcurrentHashMap<String, CompletableFuture<AbstractSignedMessage>> futures;
/**
* Stores our ID counts
*/
private AtomicInteger count;
public CompletableFutureService(){
futures=new ConcurrentHashMap<String, CompletableFuture<AbstractSignedMessage>>();
count = new AtomicInteger(Integer.MIN_VALUE);
}
/**
* Adds itself as a message listener on the server
*/
@Override
protected void onInitialize(HostedServiceManager serviceManager) {
getServer().addMessageListener(this);
}
/**
* When a message is received that has an ID signature, complete
* and remove the future assigned with it
*/
@Override
public void messageReceived(HostedConnection source, Message m) {
if(m instanceof AbstractSignedMessage){
if(((AbstractSignedMessage) m).getID()!=null){
if(futures.containsKey(((AbstractSignedMessage) m).getID())){
futures.get(((AbstractSignedMessage) m).getID()).complete((AbstractSignedMessage)m);
futures.remove(((AbstractSignedMessage) m).getID());
}
}
}
}
/**
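* Sends the signed message and completes the future when the signature is returned.
* Example (MySignedMessage stands for any concrete subclass of AbstractSignedMessage):
* sendMessage(new MySignedMessage(), hc).thenRun(() -> System.out.println("Got the return"));
*
* @param k message you want to send
* @param hc client you want to send it to
* @return a future that completes with the returned message once a message with the same ID comes back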
*/
public CompletableFuture<AbstractSignedMessage> sendMessage(AbstractSignedMessage k, HostedConnection hc){
CompletableFuture<AbstractSignedMessage> j = new CompletableFuture<AbstractSignedMessage>();
k.setID(generateNewID());
futures.put(k.getID(), j);
hc.send(k);
return j;
}
/**
* Generates a new ID
*/
public String generateNewID(){
// incrementAndGet() is atomic and wraps around to Integer.MIN_VALUE
// after Integer.MAX_VALUE, so no manual reset or count.set() is needed
return Integer.toString(count.incrementAndGet());
}
}
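Note that the following change is more efficient and safer. With a ConcurrentHashMap, you are not guaranteed that the map is still in the same state when you call get after containsKey, so it is better to remove the entry in a single atomic call and complete whatever comes back.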
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import com.jme3.network.HostedConnection;
import com.jme3.network.Message;
import com.jme3.network.MessageListener;
import com.jme3.network.Server;
import com.jme3.network.service.AbstractHostedService;
import com.jme3.network.service.HostedServiceManager;
public class CompletableFutureService extends AbstractHostedService implements MessageListener<HostedConnection>{
/**
* Stores CompletableFutures for completion based upon their ID's
*/
private ConcurrentHashMap<String, CompletableFuture<AbstractSignedMessage>> futures;
private AtomicInteger count;
public CompletableFutureService(){
futures=new ConcurrentHashMap<String, CompletableFuture<AbstractSignedMessage>>();
count = new AtomicInteger(Integer.MIN_VALUE);
}
/**
* Adds itself as a message listener on the server
*/
@Override
protected void onInitialize(HostedServiceManager serviceManager) {
getServer().addMessageListener(this);
}
/**
* When a message is received that has an ID signature, complete
* and remove the future assigned with it
*/
@Override
public void messageReceived(HostedConnection source, Message m) {
if(m instanceof AbstractSignedMessage){
if(((AbstractSignedMessage) m).getID()!=null){
//if(futures.containsKey(((AbstractSignedMessage) m).getID())){
//futures.get(((AbstractSignedMessage) m).getID()).complete((AbstractSignedMessage)m);
//futures.remove(((AbstractSignedMessage) m).getID());
CompletableFuture<AbstractSignedMessage> removed = futures.remove(((AbstractSignedMessage) m).getID());
if(removed != null) {
removed.complete((AbstractSignedMessage)m);
}
//}
}
}
}
/**
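* Sends the signed message and completes the future when the signature is returned.
* Example (MySignedMessage stands for any concrete subclass of AbstractSignedMessage):
* sendMessage(new MySignedMessage(), hc).thenRun(() -> System.out.println("Got the return"));
*
* @param k message you want to send
* @param hc client you want to send it to
* @return a future that completes with the returned message once a message with the same ID comes back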
*/
public CompletableFuture<AbstractSignedMessage> sendMessage(AbstractSignedMessage k, HostedConnection hc){
CompletableFuture<AbstractSignedMessage> j = new CompletableFuture<AbstractSignedMessage>();
k.setID(generateNewID());
futures.put(k.getID(), j);
hc.send(k);
return j;
}
/**
* Generates a new ID
*/
public String generateNewID(){
// incrementAndGet() is atomic and wraps around to Integer.MIN_VALUE
// after Integer.MAX_VALUE, so no manual reset or count.set() is needed
return Integer.toString(count.incrementAndGet());
}
}
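If you want to try the remove-then-complete pattern without a running server, here is a small stand-alone sketch. PendingReplies is a hypothetical stand-in for the bookkeeping inside the service, and it uses String replies instead of AbstractSignedMessage so that it only needs the JDK.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical stand-in for the service's map of pending futures. */
final class PendingReplies {
    private final ConcurrentMap<String, CompletableFuture<String>> futures =
            new ConcurrentHashMap<>();

    /** Registers a future under the given ID and returns it to the caller. */
    CompletableFuture<String> register(String id) {
        CompletableFuture<String> future = new CompletableFuture<>();
        futures.put(id, future);
        return future;
    }

    /** Returns true only if this call found a pending future and completed it. */
    boolean complete(String id, String reply) {
        // remove() is a single atomic step: at most one caller gets the future back
        CompletableFuture<String> removed = futures.remove(id);
        return removed != null && removed.complete(reply);
    }

    /** Number of futures still waiting for a reply. */
    int pending() {
        return futures.size();
    }
}
```

Because remove hands the future to exactly one caller, a duplicate or late reply with the same ID simply finds nothing in the map and is ignored.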
So, this allows you to use the SpiderMonkey networking service in a way that plays nicely with reactive programming, and it gives you a hook for running actions as soon as a message response arrives.
For example, if you want to do something right after you get a certain message from a client, you can do that with this service, and it would look something like this:
This single line covers the entire round trip: sending a message to a client, receiving the return message, and doing something with it (in this case printing out the ID of the region I created). It does all of this asynchronously, without blocking.
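As a rough, self-contained sketch of that chaining pattern: RegionCreated and fakeSend below are hypothetical stand-ins (for a concrete AbstractSignedMessage subclass and for sendMessage respectively), so the snippet runs without a server or client.

```java
import java.util.concurrent.CompletableFuture;

final class ReplyChainDemo {
    /** Hypothetical reply type standing in for an AbstractSignedMessage subclass. */
    static final class RegionCreated {
        final String regionId;

        RegionCreated(String regionId) {
            this.regionId = regionId;
        }
    }

    /** Stand-in for sendMessage(...): the future is completed from another thread. */
    static CompletableFuture<RegionCreated> fakeSend(String regionId) {
        return CompletableFuture.supplyAsync(() -> new RegionCreated(regionId));
    }

    /** Chains a transformation onto the reply and waits for it (waiting is for the demo only). */
    static String describe(String regionId) {
        return fakeSend(regionId)
                .thenApply(reply -> "Created region " + reply.regionId)
                .join();
    }

    public static void main(String[] args) {
        // The "single line": send, wait for the reply asynchronously, then act on it
        fakeSend("region-42")
                .thenAccept(reply -> System.out.println("Created region " + reply.regionId))
                .join(); // join() only keeps the demo alive until the callback has run
    }
}
```

With the real service you would call sendMessage on the CompletableFutureService instead of fakeSend and cast the completed AbstractSignedMessage to your own message type inside the callback.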