working on event reception in Kanban

This commit is contained in:
2025-12-19 16:07:30 +01:00
parent bad244ef16
commit 3b3803dafa
9 changed files with 116 additions and 38 deletions

View File

@@ -1,10 +1,11 @@
/* © SRSoftware 2025 */
package de.srsoftware.umbrella.messagebus; package de.srsoftware.umbrella.messagebus;
import de.srsoftware.umbrella.core.model.UmbrellaUser; import de.srsoftware.umbrella.core.model.UmbrellaUser;
public class Event { public abstract class Event<Payload> {
public enum Type{ public enum EventType {
CREATE, CREATE,
UPDATE, UPDATE,
DELETE; DELETE;
@@ -12,21 +13,23 @@ public class Event {
private UmbrellaUser initiator; private UmbrellaUser initiator;
private String realm; private String realm;
private Object payload; private Payload payload;
private Type type; private EventType eventType;
public Event(UmbrellaUser initiator, String realm, Object payload, Type type){ public Event(UmbrellaUser initiator, String realm, Payload payload, EventType type){
this.initiator = initiator; this.initiator = initiator;
this.realm = realm; this.realm = realm;
this.payload = payload; this.payload = payload;
this.type = type; this.eventType = type;
} }
public String json() { public String eventType(){
return "TODO"; // TODO return eventType.toString();
} }
public String type(){ public abstract String json();
return type.toString();
public Payload payload(){
return payload;
} }
} }

View File

@@ -1,3 +1,4 @@
/* © SRSoftware 2025 */
package de.srsoftware.umbrella.messagebus; package de.srsoftware.umbrella.messagebus;
public interface EventListener { public interface EventListener {

View File

@@ -0,0 +1,36 @@
/* © SRSoftware 2025 */
package de.srsoftware.umbrella.messagebus;
import static de.srsoftware.umbrella.messagebus.MessageBus.messageBus;
import java.net.InetSocketAddress;
import java.util.LinkedList;
import java.util.Objects;
public class EventQueue extends LinkedList<Event> implements AutoCloseable, EventListener {
private final InetSocketAddress addr;
public EventQueue(InetSocketAddress addr){
this.addr = addr;
messageBus().register(this);
}
public void close() {
messageBus().drop(this);
}
@Override
public int hashCode() {
return addr.hashCode();
}
@Override
public void onEvent(Event event) {
System.getLogger(addr.toString()).log(System.Logger.Level.INFO,"adding event to queue of {1}: {0}",event.eventType(),addr);
add(event);
}
}

View File

@@ -1,43 +1,54 @@
/* © SRSoftware 2025 */
package de.srsoftware.umbrella.messagebus; package de.srsoftware.umbrella.messagebus;
import static de.srsoftware.umbrella.core.Constants.*;
import static de.srsoftware.umbrella.core.ModuleRegistry.userService;
import static java.lang.System.Logger.Level.*;
import static java.net.HttpURLConnection.HTTP_OK;
import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpExchange;
import de.srsoftware.tools.MimeType; import de.srsoftware.tools.MimeType;
import de.srsoftware.tools.Path; import de.srsoftware.tools.Path;
import de.srsoftware.tools.SessionToken;
import de.srsoftware.umbrella.core.BaseHandler; import de.srsoftware.umbrella.core.BaseHandler;
import de.srsoftware.umbrella.core.exceptions.UmbrellaException; import de.srsoftware.umbrella.core.exceptions.UmbrellaException;
import de.srsoftware.umbrella.core.model.Token;
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.util.LinkedList; import java.net.InetSocketAddress;
import java.util.List; import java.util.Optional;
import static de.srsoftware.umbrella.core.Constants.*;
import static java.lang.System.Logger.Level.*;
import static java.net.HttpURLConnection.HTTP_OK;
public class MessageApi extends BaseHandler{ public class MessageApi extends BaseHandler{
private static final System.Logger LOG = System.getLogger(MessageApi.class.getSimpleName()); private static final System.Logger LOG = System.getLogger(MessageApi.class.getSimpleName());
private List<Event> eventQueue = new LinkedList<>();
@Override @Override
public boolean doGet(Path path, HttpExchange ex) throws IOException { public boolean doGet(Path path, HttpExchange ex) throws IOException {
addCors(ex);
Optional<Token> token = SessionToken.from(ex).map(Token::of);
var user = userService().loadUser(token);
//if (user.isEmpty()) return unauthorized(ex); // TODO
var headers = ex.getResponseHeaders(); var headers = ex.getResponseHeaders();
var addr = ex.getRemoteAddress(); var addr = ex.getRemoteAddress();
headers.add(CONTENT_TYPE, MimeType.MIME_EVENT_STREAM); headers.add(CONTENT_TYPE, MimeType.MIME_EVENT_STREAM);
headers.add(CACHE_CONTROL,NO_CACHE); headers.add(CACHE_CONTROL,NO_CACHE);
headers.add(CONNECTION,KEEP_ALIVE); headers.add(CONNECTION,KEEP_ALIVE);
headers.add(CONTENT_ENCODING,NONE);
ex.sendResponseHeaders(HTTP_OK,0); ex.sendResponseHeaders(HTTP_OK,0);
try (var os = ex.getResponseBody(); var stream = new PrintWriter(os)){ try (var os = ex.getResponseBody(); var stream = new PrintWriter(os); var eventQueue = new EventQueue(addr)){
LOG.log(INFO,"{0} opened event stream.",addr); LOG.log(INFO,"{0} opened event stream.",addr);
var counter = 0; var counter = 0;
var id = 0L;
while (!stream.checkError()){ while (!stream.checkError()){
Thread.sleep(100); Thread.sleep(100);
if (eventQueue.isEmpty()){ if (eventQueue.isEmpty()){
if (++counter > 300) counter = sendBeacon(stream); if (++counter > 300) counter = sendBeacon(addr,stream);
} else { } else {
sendEvent(stream,id++,eventQueue.remove(0)); var elem = eventQueue.remove(0);
LOG.log(DEBUG,"sending event to {0}",addr);
sendEvent(stream,elem);
counter = 0; counter = 0;
} }
} }
@@ -45,20 +56,24 @@ public class MessageApi extends BaseHandler{
return true; return true;
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new UmbrellaException("EventStream broken").causedBy(e); throw new UmbrellaException("EventStream broken").causedBy(e);
} catch (Exception e) {
throw new RuntimeException(e);
} }
} }
private void sendEvent(PrintWriter out, long id, Event event) { private void sendEvent(PrintWriter out, Event event) {
if (event == null) return; if (event == null) return;
out.printf("event: %s%n", event.type()); out.print("event: ");
out.printf("id: %s%n", id); out.println(event.eventType());
out.printf("data: %s%n", event.json()); out.print("data: ");
out.print("\n"); // terminate message with empty line out.println(event.json());
out.println();
out.println(); // terminate message with empty line
out.flush(); out.flush();
} }
private int sendBeacon(PrintWriter stream) { private int sendBeacon(InetSocketAddress addr, PrintWriter stream) {
LOG.log(DEBUG,"sending keep-alive"); LOG.log(DEBUG,"sending keep-alive to {0}",addr);
stream.print(": keep-alive\n\n"); stream.print(": keep-alive\n\n");
stream.flush(); stream.flush();
return 0; return 0;

View File

@@ -1,3 +1,4 @@
/* © SRSoftware 2025 */
package de.srsoftware.umbrella.messagebus; package de.srsoftware.umbrella.messagebus;
import java.util.HashSet; import java.util.HashSet;

View File

@@ -1,11 +1,19 @@
/* © SRSoftware 2025 */
package de.srsoftware.umbrella.messagebus; package de.srsoftware.umbrella.messagebus;
import de.srsoftware.umbrella.core.model.UmbrellaUser;
import static de.srsoftware.umbrella.core.Constants.TASK; import static de.srsoftware.umbrella.core.Constants.TASK;
public class TaskEvent extends Event{ import de.srsoftware.umbrella.core.model.Task;
public TaskEvent(UmbrellaUser initiator,Object payload, Type type){ import de.srsoftware.umbrella.core.model.UmbrellaUser;
super(initiator,TASK,payload, type); import org.json.JSONObject;
public class TaskEvent extends Event<Task>{
public TaskEvent(UmbrellaUser initiator, Task task, EventType type){
super(initiator,TASK, task, type);
}
@Override
public String json() {
return new JSONObject(payload().toMap()).toString();
} }
} }

View File

@@ -28,6 +28,7 @@ public class Constants {
public static final String CONNECTION = "Connection"; public static final String CONNECTION = "Connection";
public static final String CONTENT = "content"; public static final String CONTENT = "content";
public static final String CONTENT_DISPOSITION = "Content-Disposition"; public static final String CONTENT_DISPOSITION = "Content-Disposition";
public static final String CONTENT_ENCODING = "Content-Encoding";
public static final String CONTENT_TYPE = "Content-Type"; public static final String CONTENT_TYPE = "Content-Type";
public static final String CUSTOMER_NUMBER_PREFIX = "customer_number_prefix"; public static final String CUSTOMER_NUMBER_PREFIX = "customer_number_prefix";
@@ -89,6 +90,7 @@ public class Constants {
public static final String MIME = "mime"; public static final String MIME = "mime";
public static final String NO_CACHE = "no-cache"; public static final String NO_CACHE = "no-cache";
public static final String NO_INDEX = "no_index"; public static final String NO_INDEX = "no_index";
public static final String NONE = "none";
public static final String NOTE = "note"; public static final String NOTE = "note";
public static final String NUMBER = "number"; public static final String NUMBER = "number";

View File

@@ -1,5 +1,5 @@
<script> <script>
import { onMount } from 'svelte'; import { onDestroy, onMount } from 'svelte';
import { useTinyRouter } from 'svelte-tiny-router'; import { useTinyRouter } from 'svelte-tiny-router';
import { api, target } from '../../urls.svelte.js'; import { api, target } from '../../urls.svelte.js';
@@ -11,6 +11,8 @@
import LineEditor from '../../Components/LineEditor.svelte'; import LineEditor from '../../Components/LineEditor.svelte';
import MarkdownEditor from '../../Components/MarkdownEditor.svelte'; import MarkdownEditor from '../../Components/MarkdownEditor.svelte';
let eventSource = null;
let connectionStatus = 'disconnected';
let { id } = $props(); let { id } = $props();
let descr = $state(false); let descr = $state(false);
let filter_input = $state(''); let filter_input = $state('');
@@ -55,6 +57,12 @@
return a.name.localeCompare(b.name); return a.name.localeCompare(b.name);
} }
function connectToEventStream(){
eventSource = new EventSource(api('bus'));
eventSource.onmessage = (m) => { console.log(m); } // https://stackoverflow.com/questions/57650830/sse-onmessage-never-gets-called#comment104961222_59383080
eventSource.addEventListener('UPDATE', (event) => console.log(JSON.parse(event.data)) );
}
async function create(name,user_id,state){ async function create(name,user_id,state){
var url = api('task/add'); var url = api('task/add');
let task = { let task = {
@@ -114,6 +122,7 @@
async function load(){ async function load(){
try { try {
connectToEventStream();
await loadProject(); await loadProject();
loadTasks({project_id:+id,parent_task_id:0}); loadTasks({project_id:+id,parent_task_id:0});
} catch (ignored) {} } catch (ignored) {}
@@ -218,6 +227,11 @@
} }
onMount(load); onMount(load);
onDestroy(() => {
if (eventSource) eventSource.close();
});
</script> </script>
<svelte:head> <svelte:head>

View File

@@ -11,7 +11,7 @@ import static de.srsoftware.umbrella.core.Util.mapValues;
import static de.srsoftware.umbrella.core.exceptions.UmbrellaException.*; import static de.srsoftware.umbrella.core.exceptions.UmbrellaException.*;
import static de.srsoftware.umbrella.core.model.Permission.*; import static de.srsoftware.umbrella.core.model.Permission.*;
import static de.srsoftware.umbrella.core.model.Permission.OWNER; import static de.srsoftware.umbrella.core.model.Permission.OWNER;
import static de.srsoftware.umbrella.messagebus.Event.Type.UPDATE; import static de.srsoftware.umbrella.messagebus.Event.EventType.UPDATE;
import static de.srsoftware.umbrella.messagebus.MessageBus.messageBus; import static de.srsoftware.umbrella.messagebus.MessageBus.messageBus;
import static de.srsoftware.umbrella.project.Constants.PERMISSIONS; import static de.srsoftware.umbrella.project.Constants.PERMISSIONS;
import static de.srsoftware.umbrella.task.Constants.*; import static de.srsoftware.umbrella.task.Constants.*;
@@ -31,12 +31,10 @@ import de.srsoftware.umbrella.core.model.*;
import de.srsoftware.umbrella.core.model.Task; import de.srsoftware.umbrella.core.model.Task;
import de.srsoftware.umbrella.core.model.Token; import de.srsoftware.umbrella.core.model.Token;
import de.srsoftware.umbrella.core.model.UmbrellaUser; import de.srsoftware.umbrella.core.model.UmbrellaUser;
import de.srsoftware.umbrella.messagebus.TaskEvent;
import java.io.IOException; import java.io.IOException;
import java.util.*; import java.util.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import de.srsoftware.umbrella.messagebus.Event;
import de.srsoftware.umbrella.messagebus.TaskEvent;
import org.json.JSONArray; import org.json.JSONArray;
import org.json.JSONObject; import org.json.JSONObject;