refaturing message system, step 3: making use of queue

Signed-off-by: Stephan Richter <s.richter@srsoftware.de>
This commit is contained in:
2026-01-21 22:06:49 +01:00
parent 6d3c7cb14b
commit b123c30c5b
5 changed files with 82 additions and 75 deletions

View File

@@ -4,5 +4,5 @@ package de.srsoftware.umbrella.core.api;
import de.srsoftware.umbrella.core.model.Envelope; import de.srsoftware.umbrella.core.model.Envelope;
public interface PostBox { public interface PostBox {
public void send(Envelope envelope); public void send(Envelope<?> envelope);
} }

View File

@@ -17,16 +17,16 @@ import java.util.stream.Collectors;
import org.json.JSONArray; import org.json.JSONArray;
import org.json.JSONObject; import org.json.JSONObject;
public class Envelope<T> { public class Envelope<T extends Message<?>> {
private final Message<T> message; private final T message;
private final Set<User> receivers; private final Set<User> receivers;
private final LocalDateTime time; private final LocalDateTime time;
public Envelope(Message<T> message, User receiver){ public Envelope(T message, User receiver){
this(message,new HashSet<>(Set.of(receiver))); this(message,new HashSet<>(Set.of(receiver)));
} }
public Envelope(Message<T> message, HashSet<User> receivers) { public Envelope(T message, HashSet<User> receivers) {
this.message = message; this.message = message;
this.receivers = receivers; this.receivers = receivers;
time = LocalDateTime.now(); time = LocalDateTime.now();
@@ -38,7 +38,7 @@ public class Envelope<T> {
* @return * @return
* @throws UmbrellaException * @throws UmbrellaException
*/ */
public static Envelope from(JSONObject json) throws UmbrellaException { public static Envelope<TranslatedMessage> from(JSONObject json) throws UmbrellaException {
if (!json.has(RECEIVERS)) throw missingField(RECEIVERS); if (!json.has(RECEIVERS)) throw missingField(RECEIVERS);
var message = TranslatedMessage.from(json); var message = TranslatedMessage.from(json);
var obj = json.get(RECEIVERS); var obj = json.get(RECEIVERS);
@@ -49,12 +49,12 @@ public class Envelope<T> {
if (!(o instanceof JSONObject receiverData)) throw invalidField("entries of "+ RECEIVERS, t(JSONOBJECT)); if (!(o instanceof JSONObject receiverData)) throw invalidField("entries of "+ RECEIVERS, t(JSONOBJECT));
receivers.add(User.of(receiverData)); receivers.add(User.of(receiverData));
} }
return new Envelope(message,receivers); return new Envelope<>(message,receivers);
} }
@Override @Override
public final boolean equals(Object o) { public final boolean equals(Object o) {
if (!(o instanceof Envelope envelope)) return false; if (!(o instanceof Envelope<?> envelope)) return false;
return message.equals(envelope.message) && time.equals(envelope.time); return message.equals(envelope.message) && time.equals(envelope.time);
} }
@@ -67,7 +67,7 @@ public class Envelope<T> {
return receivers.contains(receiver); return receivers.contains(receiver);
} }
public Message<T> message(){ public T message(){
return message; return message;
} }

View File

@@ -1,15 +1,18 @@
/* © SRSoftware 2025 */
package de.srsoftware.umbrella.message; package de.srsoftware.umbrella.message;
import de.srsoftware.umbrella.core.model.Envelope;
import de.srsoftware.umbrella.core.model.Message;
import de.srsoftware.umbrella.core.model.UmbrellaUser; import de.srsoftware.umbrella.core.model.UmbrellaUser;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.stream.Stream; import java.util.stream.Stream;
public interface MessageQueue<T> { public interface MessageQueue<T extends Message<?>> {
public Stream<T> getEnvelopes(); public Stream<Envelope<T>> getEnvelopes();
public Stream<T> getEnvelopesFor(UmbrellaUser user); public List<Envelope<T>> getEnvelopesFor(UmbrellaUser user);
public Optional<T> getEnvelope(int hash); public Optional<Envelope<T>> getEnvelope(int hash);
public void markRead(int hash, UmbrellaUser user); Stream<UmbrellaUser> getReceivers();
public void push(T message); public Optional<Envelope<T>> markRead(int hash, UmbrellaUser user);
public void push(Envelope<T> message);
} }

View File

@@ -40,7 +40,6 @@ import jakarta.mail.internet.MimeMultipart;
import jakarta.mail.util.ByteArrayDataSource; import jakarta.mail.util.ByteArrayDataSource;
import java.io.IOException; import java.io.IOException;
import java.util.*; import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import org.json.JSONArray; import org.json.JSONArray;
import org.json.JSONObject; import org.json.JSONObject;
@@ -77,15 +76,18 @@ public class MessageSystem extends BaseHandler implements PostBox, EventListener
} }
private final String from,host,user,pass; private final String from,host,user,pass;
private final int port; private final int port;
private final SqliteMessageDb db; private final MessageDb db;
private final MessageQueue<TranslatedMessage> queue;
private Session session; private Session session;
private final List<Envelope<?>> queue = new CopyOnWriteArrayList<>(); private final String debugAddress;
private String debugAddress;
private final HashMap<Receiver,List<Exception>> exceptions = new HashMap<>(); private final HashMap<Receiver,List<Exception>> exceptions = new HashMap<>();
public MessageSystem(Configuration config) throws UmbrellaException { public MessageSystem(Configuration config) throws UmbrellaException {
var dbFile = config.get(CONFIG_DB).orElseThrow(() -> missingConfig(CONFIG_DB)); var dbFile = config.get(CONFIG_DB).orElseThrow(() -> missingConfig(CONFIG_DB));
db = new SqliteMessageDb(connect(dbFile)); var sqlite = new SqliteMessageDb(connect(dbFile));
db = sqlite;
queue = sqlite;
debugAddress = config.get(DEBUG_ADDREESS).map(Object::toString).orElse(null); debugAddress = config.get(DEBUG_ADDREESS).map(Object::toString).orElse(null);
port = config.get(CONFIG_SMTP_PORT,587); port = config.get(CONFIG_SMTP_PORT,587);
host = config.get(CONFIG_SMTP_HOST).map(Object::toString).orElseThrow(() -> new RuntimeException("umbrella.modules.message.smtp.host not configured!")); host = config.get(CONFIG_SMTP_HOST).map(Object::toString).orElseThrow(() -> new RuntimeException("umbrella.modules.message.smtp.host not configured!"));
@@ -151,12 +153,7 @@ public class MessageSystem extends BaseHandler implements PostBox, EventListener
} }
private boolean getMessage(HttpExchange ex, UmbrellaUser user, int hash) throws IOException { private boolean getMessage(HttpExchange ex, UmbrellaUser user, int hash) throws IOException {
var envel = db.getEnvelope(hash).filter(env -> env.isFor(user)); var envelope = queue.getEnvelope(hash).filter(env -> env.isFor(user));
if (envel.isPresent()) return sendMessage(ex, user, envel.get());
var envelope = queue.stream()
.filter(msg -> msg.isFor(user))
.filter(msg -> msg.hashCode() == hash)
.findFirst();
if (envelope.isPresent()) return sendMessage(ex, user, envelope.get()); if (envelope.isPresent()) return sendMessage(ex, user, envelope.get());
return notFound(ex); return notFound(ex);
} }
@@ -166,9 +163,7 @@ public class MessageSystem extends BaseHandler implements PostBox, EventListener
} }
private boolean listMessages(HttpExchange ex, UmbrellaUser user) throws IOException { private boolean listMessages(HttpExchange ex, UmbrellaUser user) throws IOException {
var msgs = db.getEnvelopesFor(user).map(e -> summary(e, user.language())).toList(); var messages = queue.getEnvelopesFor(user).stream().map(e -> summary(e, user.language()));
if (!msgs.isEmpty()) return sendContent(ex,msgs);
var messages = queue.stream().filter(e -> e.isFor(user)).map(e -> summary(e, user.language())).toList();
return sendContent(ex,messages); return sendContent(ex,messages);
} }
@@ -184,7 +179,7 @@ public class MessageSystem extends BaseHandler implements PostBox, EventListener
private boolean patchSettings(HttpExchange ex, UmbrellaUser user) throws IOException { private boolean patchSettings(HttpExchange ex, UmbrellaUser user) throws IOException {
var json = json(ex); var json = json(ex);
Settings settings = null; Settings settings;
if (json.has(INSTANTLY) && json.get(INSTANTLY) instanceof Boolean b && b){ if (json.has(INSTANTLY) && json.get(INSTANTLY) instanceof Boolean b && b){
settings = new Instantly(); settings = new Instantly();
} else { } else {
@@ -199,49 +194,37 @@ public class MessageSystem extends BaseHandler implements PostBox, EventListener
private boolean markRead(HttpExchange ex, UmbrellaUser user, String path) { private boolean markRead(HttpExchange ex, UmbrellaUser user, String path) {
try { try {
var hash = Integer.parseInt(path); var hash = Integer.parseInt(path);
db.markRead(hash, user); var envelope = queue.markRead(hash, user);
var envelope = queue.stream().filter(env -> env.hashCode() == hash).findFirst().orElse(null); if (envelope.isPresent()) return sendMessage(ex,user,envelope.get());
if (envelope != null){
envelope.receivers().remove(user);
return sendMessage(ex,user,envelope);
}
return notFound(ex); return notFound(ex);
} catch (NumberFormatException | IOException e) { } catch (NumberFormatException | IOException e) {
throw invalidField(HASH,LONG); throw invalidField(HASH,LONG);
} }
} }
private boolean sendAt(UmbrellaUser user, Integer scheduledHour){
try {
return db.getSettings(user).sendAt(scheduledHour);
} catch (UmbrellaException ignored) {
return true;
}
}
private synchronized void processMessages(Integer scheduledHour) { private synchronized void processMessages(Integer scheduledHour) {
LOG.log(INFO,"Running {0}…",scheduledHour == null ? "instantly" : "scheduled at "+scheduledHour); LOG.log(INFO,"Running {0}…",scheduledHour == null ? "instantly" : "scheduled at "+scheduledHour);
var queue = new ArrayList<>(this.queue);
var dueRecipients = new ArrayList<User>();
List<User> recipients = queue.stream().map(Envelope::receivers).flatMap(Set::stream).filter(Objects::nonNull).distinct().toList();
{ // for known users: get notification preferences, fallback to _immediately_ for unknown users var dueRecipients = queue.getReceivers().filter(uu -> sendAt(uu,scheduledHour)).toList();
for (User recv : recipients) {
if (recv instanceof UmbrellaUser uu) {
try {
if (!db.getSettings(uu).sendAt(scheduledHour)) continue;
} catch (UmbrellaException ignored) {}
}
dueRecipients.add(recv);
}
}
var date = new Date(); var date = new Date();
for (var receiver : dueRecipients){ for (var receiver : dueRecipients){
var combined = new CombinedMessage(t("Collected messages"),receiver); var combined = new CombinedMessage(t("Collected messages"),receiver);
var envelopes = queue.stream().filter(env -> env.isFor(receiver)).toList();
for (var envelope : envelopes) combined.merge(envelope.message());
try { try {
var envelopes = queue.getEnvelopesFor(receiver);
envelopes.stream().map(Envelope::message).forEach(combined::merge);
send(combined,date); send(combined,date);
for (var envelope : envelopes){ envelopes.forEach(env -> queue.markRead(env.hashCode(),receiver));
var audience = envelope.receivers();
audience.remove(receiver);
if (audience.isEmpty()) queue.remove(envelope);
}
} catch (Exception ex){ } catch (Exception ex){
LOG.log(WARNING,"Failed to deliver mail ({0}) to {1}.",combined.subject(),receiver,ex); LOG.log(WARNING,"Failed to deliver mail ({0}) to {1}.",combined.subject(),receiver,ex);
for (var message : combined.messages()) exceptions.computeIfAbsent(new Receiver(receiver,message), k -> new ArrayList<>()).add(ex); for (var message : combined.messages()) exceptions.computeIfAbsent(new Receiver(receiver,message), k -> new ArrayList<>()).add(ex);
@@ -262,17 +245,6 @@ public class MessageSystem extends BaseHandler implements PostBox, EventListener
)); ));
} }
private static JSONObject summary(Envelope<?> envelope, String lang) {
var message = envelope.message();
if (message instanceof TranslatableMessage tm) message = tm.translate(lang);
var sender = message.sender().name();
var subject = message.subject();
var time = envelope.time().format(TIME_FORMATTER);
var hash = envelope.hashCode();
return new JSONObject(Map.of(SENDER,sender,SUBJECT,subject,TIMESTAMP,time,HASH,hash));
}
private void send(CombinedMessage message, Date date) throws MessagingException { private void send(CombinedMessage message, Date date) throws MessagingException {
var receiver = message.receiver(); var receiver = message.receiver();
@@ -311,9 +283,29 @@ public class MessageSystem extends BaseHandler implements PostBox, EventListener
} }
@Override @Override
@SuppressWarnings("unchecked")
public void send(Envelope<?> envelope) { public void send(Envelope<?> envelope) {
queue.add(envelope); switch (envelope.message()){
db.push(envelope); case TranslatedMessage ignored:
queue.push((Envelope<TranslatedMessage>) envelope);
break;
case TranslatableMessage tm:
Map<String,Envelope<TranslatedMessage>> map = new HashMap<>();
for (var receiver : envelope.receivers()){
var lang = receiver.language();
var env = map.get(lang);
if (env == null){
TranslatedMessage translated = tm.translate(lang);
env = new Envelope<>(translated,new HashSet<>());
map.put(lang,env);
}
env.receivers().add(receiver);
}
map.values().forEach(queue::push);
default:
return;
}
new Thread(() -> processMessages(null)).start(); new Thread(() -> processMessages(null)).start();
} }
@@ -330,8 +322,15 @@ public class MessageSystem extends BaseHandler implements PostBox, EventListener
return session; return session;
} }
public void setDebugAddress(String newVal) { private static JSONObject summary(Envelope<?> envelope, String lang) {
this.debugAddress = newVal; var message = envelope.message();
if (message instanceof TranslatableMessage tm) message = tm.translate(lang);
var sender = message.sender().name();
var subject = message.subject();
var time = envelope.time().format(TIME_FORMATTER);
var hash = envelope.hashCode();
return new JSONObject(Map.of(SENDER,sender,SUBJECT,subject,TIMESTAMP,time,HASH,hash));
} }
} }

