Merge branch 'feature/notifications' into dev
All checks were successful
Build Docker Image / Docker-Build (push) Successful in 2m18s
Build Docker Image / Clean-Registry (push) Successful in 0s

This commit is contained in:
2026-01-21 23:43:55 +01:00
16 changed files with 306 additions and 136 deletions

View File

@@ -0,0 +1,60 @@
/* © SRSoftware 2025 */
package de.srsoftware.umbrella.message;
import static java.lang.System.Logger.Level.DEBUG;
import de.srsoftware.umbrella.core.model.Envelope;
import de.srsoftware.umbrella.core.model.TranslatedMessage;
import de.srsoftware.umbrella.core.model.User;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class InMemoryQueue implements MessageQueue<TranslatedMessage> {
public static final System.Logger LOG = System.getLogger(InMemoryQueue.class.getSimpleName());
private LinkedList<Envelope<TranslatedMessage>> queue = new LinkedList<>();
@Override
public Stream<Envelope<TranslatedMessage>> getEnvelopes() {
return queue.stream();
}
@Override
public List<Envelope<TranslatedMessage>> getEnvelopesFor(User user) {
return getEnvelopes().filter(env -> env.isFor(user)).toList();
}
@Override
public Optional<Envelope<TranslatedMessage>> getEnvelope(int hash) {
return getEnvelopes().filter(env -> env.hashCode() == hash).findAny();
}
@Override
public Stream<User> getReceivers() {
return getEnvelopes().map(Envelope::receivers).flatMap(Set::stream).distinct();
}
@Override
public Optional<Envelope<TranslatedMessage>> markRead(int hash, User user) {
for (var env : queue){
if (env.hashCode() == hash) {
LOG.log(DEBUG,"Removing {0} from receiver list of {1}…",user.name(),env.message().subject());
env.receivers().remove(user);
if (env.receivers().isEmpty()) {
LOG.log(DEBUG,"No more due receiers, removing {0} from queue…",env.message().subject());
queue.remove(env);
}
return Optional.of(env);
}
}
return Optional.empty();
}
@Override
public void push(Envelope<TranslatedMessage> envelope) {
queue.add(envelope);
LOG.log(DEBUG,"{0} for {1} pushed to queue",envelope.message().subject(),envelope.receivers().stream().map(User::name).collect(Collectors.joining(", ")));
}
}

View File

@@ -0,0 +1,18 @@
/* © SRSoftware 2025 */
package de.srsoftware.umbrella.message;
import de.srsoftware.umbrella.core.model.Envelope;
import de.srsoftware.umbrella.core.model.Message;
import de.srsoftware.umbrella.core.model.User;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;
public interface MessageQueue<T extends Message<?>> {
public Stream<Envelope<T>> getEnvelopes();
public List<Envelope<T>> getEnvelopesFor(User user);
public Optional<Envelope<T>> getEnvelope(int hash);
Stream<User> getReceivers();
public Optional<Envelope<T>> markRead(int hash, User user);
public void push(Envelope<T> envelope);
}

View File

