Files
Umbrella/messages/src/main/java/de/srsoftware/umbrella/message/MessageSystem.java
T
StephanRichter d9f88af6b7 implemented state change for messages:
messages are marked as read upon display

Signed-off-by: Stephan Richter <s.richter@srsoftware.de>
2026-01-18 11:54:40 +01:00

335 lines
12 KiB
Java

/* © SRSoftware 2025 */
package de.srsoftware.umbrella.message;
import static de.srsoftware.umbrella.core.ConnectionProvider.connect;
import static de.srsoftware.umbrella.core.constants.Constants.TIME_FORMATTER;
import static de.srsoftware.umbrella.core.constants.Constants.UTF8;
import static de.srsoftware.umbrella.core.constants.Field.*;
import static de.srsoftware.umbrella.core.constants.Path.READ;
import static de.srsoftware.umbrella.core.constants.Path.SETTINGS;
import static de.srsoftware.umbrella.core.constants.Text.LONG;
import static de.srsoftware.umbrella.core.exceptions.UmbrellaException.invalidField;
import static de.srsoftware.umbrella.core.exceptions.UmbrellaException.missingConfig;
import static de.srsoftware.umbrella.core.model.Translatable.t;
import static de.srsoftware.umbrella.message.Constants.*;
import static de.srsoftware.umbrella.message.model.Schedule.schedule;
import static de.srsoftware.umbrella.messagebus.MessageBus.messageBus;
import static java.lang.System.Logger.Level.*;
import static java.net.HttpURLConnection.HTTP_BAD_REQUEST;
import com.sun.net.httpserver.HttpExchange;
import de.srsoftware.configuration.Configuration;
import de.srsoftware.tools.Path;
import de.srsoftware.tools.SessionToken;
import de.srsoftware.umbrella.core.BaseHandler;
import de.srsoftware.umbrella.core.ModuleRegistry;
import de.srsoftware.umbrella.core.api.PostBox;
import de.srsoftware.umbrella.core.exceptions.UmbrellaException;
import de.srsoftware.umbrella.core.model.Envelope;
import de.srsoftware.umbrella.core.model.Token;
import de.srsoftware.umbrella.core.model.UmbrellaUser;
import de.srsoftware.umbrella.core.model.User;
import de.srsoftware.umbrella.message.model.*;
import de.srsoftware.umbrella.messagebus.EventListener;
import de.srsoftware.umbrella.messagebus.events.Event;
import jakarta.activation.DataHandler;
import jakarta.mail.Message.RecipientType;
import jakarta.mail.MessagingException;
import jakarta.mail.Session;
import jakarta.mail.Transport;
import jakarta.mail.internet.MimeBodyPart;
import jakarta.mail.internet.MimeMessage;
import jakarta.mail.internet.MimeMultipart;
import jakarta.mail.util.ByteArrayDataSource;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import org.json.JSONArray;
import org.json.JSONObject;
public class MessageSystem extends BaseHandler implements PostBox, EventListener {
/** Logger used throughout this class, including its nested {@link SubmissionTask}. */
public static final System.Logger LOG = System.getLogger(MessageSystem.class.getSimpleName());
/** Timer on which the {@link SubmissionTask} instances are scheduled. */
private final Timer timer = new Timer();
/** Pairs a user with a message; serves as key type of the {@code exceptions} map. */
private record Receiver(User user, de.srsoftware.umbrella.core.model.Message message){}
/**
 * Timer task bound to one hour of the day. When the timer fires, it calls
 * {@code processMessages(hour)} on the enclosing message system.
 */
private class SubmissionTask extends TimerTask{
	/** Hour of day (as understood by {@link Calendar#HOUR_OF_DAY}) this task runs at. */
	private final Integer hour;

	/**
	 * @param hour the hour of day at which the task shall run
	 */
	public SubmissionTask(Integer hour) {
		this.hour = hour;
	}

	@Override
	public void run() {
		processMessages(hour);
	}

