Video streaming with VideoRecorderAppState

For my app I want to be able to stream a video recorded with VideoRecorderAppState. So that anyone could connect with VLC player to the running app and watch. Therefore I made some changes to the VideoRecorderAppState. I’m posting the code for my HttpStreamingServer and the modifications to the VideoRecorderAppState. Streaming already works but is a bit laggy yet. Maybe you can give me some advice how to make the server more efficient. Using LinkedBlockingQueue for the byte transfer is probably not the best idea. Thanks for reviewing and marry x-mas.



VideoRecorderAppState mods:

  • need to be able to initialize with a output stream additionally to a file.
  • need to get the mjpeg header with a max file size for streaming



    [patch]Index: src/desktop/com/jme3/app/state/MjpegFileWriter.java

    ===================================================================

    — src/desktop/com/jme3/app/state/MjpegFileWriter.java (revision 8955)

    +++ src/desktop/com/jme3/app/state/MjpegFileWriter.java (working copy)

    @@ -6,6 +6,8 @@

    import java.io.ByteArrayOutputStream;

    import java.io.File;

    import java.io.FileOutputStream;

    +import java.io.IOException;

    +import java.io.OutputStream;

    import java.io.RandomAccessFile;

    import java.nio.channels.FileChannel;

    import java.util.ArrayList;

    @@ -13,6 +15,8 @@

    import java.util.List;

    import javax.imageio.ImageIO;



    +import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream;

    +

    /**
  • Released under BSD License
  • @author monceaux, normenhansen

    @@ -23,37 +27,57 @@

    int height = 0;

    double framerate = 0;

    int numFrames = 0;
  • File aviFile = null;
  • FileOutputStream aviOutput = null;
  • FileChannel aviChannel = null;
  • OutputStream aviOutput = null;

    long riffOffset = 0;

    long aviMovieOffset = 0;
  • long position = 0;

    AVIIndexList indexlist = null;


  • public MjpegFileWriter(File aviFile, int width, int height, double framerate) throws Exception {
  •    this(aviFile, width, height, framerate, 0);<br />
    
  • public MjpegFileWriter(OutputStream aviOutput, int width, int height, double framerate) throws Exception {
  •    this(aviOutput, width, height, framerate, 0);<br />
    

}


  • public MjpegFileWriter(File aviFile, int width, int height, double framerate, int numFrames) throws Exception {
  •    this.aviFile = aviFile;<br />
    
  • public MjpegFileWriter(OutputStream aviOutput, int width, int height, double framerate, int numFrames) throws Exception {

    this.width = width;

    this.height = height;

    this.framerate = framerate;

    this.numFrames = numFrames;
  •    aviOutput = new FileOutputStream(aviFile);<br />
    
  •    aviChannel = aviOutput.getChannel();<br />
    
  •    this.aviOutput = aviOutput;<br />
    

RIFFHeader rh = new RIFFHeader();
- aviOutput.write(rh.toBytes());
- aviOutput.write(new AVIMainHeader().toBytes());
- aviOutput.write(new AVIStreamList().toBytes());
- aviOutput.write(new AVIStreamHeader().toBytes());
- aviOutput.write(new AVIStreamFormat().toBytes());
- aviOutput.write(new AVIJunk().toBytes());
- aviMovieOffset = aviChannel.position();
- aviOutput.write(new AVIMovieList().toBytes());
+ writeToAviOutput(rh.toBytes());
+ writeToAviOutput(new AVIMainHeader().toBytes());
+ writeToAviOutput(new AVIStreamList().toBytes());
+ writeToAviOutput(new AVIStreamHeader().toBytes());
+ writeToAviOutput(new AVIStreamFormat().toBytes());
+ writeToAviOutput(new AVIJunk().toBytes());
+ aviMovieOffset = position;
+ writeToAviOutput(new AVIMovieList().toBytes());
indexlist = new AVIIndexList();
}
+
+ public byte[] getMjpegStreamHeader() throws Exception {
+ ByteOutputStream bos = new ByteOutputStream();
+ RIFFHeader rh = new RIFFHeader(Integer.MAX_VALUE);
+ bos.write(rh.toBytes());
+ bos.write(new AVIMainHeader().toBytes());
+ bos.write(new AVIStreamList().toBytes());
+ bos.write(new AVIStreamHeader().toBytes());
+ bos.write(new AVIStreamFormat().toBytes());
+ bos.write(new AVIJunk().toBytes());
+ bos.write(new AVIMovieList().toBytes());
+ return bos.getBytes();
+ }
+
+ private void writeToAviOutput(int b) throws IOException {
+ position++;;
+ aviOutput.write(b);
+ }
+
+ private void writeToAviOutput(byte[] bytes) throws IOException {
+ position += bytes.length;
+ aviOutput.write(bytes);
+ }

public void addImage(Image image) throws Exception {
addImage(writeImageToBytes(image));
@@ -62,7 +86,6 @@
public void addImage(byte[] imagedata) throws Exception {
byte[] fcc = new byte[]{'0', '0', 'd', 'b'};
int useLength = imagedata.length;
- long position = aviChannel.position();
int extra = (useLength + (int) position) % 4;
if (extra > 0) {
useLength = useLength + extra;
@@ -70,20 +93,20 @@

indexlist.addAVIIndex((int) position, useLength);

- aviOutput.write(fcc);
- aviOutput.write(intBytes(swapInt(useLength)));
- aviOutput.write(imagedata);
+ writeToAviOutput(fcc);
+ writeToAviOutput(intBytes(swapInt(useLength)));
+ writeToAviOutput(imagedata);
if (extra > 0) {
for (int i = 0; i < extra; i++) {
- aviOutput.write(0);
+ writeToAviOutput(0);
}
}
imagedata = null;
}

- public void finishAVI() throws Exception {
+ public void finishAVI(File aviFile) throws Exception {
byte[] indexlistBytes = indexlist.toBytes();
- aviOutput.write(indexlistBytes);
+ writeToAviOutput(indexlistBytes);
aviOutput.close();
long size = aviFile.length();
RandomAccessFile raf = new RandomAccessFile(aviFile, "rw");
@@ -141,7 +164,12 @@
public byte[] fcc4 = new byte[]{'h', 'd', 'r', 'l'};

public RIFFHeader() {
+ this(0);
}
+
+ public RIFFHeader(int fileSize) {
+ this.fileSize = fileSize;
+ }

public byte[] toBytes() throws Exception {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Index: src/desktop/com/jme3/app/state/VideoRecorderAppState.java
===================================================================
--- src/desktop/com/jme3/app/state/VideoRecorderAppState.java (revision 8955)
+++ src/desktop/com/jme3/app/state/VideoRecorderAppState.java (working copy)
@@ -13,6 +13,8 @@
import com.jme3.util.Screenshots;
import java.awt.image.BufferedImage;
import java.io.File;
+import java.io.FileOutputStream;
+import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.*;
@@ -34,6 +36,7 @@
private int framerate = 30;
private VideoProcessor processor;
private File file;
+ private OutputStream aviOutput;
private Application app;
private ExecutorService executor = Executors.newCachedThreadPool(new ThreadFactory() {

@@ -55,6 +58,21 @@
this.file = file;
Logger.getLogger(this.getClass().getName()).log(Level.INFO, "JME3 VideoRecorder running on {0} CPU's", numCpus);
}
+
+ public VideoRecorderAppState(OutputStream aviOutput) {
+ this.aviOutput = aviOutput;
+ Logger.getLogger(this.getClass().getName()).log(Level.INFO, "JME3 VideoRecorder running on {0} CPU's", numCpus);
+ }
+
+ public byte[] getMjpegHeader() {
+ try {
+ return processor.getWriter().getMjpegStreamHeader();
+ } catch (Exception ex) {
+ Logger.getLogger(this.getClass().getName()).log(Level.INFO, null, ex);
+ return null;
+ }
+
+ }

public File getFile() {
return file;
@@ -66,15 +84,23 @@
}
this.file = file;
}
+
+ public OutputStream getAviOutput() {
+ return aviOutput;
+ }
+
+ public void setAviOutput(OutputStream aviOutput) {
+ this.aviOutput = aviOutput;
+ }

@Override
public void initialize(AppStateManager stateManager, Application app) {
super.initialize(stateManager, app);
this.app = app;
app.setTimer(new IsoTimer(framerate));
- if (file == null) {
- String filename = System.getProperty("user.home") + File.separator + "jMonkey-" + System.currentTimeMillis() / 1000 + ".avi";
- file = new File(filename);
+ if (aviOutput == null && file == null) {
+ String filename = System.getProperty("user.home") + File.separator + "jMonkey-" + System.currentTimeMillis() / 1000 + ".avi";
+ file = new File(filename);
}
processor = new VideoProcessor();
List<ViewPort> vps = app.getRenderManager().getPostViews();
@@ -114,6 +140,10 @@
private LinkedBlockingQueue<WorkItem> freeItems;
private LinkedBlockingQueue<WorkItem> usedItems = new LinkedBlockingQueue<WorkItem>();
private MjpegFileWriter writer;
+
+ public MjpegFileWriter getWriter() {
+ return writer;
+ }

public void addImage(Renderer renderer, FrameBuffer out) {
if (freeItems == null) {
@@ -167,7 +197,10 @@
public void preFrame(float tpf) {
if (null == writer) {
try {
- writer = new MjpegFileWriter(file, width, height, framerate);
+ if (file != null) {
+ aviOutput = new FileOutputStream(file);
+ }
+ writer = new MjpegFileWriter(aviOutput, width, height, framerate);
} catch (Exception ex) {
Logger.getLogger(VideoRecorderAppState.class.getName()).log(Level.SEVERE, "Error creating file writer: {0}", ex);
}
@@ -186,10 +219,13 @@
while (freeItems.size() < numCpus) {
Thread.sleep(10);
}
- writer.finishAVI();
+ if (file != null) {
+ writer.finishAVI(file);
+ }
} catch (Exception ex) {
Logger.getLogger(VideoRecorderAppState.class.getName()).log(Level.SEVERE, "Error closing video: {0}", ex);
}
+ aviOutput = null;
writer = null;
}
}
[/patch]

[java]package streaming;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.jme3.app.state.VideoRecorderAppState;
import java.lang.Thread.UncaughtExceptionHandler;

/**
* Connect with VLC Player e.g. to http://localhost:78
*
* @author Arthur
*/
public class HttpStreamingServer extends Thread implements UncaughtExceptionHandler {

private final static class StreamThread extends Thread {

private final InputStream videoStream;
private final BufferedReader httpReader;
private final BufferedWriter httpWriter;
private final InputStream inputStream;
private final OutputStream outputStream;
private final List<Closeable> closeables = new ArrayList<Closeable>();

StreamThread(String name, Socket socket, UncaughtExceptionHandler handler, InputStream videoStream) {
setDaemon(true);
setUncaughtExceptionHandler(handler);
setName(name);
try {
closeables.add(socket);
closeables.add(this.videoStream = videoStream);
closeables.add(inputStream = socket.getInputStream());
closeables.add(outputStream = socket.getOutputStream());
closeables.add(httpReader = new BufferedReader(new InputStreamReader(inputStream)));
closeables.add(httpWriter = new BufferedWriter(new OutputStreamWriter(outputStream)));
} catch (IOException ex) {
destory();
throw new RuntimeException(ex);
}
}

public @Override void run() {
try {
stream();
} catch (Exception ex) {
throw new RuntimeException(ex);
} finally {
destory();
}
}

public void stream() throws Exception {
String line = httpReader.readLine();
if (line.startsWith("GET")) {
if (videoStream == null) {
httpWriter.write(noContent());
httpWriter.flush();
throw new RuntimeException("No Content");
}
httpWriter.write(ok());
httpWriter.flush();
byte[] buffer = new byte[1024];
int written = 0;
while (!Thread.currentThread().isInterrupted()) {
int read = videoStream.read(buffer, 0, buffer.length);
if (read == -1) {
break;
}
outputStream.write(buffer, 0, read);
written = written + read;
}
}
}

private String ok() {
StringBuilder sb = new StringBuilder();
sb.append("HTTP/1.1 200 OKrn");
sb.append("Content-Type: video/mpegrn");
sb.append("rn");
return sb.toString();
}

private String noContent() {
StringBuilder sb = new StringBuilder();
sb.append("HTTP/1.1 204 No Contentrn");
sb.append("rn");
return sb.toString();
}

public void destory() {
for (Closeable c : closeables) {
if (c != null) {
try {
c.close();
} catch (Exception ex) { }
}
}
}
}

private static final Logger logger = Logger.getLogger(HttpStreamingServer.class.getName());
private final ServerSocket serverSocket;
private final Map<String, LinkedBlockingQueue<Integer>> clientStreams = new HashMap<String, LinkedBlockingQueue<Integer>>();
private final VideoRecorderAppState videoRecorder;
private int clients;

public HttpStreamingServer(VideoRecorderAppState videoRecorder) {
this.videoRecorder = videoRecorder;
this.videoRecorder.setAviOutput(getOutputStream());
setDaemon(true);
setName(getClass().getSimpleName());
setPriority(Thread.MIN_PRIORITY);
try {
serverSocket = new ServerSocket(78);
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}

public void serve() {
start();
}

/**
* Listens for client requests.
*
* @see java.lang.Thread#run()
*/
@Override
public final void run() {
if (serverSocket != null) {
while (!serverSocket.isClosed()) {
try {
Socket client = serverSocket.accept();
if (client != null) {
String name = createClientName();
InputStream video = createInputStream(name);
(new StreamThread(name, client, this, video)).start();
}
} catch (Exception ex) {
logDisconnectAndCleanUp(ex, this);
}
}
}
}

public String createClientName() {
return "Clinet-" + clients++;
}

public InputStream createInputStream(final String name) {
final byte[] header = videoRecorder.getMjpegHeader();
if (header != null) {
return new InputStream() {

LinkedBlockingQueue<Integer> cs = new LinkedBlockingQueue<Integer>();
{
try {
clientStreams.put(name, cs);
for (int b : header) {
cs.put(b);
}
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}

@Override
public int read() throws IOException {
try {
return cs.take();
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}

@Override
public int read(byte[] b) throws IOException {
return read(b, 0, 1024);
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
ArrayList<Integer> list = new ArrayList<Integer>();
cs.drainTo(list, len);
for (int i = 0; i < list.size(); i++) {
b = list.get(i).byteValue();
}
return list.size();
}

};
} else {
return null;
}
}

public OutputStream getOutputStream() {
return new OutputStream() {

@Override
public void write(int b) throws IOException {
for (Queue<Integer> cs : clientStreams.values()) {
cs.offer(b);
}
}
};
}

@Override
public void uncaughtException(Thread t, Throwable e) {
logDisconnectAndCleanUp(e, t);
}

private void logDisconnectAndCleanUp(Throwable throwable, Thread thread) {
clientStreams.remove(thread.getName());
logger.log(Level.INFO, "Client disconnected; Cause: {0}; Thread: {1}",
new Object[] { throwable.getMessage(), thread });
}

}
[/java]
7 Likes

cool thx, sounds useful!

Nice, thanks! Love that you implemented the server yourself instead of adding some bloated library :slight_smile:

OH would be a cool feature, very nice.

Thanks for the “thumbs up” :slight_smile: I think I could get rid of the efficiency problems at least for me it seems to work quite ok now. The engine doesn’t get slown down anymore if the connection is too slow but instead the VLC client jumps ahead a couple of frames.



This is the patched version:

[java]package streaming;



import java.io.BufferedReader;

import java.io.BufferedWriter;

import java.io.Closeable;

import java.io.IOException;

import java.io.InputStream;

import java.io.InputStreamReader;

import java.io.OutputStream;

import java.io.OutputStreamWriter;

import java.io.PipedInputStream;

import java.io.PipedOutputStream;

import java.net.ServerSocket;

import java.net.Socket;

import java.util.ArrayList;

import java.util.List;

import java.util.Map;

import java.util.concurrent.ConcurrentHashMap;

import java.util.concurrent.LinkedBlockingQueue;

import java.util.logging.Level;

import java.util.logging.Logger;

import com.jme3.app.state.VideoRecorderAppState;

import java.lang.Thread.UncaughtExceptionHandler;



/**

  • Connect with VLC Player e.g. to http://localhost:78

    *
  • @author Arthur

    */

    public class HttpStreamingServer extends Thread implements UncaughtExceptionHandler {



    private final static class Packet {



    private final byte[] buffer;

    private final int length;



    public Packet(byte[] buffer, int length) {

    this.buffer = buffer;

    this.length = length;

    }



    public byte[] getBuffer() {

    return buffer;

    }



    public int getLength() {

    return length;

    }



    }



    private final static class DispatcherThread extends Thread {



    private final PipedInputStream in;

    private final Map<String, LinkedBlockingQueue<Packet>> clientStreams;



    public DispatcherThread(PipedInputStream in,

    Map<String, LinkedBlockingQueue<Packet>> clientStreams,

    UncaughtExceptionHandler handler) {



    this.in = in;

    this.clientStreams = clientStreams;

    setDaemon(true);

    setName(getClass().getSimpleName());

    setUncaughtExceptionHandler(handler);

    setPriority(Thread.NORM_PRIORITY - 1);

    }



    @Override

    public void run() {

    try {

    dispatch();

    } catch (Exception ex) {

    throw new RuntimeException(ex);

    }

    }



    private void dispatch() throws Exception{

    while (!Thread.currentThread().isInterrupted()) {

    byte[] buffer = new byte[1024];

    int read = in.read(buffer, 0, buffer.length);

    for (LinkedBlockingQueue<Packet> cs : clientStreams.values()) {

    Packet packet = new Packet(buffer, read);

    if (!cs.offer(packet)) {

    cs.clear();

    }

    }

    }

    }



    }



    private final static class StreamThread extends Thread {



    private final LinkedBlockingQueue<Packet> stream;

    private final BufferedReader httpReader;

    private final BufferedWriter httpWriter;

    private final InputStream inputStream;

    private final OutputStream outputStream;

    private final List<Closeable> closeables = new ArrayList<Closeable>();



    public StreamThread(String name, Socket socket,

    UncaughtExceptionHandler handler, LinkedBlockingQueue<Packet> stream) {

    this.stream = stream;

    setDaemon(true);

    setUncaughtExceptionHandler(handler);

    setName(name);

    try {

    closeables.add(socket);

    closeables.add(inputStream = socket.getInputStream());

    closeables.add(outputStream = socket.getOutputStream());

    closeables.add(httpReader = new BufferedReader(new InputStreamReader(inputStream)));

    closeables.add(httpWriter = new BufferedWriter(new OutputStreamWriter(outputStream)));

    } catch (IOException ex) {

    closeAll();

    throw new RuntimeException(ex);

    }

    }



    public @Override void run() {

    try {

    stream();

    } catch (Exception ex) {

    throw new RuntimeException(ex);

    } finally {

    closeAll();

    }

    }



    public void stream() throws Exception {

    String line = httpReader.readLine();

    if (line.startsWith("GET")) {

    if (stream == null) {

    httpWriter.write(noContent());

    httpWriter.flush();

    throw new RuntimeException("No Content");

    }

    httpWriter.write(ok());

    httpWriter.flush();

    while (!Thread.currentThread().isInterrupted()) {

    Packet packet = stream.take();

    outputStream.write(packet.getBuffer(), 0, packet.getLength());

    }

    }

    }



    private String ok() {

    StringBuilder sb = new StringBuilder();

    sb.append("HTTP/1.1 200 OKrn");

    sb.append("Content-Type: video/mpegrn");

    sb.append("rn");

    return sb.toString();

    }



    private String noContent() {

    StringBuilder sb = new StringBuilder();

    sb.append("HTTP/1.1 204 No Contentrn");

    sb.append("rn");

    return sb.toString();

    }



    public void closeAll() {

    for (Closeable c : closeables) {

    if (c != null) {

    try {

    c.close();

    } catch (Exception ex) { }

    }

    }

    }

    }



    private static final Logger logger = Logger.getLogger(HttpStreamingServer.class.getName());

    private final ServerSocket serverSocket;

    private final Map<String, LinkedBlockingQueue<Packet>> clientStreams = new ConcurrentHashMap<String, LinkedBlockingQueue<Packet>>();

    private final PipedInputStream in = new PipedInputStream();

    private final PipedOutputStream out = new PipedOutputStream();

    private final DispatcherThread dispatcher = new DispatcherThread(in, clientStreams, this);

    private final VideoRecorderAppState videoRecorder;

    private boolean initialized = false;

    private int clients;



    public HttpStreamingServer(VideoRecorderAppState videoRecorder) {

    this.videoRecorder = videoRecorder;

    setDaemon(true);

    setName(getClass().getSimpleName());

    setPriority(Thread.MIN_PRIORITY);

    try {

    out.connect(in);

    serverSocket = new ServerSocket(78);

    dispatcher.start();

    } catch (Exception ex) {

    throw new RuntimeException(ex);

    }

    }



    /**
  • Must be called each time before attaching {@link VideoRecorderAppState}.

    */

    public void serve() {

    if (!initialized) {

    start();

    initialized = true;

    }

    videoRecorder.setAviOutput(getOutputStream());

    }



    /**
  • Listens for client requests.

    *
  • @see java.lang.Thread#run()

    */

    @Override

    public final void run() {

    if (serverSocket != null) {

    while (!serverSocket.isClosed()) {

    try {

    Socket client = serverSocket.accept();

    if (client != null) {

    String name = createClientName();

    LinkedBlockingQueue<Packet> stream = createStream(name);

    (new StreamThread(name, client, this, stream)).start();

    }

    } catch (Exception ex) {

    logDisconnectAndCleanUp(ex, this);

    }

    }

    }

    }



    public String createClientName() {

    return "Clinet-" + clients++;

    }



    public LinkedBlockingQueue<Packet> createStream(final String name) {

    final byte[] header = videoRecorder.getMjpegHeader();

    if (header != null) {

    LinkedBlockingQueue<Packet> stream = new LinkedBlockingQueue<Packet>(262144);

    clientStreams.put(name, stream);

    try {

    stream.put(new Packet(header, header.length));

    } catch (InterruptedException ex) {

    return null;

    }

    return stream;

    } else {

    return null;

    }

    }



    public OutputStream getOutputStream() {

    return out;

    }



    @Override

    public void uncaughtException(Thread t, Throwable e) {

    logDisconnectAndCleanUp(e, t);

    }



    private void logDisconnectAndCleanUp(Throwable throwable, Thread thread) {

    clientStreams.remove(thread.getName());

    logger.log(Level.INFO, "Client disconnected; Cause: {0}; Thread: {1}",

    new Object[] { throwable.getMessage(), thread });

    }



    }

    [/java]
@normen said:
some bloated library :)


*cough* jme *cough*
Thanks for the “thumbs up”


Take mine too :)!

That’s really cool; thanks for your hard work!



Since you’re streaming in real time, you might consider whether you want to actually use the IsoTimer in VideoRecorderAppState or not. Streaming might work better without it.



As a thought, what would happen with a very simple app that runs faster with the IsoTimer, producing 3 seconds of video for every second it runs?

My solution should only lower the framerate, not up it.

@Normen – That’s right; I forgot you did that :).