348 lines
12 KiB
Java
348 lines
12 KiB
Java
/* © SRSoftware 2025 */
|
|
package de.srsoftware.umbrella.message;
|
|
|
|
import static de.srsoftware.umbrella.core.ConnectionProvider.connect;
|
|
import static de.srsoftware.umbrella.core.ModuleRegistry.userService;
|
|
import static de.srsoftware.umbrella.core.constants.Constants.TIME_FORMATTER;
|
|
import static de.srsoftware.umbrella.core.constants.Constants.UTF8;
|
|
import static de.srsoftware.umbrella.core.constants.Field.*;
|
|
import static de.srsoftware.umbrella.core.constants.Path.READ;
|
|
import static de.srsoftware.umbrella.core.constants.Path.SETTINGS;
|
|
import static de.srsoftware.umbrella.core.constants.Text.LONG;
|
|
import static de.srsoftware.umbrella.core.exceptions.UmbrellaException.invalidField;
|
|
import static de.srsoftware.umbrella.core.exceptions.UmbrellaException.missingConfig;
|
|
import static de.srsoftware.umbrella.core.model.Translatable.t;
|
|
import static de.srsoftware.umbrella.message.Constants.*;
|
|
import static de.srsoftware.umbrella.message.model.Schedule.schedule;
|
|
import static de.srsoftware.umbrella.messagebus.MessageBus.messageBus;
|
|
import static java.lang.System.Logger.Level.*;
|
|
import static java.net.HttpURLConnection.HTTP_BAD_REQUEST;
|
|
|
|
import com.sun.net.httpserver.HttpExchange;
|
|
import de.srsoftware.configuration.Configuration;
|
|
import de.srsoftware.tools.Path;
|
|
import de.srsoftware.tools.SessionToken;
|
|
import de.srsoftware.umbrella.core.BaseHandler;
|
|
import de.srsoftware.umbrella.core.ModuleRegistry;
|
|
import de.srsoftware.umbrella.core.api.PostBox;
|
|
import de.srsoftware.umbrella.core.exceptions.UmbrellaException;
|
|
import de.srsoftware.umbrella.core.model.*;
|
|
import de.srsoftware.umbrella.message.model.*;
|
|
import de.srsoftware.umbrella.messagebus.EventListener;
|
|
import de.srsoftware.umbrella.messagebus.events.Event;
|
|
import jakarta.activation.DataHandler;
|
|
import jakarta.mail.Message.RecipientType;
|
|
import jakarta.mail.MessagingException;
|
|
import jakarta.mail.Session;
|
|
import jakarta.mail.Transport;
|
|
import jakarta.mail.internet.MimeBodyPart;
|
|
import jakarta.mail.internet.MimeMessage;
|
|
import jakarta.mail.internet.MimeMultipart;
|
|
import jakarta.mail.util.ByteArrayDataSource;
|
|
import java.io.IOException;
|
|
import java.time.Instant;
|
|
import java.time.ZoneId;
|
|
import java.util.*;
|
|
import org.json.JSONArray;
|
|
import org.json.JSONObject;
|
|
|
|
public class MessageSystem extends BaseHandler implements PostBox, EventListener {
|
|
public static final System.Logger LOG = System.getLogger(MessageSystem.class.getSimpleName());
|
|
private final Timer timer = new Timer();
|
|
|
|
/** Pairs a recipient with one message; serves as key when delivery failures are recorded. */
private record Receiver(User user, Message<?> message) {
}
|
|
|
|
private class SubmissionTask extends TimerTask{
|
|
|
|
|
|
private final Integer hour;
|
|
|
|
public SubmissionTask(Integer hour) {
|
|
this.hour = hour;
|
|
}
|
|
|
|
@Override
|
|
public void run() {
|
|
processMessages(hour);
|
|
}
|
|
|
|
public void schedule() {
|
|
var date = Calendar.getInstance();
|
|
date.set(Calendar.HOUR_OF_DAY, hour);
|
|
date.set(Calendar.MINUTE, 0);
|
|
date.set(Calendar.SECOND, 0);
|
|
date.set(Calendar.MILLISECOND,0);
|
|
if (date.before(Calendar.getInstance())) date.add(Calendar.HOUR, 24);
|
|
timer.schedule(this,date.getTime());
|
|
LOG.log(INFO,"Scheduled {0} at {1}",getClass().getSimpleName(),date.getTime());
|
|
}
|
|
}
|
|
private final String from,host,user,pass;
|
|
private final int port;
|
|
private final MessageDb db;
|
|
private final MessageQueue<TranslatedMessage> queue;
|
|
private Session session;
|
|
private final String debugAddress;
|
|
private final HashMap<Receiver,List<Exception>> exceptions = new HashMap<>();
|
|
|
|
public MessageSystem(Configuration config) throws UmbrellaException {
|
|
var dbFile = config.get(CONFIG_DB).orElseThrow(() -> missingConfig(CONFIG_DB));
|
|
var sqlite = new SqliteMessageDb(connect(dbFile));
|
|
db = sqlite;
|
|
queue = sqlite;
|
|
|
|
debugAddress = config.get(DEBUG_ADDREESS).map(Object::toString).orElse(null);
|
|
port = config.get(CONFIG_SMTP_PORT,587);
|
|
host = config.get(CONFIG_SMTP_HOST).map(Object::toString).orElseThrow(() -> new RuntimeException("umbrella.modules.message.smtp.host not configured!"));
|
|
user = config.get(CONFIG_SMTP_USER).map(Object::toString).orElseThrow(() -> new RuntimeException("umbrella.modules.message.smtp.user not configured!"));
|
|
pass = config.get(CONFIG_SMTP_PASS).map(Object::toString).orElseThrow(() -> new RuntimeException("umbrella.modules.message.smtp.pass not configured!"));
|
|
from = user;
|
|
ModuleRegistry.add(this);
|
|
new SubmissionTask(8).schedule();
|
|
new SubmissionTask(10).schedule();
|
|
new SubmissionTask(12).schedule();
|
|
new SubmissionTask(14).schedule();
|
|
new SubmissionTask(16).schedule();
|
|
new SubmissionTask(18).schedule();
|
|
new SubmissionTask(20).schedule();
|
|
messageBus().register(this);
|
|
}
|
|
|
|
@Override
|
|
public boolean doGet(Path path, HttpExchange ex) throws IOException {
|
|
addCors(ex);
|
|
try {
|
|
Optional<Token> token = SessionToken.from(ex).map(Token::of);
|
|
var user = userService().loadUser(token);
|
|
if (user.isEmpty()) return unauthorized(ex);
|
|
var id = path.pop();
|
|
return switch (id){
|
|
case null -> listMessages(ex,user.get());
|
|
case SETTINGS -> getSettings(ex,user.get());
|
|
default -> {
|
|
try {
|
|
yield getMessage(ex,user.get(),Integer.parseInt(id));
|
|
} catch (NumberFormatException ignored) {
|
|
|
|
}
|
|
yield super.doGet(path, ex);
|
|
}
|
|
};
|
|
} catch (NumberFormatException e){
|
|
return sendContent(ex,HTTP_BAD_REQUEST,"Invalid project id");
|
|
} catch (UmbrellaException e){
|
|
return send(ex,e);
|
|
}
|
|
}
|
|
|
|
@Override
|
|
public boolean doPatch(Path path, HttpExchange ex) throws IOException {
|
|
addCors(ex);
|
|
try {
|
|
Optional<Token> token = SessionToken.from(ex).map(Token::of);
|
|
var user = userService().loadUser(token);
|
|
if (user.isEmpty()) return unauthorized(ex);
|
|
var head = path.pop();
|
|
return switch (head){
|
|
case SETTINGS -> patchSettings(ex,user.get());
|
|
case READ -> markRead(ex,user.get(),path.pop());
|
|
default -> super.doGet(path,ex);
|
|
};
|
|
} catch (NumberFormatException e){
|
|
return sendContent(ex,HTTP_BAD_REQUEST,"Invalid project id");
|
|
} catch (UmbrellaException e){
|
|
return send(ex,e);
|
|
}
|
|
}
|
|
|
|
private boolean getMessage(HttpExchange ex, UmbrellaUser user, long id) throws IOException {
|
|
var envelope = queue.getEnvelope(id).filter(env -> env.isFor(user));
|
|
if (envelope.isPresent()) return sendMessage(ex, user, envelope.get());
|
|
return notFound(ex);
|
|
}
|
|
|
|
private boolean getSettings(HttpExchange ex, UmbrellaUser user) throws IOException {
|
|
return sendContent(ex,db.getSettings(user));
|
|
}
|
|
|
|
private boolean listMessages(HttpExchange ex, UmbrellaUser user) throws IOException {
|
|
var messages = queue.getEnvelopesFor(user).stream().map(e -> summary(e, user.language()));
|
|
return sendContent(ex,messages);
|
|
}
|
|
|
|
@Override
|
|
public void onEvent(Event<?> event) {
|
|
var message = new TranslatableMessage(event.initiator(),event.subject(),event.describe(),null);
|
|
send(new Envelope<>(0,message,event.audience()));
|
|
}
|
|
|
|
private boolean patchSettings(HttpExchange ex, UmbrellaUser user) throws IOException {
|
|
var json = json(ex);
|
|
Settings settings;
|
|
if (json.has(INSTANTLY) && json.get(INSTANTLY) instanceof Boolean b && b){
|
|
settings = new Instantly();
|
|
} else {
|
|
if (json.has(HOURS) && json.get(HOURS) instanceof JSONArray hrs){
|
|
var hours = hrs.toList().stream().filter(v -> v instanceof Integer).map(Integer.class::cast).toList();
|
|
settings = schedule(hours);
|
|
} else settings = new Silent();
|
|
}
|
|
return sendContent(ex,db.update(user,settings));
|
|
}
|
|
|
|
private boolean markRead(HttpExchange ex, UmbrellaUser user, String path) {
|
|
try {
|
|
var id = Integer.parseInt(path);
|
|
var envelope = queue.markRead(id, user);
|
|
if (envelope.isPresent()) return sendMessage(ex,user,envelope.get());
|
|
return notFound(ex);
|
|
} catch (NumberFormatException | IOException e) {
|
|
throw invalidField(ID,LONG);
|
|
}
|
|
}
|
|
|
|
private boolean sendAt(User user, Integer scheduledHour){
|
|
try {
|
|
if (user instanceof UmbrellaUser uu) return db.getSettings(uu).sendAt(scheduledHour);
|
|
} catch (UmbrellaException ignored) {}
|
|
return true;
|
|
}
|
|
|
|
private synchronized void processMessages(Integer scheduledHour) {
|
|
LOG.log(INFO,"Running {0}…",scheduledHour == null ? "instantly" : "scheduled at "+scheduledHour);
|
|
|
|
var dueRecipients = queue.getReceivers().map(this::toUmbrellaUser).filter(uu -> sendAt(uu,scheduledHour)).toList();
|
|
|
|
var date = new Date();
|
|
|
|
for (var receiver : dueRecipients){
|
|
if (debugAddress != null && !debugAddress.equals(receiver.email().toString())) {
|
|
LOG.log(DEBUG,"Debug address is set to {0}, ignoring mail to {1}",debugAddress,receiver);
|
|
continue;
|
|
}
|
|
var combined = new CombinedMessage(t("Collected messages"),receiver);
|
|
|
|
try {
|
|
var envelopes = queue.getEnvelopesFor(receiver);
|
|
envelopes.stream().map(Envelope::message).forEach(combined::merge);
|
|
if (receiver instanceof UmbrellaUser) combined.addNote();
|
|
|
|
send(combined,date);
|
|
envelopes.forEach(env -> queue.markRead(env.id(),receiver));
|
|
} catch (Exception ex){
|
|
LOG.log(WARNING,"Failed to deliver mail ({0}) to {1}.",combined.subject(),receiver,ex);
|
|
for (var message : combined.messages()) exceptions.computeIfAbsent(new Receiver(receiver,message), k -> new ArrayList<>()).add(ex);
|
|
|
|
}
|
|
}
|
|
|
|
if (scheduledHour != null) new SubmissionTask(scheduledHour).schedule();
|
|
}
|
|
|
|
private User toUmbrellaUser(User user) {
|
|
var umbrellaUser = userService().load(user.email());
|
|
if (umbrellaUser.isPresent()) return umbrellaUser.get();
|
|
return user;
|
|
}
|
|
|
|
private boolean sendMessage(HttpExchange ex, UmbrellaUser user, Envelope<?> envelope) throws IOException {
|
|
var message = envelope.message();
|
|
if (message instanceof TranslatableMessage tm) message = tm.translate(user.language());
|
|
return sendContent(ex,Map.of(
|
|
SENDER,message.sender(),
|
|
SUBJECT,message.subject(),
|
|
BODY,message.body()
|
|
));
|
|
}
|
|
|
|
|
|
private void send(CombinedMessage message, Date date) throws MessagingException {
|
|
var receiver = message.receiver();
|
|
LOG.log(TRACE,"Sending combined message to {0}…",receiver);
|
|
session = session();
|
|
MimeMessage msg = new MimeMessage(session);
|
|
msg.addHeader(CONTENT_TYPE, "text/markdown; charset=UTF-8");
|
|
msg.addHeader("format", "flowed");
|
|
msg.addHeader("Content-Transfer-Encoding", "8bit");
|
|
msg.setFrom(message.sender().email().toString());
|
|
msg.setSubject(message.subject(), UTF8);
|
|
msg.setSentDate(date);
|
|
var toEmail = debugAddress != null ? debugAddress : receiver.email().toString();
|
|
msg.setRecipients(RecipientType.TO, toEmail);
|
|
|
|
if (message.attachments().isEmpty()){
|
|
msg.setText(message.body(), UTF8);
|
|
} else {
|
|
var multipart = new MimeMultipart();
|
|
var body = new MimeBodyPart();
|
|
body.setText(message.body(),UTF8);
|
|
|
|
multipart.addBodyPart(body);
|
|
for (var attachment : message.attachments()){
|
|
var part = new MimeBodyPart();
|
|
part.setDataHandler(new DataHandler(new ByteArrayDataSource(attachment.content(),attachment.mime())));
|
|
part.setFileName(attachment.name());
|
|
multipart.addBodyPart(part);
|
|
msg.setContent(multipart);
|
|
}
|
|
}
|
|
|
|
LOG.log(TRACE, "Message to {0} is ready…", receiver);
|
|
Transport.send(msg,user,pass);
|
|
LOG.log(DEBUG, "Sent message to {0}.", receiver);
|
|
}
|
|
|
|
@Override
|
|
@SuppressWarnings("unchecked")
|
|
public void send(Envelope<?> envelope) {
|
|
switch (envelope.message()){
|
|
case TranslatedMessage ignored:
|
|
queue.push((Envelope<TranslatedMessage>) envelope);
|
|
break;
|
|
case TranslatableMessage tm:
|
|
Map<String,Envelope<TranslatedMessage>> map = new HashMap<>();
|
|
for (var receiver : envelope.receivers()){
|
|
var lang = receiver.language();
|
|
var env = map.get(lang);
|
|
if (env == null){
|
|
TranslatedMessage translated = tm.translate(lang);
|
|
env = new Envelope<>(0, translated,new HashSet<>());
|
|
map.put(lang,env);
|
|
}
|
|
env.receivers().add(receiver);
|
|
}
|
|
map.values().forEach(queue::push);
|
|
break;
|
|
default:
|
|
return;
|
|
}
|
|
|
|
new Thread(() -> processMessages(null)).start();
|
|
}
|
|
|
|
private Session session() {
|
|
if (session == null){
|
|
Properties props = new Properties();
|
|
props.put(HOST, host);
|
|
props.put(PORT, port);
|
|
props.put(AUTH, true);
|
|
props.put(SSL, true);
|
|
props.put(ENVELOPE_FROM,from);
|
|
session = Session.getInstance(props);
|
|
}
|
|
return session;
|
|
}
|
|
|
|
private static JSONObject summary(Envelope<?> envelope, String lang) {
|
|
var message = envelope.message();
|
|
if (message instanceof TranslatableMessage tm) message = tm.translate(lang);
|
|
|
|
var sender = message.sender().name();
|
|
var subject = message.subject();
|
|
var time = Instant.ofEpochMilli(message.utcTime()).atZone(ZoneId.systemDefault()).toLocalDateTime().format(TIME_FORMATTER);
|
|
var id = envelope.id();
|
|
return new JSONObject(Map.of(SENDER,sender,SUBJECT,subject,TIMESTAMP,time,ID,id));
|
|
}
|
|
|
|
}
|