working on messagesystem, password reset form
Signed-off-by: Stephan Richter <s.richter@srsoftware.de>
This commit is contained in:
@@ -3,6 +3,7 @@ description = "Umbrella : Message subsystem"
|
||||
dependencies{
|
||||
implementation(project(":core"))
|
||||
implementation(project(":user"))
|
||||
implementation("com.sun.mail:jakarta.mail:2.0.1")
|
||||
implementation("de.srsoftware:configuration.api:1.0.2")
|
||||
implementation("de.srsoftware:tools.jdbc:1.3.2")
|
||||
implementation("de.srsoftware:tools.mime:1.1.2")
|
||||
|
||||
@@ -3,6 +3,7 @@ package de.srsoftware.umbrella.message;
|
||||
|
||||
public class Constants {
|
||||
public static final String AUTH = "mail.smtp.auth";
|
||||
public static final String DEBUG_ADDREESS = "debug_addres";
|
||||
public static final String ENVELOPE_FROM = "mail.smtp.from";
|
||||
public static final String FIELD_MESSAGES = "messages";
|
||||
public static final String FIELD_HOST = "host";
|
||||
@@ -12,6 +13,7 @@ public class Constants {
|
||||
public static final String JSONOBJECT = "json object";
|
||||
public static final String PORT = "mail.smtp.port";
|
||||
public static final String RECEIVERS = "receivers";
|
||||
public static final String SMTP = "smtp";
|
||||
public static final String SSL = "mail.smtp.ssl.enable";
|
||||
public static final String SUBMISSION = "submission";
|
||||
|
||||
|
||||
@@ -1,10 +1,195 @@
|
||||
/* © SRSoftware 2025 */
|
||||
package de.srsoftware.umbrella.message;
|
||||
|
||||
public class MessageSystem {
|
||||
private final SqliteMessageDb db;
|
||||
import static de.srsoftware.tools.PathHandler.CONTENT_TYPE;
|
||||
import static de.srsoftware.umbrella.core.Constants.*;
|
||||
import static de.srsoftware.umbrella.message.Constants.*;
|
||||
import static de.srsoftware.umbrella.user.Constants.PASS;
|
||||
import static java.lang.System.Logger.Level.*;
|
||||
|
||||
public MessageSystem(SqliteMessageDb messageDb) {
|
||||
import de.srsoftware.configuration.Configuration;
|
||||
import de.srsoftware.umbrella.core.UmbrellaException;
|
||||
import de.srsoftware.umbrella.core.api.Translator;
|
||||
import de.srsoftware.umbrella.message.model.CombinedMessage;
|
||||
import de.srsoftware.umbrella.message.model.Envelope;
|
||||
import de.srsoftware.umbrella.message.model.PostBox;
|
||||
import de.srsoftware.umbrella.user.model.UmbrellaUser;
|
||||
import de.srsoftware.umbrella.user.model.User;
|
||||
import jakarta.activation.DataHandler;
|
||||
import jakarta.mail.Message;
|
||||
import jakarta.mail.MessagingException;
|
||||
import jakarta.mail.Session;
|
||||
import jakarta.mail.Transport;
|
||||
import jakarta.mail.internet.MimeBodyPart;
|
||||
import jakarta.mail.internet.MimeMessage;
|
||||
import jakarta.mail.internet.MimeMultipart;
|
||||
import jakarta.mail.util.ByteArrayDataSource;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.function.Function;
|
||||
|
||||
public class MessageSystem implements PostBox {
|
||||
public static final System.Logger LOG = System.getLogger(MessageSystem.class.getSimpleName());
|
||||
private final Timer timer = new Timer();
|
||||
private record Receiver(User user, de.srsoftware.umbrella.message.model.Message message){}
|
||||
|
||||
private class SubmissionTask extends TimerTask{
|
||||
|
||||
|
||||
private final Integer hour;
|
||||
|
||||
public SubmissionTask(Integer hour) {
|
||||
this.hour = hour;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
processMessages(hour);
|
||||
}
|
||||
|
||||
public void schedule() {
|
||||
var date = Calendar.getInstance();
|
||||
date.set(Calendar.HOUR_OF_DAY, hour);
|
||||
date.set(Calendar.MINUTE, 0);
|
||||
date.set(Calendar.SECOND, 0);
|
||||
date.set(Calendar.MILLISECOND,0);
|
||||
if (date.before(Calendar.getInstance())) date.add(Calendar.HOUR, 24);
|
||||
timer.schedule(this,date.getTime());
|
||||
LOG.log(INFO,"Scheduled {0} at {1}",getClass().getSimpleName(),date.getTime());
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
private final String from,host,user,pass;
|
||||
private final int port;
|
||||
private final SqliteMessageDb db;
|
||||
private Session session;
|
||||
private List<Envelope> queue = new CopyOnWriteArrayList<>();
|
||||
private String debugAddress;
|
||||
private final HashMap<Receiver,List<Exception>> exceptions = new HashMap<>();
|
||||
private final Translator translator;
|
||||
|
||||
public MessageSystem(SqliteMessageDb messageDb, Translator translator, Configuration config) {
|
||||
db = messageDb;
|
||||
this.translator = translator;
|
||||
debugAddress = config.get(DEBUG_ADDREESS).map(Object::toString).orElse(null);
|
||||
config = config.subset(SMTP).orElseThrow(() -> new RuntimeException("umbrella.modules.message.smtp not configured!"));
|
||||
port = config.get(FIELD_PORT,587);
|
||||
host = config.get(FIELD_HOST).map(Object::toString).orElseThrow(() -> new RuntimeException("umbrella.modules.message.smtp.host not configured!"));
|
||||
user = config.get(USER).map(Object::toString).orElseThrow(() -> new RuntimeException("umbrella.modules.message.smtp.user not configured!"));
|
||||
pass = config.get(PASS).map(Object::toString).orElseThrow(() -> new RuntimeException("umbrella.modules.message.smtp.pass not configured!"));
|
||||
from = user;
|
||||
new SubmissionTask(8).schedule();
|
||||
new SubmissionTask(10).schedule();
|
||||
new SubmissionTask(12).schedule();
|
||||
new SubmissionTask(14).schedule();
|
||||
new SubmissionTask(16).schedule();
|
||||
new SubmissionTask(18).schedule();
|
||||
new SubmissionTask(20).schedule();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void send(Envelope envelope) {
|
||||
queue.add(envelope);
|
||||
new Thread(() -> processMessages(null)).start();
|
||||
}
|
||||
|
||||
private synchronized void processMessages(Integer scheduledHour) {
|
||||
LOG.log(INFO,"Running {0}…",scheduledHour == null ? "instantly" : "scheduled at "+scheduledHour);
|
||||
var queue = new ArrayList<>(this.queue);
|
||||
var dueRecipients = new ArrayList<User>();
|
||||
List<User> recipients = queue.stream().map(Envelope::receivers).flatMap(Set::stream).filter(Objects::nonNull).distinct().toList();
|
||||
|
||||
{ // for known users: get notification preferences, fallback to _immediately_ for unknown users
|
||||
for (User recv : recipients) {
|
||||
if (recv instanceof UmbrellaUser uu) {
|
||||
try {
|
||||
if (!db.getSettings(uu).sendAt(scheduledHour)) continue;
|
||||
} catch (UmbrellaException ignored) {}
|
||||
}
|
||||
dueRecipients.add(recv);
|
||||
}
|
||||
}
|
||||
|
||||
var date = new Date();
|
||||
|
||||
for (var receiver : dueRecipients){
|
||||
Function<String,String> translateFunction = receiver instanceof UmbrellaUser uu ? text -> translator.translate(uu.language(),text) : text -> text;
|
||||
// combine messages for user
|
||||
var combined = new CombinedMessage(translateFunction);
|
||||
var envelopes = queue.stream().filter(env -> env.isFor(receiver)).toList();
|
||||
for (var envelope : envelopes) combined.merge(envelope.message());
|
||||
|
||||
try {
|
||||
send(combined,receiver,date);
|
||||
for (var envelope : envelopes){
|
||||
var audience = envelope.receivers();
|
||||
audience.remove(receiver);
|
||||
if (audience.isEmpty()) queue.remove(envelope);
|
||||
}
|
||||
} catch (Exception ex){
|
||||
LOG.log(WARNING,"Failed to deliver mail ({0}) to {1}.",combined.subject(),receiver,ex);
|
||||
for (var message : combined.messages()) exceptions.computeIfAbsent(new Receiver(receiver,message), k -> new ArrayList<>()).add(ex);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
if (scheduledHour != null) new SubmissionTask(scheduledHour).schedule();
|
||||
}
|
||||
|
||||
private Session session() {
|
||||
if (session == null){
|
||||
Properties props = new Properties();
|
||||
props.put(HOST, host);
|
||||
props.put(PORT, port);
|
||||
props.put(AUTH, true);
|
||||
props.put(SSL, true);
|
||||
props.put(ENVELOPE_FROM,from);
|
||||
session = Session.getInstance(props);
|
||||
}
|
||||
return session;
|
||||
}
|
||||
|
||||
public void setDebugAddress(String newVal) {
|
||||
this.debugAddress = newVal;
|
||||
}
|
||||
|
||||
|
||||
private void send(CombinedMessage message, User receiver, Date date) throws MessagingException {
|
||||
LOG.log(TRACE,"Sending combined message to {0}…",receiver);
|
||||
session = session();
|
||||
MimeMessage msg = new MimeMessage(session);
|
||||
msg.addHeader(CONTENT_TYPE, "text/markdown; charset=UTF-8");
|
||||
msg.addHeader("format", "flowed");
|
||||
msg.addHeader("Content-Transfer-Encoding", "8bit");
|
||||
msg.setFrom(message.sender().email());
|
||||
msg.setSubject(message.subject(), UTF8);
|
||||
msg.setSentDate(date);
|
||||
var toEmail = debugAddress != null ? debugAddress : receiver.email();
|
||||
msg.setRecipients(Message.RecipientType.TO, toEmail);
|
||||
|
||||
if (message.attachments().isEmpty()){
|
||||
msg.setText(message.body(), UTF8);
|
||||
} else {
|
||||
var multipart = new MimeMultipart();
|
||||
var body = new MimeBodyPart();
|
||||
body.setText(message.body(),UTF8);
|
||||
|
||||
multipart.addBodyPart(body);
|
||||
for (var attachment : message.attachments()){
|
||||
var part = new MimeBodyPart();
|
||||
part.setDataHandler(new DataHandler(new ByteArrayDataSource(attachment.content(),attachment.mime())));
|
||||
part.setFileName(attachment.name());
|
||||
multipart.addBodyPart(part);
|
||||
msg.setContent(multipart);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
LOG.log(TRACE, "Message to {0} is ready…", receiver);
|
||||
Transport.send(msg,user,pass);
|
||||
LOG.log(DEBUG, "Sent message to {0}.", receiver);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,53 +0,0 @@
|
||||
/* © SRSoftware 2025 */
|
||||
package de.srsoftware.umbrella.message.model;
|
||||
|
||||
|
||||
import de.srsoftware.umbrella.user.model.User;
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
* This maps recipient email addresses to the pending messages of that recipient
|
||||
*/
|
||||
public class MailQueue extends ArrayList<Envelope>{
|
||||
|
||||
private record Receiver(User user, Message message){}
|
||||
|
||||
private final HashMap<Receiver,List<Exception>> exceptions = new HashMap<>();
|
||||
|
||||
public interface Listener{
|
||||
public void messagesAdded();
|
||||
public void setQueue(MailQueue queue);
|
||||
}
|
||||
|
||||
private final Set<Listener> listeners = new HashSet<>();
|
||||
|
||||
public void addListener(Listener listener) {
|
||||
listeners.add(listener);
|
||||
listener.setQueue(this);
|
||||
}
|
||||
|
||||
public void commit() {
|
||||
listeners.forEach(Listener::messagesAdded);
|
||||
}
|
||||
|
||||
public List<Envelope> envelopesFor(User recv) {
|
||||
return stream().filter(env -> env.isFor(recv)).toList();
|
||||
}
|
||||
|
||||
public void failedAt(User receiver, CombinedMessage combined, Exception ex) {
|
||||
for (var message : combined.messages()) exceptions.computeIfAbsent(new Receiver(receiver,message), k -> new ArrayList<>()).add(ex);
|
||||
}
|
||||
|
||||
/**
|
||||
* return the email addresses of the recipients of all messages in the queue
|
||||
*
|
||||
* @return a list of email addresses
|
||||
*/
|
||||
public List<User> receivers() {
|
||||
return stream().map(Envelope::receivers)
|
||||
.flatMap(Set::stream)
|
||||
.filter(Objects::nonNull)
|
||||
.distinct()
|
||||
.toList();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,6 @@
|
||||
/* © SRSoftware 2025 */
|
||||
package de.srsoftware.umbrella.message.model;
|
||||
|
||||
public interface PostBox {
|
||||
public void send(Envelope envelope);
|
||||
}
|
||||
30
messages/src/test/java/TriggerTest.java
Normal file
30
messages/src/test/java/TriggerTest.java
Normal file
@@ -0,0 +1,30 @@
|
||||
/* © SRSoftware 2025 */
|
||||
import static java.lang.Thread.sleep;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
public class TriggerTest {
|
||||
|
||||
|
||||
|
||||
@Test
|
||||
public void testSynchronization() throws InterruptedException {
|
||||
StringBuffer sb = new StringBuffer();
|
||||
new Thread(() -> writeTo(sb,"A")).start();
|
||||
sleep(100);
|
||||
new Thread(() -> writeTo(sb,"B")).start();
|
||||
sleep(100);
|
||||
new Thread(() -> writeTo(sb,"C")).start();
|
||||
sleep(1000);
|
||||
assertEquals("AABBCC",sb.toString());
|
||||
}
|
||||
|
||||
private synchronized void writeTo(StringBuffer sb, String mark) {
|
||||
sb.append(mark);
|
||||
try {
|
||||
sleep(200);
|
||||
} catch (InterruptedException ignored) {}
|
||||
sb.append(mark);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user