working on message bus

Signed-off-by: Stephan Richter <s.richter@srsoftware.de>
This commit is contained in:
2025-12-19 14:06:31 +01:00
parent 3d31ac90a0
commit bad244ef16
2 changed files with 9 additions and 4 deletions

View File

@@ -64,7 +64,7 @@ public class Application {
var server = HttpServer.create(new InetSocketAddress(port), 0); var server = HttpServer.create(new InetSocketAddress(port), 0);
try { try {
new Translations().bindPath("/api/translations").on(server); new Translations().bindPath("/api/translations").on(server);
new MessageApi().bindPath().on(server); new MessageApi().bindPath("/api/bus").on(server);
new MessageSystem(config); new MessageSystem(config);
new UserModule(config).bindPath("/api/user").on(server); new UserModule(config).bindPath("/api/user").on(server);
new TagModule(config).bindPath("/api/tags").on(server); new TagModule(config).bindPath("/api/tags").on(server);

View File

@@ -12,6 +12,7 @@ import java.util.LinkedList;
import java.util.List; import java.util.List;
import static de.srsoftware.umbrella.core.Constants.*; import static de.srsoftware.umbrella.core.Constants.*;
import static java.lang.System.Logger.Level.*;
import static java.net.HttpURLConnection.HTTP_OK; import static java.net.HttpURLConnection.HTTP_OK;
public class MessageApi extends BaseHandler{ public class MessageApi extends BaseHandler{
@@ -22,23 +23,26 @@ public class MessageApi extends BaseHandler{
@Override @Override
public boolean doGet(Path path, HttpExchange ex) throws IOException { public boolean doGet(Path path, HttpExchange ex) throws IOException {
var headers = ex.getResponseHeaders(); var headers = ex.getResponseHeaders();
var addr = ex.getRemoteAddress();
headers.add(CONTENT_TYPE, MimeType.MIME_EVENT_STREAM); headers.add(CONTENT_TYPE, MimeType.MIME_EVENT_STREAM);
headers.add(CACHE_CONTROL,NO_CACHE); headers.add(CACHE_CONTROL,NO_CACHE);
headers.add(CONNECTION,KEEP_ALIVE); headers.add(CONNECTION,KEEP_ALIVE);
ex.sendResponseHeaders(HTTP_OK,0); ex.sendResponseHeaders(HTTP_OK,0);
try (var os = ex.getResponseBody(); var stream = new PrintWriter(os)){ try (var os = ex.getResponseBody(); var stream = new PrintWriter(os)){
LOG.log(INFO,"{0} opened event stream.",addr);
var counter = 0; var counter = 0;
var id = 0L; var id = 0L;
while (true){ while (!stream.checkError()){
Thread.sleep(100); Thread.sleep(100);
if (eventQueue.isEmpty()){ if (eventQueue.isEmpty()){
counter++; if (++counter > 300) counter = sendBeacon(stream);
if (counter > 300) counter = sendBeacon(stream);
} else { } else {
sendEvent(stream,id++,eventQueue.remove(0)); sendEvent(stream,id++,eventQueue.remove(0));
counter = 0; counter = 0;
} }
} }
LOG.log(INFO,"{0} disconnected from event stream.",addr);
return true;
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new UmbrellaException("EventStream broken").causedBy(e); throw new UmbrellaException("EventStream broken").causedBy(e);
} }
@@ -54,6 +58,7 @@ public class MessageApi extends BaseHandler{
} }
private int sendBeacon(PrintWriter stream) { private int sendBeacon(PrintWriter stream) {
LOG.log(DEBUG,"sending keep-alive…");
stream.print(": keep-alive\n\n"); stream.print(": keep-alive\n\n");
stream.flush(); stream.flush();
return 0; return 0;