From bad244ef1697167eb8cc61905435609d32651301 Mon Sep 17 00:00:00 2001 From: Stephan Richter Date: Fri, 19 Dec 2025 14:06:31 +0100 Subject: [PATCH] working on message bus Signed-off-by: Stephan Richter --- .../de/srsoftware/umbrella/backend/Application.java | 2 +- .../de/srsoftware/umbrella/messagebus/MessageApi.java | 11 ++++++++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/backend/src/main/java/de/srsoftware/umbrella/backend/Application.java b/backend/src/main/java/de/srsoftware/umbrella/backend/Application.java index 1768f39..a6379dd 100644 --- a/backend/src/main/java/de/srsoftware/umbrella/backend/Application.java +++ b/backend/src/main/java/de/srsoftware/umbrella/backend/Application.java @@ -64,7 +64,7 @@ public class Application { var server = HttpServer.create(new InetSocketAddress(port), 0); try { new Translations().bindPath("/api/translations").on(server); - new MessageApi().bindPath().on(server); + new MessageApi().bindPath("/api/bus").on(server); new MessageSystem(config); new UserModule(config).bindPath("/api/user").on(server); new TagModule(config).bindPath("/api/tags").on(server); diff --git a/bus/src/main/java/de/srsoftware/umbrella/messagebus/MessageApi.java b/bus/src/main/java/de/srsoftware/umbrella/messagebus/MessageApi.java index a52a5cb..9cb77b1 100644 --- a/bus/src/main/java/de/srsoftware/umbrella/messagebus/MessageApi.java +++ b/bus/src/main/java/de/srsoftware/umbrella/messagebus/MessageApi.java @@ -12,6 +12,7 @@ import java.util.LinkedList; import java.util.List; import static de.srsoftware.umbrella.core.Constants.*; +import static java.lang.System.Logger.Level.*; import static java.net.HttpURLConnection.HTTP_OK; public class MessageApi extends BaseHandler{ @@ -22,23 +23,26 @@ public class MessageApi extends BaseHandler{ @Override public boolean doGet(Path path, HttpExchange ex) throws IOException { var headers = ex.getResponseHeaders(); + var addr = ex.getRemoteAddress(); headers.add(CONTENT_TYPE, MimeType.MIME_EVENT_STREAM); headers.add(CACHE_CONTROL,NO_CACHE); headers.add(CONNECTION,KEEP_ALIVE); ex.sendResponseHeaders(HTTP_OK,0); try (var os = ex.getResponseBody(); var stream = new PrintWriter(os)){ + LOG.log(INFO,"{0} opened event stream.",addr); var counter = 0; var id = 0L; - while (true){ + while (!stream.checkError()){ Thread.sleep(100); if (eventQueue.isEmpty()){ - counter++; - if (counter > 300) counter = sendBeacon(stream); + if (++counter > 300) counter = sendBeacon(stream); } else { sendEvent(stream,id++,eventQueue.remove(0)); counter = 0; } } + LOG.log(INFO,"{0} disconnected from event stream.",addr); + return true; } catch (InterruptedException e) { throw new UmbrellaException("EventStream broken").causedBy(e); } @@ -54,6 +58,7 @@ public class MessageApi extends BaseHandler{ } private int sendBeacon(PrintWriter stream) { + LOG.log(DEBUG,"sending keep-alive…"); stream.print(": keep-alive\n\n"); stream.flush(); return 0;