CompletableFutureService snippet

Hey everyone, I was experimenting with CompletableFutures (available since Java 8) and wrote a service that plugs into SpiderMonkey and completes a CompletableFuture when the response to a message arrives. This should help with writing code in a functional reactive style, and it is theoretically thread-safe.

Two classes: CompletableFutureService and AbstractSignedMessage

AbstractSignedMessage.java

import com.jme3.network.Message;
import com.jme3.network.serializing.Serializable;

@Serializable
public abstract class AbstractSignedMessage implements Message{
	 	private transient boolean reliable = true;
	 	
	 	/**
	 	 * The ID of our message
	 	 */
	 	private String ID;

	    protected AbstractSignedMessage()
	    {
	    }

	    protected AbstractSignedMessage( boolean reliable)
	    {
	        this.reliable = reliable; 
	    }
	    
	    /**
	     *  Sets this message to 'reliable' or not and returns this
	     *  message.
	     */
	    public Message setReliable(boolean f)
	    {
	        this.reliable = f;
	        return this;
	    }
	    
	    /**
	     *  Indicates which way an outgoing message should be sent
	     *  or which way an incoming message was sent.
	     */
	    public boolean isReliable()
	    {
	        return reliable;
	    }
	    
	    /**
	     * Getting and setting the ID
	     */
	    public String getID(){
	    	return ID;
	    }
	    
	    public Message setID(String id){
	    	this.ID=id;
	    	return this;
	    }
}

CompletableFutureService.java

import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import com.jme3.network.HostedConnection;
import com.jme3.network.Message;
import com.jme3.network.MessageListener;
import com.jme3.network.service.AbstractHostedService;
import com.jme3.network.service.HostedServiceManager;


public class CompletableFutureService extends AbstractHostedService implements MessageListener<HostedConnection>{

	/**
	 * Stores CompletableFutures for completion based upon their ID's
	 */
	private ConcurrentHashMap<String, CompletableFuture<Boolean>> futures;
	
	
	public CompletableFutureService(){
		futures=new ConcurrentHashMap<String, CompletableFuture<Boolean>>();
	}
	
	/**
	 * Adds itself as a message listener on the server
	 */
	@Override
	protected void onInitialize(HostedServiceManager serviceManager) {
		getServer().addMessageListener(this);
		
	}

	
	/**
	 * When a message is received that has an ID signature, complete
	 * and remove the future assigned with it
	 */
	@Override
	public void messageReceived(HostedConnection source, Message m) {
		if(m instanceof AbstractSignedMessage){
			if(futures.containsKey(((AbstractSignedMessage) m).getID())){
				futures.get(((AbstractSignedMessage) m).getID()).complete(true);
				futures.remove(((AbstractSignedMessage) m).getID());
			}
		}
		
	}
		
	/**
	 * Sends the signed message and completes the future when the signature is returned.
	 * Example (msg is an instance of any concrete AbstractSignedMessage subclass):
	 * sendMessage(msg, hc).thenRun(()->System.out.println("Got the return"));
	 * 
	 * @param k message you want to send
	 * @param hc  client you want to send it to
	 * @return a future that completes with true once a message carrying the same ID is received
	 */
	public CompletableFuture<Boolean> sendMessage(AbstractSignedMessage k, HostedConnection hc){
		CompletableFuture<Boolean> j = new CompletableFuture<Boolean>();
		k.setID(generateNewID());
		futures.put(k.getID(), j);
		hc.send(k);
		return j;
		
	}
	
	/**
	 * Generates a new ID
	 */
	public String generateNewID(){
		return UUID.randomUUID().toString();
	}

}

All you have to do is attach it to the server with server.getServices().addService(new CompletableFutureService()); and then call sendMessage(AbstractSignedMessage k, HostedConnection hc) for every message you want a CompletableFuture response to. The future completes once the client sends back a message carrying the same ID.

If you run into any problems, please post your solutions <3

tl;dr: if you use SpiderMonkey and you want a start in functional reactive programming, try these classes


I just have a suggestion regarding your generateNewID method.

UUID.randomUUID() uses SecureRandom, which can be a bit slow and may even hang for a while if it doesn't get enough entropy, and the resulting ID is quite long. I'm not sure whether your implementation needs a universally unique identifier (the kind where you could generate roughly 1 billion UUIDs per second for about 85 years before a collision becomes likely). If it doesn't, you could switch to something more lightweight, such as an AtomicInteger used as an incrementing counter plus a prefix for each connection, or something similar.

Otherwise, nice code :)
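
To make the counter idea concrete, here is a minimal sketch using only the JDK. CounterIds and its next method are made-up names for illustration, and connectionId is just a stand-in for whatever identifies the connection (presumably the int you get from HostedConnection.getId()). Treat it as one possible shape, not as the way the service has to do it.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not part of SpiderMonkey: builds short message IDs
// from a per-connection prefix plus a shared atomic counter.
final class CounterIds {
    private final AtomicInteger count = new AtomicInteger();

    // connectionId is an assumed stand-in for the connection's identifier.
    String next(int connectionId) {
        // incrementAndGet() is atomic; on overflow it wraps around to
        // Integer.MIN_VALUE instead of throwing, so no manual reset is needed.
        return connectionId + ":" + count.incrementAndGet();
    }
}

final class CounterIdsDemo {
    public static void main(String[] args) {
        CounterIds ids = new CounterIds();
        System.out.println(ids.next(7)); // prints 7:1
        System.out.println(ids.next(7)); // prints 7:2
    }
}
```

The IDs are only unique within one server process, which should be enough here because they are only used as keys in the service's own map.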


That’s a fair point, thanks for the suggestion! See the changes:

CompletableFutureService.java

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import com.jme3.network.HostedConnection;
import com.jme3.network.Message;
import com.jme3.network.MessageListener;
import com.jme3.network.Server;
import com.jme3.network.service.AbstractHostedService;
import com.jme3.network.service.HostedServiceManager;


public class CompletableFutureService extends AbstractHostedService implements MessageListener<HostedConnection>{

	/**
	 * Stores CompletableFutures for completion based upon their ID's
	 */
	private ConcurrentHashMap<String, CompletableFuture<AbstractSignedMessage>> futures;
	

	/**
	* Stores our ID counts
	*/
	private AtomicInteger count;
	
	
	public CompletableFutureService(){
		futures=new ConcurrentHashMap<String, CompletableFuture<AbstractSignedMessage>>();
		count = new AtomicInteger(Integer.MIN_VALUE);
	}
	
	/**
	 * Adds itself as a message listener on the server
	 */
	@Override
	protected void onInitialize(HostedServiceManager serviceManager) {
		getServer().addMessageListener(this);
		
	}

	
	/**
	 * When a message is received that has an ID signature, complete
	 * and remove the future assigned with it
	 */
	@Override
	public void messageReceived(HostedConnection source, Message m) {
		if(m instanceof AbstractSignedMessage){
			if(((AbstractSignedMessage) m).getID()!=null){
				if(futures.containsKey(((AbstractSignedMessage) m).getID())){
					futures.get(((AbstractSignedMessage) m).getID()).complete((AbstractSignedMessage)m);
					futures.remove(((AbstractSignedMessage) m).getID());
				}
			}
		}
		
	}
		
	
	/**
	 * Sends the signed message and completes the future when the signature is returned.
	 * Example (msg is an instance of any concrete AbstractSignedMessage subclass):
	 * sendMessage(msg, hc).thenRun(()->System.out.println("Got the return"));
	 * 
	 * @param k message you want to send
	 * @param hc  client you want to send it to
	 * @return a future that completes with the returned message carrying the same ID
	 */
	public CompletableFuture<AbstractSignedMessage> sendMessage(AbstractSignedMessage k, HostedConnection hc){
		CompletableFuture<AbstractSignedMessage> j = new CompletableFuture<AbstractSignedMessage>();
		k.setID(generateNewID());
		futures.put(k.getID(), j);
		hc.send(k);
		return j;
		
	}
	
	/**
	 * Generates a new ID. incrementAndGet() is atomic and wraps around to
	 * Integer.MIN_VALUE on overflow, so no manual reset is needed.
	 */
	public String generateNewID(){
		return Integer.toString(count.incrementAndGet());
	}

}

If you want to improve its performance, you may prefer:

	@Override
	public void messageReceived(HostedConnection source, Message m) {
		if(m instanceof AbstractSignedMessage){
			if(((AbstractSignedMessage) m).getID()!=null){
				CompletableFuture<AbstractSignedMessage> removed = futures.remove(((AbstractSignedMessage) m).getID());
				if(removed != null) {
					removed.complete((AbstractSignedMessage)m);
				}
			}
		}
		
	}

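(1 hash lookup vs 3)

Note that this change is both more efficient and safer. With a ConcurrentHashMap there is no guarantee that the map is still in the same state when you call .get after .containsKey, because another thread may have removed the entry in between. A single .remove looks up and removes the entry in one atomic step.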
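
A stand-alone sketch of that pattern, using only the JDK so it can be run without jME. PendingReplies, register and complete are hypothetical names, and String stands in for the message type. It illustrates the single-remove idea rather than the service itself.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry of futures waiting for a reply, keyed by message ID.
final class PendingReplies {
    private final ConcurrentHashMap<String, CompletableFuture<String>> futures =
            new ConcurrentHashMap<>();

    CompletableFuture<String> register(String id) {
        CompletableFuture<String> f = new CompletableFuture<>();
        futures.put(id, f);
        return f;
    }

    // remove() looks up and deletes the entry in one atomic step, so at most
    // one caller can ever obtain the future for a given ID.
    boolean complete(String id, String reply) {
        CompletableFuture<String> removed = futures.remove(id);
        return removed != null && removed.complete(reply);
    }
}

final class PendingRepliesDemo {
    public static void main(String[] args) {
        PendingReplies pending = new PendingReplies();
        CompletableFuture<String> f = pending.register("42");
        System.out.println(pending.complete("42", "pong")); // prints true
        System.out.println(pending.complete("42", "pong")); // prints false, entry already removed
        System.out.println(f.join());                       // prints pong
    }
}
```

A duplicate or unknown ID simply finds nothing to remove, so it is ignored without any extra check.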


What are some potential use cases of this feature?

Oh, good call, cheers! Updated class:

CompletableFutureService.java


import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import com.jme3.network.HostedConnection;
import com.jme3.network.Message;
import com.jme3.network.MessageListener;
import com.jme3.network.Server;
import com.jme3.network.service.AbstractHostedService;
import com.jme3.network.service.HostedServiceManager;



public class CompletableFutureService extends AbstractHostedService implements MessageListener<HostedConnection>{

	/**
	 * Stores CompletableFutures for completion based upon their ID's
	 */
	private ConcurrentHashMap<String, CompletableFuture<AbstractSignedMessage>> futures;
	
	private AtomicInteger count;
	
	
	public CompletableFutureService(){
		futures=new ConcurrentHashMap<String, CompletableFuture<AbstractSignedMessage>>();
		count = new AtomicInteger(Integer.MIN_VALUE);
	}
	
	/**
	 * Adds itself as a message listener on the server
	 */
	@Override
	protected void onInitialize(HostedServiceManager serviceManager) {
		getServer().addMessageListener(this);
		
	}

	
	/**
	 * When a message is received that has an ID signature, complete
	 * and remove the future assigned with it
	 */
	@Override
	public void messageReceived(HostedConnection source, Message m) {
		if(m instanceof AbstractSignedMessage){
			if(((AbstractSignedMessage) m).getID()!=null){
				CompletableFuture<AbstractSignedMessage> removed = futures.remove(((AbstractSignedMessage) m).getID());
				if(removed != null) {
					removed.complete((AbstractSignedMessage)m);
				}
			}
		}
		
	}
	
	/**
	 * Sends the signed message and completes the future when the signature is returned.
	 * Example (msg is an instance of any concrete AbstractSignedMessage subclass):
	 * sendMessage(msg, hc).thenRun(()->System.out.println("Got the return"));
	 * 
	 * @param k message you want to send
	 * @param hc  client you want to send it to
	 * @return a future that completes with the returned message carrying the same ID
	 */
	public CompletableFuture<AbstractSignedMessage> sendMessage(AbstractSignedMessage k, HostedConnection hc){
		CompletableFuture<AbstractSignedMessage> j = new CompletableFuture<AbstractSignedMessage>();
		k.setID(generateNewID());
		futures.put(k.getID(), j);
		hc.send(k);
		return j;
		
	}
	
	/**
	 * Generates a new ID. incrementAndGet() is atomic and wraps around to
	 * Integer.MIN_VALUE on overflow, so no manual reset is needed.
	 */
	public String generateNewID(){
		return Integer.toString(count.incrementAndGet());
	}

}

This lets you use the SpiderMonkey networking service in a way that plays nicely with reactive programming, and it gives you a hook for running an action as soon as the response to a message arrives.

For example, if you want to do something right after a client answers a particular message, you can do that with this service, and it would look something like this:

sendMessage(new CreateRegionMessage("Region20",5,new Point(4,4)),hc).thenAccept(x->System.out.println(((CreateRegionMessage)x).getID()));

This single line covers the whole round trip: sending a message to a client, receiving the return message, and doing something with it (in this case printing out the ID of the region I created), all asynchronously and without blocking.
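
If you want to see the non-blocking part in isolation, here is a small JDK-only sketch. The reply future stands in for what sendMessage returns, the single-thread executor stands in for the network thread that calls messageReceived, and upper is just a made-up transformation step. None of these names come from SpiderMonkey.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ReplyChainDemo {
    // Hypothetical step: derive a value from the reply once it arrives.
    static CompletableFuture<String> upper(CompletableFuture<String> reply) {
        return reply.thenApply(String::toUpperCase);
    }

    public static void main(String[] args) throws Exception {
        // Stands in for the future returned by sendMessage(...).
        CompletableFuture<String> reply = new CompletableFuture<>();

        // Registering the callbacks returns immediately; nothing blocks here.
        CompletableFuture<Void> done = upper(reply)
                .thenAccept(s -> System.out.println("Got reply: " + s));
        System.out.println("Reply pending: " + !reply.isDone());

        // Stands in for the network thread delivering the client's answer.
        ExecutorService network = Executors.newSingleThreadExecutor();
        network.execute(() -> reply.complete("region20"));

        // Only the demo waits, so the JVM does not exit before the callback runs.
        done.get(1, TimeUnit.SECONDS);
        network.shutdown();
    }
}
```

One thing to keep in mind: callbacks registered with thenAccept on a still-pending future run on whichever thread completes it, which with this service would be the thread that calls messageReceived. If the callback has to run elsewhere, thenAcceptAsync with your own Executor is one option.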