	/**
	 * Registers this task with the timer for the next occurrence of {@code hour}:00:00.000.
	 * If that point in time is not in the future, the task is scheduled for the following day.
	 */
	public void schedule() {
		var date = Calendar.getInstance();
		date.set(Calendar.HOUR_OF_DAY, hour);
		date.set(Calendar.MINUTE, 0);
		date.set(Calendar.SECOND, 0);
		date.set(Calendar.MILLISECOND,0);
		// Advance by one calendar day rather than by 24 elapsed hours: adding hours shifts the
		// wall-clock hour whenever a daylight-saving transition lies in between.
		// "not after" (instead of "before") also moves a time equal to "now" to tomorrow.
		if (!date.after(Calendar.getInstance())) date.add(Calendar.DAY_OF_MONTH, 1);
		timer.schedule(this,date.getTime());
		LOG.log(INFO,"Scheduled {0} at {1}",getClass().getSimpleName(),date.getTime());
	}
}
// SMTP settings read from the configuration; `from` is assigned the value of `user` in the constructor.
private final String from,host,user,pass;
// SMTP port, 587 unless configured otherwise.
private final int port;
// Database that getSettings/patchSettings and processMessages read and write user settings through.
private final SqliteMessageDb db;
// Mail session, created on first use by session().
private Session session;
// Envelopes handed in via send(Envelope); listed and looked up by the HTTP handlers.
private final List<Envelope> queue = new CopyOnWriteArrayList<>();
// When non-null: onEvent skips users with a different email and mails are addressed to this address.
private String debugAddress;
// Delivery failures recorded by processMessages, grouped per receiver and message.
private final HashMap<Receiver,List<Exception>> exceptions = new HashMap<>();
/**
 * Sets up the message system: opens the message database, reads the SMTP settings,
 * registers with the module registry, schedules the submission tasks for every second
 * hour from 8 to 20 and finally subscribes to the message bus.
 *
 * @param config configuration to read database location and SMTP settings from
 * @throws UmbrellaException if the database location is not configured
 * @throws RuntimeException if SMTP host, user or password are not configured
 */
public MessageSystem(Configuration config) throws UmbrellaException {
	var databaseFile = config.get(CONFIG_DB).orElseThrow(() -> missingConfig(CONFIG_DB));
	db = new SqliteMessageDb(connect(databaseFile));

	debugAddress = config.get(DEBUG_ADDREESS).map(Object::toString).orElse(null);

	port = config.get(CONFIG_SMTP_PORT,587);
	host = config.get(CONFIG_SMTP_HOST)
		.map(Object::toString)
		.orElseThrow(() -> new RuntimeException("umbrella.modules.message.smtp.host not configured!"));
	user = config.get(CONFIG_SMTP_USER)
		.map(Object::toString)
		.orElseThrow(() -> new RuntimeException("umbrella.modules.message.smtp.user not configured!"));
	pass = config.get(CONFIG_SMTP_PASS)
		.map(Object::toString)
		.orElseThrow(() -> new RuntimeException("umbrella.modules.message.smtp.pass not configured!"));
	from = user; // the SMTP account doubles as envelope sender

	ModuleRegistry.add(this);

	// one task per delivery slot: 8, 10, 12, 14, 16, 18 and 20 o'clock
	for (int slot = 8; slot <= 20; slot += 2) {
		new SubmissionTask(slot).schedule();
	}

	messageBus().register(this);
}
/**
 * Handles GET requests of authenticated users:
 * an empty path lists the user's messages, {@code SETTINGS} returns the user's settings,
 * a numeric path segment is treated as message hash; anything else is passed to the super class.
 *
 * @param path remaining request path
 * @param ex the exchange to answer
 * @return the result of the handler the request was dispatched to
 * @throws IOException if writing the response fails
 */
@Override
public boolean doGet(Path path, HttpExchange ex) throws IOException {
	addCors(ex);
	try {
		Optional<Token> token = SessionToken.from(ex).map(Token::of);
		var account = ModuleRegistry.userService().loadUser(token);
		if (account.isEmpty()) return unauthorized(ex);

		var head = path.pop();
		if (head == null) return listMessages(ex,account.get());
		if (SETTINGS.equals(head)) return getSettings(ex,account.get());

		try {
			return getMessage(ex,account.get(),Integer.parseInt(head));
		} catch (NumberFormatException ignored) {
			// not a message hash: let the super class deal with the request
			return super.doGet(path, ex);
		}
	} catch (NumberFormatException e){
		return sendContent(ex,HTTP_BAD_REQUEST,"Invalid project id");
	} catch (UmbrellaException e){
		return send(ex,e);
	}
}
/**
 * Handles PATCH requests of authenticated users:
 * {@code SETTINGS} updates the user's settings, {@code READ} followed by a message hash
 * marks that message as read. Everything else, including an empty path, is passed on to
 * the PATCH handler of the super class.
 *
 * @param path remaining request path
 * @param ex the exchange to answer
 * @return the result of the handler the request was dispatched to
 * @throws IOException if writing the response fails
 */
@Override
public boolean doPatch(Path path, HttpExchange ex) throws IOException {
	addCors(ex);
	try {
		Optional<Token> token = SessionToken.from(ex).map(Token::of);
		var user = ModuleRegistry.userService().loadUser(token);
		if (user.isEmpty()) return unauthorized(ex);
		var head = path.pop();
		return switch (head){
			case SETTINGS -> patchSettings(ex,user.get());
			case READ -> patchState(ex,user.get(),path.pop());
			// "case null" keeps an empty path from raising a NullPointerException in the switch;
			// unhandled PATCH requests belong to the PATCH (not the GET) handler of the super class
			case null, default -> super.doPatch(path,ex);
		};
	} catch (NumberFormatException e){
		return sendContent(ex,HTTP_BAD_REQUEST,"Invalid project id");
	} catch (UmbrellaException e){
		return send(ex,e);
	}
}
/**
 * Sends the first queued message that is addressed to the user and whose envelope
 * has the given hash code; answers with "not found" if there is none.
 *
 * @param ex the exchange to answer
 * @param user the requesting user
 * @param hash hash code of the requested envelope
 * @return the result of the respective send operation
 * @throws IOException if writing the response fails
 */
private boolean getMessage(HttpExchange ex, UmbrellaUser user, int hash) throws IOException {
	for (var candidate : queue) {
		if (candidate.isFor(user) && candidate.hashCode() == hash) {
			return sendMessage(ex, user, candidate);
		}
	}
	return notFound(ex);
}
/**
 * Sends the settings stored in the database for the given user.
 *
 * @param ex the exchange to answer
 * @param user the requesting user
 * @return the result of sending the content
 * @throws IOException if writing the response fails
 */
private boolean getSettings(HttpExchange ex, UmbrellaUser user) throws IOException {
	var stored = db.getSettings(user);
	return sendContent(ex,stored);
}
/**
 * Sends a summary of every queued message that is addressed to the given user.
 * Subjects are translated to the user's language.
 *
 * @param ex the exchange to answer
 * @param user the requesting user
 * @return the result of sending the content
 * @throws IOException if writing the response fails
 */
private boolean listMessages(HttpExchange ex, UmbrellaUser user) throws IOException {
	List<JSONObject> summaries = new ArrayList<>();
	for (var envelope : queue) {
		if (!envelope.isFor(user)) continue;
		summaries.add(summary(envelope, user.language()));
	}
	return sendContent(ex,summaries);
}
/**
 * Turns an event into one message per member of the event's audience and queues it.
 * While a debug address is set, only users with exactly that email address are served.
 *
 * @param event the event received from the message bus
 */
@Override
public void onEvent(Event<?> event) {
	for (var recipient : event.audience()){
		var filteredOut = debugAddress != null && !debugAddress.equals(recipient.email().toString());
		if (!filteredOut) {
			var message = new de.srsoftware.umbrella.core.model.Message(event.initiator(),event.subject(),event.describe(),null);
			send(new Envelope(message,recipient));
		}
	}
}
/**
 * Updates the user's settings from the JSON request body:
 * a {@code true} value in {@code INSTANTLY} selects instant delivery, otherwise an array in
 * {@code HOURS} selects a schedule built from its integer entries, otherwise delivery is silenced.
 *
 * @param ex the exchange carrying the JSON body, also used for the answer
 * @param user the user whose settings are updated
 * @return the result of sending the updated settings
 * @throws IOException if reading the request or writing the response fails
 */
private boolean patchSettings(HttpExchange ex, UmbrellaUser user) throws IOException {
	var json = json(ex);
	Settings settings;
	if (json.has(INSTANTLY) && json.get(INSTANTLY) instanceof Boolean instantly && instantly){
		settings = new Instantly();
	} else if (json.has(HOURS) && json.get(HOURS) instanceof JSONArray array){
		// entries that are not integers are dropped
		var hours = array.toList().stream()
			.filter(Integer.class::isInstance)
			.map(Integer.class::cast)
			.toList();
		settings = schedule(hours);
	} else {
		settings = new Silent();
	}
	return sendContent(ex,db.update(user,settings));
}
/**
 * Marks a message as read: removes the user from the receivers of the envelope with the
 * given hash and answers with the message content. Only envelopes addressed to the user
 * are considered, so a hash alone does not give access to other users' messages.
 * An envelope left without receivers is dropped from the queue.
 *
 * @param ex the exchange to answer
 * @param user the requesting user
 * @param path path segment expected to hold the envelope's hash code
 * @return the result of the respective send operation
 * @throws IOException if writing the response fails
 * @throws UmbrellaException (via {@code invalidField}) if {@code path} is not an integer
 */
private boolean patchState(HttpExchange ex, UmbrellaUser user, String path) throws IOException {
	final int hash;
	try {
		hash = Integer.parseInt(path);
	} catch (NumberFormatException e) {
		throw invalidField(HASH,LONG);
	}
	var envelope = queue.stream()
		.filter(env -> env.isFor(user))
		.filter(env -> env.hashCode() == hash)
		.findFirst()
		.orElse(null);
	if (envelope == null) return notFound(ex);

	var receivers = envelope.receivers();
	receivers.remove(user);
	// nobody left to deliver to: do not keep the envelope around forever
	if (receivers.isEmpty()) queue.remove(envelope);
	return sendMessage(ex,user,envelope);
}
/**
 * Delivers queued messages to all receivers that are due.
 * <p>
 * Works on a snapshot of the queue. Receivers that are {@link UmbrellaUser}s are only served
 * if their stored settings accept {@code scheduledHour}; all other receivers, and users whose
 * settings cannot be loaded, are always served. Every due receiver gets a single combined mail.
 * After successful delivery the receiver is removed from the envelopes, and envelopes without
 * remaining receivers are removed from the queue. Failed deliveries are recorded in
 * {@code exceptions} and stay queued.
 * <p>
 * Unchecked exceptions are logged instead of propagated, so that a failure can neither
 * terminate the timer thread nor prevent the scheduled run from being re-armed.
 *
 * @param scheduledHour the hour this run was scheduled for, or {@code null} for an instant run
 */
private synchronized void processMessages(Integer scheduledHour) {
	LOG.log(INFO,"Running {0}…",scheduledHour == null ? "instantly" : "scheduled at "+scheduledHour);
	try {
		var pending = new ArrayList<>(this.queue); // snapshot, the live queue may change meanwhile
		List<User> recipients = pending.stream().map(Envelope::receivers).flatMap(Set::stream).filter(Objects::nonNull).distinct().toList();

		// for known users: get notification preferences, fallback to _immediately_ for unknown users
		var dueRecipients = new ArrayList<User>();
		for (User recv : recipients) {
			if (recv instanceof UmbrellaUser uu) {
				try {
					if (!db.getSettings(uu).sendAt(scheduledHour)) continue;
				} catch (UmbrellaException ignored) {
					// settings unavailable: treat the user as due
				}
			}
			dueRecipients.add(recv);
		}

		var date = new Date();
		for (var receiver : dueRecipients){
			var envelopes = pending.stream().filter(env -> env.isFor(receiver)).toList();
			if (envelopes.isEmpty()) continue; // receiver was served or removed in the meantime
			var combined = new CombinedMessage(t("Collected messages"),receiver);
			for (var envelope : envelopes) combined.merge(envelope.message());
			try {
				send(combined,date);
				for (var envelope : envelopes){
					var audience = envelope.receivers();
					audience.remove(receiver);
					// remove from the live queue, not from the local snapshot
					if (audience.isEmpty()) this.queue.remove(envelope);
				}
			} catch (Exception ex){
				LOG.log(WARNING,"Failed to deliver mail ({0}) to {1}.",combined.subject(),receiver,ex);
				for (var message : combined.messages()) exceptions.computeIfAbsent(new Receiver(receiver,message), k -> new ArrayList<>()).add(ex);
			}
		}
	} catch (RuntimeException e) {
		LOG.log(ERROR,"Processing of queued messages failed",e);
	} finally {
		// re-arm the scheduled run even if this run failed
		if (scheduledHour != null) new SubmissionTask(scheduledHour).schedule();
	}
}
/**
 * Sends sender name, subject and body of the envelope's message as response.
 * Subject and body are translated to the user's language.
 *
 * @param ex the exchange to answer
 * @param user the user whose language is used for translation
 * @param envelope the envelope holding the message
 * @return the result of sending the content
 * @throws IOException if writing the response fails
 */
private boolean sendMessage(HttpExchange ex, UmbrellaUser user, Envelope envelope) throws IOException {
	var content = envelope.message();
	return sendContent(ex,Map.of(
		SENDER, content.sender().name(),
		SUBJECT, content.subject().translate(user.language()),
		BODY, content.body().translate(user.language())
	));
}
/**
 * Builds the list entry for an envelope: sender name, translated subject,
 * formatted time stamp and the envelope's hash code.
 *
 * @param envelope the envelope to summarize
 * @param lang language the subject is translated to
 * @return a JSON object with the fields {@code SENDER}, {@code SUBJECT}, {@code TIMESTAMP} and {@code HASH}
 */
private static JSONObject summary(Envelope envelope, String lang) {
	var message = envelope.message();
	return new JSONObject(Map.of(
		SENDER, message.sender().name(),
		SUBJECT, message.subject().translate(lang),
		TIMESTAMP, envelope.time().format(TIME_FORMATTER),
		HASH, envelope.hashCode()
	));
}
/**
 * Builds a mail from the combined message and hands it to the SMTP transport.
 * The mail goes to the debug address if one is set, otherwise to the receiver's address.
 * Without attachments the body is sent as plain text part; with attachments a multipart
 * message is built from the body followed by one part per attachment.
 *
 * @param message the combined message to deliver
 * @param date the value for the mail's sent date
 * @throws MessagingException if the mail cannot be assembled or delivered
 */
private void send(CombinedMessage message, Date date) throws MessagingException {
	var receiver = message.receiver();
	LOG.log(TRACE,"Sending combined message to {0}…",receiver);
	MimeMessage msg = new MimeMessage(session()); // session() caches the session in the field itself
	msg.addHeader(CONTENT_TYPE, "text/markdown; charset=UTF-8");
	msg.addHeader("format", "flowed");
	msg.addHeader("Content-Transfer-Encoding", "8bit");
	msg.setFrom(message.sender().email().toString());
	msg.setSubject(message.subject(), UTF8);
	msg.setSentDate(date);
	var toEmail = debugAddress != null ? debugAddress : receiver.email().toString();
	msg.setRecipients(RecipientType.TO, toEmail);
	if (message.attachments().isEmpty()){
		msg.setText(message.body(), UTF8);
	} else {
		var multipart = new MimeMultipart();
		var body = new MimeBodyPart();
		body.setText(message.body(),UTF8);
		multipart.addBodyPart(body);
		for (var attachment : message.attachments()){
			var part = new MimeBodyPart();
			part.setDataHandler(new DataHandler(new ByteArrayDataSource(attachment.content(),attachment.mime())));
			part.setFileName(attachment.name());
			multipart.addBodyPart(part);
		}
		// attach the multipart once, after all parts have been added
		msg.setContent(multipart);
	}
	LOG.log(TRACE, "Message to {0} is ready…", receiver);
	Transport.send(msg,user,pass);
	LOG.log(DEBUG, "Sent message to {0}.", receiver);
}
/**
 * Queues the envelope and starts a new thread that runs an instant
 * (non-scheduled) processing of the queue.
 *
 * @param envelope the envelope to deliver
 */
@Override
public void send(Envelope envelope) {
	queue.add(envelope);
	Runnable instantRun = () -> processMessages(null);
	new Thread(instantRun).start();
}
/**
 * Returns the mail session, creating it from the SMTP settings on first use.
 *
 * @return the cached mail session
 */
private Session session() {
	if (session != null) return session;

	var smtp = new Properties();
	smtp.put(HOST, host);
	smtp.put(PORT, port);
	smtp.put(AUTH, true);
	smtp.put(SSL, true);
	smtp.put(ENVELOPE_FROM,from);
	session = Session.getInstance(smtp);
	return session;
}
/**
 * Replaces the debug address.
 *
 * @param newVal the new debug address; {@code null} disables the debug filtering and redirection
 */
public void setDebugAddress(String newVal) {
	debugAddress = newVal;
}
}