Files
Umbrella/bus/src/main/java/de/srsoftware/umbrella/messagebus/MessageApi.java

84 lines
2.7 KiB
Java

/* © SRSoftware 2025 */
package de.srsoftware.umbrella.messagebus;
import static de.srsoftware.umbrella.core.Constants.*;
import static de.srsoftware.umbrella.core.ModuleRegistry.userService;
import static java.lang.System.Logger.Level.*;
import static java.lang.Thread.sleep;
import static java.net.HttpURLConnection.HTTP_OK;
import com.sun.net.httpserver.HttpExchange;
import de.srsoftware.tools.MimeType;
import de.srsoftware.tools.Path;
import de.srsoftware.tools.SessionToken;
import de.srsoftware.umbrella.core.BaseHandler;
import de.srsoftware.umbrella.core.exceptions.UmbrellaException;
import de.srsoftware.umbrella.core.model.Token;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.util.Optional;
public class MessageApi extends BaseHandler{
private static final System.Logger LOG = System.getLogger(MessageApi.class.getSimpleName());
@Override
public boolean doGet(Path path, HttpExchange ex) throws IOException {
addCors(ex);
Optional<Token> token = SessionToken.from(ex).map(Token::of);
var user = userService().loadUser(token);
//if (user.isEmpty()) return unauthorized(ex); // TODO
var headers = ex.getResponseHeaders();
var addr = ex.getRemoteAddress();
headers.add(CONTENT_TYPE, MimeType.MIME_EVENT_STREAM);
headers.add(CACHE_CONTROL,NO_CACHE);
headers.add(CONNECTION,KEEP_ALIVE);
headers.add(CONTENT_ENCODING,NONE);
ex.sendResponseHeaders(HTTP_OK,0);
try (var os = ex.getResponseBody(); var stream = new PrintWriter(os); var eventQueue = new EventQueue(addr)){
LOG.log(INFO,"{0} opened event stream.",addr);
var counter = 0;
while (!stream.checkError()){
sleep(100);
if (eventQueue.isEmpty()){
if (++counter > 300) counter = sendBeacon(addr,stream);
} else {
var event = eventQueue.removeFirst();
//if (event.isIntendedFor(user.get())) {
LOG.log(DEBUG, "sending event to {0}", addr);
sendEvent(stream, event);
counter = 0;
//}
}
}
LOG.log(INFO,"{0} disconnected from event stream.",addr);
return true;
} catch (InterruptedException e) {
throw new UmbrellaException("EventStream broken").causedBy(e);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private void sendEvent(PrintWriter out, Event event) {
if (event == null) return;
out.print("event: ");
out.println(event.eventType());
out.print("data: ");
out.println(event.json());
out.println();
out.println(); // terminate message with empty line
out.flush();
}
private int sendBeacon(InetSocketAddress addr, PrintWriter stream) {
LOG.log(DEBUG,"sending keep-alive to {0}",addr);
stream.print(": keep-alive\n\n");
stream.flush();
return 0;
}
}