View File

@@ -28,7 +28,7 @@ import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.stream.Stream; import java.util.stream.Stream;
public class SqliteMessageDb implements MessageDb, MessageQueue<Envelope<TranslatedMessage>> { public class SqliteMessageDb implements MessageDb, MessageQueue<TranslatedMessage> {
private static final System.Logger LOG = System.getLogger(SqliteMessageDb.class.getSimpleName()); private static final System.Logger LOG = System.getLogger(SqliteMessageDb.class.getSimpleName());
private final Connection db; private final Connection db;
private static final String DB_VERSION = "message_db_version"; private static final String DB_VERSION = "message_db_version";
@@ -97,10 +97,15 @@ CREATE TABLE IF NOT EXISTS {0} ( {1} VARCHAR(255) PRIMARY KEY, {2} VARCHAR(255)
} }
@Override @Override
public Stream<Envelope<TranslatedMessage>> getEnvelopesFor(UmbrellaUser user) { public List<Envelope<TranslatedMessage>> getEnvelopesFor(UmbrellaUser user) {
throw new UmbrellaException(HTTP_SERVER_ERROR,"{class}.getEnvelopesFor({user}) not implemented!","class",getClass().getSimpleName(),"user",user.name()); // TODO throw new UmbrellaException(HTTP_SERVER_ERROR,"{class}.getEnvelopesFor({user}) not implemented!","class",getClass().getSimpleName(),"user",user.name()); // TODO
} }
@Override
public Stream<UmbrellaUser> getReceivers() {
return Stream.empty();
}
@Override @Override
public Settings getSettings(UmbrellaUser user) throws UmbrellaException { public Settings getSettings(UmbrellaUser user) throws UmbrellaException {
try { try {
@@ -120,7 +125,7 @@ CREATE TABLE IF NOT EXISTS {0} ( {1} VARCHAR(255) PRIMARY KEY, {2} VARCHAR(255)
} }
@Override @Override
public void markRead(int hash, UmbrellaUser user) { public Optional<Envelope<TranslatedMessage>> markRead(int hash, UmbrellaUser user) {
throw new UmbrellaException(HTTP_SERVER_ERROR,"{class}.markRead(hash, user) not implemented!","class",getClass().getSimpleName()); // TODO throw new UmbrellaException(HTTP_SERVER_ERROR,"{class}.markRead(hash, user) not implemented!","class",getClass().getSimpleName()); // TODO
// TODO: throw exception if message not found! // TODO: throw exception if message not found!
} }