From b123c30c5ba1537c508258ff5fb641e6a72732f3 Mon Sep 17 00:00:00 2001 From: Stephan Richter Date: Wed, 21 Jan 2026 22:06:49 +0100 Subject: [PATCH] refaturing message system, step 3: making use of queue Signed-off-by: Stephan Richter --- .../srsoftware/umbrella/core/api/PostBox.java | 2 +- .../umbrella/core/model/Envelope.java | 16 +-- .../umbrella/message/MessageQueue.java | 17 +-- .../umbrella/message/MessageSystem.java | 111 +++++++++--------- .../umbrella/message/SqliteMessageDb.java | 11 +- 5 files changed, 82 insertions(+), 75 deletions(-) diff --git a/core/src/main/java/de/srsoftware/umbrella/core/api/PostBox.java b/core/src/main/java/de/srsoftware/umbrella/core/api/PostBox.java index 74e8b485..678219f4 100644 --- a/core/src/main/java/de/srsoftware/umbrella/core/api/PostBox.java +++ b/core/src/main/java/de/srsoftware/umbrella/core/api/PostBox.java @@ -4,5 +4,5 @@ package de.srsoftware.umbrella.core.api; import de.srsoftware.umbrella.core.model.Envelope; public interface PostBox { - public void send(Envelope envelope); + public void send(Envelope envelope); } diff --git a/core/src/main/java/de/srsoftware/umbrella/core/model/Envelope.java b/core/src/main/java/de/srsoftware/umbrella/core/model/Envelope.java index 14d96517..448297e7 100644 --- a/core/src/main/java/de/srsoftware/umbrella/core/model/Envelope.java +++ b/core/src/main/java/de/srsoftware/umbrella/core/model/Envelope.java @@ -17,16 +17,16 @@ import java.util.stream.Collectors; import org.json.JSONArray; import org.json.JSONObject; -public class Envelope { - private final Message message; +public class Envelope> { + private final T message; private final Set receivers; private final LocalDateTime time; - public Envelope(Message message, User receiver){ + public Envelope(T message, User receiver){ this(message,new HashSet<>(Set.of(receiver))); } - public Envelope(Message message, HashSet receivers) { + public Envelope(T message, HashSet receivers) { this.message = message; this.receivers = receivers; time = LocalDateTime.now(); @@ -38,7 +38,7 @@ public class Envelope { * @return * @throws UmbrellaException */ - public static Envelope from(JSONObject json) throws UmbrellaException { + public static Envelope from(JSONObject json) throws UmbrellaException { if (!json.has(RECEIVERS)) throw missingField(RECEIVERS); var message = TranslatedMessage.from(json); var obj = json.get(RECEIVERS); @@ -49,12 +49,12 @@ public class Envelope { if (!(o instanceof JSONObject receiverData)) throw invalidField("entries of "+ RECEIVERS, t(JSONOBJECT)); receivers.add(User.of(receiverData)); } - return new Envelope(message,receivers); + return new Envelope<>(message,receivers); } @Override public final boolean equals(Object o) { - if (!(o instanceof Envelope envelope)) return false; + if (!(o instanceof Envelope envelope)) return false; return message.equals(envelope.message) && time.equals(envelope.time); } @@ -67,7 +67,7 @@ public class Envelope { return receivers.contains(receiver); } - public Message message(){ + public T message(){ return message; } diff --git a/messages/src/main/java/de/srsoftware/umbrella/message/MessageQueue.java b/messages/src/main/java/de/srsoftware/umbrella/message/MessageQueue.java index 1aea1c22..4b892b6d 100644 --- a/messages/src/main/java/de/srsoftware/umbrella/message/MessageQueue.java +++ b/messages/src/main/java/de/srsoftware/umbrella/message/MessageQueue.java @@ -1,15 +1,18 @@ +/* © SRSoftware 2025 */ package de.srsoftware.umbrella.message; +import de.srsoftware.umbrella.core.model.Envelope; +import de.srsoftware.umbrella.core.model.Message; import de.srsoftware.umbrella.core.model.UmbrellaUser; - import java.util.List; import java.util.Optional; import java.util.stream.Stream; -public interface MessageQueue { - public Stream getEnvelopes(); - public Stream getEnvelopesFor(UmbrellaUser user); - public Optional getEnvelope(int hash); - public void markRead(int hash, UmbrellaUser user); - public void push(T message); +public interface MessageQueue> { + public Stream> getEnvelopes(); + public List> getEnvelopesFor(UmbrellaUser user); + public Optional> getEnvelope(int hash); + Stream getReceivers(); + public Optional> markRead(int hash, UmbrellaUser user); + public void push(Envelope message); } diff --git a/messages/src/main/java/de/srsoftware/umbrella/message/MessageSystem.java b/messages/src/main/java/de/srsoftware/umbrella/message/MessageSystem.java index 7ff02f8a..d3d94bee 100644 --- a/messages/src/main/java/de/srsoftware/umbrella/message/MessageSystem.java +++ b/messages/src/main/java/de/srsoftware/umbrella/message/MessageSystem.java @@ -40,7 +40,6 @@ import jakarta.mail.internet.MimeMultipart; import jakarta.mail.util.ByteArrayDataSource; import java.io.IOException; import java.util.*; -import java.util.concurrent.CopyOnWriteArrayList; import org.json.JSONArray; import org.json.JSONObject; @@ -77,15 +76,18 @@ public class MessageSystem extends BaseHandler implements PostBox, EventListener } private final String from,host,user,pass; private final int port; - private final SqliteMessageDb db; + private final MessageDb db; + private final MessageQueue queue; private Session session; - private final List> queue = new CopyOnWriteArrayList<>(); - private String debugAddress; + private final String debugAddress; private final HashMap> exceptions = new HashMap<>(); public MessageSystem(Configuration config) throws UmbrellaException { var dbFile = config.get(CONFIG_DB).orElseThrow(() -> missingConfig(CONFIG_DB)); - db = new SqliteMessageDb(connect(dbFile)); + var sqlite = new SqliteMessageDb(connect(dbFile)); + db = sqlite; + queue = sqlite; + debugAddress = config.get(DEBUG_ADDREESS).map(Object::toString).orElse(null); port = config.get(CONFIG_SMTP_PORT,587); host = config.get(CONFIG_SMTP_HOST).map(Object::toString).orElseThrow(() -> new RuntimeException("umbrella.modules.message.smtp.host not configured!")); @@ -151,12 +153,7 @@ public class MessageSystem extends BaseHandler implements PostBox, EventListener } private boolean getMessage(HttpExchange ex, UmbrellaUser user, int hash) throws IOException { - var envel = db.getEnvelope(hash).filter(env -> env.isFor(user)); - if (envel.isPresent()) return sendMessage(ex, user, envel.get()); - var envelope = queue.stream() - .filter(msg -> msg.isFor(user)) - .filter(msg -> msg.hashCode() == hash) - .findFirst(); + var envelope = queue.getEnvelope(hash).filter(env -> env.isFor(user)); if (envelope.isPresent()) return sendMessage(ex, user, envelope.get()); return notFound(ex); } @@ -166,9 +163,7 @@ public class MessageSystem extends BaseHandler implements PostBox, EventListener } private boolean listMessages(HttpExchange ex, UmbrellaUser user) throws IOException { - var msgs = db.getEnvelopesFor(user).map(e -> summary(e, user.language())).toList(); - if (!msgs.isEmpty()) return sendContent(ex,msgs); - var messages = queue.stream().filter(e -> e.isFor(user)).map(e -> summary(e, user.language())).toList(); + var messages = queue.getEnvelopesFor(user).stream().map(e -> summary(e, user.language())); return sendContent(ex,messages); } @@ -184,7 +179,7 @@ public class MessageSystem extends BaseHandler implements PostBox, EventListener private boolean patchSettings(HttpExchange ex, UmbrellaUser user) throws IOException { var json = json(ex); - Settings settings = null; + Settings settings; if (json.has(INSTANTLY) && json.get(INSTANTLY) instanceof Boolean b && b){ settings = new Instantly(); } else { @@ -199,49 +194,37 @@ public class MessageSystem extends BaseHandler implements PostBox, EventListener private boolean markRead(HttpExchange ex, UmbrellaUser user, String path) { try { var hash = Integer.parseInt(path); - db.markRead(hash, user); - var envelope = queue.stream().filter(env -> env.hashCode() == hash).findFirst().orElse(null); - if (envelope != null){ - envelope.receivers().remove(user); - return sendMessage(ex,user,envelope); - } + var envelope = queue.markRead(hash, user); + if (envelope.isPresent()) return sendMessage(ex,user,envelope.get()); return notFound(ex); } catch (NumberFormatException | IOException e) { throw invalidField(HASH,LONG); } } + private boolean sendAt(UmbrellaUser user, Integer scheduledHour){ + try { + return db.getSettings(user).sendAt(scheduledHour); + } catch (UmbrellaException ignored) { + return true; + } + } + private synchronized void processMessages(Integer scheduledHour) { LOG.log(INFO,"Running {0}…",scheduledHour == null ? "instantly" : "scheduled at "+scheduledHour); - var queue = new ArrayList<>(this.queue); - var dueRecipients = new ArrayList(); - List recipients = queue.stream().map(Envelope::receivers).flatMap(Set::stream).filter(Objects::nonNull).distinct().toList(); - { // for known users: get notification preferences, fallback to _immediately_ for unknown users - for (User recv : recipients) { - if (recv instanceof UmbrellaUser uu) { - try { - if (!db.getSettings(uu).sendAt(scheduledHour)) continue; - } catch (UmbrellaException ignored) {} - } - dueRecipients.add(recv); - } - } + var dueRecipients = queue.getReceivers().filter(uu -> sendAt(uu,scheduledHour)).toList(); var date = new Date(); for (var receiver : dueRecipients){ var combined = new CombinedMessage(t("Collected messages"),receiver); - var envelopes = queue.stream().filter(env -> env.isFor(receiver)).toList(); - for (var envelope : envelopes) combined.merge(envelope.message()); try { + var envelopes = queue.getEnvelopesFor(receiver); + envelopes.stream().map(Envelope::message).forEach(combined::merge); send(combined,date); - for (var envelope : envelopes){ - var audience = envelope.receivers(); - audience.remove(receiver); - if (audience.isEmpty()) queue.remove(envelope); - } + envelopes.forEach(env -> queue.markRead(env.hashCode(),receiver)); } catch (Exception ex){ LOG.log(WARNING,"Failed to deliver mail ({0}) to {1}.",combined.subject(),receiver,ex); for (var message : combined.messages()) exceptions.computeIfAbsent(new Receiver(receiver,message), k -> new ArrayList<>()).add(ex); @@ -262,17 +245,6 @@ public class MessageSystem extends BaseHandler implements PostBox, EventListener )); } - private static JSONObject summary(Envelope envelope, String lang) { - var message = envelope.message(); - if (message instanceof TranslatableMessage tm) message = tm.translate(lang); - - var sender = message.sender().name(); - var subject = message.subject(); - var time = envelope.time().format(TIME_FORMATTER); - var hash = envelope.hashCode(); - return new JSONObject(Map.of(SENDER,sender,SUBJECT,subject,TIMESTAMP,time,HASH,hash)); - } - private void send(CombinedMessage message, Date date) throws MessagingException { var receiver = message.receiver(); @@ -311,9 +283,29 @@ public class MessageSystem extends BaseHandler implements PostBox, EventListener } @Override + @SuppressWarnings("unchecked") public void send(Envelope envelope) { - queue.add(envelope); - db.push(envelope); + switch (envelope.message()){ + case TranslatedMessage ignored: + queue.push((Envelope) envelope); + break; + case TranslatableMessage tm: + Map> map = new HashMap<>(); + for (var receiver : envelope.receivers()){ + var lang = receiver.language(); + var env = map.get(lang); + if (env == null){ + TranslatedMessage translated = tm.translate(lang); + env = new Envelope<>(translated,new HashSet<>()); + map.put(lang,env); + } + env.receivers().add(receiver); + } + map.values().forEach(queue::push); + default: + return; + } + new Thread(() -> processMessages(null)).start(); } @@ -330,8 +322,15 @@ public class MessageSystem extends BaseHandler implements PostBox, EventListener return session; } - public void setDebugAddress(String newVal) { - this.debugAddress = newVal; + private static JSONObject summary(Envelope envelope, String lang) { + var message = envelope.message(); + if (message instanceof TranslatableMessage tm) message = tm.translate(lang); + + var sender = message.sender().name(); + var subject = message.subject(); + var time = envelope.time().format(TIME_FORMATTER); + var hash = envelope.hashCode(); + return new JSONObject(Map.of(SENDER,sender,SUBJECT,subject,TIMESTAMP,time,HASH,hash)); } } diff --git a/messages/src/main/java/de/srsoftware/umbrella/message/SqliteMessageDb.java b/messages/src/main/java/de/srsoftware/umbrella/message/SqliteMessageDb.java index d0edff95..3c3918f7 100644 --- a/messages/src/main/java/de/srsoftware/umbrella/message/SqliteMessageDb.java +++ b/messages/src/main/java/de/srsoftware/umbrella/message/SqliteMessageDb.java @@ -28,7 +28,7 @@ import java.util.List; import java.util.Optional; import java.util.stream.Stream; -public class SqliteMessageDb implements MessageDb, MessageQueue> { +public class SqliteMessageDb implements MessageDb, MessageQueue { private static final System.Logger LOG = System.getLogger(SqliteMessageDb.class.getSimpleName()); private final Connection db; private static final String DB_VERSION = "message_db_version"; @@ -97,10 +97,15 @@ CREATE TABLE IF NOT EXISTS {0} ( {1} VARCHAR(255) PRIMARY KEY, {2} VARCHAR(255) } @Override - public Stream> getEnvelopesFor(UmbrellaUser user) { + public List> getEnvelopesFor(UmbrellaUser user) { throw new UmbrellaException(HTTP_SERVER_ERROR,"{class}.getEnvelopesFor({user}) not implemented!","class",getClass().getSimpleName(),"user",user.name()); // TODO } + @Override + public Stream getReceivers() { + return Stream.empty(); + } + @Override public Settings getSettings(UmbrellaUser user) throws UmbrellaException { try { @@ -120,7 +125,7 @@ CREATE TABLE IF NOT EXISTS {0} ( {1} VARCHAR(255) PRIMARY KEY, {2} VARCHAR(255) } @Override - public void markRead(int hash, UmbrellaUser user) { + public Optional> markRead(int hash, UmbrellaUser user) { throw new UmbrellaException(HTTP_SERVER_ERROR,"{class}.markRead(hash, user) not implemented!","class",getClass().getSimpleName()); // TODO // TODO: throw exception if message not found! }