/* © SRSoftware 2025 */ package de.srsoftware.umbrella.messagebus; import static de.srsoftware.umbrella.core.ModuleRegistry.userService; import static de.srsoftware.umbrella.core.constants.Constants.*; import static de.srsoftware.umbrella.core.constants.Field.*; import static java.lang.System.Logger.Level.*; import static java.lang.Thread.sleep; import static java.net.HttpURLConnection.HTTP_OK; import com.sun.net.httpserver.HttpExchange; import de.srsoftware.tools.MimeType; import de.srsoftware.tools.Path; import de.srsoftware.tools.SessionToken; import de.srsoftware.umbrella.core.BaseHandler; import de.srsoftware.umbrella.core.exceptions.UmbrellaException; import de.srsoftware.umbrella.core.model.Token; import de.srsoftware.umbrella.messagebus.events.Event; import java.io.IOException; import java.io.PrintWriter; import java.net.InetSocketAddress; import java.util.Optional; public class MessageApi extends BaseHandler{ private static final System.Logger LOG = System.getLogger(MessageApi.class.getSimpleName()); @Override public boolean doGet(Path path, HttpExchange ex) throws IOException { addCors(ex); Optional token = SessionToken.from(ex).map(Token::of); var user = userService().loadUser(token); if (user.isEmpty()) return unauthorized(ex); // TODO var headers = ex.getResponseHeaders(); var addr = ex.getRemoteAddress(); headers.add(CONTENT_TYPE, MimeType.MIME_EVENT_STREAM); headers.add(CACHE_CONTROL, NO_CACHE); headers.add(CONNECTION, KEEP_ALIVE); headers.add(CONTENT_ENCODING,NONE); ex.sendResponseHeaders(HTTP_OK,0); try (var os = ex.getResponseBody(); var stream = new PrintWriter(os); var eventQueue = new EventQueue(addr)){ LOG.log(INFO,"{0} opened event stream.",addr); var counter = 0; while (!stream.checkError()){ sleep(100); if (eventQueue.isEmpty()){ if (++counter > 300) counter = sendBeacon(addr,stream); } else { var event = eventQueue.removeFirst(); //if (event.isIntendedFor(user.get())) { LOG.log(DEBUG, "sending event to {0}", addr); sendEvent(stream, event); counter = 0; //} } } LOG.log(INFO,"{0} disconnected from event stream.",addr); return true; } catch (InterruptedException e) { throw UmbrellaException.serverError("EventStream broken").causedBy(e); } catch (Exception e) { throw new RuntimeException(e); } } private void sendEvent(PrintWriter out, Event event) { if (event == null) return; out.print("event: "); out.println(event.eventType()); out.print("data: "); out.println(event.json()); out.println(); out.println(); // terminate message with empty line out.flush(); } private int sendBeacon(InetSocketAddress addr, PrintWriter stream) { LOG.log(DEBUG,"sending keep-alive to {0}",addr); stream.print(": keep-alive\n\n"); stream.flush(); return 0; } }