You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
195 lines
7.0 KiB
195 lines
7.0 KiB
/* © SRSoftware 2025 */ |
|
package de.srsoftware.umbrella.message; |
|
|
|
import static de.srsoftware.tools.PathHandler.CONTENT_TYPE; |
|
import static de.srsoftware.umbrella.core.ConnectionProvider.connect; |
|
import static de.srsoftware.umbrella.core.Constants.*; |
|
import static de.srsoftware.umbrella.message.Constants.*; |
|
import static java.lang.System.Logger.Level.*; |
|
|
|
import de.srsoftware.configuration.Configuration; |
|
import de.srsoftware.umbrella.core.UmbrellaException; |
|
import de.srsoftware.umbrella.core.api.Translator; |
|
import de.srsoftware.umbrella.core.model.UmbrellaUser; |
|
import de.srsoftware.umbrella.message.model.CombinedMessage; |
|
import de.srsoftware.umbrella.message.model.Envelope; |
|
import de.srsoftware.umbrella.message.model.PostBox; |
|
import jakarta.activation.DataHandler; |
|
import jakarta.mail.Message; |
|
import jakarta.mail.MessagingException; |
|
import jakarta.mail.Session; |
|
import jakarta.mail.Transport; |
|
import jakarta.mail.internet.MimeBodyPart; |
|
import jakarta.mail.internet.MimeMessage; |
|
import jakarta.mail.internet.MimeMultipart; |
|
import jakarta.mail.util.ByteArrayDataSource; |
|
import java.util.*; |
|
import java.util.concurrent.CopyOnWriteArrayList; |
|
import java.util.function.BiFunction; |
|
|
|
public class MessageSystem implements PostBox { |
|
public static final System.Logger LOG = System.getLogger(MessageSystem.class.getSimpleName()); |
|
private final Timer timer = new Timer(); |
|
private record Receiver(UmbrellaUser user, de.srsoftware.umbrella.message.model.Message message){} |
|
|
|
private class SubmissionTask extends TimerTask{ |
|
|
|
|
|
private final Integer hour; |
|
|
|
public SubmissionTask(Integer hour) { |
|
this.hour = hour; |
|
} |
|
|
|
@Override |
|
public void run() { |
|
processMessages(hour); |
|
} |
|
|
|
public void schedule() { |
|
var date = Calendar.getInstance(); |
|
date.set(Calendar.HOUR_OF_DAY, hour); |
|
date.set(Calendar.MINUTE, 0); |
|
date.set(Calendar.SECOND, 0); |
|
date.set(Calendar.MILLISECOND,0); |
|
if (date.before(Calendar.getInstance())) date.add(Calendar.HOUR, 24); |
|
timer.schedule(this,date.getTime()); |
|
LOG.log(INFO,"Scheduled {0} at {1}",getClass().getSimpleName(),date.getTime()); |
|
} |
|
|
|
|
|
|
|
} |
|
private final String from,host,user,pass; |
|
private final int port; |
|
private final SqliteMessageDb db; |
|
private Session session; |
|
private List<Envelope> queue = new CopyOnWriteArrayList<>(); |
|
private String debugAddress; |
|
private final HashMap<Receiver,List<Exception>> exceptions = new HashMap<>(); |
|
private final Translator translator; |
|
|
|
public MessageSystem(Translator translator, Configuration config) throws UmbrellaException { |
|
var messageDbFile = config.get(CONFIG_DB).orElseThrow(() -> new UmbrellaException(500,ERROR_MISSING_CONFIG,CONFIG_DB)); |
|
db = new SqliteMessageDb(connect(messageDbFile)); |
|
|
|
this.translator = translator; |
|
debugAddress = config.get(DEBUG_ADDREESS).map(Object::toString).orElse(null); |
|
port = config.get(CONFIG_SMTP_PORT,587); |
|
host = config.get(CONFIG_SMTP_HOST).map(Object::toString).orElseThrow(() -> new RuntimeException("umbrella.modules.message.smtp.host not configured!")); |
|
user = config.get(CONFIG_SMTP_USER).map(Object::toString).orElseThrow(() -> new RuntimeException("umbrella.modules.message.smtp.user not configured!")); |
|
pass = config.get(CONFIG_SMTP_PASS).map(Object::toString).orElseThrow(() -> new RuntimeException("umbrella.modules.message.smtp.pass not configured!")); |
|
from = user; |
|
new SubmissionTask(8).schedule(); |
|
new SubmissionTask(10).schedule(); |
|
new SubmissionTask(12).schedule(); |
|
new SubmissionTask(14).schedule(); |
|
new SubmissionTask(16).schedule(); |
|
new SubmissionTask(18).schedule(); |
|
new SubmissionTask(20).schedule(); |
|
} |
|
|
|
@Override |
|
public void send(Envelope envelope) { |
|
queue.add(envelope); |
|
new Thread(() -> processMessages(null)).start(); |
|
} |
|
|
|
private synchronized void processMessages(Integer scheduledHour) { |
|
LOG.log(INFO,"Running {0}…",scheduledHour == null ? "instantly" : "scheduled at "+scheduledHour); |
|
var queue = new ArrayList<>(this.queue); |
|
var dueRecipients = new ArrayList<UmbrellaUser>(); |
|
List<UmbrellaUser> recipients = queue.stream().map(Envelope::receivers).flatMap(Set::stream).filter(Objects::nonNull).distinct().toList(); |
|
|
|
{ // for known users: get notification preferences, fallback to _immediately_ for unknown users |
|
for (UmbrellaUser recv : recipients) { |
|
if (recv instanceof UmbrellaUser uu) { |
|
try { |
|
if (!db.getSettings(uu).sendAt(scheduledHour)) continue; |
|
} catch (UmbrellaException ignored) {} |
|
} |
|
dueRecipients.add(recv); |
|
} |
|
} |
|
|
|
var date = new Date(); |
|
|
|
for (var receiver : dueRecipients){ |
|
BiFunction<String,Map<String,String>,String> translateFunction = (text,fills) -> translator.translate(receiver.language(),text,fills); |
|
|
|
var combined = new CombinedMessage("Collected messages",translateFunction); |
|
var envelopes = queue.stream().filter(env -> env.isFor(receiver)).toList(); |
|
for (var envelope : envelopes) combined.merge(envelope.message()); |
|
|
|
try { |
|
send(combined,receiver,date); |
|
for (var envelope : envelopes){ |
|
var audience = envelope.receivers(); |
|
audience.remove(receiver); |
|
if (audience.isEmpty()) queue.remove(envelope); |
|
} |
|
} catch (Exception ex){ |
|
LOG.log(WARNING,"Failed to deliver mail ({0}) to {1}.",combined.subject(),receiver,ex); |
|
for (var message : combined.messages()) exceptions.computeIfAbsent(new Receiver(receiver,message), k -> new ArrayList<>()).add(ex); |
|
|
|
} |
|
} |
|
|
|
if (scheduledHour != null) new SubmissionTask(scheduledHour).schedule(); |
|
} |
|
|
|
private Session session() { |
|
if (session == null){ |
|
Properties props = new Properties(); |
|
props.put(HOST, host); |
|
props.put(PORT, port); |
|
props.put(AUTH, true); |
|
props.put(SSL, true); |
|
props.put(ENVELOPE_FROM,from); |
|
session = Session.getInstance(props); |
|
} |
|
return session; |
|
} |
|
|
|
public void setDebugAddress(String newVal) { |
|
this.debugAddress = newVal; |
|
} |
|
|
|
|
|
private void send(CombinedMessage message, UmbrellaUser receiver, Date date) throws MessagingException { |
|
LOG.log(TRACE,"Sending combined message to {0}…",receiver); |
|
session = session(); |
|
MimeMessage msg = new MimeMessage(session); |
|
msg.addHeader(CONTENT_TYPE, "text/markdown; charset=UTF-8"); |
|
msg.addHeader("format", "flowed"); |
|
msg.addHeader("Content-Transfer-Encoding", "8bit"); |
|
msg.setFrom(message.sender().email().toString()); |
|
msg.setSubject(message.subject(), UTF8); |
|
msg.setSentDate(date); |
|
var toEmail = debugAddress != null ? debugAddress : receiver.email().toString(); |
|
msg.setRecipients(Message.RecipientType.TO, toEmail); |
|
|
|
if (message.attachments().isEmpty()){ |
|
msg.setText(message.body(), UTF8); |
|
} else { |
|
var multipart = new MimeMultipart(); |
|
var body = new MimeBodyPart(); |
|
body.setText(message.body(),UTF8); |
|
|
|
multipart.addBodyPart(body); |
|
for (var attachment : message.attachments()){ |
|
var part = new MimeBodyPart(); |
|
part.setDataHandler(new DataHandler(new ByteArrayDataSource(attachment.content(),attachment.mime()))); |
|
part.setFileName(attachment.name()); |
|
multipart.addBodyPart(part); |
|
msg.setContent(multipart); |
|
} |
|
} |
|
|
|
|
|
|
|
LOG.log(TRACE, "Message to {0} is ready…", receiver); |
|
Transport.send(msg,user,pass); |
|
LOG.log(DEBUG, "Sent message to {0}.", receiver); |
|
} |
|
}
|
|
|