@@ -25,10 +25,7 @@ import de.srsoftware.umbrella.core.BaseHandler;
import de.srsoftware.umbrella.core.ModuleRegistry;
import de.srsoftware.umbrella.core.api.PostBox;
import de.srsoftware.umbrella.core.exceptions.UmbrellaException;
import de.srsoftware.umbrella.core.model.Envelope;
import de.srsoftware.umbrella.core.model.Token;
import de.srsoftware.umbrella.core.model.UmbrellaUser;
import de.srsoftware.umbrella.core.model.User;
import de.srsoftware.umbrella.core.model.*;
import de.srsoftware.umbrella.message.model.*;
import de.srsoftware.umbrella.messagebus.EventListener;
import de.srsoftware.umbrella.messagebus.events.Event;
@@ -43,8 +40,6 @@ import jakarta.mail.internet.MimeMultipart;
import jakarta.mail.util.ByteArrayDataSource;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import org.json.JSONArray;
import org.json.JSONObject;
@@ -52,7 +47,7 @@ public class MessageSystem extends BaseHandler implements PostBox, EventListener
public static final System.Logger LOG = System.getLogger(MessageSystem.class.getSimpleName());
private final Timer timer = new Timer();
private record Receiver(User user, de.srsoftware.umbrella.core.model.Message message){}
private record Receiver(User user, Message<?> message){}
private class SubmissionTask extends TimerTask{
@@ -81,15 +76,17 @@ public class MessageSystem extends BaseHandler implements PostBox, EventListener
}
private final String from,host,user,pass;
private final int port;
private final SqliteMessageDb db;
private final MessageDb db;
private final MessageQueue<TranslatedMessage> queue;
private Session session;
private final List<Envelope> queue = new CopyOnWriteArrayList<>();
private String debugAddress;
private final String debugAddress;
private final HashMap<Receiver,List<Exception>> exceptions = new HashMap<>();
public MessageSystem(Configuration config) throws UmbrellaException {
var dbFile = config.get(CONFIG_DB).orElseThrow(() -> missingConfig(CONFIG_DB));
db = new SqliteMessageDb(connect(dbFile));
queue = new InMemoryQueue();
debugAddress = config.get(DEBUG_ADDREESS).map(Object::toString).orElse(null);
port = config.get(CONFIG_SMTP_PORT,587);
host = config.get(CONFIG_SMTP_HOST).map(Object::toString).orElseThrow(() -> new RuntimeException("umbrella.modules.message.smtp.host not configured!"));
@@ -144,7 +141,7 @@ public class MessageSystem extends BaseHandler implements PostBox, EventListener
var head = path.pop();
return switch (head){
case SETTINGS -> patchSettings(ex,user.get());
case READ -> patchState(ex,user.get(),path.pop());
case READ -> markRead(ex,user.get(),path.pop());
default -> super.doGet(path,ex);
};
} catch (NumberFormatException e){
@@ -155,10 +152,7 @@ public class MessageSystem extends BaseHandler implements PostBox, EventListener
}
private boolean getMessage(HttpExchange ex, UmbrellaUser user, int hash) throws IOException {
var envelope = queue.stream()
.filter(msg -> msg.isFor(user))
.filter(msg -> msg.hashCode() == hash)
.findFirst();
var envelope = queue.getEnvelope(hash).filter(env -> env.isFor(user));
if (envelope.isPresent()) return sendMessage(ex, user, envelope.get());
return notFound(ex);
}
@@ -168,23 +162,19 @@ public class MessageSystem extends BaseHandler implements PostBox, EventListener
}
private boolean listMessages(HttpExchange ex, UmbrellaUser user) throws IOException {
var messages = queue.stream().filter(e -> e.isFor(user)).map(e -> summary(e, user.language())).toList();
var messages = queue.getEnvelopesFor(user).stream().map(e -> summary(e, user.language()));
return sendContent(ex,messages);
}
@Override
public void onEvent(Event<?> event) {
for (var user : event.audience()){
if (debugAddress != null && !debugAddress.equals(user.email().toString())) continue;
var message = new de.srsoftware.umbrella.core.model.Message(event.initiator(),event.subject(),event.describe(),null);
var envelope = new Envelope(message,user);
send(envelope);
}
var message = new TranslatableMessage(event.initiator(),event.subject(),event.describe(),null);
send(new Envelope<>(message,event.audience()));
}
private boolean patchSettings(HttpExchange ex, UmbrellaUser user) throws IOException {
var json = json(ex);
Settings settings = null;
Settings settings;
if (json.has(INSTANTLY) && json.get(INSTANTLY) instanceof Boolean b && b){
settings = new Instantly();
} else {
@@ -196,51 +186,43 @@ public class MessageSystem extends BaseHandler implements PostBox, EventListener
return sendContent(ex,db.update(user,settings));
}
private boolean patchState(HttpExchange ex, UmbrellaUser user, String path) {
private boolean markRead(HttpExchange ex, UmbrellaUser user, String path) {
try {
var hash = Integer.parseInt(path);
var envelope = queue.stream().filter(env -> env.hashCode() == hash).findFirst().orElse(null);
if (envelope != null){
envelope.receivers().remove(user);
return sendMessage(ex,user,envelope);
}
var envelope = queue.markRead(hash, user);
if (envelope.isPresent()) return sendMessage(ex,user,envelope.get());
return notFound(ex);
} catch (NumberFormatException | IOException e) {
throw invalidField(HASH,LONG);
}
}
private boolean sendAt(User user, Integer scheduledHour){
try {
if (user instanceof UmbrellaUser uu) return db.getSettings(uu).sendAt(scheduledHour);
} catch (UmbrellaException ignored) {}
return true;
}
private synchronized void processMessages(Integer scheduledHour) {
LOG.log(INFO,"Running {0}…",scheduledHour == null ? "instantly" : "scheduled at "+scheduledHour);
var queue = new ArrayList<>(this.queue);
var dueRecipients = new ArrayList<User>();
List<User> recipients = queue.stream().map(Envelope::receivers).flatMap(Set::stream).filter(Objects::nonNull).distinct().toList();
{ // for known users: get notification preferences, fallback to _immediately_ for unknown users
for (User recv : recipients) {
if (recv instanceof UmbrellaUser uu) {
try {
if (!db.getSettings(uu).sendAt(scheduledHour)) continue;
} catch (UmbrellaException ignored) {}
}
dueRecipients.add(recv);
}
}
var dueRecipients = queue.getReceivers().filter(uu -> sendAt(uu,scheduledHour)).toList();
var date = new Date();
for (var receiver : dueRecipients){
if (debugAddress != null && !debugAddress.equals(receiver.email().toString())) {
LOG.log(DEBUG,"Debug address is set to {0}, ignoring mail to {1}",debugAddress,receiver);
continue;
}
var combined = new CombinedMessage(t("Collected messages"),receiver);
var envelopes = queue.stream().filter(env -> env.isFor(receiver)).toList();
for (var envelope : envelopes) combined.merge(envelope.message());
try {
var envelopes = queue.getEnvelopesFor(receiver);
envelopes.stream().map(Envelope::message).forEach(combined::merge);
send(combined,date);
for (var envelope : envelopes){
var audience = envelope.receivers();
audience.remove(receiver);
if (audience.isEmpty()) queue.remove(envelope);
}
envelopes.forEach(env -> queue.markRead(env.hashCode(),receiver));
} catch (Exception ex){
LOG.log(WARNING,"Failed to deliver mail ({0}) to {1}.",combined.subject(),receiver,ex);
for (var message : combined.messages()) exceptions.computeIfAbsent(new Receiver(receiver,message), k -> new ArrayList<>()).add(ex);
@@ -251,26 +233,16 @@ public class MessageSystem extends BaseHandler implements PostBox, EventListener
if (scheduledHour != null) new SubmissionTask(scheduledHour).schedule();
}
private boolean sendMessage(HttpExchange ex, UmbrellaUser user, Envelope envelope) throws IOException {
private boolean sendMessage(HttpExchange ex, UmbrellaUser user, Envelope<?> envelope) throws IOException {
var message = envelope.message();
var sender = message.sender().name();
var subject = message.subject().translate(user.language());
var body = message.body().translate(user.language());
if (message instanceof TranslatableMessage tm) message = tm.translate(user.language());
return sendContent(ex,Map.of(
SENDER,sender,
SUBJECT,subject,
BODY,body
SENDER,message.sender(),
SUBJECT,message.subject(),
BODY,message.body()
));
}
private static JSONObject summary(Envelope envelope, String lang) {
var sender = envelope.message().sender().name();
var subject = envelope.message().subject().translate(lang);
var time = envelope.time().format(TIME_FORMATTER);
var hash = envelope.hashCode();
return new JSONObject(Map.of(SENDER,sender,SUBJECT,subject,TIMESTAMP,time,HASH,hash));
}
private void send(CombinedMessage message, Date date) throws MessagingException {
var receiver = message.receiver();
@@ -309,8 +281,30 @@ public class MessageSystem extends BaseHandler implements PostBox, EventListener
}
@Override
public void send(Envelope envelope) {
queue.add(envelope);
@SuppressWarnings("unchecked")
public void send(Envelope<?> envelope) {
switch (envelope.message()){
case TranslatedMessage ignored:
queue.push((Envelope<TranslatedMessage>) envelope);
break;
case TranslatableMessage tm:
Map<String,Envelope<TranslatedMessage>> map = new HashMap<>();
for (var receiver : envelope.receivers()){
var lang = receiver.language();
var env = map.get(lang);
if (env == null){
TranslatedMessage translated = tm.translate(lang);
env = new Envelope<>(translated,new HashSet<>());
map.put(lang,env);
}
env.receivers().add(receiver);
}
map.values().forEach(queue::push);
break;
default:
return;
}
new Thread(() -> processMessages(null)).start();
}
@@ -327,8 +321,15 @@ public class MessageSystem extends BaseHandler implements PostBox, EventListener
return session;
}
public void setDebugAddress(String newVal) {
this.debugAddress = newVal;
private static JSONObject summary(Envelope<?> envelope, String lang) {
var message = envelope.message();
if (message instanceof TranslatableMessage tm) message = tm.translate(lang);
var sender = message.sender().name();
var subject = message.subject();
var time = envelope.time().format(TIME_FORMATTER);
var hash = envelope.hashCode();
return new JSONObject(Map.of(SENDER,sender,SUBJECT,subject,TIMESTAMP,time,HASH,hash));
}
}

View File

@@ -4,6 +4,7 @@ package de.srsoftware.umbrella.message;
import static de.srsoftware.tools.jdbc.Condition.equal;
import static de.srsoftware.tools.jdbc.Query.*;
import static de.srsoftware.umbrella.core.Errors.*;
import static de.srsoftware.umbrella.core.ResponseCode.HTTP_SERVER_ERROR;
import static de.srsoftware.umbrella.core.constants.Constants.TABLE_SETTINGS;
import static de.srsoftware.umbrella.core.constants.Field.*;
import static de.srsoftware.umbrella.core.exceptions.UmbrellaException.*;
@@ -13,7 +14,10 @@ import static java.text.MessageFormat.format;
import de.srsoftware.umbrella.core.constants.Text;
import de.srsoftware.umbrella.core.exceptions.UmbrellaException;
import de.srsoftware.umbrella.core.model.Envelope;
import de.srsoftware.umbrella.core.model.TranslatedMessage;
import de.srsoftware.umbrella.core.model.UmbrellaUser;
import de.srsoftware.umbrella.core.model.User;
import de.srsoftware.umbrella.message.model.Instantly;
import de.srsoftware.umbrella.message.model.Settings;
import de.srsoftware.umbrella.message.model.Silent;
@@ -21,8 +25,11 @@ import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;
public class SqliteMessageDb implements MessageDb{
public class SqliteMessageDb implements MessageDb, MessageQueue<TranslatedMessage> {
private static final System.Logger LOG = System.getLogger(SqliteMessageDb.class.getSimpleName());
private final Connection db;
private static final String DB_VERSION = "message_db_version";
@@ -80,6 +87,26 @@ CREATE TABLE IF NOT EXISTS {0} ( {1} VARCHAR(255) PRIMARY KEY, {2} VARCHAR(255)
return createSettingsTable();
}
@Override
public Optional<Envelope<TranslatedMessage>> getEnvelope(int hash) {
throw new UmbrellaException(HTTP_SERVER_ERROR,"{class}.getEnvelope({hash}) not implemented!","class",getClass().getSimpleName(),"hash",hash); // TODO
}
@Override
public Stream<Envelope<TranslatedMessage>> getEnvelopes() {
throw new UmbrellaException(HTTP_SERVER_ERROR,"{class}.getMessages() not implemented!","class",getClass().getSimpleName()); // TODO
}
@Override
public List<Envelope<TranslatedMessage>> getEnvelopesFor(User user) {
throw new UmbrellaException(HTTP_SERVER_ERROR,"{class}.getEnvelopesFor({user}) not implemented!","class",getClass().getSimpleName(),"user",user.name()); // TODO
}
@Override
public Stream<User> getReceivers() {
return Stream.empty();
}
@Override
public Settings getSettings(UmbrellaUser user) throws UmbrellaException {
try {
@@ -98,6 +125,17 @@ CREATE TABLE IF NOT EXISTS {0} ( {1} VARCHAR(255) PRIMARY KEY, {2} VARCHAR(255)
var version = createTables();
}
@Override
public Optional<Envelope<TranslatedMessage>> markRead(int hash, User user) {
throw new UmbrellaException(HTTP_SERVER_ERROR,"{class}.markRead(hash, user) not implemented!","class",getClass().getSimpleName()); // TODO
// TODO: throw exception if message not found!
}
@Override
public void push(Envelope<TranslatedMessage> message) {
throw new UmbrellaException(HTTP_SERVER_ERROR,"{class}.push(message) not implemented!","class",getClass().getSimpleName()); // TODO
}
private Settings toSettings(ResultSet rs) throws SQLException {
var submission = rs.getString(VALUE);
if (submission.trim().equalsIgnoreCase(INSTANTLY)) return new Instantly();

View File

@@ -15,21 +15,22 @@ public class CombinedMessage {
private final StringBuilder combinedBody = new StringBuilder();
private final User receiver;
private String combinedSubject = null;
private final List<Message> mergedMessages = new ArrayList<>();
private final List<Message<?>> mergedMessages = new ArrayList<>();
private final Translatable subjectForCombinedMessage;
private UmbrellaUser sender = null;
public CombinedMessage(Translatable subjectForCombinedMessage, User receiver){
LOG.log(DEBUG,"Creating combined message");
LOG.log(DEBUG,"Creating combined message for {0}…",receiver);
this.subjectForCombinedMessage = subjectForCombinedMessage;
this.receiver = receiver;
}
public void merge(Message message) {
public void merge(Message<?> message) {
LOG.log(TRACE,"Merging {0} into combined message…",message);
var lang = receiver.language();
var body = message.body().translate(lang);
var subject = message.subject().translate(lang);
if (message instanceof TranslatableMessage tm) message = tm.translate(lang);
var body = message.body();
var subject = message.subject().toString();
switch (mergedMessages.size()){
case 0:
combinedBody.append(body);
@@ -37,12 +38,11 @@ public class CombinedMessage {
combinedSubject = subject;
break;
case 1:
combinedBody.insert(0,format("# {0}:\n# {1}:\n\n",sender,subject)); // insert sender and subject of first message right before the body of the first message
combinedBody.insert(0,format("# {0} / {1}:\n\n",sender,subject)); // insert sender and subject of first message right before the body of the first message
combinedSubject = subjectForCombinedMessage.translate(lang);
// no break here, we need to append the subject and content
default:
combinedBody.append("\n\n# ").append(message.sender()).append(":\n");
combinedBody.append("# ").append(subject).append(":\n\n");
combinedBody.append("\n\n━━━━━━━━━━━━━━━━━━━━━\n\n# ").append(message.sender()).append(" / ").append(subject).append(":\n\n");
combinedBody.append(body);
}
if (message.attachments() != null) attachments.addAll(message.attachments());
@@ -57,7 +57,7 @@ public class CombinedMessage {
return combinedBody.toString();
}
public List<Message> messages() {
public List<Message<?>> messages() {
return mergedMessages;
}

View File

@@ -13,7 +13,7 @@ public class Instantly implements Settings{
@Override
public boolean sendAt(Integer scheduledHour) {
return false;
return true;
}
@Override