refaturing message system, step 4: implementing in-memory queue

Signed-off-by: Stephan Richter <s.richter@srsoftware.de>
This commit is contained in:
2026-01-21 22:32:45 +01:00
parent b123c30c5b
commit d6d6aabe51
4 changed files with 75 additions and 16 deletions

View File

@@ -0,0 +1,60 @@
/* © SRSoftware 2025 */
package de.srsoftware.umbrella.message;
import static java.lang.System.Logger.Level.DEBUG;
import de.srsoftware.umbrella.core.model.Envelope;
import de.srsoftware.umbrella.core.model.TranslatedMessage;
import de.srsoftware.umbrella.core.model.User;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class InMemoryQueue implements MessageQueue<TranslatedMessage> {
public static final System.Logger LOG = System.getLogger(InMemoryQueue.class.getSimpleName());
private LinkedList<Envelope<TranslatedMessage>> queue = new LinkedList<>();
@Override
public Stream<Envelope<TranslatedMessage>> getEnvelopes() {
return queue.stream();
}
@Override
public List<Envelope<TranslatedMessage>> getEnvelopesFor(User user) {
return getEnvelopes().filter(env -> env.isFor(user)).toList();
}
@Override
public Optional<Envelope<TranslatedMessage>> getEnvelope(int hash) {
return getEnvelopes().filter(env -> env.hashCode() == hash).findAny();
}
@Override
public Stream<User> getReceivers() {
return getEnvelopes().map(Envelope::receivers).flatMap(Set::stream).distinct();
}
@Override
public Optional<Envelope<TranslatedMessage>> markRead(int hash, User user) {
for (var env : queue){
if (env.hashCode() == hash) {
LOG.log(DEBUG,"Removing {0} from receiver list of {1}…",user.name(),env.message().subject());
env.receivers().remove(user);
if (env.receivers().isEmpty()) {
LOG.log(DEBUG,"No more due receiers, removing {0} from queue…",env.message().subject());
queue.remove(env);
}
return Optional.of(env);
}
}
return Optional.empty();
}
@Override
public void push(Envelope<TranslatedMessage> envelope) {
queue.add(envelope);
LOG.log(DEBUG,"{0} for {1} pushed to queue",envelope.message().subject(),envelope.receivers().stream().map(User::name).collect(Collectors.joining(", ")));
}
}

View File

@@ -3,16 +3,16 @@ package de.srsoftware.umbrella.message;
import de.srsoftware.umbrella.core.model.Envelope;
import de.srsoftware.umbrella.core.model.Message;
import de.srsoftware.umbrella.core.model.UmbrellaUser;
import de.srsoftware.umbrella.core.model.User;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;
public interface MessageQueue<T extends Message<?>> {
public Stream<Envelope<T>> getEnvelopes();
public List<Envelope<T>> getEnvelopesFor(UmbrellaUser user);
public List<Envelope<T>> getEnvelopesFor(User user);
public Optional<Envelope<T>> getEnvelope(int hash);
Stream<UmbrellaUser> getReceivers();
public Optional<Envelope<T>> markRead(int hash, UmbrellaUser user);
public void push(Envelope<T> message);
Stream<User> getReceivers();
public Optional<Envelope<T>> markRead(int hash, User user);
public void push(Envelope<T> envelope);
}

View File

@@ -84,9 +84,8 @@ public class MessageSystem extends BaseHandler implements PostBox, EventListener
public MessageSystem(Configuration config) throws UmbrellaException {
var dbFile = config.get(CONFIG_DB).orElseThrow(() -> missingConfig(CONFIG_DB));
var sqlite = new SqliteMessageDb(connect(dbFile));
db = sqlite;
queue = sqlite;
db = new SqliteMessageDb(connect(dbFile));
queue = new InMemoryQueue();
debugAddress = config.get(DEBUG_ADDREESS).map(Object::toString).orElse(null);
port = config.get(CONFIG_SMTP_PORT,587);
@@ -202,12 +201,11 @@ public class MessageSystem extends BaseHandler implements PostBox, EventListener
}
}
private boolean sendAt(UmbrellaUser user, Integer scheduledHour){
private boolean sendAt(User user, Integer scheduledHour){
try {
return db.getSettings(user).sendAt(scheduledHour);
} catch (UmbrellaException ignored) {
return true;
}
if (user instanceof UmbrellaUser uu) return db.getSettings(uu).sendAt(scheduledHour);
} catch (UmbrellaException ignored) {}
return true;
}
private synchronized void processMessages(Integer scheduledHour) {

View File

@@ -17,6 +17,7 @@ import de.srsoftware.umbrella.core.exceptions.UmbrellaException;
import de.srsoftware.umbrella.core.model.Envelope;
import de.srsoftware.umbrella.core.model.TranslatedMessage;
import de.srsoftware.umbrella.core.model.UmbrellaUser;
import de.srsoftware.umbrella.core.model.User;
import de.srsoftware.umbrella.message.model.Instantly;
import de.srsoftware.umbrella.message.model.Settings;
import de.srsoftware.umbrella.message.model.Silent;
@@ -97,12 +98,12 @@ CREATE TABLE IF NOT EXISTS {0} ( {1} VARCHAR(255) PRIMARY KEY, {2} VARCHAR(255)
}
@Override
public List<Envelope<TranslatedMessage>> getEnvelopesFor(UmbrellaUser user) {
public List<Envelope<TranslatedMessage>> getEnvelopesFor(User user) {
throw new UmbrellaException(HTTP_SERVER_ERROR,"{class}.getEnvelopesFor({user}) not implemented!","class",getClass().getSimpleName(),"user",user.name()); // TODO
}
@Override
public Stream<UmbrellaUser> getReceivers() {
public Stream<User> getReceivers() {
return Stream.empty();
}
@@ -125,7 +126,7 @@ CREATE TABLE IF NOT EXISTS {0} ( {1} VARCHAR(255) PRIMARY KEY, {2} VARCHAR(255)
}
@Override
public Optional<Envelope<TranslatedMessage>> markRead(int hash, UmbrellaUser user) {
public Optional<Envelope<TranslatedMessage>> markRead(int hash, User user) {
throw new UmbrellaException(HTTP_SERVER_ERROR,"{class}.markRead(hash, user) not implemented!","class",getClass().getSimpleName()); // TODO
// TODO: throw exception if message not found!
}