preparing message bus

This commit is contained in:
2025-12-19 13:33:08 +01:00
parent 73751c1ea2
commit 3d31ac90a0
9 changed files with 73 additions and 13 deletions

View File

@@ -3,10 +3,11 @@ package de.srsoftware.umbrella.messagebus;
import de.srsoftware.umbrella.core.model.UmbrellaUser;
public class Event {
public enum Type{
CREATE,
UPDATE,
DELETE
DELETE;
}
private UmbrellaUser initiator;
@@ -20,4 +21,12 @@ public class Event {
this.payload = payload;
this.type = type;
}
public String json() {
return "TODO"; // TODO
}
public String type(){
return type.toString();
}
}

View File

@@ -1,17 +1,61 @@
package de.srsoftware.umbrella.messagebus;
import com.sun.net.httpserver.HttpExchange;
import de.srsoftware.tools.MimeType;
import de.srsoftware.tools.Path;
import de.srsoftware.umbrella.core.BaseHandler;
import de.srsoftware.umbrella.core.exceptions.UmbrellaException;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.LinkedList;
import java.util.List;
public class MessageApi implements EventListener{
import static de.srsoftware.umbrella.core.Constants.*;
import static java.net.HttpURLConnection.HTTP_OK;
public class MessageApi extends BaseHandler{
private static final System.Logger LOG = System.getLogger(MessageApi.class.getSimpleName());
private List<Event> eventQueue = new LinkedList<>();
@Override
public void onEvent(Event event) {
LOG.log(System.Logger.Level.DEBUG,"received {0}",event.getClass().getSimpleName());
public boolean doGet(Path path, HttpExchange ex) throws IOException {
var headers = ex.getResponseHeaders();
headers.add(CONTENT_TYPE, MimeType.MIME_EVENT_STREAM);
headers.add(CACHE_CONTROL,NO_CACHE);
headers.add(CONNECTION,KEEP_ALIVE);
ex.sendResponseHeaders(HTTP_OK,0);
try (var os = ex.getResponseBody(); var stream = new PrintWriter(os)){
var counter = 0;
var id = 0L;
while (true){
Thread.sleep(100);
if (eventQueue.isEmpty()){
counter++;
if (counter > 300) counter = sendBeacon(stream);
} else {
sendEvent(stream,id++,eventQueue.remove(0));
counter = 0;
}
}
} catch (InterruptedException e) {
throw new UmbrellaException("EventStream broken").causedBy(e);
}
}
private void sendEvent(PrintWriter out, long id, Event event) {
if (event == null) return;
out.printf("event: %s%n", event.type());
out.printf("id: %s%n", id);
out.printf("data: %s%n", event.json());
out.print("\n"); // terminate message with empty line
out.flush();
}
private int sendBeacon(PrintWriter stream) {
stream.print(": keep-alive\n\n");
stream.flush();
return 0;
}
}