Merge branch 'feature/notifications' into dev
All checks were successful
Build Docker Image / Docker-Build (push) Successful in 2m26s
Build Docker Image / Clean-Registry (push) Successful in -1s

This commit is contained in:
2026-01-26 17:10:37 +01:00
15 changed files with 277 additions and 176 deletions

View File

@@ -2,6 +2,7 @@
package de.srsoftware.umbrella.message;
public class Constants {
public static final String AUTH = "mail.smtp.auth";
public static final String CONFIG_DB = "umbrella.modules.message.database";
public static final String CONFIG_SMTP_HOST = "umbrella.modules.message.smtp.host";
@@ -10,11 +11,23 @@ public class Constants {
public static final String CONFIG_SMTP_USER = "umbrella.modules.message.smtp.user";
public static final String DEBUG_ADDREESS = "umbrella.modules.message.debug_address";
public static final String ENVELOPE_FROM = "mail.smtp.from";
public static final String SMTP_AUTH = "mail.smtp.auth";
public static final String SMTP_HOST = "mail.smtp.host";
public static final String SMTP_FROM = "mail.smtp.from";
public static final String SMTP_PORT = "mail.smtp.port";
public static final String SMTP_SSL = "mail.smtp.ssl.enable";
public static final String SSL = "mail.smtp.ssl.enable";
public static final String SUBMISSION = "submission";
public static final String PORT = "mail.smtp.port";
public static final String TABLE_ATTACHMENTS = "attachments";
public static final String TABLE_MESSAGES = "messages";
public static final String TABLE_RECEIVERS = "receivers";
public static final String TABLE_SUBMISSIONS = "message_submission";
}

View File

@@ -1,60 +0,0 @@
/* © SRSoftware 2025 */
package de.srsoftware.umbrella.message;
import static java.lang.System.Logger.Level.DEBUG;
import de.srsoftware.umbrella.core.model.Envelope;
import de.srsoftware.umbrella.core.model.TranslatedMessage;
import de.srsoftware.umbrella.core.model.User;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class InMemoryQueue implements MessageQueue<TranslatedMessage> {
public static final System.Logger LOG = System.getLogger(InMemoryQueue.class.getSimpleName());
private LinkedList<Envelope<TranslatedMessage>> queue = new LinkedList<>();
@Override
public Stream<Envelope<TranslatedMessage>> getEnvelopes() {
return queue.stream();
}
@Override
public List<Envelope<TranslatedMessage>> getEnvelopesFor(User user) {
return getEnvelopes().filter(env -> env.isFor(user)).toList();
}
@Override
public Optional<Envelope<TranslatedMessage>> getEnvelope(int hash) {
return getEnvelopes().filter(env -> env.hashCode() == hash).findAny();
}
@Override
public Stream<User> getReceivers() {
return getEnvelopes().map(Envelope::receivers).flatMap(Set::stream).distinct();
}
@Override
public Optional<Envelope<TranslatedMessage>> markRead(int hash, User user) {
for (var env : queue){
if (env.hashCode() == hash) {
LOG.log(DEBUG,"Removing {0} from receiver list of {1}…",user.name(),env.message().subject());
env.receivers().remove(user);
if (env.receivers().isEmpty()) {
LOG.log(DEBUG,"No more due receiers, removing {0} from queue…",env.message().subject());
queue.remove(env);
}
return Optional.of(env);
}
}
return Optional.empty();
}
@Override
public void push(Envelope<TranslatedMessage> envelope) {
queue.add(envelope);
LOG.log(DEBUG,"Pushed '{0}' for '{1}' to queue",envelope.message().subject(),envelope.receivers().stream().map(User::name).collect(Collectors.joining(", ")));
}
}

View File

@@ -9,10 +9,9 @@ import java.util.Optional;
import java.util.stream.Stream;
public interface MessageQueue<T extends Message<?>> {
public Stream<Envelope<T>> getEnvelopes();
public List<Envelope<T>> getEnvelopesFor(User user);
public Optional<Envelope<T>> getEnvelope(int hash);
public Optional<Envelope<T>> getEnvelope(long id);
Stream<User> getReceivers();
public Optional<Envelope<T>> markRead(int hash, User user);
public Optional<Envelope<T>> markRead(long messageId, User user);
public void push(Envelope<T> envelope);
}

View File

@@ -84,8 +84,9 @@ public class MessageSystem extends BaseHandler implements PostBox, EventListener
public MessageSystem(Configuration config) throws UmbrellaException {
var dbFile = config.get(CONFIG_DB).orElseThrow(() -> missingConfig(CONFIG_DB));
db = new SqliteMessageDb(connect(dbFile));
queue = new InMemoryQueue();
var sqlite = new SqliteMessageDb(connect(dbFile));
db = sqlite;
queue = sqlite;
debugAddress = config.get(DEBUG_ADDREESS).map(Object::toString).orElse(null);
port = config.get(CONFIG_SMTP_PORT,587);
@@ -111,13 +112,13 @@ public class MessageSystem extends BaseHandler implements PostBox, EventListener
Optional<Token> token = SessionToken.from(ex).map(Token::of);
var user = ModuleRegistry.userService().loadUser(token);
if (user.isEmpty()) return unauthorized(ex);
var head = path.pop();
return switch (head){
var id = path.pop();
return switch (id){
case null -> listMessages(ex,user.get());
case SETTINGS -> getSettings(ex,user.get());
default -> {
try {
yield getMessage(ex,user.get(),Integer.parseInt(head));
yield getMessage(ex,user.get(),Integer.parseInt(id));
} catch (NumberFormatException ignored) {
}
@@ -151,8 +152,8 @@ public class MessageSystem extends BaseHandler implements PostBox, EventListener
}
}
private boolean getMessage(HttpExchange ex, UmbrellaUser user, int hash) throws IOException {
var envelope = queue.getEnvelope(hash).filter(env -> env.isFor(user));
private boolean getMessage(HttpExchange ex, UmbrellaUser user, long id) throws IOException {
var envelope = queue.getEnvelope(id).filter(env -> env.isFor(user));
if (envelope.isPresent()) return sendMessage(ex, user, envelope.get());
return notFound(ex);
}
@@ -169,7 +170,7 @@ public class MessageSystem extends BaseHandler implements PostBox, EventListener
@Override
public void onEvent(Event<?> event) {
var message = new TranslatableMessage(event.initiator(),event.subject(),event.describe(),null);
send(new Envelope<>(message,event.audience()));
send(new Envelope<>(0,message,event.audience()));
}
private boolean patchSettings(HttpExchange ex, UmbrellaUser user) throws IOException {
@@ -220,7 +221,7 @@ public class MessageSystem extends BaseHandler implements PostBox, EventListener
try {
var envelopes = queue.getEnvelopesFor(receiver);
envelopes.stream().map(Envelope::message).forEach(combined::merge);
envelopes.stream().forEach(combined::merge);
send(combined,date);
envelopes.forEach(env -> queue.markRead(env.hashCode(),receiver));
} catch (Exception ex){
@@ -294,7 +295,7 @@ public class MessageSystem extends BaseHandler implements PostBox, EventListener
var env = map.get(lang);
if (env == null){
TranslatedMessage translated = tm.translate(lang);
env = new Envelope<>(translated,new HashSet<>());
env = new Envelope<>(0, translated,new HashSet<>());
map.put(lang,env);
}
env.receivers().add(receiver);
@@ -328,8 +329,8 @@ public class MessageSystem extends BaseHandler implements PostBox, EventListener
var sender = message.sender().name();
var subject = message.subject();
var time = envelope.time().format(TIME_FORMATTER);
var hash = envelope.hashCode();
return new JSONObject(Map.of(SENDER,sender,SUBJECT,subject,TIMESTAMP,time,HASH,hash));
var id = envelope.id();
return new JSONObject(Map.of(SENDER,sender,SUBJECT,subject,TIMESTAMP,time,ID,id));
}
}

View File

@@ -2,48 +2,93 @@
package de.srsoftware.umbrella.message;
import static de.srsoftware.tools.jdbc.Condition.equal;
import static de.srsoftware.tools.jdbc.Condition.in;
import static de.srsoftware.tools.jdbc.Query.*;
import static de.srsoftware.tools.jdbc.Query.SelectQuery.ALL;
import static de.srsoftware.umbrella.core.Errors.*;
import static de.srsoftware.umbrella.core.ResponseCode.HTTP_SERVER_ERROR;
import static de.srsoftware.umbrella.core.constants.Constants.TABLE_SETTINGS;
import static de.srsoftware.umbrella.core.ModuleRegistry.userService;
import static de.srsoftware.umbrella.core.constants.Constants.COUNT;
import static de.srsoftware.umbrella.core.constants.Field.*;
import static de.srsoftware.umbrella.core.exceptions.UmbrellaException.*;
import static de.srsoftware.umbrella.core.model.Translatable.t;
import static de.srsoftware.umbrella.message.Constants.*;
import static de.srsoftware.umbrella.message.model.Schedule.schedule;
import static java.text.MessageFormat.format;
import static java.time.ZoneOffset.UTC;
import de.srsoftware.tools.jdbc.Query;
import de.srsoftware.umbrella.core.BaseDb;
import de.srsoftware.umbrella.core.Util;
import de.srsoftware.umbrella.core.constants.Text;
import de.srsoftware.umbrella.core.exceptions.UmbrellaException;
import de.srsoftware.umbrella.core.model.Envelope;
import de.srsoftware.umbrella.core.model.TranslatedMessage;
import de.srsoftware.umbrella.core.model.UmbrellaUser;
import de.srsoftware.umbrella.core.model.User;
import de.srsoftware.umbrella.core.model.*;
import de.srsoftware.umbrella.message.model.Instantly;
import de.srsoftware.umbrella.message.model.Settings;
import de.srsoftware.umbrella.message.model.Silent;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.*;
import java.util.stream.Stream;
public class SqliteMessageDb implements MessageDb, MessageQueue<TranslatedMessage> {
public class SqliteMessageDb extends BaseDb implements MessageDb, MessageQueue<TranslatedMessage> {
private static final System.Logger LOG = System.getLogger(SqliteMessageDb.class.getSimpleName());
private final Connection db;
private static final String DB_VERSION = "message_db_version";
private static final int INITIAL_DB_VERSION = 1;
private static final String TABLE_SUBMISSIONS = "message_submission";
public SqliteMessageDb(Connection conn){
db = conn;
init();
super(conn);
}
private void createMessageTables() {
var sql = """
CREATE TABLE IF NOT EXISTS {0} (
{1} INTEGER PRIMARY KEY,
{2} LONG NOT NULL,
{3} LONG NOT NULL,
{4} TEXT,
{5} TEXT
);
""";
sql = format(sql,TABLE_MESSAGES, ID, TIMESTAMP, SENDER_USER_ID, SUBJECT, BODY);
try {
db.prepareStatement(sql).execute();
} catch (SQLException e) {
throw failedToCreateTable(TABLE_MESSAGES);
}
sql = """
CREATE TABLE IF NOT EXISTS {0} (
{1} INTEGER NOT NULL,
{2} VARCHAR(255) NOT NULL,
{3} VARCHAR(255),
PRIMARY KEY ({1}, {2})
);
""";
sql = format(sql,TABLE_RECEIVERS, MESSAGE_ID, EMAIL, NAME);
try {
db.prepareStatement(sql).execute();
} catch (SQLException e) {
throw failedToCreateTable(TABLE_MESSAGES);
}
sql = """
CREATE TABLE IF NOT EXISTS {0} (
{1} INTEGER NOT NULL,
{2} VARCHAR(255) NOT NULL,
{3} VARCHAR(100),
{4} BLOB NOT NULL,
PRIMARY KEY ({1}, {2})
);
""";
sql = format(sql,TABLE_ATTACHMENTS, MESSAGE_ID, NAME, MIME, DATA);
try {
db.prepareStatement(sql).execute();
} catch (SQLException e) {
throw failedToCreateTable(TABLE_MESSAGES);
}
}
private void createSubmissionTable() {
var createTable = """
CREATE TABLE IF NOT EXISTS {0} ( {1} Integer PRIMARY KEY, {2} VARCHAR(255) NOT NULL);
CREATE TABLE IF NOT EXISTS {0} ( {1} INTEGER PRIMARY KEY, {2} VARCHAR(255) NOT NULL);
""";
try {
var stmt = db.prepareStatement(format(createTable,TABLE_SUBMISSIONS, USER_ID, VALUE));
@@ -54,57 +99,93 @@ CREATE TABLE IF NOT EXISTS {0} ( {1} Integer PRIMARY KEY, {2} VARCHAR(255) NOT N
}
}
private int createSettingsTable() {
var createTable = """
CREATE TABLE IF NOT EXISTS {0} ( {1} VARCHAR(255) PRIMARY KEY, {2} VARCHAR(255) NOT NULL);
""";
try {
var stmt = db.prepareStatement(format(createTable,TABLE_SETTINGS, KEY, VALUE));
stmt.execute();
stmt.close();
} catch (SQLException e) {
throw failedToCreateTable(TABLE_SETTINGS).causedBy(e);
@Override
protected int createTables() {
int currentVersion = createSettingsTable();
switch (currentVersion){
case 0:
createSubmissionTable();
case 1:
createMessageTables();
}
return setCurrentVersion(2);
}
Integer version = null;
@Override
public Optional<Envelope<TranslatedMessage>> getEnvelope(long messageId) {
try {
var rs = select(VALUE).from(TABLE_SETTINGS).where(KEY, equal(DB_VERSION)).exec(db);
if (rs.next()) version = rs.getInt(VALUE);
var attachments = new ArrayList<Attachment>();
var rs = select(ALL).from(TABLE_ATTACHMENTS).where(MESSAGE_ID,equal(messageId)).exec(db);
while (rs.next()) attachments.add(new Attachment(rs.getString(NAME),rs.getString(MIME),rs.getBytes(DATA)));
rs.close();
if (version == null) {
version = INITIAL_DB_VERSION;
insertInto(TABLE_SETTINGS, KEY, VALUE).values(DB_VERSION,version).execute(db).close();
var receivers = new ArrayList<User>();
rs = select(ALL).from(TABLE_RECEIVERS).where(MESSAGE_ID,equal(messageId)).exec(db);
while (rs.next()) receivers.add(new User(rs.getString(NAME),new EmailAddress(rs.getString(EMAIL)),null));
rs.close();
Envelope<TranslatedMessage> envelope = null;
rs = select(ALL).from(TABLE_MESSAGES).where(ID,equal(messageId)).exec(db);
if (rs.next()){
var sender = userService().loadUser(rs.getLong(SENDER_USER_ID));
var msg = new TranslatedMessage(sender,rs.getString(SUBJECT),rs.getString(BODY), attachments);
envelope = new Envelope<>(messageId, msg, receivers);
}
return version;
rs.close();
if (envelope != null) return Optional.of(envelope);
return Optional.empty();
} catch (SQLException e) {
throw databaseException(FAILED_TO_UPDATE_OBJECT, OBJECT,DB_VERSION).causedBy(e);
throw failedToLoadObject(Text.MESSAGE,messageId);
}
}
private int createTables() {
createSubmissionTable();
return createSettingsTable();
}
@Override
public Optional<Envelope<TranslatedMessage>> getEnvelope(int hash) {
throw new UmbrellaException(HTTP_SERVER_ERROR,"{class}.getEnvelope({hash}) not implemented!","class",getClass().getSimpleName(),"hash",hash); // TODO
}
@Override
public Stream<Envelope<TranslatedMessage>> getEnvelopes() {
throw new UmbrellaException(HTTP_SERVER_ERROR,"{class}.getMessages() not implemented!","class",getClass().getSimpleName()); // TODO
}
@Override
public List<Envelope<TranslatedMessage>> getEnvelopesFor(User user) {
throw new UmbrellaException(HTTP_SERVER_ERROR,"{class}.getEnvelopesFor({user}) not implemented!","class",getClass().getSimpleName(),"user",user.name()); // TODO
try {
var messageIds = new ArrayList<Long>();
var rs = select(MESSAGE_ID).from(TABLE_RECEIVERS).where(EMAIL,equal(user.email().toString())).where(NAME,equal(user.name())).exec(db);
while (rs.next()) messageIds.add(rs.getLong(MESSAGE_ID));
rs.close();
var attachments = new HashMap<Long,Collection<Attachment>>();
rs = select(ALL).from(TABLE_ATTACHMENTS).where(MESSAGE_ID,in(messageIds.toArray())).exec(db);
while (rs.next()){
var attachment = new Attachment(rs.getString(NAME),rs.getString(MIME),rs.getBytes(DATA));
var messageId = rs.getLong(MESSAGE_ID);
attachments.computeIfAbsent(messageId, k -> new ArrayList<>()).add(attachment);
}
rs.close();
var envelopes = new ArrayList<Envelope<TranslatedMessage>>();
rs = select(ALL).from(TABLE_MESSAGES).where(ID,in(messageIds.toArray())).exec(db);
while (rs.next()){
var messageId = rs.getLong(ID);
var sender = userService().loadUser(rs.getLong(SENDER_USER_ID));
var msg = new TranslatedMessage(sender,rs.getString(SUBJECT),rs.getString(BODY), attachments.get(messageId));
var envelope = new Envelope<>(messageId, msg, user).time(Util.dateTimeOf(rs.getLong(TIMESTAMP)));
envelopes.add(envelope);
}
rs.close();
return envelopes;
} catch (SQLException e) {
throw failedToLoadObject(t(Text.EMAILS_FOR_RECEIVER,EMAIL,user.email()));
}
}
@Override
public Stream<User> getReceivers() {
return Stream.empty();
try {
var receivers = new ArrayList<User>();
var rs = select(format("DISTINCT {0}, {1}",EMAIL,NAME)).from(TABLE_RECEIVERS).exec(db);
while (rs.next()){
var email = new EmailAddress(rs.getString(EMAIL));
receivers.add(new User(rs.getString(NAME),email,null));
}
rs.close();
return receivers.stream();
} catch (SQLException e) {
throw failedToLoadObject(t(Text.RECEIVERS));
}
}
@Override
@@ -121,19 +202,51 @@ CREATE TABLE IF NOT EXISTS {0} ( {1} VARCHAR(255) PRIMARY KEY, {2} VARCHAR(255)
}
}
private void init() {
var version = createTables();
@Override
public Optional<Envelope<TranslatedMessage>> markRead(long messageId, User user) {
try {
var envelope = getEnvelope(messageId);
Query.delete().from(TABLE_RECEIVERS).where(MESSAGE_ID,equal(messageId)).where(EMAIL,equal(user.email().toString())).execute(db);
var rs = select(COUNT).from(TABLE_RECEIVERS).where(MESSAGE_ID,equal(messageId)).exec(db);
Integer count = null;
if (rs.next()) count = rs.getInt(1);
rs.close();
if (count != null && count == 0){
delete().from(TABLE_ATTACHMENTS).where(MESSAGE_ID,equal(messageId)).execute(db);
delete().from(TABLE_MESSAGES).where(ID,equal(messageId)).execute(db);
}
return envelope;
} catch (SQLException e) {
throw failedToDropObject(Text.RECEIVER);
}
}
@Override
public Optional<Envelope<TranslatedMessage>> markRead(int hash, User user) {
throw new UmbrellaException(HTTP_SERVER_ERROR,"{class}.markRead(hash, user) not implemented!","class",getClass().getSimpleName()); // TODO
// TODO: throw exception if message not found!
}
public void push(Envelope<TranslatedMessage> envelope) {
var timestamp = envelope.time().toEpochSecond(UTC);
var message = envelope.message();
var sender = message.sender().id();
var subject = message.subject();
var body = message.body();
try {
var rs = insertInto(TABLE_MESSAGES, TIMESTAMP, SENDER_USER_ID, SUBJECT, BODY).values(timestamp, sender, subject, body).execute(db).getGeneratedKeys();
Long messageId = null;
if (rs.next()) messageId = rs.getLong(1);
rs.close();
if (messageId == null) throw failedToStoreObject(envelope);
@Override
public void push(Envelope<TranslatedMessage> message) {
throw new UmbrellaException(HTTP_SERVER_ERROR,"{class}.push(message) not implemented!","class",getClass().getSimpleName()); // TODO
for (var receiver : envelope.receivers()){
insertInto(TABLE_RECEIVERS,MESSAGE_ID,EMAIL,NAME).values(messageId,receiver.email(),receiver.name()).execute(db).close();
}
if (envelope.message().attachments() != null) {
for (var attachment : envelope.message().attachments()) {
insertInto(TABLE_ATTACHMENTS, MESSAGE_ID, NAME, MIME, DATA).values(messageId, attachment.name(), attachment.mime(), attachment.content()).execute(db).close();
}
}
} catch (SQLException e) {
throw failedToStoreObject(envelope).causedBy(e);
}
}
private Settings toSettings(ResultSet rs) throws SQLException {

View File

@@ -6,6 +6,7 @@ import static java.lang.System.Logger.Level.TRACE;
import static java.text.MessageFormat.format;
import de.srsoftware.umbrella.core.model.*;
import java.time.format.DateTimeFormatter;
import java.util.*;
public class CombinedMessage {
@@ -14,20 +15,24 @@ public class CombinedMessage {
private final Set<Attachment> attachments = new HashSet<>();
private final StringBuilder combinedBody = new StringBuilder();
private final User receiver;
private final String lang;
private String combinedSubject = null;
private final List<Message<?>> mergedMessages = new ArrayList<>();
private final Translatable subjectForCombinedMessage;
private UmbrellaUser sender = null;
private static DateTimeFormatter DT_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
public CombinedMessage(Translatable subjectForCombinedMessage, User receiver){
LOG.log(DEBUG,"Creating combined message for {0}…",receiver);
this.subjectForCombinedMessage = subjectForCombinedMessage;
this.receiver = receiver;
this.lang = receiver.language();
}
public void merge(Message<?> message) {
LOG.log(TRACE,"Merging {0} into combined message…",message);
var lang = receiver.language();
public void merge(Envelope<?> envelope) {
LOG.log(TRACE,"Merging {0} into combined message…",envelope);
var message = envelope.message();
if (message instanceof TranslatableMessage tm) message = tm.translate(lang);
var body = message.body();
var subject = message.subject().toString();
@@ -38,11 +43,11 @@ public class CombinedMessage {
combinedSubject = subject;
break;
case 1:
combinedBody.insert(0,format("# {0} / {1}:\n\n",sender,subject)); // insert sender and subject of first message right before the body of the first message
combinedBody.insert(0,format("# {0} @ {1}\n→ {2}:\n\n",sender,envelope.time().format(DT_FORMAT),subject)); // insert sender and subject of first message right before the body of the first message
combinedSubject = subjectForCombinedMessage.translate(lang);
// no break here, we need to append the subject and content
default:
combinedBody.append("\n\n━━━━━━━━━━━━━━━━━━━━━\n\n# ").append(message.sender()).append(" / ").append(subject).append(":\n\n");
combinedBody.append("\n\n━━━━━━━━━━━━━━━━━━━━━\n\n# ").append(message.sender()).append(" @ ").append(envelope.time().format(DT_FORMAT)).append("\n→ ").append(subject).append(":\n\n");
combinedBody.append(body);
}
if (message.attachments() != null) attachments.addAll(message.attachments());